gio/
output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, mem, pin::Pin, ptr};
4
5use glib::{prelude::*, translate::*, Priority};
6
7#[cfg(feature = "v2_60")]
8use crate::OutputVector;
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, OutputStream, Seekable};
10
11mod sealed {
12    pub trait Sealed {}
13    impl<T: super::IsA<super::OutputStream>> Sealed for T {}
14}
15
16pub trait OutputStreamExtManual: sealed::Sealed + IsA<OutputStream> + Sized {
17    /// Request an asynchronous write of @count bytes from @buffer into
18    /// the stream. When the operation is finished @callback will be called.
19    /// You can then call g_output_stream_write_finish() to get the result of the
20    /// operation.
21    ///
22    /// During an async request no other sync and async calls are allowed,
23    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
24    ///
25    /// A value of @count larger than `G_MAXSSIZE` will cause a
26    /// [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
27    ///
28    /// On success, the number of bytes written will be passed to the
29    /// @callback. It is not an error if this is not the same as the
30    /// requested size, as it can happen e.g. on a partial I/O error,
31    /// but generally we try to write as many bytes as requested.
32    ///
33    /// You are guaranteed that this method will never fail with
34    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] - if @self can't accept more data, the
35    /// method will just wait until this changes.
36    ///
37    /// Any outstanding I/O request with higher priority (lower numerical
38    /// value) will be executed before an outstanding request with lower
39    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
40    ///
41    /// The asynchronous methods have a default fallback that uses threads
42    /// to implement asynchronicity, so they are optional for inheriting
43    /// classes. However, if you override one you must override all.
44    ///
45    /// For the synchronous, blocking version of this function, see
46    /// g_output_stream_write().
47    ///
48    /// Note that no copy of @buffer will be made, so it must stay valid
49    /// until @callback is called. See g_output_stream_write_bytes_async()
50    /// for a #GBytes version that will automatically hold a reference to
51    /// the contents (without copying) for the duration of the call.
52    /// ## `buffer`
53    /// the buffer containing the data to write.
54    /// ## `io_priority`
55    /// the io priority of the request.
56    /// ## `cancellable`
57    /// optional #GCancellable object, [`None`] to ignore.
58    /// ## `callback`
59    /// a #GAsyncReadyCallback
60    ///     to call when the request is satisfied
61    #[doc(alias = "g_output_stream_write_async")]
62    fn write_async<
63        B: AsRef<[u8]> + Send + 'static,
64        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
65        C: IsA<Cancellable>,
66    >(
67        &self,
68        buffer: B,
69        io_priority: Priority,
70        cancellable: Option<&C>,
71        callback: Q,
72    ) {
73        let main_context = glib::MainContext::ref_thread_default();
74        let is_main_context_owner = main_context.is_owner();
75        let has_acquired_main_context = (!is_main_context_owner)
76            .then(|| main_context.acquire().ok())
77            .flatten();
78        assert!(
79            is_main_context_owner || has_acquired_main_context.is_some(),
80            "Async operations only allowed if the thread is owning the MainContext"
81        );
82
83        let cancellable = cancellable.map(|c| c.as_ref());
84        let gcancellable = cancellable.to_glib_none();
85        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
86            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
87        // Need to do this after boxing as the contents pointer might change by moving into the box
88        let (count, buffer_ptr) = {
89            let buffer = &user_data.1;
90            let slice = buffer.as_ref();
91            (slice.len(), slice.as_ptr())
92        };
93        unsafe extern "C" fn write_async_trampoline<
94            B: AsRef<[u8]> + Send + 'static,
95            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
96        >(
97            _source_object: *mut glib::gobject_ffi::GObject,
98            res: *mut ffi::GAsyncResult,
99            user_data: glib::ffi::gpointer,
100        ) {
101            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
102                Box::from_raw(user_data as *mut _);
103            let (callback, buffer) = *user_data;
104            let callback = callback.into_inner();
105
106            let mut error = ptr::null_mut();
107            let ret = ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
108            let result = if error.is_null() {
109                Ok((buffer, ret as usize))
110            } else {
111                Err((buffer, from_glib_full(error)))
112            };
113            callback(result);
114        }
115        let callback = write_async_trampoline::<B, Q>;
116        unsafe {
117            ffi::g_output_stream_write_async(
118                self.as_ref().to_glib_none().0,
119                mut_override(buffer_ptr),
120                count,
121                io_priority.into_glib(),
122                gcancellable.0,
123                Some(callback),
124                Box::into_raw(user_data) as *mut _,
125            );
126        }
127    }
128
129    /// Tries to write @count bytes from @buffer into the stream. Will block
130    /// during the operation.
131    ///
132    /// This function is similar to g_output_stream_write(), except it tries to
133    /// write as many bytes as requested, only stopping on an error.
134    ///
135    /// On a successful write of @count bytes, [`true`] is returned, and @bytes_written
136    /// is set to @count.
137    ///
138    /// If there is an error during the operation [`false`] is returned and @error
139    /// is set to indicate the error status.
140    ///
141    /// As a special exception to the normal conventions for functions that
142    /// use #GError, if this function returns [`false`] (and sets @error) then
143    /// @bytes_written will be set to the number of bytes that were
144    /// successfully written before the error was encountered.  This
145    /// functionality is only available from C.  If you need it from another
146    /// language then you must write your own loop around
147    /// g_output_stream_write().
148    /// ## `buffer`
149    /// the buffer containing the data to write.
150    /// ## `cancellable`
151    /// optional #GCancellable object, [`None`] to ignore.
152    ///
153    /// # Returns
154    ///
155    /// [`true`] on success, [`false`] if there was an error
156    ///
157    /// ## `bytes_written`
158    /// location to store the number of bytes that was
159    ///     written to the stream
160    #[doc(alias = "g_output_stream_write_all")]
161    fn write_all<C: IsA<Cancellable>>(
162        &self,
163        buffer: &[u8],
164        cancellable: Option<&C>,
165    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
166        let cancellable = cancellable.map(|c| c.as_ref());
167        let gcancellable = cancellable.to_glib_none();
168        let count = buffer.len();
169        unsafe {
170            let mut bytes_written = mem::MaybeUninit::uninit();
171            let mut error = ptr::null_mut();
172            let _ = ffi::g_output_stream_write_all(
173                self.as_ref().to_glib_none().0,
174                buffer.to_glib_none().0,
175                count,
176                bytes_written.as_mut_ptr(),
177                gcancellable.0,
178                &mut error,
179            );
180
181            let bytes_written = bytes_written.assume_init();
182            if error.is_null() {
183                Ok((bytes_written, None))
184            } else if bytes_written != 0 {
185                Ok((bytes_written, Some(from_glib_full(error))))
186            } else {
187                Err(from_glib_full(error))
188            }
189        }
190    }
191
192    /// Request an asynchronous write of @count bytes from @buffer into
193    /// the stream. When the operation is finished @callback will be called.
194    /// You can then call g_output_stream_write_all_finish() to get the result of the
195    /// operation.
196    ///
197    /// This is the asynchronous version of g_output_stream_write_all().
198    ///
199    /// Call g_output_stream_write_all_finish() to collect the result.
200    ///
201    /// Any outstanding I/O request with higher priority (lower numerical
202    /// value) will be executed before an outstanding request with lower
203    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
204    ///
205    /// Note that no copy of @buffer will be made, so it must stay valid
206    /// until @callback is called.
207    /// ## `buffer`
208    /// the buffer containing the data to write
209    /// ## `io_priority`
210    /// the io priority of the request
211    /// ## `cancellable`
212    /// optional #GCancellable object, [`None`] to ignore
213    /// ## `callback`
214    /// a #GAsyncReadyCallback
215    ///     to call when the request is satisfied
216    #[doc(alias = "g_output_stream_write_all_async")]
217    fn write_all_async<
218        B: AsRef<[u8]> + Send + 'static,
219        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
220        C: IsA<Cancellable>,
221    >(
222        &self,
223        buffer: B,
224        io_priority: Priority,
225        cancellable: Option<&C>,
226        callback: Q,
227    ) {
228        let main_context = glib::MainContext::ref_thread_default();
229        let is_main_context_owner = main_context.is_owner();
230        let has_acquired_main_context = (!is_main_context_owner)
231            .then(|| main_context.acquire().ok())
232            .flatten();
233        assert!(
234            is_main_context_owner || has_acquired_main_context.is_some(),
235            "Async operations only allowed if the thread is owning the MainContext"
236        );
237
238        let cancellable = cancellable.map(|c| c.as_ref());
239        let gcancellable = cancellable.to_glib_none();
240        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
241            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
242        // Need to do this after boxing as the contents pointer might change by moving into the box
243        let (count, buffer_ptr) = {
244            let buffer = &user_data.1;
245            let slice = buffer.as_ref();
246            (slice.len(), slice.as_ptr())
247        };
248        unsafe extern "C" fn write_all_async_trampoline<
249            B: AsRef<[u8]> + Send + 'static,
250            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
251        >(
252            _source_object: *mut glib::gobject_ffi::GObject,
253            res: *mut ffi::GAsyncResult,
254            user_data: glib::ffi::gpointer,
255        ) {
256            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
257                Box::from_raw(user_data as *mut _);
258            let (callback, buffer) = *user_data;
259            let callback = callback.into_inner();
260
261            let mut error = ptr::null_mut();
262            let mut bytes_written = mem::MaybeUninit::uninit();
263            let _ = ffi::g_output_stream_write_all_finish(
264                _source_object as *mut _,
265                res,
266                bytes_written.as_mut_ptr(),
267                &mut error,
268            );
269            let bytes_written = bytes_written.assume_init();
270            let result = if error.is_null() {
271                Ok((buffer, bytes_written, None))
272            } else if bytes_written != 0 {
273                Ok((buffer, bytes_written, from_glib_full(error)))
274            } else {
275                Err((buffer, from_glib_full(error)))
276            };
277            callback(result);
278        }
279        let callback = write_all_async_trampoline::<B, Q>;
280        unsafe {
281            ffi::g_output_stream_write_all_async(
282                self.as_ref().to_glib_none().0,
283                mut_override(buffer_ptr),
284                count,
285                io_priority.into_glib(),
286                gcancellable.0,
287                Some(callback),
288                Box::into_raw(user_data) as *mut _,
289            );
290        }
291    }
292
293    fn write_future<B: AsRef<[u8]> + Send + 'static>(
294        &self,
295        buffer: B,
296        io_priority: Priority,
297    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
298    {
299        Box::pin(crate::GioFuture::new(
300            self,
301            move |obj, cancellable, send| {
302                obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
303                    send.resolve(res);
304                });
305            },
306        ))
307    }
308
309    fn write_all_future<B: AsRef<[u8]> + Send + 'static>(
310        &self,
311        buffer: B,
312        io_priority: Priority,
313    ) -> Pin<
314        Box<
315            dyn std::future::Future<
316                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
317                > + 'static,
318        >,
319    > {
320        Box::pin(crate::GioFuture::new(
321            self,
322            move |obj, cancellable, send| {
323                obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
324                    send.resolve(res);
325                });
326            },
327        ))
328    }
329
330    /// Tries to write the bytes contained in the @n_vectors @vectors into the
331    /// stream. Will block during the operation.
332    ///
333    /// If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
334    /// does nothing.
335    ///
336    /// On success, the number of bytes written to the stream is returned.
337    /// It is not an error if this is not the same as the requested size, as it
338    /// can happen e.g. on a partial I/O error, or if there is not enough
339    /// storage in the stream. All writes block until at least one byte
340    /// is written or an error occurs; 0 is never returned (unless
341    /// @n_vectors is 0 or the sum of all bytes in @vectors is 0).
342    ///
343    /// If @cancellable is not [`None`], then the operation can be cancelled by
344    /// triggering the cancellable object from another thread. If the operation
345    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
346    /// operation was partially finished when the operation was cancelled the
347    /// partial result will be returned, without an error.
348    ///
349    /// Some implementations of g_output_stream_writev() may have limitations on the
350    /// aggregate buffer size, and will return [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] if these
351    /// are exceeded. For example, when writing to a local file on UNIX platforms,
352    /// the aggregate buffer size must not exceed `G_MAXSSIZE` bytes.
353    /// ## `vectors`
354    /// the buffer containing the #GOutputVectors to write.
355    /// ## `cancellable`
356    /// optional cancellable object
357    ///
358    /// # Returns
359    ///
360    /// [`true`] on success, [`false`] if there was an error
361    ///
362    /// ## `bytes_written`
363    /// location to store the number of bytes that were
364    ///     written to the stream
365    #[cfg(feature = "v2_60")]
366    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
367    #[doc(alias = "g_output_stream_writev")]
368    fn writev(
369        &self,
370        vectors: &[OutputVector],
371        cancellable: Option<&impl IsA<Cancellable>>,
372    ) -> Result<usize, glib::Error> {
373        unsafe {
374            let mut error = ptr::null_mut();
375            let mut bytes_written = mem::MaybeUninit::uninit();
376
377            ffi::g_output_stream_writev(
378                self.as_ref().to_glib_none().0,
379                vectors.as_ptr() as *const _,
380                vectors.len(),
381                bytes_written.as_mut_ptr(),
382                cancellable.map(|p| p.as_ref()).to_glib_none().0,
383                &mut error,
384            );
385            if error.is_null() {
386                Ok(bytes_written.assume_init())
387            } else {
388                Err(from_glib_full(error))
389            }
390        }
391    }
392
393    /// Request an asynchronous write of the bytes contained in @n_vectors @vectors into
394    /// the stream. When the operation is finished @callback will be called.
395    /// You can then call g_output_stream_writev_finish() to get the result of the
396    /// operation.
397    ///
398    /// During an async request no other sync and async calls are allowed,
399    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
400    ///
401    /// On success, the number of bytes written will be passed to the
402    /// @callback. It is not an error if this is not the same as the
403    /// requested size, as it can happen e.g. on a partial I/O error,
404    /// but generally we try to write as many bytes as requested.
405    ///
406    /// You are guaranteed that this method will never fail with
407    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] — if @self can't accept more data, the
408    /// method will just wait until this changes.
409    ///
410    /// Any outstanding I/O request with higher priority (lower numerical
411    /// value) will be executed before an outstanding request with lower
412    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
413    ///
414    /// The asynchronous methods have a default fallback that uses threads
415    /// to implement asynchronicity, so they are optional for inheriting
416    /// classes. However, if you override one you must override all.
417    ///
418    /// For the synchronous, blocking version of this function, see
419    /// g_output_stream_writev().
420    ///
421    /// Note that no copy of @vectors will be made, so it must stay valid
422    /// until @callback is called.
423    /// ## `vectors`
424    /// the buffer containing the #GOutputVectors to write.
425    /// ## `io_priority`
426    /// the I/O priority of the request.
427    /// ## `cancellable`
428    /// optional #GCancellable object, [`None`] to ignore.
429    /// ## `callback`
430    /// a #GAsyncReadyCallback
431    ///     to call when the request is satisfied
432    #[cfg(feature = "v2_60")]
433    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
434    #[doc(alias = "g_output_stream_writev_async")]
435    fn writev_async<
436        B: AsRef<[u8]> + Send + 'static,
437        P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
438    >(
439        &self,
440        vectors: impl IntoIterator<Item = B> + 'static,
441        io_priority: glib::Priority,
442        cancellable: Option<&impl IsA<Cancellable>>,
443        callback: P,
444    ) {
445        let main_context = glib::MainContext::ref_thread_default();
446        let is_main_context_owner = main_context.is_owner();
447        let has_acquired_main_context = (!is_main_context_owner)
448            .then(|| main_context.acquire().ok())
449            .flatten();
450        assert!(
451            is_main_context_owner || has_acquired_main_context.is_some(),
452            "Async operations only allowed if the thread is owning the MainContext"
453        );
454
455        let cancellable = cancellable.map(|c| c.as_ref());
456        let gcancellable = cancellable.to_glib_none();
457        let buffers = vectors.into_iter().collect::<Vec<_>>();
458        let vectors = buffers
459            .iter()
460            .map(|v| ffi::GOutputVector {
461                buffer: v.as_ref().as_ptr() as *const _,
462                size: v.as_ref().len(),
463            })
464            .collect::<Vec<_>>();
465        let vectors_ptr = vectors.as_ptr();
466        let num_vectors = vectors.len();
467        let user_data: Box<(
468            glib::thread_guard::ThreadGuard<P>,
469            Vec<B>,
470            Vec<ffi::GOutputVector>,
471        )> = Box::new((
472            glib::thread_guard::ThreadGuard::new(callback),
473            buffers,
474            vectors,
475        ));
476
477        unsafe extern "C" fn writev_async_trampoline<
478            B: AsRef<[u8]> + Send + 'static,
479            P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
480        >(
481            _source_object: *mut glib::gobject_ffi::GObject,
482            res: *mut ffi::GAsyncResult,
483            user_data: glib::ffi::gpointer,
484        ) {
485            let user_data: Box<(
486                glib::thread_guard::ThreadGuard<P>,
487                Vec<B>,
488                Vec<ffi::GOutputVector>,
489            )> = Box::from_raw(user_data as *mut _);
490            let (callback, buffers, _) = *user_data;
491            let callback = callback.into_inner();
492
493            let mut error = ptr::null_mut();
494            let mut bytes_written = mem::MaybeUninit::uninit();
495            ffi::g_output_stream_writev_finish(
496                _source_object as *mut _,
497                res,
498                bytes_written.as_mut_ptr(),
499                &mut error,
500            );
501            let bytes_written = bytes_written.assume_init();
502            let result = if error.is_null() {
503                Ok((buffers, bytes_written))
504            } else {
505                Err((buffers, from_glib_full(error)))
506            };
507            callback(result);
508        }
509        let callback = writev_async_trampoline::<B, P>;
510        unsafe {
511            ffi::g_output_stream_writev_async(
512                self.as_ref().to_glib_none().0,
513                vectors_ptr,
514                num_vectors,
515                io_priority.into_glib(),
516                gcancellable.0,
517                Some(callback),
518                Box::into_raw(user_data) as *mut _,
519            );
520        }
521    }
522
523    #[cfg(feature = "v2_60")]
524    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
525    fn writev_future<B: AsRef<[u8]> + Send + 'static>(
526        &self,
527        vectors: impl IntoIterator<Item = B> + 'static,
528        io_priority: glib::Priority,
529    ) -> Pin<
530        Box<
531            dyn std::future::Future<Output = Result<(Vec<B>, usize), (Vec<B>, glib::Error)>>
532                + 'static,
533        >,
534    > {
535        Box::pin(crate::GioFuture::new(
536            self,
537            move |obj, cancellable, send| {
538                obj.writev_async(vectors, io_priority, Some(cancellable), move |res| {
539                    send.resolve(res);
540                });
541            },
542        ))
543    }
544
545    /// Tries to write the bytes contained in the @n_vectors @vectors into the
546    /// stream. Will block during the operation.
547    ///
548    /// This function is similar to g_output_stream_writev(), except it tries to
549    /// write as many bytes as requested, only stopping on an error.
550    ///
551    /// On a successful write of all @n_vectors vectors, [`true`] is returned, and
552    /// @bytes_written is set to the sum of all the sizes of @vectors.
553    ///
554    /// If there is an error during the operation [`false`] is returned and @error
555    /// is set to indicate the error status.
556    ///
557    /// As a special exception to the normal conventions for functions that
558    /// use #GError, if this function returns [`false`] (and sets @error) then
559    /// @bytes_written will be set to the number of bytes that were
560    /// successfully written before the error was encountered.  This
561    /// functionality is only available from C. If you need it from another
562    /// language then you must write your own loop around
563    /// g_output_stream_write().
564    ///
565    /// The content of the individual elements of @vectors might be changed by this
566    /// function.
567    /// ## `vectors`
568    /// the buffer containing the #GOutputVectors to write.
569    /// ## `cancellable`
570    /// optional #GCancellable object, [`None`] to ignore.
571    ///
572    /// # Returns
573    ///
574    /// [`true`] on success, [`false`] if there was an error
575    ///
576    /// ## `bytes_written`
577    /// location to store the number of bytes that were
578    ///     written to the stream
579    #[cfg(feature = "v2_60")]
580    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
581    #[doc(alias = "g_output_stream_writev_all")]
582    fn writev_all(
583        &self,
584        vectors: &[OutputVector],
585        cancellable: Option<&impl IsA<Cancellable>>,
586    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
587        unsafe {
588            let mut error = ptr::null_mut();
589            let mut bytes_written = mem::MaybeUninit::uninit();
590
591            ffi::g_output_stream_writev_all(
592                self.as_ref().to_glib_none().0,
593                mut_override(vectors.as_ptr() as *const _),
594                vectors.len(),
595                bytes_written.as_mut_ptr(),
596                cancellable.map(|p| p.as_ref()).to_glib_none().0,
597                &mut error,
598            );
599            let bytes_written = bytes_written.assume_init();
600            if error.is_null() {
601                Ok((bytes_written, None))
602            } else if bytes_written != 0 {
603                Ok((bytes_written, Some(from_glib_full(error))))
604            } else {
605                Err(from_glib_full(error))
606            }
607        }
608    }
609
610    /// Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
611    /// the stream. When the operation is finished @callback will be called.
612    /// You can then call g_output_stream_writev_all_finish() to get the result of the
613    /// operation.
614    ///
615    /// This is the asynchronous version of g_output_stream_writev_all().
616    ///
617    /// Call g_output_stream_writev_all_finish() to collect the result.
618    ///
619    /// Any outstanding I/O request with higher priority (lower numerical
620    /// value) will be executed before an outstanding request with lower
621    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
622    ///
623    /// Note that no copy of @vectors will be made, so it must stay valid
624    /// until @callback is called. The content of the individual elements
625    /// of @vectors might be changed by this function.
626    /// ## `vectors`
627    /// the buffer containing the #GOutputVectors to write.
628    /// ## `io_priority`
629    /// the I/O priority of the request
630    /// ## `cancellable`
631    /// optional #GCancellable object, [`None`] to ignore
632    /// ## `callback`
633    /// a #GAsyncReadyCallback
634    ///     to call when the request is satisfied
635    #[cfg(feature = "v2_60")]
636    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
637    #[doc(alias = "g_output_stream_writev_all_async")]
638    fn writev_all_async<
639        B: AsRef<[u8]> + Send + 'static,
640        P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
641    >(
642        &self,
643        vectors: impl IntoIterator<Item = B> + 'static,
644        io_priority: glib::Priority,
645        cancellable: Option<&impl IsA<Cancellable>>,
646        callback: P,
647    ) {
648        let main_context = glib::MainContext::ref_thread_default();
649        let is_main_context_owner = main_context.is_owner();
650        let has_acquired_main_context = (!is_main_context_owner)
651            .then(|| main_context.acquire().ok())
652            .flatten();
653        assert!(
654            is_main_context_owner || has_acquired_main_context.is_some(),
655            "Async operations only allowed if the thread is owning the MainContext"
656        );
657
658        let cancellable = cancellable.map(|c| c.as_ref());
659        let gcancellable = cancellable.to_glib_none();
660        let buffers = vectors.into_iter().collect::<Vec<_>>();
661        let vectors = buffers
662            .iter()
663            .map(|v| ffi::GOutputVector {
664                buffer: v.as_ref().as_ptr() as *const _,
665                size: v.as_ref().len(),
666            })
667            .collect::<Vec<_>>();
668        let vectors_ptr = vectors.as_ptr();
669        let num_vectors = vectors.len();
670        let user_data: Box<(
671            glib::thread_guard::ThreadGuard<P>,
672            Vec<B>,
673            Vec<ffi::GOutputVector>,
674        )> = Box::new((
675            glib::thread_guard::ThreadGuard::new(callback),
676            buffers,
677            vectors,
678        ));
679
680        unsafe extern "C" fn writev_all_async_trampoline<
681            B: AsRef<[u8]> + Send + 'static,
682            P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
683        >(
684            _source_object: *mut glib::gobject_ffi::GObject,
685            res: *mut ffi::GAsyncResult,
686            user_data: glib::ffi::gpointer,
687        ) {
688            let user_data: Box<(
689                glib::thread_guard::ThreadGuard<P>,
690                Vec<B>,
691                Vec<ffi::GOutputVector>,
692            )> = Box::from_raw(user_data as *mut _);
693            let (callback, buffers, _) = *user_data;
694            let callback = callback.into_inner();
695
696            let mut error = ptr::null_mut();
697            let mut bytes_written = mem::MaybeUninit::uninit();
698            ffi::g_output_stream_writev_all_finish(
699                _source_object as *mut _,
700                res,
701                bytes_written.as_mut_ptr(),
702                &mut error,
703            );
704            let bytes_written = bytes_written.assume_init();
705            let result = if error.is_null() {
706                Ok((buffers, bytes_written, None))
707            } else if bytes_written != 0 {
708                Ok((buffers, bytes_written, from_glib_full(error)))
709            } else {
710                Err((buffers, from_glib_full(error)))
711            };
712            callback(result);
713        }
714        let callback = writev_all_async_trampoline::<B, P>;
715        unsafe {
716            ffi::g_output_stream_writev_all_async(
717                self.as_ref().to_glib_none().0,
718                mut_override(vectors_ptr),
719                num_vectors,
720                io_priority.into_glib(),
721                gcancellable.0,
722                Some(callback),
723                Box::into_raw(user_data) as *mut _,
724            );
725        }
726    }
727
728    #[cfg(feature = "v2_60")]
729    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
730    fn writev_all_future<B: AsRef<[u8]> + Send + 'static>(
731        &self,
732        vectors: impl IntoIterator<Item = B> + 'static,
733        io_priority: glib::Priority,
734    ) -> Pin<
735        Box<
736            dyn std::future::Future<
737                    Output = Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>,
738                > + 'static,
739        >,
740    > {
741        Box::pin(crate::GioFuture::new(
742            self,
743            move |obj, cancellable, send| {
744                obj.writev_all_async(vectors, io_priority, Some(cancellable), move |res| {
745                    send.resolve(res);
746                });
747            },
748        ))
749    }
750
751    fn into_write(self) -> OutputStreamWrite<Self>
752    where
753        Self: IsA<OutputStream>,
754    {
755        OutputStreamWrite(self)
756    }
757}
758
759impl<O: IsA<OutputStream>> OutputStreamExtManual for O {}
760
761#[derive(Debug)]
762pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
763
764impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
765    pub fn into_output_stream(self) -> T {
766        self.0
767    }
768
769    pub fn output_stream(&self) -> &T {
770        &self.0
771    }
772}
773
774impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
775    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
776        let result = self
777            .0
778            .as_ref()
779            .write(buf, crate::Cancellable::NONE)
780            .map(|size| size as usize);
781        to_std_io_result(result)
782    }
783
784    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
785        let result = self
786            .0
787            .as_ref()
788            .write_all(buf, crate::Cancellable::NONE)
789            .and_then(|(_, e)| e.map(Err).unwrap_or(Ok(())));
790        to_std_io_result(result)
791    }
792
793    #[cfg(feature = "v2_60")]
794    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
795    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
796        let vectors = bufs
797            .iter()
798            .map(|v| OutputVector::new(v))
799            .collect::<smallvec::SmallVec<[_; 2]>>();
800        let result = self.0.as_ref().writev(&vectors, crate::Cancellable::NONE);
801        to_std_io_result(result)
802    }
803
804    fn flush(&mut self) -> io::Result<()> {
805        let gio_result = self.0.as_ref().flush(crate::Cancellable::NONE);
806        to_std_io_result(gio_result)
807    }
808}
809
810impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
811    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
812        let (pos, type_) = match pos {
813            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
814            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
815            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
816        };
817        let seekable: &Seekable = self.0.as_ref();
818        let gio_result = seekable
819            .seek(pos, type_, crate::Cancellable::NONE)
820            .map(|_| seekable.tell() as u64);
821        to_std_io_result(gio_result)
822    }
823}
824
#[cfg(test)]
mod tests {
    use std::io::Write;

    use glib::Bytes;

    #[cfg(feature = "v2_60")]
    use crate::OutputVector;
    use crate::{prelude::*, test_util::run_async, MemoryInputStream, MemoryOutputStream};

    // Splicing a three-byte memory input stream into a resizable memory output stream
    // must report three bytes.
    #[test]
    fn splice_async() {
        let spliced = run_async(|sender, main_loop| {
            let source = MemoryInputStream::new();
            let payload = Bytes::from_owned(vec![1, 2, 3]);
            source.add_bytes(&payload);

            let sink = MemoryOutputStream::new_resizable();
            sink.splice_async(
                &source,
                crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(spliced.unwrap(), 3);
    }

    // `write_async` hands the buffer back together with the number of bytes written.
    #[test]
    fn write_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            sink.write_async(
                data,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
    }

    // `write_all_async` hands the buffer back, reports the full length and no error.
    #[test]
    fn write_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            sink.write_all_async(
                data,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (returned, written, partial_error) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
        assert!(partial_error.is_none());
    }

    // Writing a three-byte `Bytes` asynchronously reports three bytes written.
    #[test]
    fn write_bytes_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let payload = Bytes::from_owned(vec![1, 2, 3]);
            sink.write_bytes_async(
                &payload,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(outcome.unwrap(), 3);
    }

    // The `std::io::Write` adapter writes through to the wrapped memory stream.
    #[test]
    fn std_io_write() {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        let mut writer = MemoryOutputStream::new_resizable().into_write();

        let written = writer.write(&payload);

        // The stream is closed before its contents are stolen.
        let sink = writer.into_output_stream();
        sink.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(written.unwrap(), 3);
        assert_eq!(sink.steal_as_bytes(), [1, 2, 3].as_ref());
    }

    // Wrapping and unwrapping must give back the very same stream object.
    #[test]
    fn into_output_stream() {
        let original = MemoryOutputStream::new_resizable();
        let reference = original.clone();
        let unwrapped = original.into_write().into_output_stream();

        assert_eq!(unwrapped, reference);
    }

    // Synchronous vectored write: both vectors end up in the stream, in order.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev() {
        let sink = MemoryOutputStream::new_resizable();

        let vectors = [OutputVector::new(&[1, 2, 3]), OutputVector::new(&[4, 5, 6])];
        let written = sink.writev(&vectors, crate::Cancellable::NONE);
        assert_eq!(written.unwrap(), 6);
        sink.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(sink.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
    }

    // Asynchronous vectored write: the stream contents are checked inside the callback,
    // the returned buffers and byte count afterwards.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let sink_in_callback = sink.clone();
            sink.writev_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    sink_in_callback.close(crate::Cancellable::NONE).unwrap();
                    assert_eq!(
                        sink_in_callback.steal_as_bytes(),
                        [1, 2, 3, 4, 5, 6].as_ref()
                    );
                    main_loop.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
    }

    // Same as `writev_async`, but through `writev_all_async`, which additionally
    // reports an optional error that must be absent here.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let sink = MemoryOutputStream::new_resizable();

            let sink_in_callback = sink.clone();
            sink.writev_all_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    sink_in_callback.close(crate::Cancellable::NONE).unwrap();
                    assert_eq!(
                        sink_in_callback.steal_as_bytes(),
                        [1, 2, 3, 4, 5, 6].as_ref()
                    );
                    main_loop.quit();
                },
            );
        });

        let (returned, written, partial_error) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
        assert!(partial_error.is_none());
    }
}