1use std::{
4 any::Any,
5 io::{Read, Seek},
6};
7
8use crate::{prelude::*, subclass::prelude::*, InputStream};
9
10mod imp {
11 use std::cell::RefCell;
12
13 use super::*;
14
15 pub(super) enum Reader {
16 Read(AnyReader),
17 ReadSeek(AnyReader),
18 }
19
20 #[derive(Default)]
21 pub struct ReadInputStream {
22 pub(super) read: RefCell<Option<Reader>>,
23 }
24
25 #[glib::object_subclass]
26 impl ObjectSubclass for ReadInputStream {
27 const NAME: &'static str = "ReadInputStream";
28 const ALLOW_NAME_CONFLICT: bool = true;
29 type Type = super::ReadInputStream;
30 type ParentType = InputStream;
31 type Interfaces = (crate::Seekable,);
32 }
33
34 impl ObjectImpl for ReadInputStream {}
35
36 impl InputStreamImpl for ReadInputStream {
37 fn read(
38 &self,
39 buffer: &mut [u8],
40 _cancellable: Option<&crate::Cancellable>,
41 ) -> Result<usize, glib::Error> {
42 let mut read = self.read.borrow_mut();
43 let read = match *read {
44 None => {
45 return Err(glib::Error::new(
46 crate::IOErrorEnum::Closed,
47 "Already closed",
48 ));
49 }
50 Some(Reader::Read(ref mut read)) => read,
51 Some(Reader::ReadSeek(ref mut read)) => read,
52 };
53
54 loop {
55 match std_error_to_gio_error(read.read(buffer)) {
56 None => continue,
57 Some(res) => return res,
58 }
59 }
60 }
61
62 fn close(&self, _cancellable: Option<&crate::Cancellable>) -> Result<(), glib::Error> {
63 let _ = self.read.take();
64 Ok(())
65 }
66 }
67
68 impl SeekableImpl for ReadInputStream {
69 fn tell(&self) -> i64 {
70 -1
79 }
80
81 fn can_seek(&self) -> bool {
82 let read = self.read.borrow();
83 matches!(*read, Some(Reader::ReadSeek(_)))
84 }
85
86 fn seek(
87 &self,
88 offset: i64,
89 type_: glib::SeekType,
90 _cancellable: Option<&crate::Cancellable>,
91 ) -> Result<(), glib::Error> {
92 use std::io::SeekFrom;
93
94 let mut read = self.read.borrow_mut();
95 match *read {
96 Some(Reader::ReadSeek(ref mut read)) => {
97 let pos = match type_ {
98 glib::SeekType::Cur => SeekFrom::Current(offset),
99 glib::SeekType::Set => {
100 if offset < 0 {
101 return Err(glib::Error::new(
102 crate::IOErrorEnum::InvalidArgument,
103 "Invalid Argument",
104 ));
105 } else {
106 SeekFrom::Start(offset as u64)
107 }
108 }
109 glib::SeekType::End => SeekFrom::End(offset),
110 _ => unimplemented!(),
111 };
112
113 loop {
114 match std_error_to_gio_error(read.seek(pos)) {
115 None => continue,
116 Some(res) => return res.map(|_| ()),
117 }
118 }
119 }
120 _ => Err(glib::Error::new(
121 crate::IOErrorEnum::NotSupported,
122 "Truncating not supported",
123 )),
124 }
125 }
126
127 fn can_truncate(&self) -> bool {
128 false
129 }
130
131 fn truncate(
132 &self,
133 _offset: i64,
134 _cancellable: Option<&crate::Cancellable>,
135 ) -> Result<(), glib::Error> {
136 Err(glib::Error::new(
137 crate::IOErrorEnum::NotSupported,
138 "Truncating not supported",
139 ))
140 }
141 }
142}
143
glib::wrapper! {
    /// Public handle for the `imp::ReadInputStream` object subclass.
    ///
    /// The type extends `InputStream` and implements `Seekable`. Instances are
    /// created with `ReadInputStream::new` or `ReadInputStream::new_seekable`.
    pub struct ReadInputStream(ObjectSubclass<imp::ReadInputStream>) @extends crate::InputStream, @implements crate::Seekable;
}
147
148impl ReadInputStream {
149 pub fn new<R: Read + Send + 'static>(read: R) -> ReadInputStream {
150 let obj: Self = glib::Object::new();
151
152 *obj.imp().read.borrow_mut() = Some(imp::Reader::Read(AnyReader::new(read)));
153
154 obj
155 }
156
157 pub fn new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream {
158 let obj: Self = glib::Object::new();
159
160 *obj.imp().read.borrow_mut() = Some(imp::Reader::ReadSeek(AnyReader::new_seekable(read)));
161
162 obj
163 }
164
165 pub fn close_and_take(&self) -> Box<dyn Any + Send + 'static> {
166 let inner = self.imp().read.take();
167
168 let ret = match inner {
169 None => {
170 panic!("Stream already closed or inner taken");
171 }
172 Some(imp::Reader::Read(read)) => read.reader,
173 Some(imp::Reader::ReadSeek(read)) => read.reader,
174 };
175
176 let _ = self.close(crate::Cancellable::NONE);
177
178 match ret {
179 AnyOrPanic::Any(r) => r,
180 AnyOrPanic::Panic(p) => std::panic::resume_unwind(p),
181 }
182 }
183}
184
/// Either a boxed, type-erased value or the payload of a caught panic.
enum AnyOrPanic {
    /// A live value, still usable through downcasting.
    Any(Box<dyn Any + Send>),
    /// The payload of a panic, kept so it can be resumed later.
    Panic(Box<dyn Any + Send>),
}
189
190struct AnyReader {
193 reader: AnyOrPanic,
194 read_fn: fn(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>,
195 seek_fn: Option<fn(s: &mut AnyReader, pos: std::io::SeekFrom) -> std::io::Result<u64>>,
196}
197
198impl AnyReader {
199 fn new<R: Read + Any + Send + 'static>(r: R) -> Self {
200 Self {
201 reader: AnyOrPanic::Any(Box::new(r)),
202 read_fn: Self::read_fn::<R>,
203 seek_fn: None,
204 }
205 }
206
207 fn new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self {
208 Self {
209 reader: AnyOrPanic::Any(Box::new(r)),
210 read_fn: Self::read_fn::<R>,
211 seek_fn: Some(Self::seek_fn::<R>),
212 }
213 }
214
215 fn read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize> {
216 s.with_inner(|r: &mut R| r.read(buffer))
217 }
218
219 fn seek_fn<R: Seek + 'static>(
220 s: &mut AnyReader,
221 pos: std::io::SeekFrom,
222 ) -> std::io::Result<u64> {
223 s.with_inner(|r: &mut R| r.seek(pos))
224 }
225
226 fn with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>(
227 &mut self,
228 func: F,
229 ) -> std::io::Result<T> {
230 match self.reader {
231 AnyOrPanic::Any(ref mut reader) => {
232 let r = reader.downcast_mut::<R>().unwrap();
233 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(r))) {
234 Ok(res) => res,
235 Err(panic) => {
236 self.reader = AnyOrPanic::Panic(panic);
237 Err(std::io::Error::other("Panicked"))
238 }
239 }
240 }
241 AnyOrPanic::Panic(_) => Err(std::io::Error::other("Panicked before")),
242 }
243 }
244
245 fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
246 (self.read_fn)(self, buffer)
247 }
248
249 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
250 if let Some(ref seek_fn) = self.seek_fn {
251 seek_fn(self, pos)
252 } else {
253 unreachable!()
254 }
255 }
256}
257
258pub(crate) fn std_error_to_gio_error<T>(
259 res: Result<T, std::io::Error>,
260) -> Option<Result<T, glib::Error>> {
261 match res {
262 Ok(res) => Some(Ok(res)),
263 Err(err) => {
264 use std::io::ErrorKind;
265
266 #[allow(clippy::wildcard_in_or_patterns)]
267 match err.kind() {
268 ErrorKind::NotFound => Some(Err(glib::Error::new(
269 crate::IOErrorEnum::NotFound,
270 "Not Found",
271 ))),
272 ErrorKind::PermissionDenied => Some(Err(glib::Error::new(
273 crate::IOErrorEnum::PermissionDenied,
274 "Permission Denied",
275 ))),
276 ErrorKind::ConnectionRefused => Some(Err(glib::Error::new(
277 crate::IOErrorEnum::ConnectionRefused,
278 "Connection Refused",
279 ))),
280 ErrorKind::ConnectionReset
281 | ErrorKind::ConnectionAborted
282 | ErrorKind::NotConnected => Some(Err(glib::Error::new(
283 crate::IOErrorEnum::NotConnected,
284 "Connection Reset",
285 ))),
286 ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => Some(Err(glib::Error::new(
287 crate::IOErrorEnum::AddressInUse,
288 "Address In Use",
289 ))),
290 ErrorKind::BrokenPipe => Some(Err(glib::Error::new(
291 crate::IOErrorEnum::BrokenPipe,
292 "Broken Pipe",
293 ))),
294 ErrorKind::AlreadyExists => Some(Err(glib::Error::new(
295 crate::IOErrorEnum::Exists,
296 "Already Exists",
297 ))),
298 ErrorKind::WouldBlock => Some(Err(glib::Error::new(
299 crate::IOErrorEnum::WouldBlock,
300 "Would Block",
301 ))),
302 ErrorKind::InvalidInput | ErrorKind::InvalidData => Some(Err(glib::Error::new(
303 crate::IOErrorEnum::InvalidData,
304 "Invalid Input",
305 ))),
306 ErrorKind::TimedOut => Some(Err(glib::Error::new(
307 crate::IOErrorEnum::TimedOut,
308 "Timed Out",
309 ))),
310 ErrorKind::Interrupted => None,
311 ErrorKind::UnexpectedEof => Some(Err(glib::Error::new(
312 crate::IOErrorEnum::Closed,
313 "Unexpected Eof",
314 ))),
315 ErrorKind::WriteZero | _ => Some(Err(glib::Error::new(
316 crate::IOErrorEnum::Failed,
317 format!("Unknown error: {err:?}").as_str(),
318 ))),
319 }
320 }
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    // Bytes served by the in-memory reader in every test.
    const PAYLOAD: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // Reads once and checks that the whole payload arrives in a single call.
    fn assert_reads_payload(stream: &ReadInputStream) {
        let mut buf = [0u8; 1024];
        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(10));
        assert_eq!(&buf[..10], &PAYLOAD[..]);
    }

    // Reads once more and checks that the stream reports end-of-data.
    fn assert_exhausted(stream: &ReadInputStream) {
        let mut buf = [0u8; 1024];
        assert_eq!(stream.read(&mut buf[..], crate::Cancellable::NONE), Ok(0));
    }

    // Takes the reader back and checks it is the original cursor with its data intact.
    fn assert_returns_cursor(stream: &ReadInputStream) {
        let inner = stream.close_and_take();
        assert!(inner.is::<Cursor<Vec<u8>>>());
        let cursor = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
        assert_eq!(cursor.get_ref(), &PAYLOAD);
    }

    #[test]
    fn test_read() {
        let stream = ReadInputStream::new(Cursor::new(PAYLOAD.to_vec()));

        assert_reads_payload(&stream);
        assert_exhausted(&stream);

        assert_returns_cursor(&stream);
    }

    #[test]
    fn test_read_seek() {
        let stream = ReadInputStream::new_seekable(Cursor::new(PAYLOAD.to_vec()));

        assert_reads_payload(&stream);
        assert_exhausted(&stream);

        // Rewinding must make the full payload readable again.
        assert!(stream.can_seek());
        assert_eq!(
            stream.seek(0, glib::SeekType::Set, crate::Cancellable::NONE),
            Ok(())
        );
        assert_reads_payload(&stream);

        assert_returns_cursor(&stream);
    }
}