gio/
pollable_output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7    stream::Stream,
8    task::{Context, Poll},
9    Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18pub trait PollableOutputStreamExtManual: IsA<PollableOutputStream> {
19    /// Creates a #GSource that triggers when @self can be written, or
20    /// @cancellable is triggered or an error occurs. The callback on the
21    /// source is of the #GPollableSourceFunc type.
22    ///
23    /// As with g_pollable_output_stream_is_writable(), it is possible that
24    /// the stream may not actually be writable even after the source
25    /// triggers, so you should use g_pollable_output_stream_write_nonblocking()
26    /// rather than g_output_stream_write() from the callback.
27    ///
28    /// The behaviour of this method is undefined if
29    /// g_pollable_output_stream_can_poll() returns [`false`] for @self.
30    /// ## `cancellable`
31    /// a #GCancellable, or [`None`]
32    ///
33    /// # Returns
34    ///
35    /// a new #GSource
36    #[doc(alias = "g_pollable_output_stream_create_source")]
37    fn create_source<F, C>(
38        &self,
39        cancellable: Option<&C>,
40        name: Option<&str>,
41        priority: glib::Priority,
42        func: F,
43    ) -> glib::Source
44    where
45        F: FnMut(&Self) -> glib::ControlFlow + 'static,
46        C: IsA<Cancellable>,
47    {
48        unsafe extern "C" fn trampoline<
49            O: IsA<PollableOutputStream>,
50            F: FnMut(&O) -> glib::ControlFlow + 'static,
51        >(
52            stream: *mut ffi::GPollableOutputStream,
53            func: glib::ffi::gpointer,
54        ) -> glib::ffi::gboolean {
55            let func: &RefCell<F> = &*(func as *const RefCell<F>);
56            let mut func = func.borrow_mut();
57            (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
58        }
59        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
60            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
61        }
62        let cancellable = cancellable.map(|c| c.as_ref());
63        let gcancellable = cancellable.to_glib_none();
64        unsafe {
65            let source = ffi::g_pollable_output_stream_create_source(
66                self.as_ref().to_glib_none().0,
67                gcancellable.0,
68            );
69
70            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
71            glib::ffi::g_source_set_callback(
72                source,
73                Some(transmute::<
74                    glib::ffi::gpointer,
75                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
76                >(trampoline)),
77                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
78                Some(destroy_closure::<F>),
79            );
80            glib::ffi::g_source_set_priority(source, priority.into_glib());
81
82            if let Some(name) = name {
83                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
84            }
85
86            from_glib_full(source)
87        }
88    }
89
90    fn create_source_future<C: IsA<Cancellable>>(
91        &self,
92        cancellable: Option<&C>,
93        priority: glib::Priority,
94    ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
95        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
96
97        let obj = self.clone();
98        Box::pin(glib::SourceFuture::new(move |send| {
99            let mut send = Some(send);
100            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
101                let _ = send.take().unwrap().send(());
102                glib::ControlFlow::Break
103            })
104        }))
105    }
106
107    fn create_source_stream<C: IsA<Cancellable>>(
108        &self,
109        cancellable: Option<&C>,
110        priority: glib::Priority,
111    ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
112        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
113
114        let obj = self.clone();
115        Box::pin(glib::SourceStream::new(move |send| {
116            let send = Some(send);
117            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
118                if send.as_ref().unwrap().unbounded_send(()).is_err() {
119                    glib::ControlFlow::Break
120                } else {
121                    glib::ControlFlow::Continue
122                }
123            })
124        }))
125    }
126
127    /// Attempts to write the bytes contained in the @n_vectors @vectors to @self,
128    /// as with g_output_stream_writev(). If @self is not currently writable,
129    /// this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can
130    /// use g_pollable_output_stream_create_source() to create a #GSource
131    /// that will be triggered when @self is writable. @error will *not* be
132    /// set in that case.
133    ///
134    /// Note that since this method never blocks, you cannot actually
135    /// use @cancellable to cancel it. However, it will return an error
136    /// if @cancellable has already been cancelled when you call, which
137    /// may happen if you call this method after a source triggers due
138    /// to having been cancelled.
139    ///
140    /// Also note that if [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] is returned some underlying
141    /// transports like D/TLS require that you re-send the same @vectors and
142    /// @n_vectors in the next write call.
143    ///
144    /// The behaviour of this method is undefined if
145    /// g_pollable_output_stream_can_poll() returns [`false`] for @self.
146    /// ## `vectors`
147    /// the buffer containing the #GOutputVectors to write.
148    /// ## `cancellable`
149    /// a #GCancellable, or [`None`]
150    ///
151    /// # Returns
152    ///
153    /// %@G_POLLABLE_RETURN_OK on success, [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock]
154    /// if the stream is not currently writable (and @error is *not* set), or
155    /// [`PollableReturn::Failed`][crate::PollableReturn::Failed] if there was an error in which case @error will
156    /// be set.
157    ///
158    /// ## `bytes_written`
159    /// location to store the number of bytes that were
160    ///     written to the stream
161    #[cfg(feature = "v2_60")]
162    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
163    #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
164    fn writev_nonblocking(
165        &self,
166        vectors: &[OutputVector],
167        cancellable: Option<&impl IsA<Cancellable>>,
168    ) -> Result<(PollableReturn, usize), glib::Error> {
169        unsafe {
170            let mut error = std::ptr::null_mut();
171            let mut bytes_written = 0;
172
173            let ret = ffi::g_pollable_output_stream_writev_nonblocking(
174                self.as_ref().to_glib_none().0,
175                vectors.as_ptr() as *const _,
176                vectors.len(),
177                &mut bytes_written,
178                cancellable.map(|p| p.as_ref()).to_glib_none().0,
179                &mut error,
180            );
181            if error.is_null() {
182                Ok((from_glib(ret), bytes_written))
183            } else {
184                Err(from_glib_full(error))
185            }
186        }
187    }
188
189    fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
190    where
191        Self: IsA<PollableOutputStream>,
192    {
193        if self.can_poll() {
194            Ok(OutputStreamAsyncWrite(self, None))
195        } else {
196            Err(self)
197        }
198    }
199}
200
201impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
202
203#[derive(Debug)]
204pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
205    T,
206    Option<oneshot::Receiver<Result<(), glib::Error>>>,
207);
208
209impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
210    pub fn into_output_stream(self) -> T {
211        self.0
212    }
213
214    pub fn output_stream(&self) -> &T {
215        &self.0
216    }
217}
218
219impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
220    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
221        let stream = Pin::get_ref(self.as_ref());
222        let gio_result = stream
223            .0
224            .as_ref()
225            .write_nonblocking(buf, crate::Cancellable::NONE);
226
227        match gio_result {
228            Ok(size) => Poll::Ready(Ok(size as usize)),
229            Err(err) => {
230                let kind = err
231                    .kind::<crate::IOErrorEnum>()
232                    .unwrap_or(crate::IOErrorEnum::Failed);
233                if kind == crate::IOErrorEnum::WouldBlock {
234                    let mut waker = Some(cx.waker().clone());
235                    let source = stream.0.as_ref().create_source(
236                        crate::Cancellable::NONE,
237                        None,
238                        glib::Priority::default(),
239                        move |_| {
240                            if let Some(waker) = waker.take() {
241                                waker.wake();
242                            }
243                            glib::ControlFlow::Break
244                        },
245                    );
246                    let main_context = glib::MainContext::ref_thread_default();
247                    source.attach(Some(&main_context));
248
249                    Poll::Pending
250                } else {
251                    Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
252                }
253            }
254        }
255    }
256
257    #[cfg(feature = "v2_60")]
258    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
259    fn poll_write_vectored(
260        self: Pin<&mut Self>,
261        cx: &mut Context<'_>,
262        bufs: &[io::IoSlice<'_>],
263    ) -> Poll<io::Result<usize>> {
264        let stream = Pin::get_ref(self.as_ref());
265        let vectors = bufs
266            .iter()
267            .map(|v| OutputVector::new(v))
268            .collect::<smallvec::SmallVec<[_; 2]>>();
269        let gio_result = stream
270            .0
271            .as_ref()
272            .writev_nonblocking(&vectors, crate::Cancellable::NONE);
273
274        match gio_result {
275            Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
276            Ok((PollableReturn::WouldBlock, _)) => {
277                let mut waker = Some(cx.waker().clone());
278                let source = stream.0.as_ref().create_source(
279                    crate::Cancellable::NONE,
280                    None,
281                    glib::Priority::default(),
282                    move |_| {
283                        if let Some(waker) = waker.take() {
284                            waker.wake();
285                        }
286                        glib::ControlFlow::Break
287                    },
288                );
289                let main_context = glib::MainContext::ref_thread_default();
290                source.attach(Some(&main_context));
291
292                Poll::Pending
293            }
294            Ok((_, _)) => unreachable!(),
295            Err(err) => Poll::Ready(Err(io::Error::new(
296                io::ErrorKind::from(
297                    err.kind::<crate::IOErrorEnum>()
298                        .unwrap_or(crate::IOErrorEnum::Failed),
299                ),
300                err,
301            ))),
302        }
303    }
304
305    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
306        let stream = unsafe { Pin::get_unchecked_mut(self) };
307
308        let rx = if let Some(ref mut rx) = stream.1 {
309            rx
310        } else {
311            let (tx, rx) = oneshot::channel();
312            stream.0.as_ref().flush_async(
313                glib::Priority::default(),
314                crate::Cancellable::NONE,
315                move |res| {
316                    let _ = tx.send(res);
317                },
318            );
319
320            stream.1 = Some(rx);
321            stream.1.as_mut().unwrap()
322        };
323
324        match Pin::new(rx).poll(cx) {
325            Poll::Ready(Ok(res)) => {
326                let _ = stream.1.take();
327                Poll::Ready(to_std_io_result(res))
328            }
329            Poll::Ready(Err(_)) => {
330                let _ = stream.1.take();
331                Poll::Ready(Ok(()))
332            }
333            Poll::Pending => Poll::Pending,
334        }
335    }
336
337    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
338        let stream = unsafe { Pin::get_unchecked_mut(self) };
339
340        let rx = if let Some(ref mut rx) = stream.1 {
341            rx
342        } else {
343            let (tx, rx) = oneshot::channel();
344            stream.0.as_ref().close_async(
345                glib::Priority::default(),
346                crate::Cancellable::NONE,
347                move |res| {
348                    let _ = tx.send(res);
349                },
350            );
351
352            stream.1 = Some(rx);
353            stream.1.as_mut().unwrap()
354        };
355
356        match Pin::new(rx).poll(cx) {
357            Poll::Ready(Ok(res)) => {
358                let _ = stream.1.take();
359                Poll::Ready(to_std_io_result(res))
360            }
361            Poll::Ready(Err(_)) => {
362                let _ = stream.1.take();
363                Poll::Ready(Ok(()))
364            }
365            Poll::Pending => Poll::Pending,
366        }
367    }
368}