use crate::error::to_std_io_result;
use crate::prelude::*;
use crate::Cancellable;
use crate::PollableOutputStream;
use futures_channel::oneshot;
use futures_core::task::{Context, Poll};
use futures_core::Future;
use futures_io::AsyncWrite;
use glib::object::{Cast, IsA};
use glib::translate::*;
use std::cell::RefCell;
use std::io;
use std::mem::transmute;
use std::pin::Pin;
use futures_core::stream::Stream;
pub trait PollableOutputStreamExtManual {
#[doc(alias = "g_pollable_output_stream_create_source")]
fn create_source<F, C>(
&self,
cancellable: Option<&C>,
name: Option<&str>,
priority: glib::Priority,
func: F,
) -> glib::Source
where
F: FnMut(&Self) -> glib::Continue + 'static,
C: IsA<Cancellable>;
fn create_source_future<C: IsA<Cancellable>>(
&self,
cancellable: Option<&C>,
priority: glib::Priority,
) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>;
fn create_source_stream<C: IsA<Cancellable>>(
&self,
cancellable: Option<&C>,
priority: glib::Priority,
) -> Pin<Box<dyn Stream<Item = ()> + 'static>>;
fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
where
Self: IsA<PollableOutputStream>,
{
if self.can_poll() {
Ok(OutputStreamAsyncWrite(self, None))
} else {
Err(self)
}
}
}
impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {
fn create_source<F, C>(
&self,
cancellable: Option<&C>,
name: Option<&str>,
priority: glib::Priority,
func: F,
) -> glib::Source
where
F: FnMut(&Self) -> glib::Continue + 'static,
C: IsA<Cancellable>,
{
unsafe extern "C" fn trampoline<
O: IsA<PollableOutputStream>,
F: FnMut(&O) -> glib::Continue + 'static,
>(
stream: *mut ffi::GPollableOutputStream,
func: glib::ffi::gpointer,
) -> glib::ffi::gboolean {
let func: &RefCell<F> = &*(func as *const RefCell<F>);
let mut func = func.borrow_mut();
(*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
}
unsafe extern "C" fn destroy_closure<O, F>(ptr: glib::ffi::gpointer) {
let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
}
let cancellable = cancellable.map(|c| c.as_ref());
let gcancellable = cancellable.to_glib_none();
unsafe {
let source = ffi::g_pollable_output_stream_create_source(
self.as_ref().to_glib_none().0,
gcancellable.0,
);
let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
glib::ffi::g_source_set_callback(
source,
Some(transmute::<
_,
unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
>(trampoline)),
Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
Some(destroy_closure::<Self, F>),
);
glib::ffi::g_source_set_priority(source, priority.into_glib());
if let Some(name) = name {
glib::ffi::g_source_set_name(source, name.to_glib_none().0);
}
from_glib_full(source)
}
}
fn create_source_future<C: IsA<Cancellable>>(
&self,
cancellable: Option<&C>,
priority: glib::Priority,
) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
let obj = self.clone();
Box::pin(glib::SourceFuture::new(move |send| {
let mut send = Some(send);
obj.create_source(cancellable.as_ref(), None, priority, move |_| {
let _ = send.take().unwrap().send(());
glib::Continue(false)
})
}))
}
fn create_source_stream<C: IsA<Cancellable>>(
&self,
cancellable: Option<&C>,
priority: glib::Priority,
) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
let obj = self.clone();
Box::pin(glib::SourceStream::new(move |send| {
let send = Some(send);
obj.create_source(cancellable.as_ref(), None, priority, move |_| {
if send.as_ref().unwrap().unbounded_send(()).is_err() {
glib::Continue(false)
} else {
glib::Continue(true)
}
})
}))
}
}
#[derive(Debug)]
pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
T,
Option<oneshot::Receiver<Result<(), glib::Error>>>,
);
impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
pub fn into_output_stream(self) -> T {
self.0
}
pub fn output_stream(&self) -> &T {
&self.0
}
}
impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let stream = Pin::get_ref(self.as_ref());
let gio_result = stream
.0
.as_ref()
.write_nonblocking(buf, crate::Cancellable::NONE);
match gio_result {
Ok(size) => Poll::Ready(Ok(size as usize)),
Err(err) => {
let kind = err.kind::<crate::IOErrorEnum>().unwrap();
if kind == crate::IOErrorEnum::WouldBlock {
let mut waker = Some(cx.waker().clone());
let source = stream.0.as_ref().create_source(
crate::Cancellable::NONE,
None,
glib::PRIORITY_DEFAULT,
move |_| {
if let Some(waker) = waker.take() {
waker.wake();
}
glib::Continue(false)
},
);
let main_context = glib::MainContext::ref_thread_default();
source.attach(Some(&main_context));
Poll::Pending
} else {
Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
}
}
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let stream = unsafe { Pin::get_unchecked_mut(self) };
let rx = if let Some(ref mut rx) = stream.1 {
rx
} else {
let (tx, rx) = oneshot::channel();
stream.0.as_ref().flush_async(
glib::PRIORITY_DEFAULT,
crate::Cancellable::NONE,
move |res| {
let _ = tx.send(res);
},
);
stream.1 = Some(rx);
stream.1.as_mut().unwrap()
};
match Pin::new(rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let _ = stream.1.take();
Poll::Ready(to_std_io_result(res))
}
Poll::Ready(Err(_)) => {
let _ = stream.1.take();
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let stream = unsafe { Pin::get_unchecked_mut(self) };
let rx = if let Some(ref mut rx) = stream.1 {
rx
} else {
let (tx, rx) = oneshot::channel();
stream.0.as_ref().close_async(
glib::PRIORITY_DEFAULT,
crate::Cancellable::NONE,
move |res| {
let _ = tx.send(res);
},
);
stream.1 = Some(rx);
stream.1.as_mut().unwrap()
};
match Pin::new(rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let _ = stream.1.take();
Poll::Ready(to_std_io_result(res))
}
Poll::Ready(Err(_)) => {
let _ = stream.1.take();
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}