gio/
output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, mem, pin::Pin, ptr};
4
5use glib::{Priority, prelude::*, translate::*};
6
7#[cfg(feature = "v2_60")]
8use crate::OutputVector;
9use crate::{Cancellable, OutputStream, Seekable, error::to_std_io_result, ffi, prelude::*};
10
11pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
12    /// Request an asynchronous write of @count bytes from @buffer into
13    /// the stream. When the operation is finished @callback will be called.
14    /// You can then call g_output_stream_write_finish() to get the result of the
15    /// operation.
16    ///
17    /// During an async request no other sync and async calls are allowed,
18    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
19    ///
20    /// A value of @count larger than `G_MAXSSIZE` will cause a
21    /// [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
22    ///
23    /// On success, the number of bytes written will be passed to the
24    /// @callback. It is not an error if this is not the same as the
25    /// requested size, as it can happen e.g. on a partial I/O error,
26    /// but generally we try to write as many bytes as requested.
27    ///
28    /// You are guaranteed that this method will never fail with
29    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] - if @self can't accept more data, the
30    /// method will just wait until this changes.
31    ///
32    /// Any outstanding I/O request with higher priority (lower numerical
33    /// value) will be executed before an outstanding request with lower
34    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
35    ///
36    /// The asynchronous methods have a default fallback that uses threads
37    /// to implement asynchronicity, so they are optional for inheriting
38    /// classes. However, if you override one you must override all.
39    ///
40    /// For the synchronous, blocking version of this function, see
41    /// g_output_stream_write().
42    ///
43    /// Note that no copy of @buffer will be made, so it must stay valid
44    /// until @callback is called. See g_output_stream_write_bytes_async()
45    /// for a #GBytes version that will automatically hold a reference to
46    /// the contents (without copying) for the duration of the call.
47    /// ## `buffer`
48    /// the buffer containing the data to write.
49    /// ## `io_priority`
50    /// the io priority of the request.
51    /// ## `cancellable`
52    /// optional #GCancellable object, [`None`] to ignore.
53    /// ## `callback`
54    /// a #GAsyncReadyCallback
55    ///     to call when the request is satisfied
56    #[doc(alias = "g_output_stream_write_async")]
57    fn write_async<
58        B: AsRef<[u8]> + Send + 'static,
59        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
60        C: IsA<Cancellable>,
61    >(
62        &self,
63        buffer: B,
64        io_priority: Priority,
65        cancellable: Option<&C>,
66        callback: Q,
67    ) {
68        let main_context = glib::MainContext::ref_thread_default();
69        let is_main_context_owner = main_context.is_owner();
70        let has_acquired_main_context = (!is_main_context_owner)
71            .then(|| main_context.acquire().ok())
72            .flatten();
73        assert!(
74            is_main_context_owner || has_acquired_main_context.is_some(),
75            "Async operations only allowed if the thread is owning the MainContext"
76        );
77
78        let cancellable = cancellable.map(|c| c.as_ref());
79        let gcancellable = cancellable.to_glib_none();
80        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
81            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
82        // Need to do this after boxing as the contents pointer might change by moving into the box
83        let (count, buffer_ptr) = {
84            let buffer = &user_data.1;
85            let slice = buffer.as_ref();
86            (slice.len(), slice.as_ptr())
87        };
88        unsafe extern "C" fn write_async_trampoline<
89            B: AsRef<[u8]> + Send + 'static,
90            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
91        >(
92            _source_object: *mut glib::gobject_ffi::GObject,
93            res: *mut ffi::GAsyncResult,
94            user_data: glib::ffi::gpointer,
95        ) {
96            unsafe {
97                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
98                    Box::from_raw(user_data as *mut _);
99                let (callback, buffer) = *user_data;
100                let callback = callback.into_inner();
101
102                let mut error = ptr::null_mut();
103                let ret =
104                    ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
105                let result = if error.is_null() {
106                    Ok((buffer, ret as usize))
107                } else {
108                    Err((buffer, from_glib_full(error)))
109                };
110                callback(result);
111            }
112        }
113        let callback = write_async_trampoline::<B, Q>;
114        unsafe {
115            ffi::g_output_stream_write_async(
116                self.as_ref().to_glib_none().0,
117                mut_override(buffer_ptr),
118                count,
119                io_priority.into_glib(),
120                gcancellable.0,
121                Some(callback),
122                Box::into_raw(user_data) as *mut _,
123            );
124        }
125    }
126
127    /// Tries to write @count bytes from @buffer into the stream. Will block
128    /// during the operation.
129    ///
130    /// This function is similar to g_output_stream_write(), except it tries to
131    /// write as many bytes as requested, only stopping on an error.
132    ///
133    /// On a successful write of @count bytes, [`true`] is returned, and @bytes_written
134    /// is set to @count.
135    ///
136    /// If there is an error during the operation [`false`] is returned and @error
137    /// is set to indicate the error status.
138    ///
139    /// As a special exception to the normal conventions for functions that
140    /// use #GError, if this function returns [`false`] (and sets @error) then
141    /// @bytes_written will be set to the number of bytes that were
142    /// successfully written before the error was encountered.  This
143    /// functionality is only available from C.  If you need it from another
144    /// language then you must write your own loop around
145    /// g_output_stream_write().
146    /// ## `buffer`
147    /// the buffer containing the data to write.
148    /// ## `cancellable`
149    /// optional #GCancellable object, [`None`] to ignore.
150    ///
151    /// # Returns
152    ///
153    /// [`true`] on success, [`false`] if there was an error
154    ///
155    /// ## `bytes_written`
156    /// location to store the number of bytes that was
157    ///     written to the stream
158    #[doc(alias = "g_output_stream_write_all")]
159    fn write_all<C: IsA<Cancellable>>(
160        &self,
161        buffer: &[u8],
162        cancellable: Option<&C>,
163    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
164        let cancellable = cancellable.map(|c| c.as_ref());
165        let gcancellable = cancellable.to_glib_none();
166        let count = buffer.len();
167        unsafe {
168            let mut bytes_written = mem::MaybeUninit::uninit();
169            let mut error = ptr::null_mut();
170            let _ = ffi::g_output_stream_write_all(
171                self.as_ref().to_glib_none().0,
172                buffer.to_glib_none().0,
173                count,
174                bytes_written.as_mut_ptr(),
175                gcancellable.0,
176                &mut error,
177            );
178
179            let bytes_written = bytes_written.assume_init();
180            if error.is_null() {
181                Ok((bytes_written, None))
182            } else if bytes_written != 0 {
183                Ok((bytes_written, Some(from_glib_full(error))))
184            } else {
185                Err(from_glib_full(error))
186            }
187        }
188    }
189
190    /// Request an asynchronous write of @count bytes from @buffer into
191    /// the stream. When the operation is finished @callback will be called.
192    /// You can then call g_output_stream_write_all_finish() to get the result of the
193    /// operation.
194    ///
195    /// This is the asynchronous version of g_output_stream_write_all().
196    ///
197    /// Call g_output_stream_write_all_finish() to collect the result.
198    ///
199    /// Any outstanding I/O request with higher priority (lower numerical
200    /// value) will be executed before an outstanding request with lower
201    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
202    ///
203    /// Note that no copy of @buffer will be made, so it must stay valid
204    /// until @callback is called.
205    /// ## `buffer`
206    /// the buffer containing the data to write
207    /// ## `io_priority`
208    /// the io priority of the request
209    /// ## `cancellable`
210    /// optional #GCancellable object, [`None`] to ignore
211    /// ## `callback`
212    /// a #GAsyncReadyCallback
213    ///     to call when the request is satisfied
214    #[doc(alias = "g_output_stream_write_all_async")]
215    fn write_all_async<
216        B: AsRef<[u8]> + Send + 'static,
217        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
218        C: IsA<Cancellable>,
219    >(
220        &self,
221        buffer: B,
222        io_priority: Priority,
223        cancellable: Option<&C>,
224        callback: Q,
225    ) {
226        let main_context = glib::MainContext::ref_thread_default();
227        let is_main_context_owner = main_context.is_owner();
228        let has_acquired_main_context = (!is_main_context_owner)
229            .then(|| main_context.acquire().ok())
230            .flatten();
231        assert!(
232            is_main_context_owner || has_acquired_main_context.is_some(),
233            "Async operations only allowed if the thread is owning the MainContext"
234        );
235
236        let cancellable = cancellable.map(|c| c.as_ref());
237        let gcancellable = cancellable.to_glib_none();
238        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
239            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
240        // Need to do this after boxing as the contents pointer might change by moving into the box
241        let (count, buffer_ptr) = {
242            let buffer = &user_data.1;
243            let slice = buffer.as_ref();
244            (slice.len(), slice.as_ptr())
245        };
246        unsafe extern "C" fn write_all_async_trampoline<
247            B: AsRef<[u8]> + Send + 'static,
248            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
249        >(
250            _source_object: *mut glib::gobject_ffi::GObject,
251            res: *mut ffi::GAsyncResult,
252            user_data: glib::ffi::gpointer,
253        ) {
254            unsafe {
255                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
256                    Box::from_raw(user_data as *mut _);
257                let (callback, buffer) = *user_data;
258                let callback = callback.into_inner();
259
260                let mut error = ptr::null_mut();
261                let mut bytes_written = mem::MaybeUninit::uninit();
262                let _ = ffi::g_output_stream_write_all_finish(
263                    _source_object as *mut _,
264                    res,
265                    bytes_written.as_mut_ptr(),
266                    &mut error,
267                );
268                let bytes_written = bytes_written.assume_init();
269                let result = if error.is_null() {
270                    Ok((buffer, bytes_written, None))
271                } else if bytes_written != 0 {
272                    Ok((buffer, bytes_written, from_glib_full(error)))
273                } else {
274                    Err((buffer, from_glib_full(error)))
275                };
276                callback(result);
277            }
278        }
279        let callback = write_all_async_trampoline::<B, Q>;
280        unsafe {
281            ffi::g_output_stream_write_all_async(
282                self.as_ref().to_glib_none().0,
283                mut_override(buffer_ptr),
284                count,
285                io_priority.into_glib(),
286                gcancellable.0,
287                Some(callback),
288                Box::into_raw(user_data) as *mut _,
289            );
290        }
291    }
292
293    fn write_future<B: AsRef<[u8]> + Send + 'static>(
294        &self,
295        buffer: B,
296        io_priority: Priority,
297    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
298    {
299        Box::pin(crate::GioFuture::new(
300            self,
301            move |obj, cancellable, send| {
302                obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
303                    send.resolve(res);
304                });
305            },
306        ))
307    }
308
309    fn write_all_future<B: AsRef<[u8]> + Send + 'static>(
310        &self,
311        buffer: B,
312        io_priority: Priority,
313    ) -> Pin<
314        Box<
315            dyn std::future::Future<
316                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
317                > + 'static,
318        >,
319    > {
320        Box::pin(crate::GioFuture::new(
321            self,
322            move |obj, cancellable, send| {
323                obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
324                    send.resolve(res);
325                });
326            },
327        ))
328    }
329
330    /// Tries to write the bytes contained in the @n_vectors @vectors into the
331    /// stream. Will block during the operation.
332    ///
333    /// If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
334    /// does nothing.
335    ///
336    /// On success, the number of bytes written to the stream is returned.
337    /// It is not an error if this is not the same as the requested size, as it
338    /// can happen e.g. on a partial I/O error, or if there is not enough
339    /// storage in the stream. All writes block until at least one byte
340    /// is written or an error occurs; 0 is never returned (unless
341    /// @n_vectors is 0 or the sum of all bytes in @vectors is 0).
342    ///
343    /// If @cancellable is not [`None`], then the operation can be cancelled by
344    /// triggering the cancellable object from another thread. If the operation
345    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
346    /// operation was partially finished when the operation was cancelled the
347    /// partial result will be returned, without an error.
348    ///
349    /// Some implementations of g_output_stream_writev() may have limitations on the
350    /// aggregate buffer size, and will return [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] if these
351    /// are exceeded. For example, when writing to a local file on UNIX platforms,
352    /// the aggregate buffer size must not exceed `G_MAXSSIZE` bytes.
353    /// ## `vectors`
354    /// the buffer containing the #GOutputVectors to write.
355    /// ## `cancellable`
356    /// optional cancellable object
357    ///
358    /// # Returns
359    ///
360    /// [`true`] on success, [`false`] if there was an error
361    ///
362    /// ## `bytes_written`
363    /// location to store the number of bytes that were
364    ///     written to the stream
365    #[cfg(feature = "v2_60")]
366    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
367    #[doc(alias = "g_output_stream_writev")]
368    fn writev(
369        &self,
370        vectors: &[OutputVector],
371        cancellable: Option<&impl IsA<Cancellable>>,
372    ) -> Result<usize, glib::Error> {
373        unsafe {
374            let mut error = ptr::null_mut();
375            let mut bytes_written = mem::MaybeUninit::uninit();
376
377            ffi::g_output_stream_writev(
378                self.as_ref().to_glib_none().0,
379                vectors.as_ptr() as *const _,
380                vectors.len(),
381                bytes_written.as_mut_ptr(),
382                cancellable.map(|p| p.as_ref()).to_glib_none().0,
383                &mut error,
384            );
385            if error.is_null() {
386                Ok(bytes_written.assume_init())
387            } else {
388                Err(from_glib_full(error))
389            }
390        }
391    }
392
393    /// Request an asynchronous write of the bytes contained in @n_vectors @vectors into
394    /// the stream. When the operation is finished @callback will be called.
395    /// You can then call g_output_stream_writev_finish() to get the result of the
396    /// operation.
397    ///
398    /// During an async request no other sync and async calls are allowed,
399    /// and will result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
400    ///
401    /// On success, the number of bytes written will be passed to the
402    /// @callback. It is not an error if this is not the same as the
403    /// requested size, as it can happen e.g. on a partial I/O error,
404    /// but generally we try to write as many bytes as requested.
405    ///
406    /// You are guaranteed that this method will never fail with
407    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] — if @self can't accept more data, the
408    /// method will just wait until this changes.
409    ///
410    /// Any outstanding I/O request with higher priority (lower numerical
411    /// value) will be executed before an outstanding request with lower
412    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
413    ///
414    /// The asynchronous methods have a default fallback that uses threads
415    /// to implement asynchronicity, so they are optional for inheriting
416    /// classes. However, if you override one you must override all.
417    ///
418    /// For the synchronous, blocking version of this function, see
419    /// g_output_stream_writev().
420    ///
421    /// Note that no copy of @vectors will be made, so it must stay valid
422    /// until @callback is called.
423    /// ## `vectors`
424    /// the buffer containing the #GOutputVectors to write.
425    /// ## `io_priority`
426    /// the I/O priority of the request.
427    /// ## `cancellable`
428    /// optional #GCancellable object, [`None`] to ignore.
429    /// ## `callback`
430    /// a #GAsyncReadyCallback
431    ///     to call when the request is satisfied
432    #[cfg(feature = "v2_60")]
433    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
434    #[doc(alias = "g_output_stream_writev_async")]
435    fn writev_async<
436        B: AsRef<[u8]> + Send + 'static,
437        P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
438    >(
439        &self,
440        vectors: impl IntoIterator<Item = B> + 'static,
441        io_priority: glib::Priority,
442        cancellable: Option<&impl IsA<Cancellable>>,
443        callback: P,
444    ) {
445        let main_context = glib::MainContext::ref_thread_default();
446        let is_main_context_owner = main_context.is_owner();
447        let has_acquired_main_context = (!is_main_context_owner)
448            .then(|| main_context.acquire().ok())
449            .flatten();
450        assert!(
451            is_main_context_owner || has_acquired_main_context.is_some(),
452            "Async operations only allowed if the thread is owning the MainContext"
453        );
454
455        let cancellable = cancellable.map(|c| c.as_ref());
456        let gcancellable = cancellable.to_glib_none();
457        let buffers = vectors.into_iter().collect::<Vec<_>>();
458        let vectors = buffers
459            .iter()
460            .map(|v| ffi::GOutputVector {
461                buffer: v.as_ref().as_ptr() as *const _,
462                size: v.as_ref().len(),
463            })
464            .collect::<Vec<_>>();
465        let vectors_ptr = vectors.as_ptr();
466        let num_vectors = vectors.len();
467        let user_data: Box<(
468            glib::thread_guard::ThreadGuard<P>,
469            Vec<B>,
470            Vec<ffi::GOutputVector>,
471        )> = Box::new((
472            glib::thread_guard::ThreadGuard::new(callback),
473            buffers,
474            vectors,
475        ));
476
477        unsafe extern "C" fn writev_async_trampoline<
478            B: AsRef<[u8]> + Send + 'static,
479            P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
480        >(
481            _source_object: *mut glib::gobject_ffi::GObject,
482            res: *mut ffi::GAsyncResult,
483            user_data: glib::ffi::gpointer,
484        ) {
485            unsafe {
486                let user_data: Box<(
487                    glib::thread_guard::ThreadGuard<P>,
488                    Vec<B>,
489                    Vec<ffi::GOutputVector>,
490                )> = Box::from_raw(user_data as *mut _);
491                let (callback, buffers, _) = *user_data;
492                let callback = callback.into_inner();
493
494                let mut error = ptr::null_mut();
495                let mut bytes_written = mem::MaybeUninit::uninit();
496                ffi::g_output_stream_writev_finish(
497                    _source_object as *mut _,
498                    res,
499                    bytes_written.as_mut_ptr(),
500                    &mut error,
501                );
502                let bytes_written = bytes_written.assume_init();
503                let result = if error.is_null() {
504                    Ok((buffers, bytes_written))
505                } else {
506                    Err((buffers, from_glib_full(error)))
507                };
508                callback(result);
509            }
510        }
511        let callback = writev_async_trampoline::<B, P>;
512        unsafe {
513            ffi::g_output_stream_writev_async(
514                self.as_ref().to_glib_none().0,
515                vectors_ptr,
516                num_vectors,
517                io_priority.into_glib(),
518                gcancellable.0,
519                Some(callback),
520                Box::into_raw(user_data) as *mut _,
521            );
522        }
523    }
524
525    #[cfg(feature = "v2_60")]
526    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
527    fn writev_future<B: AsRef<[u8]> + Send + 'static>(
528        &self,
529        vectors: impl IntoIterator<Item = B> + 'static,
530        io_priority: glib::Priority,
531    ) -> Pin<
532        Box<
533            dyn std::future::Future<Output = Result<(Vec<B>, usize), (Vec<B>, glib::Error)>>
534                + 'static,
535        >,
536    > {
537        Box::pin(crate::GioFuture::new(
538            self,
539            move |obj, cancellable, send| {
540                obj.writev_async(vectors, io_priority, Some(cancellable), move |res| {
541                    send.resolve(res);
542                });
543            },
544        ))
545    }
546
547    /// Tries to write the bytes contained in the @n_vectors @vectors into the
548    /// stream. Will block during the operation.
549    ///
550    /// This function is similar to g_output_stream_writev(), except it tries to
551    /// write as many bytes as requested, only stopping on an error.
552    ///
553    /// On a successful write of all @n_vectors vectors, [`true`] is returned, and
554    /// @bytes_written is set to the sum of all the sizes of @vectors.
555    ///
556    /// If there is an error during the operation [`false`] is returned and @error
557    /// is set to indicate the error status.
558    ///
559    /// As a special exception to the normal conventions for functions that
560    /// use #GError, if this function returns [`false`] (and sets @error) then
561    /// @bytes_written will be set to the number of bytes that were
562    /// successfully written before the error was encountered.  This
563    /// functionality is only available from C. If you need it from another
564    /// language then you must write your own loop around
565    /// g_output_stream_write().
566    ///
567    /// The content of the individual elements of @vectors might be changed by this
568    /// function.
569    /// ## `vectors`
570    /// the buffer containing the #GOutputVectors to write.
571    /// ## `cancellable`
572    /// optional #GCancellable object, [`None`] to ignore.
573    ///
574    /// # Returns
575    ///
576    /// [`true`] on success, [`false`] if there was an error
577    ///
578    /// ## `bytes_written`
579    /// location to store the number of bytes that were
580    ///     written to the stream
581    #[cfg(feature = "v2_60")]
582    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
583    #[doc(alias = "g_output_stream_writev_all")]
584    fn writev_all(
585        &self,
586        vectors: &[OutputVector],
587        cancellable: Option<&impl IsA<Cancellable>>,
588    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
589        unsafe {
590            let mut error = ptr::null_mut();
591            let mut bytes_written = mem::MaybeUninit::uninit();
592
593            ffi::g_output_stream_writev_all(
594                self.as_ref().to_glib_none().0,
595                mut_override(vectors.as_ptr() as *const _),
596                vectors.len(),
597                bytes_written.as_mut_ptr(),
598                cancellable.map(|p| p.as_ref()).to_glib_none().0,
599                &mut error,
600            );
601            let bytes_written = bytes_written.assume_init();
602            if error.is_null() {
603                Ok((bytes_written, None))
604            } else if bytes_written != 0 {
605                Ok((bytes_written, Some(from_glib_full(error))))
606            } else {
607                Err(from_glib_full(error))
608            }
609        }
610    }
611
612    /// Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
613    /// the stream. When the operation is finished @callback will be called.
614    /// You can then call g_output_stream_writev_all_finish() to get the result of the
615    /// operation.
616    ///
617    /// This is the asynchronous version of g_output_stream_writev_all().
618    ///
619    /// Call g_output_stream_writev_all_finish() to collect the result.
620    ///
621    /// Any outstanding I/O request with higher priority (lower numerical
622    /// value) will be executed before an outstanding request with lower
623    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
624    ///
625    /// Note that no copy of @vectors will be made, so it must stay valid
626    /// until @callback is called. The content of the individual elements
627    /// of @vectors might be changed by this function.
628    /// ## `vectors`
629    /// the buffer containing the #GOutputVectors to write.
630    /// ## `io_priority`
631    /// the I/O priority of the request
632    /// ## `cancellable`
633    /// optional #GCancellable object, [`None`] to ignore
634    /// ## `callback`
635    /// a #GAsyncReadyCallback
636    ///     to call when the request is satisfied
637    #[cfg(feature = "v2_60")]
638    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
639    #[doc(alias = "g_output_stream_writev_all_async")]
640    fn writev_all_async<
641        B: AsRef<[u8]> + Send + 'static,
642        P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
643    >(
644        &self,
645        vectors: impl IntoIterator<Item = B> + 'static,
646        io_priority: glib::Priority,
647        cancellable: Option<&impl IsA<Cancellable>>,
648        callback: P,
649    ) {
650        let main_context = glib::MainContext::ref_thread_default();
651        let is_main_context_owner = main_context.is_owner();
652        let has_acquired_main_context = (!is_main_context_owner)
653            .then(|| main_context.acquire().ok())
654            .flatten();
655        assert!(
656            is_main_context_owner || has_acquired_main_context.is_some(),
657            "Async operations only allowed if the thread is owning the MainContext"
658        );
659
660        let cancellable = cancellable.map(|c| c.as_ref());
661        let gcancellable = cancellable.to_glib_none();
662        let buffers = vectors.into_iter().collect::<Vec<_>>();
663        let vectors = buffers
664            .iter()
665            .map(|v| ffi::GOutputVector {
666                buffer: v.as_ref().as_ptr() as *const _,
667                size: v.as_ref().len(),
668            })
669            .collect::<Vec<_>>();
670        let vectors_ptr = vectors.as_ptr();
671        let num_vectors = vectors.len();
672        let user_data: Box<(
673            glib::thread_guard::ThreadGuard<P>,
674            Vec<B>,
675            Vec<ffi::GOutputVector>,
676        )> = Box::new((
677            glib::thread_guard::ThreadGuard::new(callback),
678            buffers,
679            vectors,
680        ));
681
682        unsafe extern "C" fn writev_all_async_trampoline<
683            B: AsRef<[u8]> + Send + 'static,
684            P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
685        >(
686            _source_object: *mut glib::gobject_ffi::GObject,
687            res: *mut ffi::GAsyncResult,
688            user_data: glib::ffi::gpointer,
689        ) {
690            unsafe {
691                let user_data: Box<(
692                    glib::thread_guard::ThreadGuard<P>,
693                    Vec<B>,
694                    Vec<ffi::GOutputVector>,
695                )> = Box::from_raw(user_data as *mut _);
696                let (callback, buffers, _) = *user_data;
697                let callback = callback.into_inner();
698
699                let mut error = ptr::null_mut();
700                let mut bytes_written = mem::MaybeUninit::uninit();
701                ffi::g_output_stream_writev_all_finish(
702                    _source_object as *mut _,
703                    res,
704                    bytes_written.as_mut_ptr(),
705                    &mut error,
706                );
707                let bytes_written = bytes_written.assume_init();
708                let result = if error.is_null() {
709                    Ok((buffers, bytes_written, None))
710                } else if bytes_written != 0 {
711                    Ok((buffers, bytes_written, from_glib_full(error)))
712                } else {
713                    Err((buffers, from_glib_full(error)))
714                };
715                callback(result);
716            }
717        }
718        let callback = writev_all_async_trampoline::<B, P>;
719        unsafe {
720            ffi::g_output_stream_writev_all_async(
721                self.as_ref().to_glib_none().0,
722                mut_override(vectors_ptr),
723                num_vectors,
724                io_priority.into_glib(),
725                gcancellable.0,
726                Some(callback),
727                Box::into_raw(user_data) as *mut _,
728            );
729        }
730    }
731
732    #[cfg(feature = "v2_60")]
733    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
734    fn writev_all_future<B: AsRef<[u8]> + Send + 'static>(
735        &self,
736        vectors: impl IntoIterator<Item = B> + 'static,
737        io_priority: glib::Priority,
738    ) -> Pin<
739        Box<
740            dyn std::future::Future<
741                    Output = Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>,
742                > + 'static,
743        >,
744    > {
745        Box::pin(crate::GioFuture::new(
746            self,
747            move |obj, cancellable, send| {
748                obj.writev_all_async(vectors, io_priority, Some(cancellable), move |res| {
749                    send.resolve(res);
750                });
751            },
752        ))
753    }
754
755    fn into_write(self) -> OutputStreamWrite<Self>
756    where
757        Self: IsA<OutputStream>,
758    {
759        OutputStreamWrite(self)
760    }
761}
762
763impl<O: IsA<OutputStream>> OutputStreamExtManual for O {}
764
765#[derive(Debug)]
766pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
767
768impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
769    pub fn into_output_stream(self) -> T {
770        self.0
771    }
772
773    pub fn output_stream(&self) -> &T {
774        &self.0
775    }
776}
777
778impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
779    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
780        let result = self
781            .0
782            .as_ref()
783            .write(buf, crate::Cancellable::NONE)
784            .map(|size| size as usize);
785        to_std_io_result(result)
786    }
787
788    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
789        let result = self
790            .0
791            .as_ref()
792            .write_all(buf, crate::Cancellable::NONE)
793            .and_then(|(_, e)| e.map(Err).unwrap_or(Ok(())));
794        to_std_io_result(result)
795    }
796
797    #[cfg(feature = "v2_60")]
798    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
799    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
800        let vectors = bufs
801            .iter()
802            .map(|v| OutputVector::new(v))
803            .collect::<smallvec::SmallVec<[_; 2]>>();
804        let result = self.0.as_ref().writev(&vectors, crate::Cancellable::NONE);
805        to_std_io_result(result)
806    }
807
808    fn flush(&mut self) -> io::Result<()> {
809        let gio_result = self.0.as_ref().flush(crate::Cancellable::NONE);
810        to_std_io_result(gio_result)
811    }
812}
813
814impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
815    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
816        let (pos, type_) = match pos {
817            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
818            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
819            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
820        };
821        let seekable: &Seekable = self.0.as_ref();
822        let gio_result = seekable
823            .seek(pos, type_, crate::Cancellable::NONE)
824            .map(|_| seekable.tell() as u64);
825        to_std_io_result(gio_result)
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use std::io::Write;
832
833    use glib::Bytes;
834
835    #[cfg(feature = "v2_60")]
836    use crate::OutputVector;
837    use crate::{MemoryInputStream, MemoryOutputStream, prelude::*, test_util::run_async};
838
839    #[test]
840    fn splice_async() {
841        let ret = run_async(|tx, l| {
842            let input = MemoryInputStream::new();
843            let b = Bytes::from_owned(vec![1, 2, 3]);
844            input.add_bytes(&b);
845
846            let strm = MemoryOutputStream::new_resizable();
847            strm.splice_async(
848                &input,
849                crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
850                glib::Priority::DEFAULT_IDLE,
851                crate::Cancellable::NONE,
852                move |ret| {
853                    tx.send(ret).unwrap();
854                    l.quit();
855                },
856            );
857        });
858
859        assert_eq!(ret.unwrap(), 3);
860    }
861
862    #[test]
863    fn write_async() {
864        let ret = run_async(|tx, l| {
865            let strm = MemoryOutputStream::new_resizable();
866
867            let buf = vec![1, 2, 3];
868            strm.write_async(
869                buf,
870                glib::Priority::DEFAULT_IDLE,
871                crate::Cancellable::NONE,
872                move |ret| {
873                    tx.send(ret).unwrap();
874                    l.quit();
875                },
876            );
877        });
878
879        let (buf, size) = ret.unwrap();
880        assert_eq!(buf, vec![1, 2, 3]);
881        assert_eq!(size, 3);
882    }
883
884    #[test]
885    fn write_all_async() {
886        let ret = run_async(|tx, l| {
887            let strm = MemoryOutputStream::new_resizable();
888
889            let buf = vec![1, 2, 3];
890            strm.write_all_async(
891                buf,
892                glib::Priority::DEFAULT_IDLE,
893                crate::Cancellable::NONE,
894                move |ret| {
895                    tx.send(ret).unwrap();
896                    l.quit();
897                },
898            );
899        });
900
901        let (buf, size, err) = ret.unwrap();
902        assert_eq!(buf, vec![1, 2, 3]);
903        assert_eq!(size, 3);
904        assert!(err.is_none());
905    }
906
907    #[test]
908    fn write_bytes_async() {
909        let ret = run_async(|tx, l| {
910            let strm = MemoryOutputStream::new_resizable();
911
912            let b = Bytes::from_owned(vec![1, 2, 3]);
913            strm.write_bytes_async(
914                &b,
915                glib::Priority::DEFAULT_IDLE,
916                crate::Cancellable::NONE,
917                move |ret| {
918                    tx.send(ret).unwrap();
919                    l.quit();
920                },
921            );
922        });
923
924        assert_eq!(ret.unwrap(), 3);
925    }
926
927    #[test]
928    fn std_io_write() {
929        let b = Bytes::from_owned(vec![1, 2, 3]);
930        let mut write = MemoryOutputStream::new_resizable().into_write();
931
932        let ret = write.write(&b);
933
934        let stream = write.into_output_stream();
935        stream.close(crate::Cancellable::NONE).unwrap();
936        assert_eq!(ret.unwrap(), 3);
937        assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref());
938    }
939
940    #[test]
941    fn into_output_stream() {
942        let stream = MemoryOutputStream::new_resizable();
943        let stream_clone = stream.clone();
944        let stream = stream.into_write().into_output_stream();
945
946        assert_eq!(stream, stream_clone);
947    }
948
949    #[test]
950    #[cfg(feature = "v2_60")]
951    fn writev() {
952        let stream = MemoryOutputStream::new_resizable();
953
954        let ret = stream.writev(
955            &[OutputVector::new(&[1, 2, 3]), OutputVector::new(&[4, 5, 6])],
956            crate::Cancellable::NONE,
957        );
958        assert_eq!(ret.unwrap(), 6);
959        stream.close(crate::Cancellable::NONE).unwrap();
960        assert_eq!(stream.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
961    }
962
963    #[test]
964    #[cfg(feature = "v2_60")]
965    fn writev_async() {
966        let ret = run_async(|tx, l| {
967            let strm = MemoryOutputStream::new_resizable();
968
969            let strm_clone = strm.clone();
970            strm.writev_async(
971                [vec![1, 2, 3], vec![4, 5, 6]],
972                glib::Priority::DEFAULT_IDLE,
973                crate::Cancellable::NONE,
974                move |ret| {
975                    tx.send(ret).unwrap();
976                    strm_clone.close(crate::Cancellable::NONE).unwrap();
977                    assert_eq!(strm_clone.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
978                    l.quit();
979                },
980            );
981        });
982
983        let (buf, size) = ret.unwrap();
984        assert_eq!(buf, [[1, 2, 3], [4, 5, 6]]);
985        assert_eq!(size, 6);
986    }
987
988    #[test]
989    #[cfg(feature = "v2_60")]
990    fn writev_all_async() {
991        let ret = run_async(|tx, l| {
992            let strm = MemoryOutputStream::new_resizable();
993
994            let strm_clone = strm.clone();
995            strm.writev_all_async(
996                [vec![1, 2, 3], vec![4, 5, 6]],
997                glib::Priority::DEFAULT_IDLE,
998                crate::Cancellable::NONE,
999                move |ret| {
1000                    tx.send(ret).unwrap();
1001                    strm_clone.close(crate::Cancellable::NONE).unwrap();
1002                    assert_eq!(strm_clone.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
1003                    l.quit();
1004                },
1005            );
1006        });
1007
1008        let (buf, size, err) = ret.unwrap();
1009        assert_eq!(buf, [[1, 2, 3], [4, 5, 6]]);
1010        assert_eq!(size, 6);
1011        assert!(err.is_none());
1012    }
1013}