1use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7 stream::Stream,
8 task::{Context, Poll},
9 Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18pub trait PollableOutputStreamExtManual: IsA<PollableOutputStream> {
19 #[doc(alias = "g_pollable_output_stream_create_source")]
37 fn create_source<F, C>(
38 &self,
39 cancellable: Option<&C>,
40 name: Option<&str>,
41 priority: glib::Priority,
42 func: F,
43 ) -> glib::Source
44 where
45 F: FnMut(&Self) -> glib::ControlFlow + 'static,
46 C: IsA<Cancellable>,
47 {
48 unsafe extern "C" fn trampoline<
49 O: IsA<PollableOutputStream>,
50 F: FnMut(&O) -> glib::ControlFlow + 'static,
51 >(
52 stream: *mut ffi::GPollableOutputStream,
53 func: glib::ffi::gpointer,
54 ) -> glib::ffi::gboolean {
55 let func: &RefCell<F> = &*(func as *const RefCell<F>);
56 let mut func = func.borrow_mut();
57 (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
58 }
59 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
60 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
61 }
62 let cancellable = cancellable.map(|c| c.as_ref());
63 let gcancellable = cancellable.to_glib_none();
64 unsafe {
65 let source = ffi::g_pollable_output_stream_create_source(
66 self.as_ref().to_glib_none().0,
67 gcancellable.0,
68 );
69
70 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
71 glib::ffi::g_source_set_callback(
72 source,
73 Some(transmute::<
74 glib::ffi::gpointer,
75 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
76 >(trampoline)),
77 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
78 Some(destroy_closure::<F>),
79 );
80 glib::ffi::g_source_set_priority(source, priority.into_glib());
81
82 if let Some(name) = name {
83 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
84 }
85
86 from_glib_full(source)
87 }
88 }
89
90 fn create_source_future<C: IsA<Cancellable>>(
91 &self,
92 cancellable: Option<&C>,
93 priority: glib::Priority,
94 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
95 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
96
97 let obj = self.clone();
98 Box::pin(glib::SourceFuture::new(move |send| {
99 let mut send = Some(send);
100 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
101 let _ = send.take().unwrap().send(());
102 glib::ControlFlow::Break
103 })
104 }))
105 }
106
107 fn create_source_stream<C: IsA<Cancellable>>(
108 &self,
109 cancellable: Option<&C>,
110 priority: glib::Priority,
111 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
112 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
113
114 let obj = self.clone();
115 Box::pin(glib::SourceStream::new(move |send| {
116 let send = Some(send);
117 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
118 if send.as_ref().unwrap().unbounded_send(()).is_err() {
119 glib::ControlFlow::Break
120 } else {
121 glib::ControlFlow::Continue
122 }
123 })
124 }))
125 }
126
127 #[cfg(feature = "v2_60")]
162 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
163 #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
164 fn writev_nonblocking(
165 &self,
166 vectors: &[OutputVector],
167 cancellable: Option<&impl IsA<Cancellable>>,
168 ) -> Result<(PollableReturn, usize), glib::Error> {
169 unsafe {
170 let mut error = std::ptr::null_mut();
171 let mut bytes_written = 0;
172
173 let ret = ffi::g_pollable_output_stream_writev_nonblocking(
174 self.as_ref().to_glib_none().0,
175 vectors.as_ptr() as *const _,
176 vectors.len(),
177 &mut bytes_written,
178 cancellable.map(|p| p.as_ref()).to_glib_none().0,
179 &mut error,
180 );
181 if error.is_null() {
182 Ok((from_glib(ret), bytes_written))
183 } else {
184 Err(from_glib_full(error))
185 }
186 }
187 }
188
189 fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
190 where
191 Self: IsA<PollableOutputStream>,
192 {
193 if self.can_poll() {
194 Ok(OutputStreamAsyncWrite(self, None))
195 } else {
196 Err(self)
197 }
198 }
199}
200
201impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
202
203#[derive(Debug)]
204pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
205 T,
206 Option<oneshot::Receiver<Result<(), glib::Error>>>,
207);
208
209impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
210 pub fn into_output_stream(self) -> T {
211 self.0
212 }
213
214 pub fn output_stream(&self) -> &T {
215 &self.0
216 }
217}
218
219impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
220 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
221 let stream = Pin::get_ref(self.as_ref());
222 let gio_result = stream
223 .0
224 .as_ref()
225 .write_nonblocking(buf, crate::Cancellable::NONE);
226
227 match gio_result {
228 Ok(size) => Poll::Ready(Ok(size as usize)),
229 Err(err) => {
230 let kind = err
231 .kind::<crate::IOErrorEnum>()
232 .unwrap_or(crate::IOErrorEnum::Failed);
233 if kind == crate::IOErrorEnum::WouldBlock {
234 let mut waker = Some(cx.waker().clone());
235 let source = stream.0.as_ref().create_source(
236 crate::Cancellable::NONE,
237 None,
238 glib::Priority::default(),
239 move |_| {
240 if let Some(waker) = waker.take() {
241 waker.wake();
242 }
243 glib::ControlFlow::Break
244 },
245 );
246 let main_context = glib::MainContext::ref_thread_default();
247 source.attach(Some(&main_context));
248
249 Poll::Pending
250 } else {
251 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
252 }
253 }
254 }
255 }
256
257 #[cfg(feature = "v2_60")]
258 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
259 fn poll_write_vectored(
260 self: Pin<&mut Self>,
261 cx: &mut Context<'_>,
262 bufs: &[io::IoSlice<'_>],
263 ) -> Poll<io::Result<usize>> {
264 let stream = Pin::get_ref(self.as_ref());
265 let vectors = bufs
266 .iter()
267 .map(|v| OutputVector::new(v))
268 .collect::<smallvec::SmallVec<[_; 2]>>();
269 let gio_result = stream
270 .0
271 .as_ref()
272 .writev_nonblocking(&vectors, crate::Cancellable::NONE);
273
274 match gio_result {
275 Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
276 Ok((PollableReturn::WouldBlock, _)) => {
277 let mut waker = Some(cx.waker().clone());
278 let source = stream.0.as_ref().create_source(
279 crate::Cancellable::NONE,
280 None,
281 glib::Priority::default(),
282 move |_| {
283 if let Some(waker) = waker.take() {
284 waker.wake();
285 }
286 glib::ControlFlow::Break
287 },
288 );
289 let main_context = glib::MainContext::ref_thread_default();
290 source.attach(Some(&main_context));
291
292 Poll::Pending
293 }
294 Ok((_, _)) => unreachable!(),
295 Err(err) => Poll::Ready(Err(io::Error::new(
296 io::ErrorKind::from(
297 err.kind::<crate::IOErrorEnum>()
298 .unwrap_or(crate::IOErrorEnum::Failed),
299 ),
300 err,
301 ))),
302 }
303 }
304
305 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
306 let stream = unsafe { Pin::get_unchecked_mut(self) };
307
308 let rx = if let Some(ref mut rx) = stream.1 {
309 rx
310 } else {
311 let (tx, rx) = oneshot::channel();
312 stream.0.as_ref().flush_async(
313 glib::Priority::default(),
314 crate::Cancellable::NONE,
315 move |res| {
316 let _ = tx.send(res);
317 },
318 );
319
320 stream.1 = Some(rx);
321 stream.1.as_mut().unwrap()
322 };
323
324 match Pin::new(rx).poll(cx) {
325 Poll::Ready(Ok(res)) => {
326 let _ = stream.1.take();
327 Poll::Ready(to_std_io_result(res))
328 }
329 Poll::Ready(Err(_)) => {
330 let _ = stream.1.take();
331 Poll::Ready(Ok(()))
332 }
333 Poll::Pending => Poll::Pending,
334 }
335 }
336
337 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
338 let stream = unsafe { Pin::get_unchecked_mut(self) };
339
340 let rx = if let Some(ref mut rx) = stream.1 {
341 rx
342 } else {
343 let (tx, rx) = oneshot::channel();
344 stream.0.as_ref().close_async(
345 glib::Priority::default(),
346 crate::Cancellable::NONE,
347 move |res| {
348 let _ = tx.send(res);
349 },
350 );
351
352 stream.1 = Some(rx);
353 stream.1.as_mut().unwrap()
354 };
355
356 match Pin::new(rx).poll(cx) {
357 Poll::Ready(Ok(res)) => {
358 let _ = stream.1.take();
359 Poll::Ready(to_std_io_result(res))
360 }
361 Poll::Ready(Err(_)) => {
362 let _ = stream.1.take();
363 Poll::Ready(Ok(()))
364 }
365 Poll::Pending => Poll::Pending,
366 }
367 }
368}