gio/
input_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12    /// Tries to read @count bytes from the stream into the buffer starting at
13    /// @buffer. Will block during this read.
14    ///
15    /// If count is zero returns zero and does nothing. A value of @count
16    /// larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
17    ///
18    /// On success, the number of bytes read into the buffer is returned.
19    /// It is not an error if this is not the same as the requested size, as it
20    /// can happen e.g. near the end of a file. Zero is returned on end of file
21    /// (or if @count is zero),  but never otherwise.
22    ///
23    /// The returned @buffer is not a nul-terminated string, it can contain nul bytes
24    /// at any position, and this function doesn't nul-terminate the @buffer.
25    ///
26    /// If @cancellable is not [`None`], then the operation can be cancelled by
27    /// triggering the cancellable object from another thread. If the operation
28    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
29    /// operation was partially finished when the operation was cancelled the
30    /// partial result will be returned, without an error.
31    ///
32    /// On error -1 is returned and @error is set accordingly.
33    /// ## `cancellable`
34    /// optional #GCancellable object, [`None`] to ignore.
35    ///
36    /// # Returns
37    ///
38    /// Number of bytes read, or -1 on error, or 0 on end of file.
39    ///
40    /// ## `buffer`
41    ///
42    ///   a buffer to read data into (which should be at least count bytes long).
43    #[doc(alias = "g_input_stream_read")]
44    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
45        &self,
46        mut buffer: B,
47        cancellable: Option<&C>,
48    ) -> Result<usize, glib::Error> {
49        let cancellable = cancellable.map(|c| c.as_ref());
50        let gcancellable = cancellable.to_glib_none();
51        let buffer = buffer.as_mut();
52        let buffer_ptr = buffer.as_mut_ptr();
53        let count = buffer.len();
54        unsafe {
55            let mut error = ptr::null_mut();
56            let ret = ffi::g_input_stream_read(
57                self.as_ref().to_glib_none().0,
58                buffer_ptr,
59                count,
60                gcancellable.0,
61                &mut error,
62            );
63            if error.is_null() {
64                Ok(ret as usize)
65            } else {
66                Err(from_glib_full(error))
67            }
68        }
69    }
70
71    /// Tries to read @count bytes from the stream into the buffer starting at
72    /// @buffer. Will block during this read.
73    ///
74    /// This function is similar to g_input_stream_read(), except it tries to
75    /// read as many bytes as requested, only stopping on an error or end of stream.
76    ///
77    /// On a successful read of @count bytes, or if we reached the end of the
78    /// stream,  [`true`] is returned, and @bytes_read is set to the number of bytes
79    /// read into @buffer.
80    ///
81    /// If there is an error during the operation [`false`] is returned and @error
82    /// is set to indicate the error status.
83    ///
84    /// As a special exception to the normal conventions for functions that
85    /// use #GError, if this function returns [`false`] (and sets @error) then
86    /// @bytes_read will be set to the number of bytes that were successfully
87    /// read before the error was encountered.  This functionality is only
88    /// available from C.  If you need it from another language then you must
89    /// write your own loop around g_input_stream_read().
90    /// ## `cancellable`
91    /// optional #GCancellable object, [`None`] to ignore.
92    ///
93    /// # Returns
94    ///
95    /// [`true`] on success, [`false`] if there was an error
96    ///
97    /// ## `buffer`
98    ///
99    ///   a buffer to read data into (which should be at least count bytes long).
100    ///
101    /// ## `bytes_read`
102    /// location to store the number of bytes that was read from the stream
103    #[doc(alias = "g_input_stream_read_all")]
104    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
105        &self,
106        mut buffer: B,
107        cancellable: Option<&C>,
108    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
109        let cancellable = cancellable.map(|c| c.as_ref());
110        let gcancellable = cancellable.to_glib_none();
111        let buffer = buffer.as_mut();
112        let buffer_ptr = buffer.as_mut_ptr();
113        let count = buffer.len();
114        unsafe {
115            let mut bytes_read = mem::MaybeUninit::uninit();
116            let mut error = ptr::null_mut();
117            let _ = ffi::g_input_stream_read_all(
118                self.as_ref().to_glib_none().0,
119                buffer_ptr,
120                count,
121                bytes_read.as_mut_ptr(),
122                gcancellable.0,
123                &mut error,
124            );
125
126            let bytes_read = bytes_read.assume_init();
127            if error.is_null() {
128                Ok((bytes_read, None))
129            } else if bytes_read != 0 {
130                Ok((bytes_read, Some(from_glib_full(error))))
131            } else {
132                Err(from_glib_full(error))
133            }
134        }
135    }
136
137    /// Request an asynchronous read of @count bytes from the stream into the
138    /// buffer starting at @buffer.
139    ///
140    /// This is the asynchronous equivalent of [`InputStreamExtManual::read_all()`][crate::prelude::InputStreamExtManual::read_all()].
141    ///
142    /// Call `InputStream::read_all_finish()` to collect the result.
143    ///
144    /// Any outstanding I/O request with higher priority (lower numerical
145    /// value) will be executed before an outstanding request with lower
146    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
147    /// ## `io_priority`
148    /// the [I/O priority](iface.AsyncResult.html#io-priority) of the request
149    /// ## `cancellable`
150    /// optional #GCancellable object, [`None`] to ignore
151    /// ## `callback`
152    /// a #GAsyncReadyCallback
153    ///   to call when the request is satisfied
154    ///
155    /// # Returns
156    ///
157    ///
158    /// ## `buffer`
159    ///
160    ///   a buffer to read data into (which should be at least count bytes long)
161    #[doc(alias = "g_input_stream_read_all_async")]
162    fn read_all_async<
163        B: AsMut<[u8]> + Send + 'static,
164        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
165        C: IsA<Cancellable>,
166    >(
167        &self,
168        buffer: B,
169        io_priority: Priority,
170        cancellable: Option<&C>,
171        callback: Q,
172    ) {
173        let main_context = glib::MainContext::ref_thread_default();
174        let is_main_context_owner = main_context.is_owner();
175        let has_acquired_main_context = (!is_main_context_owner)
176            .then(|| main_context.acquire().ok())
177            .flatten();
178        assert!(
179            is_main_context_owner || has_acquired_main_context.is_some(),
180            "Async operations only allowed if the thread is owning the MainContext"
181        );
182
183        let cancellable = cancellable.map(|c| c.as_ref());
184        let gcancellable = cancellable.to_glib_none();
185        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
186            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
187        // Need to do this after boxing as the contents pointer might change by moving into the box
188        let (count, buffer_ptr) = {
189            let buffer = &mut user_data.1;
190            let slice = (*buffer).as_mut();
191            (slice.len(), slice.as_mut_ptr())
192        };
193        unsafe extern "C" fn read_all_async_trampoline<
194            B: AsMut<[u8]> + Send + 'static,
195            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
196        >(
197            _source_object: *mut glib::gobject_ffi::GObject,
198            res: *mut ffi::GAsyncResult,
199            user_data: glib::ffi::gpointer,
200        ) {
201            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
202                Box::from_raw(user_data as *mut _);
203            let (callback, buffer) = *user_data;
204            let callback = callback.into_inner();
205
206            let mut error = ptr::null_mut();
207            let mut bytes_read = mem::MaybeUninit::uninit();
208            let _ = ffi::g_input_stream_read_all_finish(
209                _source_object as *mut _,
210                res,
211                bytes_read.as_mut_ptr(),
212                &mut error,
213            );
214
215            let bytes_read = bytes_read.assume_init();
216            let result = if error.is_null() {
217                Ok((buffer, bytes_read, None))
218            } else if bytes_read != 0 {
219                Ok((buffer, bytes_read, Some(from_glib_full(error))))
220            } else {
221                Err((buffer, from_glib_full(error)))
222            };
223
224            callback(result);
225        }
226        let callback = read_all_async_trampoline::<B, Q>;
227        unsafe {
228            ffi::g_input_stream_read_all_async(
229                self.as_ref().to_glib_none().0,
230                buffer_ptr,
231                count,
232                io_priority.into_glib(),
233                gcancellable.0,
234                Some(callback),
235                Box::into_raw(user_data) as *mut _,
236            );
237        }
238    }
239
240    /// Request an asynchronous read of @count bytes from the stream into the buffer
241    /// starting at @buffer. When the operation is finished @callback will be called.
242    /// You can then call g_input_stream_read_finish() to get the result of the
243    /// operation.
244    ///
245    /// During an async request no other sync and async calls are allowed on @self, and will
246    /// result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
247    ///
248    /// A value of @count larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
249    ///
250    /// On success, the number of bytes read into the buffer will be passed to the
251    /// callback. It is not an error if this is not the same as the requested size, as it
252    /// can happen e.g. near the end of a file, but generally we try to read
253    /// as many bytes as requested. Zero is returned on end of file
254    /// (or if @count is zero),  but never otherwise.
255    ///
256    /// Any outstanding i/o request with higher priority (lower numerical value) will
257    /// be executed before an outstanding request with lower priority. Default
258    /// priority is `G_PRIORITY_DEFAULT`.
259    ///
260    /// The asynchronous methods have a default fallback that uses threads to implement
261    /// asynchronicity, so they are optional for inheriting classes. However, if you
262    /// override one you must override all.
263    /// ## `io_priority`
264    /// the [I/O priority](iface.AsyncResult.html#io-priority)
265    /// of the request.
266    /// ## `cancellable`
267    /// optional #GCancellable object, [`None`] to ignore.
268    /// ## `callback`
269    /// a #GAsyncReadyCallback
270    ///   to call when the request is satisfied
271    ///
272    /// # Returns
273    ///
274    ///
275    /// ## `buffer`
276    ///
277    ///   a buffer to read data into (which should be at least count bytes long).
278    #[doc(alias = "g_input_stream_read_async")]
279    fn read_async<
280        B: AsMut<[u8]> + Send + 'static,
281        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
282        C: IsA<Cancellable>,
283    >(
284        &self,
285        buffer: B,
286        io_priority: Priority,
287        cancellable: Option<&C>,
288        callback: Q,
289    ) {
290        let main_context = glib::MainContext::ref_thread_default();
291        let is_main_context_owner = main_context.is_owner();
292        let has_acquired_main_context = (!is_main_context_owner)
293            .then(|| main_context.acquire().ok())
294            .flatten();
295        assert!(
296            is_main_context_owner || has_acquired_main_context.is_some(),
297            "Async operations only allowed if the thread is owning the MainContext"
298        );
299
300        let cancellable = cancellable.map(|c| c.as_ref());
301        let gcancellable = cancellable.to_glib_none();
302        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
303            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
304        // Need to do this after boxing as the contents pointer might change by moving into the box
305        let (count, buffer_ptr) = {
306            let buffer = &mut user_data.1;
307            let slice = (*buffer).as_mut();
308            (slice.len(), slice.as_mut_ptr())
309        };
310        unsafe extern "C" fn read_async_trampoline<
311            B: AsMut<[u8]> + Send + 'static,
312            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
313        >(
314            _source_object: *mut glib::gobject_ffi::GObject,
315            res: *mut ffi::GAsyncResult,
316            user_data: glib::ffi::gpointer,
317        ) {
318            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
319                Box::from_raw(user_data as *mut _);
320            let (callback, buffer) = *user_data;
321            let callback = callback.into_inner();
322
323            let mut error = ptr::null_mut();
324            let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
325
326            let result = if error.is_null() {
327                Ok((buffer, ret as usize))
328            } else {
329                Err((buffer, from_glib_full(error)))
330            };
331
332            callback(result);
333        }
334        let callback = read_async_trampoline::<B, Q>;
335        unsafe {
336            ffi::g_input_stream_read_async(
337                self.as_ref().to_glib_none().0,
338                buffer_ptr,
339                count,
340                io_priority.into_glib(),
341                gcancellable.0,
342                Some(callback),
343                Box::into_raw(user_data) as *mut _,
344            );
345        }
346    }
347
348    fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
349        &self,
350        buffer: B,
351        io_priority: Priority,
352    ) -> Pin<
353        Box<
354            dyn std::future::Future<
355                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
356                > + 'static,
357        >,
358    > {
359        Box::pin(crate::GioFuture::new(
360            self,
361            move |obj, cancellable, send| {
362                obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
363                    send.resolve(res);
364                });
365            },
366        ))
367    }
368
369    fn read_future<B: AsMut<[u8]> + Send + 'static>(
370        &self,
371        buffer: B,
372        io_priority: Priority,
373    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
374    {
375        Box::pin(crate::GioFuture::new(
376            self,
377            move |obj, cancellable, send| {
378                obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
379                    send.resolve(res);
380                });
381            },
382        ))
383    }
384
385    fn into_read(self) -> InputStreamRead<Self>
386    where
387        Self: IsA<InputStream>,
388    {
389        InputStreamRead(self)
390    }
391
392    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
393    where
394        Self: IsA<InputStream>,
395    {
396        InputStreamAsyncBufRead::new(self, buffer_size)
397    }
398}
399
400impl<O: IsA<InputStream>> InputStreamExtManual for O {}
401
402#[derive(Debug)]
403pub struct InputStreamRead<T: IsA<InputStream>>(T);
404
405impl<T: IsA<InputStream>> InputStreamRead<T> {
406    pub fn into_input_stream(self) -> T {
407        self.0
408    }
409
410    pub fn input_stream(&self) -> &T {
411        &self.0
412    }
413}
414
415impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
416    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
417        let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
418        to_std_io_result(gio_result)
419    }
420}
421
422impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
423    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
424        let (pos, type_) = match pos {
425            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
426            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
427            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
428        };
429        let seekable: &Seekable = self.0.as_ref();
430        let gio_result = seekable
431            .seek(pos, type_, crate::Cancellable::NONE)
432            .map(|_| seekable.tell() as u64);
433        to_std_io_result(gio_result)
434    }
435}
436
437enum State {
438    Waiting {
439        buffer: Vec<u8>,
440    },
441    Transitioning,
442    Reading {
443        pending: Pin<
444            Box<
445                dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
446                    + 'static,
447            >,
448        >,
449    },
450    HasData {
451        buffer: Vec<u8>,
452        valid: (usize, usize), // first index is inclusive, second is exclusive
453    },
454    Failed(crate::IOErrorEnum),
455}
456
457impl State {
458    fn into_buffer(self) -> Vec<u8> {
459        match self {
460            State::Waiting { buffer } => buffer,
461            _ => panic!("Invalid state"),
462        }
463    }
464
465    #[doc(alias = "get_pending")]
466    fn pending(
467        &mut self,
468    ) -> &mut Pin<
469        Box<
470            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
471                + 'static,
472        >,
473    > {
474        match self {
475            State::Reading { ref mut pending } => pending,
476            _ => panic!("Invalid state"),
477        }
478    }
479}
480pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
481    stream: T,
482    state: State,
483}
484
485impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
486    pub fn into_input_stream(self) -> T {
487        self.stream
488    }
489
490    pub fn input_stream(&self) -> &T {
491        &self.stream
492    }
493
494    fn new(stream: T, buffer_size: usize) -> Self {
495        let buffer = vec![0; buffer_size];
496
497        Self {
498            stream,
499            state: State::Waiting { buffer },
500        }
501    }
502    fn set_reading(
503        &mut self,
504    ) -> &mut Pin<
505        Box<
506            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
507                + 'static,
508        >,
509    > {
510        match self.state {
511            State::Waiting { .. } => {
512                let waiting = mem::replace(&mut self.state, State::Transitioning);
513                let buffer = waiting.into_buffer();
514                let pending = self.input_stream().read_future(buffer, Priority::default());
515                self.state = State::Reading { pending };
516            }
517            State::Reading { .. } => {}
518            _ => panic!("Invalid state"),
519        };
520
521        self.state.pending()
522    }
523
524    #[doc(alias = "get_data")]
525    fn data(&self) -> Poll<io::Result<&[u8]>> {
526        if let State::HasData {
527            ref buffer,
528            valid: (i, j),
529        } = self.state
530        {
531            return Poll::Ready(Ok(&buffer[i..j]));
532        }
533        panic!("Invalid state")
534    }
535
536    fn set_waiting(&mut self, buffer: Vec<u8>) {
537        match self.state {
538            State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
539            _ => panic!("Invalid state"),
540        }
541    }
542
543    fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
544        match self.state {
545            State::Reading { .. } | State::Transitioning => {
546                self.state = State::HasData { buffer, valid }
547            }
548            _ => panic!("Invalid state"),
549        }
550    }
551
552    fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
553        match self.state {
554            State::Failed(kind) => Poll::Ready(Err(io::Error::new(
555                io::ErrorKind::from(kind),
556                BufReadError::Failed,
557            ))),
558            State::HasData { .. } => self.data(),
559            State::Transitioning => panic!("Invalid state"),
560            State::Waiting { .. } | State::Reading { .. } => {
561                let pending = self.set_reading();
562                match Pin::new(pending).poll(cx) {
563                    Poll::Ready(Ok((buffer, res))) => {
564                        if res == 0 {
565                            self.set_waiting(buffer);
566                            Poll::Ready(Ok(&[]))
567                        } else {
568                            self.set_has_data(buffer, (0, res));
569                            self.data()
570                        }
571                    }
572                    Poll::Ready(Err((_, err))) => {
573                        let kind = err
574                            .kind::<crate::IOErrorEnum>()
575                            .unwrap_or(crate::IOErrorEnum::Failed);
576                        self.state = State::Failed(kind);
577                        Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
578                    }
579                    Poll::Pending => Poll::Pending,
580                }
581            }
582        }
583    }
584
585    fn consume(&mut self, amt: usize) {
586        if amt == 0 {
587            return;
588        }
589
590        if let State::HasData { .. } = self.state {
591            let has_data = mem::replace(&mut self.state, State::Transitioning);
592            if let State::HasData {
593                buffer,
594                valid: (i, j),
595            } = has_data
596            {
597                let available = j - i;
598                if amt > available {
599                    panic!("Cannot consume {amt} bytes as only {available} are available",)
600                }
601                let remaining = available - amt;
602                if remaining == 0 {
603                    return self.set_waiting(buffer);
604                } else {
605                    return self.set_has_data(buffer, (i + amt, j));
606                }
607            }
608        }
609
610        panic!("Invalid state")
611    }
612}
613
// Payload of the `io::Error` reported once an earlier read has already failed.
#[derive(Debug)]
enum BufReadError {
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
628
629impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
630    fn poll_read(
631        self: Pin<&mut Self>,
632        cx: &mut Context,
633        out_buf: &mut [u8],
634    ) -> Poll<io::Result<usize>> {
635        let reader = self.get_mut();
636        let poll = reader.poll_fill_buf(cx);
637
638        let poll = poll.map_ok(|buffer| {
639            let copied = buffer.len().min(out_buf.len());
640            out_buf[..copied].copy_from_slice(&buffer[..copied]);
641            copied
642        });
643
644        if let Poll::Ready(Ok(consumed)) = poll {
645            reader.consume(consumed);
646        }
647        poll
648    }
649}
650
651impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
652    fn poll_fill_buf(
653        self: Pin<&mut Self>,
654        cx: &mut Context,
655    ) -> Poll<Result<&[u8], futures_io::Error>> {
656        self.get_mut().poll_fill_buf(cx)
657    }
658
659    fn consume(self: Pin<&mut Self>, amt: usize) {
660        self.get_mut().consume(amt);
661    }
662}
663
664impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
665
#[cfg(test)]
mod tests {
    use std::io::Read;

    use glib::Bytes;

    use crate::{prelude::*, test_util::run_async, MemoryInputStream};

    // Contents of every in-memory stream used by the tests below.
    const PAYLOAD: [u8; 3] = [1, 2, 3];

    // Builds a fresh in-memory stream serving `PAYLOAD`.
    fn payload_stream() -> MemoryInputStream {
        let bytes = Bytes::from_owned(PAYLOAD.to_vec());
        MemoryInputStream::from_bytes(&bytes)
    }

    #[test]
    fn read_all_async() {
        let ret = run_async(|tx, l| {
            let strm = payload_stream();

            strm.read_all_async(
                vec![0; 10],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count, err) = ret.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &PAYLOAD[..]);
    }

    #[test]
    fn read_all() {
        let strm = payload_stream();
        let mut buf = vec![0; 10];

        let (count, err) = strm.read_all(&mut buf, crate::Cancellable::NONE).unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &PAYLOAD[..]);
    }

    #[test]
    fn read() {
        let strm = payload_stream();
        let mut buf = vec![0; 10];

        let ret = strm.read(&mut buf, crate::Cancellable::NONE);

        assert_eq!(ret.unwrap(), 3);
        assert_eq!(&buf[..3], &PAYLOAD[..]);
    }

    #[test]
    fn read_async() {
        let ret = run_async(|tx, l| {
            let strm = payload_stream();

            strm.read_async(
                vec![0; 10],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count) = ret.unwrap();
        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &PAYLOAD[..]);
    }

    #[test]
    fn read_bytes_async() {
        let ret = run_async(|tx, l| {
            let strm = payload_stream();

            strm.read_bytes_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let bytes = ret.unwrap();
        assert_eq!(bytes, PAYLOAD.to_vec());
    }

    #[test]
    fn skip_async() {
        let ret = run_async(|tx, l| {
            let strm = payload_stream();

            strm.skip_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let skipped = ret.unwrap();
        assert_eq!(skipped, 3);
    }

    #[test]
    fn std_io_read() {
        let mut reader = payload_stream().into_read();
        let mut buf = [0u8; 10];

        let ret = reader.read(&mut buf);

        assert_eq!(ret.unwrap(), 3);
        assert_eq!(&buf[..3], &PAYLOAD[..]);
    }

    #[test]
    fn into_input_stream() {
        let original = payload_stream();
        let expected = original.clone();

        let round_tripped = original.into_read().into_input_stream();

        assert_eq!(round_tripped, expected);
    }
}
819}