1use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12 #[doc(alias = "g_input_stream_read")]
44 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
45 &self,
46 mut buffer: B,
47 cancellable: Option<&C>,
48 ) -> Result<usize, glib::Error> {
49 let cancellable = cancellable.map(|c| c.as_ref());
50 let gcancellable = cancellable.to_glib_none();
51 let buffer = buffer.as_mut();
52 let buffer_ptr = buffer.as_mut_ptr();
53 let count = buffer.len();
54 unsafe {
55 let mut error = ptr::null_mut();
56 let ret = ffi::g_input_stream_read(
57 self.as_ref().to_glib_none().0,
58 buffer_ptr,
59 count,
60 gcancellable.0,
61 &mut error,
62 );
63 if error.is_null() {
64 Ok(ret as usize)
65 } else {
66 Err(from_glib_full(error))
67 }
68 }
69 }
70
71 #[doc(alias = "g_input_stream_read_all")]
104 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
105 &self,
106 mut buffer: B,
107 cancellable: Option<&C>,
108 ) -> Result<(usize, Option<glib::Error>), glib::Error> {
109 let cancellable = cancellable.map(|c| c.as_ref());
110 let gcancellable = cancellable.to_glib_none();
111 let buffer = buffer.as_mut();
112 let buffer_ptr = buffer.as_mut_ptr();
113 let count = buffer.len();
114 unsafe {
115 let mut bytes_read = mem::MaybeUninit::uninit();
116 let mut error = ptr::null_mut();
117 let _ = ffi::g_input_stream_read_all(
118 self.as_ref().to_glib_none().0,
119 buffer_ptr,
120 count,
121 bytes_read.as_mut_ptr(),
122 gcancellable.0,
123 &mut error,
124 );
125
126 let bytes_read = bytes_read.assume_init();
127 if error.is_null() {
128 Ok((bytes_read, None))
129 } else if bytes_read != 0 {
130 Ok((bytes_read, Some(from_glib_full(error))))
131 } else {
132 Err(from_glib_full(error))
133 }
134 }
135 }
136
137 #[doc(alias = "g_input_stream_read_all_async")]
162 fn read_all_async<
163 B: AsMut<[u8]> + Send + 'static,
164 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
165 C: IsA<Cancellable>,
166 >(
167 &self,
168 buffer: B,
169 io_priority: Priority,
170 cancellable: Option<&C>,
171 callback: Q,
172 ) {
173 let main_context = glib::MainContext::ref_thread_default();
174 let is_main_context_owner = main_context.is_owner();
175 let has_acquired_main_context = (!is_main_context_owner)
176 .then(|| main_context.acquire().ok())
177 .flatten();
178 assert!(
179 is_main_context_owner || has_acquired_main_context.is_some(),
180 "Async operations only allowed if the thread is owning the MainContext"
181 );
182
183 let cancellable = cancellable.map(|c| c.as_ref());
184 let gcancellable = cancellable.to_glib_none();
185 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
186 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
187 let (count, buffer_ptr) = {
189 let buffer = &mut user_data.1;
190 let slice = (*buffer).as_mut();
191 (slice.len(), slice.as_mut_ptr())
192 };
193 unsafe extern "C" fn read_all_async_trampoline<
194 B: AsMut<[u8]> + Send + 'static,
195 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
196 >(
197 _source_object: *mut glib::gobject_ffi::GObject,
198 res: *mut ffi::GAsyncResult,
199 user_data: glib::ffi::gpointer,
200 ) {
201 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
202 Box::from_raw(user_data as *mut _);
203 let (callback, buffer) = *user_data;
204 let callback = callback.into_inner();
205
206 let mut error = ptr::null_mut();
207 let mut bytes_read = mem::MaybeUninit::uninit();
208 let _ = ffi::g_input_stream_read_all_finish(
209 _source_object as *mut _,
210 res,
211 bytes_read.as_mut_ptr(),
212 &mut error,
213 );
214
215 let bytes_read = bytes_read.assume_init();
216 let result = if error.is_null() {
217 Ok((buffer, bytes_read, None))
218 } else if bytes_read != 0 {
219 Ok((buffer, bytes_read, Some(from_glib_full(error))))
220 } else {
221 Err((buffer, from_glib_full(error)))
222 };
223
224 callback(result);
225 }
226 let callback = read_all_async_trampoline::<B, Q>;
227 unsafe {
228 ffi::g_input_stream_read_all_async(
229 self.as_ref().to_glib_none().0,
230 buffer_ptr,
231 count,
232 io_priority.into_glib(),
233 gcancellable.0,
234 Some(callback),
235 Box::into_raw(user_data) as *mut _,
236 );
237 }
238 }
239
240 #[doc(alias = "g_input_stream_read_async")]
279 fn read_async<
280 B: AsMut<[u8]> + Send + 'static,
281 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
282 C: IsA<Cancellable>,
283 >(
284 &self,
285 buffer: B,
286 io_priority: Priority,
287 cancellable: Option<&C>,
288 callback: Q,
289 ) {
290 let main_context = glib::MainContext::ref_thread_default();
291 let is_main_context_owner = main_context.is_owner();
292 let has_acquired_main_context = (!is_main_context_owner)
293 .then(|| main_context.acquire().ok())
294 .flatten();
295 assert!(
296 is_main_context_owner || has_acquired_main_context.is_some(),
297 "Async operations only allowed if the thread is owning the MainContext"
298 );
299
300 let cancellable = cancellable.map(|c| c.as_ref());
301 let gcancellable = cancellable.to_glib_none();
302 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
303 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
304 let (count, buffer_ptr) = {
306 let buffer = &mut user_data.1;
307 let slice = (*buffer).as_mut();
308 (slice.len(), slice.as_mut_ptr())
309 };
310 unsafe extern "C" fn read_async_trampoline<
311 B: AsMut<[u8]> + Send + 'static,
312 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
313 >(
314 _source_object: *mut glib::gobject_ffi::GObject,
315 res: *mut ffi::GAsyncResult,
316 user_data: glib::ffi::gpointer,
317 ) {
318 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
319 Box::from_raw(user_data as *mut _);
320 let (callback, buffer) = *user_data;
321 let callback = callback.into_inner();
322
323 let mut error = ptr::null_mut();
324 let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
325
326 let result = if error.is_null() {
327 Ok((buffer, ret as usize))
328 } else {
329 Err((buffer, from_glib_full(error)))
330 };
331
332 callback(result);
333 }
334 let callback = read_async_trampoline::<B, Q>;
335 unsafe {
336 ffi::g_input_stream_read_async(
337 self.as_ref().to_glib_none().0,
338 buffer_ptr,
339 count,
340 io_priority.into_glib(),
341 gcancellable.0,
342 Some(callback),
343 Box::into_raw(user_data) as *mut _,
344 );
345 }
346 }
347
348 fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
349 &self,
350 buffer: B,
351 io_priority: Priority,
352 ) -> Pin<
353 Box<
354 dyn std::future::Future<
355 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
356 > + 'static,
357 >,
358 > {
359 Box::pin(crate::GioFuture::new(
360 self,
361 move |obj, cancellable, send| {
362 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
363 send.resolve(res);
364 });
365 },
366 ))
367 }
368
369 fn read_future<B: AsMut<[u8]> + Send + 'static>(
370 &self,
371 buffer: B,
372 io_priority: Priority,
373 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
374 {
375 Box::pin(crate::GioFuture::new(
376 self,
377 move |obj, cancellable, send| {
378 obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
379 send.resolve(res);
380 });
381 },
382 ))
383 }
384
385 fn into_read(self) -> InputStreamRead<Self>
386 where
387 Self: IsA<InputStream>,
388 {
389 InputStreamRead(self)
390 }
391
392 fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
393 where
394 Self: IsA<InputStream>,
395 {
396 InputStreamAsyncBufRead::new(self, buffer_size)
397 }
398}
399
400impl<O: IsA<InputStream>> InputStreamExtManual for O {}
401
402#[derive(Debug)]
403pub struct InputStreamRead<T: IsA<InputStream>>(T);
404
405impl<T: IsA<InputStream>> InputStreamRead<T> {
406 pub fn into_input_stream(self) -> T {
407 self.0
408 }
409
410 pub fn input_stream(&self) -> &T {
411 &self.0
412 }
413}
414
415impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
416 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
417 let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
418 to_std_io_result(gio_result)
419 }
420}
421
422impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
423 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
424 let (pos, type_) = match pos {
425 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
426 io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
427 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
428 };
429 let seekable: &Seekable = self.0.as_ref();
430 let gio_result = seekable
431 .seek(pos, type_, crate::Cancellable::NONE)
432 .map(|_| seekable.tell() as u64);
433 to_std_io_result(gio_result)
434 }
435}
436
437enum State {
438 Waiting {
439 buffer: Vec<u8>,
440 },
441 Transitioning,
442 Reading {
443 pending: Pin<
444 Box<
445 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
446 + 'static,
447 >,
448 >,
449 },
450 HasData {
451 buffer: Vec<u8>,
452 valid: (usize, usize), },
454 Failed(crate::IOErrorEnum),
455}
456
457impl State {
458 fn into_buffer(self) -> Vec<u8> {
459 match self {
460 State::Waiting { buffer } => buffer,
461 _ => panic!("Invalid state"),
462 }
463 }
464
465 #[doc(alias = "get_pending")]
466 fn pending(
467 &mut self,
468 ) -> &mut Pin<
469 Box<
470 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
471 + 'static,
472 >,
473 > {
474 match self {
475 State::Reading { ref mut pending } => pending,
476 _ => panic!("Invalid state"),
477 }
478 }
479}
480pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
481 stream: T,
482 state: State,
483}
484
485impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
486 pub fn into_input_stream(self) -> T {
487 self.stream
488 }
489
490 pub fn input_stream(&self) -> &T {
491 &self.stream
492 }
493
494 fn new(stream: T, buffer_size: usize) -> Self {
495 let buffer = vec![0; buffer_size];
496
497 Self {
498 stream,
499 state: State::Waiting { buffer },
500 }
501 }
502 fn set_reading(
503 &mut self,
504 ) -> &mut Pin<
505 Box<
506 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
507 + 'static,
508 >,
509 > {
510 match self.state {
511 State::Waiting { .. } => {
512 let waiting = mem::replace(&mut self.state, State::Transitioning);
513 let buffer = waiting.into_buffer();
514 let pending = self.input_stream().read_future(buffer, Priority::default());
515 self.state = State::Reading { pending };
516 }
517 State::Reading { .. } => {}
518 _ => panic!("Invalid state"),
519 };
520
521 self.state.pending()
522 }
523
524 #[doc(alias = "get_data")]
525 fn data(&self) -> Poll<io::Result<&[u8]>> {
526 if let State::HasData {
527 ref buffer,
528 valid: (i, j),
529 } = self.state
530 {
531 return Poll::Ready(Ok(&buffer[i..j]));
532 }
533 panic!("Invalid state")
534 }
535
536 fn set_waiting(&mut self, buffer: Vec<u8>) {
537 match self.state {
538 State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
539 _ => panic!("Invalid state"),
540 }
541 }
542
543 fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
544 match self.state {
545 State::Reading { .. } | State::Transitioning => {
546 self.state = State::HasData { buffer, valid }
547 }
548 _ => panic!("Invalid state"),
549 }
550 }
551
552 fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
553 match self.state {
554 State::Failed(kind) => Poll::Ready(Err(io::Error::new(
555 io::ErrorKind::from(kind),
556 BufReadError::Failed,
557 ))),
558 State::HasData { .. } => self.data(),
559 State::Transitioning => panic!("Invalid state"),
560 State::Waiting { .. } | State::Reading { .. } => {
561 let pending = self.set_reading();
562 match Pin::new(pending).poll(cx) {
563 Poll::Ready(Ok((buffer, res))) => {
564 if res == 0 {
565 self.set_waiting(buffer);
566 Poll::Ready(Ok(&[]))
567 } else {
568 self.set_has_data(buffer, (0, res));
569 self.data()
570 }
571 }
572 Poll::Ready(Err((_, err))) => {
573 let kind = err
574 .kind::<crate::IOErrorEnum>()
575 .unwrap_or(crate::IOErrorEnum::Failed);
576 self.state = State::Failed(kind);
577 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
578 }
579 Poll::Pending => Poll::Pending,
580 }
581 }
582 }
583 }
584
585 fn consume(&mut self, amt: usize) {
586 if amt == 0 {
587 return;
588 }
589
590 if let State::HasData { .. } = self.state {
591 let has_data = mem::replace(&mut self.state, State::Transitioning);
592 if let State::HasData {
593 buffer,
594 valid: (i, j),
595 } = has_data
596 {
597 let available = j - i;
598 if amt > available {
599 panic!("Cannot consume {amt} bytes as only {available} are available",)
600 }
601 let remaining = available - amt;
602 if remaining == 0 {
603 return self.set_waiting(buffer);
604 } else {
605 return self.set_has_data(buffer, (i + amt, j));
606 }
607 }
608 }
609
610 panic!("Invalid state")
611 }
612}
613
614#[derive(Debug)]
615enum BufReadError {
616 Failed,
617}
618
619impl std::error::Error for BufReadError {}
620
621impl fmt::Display for BufReadError {
622 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
623 match self {
624 Self::Failed => fmt.write_str("Previous read operation failed"),
625 }
626 }
627}
628
629impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
630 fn poll_read(
631 self: Pin<&mut Self>,
632 cx: &mut Context,
633 out_buf: &mut [u8],
634 ) -> Poll<io::Result<usize>> {
635 let reader = self.get_mut();
636 let poll = reader.poll_fill_buf(cx);
637
638 let poll = poll.map_ok(|buffer| {
639 let copied = buffer.len().min(out_buf.len());
640 out_buf[..copied].copy_from_slice(&buffer[..copied]);
641 copied
642 });
643
644 if let Poll::Ready(Ok(consumed)) = poll {
645 reader.consume(consumed);
646 }
647 poll
648 }
649}
650
651impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
652 fn poll_fill_buf(
653 self: Pin<&mut Self>,
654 cx: &mut Context,
655 ) -> Poll<Result<&[u8], futures_io::Error>> {
656 self.get_mut().poll_fill_buf(cx)
657 }
658
659 fn consume(self: Pin<&mut Self>, amt: usize) {
660 self.get_mut().consume(amt);
661 }
662}
663
664impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
665
666#[cfg(test)]
667mod tests {
668 use std::io::Read;
669
670 use glib::Bytes;
671
672 use crate::{prelude::*, test_util::run_async, MemoryInputStream};
673
674 #[test]
675 fn read_all_async() {
676 let ret = run_async(|tx, l| {
677 let b = Bytes::from_owned(vec![1, 2, 3]);
678 let strm = MemoryInputStream::from_bytes(&b);
679
680 let buf = vec![0; 10];
681 strm.read_all_async(
682 buf,
683 glib::Priority::DEFAULT_IDLE,
684 crate::Cancellable::NONE,
685 move |ret| {
686 tx.send(ret).unwrap();
687 l.quit();
688 },
689 );
690 });
691
692 let (buf, count, err) = ret.unwrap();
693 assert_eq!(count, 3);
694 assert!(err.is_none());
695 assert_eq!(buf[0], 1);
696 assert_eq!(buf[1], 2);
697 assert_eq!(buf[2], 3);
698 }
699
700 #[test]
701 fn read_all() {
702 let b = Bytes::from_owned(vec![1, 2, 3]);
703 let strm = MemoryInputStream::from_bytes(&b);
704 let mut buf = vec![0; 10];
705
706 let ret = strm.read_all(&mut buf, crate::Cancellable::NONE).unwrap();
707
708 assert_eq!(ret.0, 3);
709 assert!(ret.1.is_none());
710 assert_eq!(buf[0], 1);
711 assert_eq!(buf[1], 2);
712 assert_eq!(buf[2], 3);
713 }
714
715 #[test]
716 fn read() {
717 let b = Bytes::from_owned(vec![1, 2, 3]);
718 let strm = MemoryInputStream::from_bytes(&b);
719 let mut buf = vec![0; 10];
720
721 let ret = strm.read(&mut buf, crate::Cancellable::NONE);
722
723 assert_eq!(ret.unwrap(), 3);
724 assert_eq!(buf[0], 1);
725 assert_eq!(buf[1], 2);
726 assert_eq!(buf[2], 3);
727 }
728
729 #[test]
730 fn read_async() {
731 let ret = run_async(|tx, l| {
732 let b = Bytes::from_owned(vec![1, 2, 3]);
733 let strm = MemoryInputStream::from_bytes(&b);
734
735 let buf = vec![0; 10];
736 strm.read_async(
737 buf,
738 glib::Priority::DEFAULT_IDLE,
739 crate::Cancellable::NONE,
740 move |ret| {
741 tx.send(ret).unwrap();
742 l.quit();
743 },
744 );
745 });
746
747 let (buf, count) = ret.unwrap();
748 assert_eq!(count, 3);
749 assert_eq!(buf[0], 1);
750 assert_eq!(buf[1], 2);
751 assert_eq!(buf[2], 3);
752 }
753
754 #[test]
755 fn read_bytes_async() {
756 let ret = run_async(|tx, l| {
757 let b = Bytes::from_owned(vec![1, 2, 3]);
758 let strm = MemoryInputStream::from_bytes(&b);
759
760 strm.read_bytes_async(
761 10,
762 glib::Priority::DEFAULT_IDLE,
763 crate::Cancellable::NONE,
764 move |ret| {
765 tx.send(ret).unwrap();
766 l.quit();
767 },
768 );
769 });
770
771 let bytes = ret.unwrap();
772 assert_eq!(bytes, vec![1, 2, 3]);
773 }
774
775 #[test]
776 fn skip_async() {
777 let ret = run_async(|tx, l| {
778 let b = Bytes::from_owned(vec![1, 2, 3]);
779 let strm = MemoryInputStream::from_bytes(&b);
780
781 strm.skip_async(
782 10,
783 glib::Priority::DEFAULT_IDLE,
784 crate::Cancellable::NONE,
785 move |ret| {
786 tx.send(ret).unwrap();
787 l.quit();
788 },
789 );
790 });
791
792 let skipped = ret.unwrap();
793 assert_eq!(skipped, 3);
794 }
795
796 #[test]
797 fn std_io_read() {
798 let b = Bytes::from_owned(vec![1, 2, 3]);
799 let mut read = MemoryInputStream::from_bytes(&b).into_read();
800 let mut buf = [0u8; 10];
801
802 let ret = read.read(&mut buf);
803
804 assert_eq!(ret.unwrap(), 3);
805 assert_eq!(buf[0], 1);
806 assert_eq!(buf[1], 2);
807 assert_eq!(buf[2], 3);
808 }
809
810 #[test]
811 fn into_input_stream() {
812 let b = Bytes::from_owned(vec![1, 2, 3]);
813 let stream = MemoryInputStream::from_bytes(&b);
814 let stream_clone = stream.clone();
815 let stream = stream.into_read().into_input_stream();
816
817 assert_eq!(stream, stream_clone);
818 }
819}