1use std::{
4 any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr,
5 thread,
6};
7
8use futures_channel::oneshot;
9use futures_core::{
10 future::Future,
11 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
14use futures_util::FutureExt;
15
16use crate::{
17 ffi, thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId,
18};
19
// Type-erased future stored inside a `TaskSource`.
//
// Both variants resolve to a boxed `Any`; the `NonSend` variant additionally
// keeps its future behind a `ThreadGuard`.
enum FutureWrapper {
    // Future whose boxed output is `Send`.
    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
    // Local future wrapped in a `ThreadGuard`.
    // NOTE(review): presumably the guard panics when it is accessed or dropped
    // on a thread other than the one that created it — confirm in `thread_guard`.
    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
}

// NOTE(review): these manual impls are presumably justified by the
// `ThreadGuard` around the non-`Send` variant enforcing thread affinity at
// runtime — confirm against `ThreadGuard`'s contract.
unsafe impl Send for FutureWrapper {}
unsafe impl Sync for FutureWrapper {}
30
31impl Future for FutureWrapper {
32 type Output = Box<dyn Any + 'static>;
33
34 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
35 match self.get_mut() {
36 FutureWrapper::Send(fut) => {
37 Pin::new(fut).poll(ctx).map(|b| b as Box<dyn Any + 'static>)
38 }
39 FutureWrapper::NonSend(fut) => Pin::new(fut.get_mut()).poll(ctx),
40 }
41 }
42}
43
// A `GSource` extended with the state needed to drive one future.
//
// `#[repr(C)]` with `source` as the first field lets a `*mut ffi::GSource`
// be cast to `*mut TaskSource` (as `dispatch` and `finalize` do); the memory
// is allocated by `g_source_new` with `size_of::<TaskSource>()`.
#[repr(C)]
struct TaskSource {
    // The embedded `GSource` header; must stay the first field.
    source: ffi::GSource,
    // The future being driven by this source.
    future: FutureWrapper,
    // Waker handed to the future on every poll; built from the child
    // `WakerSource` in `TaskSource::new`.
    waker: Waker,
    // Where the future's output (or a caught panic payload) is sent once it
    // finishes; `None` when nobody waits for a result.
    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
}
58
// A plain `GSource` that backs the `Waker` of a `TaskSource`.
//
// `TaskSource::new` adds it as a child source of the task source and wakes
// are signalled by setting its ready time (see `wake_by_ref_raw`).
#[repr(C)]
struct WakerSource {
    // The embedded `GSource` header; the struct carries no additional state.
    source: ffi::GSource,
}
63
impl TaskSource {
    // `GSourceFuncs::dispatch` callback: polls the future once.
    //
    // Returns `G_SOURCE_REMOVE` when the future has completed and
    // `G_SOURCE_CONTINUE` while it is still pending.
    unsafe extern "C" fn dispatch(
        source: *mut ffi::GSource,
        callback: ffi::GSourceFunc,
        _user_data: ffi::gpointer,
    ) -> ffi::gboolean {
        // Valid because `TaskSource` is `#[repr(C)]` with the `GSource` first.
        let source = &mut *(source as *mut Self);
        // No callback is ever installed on this source type.
        debug_assert!(callback.is_none());

        if let Poll::Ready(()) = source.poll() {
            ffi::G_SOURCE_REMOVE
        } else {
            ffi::G_SOURCE_CONTINUE
        }
    }

    // `GSourceFuncs::finalize` callback: drops the Rust fields that were
    // written into the source with `ptr::write` in `TaskSource::new`.
    unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
        let source = source as *mut Self;

        // A non-`Send` future is dropped in place only when the current
        // thread owns its guard; otherwise the drop is handed to the main
        // context, if there still is one.
        // NOTE(review): presumably dropping the `ThreadGuard` on a foreign
        // thread panics, which is what this dance avoids — confirm.
        match (*source).future {
            FutureWrapper::Send(_) => {
                ptr::drop_in_place(&mut (*source).future);
            }
            FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
                ptr::drop_in_place(&mut (*source).future);
            }
            FutureWrapper::NonSend(ref mut future) => {
                let context = ffi::g_source_get_context(source as *mut ffi::GSource);
                if !context.is_null() {
                    // Move the guarded future out of the source and let a
                    // closure invoked on the context drop it.
                    let future = ptr::read(future);
                    let context = MainContext::from_glib_none(context);
                    context.invoke(move || {
                        drop(future);
                    });
                } else {
                    // No context left to defer to: drop here regardless.
                    ptr::drop_in_place(&mut (*source).future);
                }
            }
        }

        ptr::drop_in_place(&mut (*source).return_tx);

        // Dropping the waker releases its reference on the waker source
        // (see `WakerSource::drop_raw`).
        ptr::drop_in_place(&mut (*source).waker);
    }
}
119
120impl WakerSource {
121 unsafe fn clone_raw(waker: *const ()) -> RawWaker {
122 static VTABLE: RawWakerVTable = RawWakerVTable::new(
123 WakerSource::clone_raw,
124 WakerSource::wake_raw,
125 WakerSource::wake_by_ref_raw,
126 WakerSource::drop_raw,
127 );
128
129 let waker = waker as *const ffi::GSource;
130 ffi::g_source_ref(mut_override(waker));
131 RawWaker::new(waker as *const (), &VTABLE)
132 }
133
134 unsafe fn wake_raw(waker: *const ()) {
135 Self::wake_by_ref_raw(waker);
136 Self::drop_raw(waker);
137 }
138
139 unsafe fn wake_by_ref_raw(waker: *const ()) {
140 let waker = waker as *const ffi::GSource;
141 ffi::g_source_set_ready_time(mut_override(waker), 0);
142 }
143
144 unsafe fn drop_raw(waker: *const ()) {
145 let waker = waker as *const ffi::GSource;
146 ffi::g_source_unref(mut_override(waker));
147 }
148
149 unsafe extern "C" fn dispatch(
150 source: *mut ffi::GSource,
151 _callback: ffi::GSourceFunc,
152 _user_data: ffi::gpointer,
153 ) -> ffi::gboolean {
154 ffi::g_source_set_ready_time(mut_override(source), -1);
157 ffi::G_SOURCE_CONTINUE
158 }
159}
160
// NOTE(review): `TaskSource` embeds a raw `ffi::GSource`, so these markers are
// asserted manually; presumably sound because the contained future is either
// `Send` or protected by a `ThreadGuard` — confirm.
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}

// NOTE(review): `WakerSource` only wraps a `GSource` whose reference count and
// ready time are manipulated through GLib calls; assumed to be thread-safe on
// the GLib side — confirm.
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
166
impl TaskSource {
    // Allocates a task source that drives `future` at the given `priority`.
    //
    // `return_tx`, when present, receives the future's output or the payload
    // of a panic caught while polling it. The returned `Source` is not yet
    // attached to any main context.
    #[allow(clippy::new_ret_no_self)]
    fn new(
        priority: Priority,
        future: FutureWrapper,
        return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
    ) -> Source {
        unsafe {
            static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(TaskSource::dispatch),
                finalize: Some(TaskSource::finalize),
                closure_callback: None,
                closure_marshal: None,
            };
            static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(WakerSource::dispatch),
                finalize: None,
                closure_callback: None,
                closure_marshal: None,
            };

            // GLib allocates the struct; the size tells it how much room the
            // Rust fields following the `GSource` header need.
            let source = ffi::g_source_new(
                mut_override(&TASK_SOURCE_FUNCS),
                mem::size_of::<Self>() as u32,
            );

            let waker_source = ffi::g_source_new(
                mut_override(&WAKER_SOURCE_FUNCS),
                mem::size_of::<WakerSource>() as u32,
            );

            ffi::g_source_set_priority(source, priority.into_glib());
            ffi::g_source_add_child_source(source, waker_source);

            {
                // The Rust fields are uninitialised memory at this point, so
                // they are written with `ptr::write` (no drop of old values).
                let source = &mut *(source as *mut Self);
                ptr::write(&mut source.future, future);
                ptr::write(&mut source.return_tx, return_tx);

                // `clone_raw` takes its own reference on the waker source.
                let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
                ptr::write(&mut source.waker, waker);
            }

            // Mark the waker source as ready right away so that the future
            // gets its first poll once the source is dispatched.
            ffi::g_source_set_ready_time(waker_source, 0);

            // Release the reference obtained from `g_source_new`; the waker
            // holds its own reference (taken in `clone_raw` above).
            ffi::g_source_unref(waker_source);

            from_glib_full(source)
        }
    }

    // Polls the wrapped future once with this source's waker.
    //
    // Returns `Poll::Ready(())` once the future has finished (or panicked
    // while a result channel is installed), `Poll::Pending` otherwise.
    //
    // Panics if the current thread does not own the source's main context.
    fn poll(&mut self) -> Poll<()> {
        let source = &self.source as *const _;
        let executor: Borrowed<MainContext> =
            unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };

        assert!(
            executor.is_owner(),
            "Polling futures only allowed if the thread is owning the MainContext"
        );

        executor
            .with_thread_default(|| {
                let _enter = futures_executor::enter().unwrap();
                let mut context = Context::from_waker(&self.waker);

                // With a result channel, panics are caught and forwarded to
                // the receiver instead of unwinding out of this function.
                if let Some(tx) = self.return_tx.take() {
                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                        Pin::new(&mut self.future).poll(&mut context)
                    }));
                    match res {
                        Ok(Poll::Ready(res)) => {
                            // A dropped receiver is not an error here.
                            let _ = tx.send(Ok(res));
                            Poll::Ready(())
                        }
                        Ok(Poll::Pending) => {
                            // Not done yet: put the sender back for the next poll.
                            self.return_tx.replace(tx);
                            Poll::Pending
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                            Poll::Ready(())
                        }
                    }
                } else {
                    // No result channel: poll directly and discard the output.
                    Pin::new(&mut self.future).poll(&mut context).map(|_| ())
                }
            })
            .expect("current thread is not owner of the main context")
    }
}
271
/// A handle to a task spawned on a [`MainContext`].
///
/// Polling the handle as a future yields the task's output, or a
/// [`JoinError`] if the result channel was closed or the task panicked.
#[derive(Debug)]
pub struct JoinHandle<T> {
    // Receives the type-erased output, or the panic payload, of the task.
    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
    // The source driving the task.
    source: Source,
    // Raw id of the attached source; cleared by `abort` and taken by
    // `into_source_id`. A `Cell` so that it can be reset through `&self`.
    id: Cell<Option<NonZeroU32>>,
    // Ties the handle to the output type `T`, which `rx` has erased.
    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
}
285
286impl<T> JoinHandle<T> {
287 #[inline]
288 fn new(
289 ctx: &MainContext,
290 source: Source,
291 rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
292 ) -> Self {
293 let id = source.attach(Some(ctx));
294 let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) }));
295 Self {
296 rx,
297 source,
298 id,
299 phantom: PhantomData,
300 }
301 }
302 #[inline]
307 pub fn as_raw_source_id(&self) -> Option<u32> {
308 self.id.get().map(|i| i.get())
309 }
310 #[inline]
313 pub fn abort(&self) {
314 self.source.destroy();
315 self.id.replace(None);
316 }
317 #[inline]
320 pub fn source(&self) -> &Source {
321 &self.source
322 }
323 pub fn into_source_id(self) -> Result<SourceId, Self> {
329 if let Some(id) = self.id.take() {
330 Ok(unsafe { SourceId::from_glib(id.get()) })
331 } else {
332 Err(self)
333 }
334 }
335}
336
337impl<T: 'static> Future for JoinHandle<T> {
338 type Output = Result<T, JoinError>;
339 #[inline]
340 fn poll(
341 mut self: std::pin::Pin<&mut Self>,
342 cx: &mut std::task::Context<'_>,
343 ) -> std::task::Poll<Self::Output> {
344 std::pin::Pin::new(&mut self.rx).poll(cx).map(|r| match r {
345 Err(_) => Err(JoinErrorInner::Cancelled.into()),
346 Ok(Err(e)) => Err(JoinErrorInner::Panic(e).into()),
347 Ok(Ok(r)) => Ok(*r.downcast().unwrap()),
348 })
349 }
350}
351
impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
    // The handle is terminated exactly when its result receiver reports that
    // it is terminated.
    #[inline]
    fn is_terminated(&self) -> bool {
        self.rx.is_terminated()
    }
}
358
// The result travels through `rx` as a `Box<dyn Any + 'static>` without a
// `Send` bound, so `Send` is asserted manually and restricted to `T: Send`.
// NOTE(review): soundness relies on the erased value really being a `T` and on
// the other fields being safe to move between threads — confirm.
unsafe impl<T: Send> Send for JoinHandle<T> {}
363
/// Handle for a task whose [`JoinHandle`] is delivered through a channel
/// rather than being available immediately.
///
/// Polling it first waits for the [`JoinHandle`] and then for the task itself.
#[derive(Debug)]
pub struct SpawnWithinJoinHandle<T> {
    // Channel over which the `JoinHandle` arrives; `None` once it has been
    // received or the sender went away.
    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
    // The received handle; `None` before it arrives and after it completed.
    join_handle: Option<JoinHandle<T>>,
}
371
372impl<T> SpawnWithinJoinHandle<T> {
373 pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
376 if let Some(join_handle) = self.join_handle {
377 return Ok(join_handle);
378 }
379
380 if let Some(rx) = self.rx {
381 match rx.await {
382 Ok(join_handle) => return Ok(join_handle),
383 Err(_) => return Err(JoinErrorInner::Cancelled.into()),
384 }
385 }
386
387 Err(JoinErrorInner::Cancelled.into())
388 }
389}
390
391impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
392 type Output = Result<T, JoinError>;
393 #[inline]
394 fn poll(
395 mut self: std::pin::Pin<&mut Self>,
396 cx: &mut std::task::Context<'_>,
397 ) -> std::task::Poll<Self::Output> {
398 if let Some(ref mut rx) = self.rx {
399 match std::pin::Pin::new(rx).poll(cx) {
400 std::task::Poll::Pending => return std::task::Poll::Pending,
401 std::task::Poll::Ready(Err(_)) => {
402 self.rx = None;
403 return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
404 }
405 std::task::Poll::Ready(Ok(join_handle)) => {
406 self.rx = None;
407 self.join_handle = Some(join_handle);
408 }
409 }
410 }
411
412 if let Some(ref mut join_handle) = self.join_handle {
413 match std::pin::Pin::new(join_handle).poll(cx) {
414 std::task::Poll::Pending => return std::task::Poll::Pending,
415 std::task::Poll::Ready(Err(e)) => {
416 self.join_handle = None;
417 return std::task::Poll::Ready(Err(e));
418 }
419 std::task::Poll::Ready(Ok(r)) => {
420 self.join_handle = None;
421 return std::task::Poll::Ready(Ok(r));
422 }
423 }
424 }
425
426 std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
427 }
428}
429
430impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
431 #[inline]
432 fn is_terminated(&self) -> bool {
433 if let Some(ref rx) = self.rx {
434 rx.is_terminated()
435 } else if let Some(ref join_handle) = self.join_handle {
436 join_handle.is_terminated()
437 } else {
438 true
439 }
440 }
441}
442
/// Error returned when awaiting a task handle does not produce the task's
/// output: the task was either cancelled or it panicked.
#[derive(Debug)]
pub struct JoinError(JoinErrorInner);

impl JoinError {
    /// Returns `true` if the task was cancelled.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        match self.0 {
            JoinErrorInner::Cancelled => true,
            JoinErrorInner::Panic(_) => false,
        }
    }
    /// Returns `true` if the task panicked.
    #[inline]
    pub fn is_panic(&self) -> bool {
        match self.0 {
            JoinErrorInner::Panic(_) => true,
            JoinErrorInner::Cancelled => false,
        }
    }
    /// Consumes the error and returns the payload of the task's panic.
    ///
    /// # Panics
    ///
    /// Panics if the error does not stem from a panic.
    #[inline]
    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
        let payload = self.try_into_panic();
        payload.expect("`JoinError` is not a panic error")
    }
    /// Consumes the error and returns the payload of the task's panic, or
    /// hands the error back unchanged if the task did not panic.
    #[inline]
    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
        match self {
            Self(JoinErrorInner::Panic(payload)) => Ok(payload),
            other => Err(other),
        }
    }
}

impl std::error::Error for JoinError {}

impl std::fmt::Display for JoinError {
    // Shows the same text as the wrapped `JoinErrorInner`.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

// The reason behind a `JoinError`.
#[derive(Debug)]
enum JoinErrorInner {
    // The task went away without delivering a result.
    Cancelled,
    // The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}

impl From<JoinErrorInner> for JoinError {
    #[inline]
    fn from(e: JoinErrorInner) -> Self {
        JoinError(e)
    }
}

impl std::error::Error for JoinErrorInner {}

impl fmt::Display for JoinErrorInner {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Cancelled => "task cancelled",
            Self::Panic(_) => "task panicked",
        };
        fmt.write_str(message)
    }
}
518
519impl MainContext {
520 pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
526 &self,
527 f: F,
528 ) -> JoinHandle<R> {
529 self.spawn_with_priority(crate::Priority::default(), f)
530 }
531
532 pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> {
541 self.spawn_local_with_priority(crate::Priority::default(), f)
542 }
543
544 pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
550 &self,
551 priority: Priority,
552 f: F,
553 ) -> JoinHandle<R> {
554 let f = FutureObj::new(Box::new(async move {
555 Box::new(f.await) as Box<dyn Any + Send + 'static>
556 }));
557 let (tx, rx) = oneshot::channel();
558 let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx));
559 JoinHandle::new(self, source, rx)
560 }
561
562 pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>(
571 &self,
572 priority: Priority,
573 f: F,
574 ) -> JoinHandle<R> {
575 let _acquire = self
576 .acquire()
577 .expect("Spawning local futures only allowed on the thread owning the MainContext");
578 let f = LocalFutureObj::new(Box::new(async move {
579 Box::new(f.await) as Box<dyn Any + 'static>
580 }));
581 let (tx, rx) = oneshot::channel();
582 let source = TaskSource::new(
583 priority,
584 FutureWrapper::NonSend(ThreadGuard::new(f)),
585 Some(tx),
586 );
587 JoinHandle::new(self, source, rx)
588 }
589
590 pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>(
597 &self,
598 func: impl FnOnce() -> F + Send + 'static,
599 ) -> SpawnWithinJoinHandle<R> {
600 self.spawn_from_within_with_priority(crate::Priority::default(), func)
601 }
602
603 pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>(
610 &self,
611 priority: Priority,
612 func: impl FnOnce() -> F + Send + 'static,
613 ) -> SpawnWithinJoinHandle<R> {
614 let ctx = self.clone();
615 let (tx, rx) = oneshot::channel();
616 self.invoke_with_priority(priority, move || {
617 let _ = tx.send(ctx.spawn_local(func()));
618 });
619
620 SpawnWithinJoinHandle {
621 rx: Some(rx),
622 join_handle: None,
623 }
624 }
625
626 #[allow(clippy::transmute_ptr_to_ptr)]
635 pub fn block_on<F: Future>(&self, f: F) -> F::Output {
636 let mut res = None;
637 let l = MainLoop::new(Some(self), false);
638
639 let f = async {
640 res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
641 l.quit();
642 };
643
644 let f = unsafe {
645 let f = LocalFutureObj::new(Box::new(async move {
647 f.await;
648 Box::new(()) as Box<dyn Any + 'static>
649 }));
650 let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
651 f
652 };
653
654 let source = TaskSource::new(
655 crate::Priority::default(),
656 FutureWrapper::NonSend(ThreadGuard::new(f)),
657 None,
658 );
659 source.attach(Some(self));
660
661 l.run();
662
663 match res.unwrap() {
664 Ok(v) => v,
665 Err(e) => panic::resume_unwind(e),
666 }
667 }
668}
669
670impl Spawn for MainContext {
671 fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
672 let (tx, _) = oneshot::channel();
673 let source = TaskSource::new(
674 crate::Priority::default(),
675 FutureWrapper::Send(FutureObj::new(Box::new(async move {
676 f.await;
677 Box::new(()) as Box<dyn Any + Send + 'static>
678 }))),
679 Some(tx),
680 );
681 source.attach(Some(self));
682 Ok(())
683 }
684}
685
686impl LocalSpawn for MainContext {
687 fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
688 let (tx, _) = oneshot::channel();
689 let source = TaskSource::new(
690 crate::Priority::default(),
691 FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new(
692 async move {
693 f.await;
694 Box::new(()) as Box<dyn Any + 'static>
695 },
696 )))),
697 Some(tx),
698 );
699 source.attach(Some(self));
700 Ok(())
701 }
702}
703
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // A `Send` future spawned from one thread runs on the thread that runs
    // the main loop.
    #[test]
    fn test_spawn() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let (sender, receiver) = mpsc::channel();
        let (o_sender, o_receiver) = oneshot::channel();

        let l_clone = l.clone();
        c.spawn(
            o_receiver
                .and_then(move |()| {
                    sender.send(()).unwrap();
                    l_clone.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        // Run the loop on a separate thread.
        let join_handle = thread::spawn(move || {
            l.run();
        });

        // Let the spawned future make progress ...
        o_sender.send(()).unwrap();

        // ... and wait until it reports back.
        receiver.recv().unwrap();

        join_handle.join().unwrap();
    }

    // A local future can be spawned while the context is the thread default.
    #[test]
    fn test_spawn_local() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        c.with_thread_default(|| {
            let l_clone = l.clone();
            c.spawn_local(futures_util::future::lazy(move |_ctx| {
                l_clone.quit();
            }));

            l.run();
        })
        .unwrap();
    }

    // `spawn_from_within` accepts a closure from another thread and builds the
    // non-`Send` future (it holds an `Rc`) inside the main context.
    #[test]
    fn test_spawn_from_within() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let join_handle = std::thread::spawn({
            let l_clone = l.clone();
            move || {
                c.spawn_from_within(move || async move {
                    let rc = std::rc::Rc::new(123);
                    futures_util::future::ready(()).await;
                    assert_eq!(std::rc::Rc::strong_count(&rc), 1);
                    l_clone.quit();
                });
            }
        });

        l.run();

        join_handle.join().unwrap();
    }

    // `block_on` accepts a future borrowing local state and returns its output.
    #[test]
    fn test_block_on() {
        let c = MainContext::new();

        let mut v = None;
        {
            let v = &mut v;

            let future = futures_util::future::lazy(|_ctx| {
                *v = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = c.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(v, Some(123));
    }

    // Awaiting a `JoinHandle` yields the spawned future's output.
    #[test]
    fn test_spawn_return() {
        let c = MainContext::new();
        c.block_on(async {
            let val = 1;
            let ret = c
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic in a spawned future surfaces as a `JoinError` carrying the
    // panic payload.
    #[test]
    fn test_spawn_panic() {
        let c = MainContext::new();
        c.block_on(async {
            let ret = c
                .spawn(async {
                    panic!("failed");
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }

    // Aborting a task drops its future, including everything it captured.
    #[test]
    fn test_spawn_abort() {
        let c = MainContext::new();
        let v = std::sync::Arc::new(1);
        let v_clone = v.clone();
        let c_ref = &c;
        c.block_on(async move {
            let handle = c_ref.spawn(async move {
                let _v = v_clone;
                let test: u128 = std::future::pending().await;
                println!("{test}");
                unreachable!();
            });

            handle.abort();
        });
        drop(c);

        // Only the original `Arc` is left once the future has been freed.
        assert_eq!(std::sync::Arc::strong_count(&v), 1);
    }
}