1use std::{io, pin::Pin};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncRead, AsyncWrite};
7use glib::prelude::*;
8
9use crate::{
10 prelude::*, IOStream, InputStreamAsyncRead, OutputStreamAsyncWrite, PollableInputStream,
11 PollableOutputStream,
12};
13
14mod sealed {
15 pub trait Sealed {}
16 impl<T: super::IsA<super::IOStream>> Sealed for T {}
17}
18
19pub trait IOStreamExtManual: sealed::Sealed + Sized + IsA<IOStream> {
20 fn into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self> {
21 let write = self
22 .output_stream()
23 .dynamic_cast::<PollableOutputStream>()
24 .ok()
25 .and_then(|s| s.into_async_write().ok());
26
27 let read = self
28 .input_stream()
29 .dynamic_cast::<PollableInputStream>()
30 .ok()
31 .and_then(|s| s.into_async_read().ok());
32
33 let (read, write) = match (read, write) {
34 (Some(read), Some(write)) => (read, write),
35 _ => return Err(self),
36 };
37
38 Ok(IOStreamAsyncReadWrite {
39 io_stream: self,
40 read,
41 write,
42 })
43 }
44}
45
46impl<O: IsA<IOStream>> IOStreamExtManual for O {}
47
48#[derive(Debug)]
49pub struct IOStreamAsyncReadWrite<T> {
50 io_stream: T,
51 read: InputStreamAsyncRead<PollableInputStream>,
52 write: OutputStreamAsyncWrite<PollableOutputStream>,
53}
54
55impl<T: IsA<IOStream>> IOStreamAsyncReadWrite<T> {
56 pub fn input_stream(&self) -> &PollableInputStream {
57 self.read.input_stream()
58 }
59
60 pub fn output_stream(&self) -> &PollableOutputStream {
61 self.write.output_stream()
62 }
63
64 pub fn into_io_stream(self) -> T {
65 self.io_stream
66 }
67
68 pub fn io_stream(&self) -> &T {
69 &self.io_stream
70 }
71}
72
73impl<T: IsA<IOStream> + std::marker::Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
74 fn poll_read(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 buf: &mut [u8],
78 ) -> Poll<Result<usize, io::Error>> {
79 Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
80 }
81}
82
83impl<T: IsA<IOStream> + std::marker::Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
84 fn poll_write(
85 self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 buf: &[u8],
88 ) -> Poll<Result<usize, io::Error>> {
89 Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
90 }
91
92 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
93 Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
94 }
95
96 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
97 Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
98 }
99}