glib/
main_context_futures.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr,
5    thread,
6};
7
8use futures_channel::oneshot;
9use futures_core::{
10    future::Future,
11    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
14use futures_util::FutureExt;
15
16use crate::{
17    MainContext, MainLoop, Priority, Source, SourceId, ffi, thread_guard::ThreadGuard, translate::*,
18};
19
// Type-erased wrapper around the future that is driven by a `TaskSource`.
//
// It holds either a `Send` future or a non-`Send` future. The non-`Send`
// future is kept inside a `ThreadGuard` and will panic if it is
// polled/dropped from a different thread than where this was created.
enum FutureWrapper {
    // A `Send` future; its output is boxed as `dyn Any + Send`.
    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
    // A non-`Send` future, guarded by a `ThreadGuard`; its output is boxed as `dyn Any`.
    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
}
27
// The `Send` variant only ever contains a `Send` future, and the `NonSend` variant is wrapped
// in a `ThreadGuard` that panics when it is polled/dropped from a thread other than the one
// that created it, which is why the wrapper is asserted to be `Send`.
// NOTE(review): `Sync` is asserted as well; the only access to the inner futures visible in
// this file goes through `&mut self` in `poll` — confirm nothing relies on shared access.
unsafe impl Send for FutureWrapper {}
unsafe impl Sync for FutureWrapper {}
30
31impl Future for FutureWrapper {
32    type Output = Box<dyn Any + 'static>;
33
34    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
35        match self.get_mut() {
36            FutureWrapper::Send(fut) => {
37                Pin::new(fut).poll(ctx).map(|b| b as Box<dyn Any + 'static>)
38            }
39            FutureWrapper::NonSend(fut) => Pin::new(fut.get_mut()).poll(ctx),
40        }
41    }
42}
43
// The TaskSource and WakerSource are split up as the TaskSource
// must only be finalized on the thread that owns the main context,
// but the WakerSource is passed around to arbitrary threads for
// being able to wake up the TaskSource.
//
// The WakerSource is set up as a child source of the TaskSource, i.e.
// whenever it is ready also the TaskSource is ready.
//
// `#[repr(C)]` with the `GSource` as first field is what allows the
// `*mut ffi::GSource` handed to the callbacks to be cast to `*mut TaskSource`.
#[repr(C)]
struct TaskSource {
    // GSource header; must stay the first field (see above).
    source: ffi::GSource,
    // The future that is polled whenever the source is dispatched.
    future: FutureWrapper,
    // Waker that holds a reference to the WakerSource child source.
    waker: Waker,
    // Channel over which the future's output, or the payload of a panic raised while polling,
    // is reported. `None` if nobody is interested in the result (as done by `block_on`).
    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
}
58
// Child source of a `TaskSource` whose only job is to become ready when the task is woken up.
// It carries no state besides the `GSource` header itself.
#[repr(C)]
struct WakerSource {
    source: ffi::GSource,
}
63
64impl TaskSource {
65    unsafe extern "C" fn dispatch(
66        source: *mut ffi::GSource,
67        callback: ffi::GSourceFunc,
68        _user_data: ffi::gpointer,
69    ) -> ffi::gboolean {
70        unsafe {
71            let source = &mut *(source as *mut Self);
72            debug_assert!(callback.is_none());
73
74            // Poll the TaskSource and ensure we're never called again if the
75            // contained Future resolved now.
76            if let Poll::Ready(()) = source.poll() {
77                ffi::G_SOURCE_REMOVE
78            } else {
79                ffi::G_SOURCE_CONTINUE
80            }
81        }
82    }
83
    // `GSourceFuncs::finalize` callback of the task source: drops the Rust fields
    // (`future`, `return_tx`, `waker`) that live behind the `GSource` header.
    unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
        unsafe {
            let source = source as *mut Self;

            // Dropping the future will panic if it was a local future and is dropped from a
            // different thread than where it was created, so try to drop it from the main
            // context if we're on another thread and the main context still exists.
            //
            // This can only really happen if the `Source` was manually retrieved from the
            // context, but better safe than sorry.
            match (*source).future {
                FutureWrapper::Send(_) => {
                    ptr::drop_in_place(&mut (*source).future);
                }
                // Local future, and we are on the thread that created it: drop in place.
                FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
                    ptr::drop_in_place(&mut (*source).future);
                }
                // Local future, but we are on a different thread.
                FutureWrapper::NonSend(ref mut future) => {
                    let context = ffi::g_source_get_context(source as *mut ffi::GSource);
                    if !context.is_null() {
                        // Move the guarded future out of the source and hand it to the main
                        // context so that it is dropped from there.
                        let future = ptr::read(future);
                        let context = MainContext::from_glib_none(context);
                        context.invoke(move || {
                            drop(future);
                        });
                    } else {
                        // This will panic
                        ptr::drop_in_place(&mut (*source).future);
                    }
                }
            }

            ptr::drop_in_place(&mut (*source).return_tx);

            // Drop the waker to unref the underlying GSource
            ptr::drop_in_place(&mut (*source).waker);
        }
    }
122}
123
124impl WakerSource {
125    unsafe fn clone_raw(waker: *const ()) -> RawWaker {
126        unsafe {
127            static VTABLE: RawWakerVTable = RawWakerVTable::new(
128                WakerSource::clone_raw,
129                WakerSource::wake_raw,
130                WakerSource::wake_by_ref_raw,
131                WakerSource::drop_raw,
132            );
133
134            let waker = waker as *const ffi::GSource;
135            ffi::g_source_ref(mut_override(waker));
136            RawWaker::new(waker as *const (), &VTABLE)
137        }
138    }
139
140    unsafe fn wake_raw(waker: *const ()) {
141        unsafe {
142            Self::wake_by_ref_raw(waker);
143            Self::drop_raw(waker);
144        }
145    }
146
147    unsafe fn wake_by_ref_raw(waker: *const ()) {
148        unsafe {
149            let waker = waker as *const ffi::GSource;
150            ffi::g_source_set_ready_time(mut_override(waker), 0);
151        }
152    }
153
154    unsafe fn drop_raw(waker: *const ()) {
155        unsafe {
156            let waker = waker as *const ffi::GSource;
157            ffi::g_source_unref(mut_override(waker));
158        }
159    }
160
161    unsafe extern "C" fn dispatch(
162        source: *mut ffi::GSource,
163        _callback: ffi::GSourceFunc,
164        _user_data: ffi::gpointer,
165    ) -> ffi::gboolean {
166        unsafe {
167            // Set ready-time to -1 so that we're not called again before
168            // being woken up another time.
169            ffi::g_source_set_ready_time(mut_override(source), -1);
170            ffi::G_SOURCE_CONTINUE
171        }
172    }
173}
174
// `TaskSource` relies on `FutureWrapper` guarding non-`Send` futures with a `ThreadGuard`, and
// on `TaskSource::poll` asserting that the calling thread owns the `MainContext`.
// NOTE(review): the remaining fields (`waker`, `return_tx`) are not covered by a guard visible
// in this file — confirm they are only touched from the owning thread.
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}

// `WakerSource` only consists of the `GSource` header and is only accessed through
// `g_source_ref`, `g_source_unref` and `g_source_set_ready_time` in this file.
// NOTE(review): presumably relies on these GLib calls being safe from any thread — confirm.
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
180
181impl TaskSource {
    // Creates a new, not yet attached task source that drives `future` with the given
    // `priority`, together with its waker child source.
    //
    // If `return_tx` is given, the output of the future (or the payload of a panic raised while
    // polling it) is sent through it, see `TaskSource::poll`.
    #[allow(clippy::new_ret_no_self)]
    // checker-ignore-item
    fn new(
        priority: Priority,
        future: FutureWrapper,
        return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
    ) -> Source {
        unsafe {
            static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(TaskSource::dispatch),
                finalize: Some(TaskSource::finalize),
                closure_callback: None,
                closure_marshal: None,
            };
            static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(WakerSource::dispatch),
                finalize: None,
                closure_callback: None,
                closure_marshal: None,
            };

            // Both sources are allocated with the size of the corresponding Rust struct so that
            // the fields following the `GSource` header fit into the allocation.
            let source = ffi::g_source_new(
                mut_override(&TASK_SOURCE_FUNCS),
                mem::size_of::<Self>() as u32,
            );

            let waker_source = ffi::g_source_new(
                mut_override(&WAKER_SOURCE_FUNCS),
                mem::size_of::<WakerSource>() as u32,
            );

            ffi::g_source_set_priority(source, priority.into_glib());
            ffi::g_source_add_child_source(source, waker_source);

            {
                // The fields are initialized with `ptr::write`, which does not drop whatever
                // was stored in the destination before.
                let source = &mut *(source as *mut Self);
                ptr::write(&mut source.future, future);
                ptr::write(&mut source.return_tx, return_tx);

                // This creates a new reference to the waker source.
                let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
                ptr::write(&mut source.waker, waker);
            }

            // Set ready time to 0 so that the source is immediately dispatched
            // for doing the initial polling. This will then either resolve the
            // future or register the waker wherever necessary.
            ffi::g_source_set_ready_time(waker_source, 0);

            // Unref the waker source: strong references to it are held by the waker stored
            // inside the task source and by the task source itself as its child source.
            ffi::g_source_unref(waker_source);

            from_glib_full(source)
        }
    }
242
243    fn poll(&mut self) -> Poll<()> {
244        let source = &self.source as *const _;
245        let executor: Borrowed<MainContext> =
246            unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };
247
248        assert!(
249            executor.is_owner(),
250            "Polling futures only allowed if the thread is owning the MainContext"
251        );
252
253        executor
254            .with_thread_default(|| {
255                let _enter = futures_executor::enter().unwrap();
256                let mut context = Context::from_waker(&self.waker);
257
258                // This will panic if the future was a local future and is called from
259                // a different thread than where it was created.
260                match self.return_tx.take() {
261                    Some(tx) => {
262                        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
263                            Pin::new(&mut self.future).poll(&mut context)
264                        }));
265                        match res {
266                            Ok(Poll::Ready(res)) => {
267                                let _ = tx.send(Ok(res));
268                                Poll::Ready(())
269                            }
270                            Ok(Poll::Pending) => {
271                                self.return_tx.replace(tx);
272                                Poll::Pending
273                            }
274                            Err(e) => {
275                                let _ = tx.send(Err(e));
276                                Poll::Ready(())
277                            }
278                        }
279                    }
280                    _ => Pin::new(&mut self.future).poll(&mut context).map(|_| ()),
281                }
282            })
283            .expect("current thread is not owner of the main context")
284    }
285}
286
// rustdoc-stripper-ignore-next
/// A handle to a task running on a [`MainContext`].
///
/// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the
/// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct JoinHandle<T> {
    // Receives the type-erased output of the task, or the payload of a panic raised by it.
    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
    // The source that drives the task; destroyed by `abort`.
    source: Source,
    // Raw id of the attached source; reset to `None` by `abort` and `into_source_id`.
    id: Cell<Option<NonZeroU32>>,
    // Remembers the output type `T`, which is erased in `rx`.
    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
}
300
301impl<T> JoinHandle<T> {
302    #[inline]
303    fn new(
304        ctx: &MainContext,
305        source: Source,
306        rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
307    ) -> Self {
308        let id = source.attach(Some(ctx));
309        let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) }));
310        Self {
311            rx,
312            source,
313            id,
314            phantom: PhantomData,
315        }
316    }
317    // rustdoc-stripper-ignore-next
318    /// Returns the internal source ID.
319    ///
320    /// Returns `None` if the handle was aborted already.
321    #[inline]
322    pub fn as_raw_source_id(&self) -> Option<u32> {
323        self.id.get().map(|i| i.get())
324    }
325    // rustdoc-stripper-ignore-next
326    /// Aborts the task associated with the handle.
327    #[inline]
328    pub fn abort(&self) {
329        self.source.destroy();
330        self.id.replace(None);
331    }
332    // rustdoc-stripper-ignore-next
333    /// Returns the [`Source`] associated with this handle.
334    #[inline]
335    pub fn source(&self) -> &Source {
336        &self.source
337    }
338    // rustdoc-stripper-ignore-next
339    /// Safely converts the handle into a [`SourceId`].
340    ///
341    /// Can be used to discard the return value while still retaining the ability to abort the
342    /// underlying task. Returns `Err(self)` if the handle was aborted already.
343    pub fn into_source_id(self) -> Result<SourceId, Self> {
344        if let Some(id) = self.id.take() {
345            Ok(unsafe { SourceId::from_glib(id.get()) })
346        } else {
347            Err(self)
348        }
349    }
350}
351
352impl<T: 'static> Future for JoinHandle<T> {
353    type Output = Result<T, JoinError>;
354    #[inline]
355    fn poll(
356        mut self: std::pin::Pin<&mut Self>,
357        cx: &mut std::task::Context<'_>,
358    ) -> std::task::Poll<Self::Output> {
359        std::pin::Pin::new(&mut self.rx).poll(cx).map(|r| match r {
360            Err(_) => Err(JoinErrorInner::Cancelled.into()),
361            Ok(Err(e)) => Err(JoinErrorInner::Panic(e).into()),
362            Ok(Ok(r)) => Ok(*r.downcast().unwrap()),
363        })
364    }
365}
366
impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
    // Delegates to the result channel: the handle is terminated exactly when the
    // receiver reports itself as terminated.
    #[inline]
    fn is_terminated(&self) -> bool {
        self.rx.is_terminated()
    }
}
373
// Safety: We can't rely on the auto implementation because we are retrieving
// the result as a `Box<dyn Any + 'static>` from the [`Source`]. We need to
// rely on type erasure here, so we have to manually assert the Send bound too.
// The `T: Send` bound is what makes this acceptable: awaiting the handle hands the
// task's `T` out to whichever thread polls the handle.
unsafe impl<T: Send> Send for JoinHandle<T> {}
378
// rustdoc-stripper-ignore-next
/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
#[derive(Debug)]
pub struct SpawnWithinJoinHandle<T> {
    // Receives the `JoinHandle` once the task was spawned; `None` after it was received or
    // after the channel was found to be closed.
    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
    // The handle of the spawned task once it was received from `rx`; `None` before that and
    // after the task has resolved.
    join_handle: Option<JoinHandle<T>>,
}
386
387impl<T> SpawnWithinJoinHandle<T> {
388    // rustdoc-stripper-ignore-next
389    /// Waits until the task is spawned and returns the [`JoinHandle`].
390    pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
391        if let Some(join_handle) = self.join_handle {
392            return Ok(join_handle);
393        }
394
395        if let Some(rx) = self.rx {
396            match rx.await {
397                Ok(join_handle) => return Ok(join_handle),
398                Err(_) => return Err(JoinErrorInner::Cancelled.into()),
399            }
400        }
401
402        Err(JoinErrorInner::Cancelled.into())
403    }
404}
405
406impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
407    type Output = Result<T, JoinError>;
408    #[inline]
409    fn poll(
410        mut self: std::pin::Pin<&mut Self>,
411        cx: &mut std::task::Context<'_>,
412    ) -> std::task::Poll<Self::Output> {
413        if let Some(ref mut rx) = self.rx {
414            match std::pin::Pin::new(rx).poll(cx) {
415                std::task::Poll::Pending => return std::task::Poll::Pending,
416                std::task::Poll::Ready(Err(_)) => {
417                    self.rx = None;
418                    return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
419                }
420                std::task::Poll::Ready(Ok(join_handle)) => {
421                    self.rx = None;
422                    self.join_handle = Some(join_handle);
423                }
424            }
425        }
426
427        if let Some(ref mut join_handle) = self.join_handle {
428            match std::pin::Pin::new(join_handle).poll(cx) {
429                std::task::Poll::Pending => return std::task::Poll::Pending,
430                std::task::Poll::Ready(Err(e)) => {
431                    self.join_handle = None;
432                    return std::task::Poll::Ready(Err(e));
433                }
434                std::task::Poll::Ready(Ok(r)) => {
435                    self.join_handle = None;
436                    return std::task::Poll::Ready(Ok(r));
437                }
438            }
439        }
440
441        std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
442    }
443}
444
445impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
446    #[inline]
447    fn is_terminated(&self) -> bool {
448        if let Some(ref rx) = self.rx {
449            rx.is_terminated()
450        } else if let Some(ref join_handle) = self.join_handle {
451            join_handle.is_terminated()
452        } else {
453            true
454        }
455    }
456}
457
// rustdoc-stripper-ignore-next
/// Task failure from awaiting a [`JoinHandle`].
#[derive(Debug)]
pub struct JoinError(JoinErrorInner);

impl JoinError {
    // rustdoc-stripper-ignore-next
    /// Returns `true` if the handle was cancelled.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        match self.0 {
            JoinErrorInner::Cancelled => true,
            JoinErrorInner::Panic(_) => false,
        }
    }
    // rustdoc-stripper-ignore-next
    /// Returns `true` if the task terminated with a panic.
    #[inline]
    pub fn is_panic(&self) -> bool {
        match self.0 {
            JoinErrorInner::Panic(_) => true,
            JoinErrorInner::Cancelled => false,
        }
    }
    // rustdoc-stripper-ignore-next
    /// Converts the error into a panic result.
    ///
    /// # Panics
    ///
    /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first
    /// if the error represents a panic.
    #[inline]
    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
        let payload = self.try_into_panic();
        payload.expect("`JoinError` is not a panic error")
    }
    // rustdoc-stripper-ignore-next
    /// Attempts to convert the error into a panic result.
    ///
    /// Returns `Err(self)` if the error is not a panic result.
    #[inline]
    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
        match self {
            Self(JoinErrorInner::Panic(payload)) => Ok(payload),
            other => Err(other),
        }
    }
}

impl std::error::Error for JoinError {}

impl fmt::Display for JoinError {
    // Formats exactly like the wrapped inner error.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

// The actual reason for a `JoinError`; kept private so that it can only be inspected via the
// methods of `JoinError`.
#[derive(Debug)]
enum JoinErrorInner {
    // No result was ever delivered for the task.
    Cancelled,
    // The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}

impl From<JoinErrorInner> for JoinError {
    #[inline]
    fn from(e: JoinErrorInner) -> Self {
        Self(e)
    }
}

impl std::error::Error for JoinErrorInner {}

impl fmt::Display for JoinErrorInner {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Cancelled => "task cancelled",
            Self::Panic(_) => "task panicked",
        };
        f.write_str(message)
    }
}
533
534impl MainContext {
535    // rustdoc-stripper-ignore-next
536    /// Spawn a new infallible `Future` on the main context.
537    ///
538    /// This can be called from any thread and will execute the future from the thread
539    /// where main context is running, e.g. via a `MainLoop`.
540    pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
541        &self,
542        f: F,
543    ) -> JoinHandle<R> {
544        self.spawn_with_priority(crate::Priority::default(), f)
545    }
546
547    // rustdoc-stripper-ignore-next
548    /// Spawn a new infallible `Future` on the main context.
549    ///
550    /// The given `Future` does not have to be `Send`.
551    ///
552    /// This can be called only from the thread where the main context is running, e.g.
553    /// from any other `Future` that is executed on this main context, or after calling
554    /// `with_thread_default` or `acquire` on the main context.
555    pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> {
556        self.spawn_local_with_priority(crate::Priority::default(), f)
557    }
558
559    // rustdoc-stripper-ignore-next
560    /// Spawn a new infallible `Future` on the main context, with a non-default priority.
561    ///
562    /// This can be called from any thread and will execute the future from the thread
563    /// where main context is running, e.g. via a `MainLoop`.
564    pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
565        &self,
566        priority: Priority,
567        f: F,
568    ) -> JoinHandle<R> {
569        let f = FutureObj::new(Box::new(async move {
570            Box::new(f.await) as Box<dyn Any + Send + 'static>
571        }));
572        let (tx, rx) = oneshot::channel();
573        let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx));
574        JoinHandle::new(self, source, rx)
575    }
576
577    // rustdoc-stripper-ignore-next
578    /// Spawn a new infallible `Future` on the main context, with a non-default priority.
579    ///
580    /// The given `Future` does not have to be `Send`.
581    ///
582    /// This can be called only from the thread where the main context is running, e.g.
583    /// from any other `Future` that is executed on this main context, or after calling
584    /// `with_thread_default` or `acquire` on the main context.
585    pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>(
586        &self,
587        priority: Priority,
588        f: F,
589    ) -> JoinHandle<R> {
590        let _acquire = self
591            .acquire()
592            .expect("Spawning local futures only allowed on the thread owning the MainContext");
593        let f = LocalFutureObj::new(Box::new(async move {
594            Box::new(f.await) as Box<dyn Any + 'static>
595        }));
596        let (tx, rx) = oneshot::channel();
597        let source = TaskSource::new(
598            priority,
599            FutureWrapper::NonSend(ThreadGuard::new(f)),
600            Some(tx),
601        );
602        JoinHandle::new(self, source, rx)
603    }
604
605    // rustdoc-stripper-ignore-next
606    /// Spawn a new infallible `Future` on the main context from inside the main context.
607    ///
608    /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
609    ///
610    /// This can be called only from any thread.
611    pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>(
612        &self,
613        func: impl FnOnce() -> F + Send + 'static,
614    ) -> SpawnWithinJoinHandle<R> {
615        self.spawn_from_within_with_priority(crate::Priority::default(), func)
616    }
617
618    // rustdoc-stripper-ignore-next
619    /// Spawn a new infallible `Future` on the main context from inside the main context.
620    ///
621    /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
622    ///
623    /// This can be called only from any thread.
624    pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>(
625        &self,
626        priority: Priority,
627        func: impl FnOnce() -> F + Send + 'static,
628    ) -> SpawnWithinJoinHandle<R> {
629        let ctx = self.clone();
630        let (tx, rx) = oneshot::channel();
631        self.invoke_with_priority(priority, move || {
632            let _ = tx.send(ctx.spawn_local(func()));
633        });
634
635        SpawnWithinJoinHandle {
636            rx: Some(rx),
637            join_handle: None,
638        }
639    }
640
    // rustdoc-stripper-ignore-next
    /// Runs a new, infallible `Future` on the main context and blocks until it has finished,
    /// returning the result of the `Future`.
    ///
    /// The given `Future` does not have to be `Send` or `'static`.
    ///
    /// This must only be called if no `MainLoop` or anything else is running on this specific main
    /// context.
    ///
    /// # Panics
    ///
    /// If the `Future` panics, the panic is resumed on the calling thread once the main loop
    /// has returned.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
        // Filled in by the wrapper future below right before it quits the main loop.
        let mut res = None;
        let l = MainLoop::new(Some(self), false);

        // Wrap the future so that its output (or panic payload) is stored in `res` and the
        // main loop is stopped as soon as it is done.
        let f = async {
            res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
            l.quit();
        };

        let f = unsafe {
            // Super-unsafe: We transmute here to erase the real lifetime of the future (it
            // borrows `res` and `l` from this stack frame) and pretend that it is `'static`,
            // which is what `TaskSource` requires.
            // NOTE(review): this presumably relies on the future not being used anymore once
            // `l.run()` below has returned — confirm.
            let f = LocalFutureObj::new(Box::new(async move {
                f.await;
                Box::new(()) as Box<dyn Any + 'static>
            }));
            let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
            f
        };

        // No result channel is needed: the result is handed over via `res`.
        let source = TaskSource::new(
            crate::Priority::default(),
            FutureWrapper::NonSend(ThreadGuard::new(f)),
            None,
        );
        source.attach(Some(self));

        // Blocks until the wrapper future calls `l.quit()`.
        l.run();

        match res.unwrap() {
            Ok(v) => v,
            Err(e) => panic::resume_unwind(e),
        }
    }
683}
684
685impl Spawn for MainContext {
686    fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
687        let (tx, _) = oneshot::channel();
688        let source = TaskSource::new(
689            crate::Priority::default(),
690            FutureWrapper::Send(FutureObj::new(Box::new(async move {
691                f.await;
692                Box::new(()) as Box<dyn Any + Send + 'static>
693            }))),
694            Some(tx),
695        );
696        source.attach(Some(self));
697        Ok(())
698    }
699}
700
701impl LocalSpawn for MainContext {
702    fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
703        let (tx, _) = oneshot::channel();
704        let source = TaskSource::new(
705            crate::Priority::default(),
706            FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new(
707                async move {
708                    f.await;
709                    Box::new(()) as Box<dyn Any + 'static>
710                },
711            )))),
712            Some(tx),
713        );
714        source.attach(Some(self));
715        Ok(())
716    }
717}
718
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // A `Send` future spawned from the test thread is executed by a main loop that runs on a
    // different thread, and is driven to completion once the oneshot channel fires.
    #[test]
    fn test_spawn() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let (sender, receiver) = mpsc::channel();
        let (o_sender, o_receiver) = oneshot::channel();

        let l_clone = l.clone();
        c.spawn(
            o_receiver
                .and_then(move |()| {
                    sender.send(()).unwrap();
                    l_clone.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        let join_handle = thread::spawn(move || {
            l.run();
        });

        // Unblock the spawned future, then wait for it to report back.
        o_sender.send(()).unwrap();

        receiver.recv().unwrap();

        join_handle.join().unwrap();
    }

    // A local future can be spawned while the context is the thread-default one, and is
    // executed by a main loop running on the same thread.
    #[test]
    fn test_spawn_local() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        c.with_thread_default(|| {
            let l_clone = l.clone();
            c.spawn_local(futures_util::future::lazy(move |_ctx| {
                l_clone.quit();
            }));

            l.run();
        })
        .unwrap();
    }

    // `spawn_from_within` is called from another thread; the future it creates uses an `Rc`
    // (i.e. is not `Send`) and runs on the thread that runs the main loop.
    #[test]
    fn test_spawn_from_within() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let join_handle = std::thread::spawn({
            let l_clone = l.clone();
            move || {
                c.spawn_from_within(move || async move {
                    let rc = std::rc::Rc::new(123);
                    futures_util::future::ready(()).await;
                    assert_eq!(std::rc::Rc::strong_count(&rc), 1);
                    l_clone.quit();
                });
            }
        });

        l.run();

        join_handle.join().unwrap();
    }

    // `block_on` accepts a future that borrows from the stack and returns its output.
    #[test]
    fn test_block_on() {
        let c = MainContext::new();

        let mut v = None;
        {
            let v = &mut v;

            let future = futures_util::future::lazy(|_ctx| {
                *v = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = c.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(v, Some(123));
    }

    // Awaiting a `JoinHandle` yields the output of the spawned future.
    #[test]
    fn test_spawn_return() {
        let c = MainContext::new();
        c.block_on(async {
            let val = 1;
            let ret = c
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic inside a spawned future is reported through the `JoinHandle` as a `JoinError`
    // that carries the panic payload.
    #[test]
    fn test_spawn_panic() {
        let c = MainContext::new();
        c.block_on(async {
            let ret = c
                .spawn(async {
                    panic!("failed");
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }

    // Aborting a task that never completes frees the future, and with it everything the
    // future captured.
    #[test]
    fn test_spawn_abort() {
        let c = MainContext::new();
        let v = std::sync::Arc::new(1);
        let v_clone = v.clone();
        let c_ref = &c;
        c.block_on(async move {
            let handle = c_ref.spawn(async move {
                let _v = v_clone;
                let test: u128 = std::future::pending().await;
                println!("{test}");
                unreachable!();
            });

            handle.abort();
        });
        drop(c);

        // Make sure the inner future is actually freed.
        assert_eq!(std::sync::Arc::strong_count(&v), 1);
    }
}
869}