1use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11mod sealed {
12 pub trait Sealed {}
13 impl<T: super::IsA<super::InputStream>> Sealed for T {}
14}
15
16pub trait InputStreamExtManual: sealed::Sealed + IsA<InputStream> + Sized {
17 #[doc(alias = "g_input_stream_read")]
49 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
50 &self,
51 mut buffer: B,
52 cancellable: Option<&C>,
53 ) -> Result<usize, glib::Error> {
54 let cancellable = cancellable.map(|c| c.as_ref());
55 let gcancellable = cancellable.to_glib_none();
56 let buffer = buffer.as_mut();
57 let buffer_ptr = buffer.as_mut_ptr();
58 let count = buffer.len();
59 unsafe {
60 let mut error = ptr::null_mut();
61 let ret = ffi::g_input_stream_read(
62 self.as_ref().to_glib_none().0,
63 buffer_ptr,
64 count,
65 gcancellable.0,
66 &mut error,
67 );
68 if error.is_null() {
69 Ok(ret as usize)
70 } else {
71 Err(from_glib_full(error))
72 }
73 }
74 }
75
76 #[doc(alias = "g_input_stream_read_all")]
109 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
110 &self,
111 mut buffer: B,
112 cancellable: Option<&C>,
113 ) -> Result<(usize, Option<glib::Error>), glib::Error> {
114 let cancellable = cancellable.map(|c| c.as_ref());
115 let gcancellable = cancellable.to_glib_none();
116 let buffer = buffer.as_mut();
117 let buffer_ptr = buffer.as_mut_ptr();
118 let count = buffer.len();
119 unsafe {
120 let mut bytes_read = mem::MaybeUninit::uninit();
121 let mut error = ptr::null_mut();
122 let _ = ffi::g_input_stream_read_all(
123 self.as_ref().to_glib_none().0,
124 buffer_ptr,
125 count,
126 bytes_read.as_mut_ptr(),
127 gcancellable.0,
128 &mut error,
129 );
130
131 let bytes_read = bytes_read.assume_init();
132 if error.is_null() {
133 Ok((bytes_read, None))
134 } else if bytes_read != 0 {
135 Ok((bytes_read, Some(from_glib_full(error))))
136 } else {
137 Err(from_glib_full(error))
138 }
139 }
140 }
141
142 #[doc(alias = "g_input_stream_read_all_async")]
167 fn read_all_async<
168 B: AsMut<[u8]> + Send + 'static,
169 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
170 C: IsA<Cancellable>,
171 >(
172 &self,
173 buffer: B,
174 io_priority: Priority,
175 cancellable: Option<&C>,
176 callback: Q,
177 ) {
178 let main_context = glib::MainContext::ref_thread_default();
179 let is_main_context_owner = main_context.is_owner();
180 let has_acquired_main_context = (!is_main_context_owner)
181 .then(|| main_context.acquire().ok())
182 .flatten();
183 assert!(
184 is_main_context_owner || has_acquired_main_context.is_some(),
185 "Async operations only allowed if the thread is owning the MainContext"
186 );
187
188 let cancellable = cancellable.map(|c| c.as_ref());
189 let gcancellable = cancellable.to_glib_none();
190 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
191 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
192 let (count, buffer_ptr) = {
194 let buffer = &mut user_data.1;
195 let slice = (*buffer).as_mut();
196 (slice.len(), slice.as_mut_ptr())
197 };
198 unsafe extern "C" fn read_all_async_trampoline<
199 B: AsMut<[u8]> + Send + 'static,
200 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
201 >(
202 _source_object: *mut glib::gobject_ffi::GObject,
203 res: *mut ffi::GAsyncResult,
204 user_data: glib::ffi::gpointer,
205 ) {
206 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
207 Box::from_raw(user_data as *mut _);
208 let (callback, buffer) = *user_data;
209 let callback = callback.into_inner();
210
211 let mut error = ptr::null_mut();
212 let mut bytes_read = mem::MaybeUninit::uninit();
213 let _ = ffi::g_input_stream_read_all_finish(
214 _source_object as *mut _,
215 res,
216 bytes_read.as_mut_ptr(),
217 &mut error,
218 );
219
220 let bytes_read = bytes_read.assume_init();
221 let result = if error.is_null() {
222 Ok((buffer, bytes_read, None))
223 } else if bytes_read != 0 {
224 Ok((buffer, bytes_read, Some(from_glib_full(error))))
225 } else {
226 Err((buffer, from_glib_full(error)))
227 };
228
229 callback(result);
230 }
231 let callback = read_all_async_trampoline::<B, Q>;
232 unsafe {
233 ffi::g_input_stream_read_all_async(
234 self.as_ref().to_glib_none().0,
235 buffer_ptr,
236 count,
237 io_priority.into_glib(),
238 gcancellable.0,
239 Some(callback),
240 Box::into_raw(user_data) as *mut _,
241 );
242 }
243 }
244
245 #[doc(alias = "g_input_stream_read_async")]
284 fn read_async<
285 B: AsMut<[u8]> + Send + 'static,
286 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
287 C: IsA<Cancellable>,
288 >(
289 &self,
290 buffer: B,
291 io_priority: Priority,
292 cancellable: Option<&C>,
293 callback: Q,
294 ) {
295 let main_context = glib::MainContext::ref_thread_default();
296 let is_main_context_owner = main_context.is_owner();
297 let has_acquired_main_context = (!is_main_context_owner)
298 .then(|| main_context.acquire().ok())
299 .flatten();
300 assert!(
301 is_main_context_owner || has_acquired_main_context.is_some(),
302 "Async operations only allowed if the thread is owning the MainContext"
303 );
304
305 let cancellable = cancellable.map(|c| c.as_ref());
306 let gcancellable = cancellable.to_glib_none();
307 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
308 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
309 let (count, buffer_ptr) = {
311 let buffer = &mut user_data.1;
312 let slice = (*buffer).as_mut();
313 (slice.len(), slice.as_mut_ptr())
314 };
315 unsafe extern "C" fn read_async_trampoline<
316 B: AsMut<[u8]> + Send + 'static,
317 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
318 >(
319 _source_object: *mut glib::gobject_ffi::GObject,
320 res: *mut ffi::GAsyncResult,
321 user_data: glib::ffi::gpointer,
322 ) {
323 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
324 Box::from_raw(user_data as *mut _);
325 let (callback, buffer) = *user_data;
326 let callback = callback.into_inner();
327
328 let mut error = ptr::null_mut();
329 let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
330
331 let result = if error.is_null() {
332 Ok((buffer, ret as usize))
333 } else {
334 Err((buffer, from_glib_full(error)))
335 };
336
337 callback(result);
338 }
339 let callback = read_async_trampoline::<B, Q>;
340 unsafe {
341 ffi::g_input_stream_read_async(
342 self.as_ref().to_glib_none().0,
343 buffer_ptr,
344 count,
345 io_priority.into_glib(),
346 gcancellable.0,
347 Some(callback),
348 Box::into_raw(user_data) as *mut _,
349 );
350 }
351 }
352
353 fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
354 &self,
355 buffer: B,
356 io_priority: Priority,
357 ) -> Pin<
358 Box<
359 dyn std::future::Future<
360 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
361 > + 'static,
362 >,
363 > {
364 Box::pin(crate::GioFuture::new(
365 self,
366 move |obj, cancellable, send| {
367 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
368 send.resolve(res);
369 });
370 },
371 ))
372 }
373
374 fn read_future<B: AsMut<[u8]> + Send + 'static>(
375 &self,
376 buffer: B,
377 io_priority: Priority,
378 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
379 {
380 Box::pin(crate::GioFuture::new(
381 self,
382 move |obj, cancellable, send| {
383 obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
384 send.resolve(res);
385 });
386 },
387 ))
388 }
389
390 fn into_read(self) -> InputStreamRead<Self>
391 where
392 Self: IsA<InputStream>,
393 {
394 InputStreamRead(self)
395 }
396
397 fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
398 where
399 Self: IsA<InputStream>,
400 {
401 InputStreamAsyncBufRead::new(self, buffer_size)
402 }
403}
404
405impl<O: IsA<InputStream>> InputStreamExtManual for O {}
406
407#[derive(Debug)]
408pub struct InputStreamRead<T: IsA<InputStream>>(T);
409
410impl<T: IsA<InputStream>> InputStreamRead<T> {
411 pub fn into_input_stream(self) -> T {
412 self.0
413 }
414
415 pub fn input_stream(&self) -> &T {
416 &self.0
417 }
418}
419
420impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
421 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
422 let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
423 to_std_io_result(gio_result)
424 }
425}
426
427impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
428 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
429 let (pos, type_) = match pos {
430 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
431 io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
432 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
433 };
434 let seekable: &Seekable = self.0.as_ref();
435 let gio_result = seekable
436 .seek(pos, type_, crate::Cancellable::NONE)
437 .map(|_| seekable.tell() as u64);
438 to_std_io_result(gio_result)
439 }
440}
441
442enum State {
443 Waiting {
444 buffer: Vec<u8>,
445 },
446 Transitioning,
447 Reading {
448 pending: Pin<
449 Box<
450 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
451 + 'static,
452 >,
453 >,
454 },
455 HasData {
456 buffer: Vec<u8>,
457 valid: (usize, usize), },
459 Failed(crate::IOErrorEnum),
460}
461
462impl State {
463 fn into_buffer(self) -> Vec<u8> {
464 match self {
465 State::Waiting { buffer } => buffer,
466 _ => panic!("Invalid state"),
467 }
468 }
469
470 #[doc(alias = "get_pending")]
471 fn pending(
472 &mut self,
473 ) -> &mut Pin<
474 Box<
475 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
476 + 'static,
477 >,
478 > {
479 match self {
480 State::Reading { ref mut pending } => pending,
481 _ => panic!("Invalid state"),
482 }
483 }
484}
485pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
486 stream: T,
487 state: State,
488}
489
490impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
491 pub fn into_input_stream(self) -> T {
492 self.stream
493 }
494
495 pub fn input_stream(&self) -> &T {
496 &self.stream
497 }
498
499 fn new(stream: T, buffer_size: usize) -> Self {
500 let buffer = vec![0; buffer_size];
501
502 Self {
503 stream,
504 state: State::Waiting { buffer },
505 }
506 }
507 fn set_reading(
508 &mut self,
509 ) -> &mut Pin<
510 Box<
511 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
512 + 'static,
513 >,
514 > {
515 match self.state {
516 State::Waiting { .. } => {
517 let waiting = mem::replace(&mut self.state, State::Transitioning);
518 let buffer = waiting.into_buffer();
519 let pending = self.input_stream().read_future(buffer, Priority::default());
520 self.state = State::Reading { pending };
521 }
522 State::Reading { .. } => {}
523 _ => panic!("Invalid state"),
524 };
525
526 self.state.pending()
527 }
528
529 #[doc(alias = "get_data")]
530 fn data(&self) -> Poll<io::Result<&[u8]>> {
531 if let State::HasData {
532 ref buffer,
533 valid: (i, j),
534 } = self.state
535 {
536 return Poll::Ready(Ok(&buffer[i..j]));
537 }
538 panic!("Invalid state")
539 }
540
541 fn set_waiting(&mut self, buffer: Vec<u8>) {
542 match self.state {
543 State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
544 _ => panic!("Invalid state"),
545 }
546 }
547
548 fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
549 match self.state {
550 State::Reading { .. } | State::Transitioning { .. } => {
551 self.state = State::HasData { buffer, valid }
552 }
553 _ => panic!("Invalid state"),
554 }
555 }
556
557 fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
558 match self.state {
559 State::Failed(kind) => Poll::Ready(Err(io::Error::new(
560 io::ErrorKind::from(kind),
561 BufReadError::Failed,
562 ))),
563 State::HasData { .. } => self.data(),
564 State::Transitioning => panic!("Invalid state"),
565 State::Waiting { .. } | State::Reading { .. } => {
566 let pending = self.set_reading();
567 match Pin::new(pending).poll(cx) {
568 Poll::Ready(Ok((buffer, res))) => {
569 if res == 0 {
570 self.set_waiting(buffer);
571 Poll::Ready(Ok(&[]))
572 } else {
573 self.set_has_data(buffer, (0, res));
574 self.data()
575 }
576 }
577 Poll::Ready(Err((_, err))) => {
578 let kind = err
579 .kind::<crate::IOErrorEnum>()
580 .unwrap_or(crate::IOErrorEnum::Failed);
581 self.state = State::Failed(kind);
582 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
583 }
584 Poll::Pending => Poll::Pending,
585 }
586 }
587 }
588 }
589
590 fn consume(&mut self, amt: usize) {
591 if amt == 0 {
592 return;
593 }
594
595 if let State::HasData { .. } = self.state {
596 let has_data = mem::replace(&mut self.state, State::Transitioning);
597 if let State::HasData {
598 buffer,
599 valid: (i, j),
600 } = has_data
601 {
602 let available = j - i;
603 if amt > available {
604 panic!("Cannot consume {amt} bytes as only {available} are available",)
605 }
606 let remaining = available - amt;
607 if remaining == 0 {
608 return self.set_waiting(buffer);
609 } else {
610 return self.set_has_data(buffer, (i + amt, j));
611 }
612 }
613 }
614
615 panic!("Invalid state")
616 }
617}
618
/// Error payload reported by the buffered reader on calls that follow a
/// failed read.
#[derive(Debug)]
enum BufReadError {
    /// An earlier read operation on the same reader failed.
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    /// Writes the fixed, human-readable message of the variant.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
633
634impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
635 fn poll_read(
636 self: Pin<&mut Self>,
637 cx: &mut Context,
638 out_buf: &mut [u8],
639 ) -> Poll<io::Result<usize>> {
640 let reader = self.get_mut();
641 let poll = reader.poll_fill_buf(cx);
642
643 let poll = poll.map_ok(|buffer| {
644 let copied = buffer.len().min(out_buf.len());
645 out_buf[..copied].copy_from_slice(&buffer[..copied]);
646 copied
647 });
648
649 if let Poll::Ready(Ok(consumed)) = poll {
650 reader.consume(consumed);
651 }
652 poll
653 }
654}
655
656impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
657 fn poll_fill_buf(
658 self: Pin<&mut Self>,
659 cx: &mut Context,
660 ) -> Poll<Result<&[u8], futures_io::Error>> {
661 self.get_mut().poll_fill_buf(cx)
662 }
663
664 fn consume(self: Pin<&mut Self>, amt: usize) {
665 self.get_mut().consume(amt);
666 }
667}
668
669impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
670
671#[cfg(test)]
672mod tests {
673 use std::io::Read;
674
675 use glib::Bytes;
676
677 use crate::{prelude::*, test_util::run_async, MemoryInputStream};
678
// `read_all_async` on a 3-byte memory stream fills the start of a 10-byte
// buffer and reports no error.
#[test]
fn read_all_async() {
    let outcome = run_async(|tx, l| {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&bytes);

        stream.read_all_async(
            vec![0; 10],
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |res| {
                tx.send(res).unwrap();
                l.quit();
            },
        );
    });

    let (buf, count, err) = outcome.unwrap();
    assert_eq!(count, 3);
    assert!(err.is_none());
    assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
}
704
// Synchronous `read_all` on a 3-byte memory stream.
#[test]
fn read_all() {
    let bytes = Bytes::from_owned(vec![1, 2, 3]);
    let stream = MemoryInputStream::from_bytes(&bytes);
    let mut buf = vec![0; 10];

    let (count, err) = stream
        .read_all(&mut buf, crate::Cancellable::NONE)
        .unwrap();

    assert_eq!(count, 3);
    assert!(err.is_none());
    assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
}
719
// Synchronous `read` on a 3-byte memory stream.
#[test]
fn read() {
    let bytes = Bytes::from_owned(vec![1, 2, 3]);
    let stream = MemoryInputStream::from_bytes(&bytes);
    let mut buf = vec![0; 10];

    let count = stream.read(&mut buf, crate::Cancellable::NONE).unwrap();

    assert_eq!(count, 3);
    assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
}
733
// `read_async` on a 3-byte memory stream hands back the buffer and count.
#[test]
fn read_async() {
    let outcome = run_async(|tx, l| {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&bytes);

        stream.read_async(
            vec![0; 10],
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |res| {
                tx.send(res).unwrap();
                l.quit();
            },
        );
    });

    let (buf, count) = outcome.unwrap();
    assert_eq!(count, 3);
    assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
}
758
// `read_bytes_async` asking for up to 10 bytes returns the 3 available ones.
#[test]
fn read_bytes_async() {
    let outcome = run_async(|tx, l| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.read_bytes_async(
            10,
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |res| {
                tx.send(res).unwrap();
                l.quit();
            },
        );
    });

    let bytes = outcome.unwrap();
    assert_eq!(bytes, vec![1, 2, 3]);
}
779
// `skip_async` asking to skip 10 bytes on a 3-byte stream skips 3.
#[test]
fn skip_async() {
    let outcome = run_async(|tx, l| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.skip_async(
            10,
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |res| {
                tx.send(res).unwrap();
                l.quit();
            },
        );
    });

    let skipped = outcome.unwrap();
    assert_eq!(skipped, 3);
}
800
// The `into_read` adapter works through the `std::io::Read` trait.
#[test]
fn std_io_read() {
    let bytes = Bytes::from_owned(vec![1, 2, 3]);
    let mut reader = MemoryInputStream::from_bytes(&bytes).into_read();
    let mut buf = [0u8; 10];

    let count = reader.read(&mut buf).unwrap();

    assert_eq!(count, 3);
    assert_eq!(&buf[..3], &[1u8, 2, 3][..]);
}
814
// Wrapping with `into_read` and unwrapping again yields the same stream.
#[test]
fn into_input_stream() {
    let bytes = Bytes::from_owned(vec![1, 2, 3]);
    let original = MemoryInputStream::from_bytes(&bytes);
    let expected = original.clone();

    let round_tripped = original.into_read().into_input_stream();

    assert_eq!(round_tripped, expected);
}
824}