gio/
read_input_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    any::Any,
5    io::{Read, Seek},
6};
7
8use crate::{prelude::*, subclass::prelude::*, InputStream};
9
10mod imp {
11    use std::cell::RefCell;
12
13    use super::*;
14
15    pub(super) enum Reader {
16        Read(AnyReader),
17        ReadSeek(AnyReader),
18    }
19
20    #[derive(Default)]
21    pub struct ReadInputStream {
22        pub(super) read: RefCell<Option<Reader>>,
23    }
24
25    #[glib::object_subclass]
26    impl ObjectSubclass for ReadInputStream {
27        const NAME: &'static str = "ReadInputStream";
28        const ALLOW_NAME_CONFLICT: bool = true;
29        type Type = super::ReadInputStream;
30        type ParentType = InputStream;
31        type Interfaces = (crate::Seekable,);
32    }
33
34    impl ObjectImpl for ReadInputStream {}
35
36    impl InputStreamImpl for ReadInputStream {
37        fn read(
38            &self,
39            buffer: &mut [u8],
40            _cancellable: Option<&crate::Cancellable>,
41        ) -> Result<usize, glib::Error> {
42            let mut read = self.read.borrow_mut();
43            let read = match *read {
44                None => {
45                    return Err(glib::Error::new(
46                        crate::IOErrorEnum::Closed,
47                        "Already closed",
48                    ));
49                }
50                Some(Reader::Read(ref mut read)) => read,
51                Some(Reader::ReadSeek(ref mut read)) => read,
52            };
53
54            loop {
55                match std_error_to_gio_error(read.read(buffer)) {
56                    None => continue,
57                    Some(res) => return res,
58                }
59            }
60        }
61
62        fn close(&self, _cancellable: Option<&crate::Cancellable>) -> Result<(), glib::Error> {
63            let _ = self.read.take();
64            Ok(())
65        }
66    }
67
68    impl SeekableImpl for ReadInputStream {
69        fn tell(&self) -> i64 {
70            // XXX: stream_position is not stable yet
71            // let mut read = self.read.borrow_mut();
72            // match *read {
73            //     Some(Reader::ReadSeek(ref mut read)) => {
74            //         read.stream_position().map(|pos| pos as i64).unwrap_or(-1)
75            //     },
76            //     _ => -1,
77            // };
78            -1
79        }
80
81        fn can_seek(&self) -> bool {
82            let read = self.read.borrow();
83            matches!(*read, Some(Reader::ReadSeek(_)))
84        }
85
86        fn seek(
87            &self,
88            offset: i64,
89            type_: glib::SeekType,
90            _cancellable: Option<&crate::Cancellable>,
91        ) -> Result<(), glib::Error> {
92            use std::io::SeekFrom;
93
94            let mut read = self.read.borrow_mut();
95            match *read {
96                Some(Reader::ReadSeek(ref mut read)) => {
97                    let pos = match type_ {
98                        glib::SeekType::Cur => SeekFrom::Current(offset),
99                        glib::SeekType::Set => {
100                            if offset < 0 {
101                                return Err(glib::Error::new(
102                                    crate::IOErrorEnum::InvalidArgument,
103                                    "Invalid Argument",
104                                ));
105                            } else {
106                                SeekFrom::Start(offset as u64)
107                            }
108                        }
109                        glib::SeekType::End => SeekFrom::End(offset),
110                        _ => unimplemented!(),
111                    };
112
113                    loop {
114                        match std_error_to_gio_error(read.seek(pos)) {
115                            None => continue,
116                            Some(res) => return res.map(|_| ()),
117                        }
118                    }
119                }
120                _ => Err(glib::Error::new(
121                    crate::IOErrorEnum::NotSupported,
122                    "Truncating not supported",
123                )),
124            }
125        }
126
127        fn can_truncate(&self) -> bool {
128            false
129        }
130
131        fn truncate(
132            &self,
133            _offset: i64,
134            _cancellable: Option<&crate::Cancellable>,
135        ) -> Result<(), glib::Error> {
136            Err(glib::Error::new(
137                crate::IOErrorEnum::NotSupported,
138                "Truncating not supported",
139            ))
140        }
141    }
142}
143
144glib::wrapper! {
145    pub struct ReadInputStream(ObjectSubclass<imp::ReadInputStream>) @extends crate::InputStream, @implements crate::Seekable;
146}
147
148impl ReadInputStream {
149    pub fn new<R: Read + Send + 'static>(read: R) -> ReadInputStream {
150        let obj: Self = glib::Object::new();
151
152        *obj.imp().read.borrow_mut() = Some(imp::Reader::Read(AnyReader::new(read)));
153
154        obj
155    }
156
157    pub fn new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream {
158        let obj: Self = glib::Object::new();
159
160        *obj.imp().read.borrow_mut() = Some(imp::Reader::ReadSeek(AnyReader::new_seekable(read)));
161
162        obj
163    }
164
165    pub fn close_and_take(&self) -> Box<dyn Any + Send + 'static> {
166        let inner = self.imp().read.take();
167
168        let ret = match inner {
169            None => {
170                panic!("Stream already closed or inner taken");
171            }
172            Some(imp::Reader::Read(read)) => read.reader,
173            Some(imp::Reader::ReadSeek(read)) => read.reader,
174        };
175
176        let _ = self.close(crate::Cancellable::NONE);
177
178        match ret {
179            AnyOrPanic::Any(r) => r,
180            AnyOrPanic::Panic(p) => std::panic::resume_unwind(p),
181        }
182    }
183}
184
185enum AnyOrPanic {
186    Any(Box<dyn Any + Send + 'static>),
187    Panic(Box<dyn Any + Send + 'static>),
188}
189
190// Helper struct for dynamically dispatching to any kind of Reader and
191// catching panics along the way
192struct AnyReader {
193    reader: AnyOrPanic,
194    read_fn: fn(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>,
195    seek_fn: Option<fn(s: &mut AnyReader, pos: std::io::SeekFrom) -> std::io::Result<u64>>,
196}
197
198impl AnyReader {
199    fn new<R: Read + Any + Send + 'static>(r: R) -> Self {
200        Self {
201            reader: AnyOrPanic::Any(Box::new(r)),
202            read_fn: Self::read_fn::<R>,
203            seek_fn: None,
204        }
205    }
206
207    fn new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self {
208        Self {
209            reader: AnyOrPanic::Any(Box::new(r)),
210            read_fn: Self::read_fn::<R>,
211            seek_fn: Some(Self::seek_fn::<R>),
212        }
213    }
214
215    fn read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize> {
216        s.with_inner(|r: &mut R| r.read(buffer))
217    }
218
219    fn seek_fn<R: Seek + 'static>(
220        s: &mut AnyReader,
221        pos: std::io::SeekFrom,
222    ) -> std::io::Result<u64> {
223        s.with_inner(|r: &mut R| r.seek(pos))
224    }
225
226    fn with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>(
227        &mut self,
228        func: F,
229    ) -> std::io::Result<T> {
230        match self.reader {
231            AnyOrPanic::Any(ref mut reader) => {
232                let r = reader.downcast_mut::<R>().unwrap();
233                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(r))) {
234                    Ok(res) => res,
235                    Err(panic) => {
236                        self.reader = AnyOrPanic::Panic(panic);
237                        Err(std::io::Error::new(std::io::ErrorKind::Other, "Panicked"))
238                    }
239                }
240            }
241            AnyOrPanic::Panic(_) => Err(std::io::Error::new(
242                std::io::ErrorKind::Other,
243                "Panicked before",
244            )),
245        }
246    }
247
248    fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
249        (self.read_fn)(self, buffer)
250    }
251
252    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
253        if let Some(ref seek_fn) = self.seek_fn {
254            seek_fn(self, pos)
255        } else {
256            unreachable!()
257        }
258    }
259}
260
261pub(crate) fn std_error_to_gio_error<T>(
262    res: Result<T, std::io::Error>,
263) -> Option<Result<T, glib::Error>> {
264    match res {
265        Ok(res) => Some(Ok(res)),
266        Err(err) => {
267            use std::io::ErrorKind;
268
269            #[allow(clippy::wildcard_in_or_patterns)]
270            match err.kind() {
271                ErrorKind::NotFound => Some(Err(glib::Error::new(
272                    crate::IOErrorEnum::NotFound,
273                    "Not Found",
274                ))),
275                ErrorKind::PermissionDenied => Some(Err(glib::Error::new(
276                    crate::IOErrorEnum::PermissionDenied,
277                    "Permission Denied",
278                ))),
279                ErrorKind::ConnectionRefused => Some(Err(glib::Error::new(
280                    crate::IOErrorEnum::ConnectionRefused,
281                    "Connection Refused",
282                ))),
283                ErrorKind::ConnectionReset
284                | ErrorKind::ConnectionAborted
285                | ErrorKind::NotConnected => Some(Err(glib::Error::new(
286                    crate::IOErrorEnum::NotConnected,
287                    "Connection Reset",
288                ))),
289                ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => Some(Err(glib::Error::new(
290                    crate::IOErrorEnum::AddressInUse,
291                    "Address In Use",
292                ))),
293                ErrorKind::BrokenPipe => Some(Err(glib::Error::new(
294                    crate::IOErrorEnum::BrokenPipe,
295                    "Broken Pipe",
296                ))),
297                ErrorKind::AlreadyExists => Some(Err(glib::Error::new(
298                    crate::IOErrorEnum::Exists,
299                    "Already Exists",
300                ))),
301                ErrorKind::WouldBlock => Some(Err(glib::Error::new(
302                    crate::IOErrorEnum::WouldBlock,
303                    "Would Block",
304                ))),
305                ErrorKind::InvalidInput | ErrorKind::InvalidData => Some(Err(glib::Error::new(
306                    crate::IOErrorEnum::InvalidData,
307                    "Invalid Input",
308                ))),
309                ErrorKind::TimedOut => Some(Err(glib::Error::new(
310                    crate::IOErrorEnum::TimedOut,
311                    "Timed Out",
312                ))),
313                ErrorKind::Interrupted => None,
314                ErrorKind::UnexpectedEof => Some(Err(glib::Error::new(
315                    crate::IOErrorEnum::Closed,
316                    "Unexpected Eof",
317                ))),
318                ErrorKind::WriteZero | _ => Some(Err(glib::Error::new(
319                    crate::IOErrorEnum::Failed,
320                    format!("Unknown error: {err:?}").as_str(),
321                ))),
322            }
323        }
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use std::io::Cursor;
330
331    use super::*;
332
333    #[test]
334    fn test_read() {
335        let cursor = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
336        let stream = ReadInputStream::new(cursor);
337
338        let mut buf = [0u8; 1024];
339        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(10));
340        assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
341
342        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(0));
343
344        let inner = stream.close_and_take();
345        assert!(inner.is::<Cursor<Vec<u8>>>());
346        let inner = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
347        assert_eq!(inner.get_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
348    }
349
350    #[test]
351    fn test_read_seek() {
352        let cursor = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
353        let stream = ReadInputStream::new_seekable(cursor);
354
355        let mut buf = [0u8; 1024];
356        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(10));
357        assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
358
359        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(0));
360
361        assert!(stream.can_seek());
362        assert_eq!(
363            stream.seek(0, glib::SeekType::Set, crate::Cancellable::NONE),
364            Ok(())
365        );
366        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(10));
367        assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
368
369        let inner = stream.close_and_take();
370        assert!(inner.is::<Cursor<Vec<u8>>>());
371        let inner = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
372        assert_eq!(inner.get_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
373    }
374}