Skip to main content

gio/
output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, mem, pin::Pin, ptr};
4
5use glib::{Priority, prelude::*, translate::*};
6
7#[cfg(feature = "v2_60")]
8use crate::OutputVector;
9use crate::{Cancellable, OutputStream, Seekable, error::to_std_io_result, ffi, prelude::*};
10
11pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
12    /// Request an asynchronous write of @count bytes from @buffer into
13    /// the stream. When the operation is finished @callback will be called.
14    /// You can then call g_output_stream_write_finish() to get the result of the
15    /// operation.
16    ///
17    /// During an async request no other sync and async calls are allowed,
18    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
19    ///
20    /// A value of @count larger than `G_MAXSSIZE` will cause a
21    /// [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
22    ///
23    /// On success, the number of bytes written will be passed to the
24    /// @callback. It is not an error if this is not the same as the
25    /// requested size, as it can happen e.g. on a partial I/O error,
26    /// but generally we try to write as many bytes as requested.
27    ///
28    /// You are guaranteed that this method will never fail with
29    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] - if @self can't accept more data, the
30    /// method will just wait until this changes.
31    ///
32    /// Any outstanding I/O request with higher priority (lower numerical
33    /// value) will be executed before an outstanding request with lower
34    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
35    ///
36    /// The asynchronous methods have a default fallback that uses threads
37    /// to implement asynchronicity, so they are optional for inheriting
38    /// classes. However, if you override one you must override all.
39    ///
40    /// For the synchronous, blocking version of this function, see
41    /// g_output_stream_write().
42    ///
43    /// Note that no copy of @buffer will be made, so it must stay valid
44    /// until @callback is called. See g_output_stream_write_bytes_async()
45    /// for a #GBytes version that will automatically hold a reference to
46    /// the contents (without copying) for the duration of the call.
47    /// ## `buffer`
48    /// the buffer containing the data to write.
49    /// ## `io_priority`
50    /// the io priority of the request.
51    /// ## `cancellable`
52    /// optional #GCancellable object, [`None`] to ignore.
53    /// ## `callback`
54    /// a #GAsyncReadyCallback
55    ///     to call when the request is satisfied
56    #[doc(alias = "g_output_stream_write_async")]
57    fn write_async<
58        B: AsRef<[u8]> + Send + 'static,
59        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
60        C: IsA<Cancellable>,
61    >(
62        &self,
63        buffer: B,
64        io_priority: Priority,
65        cancellable: Option<&C>,
66        callback: Q,
67    ) {
68        let main_context = glib::MainContext::ref_thread_default();
69        let is_main_context_owner = main_context.is_owner();
70        let has_acquired_main_context = (!is_main_context_owner)
71            .then(|| main_context.acquire().ok())
72            .flatten();
73        assert!(
74            is_main_context_owner || has_acquired_main_context.is_some(),
75            "Async operations only allowed if the thread is owning the MainContext"
76        );
77
78        let cancellable = cancellable.map(|c| c.as_ref());
79        let gcancellable = cancellable.to_glib_none();
80        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
81            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
82        // Need to do this after boxing as the contents pointer might change by moving into the box
83        let (count, buffer_ptr) = {
84            let buffer = &user_data.1;
85            let slice = buffer.as_ref();
86            (slice.len(), slice.as_ptr())
87        };
88        unsafe extern "C" fn write_async_trampoline<
89            B: AsRef<[u8]> + Send + 'static,
90            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
91        >(
92            _source_object: *mut glib::gobject_ffi::GObject,
93            res: *mut ffi::GAsyncResult,
94            user_data: glib::ffi::gpointer,
95        ) {
96            unsafe {
97                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
98                    Box::from_raw(user_data as *mut _);
99                let (callback, buffer) = *user_data;
100                let callback = callback.into_inner();
101
102                let mut error = ptr::null_mut();
103                let ret =
104                    ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
105                let result = if error.is_null() {
106                    Ok((buffer, ret as usize))
107                } else {
108                    Err((buffer, from_glib_full(error)))
109                };
110                callback(result);
111            }
112        }
113        let callback = write_async_trampoline::<B, Q>;
114        unsafe {
115            ffi::g_output_stream_write_async(
116                self.as_ref().to_glib_none().0,
117                mut_override(buffer_ptr),
118                count,
119                io_priority.into_glib(),
120                gcancellable.0,
121                Some(callback),
122                Box::into_raw(user_data) as *mut _,
123            );
124        }
125    }
126
127    /// Tries to write @count bytes from @buffer into the stream. Will block
128    /// during the operation.
129    ///
130    /// This function is similar to g_output_stream_write(), except it tries to
131    /// write as many bytes as requested, only stopping on an error.
132    ///
133    /// On a successful write of @count bytes, [`true`] is returned, and @bytes_written
134    /// is set to @count.
135    ///
136    /// If there is an error during the operation [`false`] is returned and @error
137    /// is set to indicate the error status.
138    ///
139    /// As a special exception to the normal conventions for functions that
140    /// use #GError, if this function returns [`false`] (and sets @error) then
141    /// @bytes_written will be set to the number of bytes that were
142    /// successfully written before the error was encountered.  This
143    /// functionality is only available from C.  If you need it from another
144    /// language then you must write your own loop around
145    /// g_output_stream_write().
146    /// ## `buffer`
147    /// the buffer containing the data to write.
148    /// ## `cancellable`
149    /// optional #GCancellable object, [`None`] to ignore.
150    ///
151    /// # Returns
152    ///
153    /// [`true`] on success, [`false`] if there was an error
154    ///
155    /// ## `bytes_written`
156    /// location to store the number of bytes that was
157    ///     written to the stream
158    #[doc(alias = "g_output_stream_write_all")]
159    fn write_all<C: IsA<Cancellable>>(
160        &self,
161        buffer: &[u8],
162        cancellable: Option<&C>,
163    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
164        let cancellable = cancellable.map(|c| c.as_ref());
165        let gcancellable = cancellable.to_glib_none();
166        let count = buffer.len();
167        unsafe {
168            let mut bytes_written = mem::MaybeUninit::uninit();
169            let mut error = ptr::null_mut();
170            let _ = ffi::g_output_stream_write_all(
171                self.as_ref().to_glib_none().0,
172                buffer.to_glib_none().0,
173                count,
174                bytes_written.as_mut_ptr(),
175                gcancellable.0,
176                &mut error,
177            );
178
179            let bytes_written = bytes_written.assume_init();
180            if error.is_null() {
181                Ok((bytes_written, None))
182            } else if bytes_written != 0 {
183                Ok((bytes_written, Some(from_glib_full(error))))
184            } else {
185                Err(from_glib_full(error))
186            }
187        }
188    }
189
190    /// Request an asynchronous write of @count bytes from @buffer into
191    /// the stream. When the operation is finished @callback will be called.
192    /// You can then call g_output_stream_write_all_finish() to get the result of the
193    /// operation.
194    ///
195    /// This is the asynchronous version of g_output_stream_write_all().
196    ///
197    /// Call g_output_stream_write_all_finish() to collect the result.
198    ///
199    /// Any outstanding I/O request with higher priority (lower numerical
200    /// value) will be executed before an outstanding request with lower
201    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
202    ///
203    /// Note that no copy of @buffer will be made, so it must stay valid
204    /// until @callback is called.
205    /// ## `buffer`
206    /// the buffer containing the data to write
207    /// ## `io_priority`
208    /// the io priority of the request
209    /// ## `cancellable`
210    /// optional #GCancellable object, [`None`] to ignore
211    /// ## `callback`
212    /// a #GAsyncReadyCallback
213    ///     to call when the request is satisfied
214    #[doc(alias = "g_output_stream_write_all_async")]
215    fn write_all_async<
216        B: AsRef<[u8]> + Send + 'static,
217        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
218        C: IsA<Cancellable>,
219    >(
220        &self,
221        buffer: B,
222        io_priority: Priority,
223        cancellable: Option<&C>,
224        callback: Q,
225    ) {
226        let main_context = glib::MainContext::ref_thread_default();
227        let is_main_context_owner = main_context.is_owner();
228        let has_acquired_main_context = (!is_main_context_owner)
229            .then(|| main_context.acquire().ok())
230            .flatten();
231        assert!(
232            is_main_context_owner || has_acquired_main_context.is_some(),
233            "Async operations only allowed if the thread is owning the MainContext"
234        );
235
236        let cancellable = cancellable.map(|c| c.as_ref());
237        let gcancellable = cancellable.to_glib_none();
238        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
239            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
240        // Need to do this after boxing as the contents pointer might change by moving into the box
241        let (count, buffer_ptr) = {
242            let buffer = &user_data.1;
243            let slice = buffer.as_ref();
244            (slice.len(), slice.as_ptr())
245        };
246        unsafe extern "C" fn write_all_async_trampoline<
247            B: AsRef<[u8]> + Send + 'static,
248            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
249        >(
250            _source_object: *mut glib::gobject_ffi::GObject,
251            res: *mut ffi::GAsyncResult,
252            user_data: glib::ffi::gpointer,
253        ) {
254            unsafe {
255                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
256                    Box::from_raw(user_data as *mut _);
257                let (callback, buffer) = *user_data;
258                let callback = callback.into_inner();
259
260                let mut error = ptr::null_mut();
261                let mut bytes_written = mem::MaybeUninit::uninit();
262                let _ = ffi::g_output_stream_write_all_finish(
263                    _source_object as *mut _,
264                    res,
265                    bytes_written.as_mut_ptr(),
266                    &mut error,
267                );
268                let bytes_written = bytes_written.assume_init();
269                let result = if error.is_null() {
270                    Ok((buffer, bytes_written, None))
271                } else if bytes_written != 0 {
272                    Ok((buffer, bytes_written, from_glib_full(error)))
273                } else {
274                    Err((buffer, from_glib_full(error)))
275                };
276                callback(result);
277            }
278        }
279        let callback = write_all_async_trampoline::<B, Q>;
280        unsafe {
281            ffi::g_output_stream_write_all_async(
282                self.as_ref().to_glib_none().0,
283                mut_override(buffer_ptr),
284                count,
285                io_priority.into_glib(),
286                gcancellable.0,
287                Some(callback),
288                Box::into_raw(user_data) as *mut _,
289            );
290        }
291    }
292
293    fn write_future<B: AsRef<[u8]> + Send + 'static>(
294        &self,
295        buffer: B,
296        io_priority: Priority,
297    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
298    {
299        Box::pin(crate::GioFuture::new(
300            self,
301            move |obj, cancellable, send| {
302                obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
303                    send.resolve(res);
304                });
305            },
306        ))
307    }
308
309    fn write_all_future<B: AsRef<[u8]> + Send + 'static>(
310        &self,
311        buffer: B,
312        io_priority: Priority,
313    ) -> Pin<
314        Box<
315            dyn std::future::Future<
316                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
317                > + 'static,
318        >,
319    > {
320        Box::pin(crate::GioFuture::new(
321            self,
322            move |obj, cancellable, send| {
323                obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
324                    send.resolve(res);
325                });
326            },
327        ))
328    }
329
330    /// Tries to write the bytes contained in the @n_vectors @vectors into the
331    /// stream. Will block during the operation.
332    ///
333    /// If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
334    /// does nothing.
335    ///
336    /// On success, the number of bytes written to the stream is returned.
337    /// It is not an error if this is not the same as the requested size, as it
338    /// can happen e.g. on a partial I/O error, or if there is not enough
339    /// storage in the stream. All writes block until at least one byte
340    /// is written or an error occurs; 0 is never returned (unless
341    /// @n_vectors is 0 or the sum of all bytes in @vectors is 0).
342    ///
343    /// If @cancellable is not [`None`], then the operation can be cancelled by
344    /// triggering the cancellable object from another thread. If the operation
345    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
346    /// operation was partially finished when the operation was cancelled the
347    /// partial result will be returned, without an error.
348    ///
349    /// Some implementations of g_output_stream_writev() may have limitations on the
350    /// aggregate buffer size, and will return [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] if these
351    /// are exceeded. For example, when writing to a local file on UNIX platforms,
352    /// the aggregate buffer size must not exceed `G_MAXSSIZE` bytes.
353    /// ## `vectors`
354    /// the buffer containing the #GOutputVectors to write.
355    /// ## `cancellable`
356    /// optional cancellable object
357    ///
358    /// # Returns
359    ///
360    /// [`true`] on success, [`false`] if there was an error
361    ///
362    /// ## `bytes_written`
363    /// location to store the number of bytes that were
364    ///     written to the stream
365    #[cfg(feature = "v2_60")]
366    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
367    #[doc(alias = "g_output_stream_writev")]
368    fn writev(
369        &self,
370        vectors: &[OutputVector],
371        cancellable: Option<&impl IsA<Cancellable>>,
372    ) -> Result<usize, glib::Error> {
373        unsafe {
374            let mut error = ptr::null_mut();
375            let mut bytes_written = mem::MaybeUninit::uninit();
376
377            ffi::g_output_stream_writev(
378                self.as_ref().to_glib_none().0,
379                vectors.as_ptr() as *const _,
380                vectors.len(),
381                bytes_written.as_mut_ptr(),
382                cancellable.map(|p| p.as_ref()).to_glib_none().0,
383                &mut error,
384            );
385            if error.is_null() {
386                Ok(bytes_written.assume_init())
387            } else {
388                Err(from_glib_full(error))
389            }
390        }
391    }
392
393    ///  if @self can't accept more data, the
394    /// method will just wait until this changes.
395    ///
396    /// Any outstanding I/O request with higher priority (lower numerical
397    /// value) will be executed before an outstanding request with lower
398    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
399    ///
400    /// The asynchronous methods have a default fallback that uses threads
401    /// to implement asynchronicity, so they are optional for inheriting
402    /// classes. However, if you override one you must override all.
403    ///
404    /// For the synchronous, blocking version of this function, see
405    /// g_output_stream_writev().
406    ///
407    /// Note that no copy of @vectors will be made, so it must stay valid
408    /// until @callback is called.
409    /// ## `vectors`
410    /// the buffer containing the #GOutputVectors to write.
411    /// ## `io_priority`
412    /// the I/O priority of the request.
413    /// ## `cancellable`
414    /// optional #GCancellable object, [`None`] to ignore.
415    /// ## `callback`
416    /// a #GAsyncReadyCallback
417    ///     to call when the request is satisfied
418    #[cfg(feature = "v2_60")]
419    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
420    #[doc(alias = "g_output_stream_writev_async")]
421    fn writev_async<
422        B: AsRef<[u8]> + Send + 'static,
423        P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
424    >(
425        &self,
426        vectors: impl IntoIterator<Item = B> + 'static,
427        io_priority: glib::Priority,
428        cancellable: Option<&impl IsA<Cancellable>>,
429        callback: P,
430    ) {
431        let main_context = glib::MainContext::ref_thread_default();
432        let is_main_context_owner = main_context.is_owner();
433        let has_acquired_main_context = (!is_main_context_owner)
434            .then(|| main_context.acquire().ok())
435            .flatten();
436        assert!(
437            is_main_context_owner || has_acquired_main_context.is_some(),
438            "Async operations only allowed if the thread is owning the MainContext"
439        );
440
441        let cancellable = cancellable.map(|c| c.as_ref());
442        let gcancellable = cancellable.to_glib_none();
443        let buffers = vectors.into_iter().collect::<Vec<_>>();
444        let vectors = buffers
445            .iter()
446            .map(|v| ffi::GOutputVector {
447                buffer: v.as_ref().as_ptr() as *const _,
448                size: v.as_ref().len(),
449            })
450            .collect::<Vec<_>>();
451        let vectors_ptr = vectors.as_ptr();
452        let num_vectors = vectors.len();
453        let user_data: Box<(
454            glib::thread_guard::ThreadGuard<P>,
455            Vec<B>,
456            Vec<ffi::GOutputVector>,
457        )> = Box::new((
458            glib::thread_guard::ThreadGuard::new(callback),
459            buffers,
460            vectors,
461        ));
462
463        unsafe extern "C" fn writev_async_trampoline<
464            B: AsRef<[u8]> + Send + 'static,
465            P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
466        >(
467            _source_object: *mut glib::gobject_ffi::GObject,
468            res: *mut ffi::GAsyncResult,
469            user_data: glib::ffi::gpointer,
470        ) {
471            unsafe {
472                let user_data: Box<(
473                    glib::thread_guard::ThreadGuard<P>,
474                    Vec<B>,
475                    Vec<ffi::GOutputVector>,
476                )> = Box::from_raw(user_data as *mut _);
477                let (callback, buffers, _) = *user_data;
478                let callback = callback.into_inner();
479
480                let mut error = ptr::null_mut();
481                let mut bytes_written = mem::MaybeUninit::uninit();
482                ffi::g_output_stream_writev_finish(
483                    _source_object as *mut _,
484                    res,
485                    bytes_written.as_mut_ptr(),
486                    &mut error,
487                );
488                let bytes_written = bytes_written.assume_init();
489                let result = if error.is_null() {
490                    Ok((buffers, bytes_written))
491                } else {
492                    Err((buffers, from_glib_full(error)))
493                };
494                callback(result);
495            }
496        }
497        let callback = writev_async_trampoline::<B, P>;
498        unsafe {
499            ffi::g_output_stream_writev_async(
500                self.as_ref().to_glib_none().0,
501                vectors_ptr,
502                num_vectors,
503                io_priority.into_glib(),
504                gcancellable.0,
505                Some(callback),
506                Box::into_raw(user_data) as *mut _,
507            );
508        }
509    }
510
511    #[cfg(feature = "v2_60")]
512    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
513    fn writev_future<B: AsRef<[u8]> + Send + 'static>(
514        &self,
515        vectors: impl IntoIterator<Item = B> + 'static,
516        io_priority: glib::Priority,
517    ) -> Pin<
518        Box<
519            dyn std::future::Future<Output = Result<(Vec<B>, usize), (Vec<B>, glib::Error)>>
520                + 'static,
521        >,
522    > {
523        Box::pin(crate::GioFuture::new(
524            self,
525            move |obj, cancellable, send| {
526                obj.writev_async(vectors, io_priority, Some(cancellable), move |res| {
527                    send.resolve(res);
528                });
529            },
530        ))
531    }
532
533    /// Tries to write the bytes contained in the @n_vectors @vectors into the
534    /// stream. Will block during the operation.
535    ///
536    /// This function is similar to g_output_stream_writev(), except it tries to
537    /// write as many bytes as requested, only stopping on an error.
538    ///
539    /// On a successful write of all @n_vectors vectors, [`true`] is returned, and
540    /// @bytes_written is set to the sum of all the sizes of @vectors.
541    ///
542    /// If there is an error during the operation [`false`] is returned and @error
543    /// is set to indicate the error status.
544    ///
545    /// As a special exception to the normal conventions for functions that
546    /// use #GError, if this function returns [`false`] (and sets @error) then
547    /// @bytes_written will be set to the number of bytes that were
548    /// successfully written before the error was encountered.  This
549    /// functionality is only available from C. If you need it from another
550    /// language then you must write your own loop around
551    /// g_output_stream_write().
552    ///
553    /// The content of the individual elements of @vectors might be changed by this
554    /// function.
555    /// ## `vectors`
556    /// the buffer containing the #GOutputVectors to write.
557    /// ## `cancellable`
558    /// optional #GCancellable object, [`None`] to ignore.
559    ///
560    /// # Returns
561    ///
562    /// [`true`] on success, [`false`] if there was an error
563    ///
564    /// ## `bytes_written`
565    /// location to store the number of bytes that were
566    ///     written to the stream
567    #[cfg(feature = "v2_60")]
568    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
569    #[doc(alias = "g_output_stream_writev_all")]
570    fn writev_all(
571        &self,
572        vectors: &[OutputVector],
573        cancellable: Option<&impl IsA<Cancellable>>,
574    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
575        unsafe {
576            let mut error = ptr::null_mut();
577            let mut bytes_written = mem::MaybeUninit::uninit();
578
579            ffi::g_output_stream_writev_all(
580                self.as_ref().to_glib_none().0,
581                mut_override(vectors.as_ptr() as *const _),
582                vectors.len(),
583                bytes_written.as_mut_ptr(),
584                cancellable.map(|p| p.as_ref()).to_glib_none().0,
585                &mut error,
586            );
587            let bytes_written = bytes_written.assume_init();
588            if error.is_null() {
589                Ok((bytes_written, None))
590            } else if bytes_written != 0 {
591                Ok((bytes_written, Some(from_glib_full(error))))
592            } else {
593                Err(from_glib_full(error))
594            }
595        }
596    }
597
598    /// Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
599    /// the stream. When the operation is finished @callback will be called.
600    /// You can then call g_output_stream_writev_all_finish() to get the result of the
601    /// operation.
602    ///
603    /// This is the asynchronous version of g_output_stream_writev_all().
604    ///
605    /// Call g_output_stream_writev_all_finish() to collect the result.
606    ///
607    /// Any outstanding I/O request with higher priority (lower numerical
608    /// value) will be executed before an outstanding request with lower
609    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
610    ///
611    /// Note that no copy of @vectors will be made, so it must stay valid
612    /// until @callback is called. The content of the individual elements
613    /// of @vectors might be changed by this function.
614    /// ## `vectors`
615    /// the buffer containing the #GOutputVectors to write.
616    /// ## `io_priority`
617    /// the I/O priority of the request
618    /// ## `cancellable`
619    /// optional #GCancellable object, [`None`] to ignore
620    /// ## `callback`
621    /// a #GAsyncReadyCallback
622    ///     to call when the request is satisfied
623    #[cfg(feature = "v2_60")]
624    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
625    #[doc(alias = "g_output_stream_writev_all_async")]
626    fn writev_all_async<
627        B: AsRef<[u8]> + Send + 'static,
628        P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
629    >(
630        &self,
631        vectors: impl IntoIterator<Item = B> + 'static,
632        io_priority: glib::Priority,
633        cancellable: Option<&impl IsA<Cancellable>>,
634        callback: P,
635    ) {
636        let main_context = glib::MainContext::ref_thread_default();
637        let is_main_context_owner = main_context.is_owner();
638        let has_acquired_main_context = (!is_main_context_owner)
639            .then(|| main_context.acquire().ok())
640            .flatten();
641        assert!(
642            is_main_context_owner || has_acquired_main_context.is_some(),
643            "Async operations only allowed if the thread is owning the MainContext"
644        );
645
646        let cancellable = cancellable.map(|c| c.as_ref());
647        let gcancellable = cancellable.to_glib_none();
648        let buffers = vectors.into_iter().collect::<Vec<_>>();
649        let vectors = buffers
650            .iter()
651            .map(|v| ffi::GOutputVector {
652                buffer: v.as_ref().as_ptr() as *const _,
653                size: v.as_ref().len(),
654            })
655            .collect::<Vec<_>>();
656        let vectors_ptr = vectors.as_ptr();
657        let num_vectors = vectors.len();
658        let user_data: Box<(
659            glib::thread_guard::ThreadGuard<P>,
660            Vec<B>,
661            Vec<ffi::GOutputVector>,
662        )> = Box::new((
663            glib::thread_guard::ThreadGuard::new(callback),
664            buffers,
665            vectors,
666        ));
667
668        unsafe extern "C" fn writev_all_async_trampoline<
669            B: AsRef<[u8]> + Send + 'static,
670            P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
671        >(
672            _source_object: *mut glib::gobject_ffi::GObject,
673            res: *mut ffi::GAsyncResult,
674            user_data: glib::ffi::gpointer,
675        ) {
676            unsafe {
677                let user_data: Box<(
678                    glib::thread_guard::ThreadGuard<P>,
679                    Vec<B>,
680                    Vec<ffi::GOutputVector>,
681                )> = Box::from_raw(user_data as *mut _);
682                let (callback, buffers, _) = *user_data;
683                let callback = callback.into_inner();
684
685                let mut error = ptr::null_mut();
686                let mut bytes_written = mem::MaybeUninit::uninit();
687                ffi::g_output_stream_writev_all_finish(
688                    _source_object as *mut _,
689                    res,
690                    bytes_written.as_mut_ptr(),
691                    &mut error,
692                );
693                let bytes_written = bytes_written.assume_init();
694                let result = if error.is_null() {
695                    Ok((buffers, bytes_written, None))
696                } else if bytes_written != 0 {
697                    Ok((buffers, bytes_written, from_glib_full(error)))
698                } else {
699                    Err((buffers, from_glib_full(error)))
700                };
701                callback(result);
702            }
703        }
704        let callback = writev_all_async_trampoline::<B, P>;
705        unsafe {
706            ffi::g_output_stream_writev_all_async(
707                self.as_ref().to_glib_none().0,
708                mut_override(vectors_ptr),
709                num_vectors,
710                io_priority.into_glib(),
711                gcancellable.0,
712                Some(callback),
713                Box::into_raw(user_data) as *mut _,
714            );
715        }
716    }
717
718    #[cfg(feature = "v2_60")]
719    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
720    fn writev_all_future<B: AsRef<[u8]> + Send + 'static>(
721        &self,
722        vectors: impl IntoIterator<Item = B> + 'static,
723        io_priority: glib::Priority,
724    ) -> Pin<
725        Box<
726            dyn std::future::Future<
727                    Output = Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>,
728                > + 'static,
729        >,
730    > {
731        Box::pin(crate::GioFuture::new(
732            self,
733            move |obj, cancellable, send| {
734                obj.writev_all_async(vectors, io_priority, Some(cancellable), move |res| {
735                    send.resolve(res);
736                });
737            },
738        ))
739    }
740
741    fn into_write(self) -> OutputStreamWrite<Self>
742    where
743        Self: IsA<OutputStream>,
744    {
745        OutputStreamWrite(self)
746    }
747}
748
749impl<O: IsA<OutputStream>> OutputStreamExtManual for O {}
750
751#[derive(Debug)]
752pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
753
754impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
755    pub fn into_output_stream(self) -> T {
756        self.0
757    }
758
759    pub fn output_stream(&self) -> &T {
760        &self.0
761    }
762}
763
764impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
765    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
766        let result = self
767            .0
768            .as_ref()
769            .write(buf, crate::Cancellable::NONE)
770            .map(|size| size as usize);
771        to_std_io_result(result)
772    }
773
774    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
775        let result = self
776            .0
777            .as_ref()
778            .write_all(buf, crate::Cancellable::NONE)
779            .and_then(|(_, e)| e.map(Err).unwrap_or(Ok(())));
780        to_std_io_result(result)
781    }
782
783    #[cfg(feature = "v2_60")]
784    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
785    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
786        let vectors = bufs
787            .iter()
788            .map(|v| OutputVector::new(v))
789            .collect::<smallvec::SmallVec<[_; 2]>>();
790        let result = self.0.as_ref().writev(&vectors, crate::Cancellable::NONE);
791        to_std_io_result(result)
792    }
793
794    fn flush(&mut self) -> io::Result<()> {
795        let gio_result = self.0.as_ref().flush(crate::Cancellable::NONE);
796        to_std_io_result(gio_result)
797    }
798}
799
800impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
801    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
802        let (pos, type_) = match pos {
803            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
804            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
805            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
806        };
807        let seekable: &Seekable = self.0.as_ref();
808        let gio_result = seekable
809            .seek(pos, type_, crate::Cancellable::NONE)
810            .map(|_| seekable.tell() as u64);
811        to_std_io_result(gio_result)
812    }
813}
814
#[cfg(test)]
mod tests {
    use std::io::Write;

    use glib::Bytes;

    #[cfg(feature = "v2_60")]
    use crate::OutputVector;
    use crate::{MemoryInputStream, MemoryOutputStream, prelude::*, test_util::run_async};

    // Splicing a three-byte memory input stream into a resizable memory output stream
    // must report three bytes transferred.
    #[test]
    fn splice_async() {
        let outcome = run_async(|sender, loop_handle| {
            let source = MemoryInputStream::new();
            let payload = Bytes::from_owned(vec![1, 2, 3]);
            source.add_bytes(&payload);

            let sink = MemoryOutputStream::new_resizable();
            sink.splice_async(
                &source,
                crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    loop_handle.quit();
                },
            );
        });

        assert_eq!(outcome.unwrap(), 3);
    }

    // `write_async` hands the buffer back together with the number of bytes written.
    #[test]
    fn write_async() {
        let outcome = run_async(|sender, loop_handle| {
            let sink = MemoryOutputStream::new_resizable();

            let payload = vec![1, 2, 3];
            sink.write_async(
                payload,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    loop_handle.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
    }

    // `write_all_async` returns the buffer, the byte count and no trailing error.
    #[test]
    fn write_all_async() {
        let outcome = run_async(|sender, loop_handle| {
            let sink = MemoryOutputStream::new_resizable();

            let payload = vec![1, 2, 3];
            sink.write_all_async(
                payload,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    loop_handle.quit();
                },
            );
        });

        let (returned, written, trailing_error) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
        assert!(trailing_error.is_none());
    }

    // `write_bytes_async` reports the number of bytes taken from the `Bytes` value.
    #[test]
    fn write_bytes_async() {
        let outcome = run_async(|sender, loop_handle| {
            let sink = MemoryOutputStream::new_resizable();

            let payload = Bytes::from_owned(vec![1, 2, 3]);
            sink.write_bytes_async(
                &payload,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    loop_handle.quit();
                },
            );
        });

        assert_eq!(outcome.unwrap(), 3);
    }

    // Writing through the `std::io::Write` adapter must land in the wrapped stream.
    #[test]
    fn std_io_write() {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        let mut writer = MemoryOutputStream::new_resizable().into_write();

        let written = writer.write(&payload);

        // The stream is closed before its contents are stolen.
        let stream = writer.into_output_stream();
        stream.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(written.unwrap(), 3);
        assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref());
    }

    // Wrapping and unwrapping must give back the very same stream object.
    #[test]
    fn into_output_stream() {
        let original = MemoryOutputStream::new_resizable();
        let round_tripped = original.clone().into_write().into_output_stream();

        assert_eq!(round_tripped, original);
    }

    // A synchronous vectored write concatenates both vectors into the stream.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev() {
        let sink = MemoryOutputStream::new_resizable();

        let written = sink.writev(
            &[OutputVector::new(&[1, 2, 3]), OutputVector::new(&[4, 5, 6])],
            crate::Cancellable::NONE,
        );
        assert_eq!(written.unwrap(), 6);
        sink.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(sink.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
    }

    // `writev_async` returns the buffers and the total byte count; the stream contents
    // are verified inside the callback through a clone of the stream.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_async() {
        let outcome = run_async(|sender, loop_handle| {
            let sink = MemoryOutputStream::new_resizable();

            let probe = sink.clone();
            sink.writev_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    probe.close(crate::Cancellable::NONE).unwrap();
                    assert_eq!(probe.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
                    loop_handle.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
    }

    // `writev_all_async` returns the buffers, the total byte count and no trailing
    // error; the stream contents are verified inside the callback.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_all_async() {
        let outcome = run_async(|sender, loop_handle| {
            let sink = MemoryOutputStream::new_resizable();

            let probe = sink.clone();
            sink.writev_all_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |res| {
                    sender.send(res).unwrap();
                    probe.close(crate::Cancellable::NONE).unwrap();
                    assert_eq!(probe.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
                    loop_handle.quit();
                },
            );
        });

        let (returned, written, trailing_error) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
        assert!(trailing_error.is_none());
    }
}