gio/
io_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, pin::Pin};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncRead, AsyncWrite};
7use glib::prelude::*;
8
9use crate::{
10    prelude::*, IOStream, InputStreamAsyncRead, OutputStreamAsyncWrite, PollableInputStream,
11    PollableOutputStream,
12};
13
14mod sealed {
15    pub trait Sealed {}
16    impl<T: super::IsA<super::IOStream>> Sealed for T {}
17}
18
19pub trait IOStreamExtManual: sealed::Sealed + Sized + IsA<IOStream> {
20    fn into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self> {
21        let write = self
22            .output_stream()
23            .dynamic_cast::<PollableOutputStream>()
24            .ok()
25            .and_then(|s| s.into_async_write().ok());
26
27        let read = self
28            .input_stream()
29            .dynamic_cast::<PollableInputStream>()
30            .ok()
31            .and_then(|s| s.into_async_read().ok());
32
33        let (read, write) = match (read, write) {
34            (Some(read), Some(write)) => (read, write),
35            _ => return Err(self),
36        };
37
38        Ok(IOStreamAsyncReadWrite {
39            io_stream: self,
40            read,
41            write,
42        })
43    }
44}
45
46impl<O: IsA<IOStream>> IOStreamExtManual for O {}
47
48#[derive(Debug)]
49pub struct IOStreamAsyncReadWrite<T> {
50    io_stream: T,
51    read: InputStreamAsyncRead<PollableInputStream>,
52    write: OutputStreamAsyncWrite<PollableOutputStream>,
53}
54
55impl<T: IsA<IOStream>> IOStreamAsyncReadWrite<T> {
56    pub fn input_stream(&self) -> &PollableInputStream {
57        self.read.input_stream()
58    }
59
60    pub fn output_stream(&self) -> &PollableOutputStream {
61        self.write.output_stream()
62    }
63
64    pub fn into_io_stream(self) -> T {
65        self.io_stream
66    }
67
68    pub fn io_stream(&self) -> &T {
69        &self.io_stream
70    }
71}
72
73impl<T: IsA<IOStream> + std::marker::Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
74    fn poll_read(
75        self: Pin<&mut Self>,
76        cx: &mut Context<'_>,
77        buf: &mut [u8],
78    ) -> Poll<Result<usize, io::Error>> {
79        Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
80    }
81}
82
83impl<T: IsA<IOStream> + std::marker::Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
84    fn poll_write(
85        self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87        buf: &[u8],
88    ) -> Poll<Result<usize, io::Error>> {
89        Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
90    }
91
92    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
93        Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
94    }
95
96    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
97        Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
98    }
99}