gio/
input_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{Priority, prelude::*, translate::*};
8
9use crate::{Cancellable, InputStream, Seekable, error::to_std_io_result, ffi, prelude::*};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12    /// Tries to read @count bytes from the stream into the buffer starting at
13    /// @buffer. Will block during this read.
14    ///
15    /// If count is zero returns zero and does nothing. A value of @count
16    /// larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
17    ///
18    /// On success, the number of bytes read into the buffer is returned.
19    /// It is not an error if this is not the same as the requested size, as it
20    /// can happen e.g. near the end of a file. Zero is returned on end of file
21    /// (or if @count is zero),  but never otherwise.
22    ///
23    /// The returned @buffer is not a nul-terminated string, it can contain nul bytes
24    /// at any position, and this function doesn't nul-terminate the @buffer.
25    ///
26    /// If @cancellable is not [`None`], then the operation can be cancelled by
27    /// triggering the cancellable object from another thread. If the operation
28    /// was cancelled, the error [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] will be returned. If an
29    /// operation was partially finished when the operation was cancelled the
30    /// partial result will be returned, without an error.
31    ///
32    /// On error -1 is returned and @error is set accordingly.
33    /// ## `cancellable`
34    /// optional #GCancellable object, [`None`] to ignore.
35    ///
36    /// # Returns
37    ///
38    /// Number of bytes read, or -1 on error, or 0 on end of file.
39    ///
40    /// ## `buffer`
41    ///
42    ///   a buffer to read data into (which should be at least count bytes long).
43    #[doc(alias = "g_input_stream_read")]
44    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
45        &self,
46        mut buffer: B,
47        cancellable: Option<&C>,
48    ) -> Result<usize, glib::Error> {
49        let cancellable = cancellable.map(|c| c.as_ref());
50        let gcancellable = cancellable.to_glib_none();
51        let buffer = buffer.as_mut();
52        let buffer_ptr = buffer.as_mut_ptr();
53        let count = buffer.len();
54        unsafe {
55            let mut error = ptr::null_mut();
56            let ret = ffi::g_input_stream_read(
57                self.as_ref().to_glib_none().0,
58                buffer_ptr,
59                count,
60                gcancellable.0,
61                &mut error,
62            );
63            if error.is_null() {
64                Ok(ret as usize)
65            } else {
66                Err(from_glib_full(error))
67            }
68        }
69    }
70
71    /// Tries to read @count bytes from the stream into the buffer starting at
72    /// @buffer. Will block during this read.
73    ///
74    /// This function is similar to g_input_stream_read(), except it tries to
75    /// read as many bytes as requested, only stopping on an error or end of stream.
76    ///
77    /// On a successful read of @count bytes, or if we reached the end of the
78    /// stream,  [`true`] is returned, and @bytes_read is set to the number of bytes
79    /// read into @buffer.
80    ///
81    /// If there is an error during the operation [`false`] is returned and @error
82    /// is set to indicate the error status.
83    ///
84    /// As a special exception to the normal conventions for functions that
85    /// use #GError, if this function returns [`false`] (and sets @error) then
86    /// @bytes_read will be set to the number of bytes that were successfully
87    /// read before the error was encountered.  This functionality is only
88    /// available from C.  If you need it from another language then you must
89    /// write your own loop around g_input_stream_read().
90    /// ## `cancellable`
91    /// optional #GCancellable object, [`None`] to ignore.
92    ///
93    /// # Returns
94    ///
95    /// [`true`] on success, [`false`] if there was an error
96    ///
97    /// ## `buffer`
98    ///
99    ///   a buffer to read data into (which should be at least count bytes long).
100    ///
101    /// ## `bytes_read`
102    /// location to store the number of bytes that was read from the stream
103    #[doc(alias = "g_input_stream_read_all")]
104    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
105        &self,
106        mut buffer: B,
107        cancellable: Option<&C>,
108    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
109        let cancellable = cancellable.map(|c| c.as_ref());
110        let gcancellable = cancellable.to_glib_none();
111        let buffer = buffer.as_mut();
112        let buffer_ptr = buffer.as_mut_ptr();
113        let count = buffer.len();
114        unsafe {
115            let mut bytes_read = mem::MaybeUninit::uninit();
116            let mut error = ptr::null_mut();
117            let _ = ffi::g_input_stream_read_all(
118                self.as_ref().to_glib_none().0,
119                buffer_ptr,
120                count,
121                bytes_read.as_mut_ptr(),
122                gcancellable.0,
123                &mut error,
124            );
125
126            let bytes_read = bytes_read.assume_init();
127            if error.is_null() {
128                Ok((bytes_read, None))
129            } else if bytes_read != 0 {
130                Ok((bytes_read, Some(from_glib_full(error))))
131            } else {
132                Err(from_glib_full(error))
133            }
134        }
135    }
136
137    /// Request an asynchronous read of @count bytes from the stream into the
138    /// buffer starting at @buffer.
139    ///
140    /// This is the asynchronous equivalent of [`InputStreamExtManual::read_all()`][crate::prelude::InputStreamExtManual::read_all()].
141    ///
142    /// Call `InputStream::read_all_finish()` to collect the result.
143    ///
144    /// Any outstanding I/O request with higher priority (lower numerical
145    /// value) will be executed before an outstanding request with lower
146    /// priority. Default priority is `G_PRIORITY_DEFAULT`.
147    /// ## `io_priority`
148    /// the [I/O priority](iface.AsyncResult.html#io-priority) of the request
149    /// ## `cancellable`
150    /// optional #GCancellable object, [`None`] to ignore
151    /// ## `callback`
152    /// a #GAsyncReadyCallback
153    ///   to call when the request is satisfied
154    ///
155    /// # Returns
156    ///
157    ///
158    /// ## `buffer`
159    ///
160    ///   a buffer to read data into (which should be at least count bytes long)
161    #[doc(alias = "g_input_stream_read_all_async")]
162    fn read_all_async<
163        B: AsMut<[u8]> + Send + 'static,
164        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
165        C: IsA<Cancellable>,
166    >(
167        &self,
168        buffer: B,
169        io_priority: Priority,
170        cancellable: Option<&C>,
171        callback: Q,
172    ) {
173        let main_context = glib::MainContext::ref_thread_default();
174        let is_main_context_owner = main_context.is_owner();
175        let has_acquired_main_context = (!is_main_context_owner)
176            .then(|| main_context.acquire().ok())
177            .flatten();
178        assert!(
179            is_main_context_owner || has_acquired_main_context.is_some(),
180            "Async operations only allowed if the thread is owning the MainContext"
181        );
182
183        let cancellable = cancellable.map(|c| c.as_ref());
184        let gcancellable = cancellable.to_glib_none();
185        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
186            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
187        // Need to do this after boxing as the contents pointer might change by moving into the box
188        let (count, buffer_ptr) = {
189            let buffer = &mut user_data.1;
190            let slice = (*buffer).as_mut();
191            (slice.len(), slice.as_mut_ptr())
192        };
193        unsafe extern "C" fn read_all_async_trampoline<
194            B: AsMut<[u8]> + Send + 'static,
195            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
196        >(
197            _source_object: *mut glib::gobject_ffi::GObject,
198            res: *mut ffi::GAsyncResult,
199            user_data: glib::ffi::gpointer,
200        ) {
201            unsafe {
202                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
203                    Box::from_raw(user_data as *mut _);
204                let (callback, buffer) = *user_data;
205                let callback = callback.into_inner();
206
207                let mut error = ptr::null_mut();
208                let mut bytes_read = mem::MaybeUninit::uninit();
209                let _ = ffi::g_input_stream_read_all_finish(
210                    _source_object as *mut _,
211                    res,
212                    bytes_read.as_mut_ptr(),
213                    &mut error,
214                );
215
216                let bytes_read = bytes_read.assume_init();
217                let result = if error.is_null() {
218                    Ok((buffer, bytes_read, None))
219                } else if bytes_read != 0 {
220                    Ok((buffer, bytes_read, Some(from_glib_full(error))))
221                } else {
222                    Err((buffer, from_glib_full(error)))
223                };
224
225                callback(result);
226            }
227        }
228        let callback = read_all_async_trampoline::<B, Q>;
229        unsafe {
230            ffi::g_input_stream_read_all_async(
231                self.as_ref().to_glib_none().0,
232                buffer_ptr,
233                count,
234                io_priority.into_glib(),
235                gcancellable.0,
236                Some(callback),
237                Box::into_raw(user_data) as *mut _,
238            );
239        }
240    }
241
242    /// Request an asynchronous read of @count bytes from the stream into the buffer
243    /// starting at @buffer. When the operation is finished @callback will be called.
244    /// You can then call g_input_stream_read_finish() to get the result of the
245    /// operation.
246    ///
247    /// During an async request no other sync and async calls are allowed on @self, and will
248    /// result in [`IOErrorEnum::Pending`][crate::IOErrorEnum::Pending] errors.
249    ///
250    /// A value of @count larger than `G_MAXSSIZE` will cause a [`IOErrorEnum::InvalidArgument`][crate::IOErrorEnum::InvalidArgument] error.
251    ///
252    /// On success, the number of bytes read into the buffer will be passed to the
253    /// callback. It is not an error if this is not the same as the requested size, as it
254    /// can happen e.g. near the end of a file, but generally we try to read
255    /// as many bytes as requested. Zero is returned on end of file
256    /// (or if @count is zero),  but never otherwise.
257    ///
258    /// Any outstanding i/o request with higher priority (lower numerical value) will
259    /// be executed before an outstanding request with lower priority. Default
260    /// priority is `G_PRIORITY_DEFAULT`.
261    ///
262    /// The asynchronous methods have a default fallback that uses threads to implement
263    /// asynchronicity, so they are optional for inheriting classes. However, if you
264    /// override one you must override all.
265    /// ## `io_priority`
266    /// the [I/O priority](iface.AsyncResult.html#io-priority)
267    /// of the request.
268    /// ## `cancellable`
269    /// optional #GCancellable object, [`None`] to ignore.
270    /// ## `callback`
271    /// a #GAsyncReadyCallback
272    ///   to call when the request is satisfied
273    ///
274    /// # Returns
275    ///
276    ///
277    /// ## `buffer`
278    ///
279    ///   a buffer to read data into (which should be at least count bytes long).
280    #[doc(alias = "g_input_stream_read_async")]
281    fn read_async<
282        B: AsMut<[u8]> + Send + 'static,
283        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
284        C: IsA<Cancellable>,
285    >(
286        &self,
287        buffer: B,
288        io_priority: Priority,
289        cancellable: Option<&C>,
290        callback: Q,
291    ) {
292        let main_context = glib::MainContext::ref_thread_default();
293        let is_main_context_owner = main_context.is_owner();
294        let has_acquired_main_context = (!is_main_context_owner)
295            .then(|| main_context.acquire().ok())
296            .flatten();
297        assert!(
298            is_main_context_owner || has_acquired_main_context.is_some(),
299            "Async operations only allowed if the thread is owning the MainContext"
300        );
301
302        let cancellable = cancellable.map(|c| c.as_ref());
303        let gcancellable = cancellable.to_glib_none();
304        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
305            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
306        // Need to do this after boxing as the contents pointer might change by moving into the box
307        let (count, buffer_ptr) = {
308            let buffer = &mut user_data.1;
309            let slice = (*buffer).as_mut();
310            (slice.len(), slice.as_mut_ptr())
311        };
312        unsafe extern "C" fn read_async_trampoline<
313            B: AsMut<[u8]> + Send + 'static,
314            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
315        >(
316            _source_object: *mut glib::gobject_ffi::GObject,
317            res: *mut ffi::GAsyncResult,
318            user_data: glib::ffi::gpointer,
319        ) {
320            unsafe {
321                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
322                    Box::from_raw(user_data as *mut _);
323                let (callback, buffer) = *user_data;
324                let callback = callback.into_inner();
325
326                let mut error = ptr::null_mut();
327                let ret =
328                    ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
329
330                let result = if error.is_null() {
331                    Ok((buffer, ret as usize))
332                } else {
333                    Err((buffer, from_glib_full(error)))
334                };
335
336                callback(result);
337            }
338        }
339        let callback = read_async_trampoline::<B, Q>;
340        unsafe {
341            ffi::g_input_stream_read_async(
342                self.as_ref().to_glib_none().0,
343                buffer_ptr,
344                count,
345                io_priority.into_glib(),
346                gcancellable.0,
347                Some(callback),
348                Box::into_raw(user_data) as *mut _,
349            );
350        }
351    }
352
353    fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
354        &self,
355        buffer: B,
356        io_priority: Priority,
357    ) -> Pin<
358        Box<
359            dyn std::future::Future<
360                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
361                > + 'static,
362        >,
363    > {
364        Box::pin(crate::GioFuture::new(
365            self,
366            move |obj, cancellable, send| {
367                obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
368                    send.resolve(res);
369                });
370            },
371        ))
372    }
373
374    fn read_future<B: AsMut<[u8]> + Send + 'static>(
375        &self,
376        buffer: B,
377        io_priority: Priority,
378    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
379    {
380        Box::pin(crate::GioFuture::new(
381            self,
382            move |obj, cancellable, send| {
383                obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
384                    send.resolve(res);
385                });
386            },
387        ))
388    }
389
390    fn into_read(self) -> InputStreamRead<Self>
391    where
392        Self: IsA<InputStream>,
393    {
394        InputStreamRead(self)
395    }
396
397    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
398    where
399        Self: IsA<InputStream>,
400    {
401        InputStreamAsyncBufRead::new(self, buffer_size)
402    }
403}
404
405impl<O: IsA<InputStream>> InputStreamExtManual for O {}
406
407#[derive(Debug)]
408pub struct InputStreamRead<T: IsA<InputStream>>(T);
409
410impl<T: IsA<InputStream>> InputStreamRead<T> {
411    pub fn into_input_stream(self) -> T {
412        self.0
413    }
414
415    pub fn input_stream(&self) -> &T {
416        &self.0
417    }
418}
419
420impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
421    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422        let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
423        to_std_io_result(gio_result)
424    }
425}
426
427impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
428    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
429        let (pos, type_) = match pos {
430            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
431            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
432            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
433        };
434        let seekable: &Seekable = self.0.as_ref();
435        let gio_result = seekable
436            .seek(pos, type_, crate::Cancellable::NONE)
437            .map(|_| seekable.tell() as u64);
438        to_std_io_result(gio_result)
439    }
440}
441
442enum State {
443    Waiting {
444        buffer: Vec<u8>,
445    },
446    Transitioning,
447    Reading {
448        pending: Pin<
449            Box<
450                dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
451                    + 'static,
452            >,
453        >,
454    },
455    HasData {
456        buffer: Vec<u8>,
457        valid: (usize, usize), // first index is inclusive, second is exclusive
458    },
459    Failed(crate::IOErrorEnum),
460}
461
462impl State {
463    fn into_buffer(self) -> Vec<u8> {
464        match self {
465            State::Waiting { buffer } => buffer,
466            _ => panic!("Invalid state"),
467        }
468    }
469
470    #[doc(alias = "get_pending")]
471    fn pending(
472        &mut self,
473    ) -> &mut Pin<
474        Box<
475            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
476                + 'static,
477        >,
478    > {
479        match self {
480            State::Reading { pending } => pending,
481            _ => panic!("Invalid state"),
482        }
483    }
484}
485pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
486    stream: T,
487    state: State,
488}
489
490impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
491    pub fn into_input_stream(self) -> T {
492        self.stream
493    }
494
495    pub fn input_stream(&self) -> &T {
496        &self.stream
497    }
498
499    fn new(stream: T, buffer_size: usize) -> Self {
500        let buffer = vec![0; buffer_size];
501
502        Self {
503            stream,
504            state: State::Waiting { buffer },
505        }
506    }
507    fn set_reading(
508        &mut self,
509    ) -> &mut Pin<
510        Box<
511            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
512                + 'static,
513        >,
514    > {
515        match self.state {
516            State::Waiting { .. } => {
517                let waiting = mem::replace(&mut self.state, State::Transitioning);
518                let buffer = waiting.into_buffer();
519                let pending = self.input_stream().read_future(buffer, Priority::default());
520                self.state = State::Reading { pending };
521            }
522            State::Reading { .. } => {}
523            _ => panic!("Invalid state"),
524        };
525
526        self.state.pending()
527    }
528
529    #[doc(alias = "get_data")]
530    fn data(&self) -> Poll<io::Result<&[u8]>> {
531        if let State::HasData {
532            ref buffer,
533            valid: (i, j),
534        } = self.state
535        {
536            return Poll::Ready(Ok(&buffer[i..j]));
537        }
538        panic!("Invalid state")
539    }
540
541    fn set_waiting(&mut self, buffer: Vec<u8>) {
542        match self.state {
543            State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
544            _ => panic!("Invalid state"),
545        }
546    }
547
548    fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
549        match self.state {
550            State::Reading { .. } | State::Transitioning => {
551                self.state = State::HasData { buffer, valid }
552            }
553            _ => panic!("Invalid state"),
554        }
555    }
556
557    fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
558        match self.state {
559            State::Failed(kind) => Poll::Ready(Err(io::Error::new(
560                io::ErrorKind::from(kind),
561                BufReadError::Failed,
562            ))),
563            State::HasData { .. } => self.data(),
564            State::Transitioning => panic!("Invalid state"),
565            State::Waiting { .. } | State::Reading { .. } => {
566                let pending = self.set_reading();
567                match Pin::new(pending).poll(cx) {
568                    Poll::Ready(Ok((buffer, res))) => {
569                        if res == 0 {
570                            self.set_waiting(buffer);
571                            Poll::Ready(Ok(&[]))
572                        } else {
573                            self.set_has_data(buffer, (0, res));
574                            self.data()
575                        }
576                    }
577                    Poll::Ready(Err((_, err))) => {
578                        let kind = err
579                            .kind::<crate::IOErrorEnum>()
580                            .unwrap_or(crate::IOErrorEnum::Failed);
581                        self.state = State::Failed(kind);
582                        Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
583                    }
584                    Poll::Pending => Poll::Pending,
585                }
586            }
587        }
588    }
589
590    fn consume(&mut self, amt: usize) {
591        if amt == 0 {
592            return;
593        }
594
595        if let State::HasData { .. } = self.state {
596            let has_data = mem::replace(&mut self.state, State::Transitioning);
597            if let State::HasData {
598                buffer,
599                valid: (i, j),
600            } = has_data
601            {
602                let available = j - i;
603                if amt > available {
604                    panic!("Cannot consume {amt} bytes as only {available} are available",)
605                }
606                let remaining = available - amt;
607                if remaining == 0 {
608                    return self.set_waiting(buffer);
609                } else {
610                    return self.set_has_data(buffer, (i + amt, j));
611                }
612            }
613        }
614
615        panic!("Invalid state")
616    }
617}
618
/// Error payload reported when a reader is polled again after one of its
/// earlier reads has failed.
#[derive(Debug)]
enum BufReadError {
    /// A previous read operation failed.
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
633
634impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
635    fn poll_read(
636        self: Pin<&mut Self>,
637        cx: &mut Context,
638        out_buf: &mut [u8],
639    ) -> Poll<io::Result<usize>> {
640        let reader = self.get_mut();
641        let poll = reader.poll_fill_buf(cx);
642
643        let poll = poll.map_ok(|buffer| {
644            let copied = buffer.len().min(out_buf.len());
645            out_buf[..copied].copy_from_slice(&buffer[..copied]);
646            copied
647        });
648
649        if let Poll::Ready(Ok(consumed)) = poll {
650            reader.consume(consumed);
651        }
652        poll
653    }
654}
655
656impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
657    fn poll_fill_buf(
658        self: Pin<&mut Self>,
659        cx: &mut Context,
660    ) -> Poll<Result<&[u8], futures_io::Error>> {
661        self.get_mut().poll_fill_buf(cx)
662    }
663
664    fn consume(self: Pin<&mut Self>, amt: usize) {
665        self.get_mut().consume(amt);
666    }
667}
668
669impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
670
#[cfg(test)]
mod tests {
    use std::io::Read;

    use glib::Bytes;

    use crate::{MemoryInputStream, prelude::*, test_util::run_async};

    /// Builds an in-memory stream that yields the bytes `1, 2, 3`.
    fn sample_stream() -> MemoryInputStream {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        MemoryInputStream::from_bytes(&bytes)
    }

    #[test]
    fn read_all_async() {
        let ret = run_async(|tx, l| {
            let strm = sample_stream();

            strm.read_all_async(
                vec![0; 10],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count, err) = ret.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(buf[..3], [1, 2, 3]);
    }

    #[test]
    fn read_all() {
        let strm = sample_stream();
        let mut buf = vec![0; 10];

        let (count, err) = strm.read_all(&mut buf, crate::Cancellable::NONE).unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(buf[..3], [1, 2, 3]);
    }

    #[test]
    fn read() {
        let strm = sample_stream();
        let mut buf = vec![0; 10];

        let count = strm.read(&mut buf, crate::Cancellable::NONE).unwrap();

        assert_eq!(count, 3);
        assert_eq!(buf[..3], [1, 2, 3]);
    }

    #[test]
    fn read_async() {
        let ret = run_async(|tx, l| {
            let strm = sample_stream();

            strm.read_async(
                vec![0; 10],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count) = ret.unwrap();
        assert_eq!(count, 3);
        assert_eq!(buf[..3], [1, 2, 3]);
    }

    #[test]
    fn read_bytes_async() {
        let ret = run_async(|tx, l| {
            let strm = sample_stream();

            strm.read_bytes_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let bytes = ret.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    #[test]
    fn skip_async() {
        let ret = run_async(|tx, l| {
            let strm = sample_stream();

            // Asking to skip more than the stream holds skips what is there.
            strm.skip_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let skipped = ret.unwrap();
        assert_eq!(skipped, 3);
    }

    #[test]
    fn std_io_read() {
        let mut read = sample_stream().into_read();
        let mut buf = [0u8; 10];

        let count = read.read(&mut buf).unwrap();

        assert_eq!(count, 3);
        assert_eq!(buf[..3], [1, 2, 3]);
    }

    #[test]
    fn into_input_stream() {
        let stream = sample_stream();
        let stream_clone = stream.clone();

        // Wrapping and unwrapping must hand back the very same stream object.
        let stream = stream.into_read().into_input_stream();

        assert_eq!(stream, stream_clone);
    }
}