gio/
input_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11mod sealed {
12    pub trait Sealed {}
13    impl<T: super::IsA<super::InputStream>> Sealed for T {}
14}
15
16pub trait InputStreamExtManual: sealed::Sealed + IsA<InputStream> + Sized {
17    /// Tries to read @count bytes from the stream into the buffer starting at
18    /// @buffer. Will block during this read.
19    ///
20    /// If count is zero returns zero and does nothing. A value of @count
21    /// larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
22    ///
23    /// On success, the number of bytes read into the buffer is returned.
24    /// It is not an error if this is not the same as the requested size, as it
25    /// can happen e.g. near the end of a file. Zero is returned on end of file
26    /// (or if @count is zero),  but never otherwise.
27    ///
28    /// The returned @buffer is not a nul-terminated string, it can contain nul bytes
29    /// at any position, and this function doesn't nul-terminate the @buffer.
30    ///
31    /// If @cancellable is not [`None`], then the operation can be cancelled by
32    /// triggering the cancellable object from another thread. If the operation
33    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
34    /// operation was partially finished when the operation was cancelled the
35    /// partial result will be returned, without an error.
36    ///
37    /// On error -1 is returned and @error is set accordingly.
38    /// ## `cancellable`
39    /// optional #GCancellable object, [`None`] to ignore.
40    ///
41    /// # Returns
42    ///
43    /// Number of bytes read, or -1 on error, or 0 on end of file.
44    ///
45    /// ## `buffer`
46    ///
47    ///   a buffer to read data into (which should be at least count bytes long).
48    #[doc(alias = "g_input_stream_read")]
49    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
50        &self,
51        mut buffer: B,
52        cancellable: Option<&C>,
53    ) -> Result<usize, glib::Error> {
54        let cancellable = cancellable.map(|c| c.as_ref());
55        let gcancellable = cancellable.to_glib_none();
56        let buffer = buffer.as_mut();
57        let buffer_ptr = buffer.as_mut_ptr();
58        let count = buffer.len();
59        unsafe {
60            let mut error = ptr::null_mut();
61            let ret = ffi::g_input_stream_read(
62                self.as_ref().to_glib_none().0,
63                buffer_ptr,
64                count,
65                gcancellable.0,
66                &mut error,
67            );
68            if error.is_null() {
69                Ok(ret as usize)
70            } else {
71                Err(from_glib_full(error))
72            }
73        }
74    }
75
76    /// Tries to read @count bytes from the stream into the buffer starting at
77    /// @buffer. Will block during this read.
78    ///
79    /// This function is similar to g_input_stream_read(), except it tries to
80    /// read as many bytes as requested, only stopping on an error or end of stream.
81    ///
82    /// On a successful read of @count bytes, or if we reached the end of the
83    /// stream,  [`true`] is returned, and @bytes_read is set to the number of bytes
84    /// read into @buffer.
85    ///
86    /// If there is an error during the operation [`false`] is returned and @error
87    /// is set to indicate the error status.
88    ///
89    /// As a special exception to the normal conventions for functions that
90    /// use #GError, if this function returns [`false`] (and sets @error) then
91    /// @bytes_read will be set to the number of bytes that were successfully
92    /// read before the error was encountered.  This functionality is only
93    /// available from C.  If you need it from another language then you must
94    /// write your own loop around g_input_stream_read().
95    /// ## `cancellable`
96    /// optional #GCancellable object, [`None`] to ignore.
97    ///
98    /// # Returns
99    ///
100    /// [`true`] on success, [`false`] if there was an error
101    ///
102    /// ## `buffer`
103    ///
104    ///   a buffer to read data into (which should be at least count bytes long).
105    ///
106    /// ## `bytes_read`
107    /// location to store the number of bytes that was read from the stream
108    #[doc(alias = "g_input_stream_read_all")]
109    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
110        &self,
111        mut buffer: B,
112        cancellable: Option<&C>,
113    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
114        let cancellable = cancellable.map(|c| c.as_ref());
115        let gcancellable = cancellable.to_glib_none();
116        let buffer = buffer.as_mut();
117        let buffer_ptr = buffer.as_mut_ptr();
118        let count = buffer.len();
119        unsafe {
120            let mut bytes_read = mem::MaybeUninit::uninit();
121            let mut error = ptr::null_mut();
122            let _ = ffi::g_input_stream_read_all(
123                self.as_ref().to_glib_none().0,
124                buffer_ptr,
125                count,
126                bytes_read.as_mut_ptr(),
127                gcancellable.0,
128                &mut error,
129            );
130
131            let bytes_read = bytes_read.assume_init();
132            if error.is_null() {
133                Ok((bytes_read, None))
134            } else if bytes_read != 0 {
135                Ok((bytes_read, Some(from_glib_full(error))))
136            } else {
137                Err(from_glib_full(error))
138            }
139        }
140    }
141
142    /// Request an asynchronous read of @count bytes from the stream into the
143    /// buffer starting at @buffer.
144    ///
145    /// This is the asynchronous equivalent of [`InputStreamExtManual::read_all()`][crate::prelude::InputStreamExtManual::read_all()].
146    ///
147    /// Call `InputStream::read_all_finish()` to collect the result.
148    ///
149    /// Any outstanding I/O request with higher priority (lower numerical
150    /// value) will be executed before an outstanding request with lower
151    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
152    /// ## `io_priority`
153    /// the [I/O priority](iface.AsyncResult.html#io-priority) of the request
154    /// ## `cancellable`
155    /// optional #GCancellable object, [`None`] to ignore
156    /// ## `callback`
157    /// a #GAsyncReadyCallback
158    ///   to call when the request is satisfied
159    ///
160    /// # Returns
161    ///
162    ///
163    /// ## `buffer`
164    ///
165    ///   a buffer to read data into (which should be at least count bytes long)
166    #[doc(alias = "g_input_stream_read_all_async")]
167    fn read_all_async<
168        B: AsMut<[u8]> + Send + 'static,
169        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
170        C: IsA<Cancellable>,
171    >(
172        &self,
173        buffer: B,
174        io_priority: Priority,
175        cancellable: Option<&C>,
176        callback: Q,
177    ) {
178        let main_context = glib::MainContext::ref_thread_default();
179        let is_main_context_owner = main_context.is_owner();
180        let has_acquired_main_context = (!is_main_context_owner)
181            .then(|| main_context.acquire().ok())
182            .flatten();
183        assert!(
184            is_main_context_owner || has_acquired_main_context.is_some(),
185            "Async operations only allowed if the thread is owning the MainContext"
186        );
187
188        let cancellable = cancellable.map(|c| c.as_ref());
189        let gcancellable = cancellable.to_glib_none();
190        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
191            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
192        // Need to do this after boxing as the contents pointer might change by moving into the box
193        let (count, buffer_ptr) = {
194            let buffer = &mut user_data.1;
195            let slice = (*buffer).as_mut();
196            (slice.len(), slice.as_mut_ptr())
197        };
198        unsafe extern "C" fn read_all_async_trampoline<
199            B: AsMut<[u8]> + Send + 'static,
200            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
201        >(
202            _source_object: *mut glib::gobject_ffi::GObject,
203            res: *mut ffi::GAsyncResult,
204            user_data: glib::ffi::gpointer,
205        ) {
206            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
207                Box::from_raw(user_data as *mut _);
208            let (callback, buffer) = *user_data;
209            let callback = callback.into_inner();
210
211            let mut error = ptr::null_mut();
212            let mut bytes_read = mem::MaybeUninit::uninit();
213            let _ = ffi::g_input_stream_read_all_finish(
214                _source_object as *mut _,
215                res,
216                bytes_read.as_mut_ptr(),
217                &mut error,
218            );
219
220            let bytes_read = bytes_read.assume_init();
221            let result = if error.is_null() {
222                Ok((buffer, bytes_read, None))
223            } else if bytes_read != 0 {
224                Ok((buffer, bytes_read, Some(from_glib_full(error))))
225            } else {
226                Err((buffer, from_glib_full(error)))
227            };
228
229            callback(result);
230        }
231        let callback = read_all_async_trampoline::<B, Q>;
232        unsafe {
233            ffi::g_input_stream_read_all_async(
234                self.as_ref().to_glib_none().0,
235                buffer_ptr,
236                count,
237                io_priority.into_glib(),
238                gcancellable.0,
239                Some(callback),
240                Box::into_raw(user_data) as *mut _,
241            );
242        }
243    }
244
245    /// Request an asynchronous read of @count bytes from the stream into the buffer
246    /// starting at @buffer. When the operation is finished @callback will be called.
247    /// You can then call g_input_stream_read_finish() to get the result of the
248    /// operation.
249    ///
250    /// During an async request no other sync and async calls are allowed on @self, and will
251    /// result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
252    ///
253    /// A value of @count larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
254    ///
255    /// On success, the number of bytes read into the buffer will be passed to the
256    /// callback. It is not an error if this is not the same as the requested size, as it
257    /// can happen e.g. near the end of a file, but generally we try to read
258    /// as many bytes as requested. Zero is returned on end of file
259    /// (or if @count is zero),  but never otherwise.
260    ///
261    /// Any outstanding i/o request with higher priority (lower numerical value) will
262    /// be executed before an outstanding request with lower priority. Default
263    /// priority is `G_PRIORITY_DEFAULT`.
264    ///
265    /// The asynchronous methods have a default fallback that uses threads to implement
266    /// asynchronicity, so they are optional for inheriting classes. However, if you
267    /// override one you must override all.
268    /// ## `io_priority`
269    /// the [I/O priority](iface.AsyncResult.html#io-priority)
270    /// of the request.
271    /// ## `cancellable`
272    /// optional #GCancellable object, [`None`] to ignore.
273    /// ## `callback`
274    /// a #GAsyncReadyCallback
275    ///   to call when the request is satisfied
276    ///
277    /// # Returns
278    ///
279    ///
280    /// ## `buffer`
281    ///
282    ///   a buffer to read data into (which should be at least count bytes long).
283    #[doc(alias = "g_input_stream_read_async")]
284    fn read_async<
285        B: AsMut<[u8]> + Send + 'static,
286        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
287        C: IsA<Cancellable>,
288    >(
289        &self,
290        buffer: B,
291        io_priority: Priority,
292        cancellable: Option<&C>,
293        callback: Q,
294    ) {
295        let main_context = glib::MainContext::ref_thread_default();
296        let is_main_context_owner = main_context.is_owner();
297        let has_acquired_main_context = (!is_main_context_owner)
298            .then(|| main_context.acquire().ok())
299            .flatten();
300        assert!(
301            is_main_context_owner || has_acquired_main_context.is_some(),
302            "Async operations only allowed if the thread is owning the MainContext"
303        );
304
305        let cancellable = cancellable.map(|c| c.as_ref());
306        let gcancellable = cancellable.to_glib_none();
307        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
308            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
309        // Need to do this after boxing as the contents pointer might change by moving into the box
310        let (count, buffer_ptr) = {
311            let buffer = &mut user_data.1;
312            let slice = (*buffer).as_mut();
313            (slice.len(), slice.as_mut_ptr())
314        };
315        unsafe extern "C" fn read_async_trampoline<
316            B: AsMut<[u8]> + Send + 'static,
317            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
318        >(
319            _source_object: *mut glib::gobject_ffi::GObject,
320            res: *mut ffi::GAsyncResult,
321            user_data: glib::ffi::gpointer,
322        ) {
323            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
324                Box::from_raw(user_data as *mut _);
325            let (callback, buffer) = *user_data;
326            let callback = callback.into_inner();
327
328            let mut error = ptr::null_mut();
329            let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
330
331            let result = if error.is_null() {
332                Ok((buffer, ret as usize))
333            } else {
334                Err((buffer, from_glib_full(error)))
335            };
336
337            callback(result);
338        }
339        let callback = read_async_trampoline::<B, Q>;
340        unsafe {
341            ffi::g_input_stream_read_async(
342                self.as_ref().to_glib_none().0,
343                buffer_ptr,
344                count,
345                io_priority.into_glib(),
346                gcancellable.0,
347                Some(callback),
348                Box::into_raw(user_data) as *mut _,
349            );
350        }
351    }
352
353    fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
354        &self,
355        buffer: B,
356        io_priority: Priority,
357    ) -> Pin<
358        Box<
359            dyn std::future::Future<
360                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
361                > + 'static,
362        >,
363    > {
364        Box::pin(crate::GioFuture::new(
365            self,
366            move |obj, cancellable, send| {
367                obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
368                    send.resolve(res);
369                });
370            },
371        ))
372    }
373
374    fn read_future<B: AsMut<[u8]> + Send + 'static>(
375        &self,
376        buffer: B,
377        io_priority: Priority,
378    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
379    {
380        Box::pin(crate::GioFuture::new(
381            self,
382            move |obj, cancellable, send| {
383                obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
384                    send.resolve(res);
385                });
386            },
387        ))
388    }
389
390    fn into_read(self) -> InputStreamRead<Self>
391    where
392        Self: IsA<InputStream>,
393    {
394        InputStreamRead(self)
395    }
396
397    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
398    where
399        Self: IsA<InputStream>,
400    {
401        InputStreamAsyncBufRead::new(self, buffer_size)
402    }
403}
404
405impl<O: IsA<InputStream>> InputStreamExtManual for O {}
406
407#[derive(Debug)]
408pub struct InputStreamRead<T: IsA<InputStream>>(T);
409
410impl<T: IsA<InputStream>> InputStreamRead<T> {
411    pub fn into_input_stream(self) -> T {
412        self.0
413    }
414
415    pub fn input_stream(&self) -> &T {
416        &self.0
417    }
418}
419
420impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
421    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422        let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
423        to_std_io_result(gio_result)
424    }
425}
426
427impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
428    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
429        let (pos, type_) = match pos {
430            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
431            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
432            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
433        };
434        let seekable: &Seekable = self.0.as_ref();
435        let gio_result = seekable
436            .seek(pos, type_, crate::Cancellable::NONE)
437            .map(|_| seekable.tell() as u64);
438        to_std_io_result(gio_result)
439    }
440}
441
442enum State {
443    Waiting {
444        buffer: Vec<u8>,
445    },
446    Transitioning,
447    Reading {
448        pending: Pin<
449            Box<
450                dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
451                    + 'static,
452            >,
453        >,
454    },
455    HasData {
456        buffer: Vec<u8>,
457        valid: (usize, usize), // first index is inclusive, second is exclusive
458    },
459    Failed(crate::IOErrorEnum),
460}
461
462impl State {
463    fn into_buffer(self) -> Vec<u8> {
464        match self {
465            State::Waiting { buffer } => buffer,
466            _ => panic!("Invalid state"),
467        }
468    }
469
470    #[doc(alias = "get_pending")]
471    fn pending(
472        &mut self,
473    ) -> &mut Pin<
474        Box<
475            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
476                + 'static,
477        >,
478    > {
479        match self {
480            State::Reading { ref mut pending } => pending,
481            _ => panic!("Invalid state"),
482        }
483    }
484}
485pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
486    stream: T,
487    state: State,
488}
489
490impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
491    pub fn into_input_stream(self) -> T {
492        self.stream
493    }
494
495    pub fn input_stream(&self) -> &T {
496        &self.stream
497    }
498
499    fn new(stream: T, buffer_size: usize) -> Self {
500        let buffer = vec![0; buffer_size];
501
502        Self {
503            stream,
504            state: State::Waiting { buffer },
505        }
506    }
507    fn set_reading(
508        &mut self,
509    ) -> &mut Pin<
510        Box<
511            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
512                + 'static,
513        >,
514    > {
515        match self.state {
516            State::Waiting { .. } => {
517                let waiting = mem::replace(&mut self.state, State::Transitioning);
518                let buffer = waiting.into_buffer();
519                let pending = self.input_stream().read_future(buffer, Priority::default());
520                self.state = State::Reading { pending };
521            }
522            State::Reading { .. } => {}
523            _ => panic!("Invalid state"),
524        };
525
526        self.state.pending()
527    }
528
529    #[doc(alias = "get_data")]
530    fn data(&self) -> Poll<io::Result<&[u8]>> {
531        if let State::HasData {
532            ref buffer,
533            valid: (i, j),
534        } = self.state
535        {
536            return Poll::Ready(Ok(&buffer[i..j]));
537        }
538        panic!("Invalid state")
539    }
540
541    fn set_waiting(&mut self, buffer: Vec<u8>) {
542        match self.state {
543            State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
544            _ => panic!("Invalid state"),
545        }
546    }
547
548    fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
549        match self.state {
550            State::Reading { .. } | State::Transitioning { .. } => {
551                self.state = State::HasData { buffer, valid }
552            }
553            _ => panic!("Invalid state"),
554        }
555    }
556
557    fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
558        match self.state {
559            State::Failed(kind) => Poll::Ready(Err(io::Error::new(
560                io::ErrorKind::from(kind),
561                BufReadError::Failed,
562            ))),
563            State::HasData { .. } => self.data(),
564            State::Transitioning => panic!("Invalid state"),
565            State::Waiting { .. } | State::Reading { .. } => {
566                let pending = self.set_reading();
567                match Pin::new(pending).poll(cx) {
568                    Poll::Ready(Ok((buffer, res))) => {
569                        if res == 0 {
570                            self.set_waiting(buffer);
571                            Poll::Ready(Ok(&[]))
572                        } else {
573                            self.set_has_data(buffer, (0, res));
574                            self.data()
575                        }
576                    }
577                    Poll::Ready(Err((_, err))) => {
578                        let kind = err
579                            .kind::<crate::IOErrorEnum>()
580                            .unwrap_or(crate::IOErrorEnum::Failed);
581                        self.state = State::Failed(kind);
582                        Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
583                    }
584                    Poll::Pending => Poll::Pending,
585                }
586            }
587        }
588    }
589
590    fn consume(&mut self, amt: usize) {
591        if amt == 0 {
592            return;
593        }
594
595        if let State::HasData { .. } = self.state {
596            let has_data = mem::replace(&mut self.state, State::Transitioning);
597            if let State::HasData {
598                buffer,
599                valid: (i, j),
600            } = has_data
601            {
602                let available = j - i;
603                if amt > available {
604                    panic!("Cannot consume {amt} bytes as only {available} are available",)
605                }
606                let remaining = available - amt;
607                if remaining == 0 {
608                    return self.set_waiting(buffer);
609                } else {
610                    return self.set_has_data(buffer, (i + amt, j));
611                }
612            }
613        }
614
615        panic!("Invalid state")
616    }
617}
618
619#[derive(Debug)]
620enum BufReadError {
621    Failed,
622}
623
624impl std::error::Error for BufReadError {}
625
626impl fmt::Display for BufReadError {
627    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
628        match self {
629            Self::Failed => fmt.write_str("Previous read operation failed"),
630        }
631    }
632}
633
634impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
635    fn poll_read(
636        self: Pin<&mut Self>,
637        cx: &mut Context,
638        out_buf: &mut [u8],
639    ) -> Poll<io::Result<usize>> {
640        let reader = self.get_mut();
641        let poll = reader.poll_fill_buf(cx);
642
643        let poll = poll.map_ok(|buffer| {
644            let copied = buffer.len().min(out_buf.len());
645            out_buf[..copied].copy_from_slice(&buffer[..copied]);
646            copied
647        });
648
649        if let Poll::Ready(Ok(consumed)) = poll {
650            reader.consume(consumed);
651        }
652        poll
653    }
654}
655
656impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
657    fn poll_fill_buf(
658        self: Pin<&mut Self>,
659        cx: &mut Context,
660    ) -> Poll<Result<&[u8], futures_io::Error>> {
661        self.get_mut().poll_fill_buf(cx)
662    }
663
664    fn consume(self: Pin<&mut Self>, amt: usize) {
665        self.get_mut().consume(amt);
666    }
667}
668
669impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
670
671#[cfg(test)]
672mod tests {
673    use std::io::Read;
674
675    use glib::Bytes;
676
677    use crate::{prelude::*, test_util::run_async, MemoryInputStream};
678
679    #[test]
680    fn read_all_async() {
681        let ret = run_async(|tx, l| {
682            let b = Bytes::from_owned(vec![1, 2, 3]);
683            let strm = MemoryInputStream::from_bytes(&b);
684
685            let buf = vec![0; 10];
686            strm.read_all_async(
687                buf,
688                glib::Priority::DEFAULT_IDLE,
689                crate::Cancellable::NONE,
690                move |ret| {
691                    tx.send(ret).unwrap();
692                    l.quit();
693                },
694            );
695        });
696
697        let (buf, count, err) = ret.unwrap();
698        assert_eq!(count, 3);
699        assert!(err.is_none());
700        assert_eq!(buf[0], 1);
701        assert_eq!(buf[1], 2);
702        assert_eq!(buf[2], 3);
703    }
704
705    #[test]
706    fn read_all() {
707        let b = Bytes::from_owned(vec![1, 2, 3]);
708        let strm = MemoryInputStream::from_bytes(&b);
709        let mut buf = vec![0; 10];
710
711        let ret = strm.read_all(&mut buf, crate::Cancellable::NONE).unwrap();
712
713        assert_eq!(ret.0, 3);
714        assert!(ret.1.is_none());
715        assert_eq!(buf[0], 1);
716        assert_eq!(buf[1], 2);
717        assert_eq!(buf[2], 3);
718    }
719
720    #[test]
721    fn read() {
722        let b = Bytes::from_owned(vec![1, 2, 3]);
723        let strm = MemoryInputStream::from_bytes(&b);
724        let mut buf = vec![0; 10];
725
726        let ret = strm.read(&mut buf, crate::Cancellable::NONE);
727
728        assert_eq!(ret.unwrap(), 3);
729        assert_eq!(buf[0], 1);
730        assert_eq!(buf[1], 2);
731        assert_eq!(buf[2], 3);
732    }
733
734    #[test]
735    fn read_async() {
736        let ret = run_async(|tx, l| {
737            let b = Bytes::from_owned(vec![1, 2, 3]);
738            let strm = MemoryInputStream::from_bytes(&b);
739
740            let buf = vec![0; 10];
741            strm.read_async(
742                buf,
743                glib::Priority::DEFAULT_IDLE,
744                crate::Cancellable::NONE,
745                move |ret| {
746                    tx.send(ret).unwrap();
747                    l.quit();
748                },
749            );
750        });
751
752        let (buf, count) = ret.unwrap();
753        assert_eq!(count, 3);
754        assert_eq!(buf[0], 1);
755        assert_eq!(buf[1], 2);
756        assert_eq!(buf[2], 3);
757    }
758
759    #[test]
760    fn read_bytes_async() {
761        let ret = run_async(|tx, l| {
762            let b = Bytes::from_owned(vec![1, 2, 3]);
763            let strm = MemoryInputStream::from_bytes(&b);
764
765            strm.read_bytes_async(
766                10,
767                glib::Priority::DEFAULT_IDLE,
768                crate::Cancellable::NONE,
769                move |ret| {
770                    tx.send(ret).unwrap();
771                    l.quit();
772                },
773            );
774        });
775
776        let bytes = ret.unwrap();
777        assert_eq!(bytes, vec![1, 2, 3]);
778    }
779
780    #[test]
781    fn skip_async() {
782        let ret = run_async(|tx, l| {
783            let b = Bytes::from_owned(vec![1, 2, 3]);
784            let strm = MemoryInputStream::from_bytes(&b);
785
786            strm.skip_async(
787                10,
788                glib::Priority::DEFAULT_IDLE,
789                crate::Cancellable::NONE,
790                move |ret| {
791                    tx.send(ret).unwrap();
792                    l.quit();
793                },
794            );
795        });
796
797        let skipped = ret.unwrap();
798        assert_eq!(skipped, 3);
799    }
800
801    #[test]
802    fn std_io_read() {
803        let b = Bytes::from_owned(vec![1, 2, 3]);
804        let mut read = MemoryInputStream::from_bytes(&b).into_read();
805        let mut buf = [0u8; 10];
806
807        let ret = read.read(&mut buf);
808
809        assert_eq!(ret.unwrap(), 3);
810        assert_eq!(buf[0], 1);
811        assert_eq!(buf[1], 2);
812        assert_eq!(buf[2], 3);
813    }
814
815    #[test]
816    fn into_input_stream() {
817        let b = Bytes::from_owned(vec![1, 2, 3]);
818        let stream = MemoryInputStream::from_bytes(&b);
819        let stream_clone = stream.clone();
820        let stream = stream.into_read().into_input_stream();
821
822        assert_eq!(stream, stream_clone);
823    }
824}