gio/
pollable_output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7    stream::Stream,
8    task::{Context, Poll},
9    Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18mod sealed {
19    pub trait Sealed {}
20    impl<T: super::IsA<super::PollableOutputStream>> Sealed for T {}
21}
22
23pub trait PollableOutputStreamExtManual: sealed::Sealed + IsA<PollableOutputStream> {
24    /// Creates a #GSource that triggers when @self can be written, or
25    /// @cancellable is triggered or an error occurs. The callback on the
26    /// source is of the #GPollableSourceFunc type.
27    ///
28    /// As with g_pollable_output_stream_is_writable(), it is possible that
29    /// the stream may not actually be writable even after the source
30    /// triggers, so you should use g_pollable_output_stream_write_nonblocking()
31    /// rather than g_output_stream_write() from the callback.
32    ///
33    /// The behaviour of this method is undefined if
34    /// g_pollable_output_stream_can_poll() returns [`false`] for @self.
35    /// ## `cancellable`
36    /// a #GCancellable, or [`None`]
37    ///
38    /// # Returns
39    ///
40    /// a new #GSource
41    #[doc(alias = "g_pollable_output_stream_create_source")]
42    fn create_source<F, C>(
43        &self,
44        cancellable: Option<&C>,
45        name: Option<&str>,
46        priority: glib::Priority,
47        func: F,
48    ) -> glib::Source
49    where
50        F: FnMut(&Self) -> glib::ControlFlow + 'static,
51        C: IsA<Cancellable>,
52    {
53        unsafe extern "C" fn trampoline<
54            O: IsA<PollableOutputStream>,
55            F: FnMut(&O) -> glib::ControlFlow + 'static,
56        >(
57            stream: *mut ffi::GPollableOutputStream,
58            func: glib::ffi::gpointer,
59        ) -> glib::ffi::gboolean {
60            let func: &RefCell<F> = &*(func as *const RefCell<F>);
61            let mut func = func.borrow_mut();
62            (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
63        }
64        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
65            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
66        }
67        let cancellable = cancellable.map(|c| c.as_ref());
68        let gcancellable = cancellable.to_glib_none();
69        unsafe {
70            let source = ffi::g_pollable_output_stream_create_source(
71                self.as_ref().to_glib_none().0,
72                gcancellable.0,
73            );
74
75            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
76            glib::ffi::g_source_set_callback(
77                source,
78                Some(transmute::<
79                    glib::ffi::gpointer,
80                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
81                >(trampoline)),
82                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
83                Some(destroy_closure::<F>),
84            );
85            glib::ffi::g_source_set_priority(source, priority.into_glib());
86
87            if let Some(name) = name {
88                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
89            }
90
91            from_glib_full(source)
92        }
93    }
94
95    fn create_source_future<C: IsA<Cancellable>>(
96        &self,
97        cancellable: Option<&C>,
98        priority: glib::Priority,
99    ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
100        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
101
102        let obj = self.clone();
103        Box::pin(glib::SourceFuture::new(move |send| {
104            let mut send = Some(send);
105            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
106                let _ = send.take().unwrap().send(());
107                glib::ControlFlow::Break
108            })
109        }))
110    }
111
112    fn create_source_stream<C: IsA<Cancellable>>(
113        &self,
114        cancellable: Option<&C>,
115        priority: glib::Priority,
116    ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
117        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
118
119        let obj = self.clone();
120        Box::pin(glib::SourceStream::new(move |send| {
121            let send = Some(send);
122            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
123                if send.as_ref().unwrap().unbounded_send(()).is_err() {
124                    glib::ControlFlow::Break
125                } else {
126                    glib::ControlFlow::Continue
127                }
128            })
129        }))
130    }
131
132    /// Attempts to write the bytes contained in the @n_vectors @vectors to @self,
133    /// as with g_output_stream_writev(). If @self is not currently writable,
134    /// this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can
135    /// use g_pollable_output_stream_create_source() to create a #GSource
136    /// that will be triggered when @self is writable. @error will *not* be
137    /// set in that case.
138    ///
139    /// Note that since this method never blocks, you cannot actually
140    /// use @cancellable to cancel it. However, it will return an error
141    /// if @cancellable has already been cancelled when you call, which
142    /// may happen if you call this method after a source triggers due
143    /// to having been cancelled.
144    ///
145    /// Also note that if [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] is returned some underlying
146    /// transports like D/TLS require that you re-send the same @vectors and
147    /// @n_vectors in the next write call.
148    ///
149    /// The behaviour of this method is undefined if
150    /// g_pollable_output_stream_can_poll() returns [`false`] for @self.
151    /// ## `vectors`
152    /// the buffer containing the #GOutputVectors to write.
153    /// ## `cancellable`
154    /// a #GCancellable, or [`None`]
155    ///
156    /// # Returns
157    ///
158    /// %@G_POLLABLE_RETURN_OK on success, [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock]
159    /// if the stream is not currently writable (and @error is *not* set), or
160    /// [`PollableReturn::Failed`][crate::PollableReturn::Failed] if there was an error in which case @error will
161    /// be set.
162    ///
163    /// ## `bytes_written`
164    /// location to store the number of bytes that were
165    ///     written to the stream
166    #[cfg(feature = "v2_60")]
167    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
168    #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
169    fn writev_nonblocking(
170        &self,
171        vectors: &[OutputVector],
172        cancellable: Option<&impl IsA<Cancellable>>,
173    ) -> Result<(PollableReturn, usize), glib::Error> {
174        unsafe {
175            let mut error = std::ptr::null_mut();
176            let mut bytes_written = 0;
177
178            let ret = ffi::g_pollable_output_stream_writev_nonblocking(
179                self.as_ref().to_glib_none().0,
180                vectors.as_ptr() as *const _,
181                vectors.len(),
182                &mut bytes_written,
183                cancellable.map(|p| p.as_ref()).to_glib_none().0,
184                &mut error,
185            );
186            if error.is_null() {
187                Ok((from_glib(ret), bytes_written))
188            } else {
189                Err(from_glib_full(error))
190            }
191        }
192    }
193
194    fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
195    where
196        Self: IsA<PollableOutputStream>,
197    {
198        if self.can_poll() {
199            Ok(OutputStreamAsyncWrite(self, None))
200        } else {
201            Err(self)
202        }
203    }
204}
205
206impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
207
208#[derive(Debug)]
209pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
210    T,
211    Option<oneshot::Receiver<Result<(), glib::Error>>>,
212);
213
214impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
215    pub fn into_output_stream(self) -> T {
216        self.0
217    }
218
219    pub fn output_stream(&self) -> &T {
220        &self.0
221    }
222}
223
224impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
225    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
226        let stream = Pin::get_ref(self.as_ref());
227        let gio_result = stream
228            .0
229            .as_ref()
230            .write_nonblocking(buf, crate::Cancellable::NONE);
231
232        match gio_result {
233            Ok(size) => Poll::Ready(Ok(size as usize)),
234            Err(err) => {
235                let kind = err
236                    .kind::<crate::IOErrorEnum>()
237                    .unwrap_or(crate::IOErrorEnum::Failed);
238                if kind == crate::IOErrorEnum::WouldBlock {
239                    let mut waker = Some(cx.waker().clone());
240                    let source = stream.0.as_ref().create_source(
241                        crate::Cancellable::NONE,
242                        None,
243                        glib::Priority::default(),
244                        move |_| {
245                            if let Some(waker) = waker.take() {
246                                waker.wake();
247                            }
248                            glib::ControlFlow::Break
249                        },
250                    );
251                    let main_context = glib::MainContext::ref_thread_default();
252                    source.attach(Some(&main_context));
253
254                    Poll::Pending
255                } else {
256                    Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
257                }
258            }
259        }
260    }
261
262    #[cfg(feature = "v2_60")]
263    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
264    fn poll_write_vectored(
265        self: Pin<&mut Self>,
266        cx: &mut Context<'_>,
267        bufs: &[io::IoSlice<'_>],
268    ) -> Poll<io::Result<usize>> {
269        let stream = Pin::get_ref(self.as_ref());
270        let vectors = bufs
271            .iter()
272            .map(|v| OutputVector::new(v))
273            .collect::<smallvec::SmallVec<[_; 2]>>();
274        let gio_result = stream
275            .0
276            .as_ref()
277            .writev_nonblocking(&vectors, crate::Cancellable::NONE);
278
279        match gio_result {
280            Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
281            Ok((PollableReturn::WouldBlock, _)) => {
282                let mut waker = Some(cx.waker().clone());
283                let source = stream.0.as_ref().create_source(
284                    crate::Cancellable::NONE,
285                    None,
286                    glib::Priority::default(),
287                    move |_| {
288                        if let Some(waker) = waker.take() {
289                            waker.wake();
290                        }
291                        glib::ControlFlow::Break
292                    },
293                );
294                let main_context = glib::MainContext::ref_thread_default();
295                source.attach(Some(&main_context));
296
297                Poll::Pending
298            }
299            Ok((_, _)) => unreachable!(),
300            Err(err) => Poll::Ready(Err(io::Error::new(
301                io::ErrorKind::from(
302                    err.kind::<crate::IOErrorEnum>()
303                        .unwrap_or(crate::IOErrorEnum::Failed),
304                ),
305                err,
306            ))),
307        }
308    }
309
310    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
311        let stream = unsafe { Pin::get_unchecked_mut(self) };
312
313        let rx = if let Some(ref mut rx) = stream.1 {
314            rx
315        } else {
316            let (tx, rx) = oneshot::channel();
317            stream.0.as_ref().flush_async(
318                glib::Priority::default(),
319                crate::Cancellable::NONE,
320                move |res| {
321                    let _ = tx.send(res);
322                },
323            );
324
325            stream.1 = Some(rx);
326            stream.1.as_mut().unwrap()
327        };
328
329        match Pin::new(rx).poll(cx) {
330            Poll::Ready(Ok(res)) => {
331                let _ = stream.1.take();
332                Poll::Ready(to_std_io_result(res))
333            }
334            Poll::Ready(Err(_)) => {
335                let _ = stream.1.take();
336                Poll::Ready(Ok(()))
337            }
338            Poll::Pending => Poll::Pending,
339        }
340    }
341
342    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
343        let stream = unsafe { Pin::get_unchecked_mut(self) };
344
345        let rx = if let Some(ref mut rx) = stream.1 {
346            rx
347        } else {
348            let (tx, rx) = oneshot::channel();
349            stream.0.as_ref().close_async(
350                glib::Priority::default(),
351                crate::Cancellable::NONE,
352                move |res| {
353                    let _ = tx.send(res);
354                },
355            );
356
357            stream.1 = Some(rx);
358            stream.1.as_mut().unwrap()
359        };
360
361        match Pin::new(rx).poll(cx) {
362            Poll::Ready(Ok(res)) => {
363                let _ = stream.1.take();
364                Poll::Ready(to_std_io_result(res))
365            }
366            Poll::Ready(Err(_)) => {
367                let _ = stream.1.take();
368                Poll::Ready(Ok(()))
369            }
370            Poll::Pending => Poll::Pending,
371        }
372    }
373}