gio/
read_input_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    any::Any,
5    io::{Read, Seek},
6};
7
8use crate::{prelude::*, subclass::prelude::*, InputStream};
9
10mod imp {
11    use std::cell::RefCell;
12
13    use super::*;
14
15    pub(super) enum Reader {
16        Read(AnyReader),
17        ReadSeek(AnyReader),
18    }
19
20    #[derive(Default)]
21    pub struct ReadInputStream {
22        pub(super) read: RefCell<Option<Reader>>,
23    }
24
25    #[glib::object_subclass]
26    impl ObjectSubclass for ReadInputStream {
27        const NAME: &'static str = "ReadInputStream";
28        const ALLOW_NAME_CONFLICT: bool = true;
29        type Type = super::ReadInputStream;
30        type ParentType = InputStream;
31        type Interfaces = (crate::Seekable,);
32    }
33
34    impl ObjectImpl for ReadInputStream {}
35
36    impl InputStreamImpl for ReadInputStream {
37        fn read(
38            &self,
39            buffer: &mut [u8],
40            _cancellable: Option<&crate::Cancellable>,
41        ) -> Result<usize, glib::Error> {
42            let mut read = self.read.borrow_mut();
43            let read = match *read {
44                None => {
45                    return Err(glib::Error::new(
46                        crate::IOErrorEnum::Closed,
47                        "Already closed",
48                    ));
49                }
50                Some(Reader::Read(ref mut read)) => read,
51                Some(Reader::ReadSeek(ref mut read)) => read,
52            };
53
54            loop {
55                match std_error_to_gio_error(read.read(buffer)) {
56                    None => continue,
57                    Some(res) => return res,
58                }
59            }
60        }
61
62        fn close(&self, _cancellable: Option<&crate::Cancellable>) -> Result<(), glib::Error> {
63            let _ = self.read.take();
64            Ok(())
65        }
66    }
67
68    impl SeekableImpl for ReadInputStream {
69        fn tell(&self) -> i64 {
70            // XXX: stream_position is not stable yet
71            // let mut read = self.read.borrow_mut();
72            // match *read {
73            //     Some(Reader::ReadSeek(ref mut read)) => {
74            //         read.stream_position().map(|pos| pos as i64).unwrap_or(-1)
75            //     },
76            //     _ => -1,
77            // };
78            -1
79        }
80
81        fn can_seek(&self) -> bool {
82            let read = self.read.borrow();
83            matches!(*read, Some(Reader::ReadSeek(_)))
84        }
85
86        fn seek(
87            &self,
88            offset: i64,
89            type_: glib::SeekType,
90            _cancellable: Option<&crate::Cancellable>,
91        ) -> Result<(), glib::Error> {
92            use std::io::SeekFrom;
93
94            let mut read = self.read.borrow_mut();
95            match *read {
96                Some(Reader::ReadSeek(ref mut read)) => {
97                    let pos = match type_ {
98                        glib::SeekType::Cur => SeekFrom::Current(offset),
99                        glib::SeekType::Set => {
100                            if offset < 0 {
101                                return Err(glib::Error::new(
102                                    crate::IOErrorEnum::InvalidArgument,
103                                    "Invalid Argument",
104                                ));
105                            } else {
106                                SeekFrom::Start(offset as u64)
107                            }
108                        }
109                        glib::SeekType::End => SeekFrom::End(offset),
110                        _ => unimplemented!(),
111                    };
112
113                    loop {
114                        match std_error_to_gio_error(read.seek(pos)) {
115                            None => continue,
116                            Some(res) => return res.map(|_| ()),
117                        }
118                    }
119                }
120                _ => Err(glib::Error::new(
121                    crate::IOErrorEnum::NotSupported,
122                    "Truncating not supported",
123                )),
124            }
125        }
126
127        fn can_truncate(&self) -> bool {
128            false
129        }
130
131        fn truncate(
132            &self,
133            _offset: i64,
134            _cancellable: Option<&crate::Cancellable>,
135        ) -> Result<(), glib::Error> {
136            Err(glib::Error::new(
137                crate::IOErrorEnum::NotSupported,
138                "Truncating not supported",
139            ))
140        }
141    }
142}
143
144glib::wrapper! {
145    pub struct ReadInputStream(ObjectSubclass<imp::ReadInputStream>) @extends crate::InputStream, @implements crate::Seekable;
146}
147
148impl ReadInputStream {
149    pub fn new<R: Read + Send + 'static>(read: R) -> ReadInputStream {
150        let obj: Self = glib::Object::new();
151
152        *obj.imp().read.borrow_mut() = Some(imp::Reader::Read(AnyReader::new(read)));
153
154        obj
155    }
156
157    pub fn new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream {
158        let obj: Self = glib::Object::new();
159
160        *obj.imp().read.borrow_mut() = Some(imp::Reader::ReadSeek(AnyReader::new_seekable(read)));
161
162        obj
163    }
164
165    pub fn close_and_take(&self) -> Box<dyn Any + Send + 'static> {
166        let inner = self.imp().read.take();
167
168        let ret = match inner {
169            None => {
170                panic!("Stream already closed or inner taken");
171            }
172            Some(imp::Reader::Read(read)) => read.reader,
173            Some(imp::Reader::ReadSeek(read)) => read.reader,
174        };
175
176        let _ = self.close(crate::Cancellable::NONE);
177
178        match ret {
179            AnyOrPanic::Any(r) => r,
180            AnyOrPanic::Panic(p) => std::panic::resume_unwind(p),
181        }
182    }
183}
184
/// What an `AnyReader` currently holds: the live reader, or the payload of a
/// panic that the reader raised.
enum AnyOrPanic {
    /// The type-erased reader.
    Any(Box<dyn Any + Send>),
    /// Payload of a panic caught while calling into the reader.
    Panic(Box<dyn Any + Send>),
}
189
190// Helper struct for dynamically dispatching to any kind of Reader and
191// catching panics along the way
192struct AnyReader {
193    reader: AnyOrPanic,
194    read_fn: fn(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>,
195    seek_fn: Option<fn(s: &mut AnyReader, pos: std::io::SeekFrom) -> std::io::Result<u64>>,
196}
197
198impl AnyReader {
199    fn new<R: Read + Any + Send + 'static>(r: R) -> Self {
200        Self {
201            reader: AnyOrPanic::Any(Box::new(r)),
202            read_fn: Self::read_fn::<R>,
203            seek_fn: None,
204        }
205    }
206
207    fn new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self {
208        Self {
209            reader: AnyOrPanic::Any(Box::new(r)),
210            read_fn: Self::read_fn::<R>,
211            seek_fn: Some(Self::seek_fn::<R>),
212        }
213    }
214
215    fn read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize> {
216        s.with_inner(|r: &mut R| r.read(buffer))
217    }
218
219    fn seek_fn<R: Seek + 'static>(
220        s: &mut AnyReader,
221        pos: std::io::SeekFrom,
222    ) -> std::io::Result<u64> {
223        s.with_inner(|r: &mut R| r.seek(pos))
224    }
225
226    fn with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>(
227        &mut self,
228        func: F,
229    ) -> std::io::Result<T> {
230        match self.reader {
231            AnyOrPanic::Any(ref mut reader) => {
232                let r = reader.downcast_mut::<R>().unwrap();
233                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(r))) {
234                    Ok(res) => res,
235                    Err(panic) => {
236                        self.reader = AnyOrPanic::Panic(panic);
237                        Err(std::io::Error::other("Panicked"))
238                    }
239                }
240            }
241            AnyOrPanic::Panic(_) => Err(std::io::Error::other("Panicked before")),
242        }
243    }
244
245    fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
246        (self.read_fn)(self, buffer)
247    }
248
249    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
250        if let Some(ref seek_fn) = self.seek_fn {
251            seek_fn(self, pos)
252        } else {
253            unreachable!()
254        }
255    }
256}
257
258pub(crate) fn std_error_to_gio_error<T>(
259    res: Result<T, std::io::Error>,
260) -> Option<Result<T, glib::Error>> {
261    match res {
262        Ok(res) => Some(Ok(res)),
263        Err(err) => {
264            use std::io::ErrorKind;
265
266            #[allow(clippy::wildcard_in_or_patterns)]
267            match err.kind() {
268                ErrorKind::NotFound => Some(Err(glib::Error::new(
269                    crate::IOErrorEnum::NotFound,
270                    "Not Found",
271                ))),
272                ErrorKind::PermissionDenied => Some(Err(glib::Error::new(
273                    crate::IOErrorEnum::PermissionDenied,
274                    "Permission Denied",
275                ))),
276                ErrorKind::ConnectionRefused => Some(Err(glib::Error::new(
277                    crate::IOErrorEnum::ConnectionRefused,
278                    "Connection Refused",
279                ))),
280                ErrorKind::ConnectionReset
281                | ErrorKind::ConnectionAborted
282                | ErrorKind::NotConnected => Some(Err(glib::Error::new(
283                    crate::IOErrorEnum::NotConnected,
284                    "Connection Reset",
285                ))),
286                ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => Some(Err(glib::Error::new(
287                    crate::IOErrorEnum::AddressInUse,
288                    "Address In Use",
289                ))),
290                ErrorKind::BrokenPipe => Some(Err(glib::Error::new(
291                    crate::IOErrorEnum::BrokenPipe,
292                    "Broken Pipe",
293                ))),
294                ErrorKind::AlreadyExists => Some(Err(glib::Error::new(
295                    crate::IOErrorEnum::Exists,
296                    "Already Exists",
297                ))),
298                ErrorKind::WouldBlock => Some(Err(glib::Error::new(
299                    crate::IOErrorEnum::WouldBlock,
300                    "Would Block",
301                ))),
302                ErrorKind::InvalidInput | ErrorKind::InvalidData => Some(Err(glib::Error::new(
303                    crate::IOErrorEnum::InvalidData,
304                    "Invalid Input",
305                ))),
306                ErrorKind::TimedOut => Some(Err(glib::Error::new(
307                    crate::IOErrorEnum::TimedOut,
308                    "Timed Out",
309                ))),
310                ErrorKind::Interrupted => None,
311                ErrorKind::UnexpectedEof => Some(Err(glib::Error::new(
312                    crate::IOErrorEnum::Closed,
313                    "Unexpected Eof",
314                ))),
315                ErrorKind::WriteZero | _ => Some(Err(glib::Error::new(
316                    crate::IOErrorEnum::Failed,
317                    format!("Unknown error: {err:?}").as_str(),
318                ))),
319            }
320        }
321    }
322}
323
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    // Bytes served by the cursor in both tests.
    const PAYLOAD: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // One read must deliver the whole payload into the front of `buf`.
    fn expect_full_payload(stream: &ReadInputStream, buf: &mut [u8]) {
        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(10));
        assert_eq!(&buf[..10], &PAYLOAD[..]);
    }

    // A read at the end of the data must report zero bytes.
    fn expect_end_of_data(stream: &ReadInputStream, buf: &mut [u8]) {
        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(0));
    }

    // Taking the reader back must return the original cursor and its data.
    fn expect_cursor_returned(stream: &ReadInputStream) {
        let inner = stream.close_and_take();
        assert!(inner.is::<Cursor<Vec<u8>>>());
        let inner = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
        assert_eq!(inner.get_ref(), &PAYLOAD);
    }

    #[test]
    fn test_read() {
        let stream = ReadInputStream::new(Cursor::new(PAYLOAD.to_vec()));
        let mut buf = [0u8; 1024];

        expect_full_payload(&stream, &mut buf);
        expect_end_of_data(&stream, &mut buf);

        expect_cursor_returned(&stream);
    }

    #[test]
    fn test_read_seek() {
        let stream = ReadInputStream::new_seekable(Cursor::new(PAYLOAD.to_vec()));
        let mut buf = [0u8; 1024];

        expect_full_payload(&stream, &mut buf);
        expect_end_of_data(&stream, &mut buf);

        // Rewinding makes the same data readable a second time.
        assert!(stream.can_seek());
        assert_eq!(
            stream.seek(0, glib::SeekType::Set, crate::Cancellable::NONE),
            Ok(())
        );
        expect_full_payload(&stream, &mut buf);

        expect_cursor_returned(&stream);
    }
}
371}