1use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7 stream::Stream,
8 task::{Context, Poll},
9 Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18mod sealed {
19 pub trait Sealed {}
20 impl<T: super::IsA<super::PollableOutputStream>> Sealed for T {}
21}
22
23pub trait PollableOutputStreamExtManual: sealed::Sealed + IsA<PollableOutputStream> {
24 #[doc(alias = "g_pollable_output_stream_create_source")]
42 fn create_source<F, C>(
43 &self,
44 cancellable: Option<&C>,
45 name: Option<&str>,
46 priority: glib::Priority,
47 func: F,
48 ) -> glib::Source
49 where
50 F: FnMut(&Self) -> glib::ControlFlow + 'static,
51 C: IsA<Cancellable>,
52 {
53 unsafe extern "C" fn trampoline<
54 O: IsA<PollableOutputStream>,
55 F: FnMut(&O) -> glib::ControlFlow + 'static,
56 >(
57 stream: *mut ffi::GPollableOutputStream,
58 func: glib::ffi::gpointer,
59 ) -> glib::ffi::gboolean {
60 let func: &RefCell<F> = &*(func as *const RefCell<F>);
61 let mut func = func.borrow_mut();
62 (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
63 }
64 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
65 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
66 }
67 let cancellable = cancellable.map(|c| c.as_ref());
68 let gcancellable = cancellable.to_glib_none();
69 unsafe {
70 let source = ffi::g_pollable_output_stream_create_source(
71 self.as_ref().to_glib_none().0,
72 gcancellable.0,
73 );
74
75 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
76 glib::ffi::g_source_set_callback(
77 source,
78 Some(transmute::<
79 glib::ffi::gpointer,
80 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
81 >(trampoline)),
82 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
83 Some(destroy_closure::<F>),
84 );
85 glib::ffi::g_source_set_priority(source, priority.into_glib());
86
87 if let Some(name) = name {
88 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
89 }
90
91 from_glib_full(source)
92 }
93 }
94
95 fn create_source_future<C: IsA<Cancellable>>(
96 &self,
97 cancellable: Option<&C>,
98 priority: glib::Priority,
99 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
100 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
101
102 let obj = self.clone();
103 Box::pin(glib::SourceFuture::new(move |send| {
104 let mut send = Some(send);
105 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
106 let _ = send.take().unwrap().send(());
107 glib::ControlFlow::Break
108 })
109 }))
110 }
111
112 fn create_source_stream<C: IsA<Cancellable>>(
113 &self,
114 cancellable: Option<&C>,
115 priority: glib::Priority,
116 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
117 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
118
119 let obj = self.clone();
120 Box::pin(glib::SourceStream::new(move |send| {
121 let send = Some(send);
122 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
123 if send.as_ref().unwrap().unbounded_send(()).is_err() {
124 glib::ControlFlow::Break
125 } else {
126 glib::ControlFlow::Continue
127 }
128 })
129 }))
130 }
131
132 #[cfg(feature = "v2_60")]
167 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
168 #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
169 fn writev_nonblocking(
170 &self,
171 vectors: &[OutputVector],
172 cancellable: Option<&impl IsA<Cancellable>>,
173 ) -> Result<(PollableReturn, usize), glib::Error> {
174 unsafe {
175 let mut error = std::ptr::null_mut();
176 let mut bytes_written = 0;
177
178 let ret = ffi::g_pollable_output_stream_writev_nonblocking(
179 self.as_ref().to_glib_none().0,
180 vectors.as_ptr() as *const _,
181 vectors.len(),
182 &mut bytes_written,
183 cancellable.map(|p| p.as_ref()).to_glib_none().0,
184 &mut error,
185 );
186 if error.is_null() {
187 Ok((from_glib(ret), bytes_written))
188 } else {
189 Err(from_glib_full(error))
190 }
191 }
192 }
193
194 fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
195 where
196 Self: IsA<PollableOutputStream>,
197 {
198 if self.can_poll() {
199 Ok(OutputStreamAsyncWrite(self, None))
200 } else {
201 Err(self)
202 }
203 }
204}
205
206impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
207
208#[derive(Debug)]
209pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
210 T,
211 Option<oneshot::Receiver<Result<(), glib::Error>>>,
212);
213
214impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
215 pub fn into_output_stream(self) -> T {
216 self.0
217 }
218
219 pub fn output_stream(&self) -> &T {
220 &self.0
221 }
222}
223
224impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
225 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
226 let stream = Pin::get_ref(self.as_ref());
227 let gio_result = stream
228 .0
229 .as_ref()
230 .write_nonblocking(buf, crate::Cancellable::NONE);
231
232 match gio_result {
233 Ok(size) => Poll::Ready(Ok(size as usize)),
234 Err(err) => {
235 let kind = err
236 .kind::<crate::IOErrorEnum>()
237 .unwrap_or(crate::IOErrorEnum::Failed);
238 if kind == crate::IOErrorEnum::WouldBlock {
239 let mut waker = Some(cx.waker().clone());
240 let source = stream.0.as_ref().create_source(
241 crate::Cancellable::NONE,
242 None,
243 glib::Priority::default(),
244 move |_| {
245 if let Some(waker) = waker.take() {
246 waker.wake();
247 }
248 glib::ControlFlow::Break
249 },
250 );
251 let main_context = glib::MainContext::ref_thread_default();
252 source.attach(Some(&main_context));
253
254 Poll::Pending
255 } else {
256 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
257 }
258 }
259 }
260 }
261
262 #[cfg(feature = "v2_60")]
263 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
264 fn poll_write_vectored(
265 self: Pin<&mut Self>,
266 cx: &mut Context<'_>,
267 bufs: &[io::IoSlice<'_>],
268 ) -> Poll<io::Result<usize>> {
269 let stream = Pin::get_ref(self.as_ref());
270 let vectors = bufs
271 .iter()
272 .map(|v| OutputVector::new(v))
273 .collect::<smallvec::SmallVec<[_; 2]>>();
274 let gio_result = stream
275 .0
276 .as_ref()
277 .writev_nonblocking(&vectors, crate::Cancellable::NONE);
278
279 match gio_result {
280 Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
281 Ok((PollableReturn::WouldBlock, _)) => {
282 let mut waker = Some(cx.waker().clone());
283 let source = stream.0.as_ref().create_source(
284 crate::Cancellable::NONE,
285 None,
286 glib::Priority::default(),
287 move |_| {
288 if let Some(waker) = waker.take() {
289 waker.wake();
290 }
291 glib::ControlFlow::Break
292 },
293 );
294 let main_context = glib::MainContext::ref_thread_default();
295 source.attach(Some(&main_context));
296
297 Poll::Pending
298 }
299 Ok((_, _)) => unreachable!(),
300 Err(err) => Poll::Ready(Err(io::Error::new(
301 io::ErrorKind::from(
302 err.kind::<crate::IOErrorEnum>()
303 .unwrap_or(crate::IOErrorEnum::Failed),
304 ),
305 err,
306 ))),
307 }
308 }
309
310 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
311 let stream = unsafe { Pin::get_unchecked_mut(self) };
312
313 let rx = if let Some(ref mut rx) = stream.1 {
314 rx
315 } else {
316 let (tx, rx) = oneshot::channel();
317 stream.0.as_ref().flush_async(
318 glib::Priority::default(),
319 crate::Cancellable::NONE,
320 move |res| {
321 let _ = tx.send(res);
322 },
323 );
324
325 stream.1 = Some(rx);
326 stream.1.as_mut().unwrap()
327 };
328
329 match Pin::new(rx).poll(cx) {
330 Poll::Ready(Ok(res)) => {
331 let _ = stream.1.take();
332 Poll::Ready(to_std_io_result(res))
333 }
334 Poll::Ready(Err(_)) => {
335 let _ = stream.1.take();
336 Poll::Ready(Ok(()))
337 }
338 Poll::Pending => Poll::Pending,
339 }
340 }
341
342 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
343 let stream = unsafe { Pin::get_unchecked_mut(self) };
344
345 let rx = if let Some(ref mut rx) = stream.1 {
346 rx
347 } else {
348 let (tx, rx) = oneshot::channel();
349 stream.0.as_ref().close_async(
350 glib::Priority::default(),
351 crate::Cancellable::NONE,
352 move |res| {
353 let _ = tx.send(res);
354 },
355 );
356
357 stream.1 = Some(rx);
358 stream.1.as_mut().unwrap()
359 };
360
361 match Pin::new(rx).poll(cx) {
362 Poll::Ready(Ok(res)) => {
363 let _ = stream.1.take();
364 Poll::Ready(to_std_io_result(res))
365 }
366 Poll::Ready(Err(_)) => {
367 let _ = stream.1.take();
368 Poll::Ready(Ok(()))
369 }
370 Poll::Pending => Poll::Pending,
371 }
372 }
373}