gio/
output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, mem, pin::Pin, ptr};
4
5use glib::{prelude::*, translate::*, Priority};
6
7#[cfg(feature = "v2_60")]
8use crate::OutputVector;
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, OutputStream, Seekable};
10
11pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
12    /// Request an asynchronous write of @count bytes from @buffer into
13    /// the stream. When the operation is finished @callback will be called.
14    /// You can then call g_output_stream_write_finish() to get the result of the
15    /// operation.
16    ///
17    /// During an async request no other sync and async calls are allowed,
18    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
19    ///
20    /// A value of @count larger than `G_MAXSSIZE` will cause a
21    /// [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
22    ///
23    /// On success, the number of bytes written will be passed to the
24    /// @callback. It is not an error if this is not the same as the
25    /// requested size, as it can happen e.g. on a partial I/O error,
26    /// but generally we try to write as many bytes as requested.
27    ///
28    /// You are guaranteed that this method will never fail with
29    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] - if @self can't accept more data, the
30    /// method will just wait until this changes.
31    ///
32    /// Any outstanding I/O request with higher priority (lower numerical
33    /// value) will be executed before an outstanding request with lower
34    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
35    ///
36    /// The asynchronous methods have a default fallback that uses threads
37    /// to implement asynchronicity, so they are optional for inheriting
38    /// classes. However, if you override one you must override all.
39    ///
40    /// For the synchronous, blocking version of this function, see
41    /// g_output_stream_write().
42    ///
43    /// Note that no copy of @buffer will be made, so it must stay valid
44    /// until @callback is called. See g_output_stream_write_bytes_async()
45    /// for a #GBytes version that will automatically hold a reference to
46    /// the contents (without copying) for the duration of the call.
47    /// ## `buffer`
48    /// the buffer containing the data to write.
49    /// ## `io_priority`
50    /// the io priority of the request.
51    /// ## `cancellable`
52    /// optional #GCancellable object, [`None`] to ignore.
53    /// ## `callback`
54    /// a #GAsyncReadyCallback
55    ///     to call when the request is satisfied
56    #[doc(alias = "g_output_stream_write_async")]
57    fn write_async<
58        B: AsRef<[u8]> + Send + 'static,
59        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
60        C: IsA<Cancellable>,
61    >(
62        &self,
63        buffer: B,
64        io_priority: Priority,
65        cancellable: Option<&C>,
66        callback: Q,
67    ) {
68        let main_context = glib::MainContext::ref_thread_default();
69        let is_main_context_owner = main_context.is_owner();
70        let has_acquired_main_context = (!is_main_context_owner)
71            .then(|| main_context.acquire().ok())
72            .flatten();
73        assert!(
74            is_main_context_owner || has_acquired_main_context.is_some(),
75            "Async operations only allowed if the thread is owning the MainContext"
76        );
77
78        let cancellable = cancellable.map(|c| c.as_ref());
79        let gcancellable = cancellable.to_glib_none();
80        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
81            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
82        // Need to do this after boxing as the contents pointer might change by moving into the box
83        let (count, buffer_ptr) = {
84            let buffer = &user_data.1;
85            let slice = buffer.as_ref();
86            (slice.len(), slice.as_ptr())
87        };
88        unsafe extern "C" fn write_async_trampoline<
89            B: AsRef<[u8]> + Send + 'static,
90            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
91        >(
92            _source_object: *mut glib::gobject_ffi::GObject,
93            res: *mut ffi::GAsyncResult,
94            user_data: glib::ffi::gpointer,
95        ) {
96            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
97                Box::from_raw(user_data as *mut _);
98            let (callback, buffer) = *user_data;
99            let callback = callback.into_inner();
100
101            let mut error = ptr::null_mut();
102            let ret = ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
103            let result = if error.is_null() {
104                Ok((buffer, ret as usize))
105            } else {
106                Err((buffer, from_glib_full(error)))
107            };
108            callback(result);
109        }
110        let callback = write_async_trampoline::<B, Q>;
111        unsafe {
112            ffi::g_output_stream_write_async(
113                self.as_ref().to_glib_none().0,
114                mut_override(buffer_ptr),
115                count,
116                io_priority.into_glib(),
117                gcancellable.0,
118                Some(callback),
119                Box::into_raw(user_data) as *mut _,
120            );
121        }
122    }
123
124    /// Tries to write @count bytes from @buffer into the stream. Will block
125    /// during the operation.
126    ///
127    /// This function is similar to g_output_stream_write(), except it tries to
128    /// write as many bytes as requested, only stopping on an error.
129    ///
130    /// On a successful write of @count bytes, [`true`] is returned, and @bytes_written
131    /// is set to @count.
132    ///
133    /// If there is an error during the operation [`false`] is returned and @error
134    /// is set to indicate the error status.
135    ///
136    /// As a special exception to the normal conventions for functions that
137    /// use #GError, if this function returns [`false`] (and sets @error) then
138    /// @bytes_written will be set to the number of bytes that were
139    /// successfully written before the error was encountered.  This
140    /// functionality is only available from C.  If you need it from another
141    /// language then you must write your own loop around
142    /// g_output_stream_write().
143    /// ## `buffer`
144    /// the buffer containing the data to write.
145    /// ## `cancellable`
146    /// optional #GCancellable object, [`None`] to ignore.
147    ///
148    /// # Returns
149    ///
150    /// [`true`] on success, [`false`] if there was an error
151    ///
152    /// ## `bytes_written`
153    /// location to store the number of bytes that was
154    ///     written to the stream
155    #[doc(alias = "g_output_stream_write_all")]
156    fn write_all<C: IsA<Cancellable>>(
157        &self,
158        buffer: &[u8],
159        cancellable: Option<&C>,
160    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
161        let cancellable = cancellable.map(|c| c.as_ref());
162        let gcancellable = cancellable.to_glib_none();
163        let count = buffer.len();
164        unsafe {
165            let mut bytes_written = mem::MaybeUninit::uninit();
166            let mut error = ptr::null_mut();
167            let _ = ffi::g_output_stream_write_all(
168                self.as_ref().to_glib_none().0,
169                buffer.to_glib_none().0,
170                count,
171                bytes_written.as_mut_ptr(),
172                gcancellable.0,
173                &mut error,
174            );
175
176            let bytes_written = bytes_written.assume_init();
177            if error.is_null() {
178                Ok((bytes_written, None))
179            } else if bytes_written != 0 {
180                Ok((bytes_written, Some(from_glib_full(error))))
181            } else {
182                Err(from_glib_full(error))
183            }
184        }
185    }
186
187    /// Request an asynchronous write of @count bytes from @buffer into
188    /// the stream. When the operation is finished @callback will be called.
189    /// You can then call g_output_stream_write_all_finish() to get the result of the
190    /// operation.
191    ///
192    /// This is the asynchronous version of g_output_stream_write_all().
193    ///
194    /// Call g_output_stream_write_all_finish() to collect the result.
195    ///
196    /// Any outstanding I/O request with higher priority (lower numerical
197    /// value) will be executed before an outstanding request with lower
198    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
199    ///
200    /// Note that no copy of @buffer will be made, so it must stay valid
201    /// until @callback is called.
202    /// ## `buffer`
203    /// the buffer containing the data to write
204    /// ## `io_priority`
205    /// the io priority of the request
206    /// ## `cancellable`
207    /// optional #GCancellable object, [`None`] to ignore
208    /// ## `callback`
209    /// a #GAsyncReadyCallback
210    ///     to call when the request is satisfied
211    #[doc(alias = "g_output_stream_write_all_async")]
212    fn write_all_async<
213        B: AsRef<[u8]> + Send + 'static,
214        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
215        C: IsA<Cancellable>,
216    >(
217        &self,
218        buffer: B,
219        io_priority: Priority,
220        cancellable: Option<&C>,
221        callback: Q,
222    ) {
223        let main_context = glib::MainContext::ref_thread_default();
224        let is_main_context_owner = main_context.is_owner();
225        let has_acquired_main_context = (!is_main_context_owner)
226            .then(|| main_context.acquire().ok())
227            .flatten();
228        assert!(
229            is_main_context_owner || has_acquired_main_context.is_some(),
230            "Async operations only allowed if the thread is owning the MainContext"
231        );
232
233        let cancellable = cancellable.map(|c| c.as_ref());
234        let gcancellable = cancellable.to_glib_none();
235        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
236            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
237        // Need to do this after boxing as the contents pointer might change by moving into the box
238        let (count, buffer_ptr) = {
239            let buffer = &user_data.1;
240            let slice = buffer.as_ref();
241            (slice.len(), slice.as_ptr())
242        };
243        unsafe extern "C" fn write_all_async_trampoline<
244            B: AsRef<[u8]> + Send + 'static,
245            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
246        >(
247            _source_object: *mut glib::gobject_ffi::GObject,
248            res: *mut ffi::GAsyncResult,
249            user_data: glib::ffi::gpointer,
250        ) {
251            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
252                Box::from_raw(user_data as *mut _);
253            let (callback, buffer) = *user_data;
254            let callback = callback.into_inner();
255
256            let mut error = ptr::null_mut();
257            let mut bytes_written = mem::MaybeUninit::uninit();
258            let _ = ffi::g_output_stream_write_all_finish(
259                _source_object as *mut _,
260                res,
261                bytes_written.as_mut_ptr(),
262                &mut error,
263            );
264            let bytes_written = bytes_written.assume_init();
265            let result = if error.is_null() {
266                Ok((buffer, bytes_written, None))
267            } else if bytes_written != 0 {
268                Ok((buffer, bytes_written, from_glib_full(error)))
269            } else {
270                Err((buffer, from_glib_full(error)))
271            };
272            callback(result);
273        }
274        let callback = write_all_async_trampoline::<B, Q>;
275        unsafe {
276            ffi::g_output_stream_write_all_async(
277                self.as_ref().to_glib_none().0,
278                mut_override(buffer_ptr),
279                count,
280                io_priority.into_glib(),
281                gcancellable.0,
282                Some(callback),
283                Box::into_raw(user_data) as *mut _,
284            );
285        }
286    }
287
288    fn write_future<B: AsRef<[u8]> + Send + 'static>(
289        &self,
290        buffer: B,
291        io_priority: Priority,
292    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
293    {
294        Box::pin(crate::GioFuture::new(
295            self,
296            move |obj, cancellable, send| {
297                obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
298                    send.resolve(res);
299                });
300            },
301        ))
302    }
303
304    fn write_all_future<B: AsRef<[u8]> + Send + 'static>(
305        &self,
306        buffer: B,
307        io_priority: Priority,
308    ) -> Pin<
309        Box<
310            dyn std::future::Future<
311                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
312                > + 'static,
313        >,
314    > {
315        Box::pin(crate::GioFuture::new(
316            self,
317            move |obj, cancellable, send| {
318                obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
319                    send.resolve(res);
320                });
321            },
322        ))
323    }
324
325    /// Tries to write the bytes contained in the @n_vectors @vectors into the
326    /// stream. Will block during the operation.
327    ///
328    /// If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
329    /// does nothing.
330    ///
331    /// On success, the number of bytes written to the stream is returned.
332    /// It is not an error if this is not the same as the requested size, as it
333    /// can happen e.g. on a partial I/O error, or if there is not enough
334    /// storage in the stream. All writes block until at least one byte
335    /// is written or an error occurs; 0 is never returned (unless
336    /// @n_vectors is 0 or the sum of all bytes in @vectors is 0).
337    ///
338    /// If @cancellable is not [`None`], then the operation can be cancelled by
339    /// triggering the cancellable object from another thread. If the operation
340    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
341    /// operation was partially finished when the operation was cancelled the
342    /// partial result will be returned, without an error.
343    ///
344    /// Some implementations of g_output_stream_writev() may have limitations on the
345    /// aggregate buffer size, and will return [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] if these
346    /// are exceeded. For example, when writing to a local file on UNIX platforms,
347    /// the aggregate buffer size must not exceed `G_MAXSSIZE` bytes.
348    /// ## `vectors`
349    /// the buffer containing the #GOutputVectors to write.
350    /// ## `cancellable`
351    /// optional cancellable object
352    ///
353    /// # Returns
354    ///
355    /// [`true`] on success, [`false`] if there was an error
356    ///
357    /// ## `bytes_written`
358    /// location to store the number of bytes that were
359    ///     written to the stream
360    #[cfg(feature = "v2_60")]
361    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
362    #[doc(alias = "g_output_stream_writev")]
363    fn writev(
364        &self,
365        vectors: &[OutputVector],
366        cancellable: Option<&impl IsA<Cancellable>>,
367    ) -> Result<usize, glib::Error> {
368        unsafe {
369            let mut error = ptr::null_mut();
370            let mut bytes_written = mem::MaybeUninit::uninit();
371
372            ffi::g_output_stream_writev(
373                self.as_ref().to_glib_none().0,
374                vectors.as_ptr() as *const _,
375                vectors.len(),
376                bytes_written.as_mut_ptr(),
377                cancellable.map(|p| p.as_ref()).to_glib_none().0,
378                &mut error,
379            );
380            if error.is_null() {
381                Ok(bytes_written.assume_init())
382            } else {
383                Err(from_glib_full(error))
384            }
385        }
386    }
387
388    /// Request an asynchronous write of the bytes contained in @n_vectors @vectors into
389    /// the stream. When the operation is finished @callback will be called.
390    /// You can then call g_output_stream_writev_finish() to get the result of the
391    /// operation.
392    ///
393    /// During an async request no other sync and async calls are allowed,
394    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
395    ///
396    /// On success, the number of bytes written will be passed to the
397    /// @callback. It is not an error if this is not the same as the
398    /// requested size, as it can happen e.g. on a partial I/O error,
399    /// but generally we try to write as many bytes as requested.
400    ///
401    /// You are guaranteed that this method will never fail with
402    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] — if @self can't accept more data, the
403    /// method will just wait until this changes.
404    ///
405    /// Any outstanding I/O request with higher priority (lower numerical
406    /// value) will be executed before an outstanding request with lower
407    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
408    ///
409    /// The asynchronous methods have a default fallback that uses threads
410    /// to implement asynchronicity, so they are optional for inheriting
411    /// classes. However, if you override one you must override all.
412    ///
413    /// For the synchronous, blocking version of this function, see
414    /// g_output_stream_writev().
415    ///
416    /// Note that no copy of @vectors will be made, so it must stay valid
417    /// until @callback is called.
418    /// ## `vectors`
419    /// the buffer containing the #GOutputVectors to write.
420    /// ## `io_priority`
421    /// the I/O priority of the request.
422    /// ## `cancellable`
423    /// optional #GCancellable object, [`None`] to ignore.
424    /// ## `callback`
425    /// a #GAsyncReadyCallback
426    ///     to call when the request is satisfied
427    #[cfg(feature = "v2_60")]
428    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
429    #[doc(alias = "g_output_stream_writev_async")]
430    fn writev_async<
431        B: AsRef<[u8]> + Send + 'static,
432        P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
433    >(
434        &self,
435        vectors: impl IntoIterator<Item = B> + 'static,
436        io_priority: glib::Priority,
437        cancellable: Option<&impl IsA<Cancellable>>,
438        callback: P,
439    ) {
440        let main_context = glib::MainContext::ref_thread_default();
441        let is_main_context_owner = main_context.is_owner();
442        let has_acquired_main_context = (!is_main_context_owner)
443            .then(|| main_context.acquire().ok())
444            .flatten();
445        assert!(
446            is_main_context_owner || has_acquired_main_context.is_some(),
447            "Async operations only allowed if the thread is owning the MainContext"
448        );
449
450        let cancellable = cancellable.map(|c| c.as_ref());
451        let gcancellable = cancellable.to_glib_none();
452        let buffers = vectors.into_iter().collect::<Vec<_>>();
453        let vectors = buffers
454            .iter()
455            .map(|v| ffi::GOutputVector {
456                buffer: v.as_ref().as_ptr() as *const _,
457                size: v.as_ref().len(),
458            })
459            .collect::<Vec<_>>();
460        let vectors_ptr = vectors.as_ptr();
461        let num_vectors = vectors.len();
462        let user_data: Box<(
463            glib::thread_guard::ThreadGuard<P>,
464            Vec<B>,
465            Vec<ffi::GOutputVector>,
466        )> = Box::new((
467            glib::thread_guard::ThreadGuard::new(callback),
468            buffers,
469            vectors,
470        ));
471
472        unsafe extern "C" fn writev_async_trampoline<
473            B: AsRef<[u8]> + Send + 'static,
474            P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
475        >(
476            _source_object: *mut glib::gobject_ffi::GObject,
477            res: *mut ffi::GAsyncResult,
478            user_data: glib::ffi::gpointer,
479        ) {
480            let user_data: Box<(
481                glib::thread_guard::ThreadGuard<P>,
482                Vec<B>,
483                Vec<ffi::GOutputVector>,
484            )> = Box::from_raw(user_data as *mut _);
485            let (callback, buffers, _) = *user_data;
486            let callback = callback.into_inner();
487
488            let mut error = ptr::null_mut();
489            let mut bytes_written = mem::MaybeUninit::uninit();
490            ffi::g_output_stream_writev_finish(
491                _source_object as *mut _,
492                res,
493                bytes_written.as_mut_ptr(),
494                &mut error,
495            );
496            let bytes_written = bytes_written.assume_init();
497            let result = if error.is_null() {
498                Ok((buffers, bytes_written))
499            } else {
500                Err((buffers, from_glib_full(error)))
501            };
502            callback(result);
503        }
504        let callback = writev_async_trampoline::<B, P>;
505        unsafe {
506            ffi::g_output_stream_writev_async(
507                self.as_ref().to_glib_none().0,
508                vectors_ptr,
509                num_vectors,
510                io_priority.into_glib(),
511                gcancellable.0,
512                Some(callback),
513                Box::into_raw(user_data) as *mut _,
514            );
515        }
516    }
517
518    #[cfg(feature = "v2_60")]
519    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
520    fn writev_future<B: AsRef<[u8]> + Send + 'static>(
521        &self,
522        vectors: impl IntoIterator<Item = B> + 'static,
523        io_priority: glib::Priority,
524    ) -> Pin<
525        Box<
526            dyn std::future::Future<Output = Result<(Vec<B>, usize), (Vec<B>, glib::Error)>>
527                + 'static,
528        >,
529    > {
530        Box::pin(crate::GioFuture::new(
531            self,
532            move |obj, cancellable, send| {
533                obj.writev_async(vectors, io_priority, Some(cancellable), move |res| {
534                    send.resolve(res);
535                });
536            },
537        ))
538    }
539
540    /// Tries to write the bytes contained in the @n_vectors @vectors into the
541    /// stream. Will block during the operation.
542    ///
543    /// This function is similar to g_output_stream_writev(), except it tries to
544    /// write as many bytes as requested, only stopping on an error.
545    ///
546    /// On a successful write of all @n_vectors vectors, [`true`] is returned, and
547    /// @bytes_written is set to the sum of all the sizes of @vectors.
548    ///
549    /// If there is an error during the operation [`false`] is returned and @error
550    /// is set to indicate the error status.
551    ///
552    /// As a special exception to the normal conventions for functions that
553    /// use #GError, if this function returns [`false`] (and sets @error) then
554    /// @bytes_written will be set to the number of bytes that were
555    /// successfully written before the error was encountered.  This
556    /// functionality is only available from C. If you need it from another
557    /// language then you must write your own loop around
558    /// g_output_stream_write().
559    ///
560    /// The content of the individual elements of @vectors might be changed by this
561    /// function.
562    /// ## `vectors`
563    /// the buffer containing the #GOutputVectors to write.
564    /// ## `cancellable`
565    /// optional #GCancellable object, [`None`] to ignore.
566    ///
567    /// # Returns
568    ///
569    /// [`true`] on success, [`false`] if there was an error
570    ///
571    /// ## `bytes_written`
572    /// location to store the number of bytes that were
573    ///     written to the stream
574    #[cfg(feature = "v2_60")]
575    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
576    #[doc(alias = "g_output_stream_writev_all")]
577    fn writev_all(
578        &self,
579        vectors: &[OutputVector],
580        cancellable: Option<&impl IsA<Cancellable>>,
581    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
582        unsafe {
583            let mut error = ptr::null_mut();
584            let mut bytes_written = mem::MaybeUninit::uninit();
585
586            ffi::g_output_stream_writev_all(
587                self.as_ref().to_glib_none().0,
588                mut_override(vectors.as_ptr() as *const _),
589                vectors.len(),
590                bytes_written.as_mut_ptr(),
591                cancellable.map(|p| p.as_ref()).to_glib_none().0,
592                &mut error,
593            );
594            let bytes_written = bytes_written.assume_init();
595            if error.is_null() {
596                Ok((bytes_written, None))
597            } else if bytes_written != 0 {
598                Ok((bytes_written, Some(from_glib_full(error))))
599            } else {
600                Err(from_glib_full(error))
601            }
602        }
603    }
604
605    /// Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
606    /// the stream. When the operation is finished @callback will be called.
607    /// You can then call g_output_stream_writev_all_finish() to get the result of the
608    /// operation.
609    ///
610    /// This is the asynchronous version of g_output_stream_writev_all().
611    ///
612    /// Call g_output_stream_writev_all_finish() to collect the result.
613    ///
614    /// Any outstanding I/O request with higher priority (lower numerical
615    /// value) will be executed before an outstanding request with lower
616    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
617    ///
618    /// Note that no copy of @vectors will be made, so it must stay valid
619    /// until @callback is called. The content of the individual elements
620    /// of @vectors might be changed by this function.
621    /// ## `vectors`
622    /// the buffer containing the #GOutputVectors to write.
623    /// ## `io_priority`
624    /// the I/O priority of the request
625    /// ## `cancellable`
626    /// optional #GCancellable object, [`None`] to ignore
627    /// ## `callback`
628    /// a #GAsyncReadyCallback
629    ///     to call when the request is satisfied
630    #[cfg(feature = "v2_60")]
631    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
632    #[doc(alias = "g_output_stream_writev_all_async")]
633    fn writev_all_async<
634        B: AsRef<[u8]> + Send + 'static,
635        P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
636    >(
637        &self,
638        vectors: impl IntoIterator<Item = B> + 'static,
639        io_priority: glib::Priority,
640        cancellable: Option<&impl IsA<Cancellable>>,
641        callback: P,
642    ) {
643        let main_context = glib::MainContext::ref_thread_default();
644        let is_main_context_owner = main_context.is_owner();
645        let has_acquired_main_context = (!is_main_context_owner)
646            .then(|| main_context.acquire().ok())
647            .flatten();
648        assert!(
649            is_main_context_owner || has_acquired_main_context.is_some(),
650            "Async operations only allowed if the thread is owning the MainContext"
651        );
652
653        let cancellable = cancellable.map(|c| c.as_ref());
654        let gcancellable = cancellable.to_glib_none();
655        let buffers = vectors.into_iter().collect::<Vec<_>>();
656        let vectors = buffers
657            .iter()
658            .map(|v| ffi::GOutputVector {
659                buffer: v.as_ref().as_ptr() as *const _,
660                size: v.as_ref().len(),
661            })
662            .collect::<Vec<_>>();
663        let vectors_ptr = vectors.as_ptr();
664        let num_vectors = vectors.len();
665        let user_data: Box<(
666            glib::thread_guard::ThreadGuard<P>,
667            Vec<B>,
668            Vec<ffi::GOutputVector>,
669        )> = Box::new((
670            glib::thread_guard::ThreadGuard::new(callback),
671            buffers,
672            vectors,
673        ));
674
675        unsafe extern "C" fn writev_all_async_trampoline<
676            B: AsRef<[u8]> + Send + 'static,
677            P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
678        >(
679            _source_object: *mut glib::gobject_ffi::GObject,
680            res: *mut ffi::GAsyncResult,
681            user_data: glib::ffi::gpointer,
682        ) {
683            let user_data: Box<(
684                glib::thread_guard::ThreadGuard<P>,
685                Vec<B>,
686                Vec<ffi::GOutputVector>,
687            )> = Box::from_raw(user_data as *mut _);
688            let (callback, buffers, _) = *user_data;
689            let callback = callback.into_inner();
690
691            let mut error = ptr::null_mut();
692            let mut bytes_written = mem::MaybeUninit::uninit();
693            ffi::g_output_stream_writev_all_finish(
694                _source_object as *mut _,
695                res,
696                bytes_written.as_mut_ptr(),
697                &mut error,
698            );
699            let bytes_written = bytes_written.assume_init();
700            let result = if error.is_null() {
701                Ok((buffers, bytes_written, None))
702            } else if bytes_written != 0 {
703                Ok((buffers, bytes_written, from_glib_full(error)))
704            } else {
705                Err((buffers, from_glib_full(error)))
706            };
707            callback(result);
708        }
709        let callback = writev_all_async_trampoline::<B, P>;
710        unsafe {
711            ffi::g_output_stream_writev_all_async(
712                self.as_ref().to_glib_none().0,
713                mut_override(vectors_ptr),
714                num_vectors,
715                io_priority.into_glib(),
716                gcancellable.0,
717                Some(callback),
718                Box::into_raw(user_data) as *mut _,
719            );
720        }
721    }
722
723    #[cfg(feature = "v2_60")]
724    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
725    fn writev_all_future<B: AsRef<[u8]> + Send + 'static>(
726        &self,
727        vectors: impl IntoIterator<Item = B> + 'static,
728        io_priority: glib::Priority,
729    ) -> Pin<
730        Box<
731            dyn std::future::Future<
732                    Output = Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>,
733                > + 'static,
734        >,
735    > {
736        Box::pin(crate::GioFuture::new(
737            self,
738            move |obj, cancellable, send| {
739                obj.writev_all_async(vectors, io_priority, Some(cancellable), move |res| {
740                    send.resolve(res);
741                });
742            },
743        ))
744    }
745
746    fn into_write(self) -> OutputStreamWrite<Self>
747    where
748        Self: IsA<OutputStream>,
749    {
750        OutputStreamWrite(self)
751    }
752}
753
754impl<O: IsA<OutputStream>> OutputStreamExtManual for O {}
755
756#[derive(Debug)]
757pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
758
759impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
760    pub fn into_output_stream(self) -> T {
761        self.0
762    }
763
764    pub fn output_stream(&self) -> &T {
765        &self.0
766    }
767}
768
769impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
770    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
771        let result = self
772            .0
773            .as_ref()
774            .write(buf, crate::Cancellable::NONE)
775            .map(|size| size as usize);
776        to_std_io_result(result)
777    }
778
779    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
780        let result = self
781            .0
782            .as_ref()
783            .write_all(buf, crate::Cancellable::NONE)
784            .and_then(|(_, e)| e.map(Err).unwrap_or(Ok(())));
785        to_std_io_result(result)
786    }
787
788    #[cfg(feature = "v2_60")]
789    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
790    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
791        let vectors = bufs
792            .iter()
793            .map(|v| OutputVector::new(v))
794            .collect::<smallvec::SmallVec<[_; 2]>>();
795        let result = self.0.as_ref().writev(&vectors, crate::Cancellable::NONE);
796        to_std_io_result(result)
797    }
798
799    fn flush(&mut self) -> io::Result<()> {
800        let gio_result = self.0.as_ref().flush(crate::Cancellable::NONE);
801        to_std_io_result(gio_result)
802    }
803}
804
805impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
806    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
807        let (pos, type_) = match pos {
808            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
809            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
810            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
811        };
812        let seekable: &Seekable = self.0.as_ref();
813        let gio_result = seekable
814            .seek(pos, type_, crate::Cancellable::NONE)
815            .map(|_| seekable.tell() as u64);
816        to_std_io_result(gio_result)
817    }
818}
819
820#[cfg(test)]
821mod tests {
822    use std::io::Write;
823
824    use glib::Bytes;
825
826    #[cfg(feature = "v2_60")]
827    use crate::OutputVector;
828    use crate::{prelude::*, test_util::run_async, MemoryInputStream, MemoryOutputStream};
829
830    #[test]
831    fn splice_async() {
832        let ret = run_async(|tx, l| {
833            let input = MemoryInputStream::new();
834            let b = Bytes::from_owned(vec![1, 2, 3]);
835            input.add_bytes(&b);
836
837            let strm = MemoryOutputStream::new_resizable();
838            strm.splice_async(
839                &input,
840                crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
841                glib::Priority::DEFAULT_IDLE,
842                crate::Cancellable::NONE,
843                move |ret| {
844                    tx.send(ret).unwrap();
845                    l.quit();
846                },
847            );
848        });
849
850        assert_eq!(ret.unwrap(), 3);
851    }
852
853    #[test]
854    fn write_async() {
855        let ret = run_async(|tx, l| {
856            let strm = MemoryOutputStream::new_resizable();
857
858            let buf = vec![1, 2, 3];
859            strm.write_async(
860                buf,
861                glib::Priority::DEFAULT_IDLE,
862                crate::Cancellable::NONE,
863                move |ret| {
864                    tx.send(ret).unwrap();
865                    l.quit();
866                },
867            );
868        });
869
870        let (buf, size) = ret.unwrap();
871        assert_eq!(buf, vec![1, 2, 3]);
872        assert_eq!(size, 3);
873    }
874
875    #[test]
876    fn write_all_async() {
877        let ret = run_async(|tx, l| {
878            let strm = MemoryOutputStream::new_resizable();
879
880            let buf = vec![1, 2, 3];
881            strm.write_all_async(
882                buf,
883                glib::Priority::DEFAULT_IDLE,
884                crate::Cancellable::NONE,
885                move |ret| {
886                    tx.send(ret).unwrap();
887                    l.quit();
888                },
889            );
890        });
891
892        let (buf, size, err) = ret.unwrap();
893        assert_eq!(buf, vec![1, 2, 3]);
894        assert_eq!(size, 3);
895        assert!(err.is_none());
896    }
897
898    #[test]
899    fn write_bytes_async() {
900        let ret = run_async(|tx, l| {
901            let strm = MemoryOutputStream::new_resizable();
902
903            let b = Bytes::from_owned(vec![1, 2, 3]);
904            strm.write_bytes_async(
905                &b,
906                glib::Priority::DEFAULT_IDLE,
907                crate::Cancellable::NONE,
908                move |ret| {
909                    tx.send(ret).unwrap();
910                    l.quit();
911                },
912            );
913        });
914
915        assert_eq!(ret.unwrap(), 3);
916    }
917
918    #[test]
919    fn std_io_write() {
920        let b = Bytes::from_owned(vec![1, 2, 3]);
921        let mut write = MemoryOutputStream::new_resizable().into_write();
922
923        let ret = write.write(&b);
924
925        let stream = write.into_output_stream();
926        stream.close(crate::Cancellable::NONE).unwrap();
927        assert_eq!(ret.unwrap(), 3);
928        assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref());
929    }
930
931    #[test]
932    fn into_output_stream() {
933        let stream = MemoryOutputStream::new_resizable();
934        let stream_clone = stream.clone();
935        let stream = stream.into_write().into_output_stream();
936
937        assert_eq!(stream, stream_clone);
938    }
939
940    #[test]
941    #[cfg(feature = "v2_60")]
942    fn writev() {
943        let stream = MemoryOutputStream::new_resizable();
944
945        let ret = stream.writev(
946            &[OutputVector::new(&[1, 2, 3]), OutputVector::new(&[4, 5, 6])],
947            crate::Cancellable::NONE,
948        );
949        assert_eq!(ret.unwrap(), 6);
950        stream.close(crate::Cancellable::NONE).unwrap();
951        assert_eq!(stream.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
952    }
953
954    #[test]
955    #[cfg(feature = "v2_60")]
956    fn writev_async() {
957        let ret = run_async(|tx, l| {
958            let strm = MemoryOutputStream::new_resizable();
959
960            let strm_clone = strm.clone();
961            strm.writev_async(
962                [vec![1, 2, 3], vec![4, 5, 6]],
963                glib::Priority::DEFAULT_IDLE,
964                crate::Cancellable::NONE,
965                move |ret| {
966                    tx.send(ret).unwrap();
967                    strm_clone.close(crate::Cancellable::NONE).unwrap();
968                    assert_eq!(strm_clone.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
969                    l.quit();
970                },
971            );
972        });
973
974        let (buf, size) = ret.unwrap();
975        assert_eq!(buf, [[1, 2, 3], [4, 5, 6]]);
976        assert_eq!(size, 6);
977    }
978
979    #[test]
980    #[cfg(feature = "v2_60")]
981    fn writev_all_async() {
982        let ret = run_async(|tx, l| {
983            let strm = MemoryOutputStream::new_resizable();
984
985            let strm_clone = strm.clone();
986            strm.writev_all_async(
987                [vec![1, 2, 3], vec![4, 5, 6]],
988                glib::Priority::DEFAULT_IDLE,
989                crate::Cancellable::NONE,
990                move |ret| {
991                    tx.send(ret).unwrap();
992                    strm_clone.close(crate::Cancellable::NONE).unwrap();
993                    assert_eq!(strm_clone.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
994                    l.quit();
995                },
996            );
997        });
998
999        let (buf, size, err) = ret.unwrap();
1000        assert_eq!(buf, [[1, 2, 3], [4, 5, 6]]);
1001        assert_eq!(size, 6);
1002        assert!(err.is_none());
1003    }
1004}