gio/
pollable_input_stream.rs1use std::{cell::RefCell, io, mem::transmute, pin::Pin, ptr};
4
5use futures_core::{
6 stream::Stream,
7 task::{Context, Poll},
8};
9use futures_io::AsyncRead;
10use glib::{prelude::*, translate::*};
11
12use crate::{Cancellable, PollableInputStream, ffi, prelude::*};
13
14pub trait PollableInputStreamExtManual: IsA<PollableInputStream> + Sized {
15 #[doc(alias = "g_pollable_input_stream_create_source")]
33 fn create_source<F, C>(
34 &self,
35 cancellable: Option<&C>,
36 name: Option<&str>,
37 priority: glib::Priority,
38 func: F,
39 ) -> glib::Source
40 where
41 F: FnMut(&Self) -> glib::ControlFlow + 'static,
42 C: IsA<Cancellable>,
43 {
44 unsafe extern "C" fn trampoline<
45 O: IsA<PollableInputStream>,
46 F: FnMut(&O) -> glib::ControlFlow + 'static,
47 >(
48 stream: *mut ffi::GPollableInputStream,
49 func: glib::ffi::gpointer,
50 ) -> glib::ffi::gboolean {
51 unsafe {
52 let func: &RefCell<F> = &*(func as *const RefCell<F>);
53 let mut func = func.borrow_mut();
54 (*func)(PollableInputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
55 }
56 }
57 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
58 unsafe {
59 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
60 }
61 }
62 let cancellable = cancellable.map(|c| c.as_ref());
63 let gcancellable = cancellable.to_glib_none();
64 unsafe {
65 let source = ffi::g_pollable_input_stream_create_source(
66 self.as_ref().to_glib_none().0,
67 gcancellable.0,
68 );
69
70 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
71 glib::ffi::g_source_set_callback(
72 source,
73 Some(transmute::<
74 glib::ffi::gpointer,
75 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
76 >(trampoline)),
77 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
78 Some(destroy_closure::<F>),
79 );
80 glib::ffi::g_source_set_priority(source, priority.into_glib());
81
82 if let Some(name) = name {
83 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
84 }
85
86 from_glib_full(source)
87 }
88 }
89
90 fn create_source_future<C: IsA<Cancellable>>(
91 &self,
92 cancellable: Option<&C>,
93 priority: glib::Priority,
94 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
95 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
96
97 let obj = self.clone();
98 Box::pin(glib::SourceFuture::new(move |send| {
99 let mut send = Some(send);
100 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
101 let _ = send.take().unwrap().send(());
102 glib::ControlFlow::Break
103 })
104 }))
105 }
106
107 fn create_source_stream<C: IsA<Cancellable>>(
108 &self,
109 cancellable: Option<&C>,
110 priority: glib::Priority,
111 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
112 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
113
114 let obj = self.clone();
115 Box::pin(glib::SourceStream::new(move |send| {
116 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
117 if send.unbounded_send(()).is_err() {
118 glib::ControlFlow::Break
119 } else {
120 glib::ControlFlow::Continue
121 }
122 })
123 }))
124 }
125
126 #[doc(alias = "g_pollable_input_stream_read_nonblocking")]
152 fn read_nonblocking<C: IsA<Cancellable>>(
153 &self,
154 buffer: &mut [u8],
155 cancellable: Option<&C>,
156 ) -> Result<isize, glib::Error> {
157 let cancellable = cancellable.map(|c| c.as_ref());
158 let gcancellable = cancellable.to_glib_none();
159 let count = buffer.len();
160 unsafe {
161 let mut error = ptr::null_mut();
162 let ret = ffi::g_pollable_input_stream_read_nonblocking(
163 self.as_ref().to_glib_none().0,
164 buffer.to_glib_none().0,
165 count,
166 gcancellable.0,
167 &mut error,
168 );
169 if error.is_null() {
170 Ok(ret)
171 } else {
172 Err(from_glib_full(error))
173 }
174 }
175 }
176
177 fn into_async_read(self) -> Result<InputStreamAsyncRead<Self>, Self>
178 where
179 Self: IsA<PollableInputStream>,
180 {
181 if self.can_poll() {
182 Ok(InputStreamAsyncRead(self))
183 } else {
184 Err(self)
185 }
186 }
187}
188
189impl<O: IsA<PollableInputStream>> PollableInputStreamExtManual for O {}
190
191#[derive(Debug)]
192pub struct InputStreamAsyncRead<T: IsA<PollableInputStream>>(T);
193
194impl<T: IsA<PollableInputStream>> InputStreamAsyncRead<T> {
195 pub fn into_input_stream(self) -> T {
196 self.0
197 }
198
199 pub fn input_stream(&self) -> &T {
200 &self.0
201 }
202}
203
204impl<T: IsA<PollableInputStream>> AsyncRead for InputStreamAsyncRead<T> {
205 fn poll_read(
206 self: Pin<&mut Self>,
207 cx: &mut Context,
208 buf: &mut [u8],
209 ) -> Poll<io::Result<usize>> {
210 let stream = Pin::get_ref(self.as_ref());
211 let gio_result = stream
212 .0
213 .as_ref()
214 .read_nonblocking(buf, crate::Cancellable::NONE);
215
216 match gio_result {
217 Ok(size) => Poll::Ready(Ok(size as usize)),
218 Err(err) => {
219 let kind = err
220 .kind::<crate::IOErrorEnum>()
221 .unwrap_or(crate::IOErrorEnum::Failed);
222 if kind == crate::IOErrorEnum::WouldBlock {
223 let mut waker = Some(cx.waker().clone());
224 let source = stream.0.as_ref().create_source(
225 crate::Cancellable::NONE,
226 None,
227 glib::Priority::default(),
228 move |_| {
229 if let Some(waker) = waker.take() {
230 waker.wake();
231 }
232 glib::ControlFlow::Break
233 },
234 );
235 let main_context = glib::MainContext::ref_thread_default();
236 source.attach(Some(&main_context));
237
238 Poll::Pending
239 } else {
240 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
241 }
242 }
243 }
244 }
245}