1use std::{io, pin::Pin};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncRead, AsyncWrite};
7use glib::prelude::*;
8
9use crate::{
10 prelude::*, IOStream, InputStreamAsyncRead, OutputStreamAsyncWrite, PollableInputStream,
11 PollableOutputStream,
12};
13
14pub trait IOStreamExtManual: Sized + IsA<IOStream> {
15 fn into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self> {
16 let write = self
17 .output_stream()
18 .dynamic_cast::<PollableOutputStream>()
19 .ok()
20 .and_then(|s| s.into_async_write().ok());
21
22 let read = self
23 .input_stream()
24 .dynamic_cast::<PollableInputStream>()
25 .ok()
26 .and_then(|s| s.into_async_read().ok());
27
28 let (read, write) = match (read, write) {
29 (Some(read), Some(write)) => (read, write),
30 _ => return Err(self),
31 };
32
33 Ok(IOStreamAsyncReadWrite {
34 io_stream: self,
35 read,
36 write,
37 })
38 }
39}
40
41impl<O: IsA<IOStream>> IOStreamExtManual for O {}
42
43#[derive(Debug)]
44pub struct IOStreamAsyncReadWrite<T> {
45 io_stream: T,
46 read: InputStreamAsyncRead<PollableInputStream>,
47 write: OutputStreamAsyncWrite<PollableOutputStream>,
48}
49
50impl<T: IsA<IOStream>> IOStreamAsyncReadWrite<T> {
51 pub fn input_stream(&self) -> &PollableInputStream {
52 self.read.input_stream()
53 }
54
55 pub fn output_stream(&self) -> &PollableOutputStream {
56 self.write.output_stream()
57 }
58
59 pub fn into_io_stream(self) -> T {
60 self.io_stream
61 }
62
63 pub fn io_stream(&self) -> &T {
64 &self.io_stream
65 }
66}
67
68impl<T: IsA<IOStream> + std::marker::Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
69 fn poll_read(
70 self: Pin<&mut Self>,
71 cx: &mut Context<'_>,
72 buf: &mut [u8],
73 ) -> Poll<Result<usize, io::Error>> {
74 Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
75 }
76}
77
78impl<T: IsA<IOStream> + std::marker::Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
79 fn poll_write(
80 self: Pin<&mut Self>,
81 cx: &mut Context<'_>,
82 buf: &[u8],
83 ) -> Poll<Result<usize, io::Error>> {
84 Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
85 }
86
87 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
88 Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
89 }
90
91 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
92 Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
93 }
94}