glib/
main_context_futures.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr,
5    thread,
6};
7
8use futures_channel::oneshot;
9use futures_core::{
10    future::Future,
11    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
14use futures_util::FutureExt;
15
16use crate::{
17    ffi, thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId,
18};
19
20// Wrapper around Send Futures and non-Send Futures that will panic
21// if the non-Send Future is polled/dropped from a different thread
22// than where this was created.
23enum FutureWrapper {
24    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
25    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
26}
27
28unsafe impl Send for FutureWrapper {}
29unsafe impl Sync for FutureWrapper {}
30
31impl Future for FutureWrapper {
32    type Output = Box<dyn Any + 'static>;
33
34    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
35        match self.get_mut() {
36            FutureWrapper::Send(fut) => {
37                Pin::new(fut).poll(ctx).map(|b| b as Box<dyn Any + 'static>)
38            }
39            FutureWrapper::NonSend(fut) => Pin::new(fut.get_mut()).poll(ctx),
40        }
41    }
42}
43
44// The TaskSource and WakerSource are split up as the TaskSource
45// must only be finalized on the thread that owns the main context,
46// but the WakerSource is passed around to arbitrary threads for
47// being able to wake up the TaskSource.
48//
49// The WakerSource is set up as a child source of the TaskSource, i.e.
50// whenever it is ready also the TaskSource is ready.
51#[repr(C)]
52struct TaskSource {
53    source: ffi::GSource,
54    future: FutureWrapper,
55    waker: Waker,
56    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
57}
58
59#[repr(C)]
60struct WakerSource {
61    source: ffi::GSource,
62}
63
64impl TaskSource {
65    unsafe extern "C" fn dispatch(
66        source: *mut ffi::GSource,
67        callback: ffi::GSourceFunc,
68        _user_data: ffi::gpointer,
69    ) -> ffi::gboolean {
70        let source = &mut *(source as *mut Self);
71        debug_assert!(callback.is_none());
72
73        // Poll the TaskSource and ensure we're never called again if the
74        // contained Future resolved now.
75        if let Poll::Ready(()) = source.poll() {
76            ffi::G_SOURCE_REMOVE
77        } else {
78            ffi::G_SOURCE_CONTINUE
79        }
80    }
81
82    unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
83        let source = source as *mut Self;
84
85        // This will panic if the future was a local future and is dropped from a different thread
86        // than where it was created so try to drop it from the main context if we're on another
87        // thread and the main context still exists.
88        //
89        // This can only really happen if the `Source` was manually retrieve from the context, but
90        // better safe than sorry.
91        match (*source).future {
92            FutureWrapper::Send(_) => {
93                ptr::drop_in_place(&mut (*source).future);
94            }
95            FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
96                ptr::drop_in_place(&mut (*source).future);
97            }
98            FutureWrapper::NonSend(ref mut future) => {
99                let context = ffi::g_source_get_context(source as *mut ffi::GSource);
100                if !context.is_null() {
101                    let future = ptr::read(future);
102                    let context = MainContext::from_glib_none(context);
103                    context.invoke(move || {
104                        drop(future);
105                    });
106                } else {
107                    // This will panic
108                    ptr::drop_in_place(&mut (*source).future);
109                }
110            }
111        }
112
113        ptr::drop_in_place(&mut (*source).return_tx);
114
115        // Drop the waker to unref the underlying GSource
116        ptr::drop_in_place(&mut (*source).waker);
117    }
118}
119
120impl WakerSource {
121    unsafe fn clone_raw(waker: *const ()) -> RawWaker {
122        static VTABLE: RawWakerVTable = RawWakerVTable::new(
123            WakerSource::clone_raw,
124            WakerSource::wake_raw,
125            WakerSource::wake_by_ref_raw,
126            WakerSource::drop_raw,
127        );
128
129        let waker = waker as *const ffi::GSource;
130        ffi::g_source_ref(mut_override(waker));
131        RawWaker::new(waker as *const (), &VTABLE)
132    }
133
134    unsafe fn wake_raw(waker: *const ()) {
135        Self::wake_by_ref_raw(waker);
136        Self::drop_raw(waker);
137    }
138
139    unsafe fn wake_by_ref_raw(waker: *const ()) {
140        let waker = waker as *const ffi::GSource;
141        ffi::g_source_set_ready_time(mut_override(waker), 0);
142    }
143
144    unsafe fn drop_raw(waker: *const ()) {
145        let waker = waker as *const ffi::GSource;
146        ffi::g_source_unref(mut_override(waker));
147    }
148
149    unsafe extern "C" fn dispatch(
150        source: *mut ffi::GSource,
151        _callback: ffi::GSourceFunc,
152        _user_data: ffi::gpointer,
153    ) -> ffi::gboolean {
154        // Set ready-time to -1 so that we're not called again before
155        // being woken up another time.
156        ffi::g_source_set_ready_time(mut_override(source), -1);
157        ffi::G_SOURCE_CONTINUE
158    }
159}
160
161unsafe impl Send for TaskSource {}
162unsafe impl Sync for TaskSource {}
163
164unsafe impl Send for WakerSource {}
165unsafe impl Sync for WakerSource {}
166
167impl TaskSource {
168    #[allow(clippy::new_ret_no_self)]
169    // checker-ignore-item
170    fn new(
171        priority: Priority,
172        future: FutureWrapper,
173        return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
174    ) -> Source {
175        unsafe {
176            static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
177                check: None,
178                prepare: None,
179                dispatch: Some(TaskSource::dispatch),
180                finalize: Some(TaskSource::finalize),
181                closure_callback: None,
182                closure_marshal: None,
183            };
184            static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
185                check: None,
186                prepare: None,
187                dispatch: Some(WakerSource::dispatch),
188                finalize: None,
189                closure_callback: None,
190                closure_marshal: None,
191            };
192
193            let source = ffi::g_source_new(
194                mut_override(&TASK_SOURCE_FUNCS),
195                mem::size_of::<Self>() as u32,
196            );
197
198            let waker_source = ffi::g_source_new(
199                mut_override(&WAKER_SOURCE_FUNCS),
200                mem::size_of::<WakerSource>() as u32,
201            );
202
203            ffi::g_source_set_priority(source, priority.into_glib());
204            ffi::g_source_add_child_source(source, waker_source);
205
206            {
207                let source = &mut *(source as *mut Self);
208                ptr::write(&mut source.future, future);
209                ptr::write(&mut source.return_tx, return_tx);
210
211                // This creates a new reference to the waker source.
212                let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
213                ptr::write(&mut source.waker, waker);
214            }
215
216            // Set ready time to 0 so that the source is immediately dispatched
217            // for doing the initial polling. This will then either resolve the
218            // future or register the waker wherever necessary.
219            ffi::g_source_set_ready_time(waker_source, 0);
220
221            // Unref the waker source, a strong reference to it is stored inside
222            // the task source directly and inside the task source as child source.
223            ffi::g_source_unref(waker_source);
224
225            from_glib_full(source)
226        }
227    }
228
229    fn poll(&mut self) -> Poll<()> {
230        let source = &self.source as *const _;
231        let executor: Borrowed<MainContext> =
232            unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };
233
234        assert!(
235            executor.is_owner(),
236            "Polling futures only allowed if the thread is owning the MainContext"
237        );
238
239        executor
240            .with_thread_default(|| {
241                let _enter = futures_executor::enter().unwrap();
242                let mut context = Context::from_waker(&self.waker);
243
244                // This will panic if the future was a local future and is called from
245                // a different thread than where it was created.
246                if let Some(tx) = self.return_tx.take() {
247                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
248                        Pin::new(&mut self.future).poll(&mut context)
249                    }));
250                    match res {
251                        Ok(Poll::Ready(res)) => {
252                            let _ = tx.send(Ok(res));
253                            Poll::Ready(())
254                        }
255                        Ok(Poll::Pending) => {
256                            self.return_tx.replace(tx);
257                            Poll::Pending
258                        }
259                        Err(e) => {
260                            let _ = tx.send(Err(e));
261                            Poll::Ready(())
262                        }
263                    }
264                } else {
265                    Pin::new(&mut self.future).poll(&mut context).map(|_| ())
266                }
267            })
268            .expect("current thread is not owner of the main context")
269    }
270}
271
272// rustdoc-stripper-ignore-next
273/// A handle to a task running on a [`MainContext`].
274///
275/// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the
276/// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task,
277/// allowing it to complete but discarding the return value.
278#[derive(Debug)]
279pub struct JoinHandle<T> {
280    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
281    source: Source,
282    id: Cell<Option<NonZeroU32>>,
283    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
284}
285
286impl<T> JoinHandle<T> {
287    #[inline]
288    fn new(
289        ctx: &MainContext,
290        source: Source,
291        rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
292    ) -> Self {
293        let id = source.attach(Some(ctx));
294        let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) }));
295        Self {
296            rx,
297            source,
298            id,
299            phantom: PhantomData,
300        }
301    }
302    // rustdoc-stripper-ignore-next
303    /// Returns the internal source ID.
304    ///
305    /// Returns `None` if the handle was aborted already.
306    #[inline]
307    pub fn as_raw_source_id(&self) -> Option<u32> {
308        self.id.get().map(|i| i.get())
309    }
310    // rustdoc-stripper-ignore-next
311    /// Aborts the task associated with the handle.
312    #[inline]
313    pub fn abort(&self) {
314        self.source.destroy();
315        self.id.replace(None);
316    }
317    // rustdoc-stripper-ignore-next
318    /// Returns the [`Source`] associated with this handle.
319    #[inline]
320    pub fn source(&self) -> &Source {
321        &self.source
322    }
323    // rustdoc-stripper-ignore-next
324    /// Safely converts the handle into a [`SourceId`].
325    ///
326    /// Can be used to discard the return value while still retaining the ability to abort the
327    /// underlying task. Returns `Err(self)` if the handle was aborted already.
328    pub fn into_source_id(self) -> Result<SourceId, Self> {
329        if let Some(id) = self.id.take() {
330            Ok(unsafe { SourceId::from_glib(id.get()) })
331        } else {
332            Err(self)
333        }
334    }
335}
336
337impl<T: 'static> Future for JoinHandle<T> {
338    type Output = Result<T, JoinError>;
339    #[inline]
340    fn poll(
341        mut self: std::pin::Pin<&mut Self>,
342        cx: &mut std::task::Context<'_>,
343    ) -> std::task::Poll<Self::Output> {
344        std::pin::Pin::new(&mut self.rx).poll(cx).map(|r| match r {
345            Err(_) => Err(JoinErrorInner::Cancelled.into()),
346            Ok(Err(e)) => Err(JoinErrorInner::Panic(e).into()),
347            Ok(Ok(r)) => Ok(*r.downcast().unwrap()),
348        })
349    }
350}
351
352impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
353    #[inline]
354    fn is_terminated(&self) -> bool {
355        self.rx.is_terminated()
356    }
357}
358
359// Safety: We can't rely on the auto implementation because we are retrieving
360// the result as a `Box<dyn Any + 'static>` from the [`Source`]. We need to
361// rely on type erasure here, so we have to manually assert the Send bound too.
362unsafe impl<T: Send> Send for JoinHandle<T> {}
363
364// rustdoc-stripper-ignore-next
365/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
366#[derive(Debug)]
367pub struct SpawnWithinJoinHandle<T> {
368    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
369    join_handle: Option<JoinHandle<T>>,
370}
371
372impl<T> SpawnWithinJoinHandle<T> {
373    // rustdoc-stripper-ignore-next
374    /// Waits until the task is spawned and returns the [`JoinHandle`].
375    pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
376        if let Some(join_handle) = self.join_handle {
377            return Ok(join_handle);
378        }
379
380        if let Some(rx) = self.rx {
381            match rx.await {
382                Ok(join_handle) => return Ok(join_handle),
383                Err(_) => return Err(JoinErrorInner::Cancelled.into()),
384            }
385        }
386
387        Err(JoinErrorInner::Cancelled.into())
388    }
389}
390
391impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
392    type Output = Result<T, JoinError>;
393    #[inline]
394    fn poll(
395        mut self: std::pin::Pin<&mut Self>,
396        cx: &mut std::task::Context<'_>,
397    ) -> std::task::Poll<Self::Output> {
398        if let Some(ref mut rx) = self.rx {
399            match std::pin::Pin::new(rx).poll(cx) {
400                std::task::Poll::Pending => return std::task::Poll::Pending,
401                std::task::Poll::Ready(Err(_)) => {
402                    self.rx = None;
403                    return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
404                }
405                std::task::Poll::Ready(Ok(join_handle)) => {
406                    self.rx = None;
407                    self.join_handle = Some(join_handle);
408                }
409            }
410        }
411
412        if let Some(ref mut join_handle) = self.join_handle {
413            match std::pin::Pin::new(join_handle).poll(cx) {
414                std::task::Poll::Pending => return std::task::Poll::Pending,
415                std::task::Poll::Ready(Err(e)) => {
416                    self.join_handle = None;
417                    return std::task::Poll::Ready(Err(e));
418                }
419                std::task::Poll::Ready(Ok(r)) => {
420                    self.join_handle = None;
421                    return std::task::Poll::Ready(Ok(r));
422                }
423            }
424        }
425
426        std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
427    }
428}
429
430impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
431    #[inline]
432    fn is_terminated(&self) -> bool {
433        if let Some(ref rx) = self.rx {
434            rx.is_terminated()
435        } else if let Some(ref join_handle) = self.join_handle {
436            join_handle.is_terminated()
437        } else {
438            true
439        }
440    }
441}
442
443// rustdoc-stripper-ignore-next
444/// Task failure from awaiting a [`JoinHandle`].
445#[derive(Debug)]
446pub struct JoinError(JoinErrorInner);
447
448impl JoinError {
449    // rustdoc-stripper-ignore-next
450    /// Returns `true` if the handle was cancelled.
451    #[inline]
452    pub fn is_cancelled(&self) -> bool {
453        matches!(self.0, JoinErrorInner::Cancelled)
454    }
455    // rustdoc-stripper-ignore-next
456    /// Returns `true` if the task terminated with a panic.
457    #[inline]
458    pub fn is_panic(&self) -> bool {
459        matches!(self.0, JoinErrorInner::Panic(_))
460    }
461    // rustdoc-stripper-ignore-next
462    /// Converts the error into a panic result.
463    ///
464    /// # Panics
465    ///
466    /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first
467    /// if the error represents a panic.
468    #[inline]
469    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
470        self.try_into_panic()
471            .expect("`JoinError` is not a panic error")
472    }
473    // rustdoc-stripper-ignore-next
474    /// Attempts to convert the error into a panic result.
475    ///
476    /// Returns `Err(self)` if the error is not a panic result.
477    #[inline]
478    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
479        match self.0 {
480            JoinErrorInner::Panic(e) => Ok(e),
481            e => Err(Self(e)),
482        }
483    }
484}
485
486impl std::error::Error for JoinError {}
487
488impl std::fmt::Display for JoinError {
489    #[inline]
490    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
491        self.0.fmt(f)
492    }
493}
494
495#[derive(Debug)]
496enum JoinErrorInner {
497    Cancelled,
498    Panic(Box<dyn Any + Send + 'static>),
499}
500
501impl From<JoinErrorInner> for JoinError {
502    #[inline]
503    fn from(e: JoinErrorInner) -> Self {
504        Self(e)
505    }
506}
507
508impl std::error::Error for JoinErrorInner {}
509
510impl fmt::Display for JoinErrorInner {
511    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
512        match self {
513            Self::Cancelled => fmt.write_str("task cancelled"),
514            Self::Panic(_) => fmt.write_str("task panicked"),
515        }
516    }
517}
518
519impl MainContext {
520    // rustdoc-stripper-ignore-next
521    /// Spawn a new infallible `Future` on the main context.
522    ///
523    /// This can be called from any thread and will execute the future from the thread
524    /// where main context is running, e.g. via a `MainLoop`.
525    pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
526        &self,
527        f: F,
528    ) -> JoinHandle<R> {
529        self.spawn_with_priority(crate::Priority::default(), f)
530    }
531
532    // rustdoc-stripper-ignore-next
533    /// Spawn a new infallible `Future` on the main context.
534    ///
535    /// The given `Future` does not have to be `Send`.
536    ///
537    /// This can be called only from the thread where the main context is running, e.g.
538    /// from any other `Future` that is executed on this main context, or after calling
539    /// `with_thread_default` or `acquire` on the main context.
540    pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> {
541        self.spawn_local_with_priority(crate::Priority::default(), f)
542    }
543
544    // rustdoc-stripper-ignore-next
545    /// Spawn a new infallible `Future` on the main context, with a non-default priority.
546    ///
547    /// This can be called from any thread and will execute the future from the thread
548    /// where main context is running, e.g. via a `MainLoop`.
549    pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
550        &self,
551        priority: Priority,
552        f: F,
553    ) -> JoinHandle<R> {
554        let f = FutureObj::new(Box::new(async move {
555            Box::new(f.await) as Box<dyn Any + Send + 'static>
556        }));
557        let (tx, rx) = oneshot::channel();
558        let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx));
559        JoinHandle::new(self, source, rx)
560    }
561
562    // rustdoc-stripper-ignore-next
563    /// Spawn a new infallible `Future` on the main context, with a non-default priority.
564    ///
565    /// The given `Future` does not have to be `Send`.
566    ///
567    /// This can be called only from the thread where the main context is running, e.g.
568    /// from any other `Future` that is executed on this main context, or after calling
569    /// `with_thread_default` or `acquire` on the main context.
570    pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>(
571        &self,
572        priority: Priority,
573        f: F,
574    ) -> JoinHandle<R> {
575        let _acquire = self
576            .acquire()
577            .expect("Spawning local futures only allowed on the thread owning the MainContext");
578        let f = LocalFutureObj::new(Box::new(async move {
579            Box::new(f.await) as Box<dyn Any + 'static>
580        }));
581        let (tx, rx) = oneshot::channel();
582        let source = TaskSource::new(
583            priority,
584            FutureWrapper::NonSend(ThreadGuard::new(f)),
585            Some(tx),
586        );
587        JoinHandle::new(self, source, rx)
588    }
589
590    // rustdoc-stripper-ignore-next
591    /// Spawn a new infallible `Future` on the main context from inside the main context.
592    ///
593    /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
594    ///
595    /// This can be called only from any thread.
596    pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>(
597        &self,
598        func: impl FnOnce() -> F + Send + 'static,
599    ) -> SpawnWithinJoinHandle<R> {
600        self.spawn_from_within_with_priority(crate::Priority::default(), func)
601    }
602
603    // rustdoc-stripper-ignore-next
604    /// Spawn a new infallible `Future` on the main context from inside the main context.
605    ///
606    /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
607    ///
608    /// This can be called only from any thread.
609    pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>(
610        &self,
611        priority: Priority,
612        func: impl FnOnce() -> F + Send + 'static,
613    ) -> SpawnWithinJoinHandle<R> {
614        let ctx = self.clone();
615        let (tx, rx) = oneshot::channel();
616        self.invoke_with_priority(priority, move || {
617            let _ = tx.send(ctx.spawn_local(func()));
618        });
619
620        SpawnWithinJoinHandle {
621            rx: Some(rx),
622            join_handle: None,
623        }
624    }
625
626    // rustdoc-stripper-ignore-next
627    /// Runs a new, infallible `Future` on the main context and block until it finished, returning
628    /// the result of the `Future`.
629    ///
630    /// The given `Future` does not have to be `Send` or `'static`.
631    ///
632    /// This must only be called if no `MainLoop` or anything else is running on this specific main
633    /// context.
634    #[allow(clippy::transmute_ptr_to_ptr)]
635    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
636        let mut res = None;
637        let l = MainLoop::new(Some(self), false);
638
639        let f = async {
640            res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
641            l.quit();
642        };
643
644        let f = unsafe {
645            // Super-unsafe: We transmute here to get rid of the 'static lifetime
646            let f = LocalFutureObj::new(Box::new(async move {
647                f.await;
648                Box::new(()) as Box<dyn Any + 'static>
649            }));
650            let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
651            f
652        };
653
654        let source = TaskSource::new(
655            crate::Priority::default(),
656            FutureWrapper::NonSend(ThreadGuard::new(f)),
657            None,
658        );
659        source.attach(Some(self));
660
661        l.run();
662
663        match res.unwrap() {
664            Ok(v) => v,
665            Err(e) => panic::resume_unwind(e),
666        }
667    }
668}
669
670impl Spawn for MainContext {
671    fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
672        let (tx, _) = oneshot::channel();
673        let source = TaskSource::new(
674            crate::Priority::default(),
675            FutureWrapper::Send(FutureObj::new(Box::new(async move {
676                f.await;
677                Box::new(()) as Box<dyn Any + Send + 'static>
678            }))),
679            Some(tx),
680        );
681        source.attach(Some(self));
682        Ok(())
683    }
684}
685
686impl LocalSpawn for MainContext {
687    fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
688        let (tx, _) = oneshot::channel();
689        let source = TaskSource::new(
690            crate::Priority::default(),
691            FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new(
692                async move {
693                    f.await;
694                    Box::new(()) as Box<dyn Any + 'static>
695                },
696            )))),
697            Some(tx),
698        );
699        source.attach(Some(self));
700        Ok(())
701    }
702}
703
704#[cfg(test)]
705mod tests {
706    use std::{sync::mpsc, thread};
707
708    use futures_channel::oneshot;
709    use futures_util::future::{FutureExt, TryFutureExt};
710
711    use super::*;
712
713    #[test]
714    fn test_spawn() {
715        let c = MainContext::new();
716        let l = crate::MainLoop::new(Some(&c), false);
717
718        let (sender, receiver) = mpsc::channel();
719        let (o_sender, o_receiver) = oneshot::channel();
720
721        let l_clone = l.clone();
722        c.spawn(
723            o_receiver
724                .and_then(move |()| {
725                    sender.send(()).unwrap();
726                    l_clone.quit();
727
728                    futures_util::future::ok(())
729                })
730                .then(|res| {
731                    assert!(res.is_ok());
732                    futures_util::future::ready(())
733                }),
734        );
735
736        let join_handle = thread::spawn(move || {
737            l.run();
738        });
739
740        o_sender.send(()).unwrap();
741
742        receiver.recv().unwrap();
743
744        join_handle.join().unwrap();
745    }
746
747    #[test]
748    fn test_spawn_local() {
749        let c = MainContext::new();
750        let l = crate::MainLoop::new(Some(&c), false);
751
752        c.with_thread_default(|| {
753            let l_clone = l.clone();
754            c.spawn_local(futures_util::future::lazy(move |_ctx| {
755                l_clone.quit();
756            }));
757
758            l.run();
759        })
760        .unwrap();
761    }
762
763    #[test]
764    fn test_spawn_from_within() {
765        let c = MainContext::new();
766        let l = crate::MainLoop::new(Some(&c), false);
767
768        let join_handle = std::thread::spawn({
769            let l_clone = l.clone();
770            move || {
771                c.spawn_from_within(move || async move {
772                    let rc = std::rc::Rc::new(123);
773                    futures_util::future::ready(()).await;
774                    assert_eq!(std::rc::Rc::strong_count(&rc), 1);
775                    l_clone.quit();
776                });
777            }
778        });
779
780        l.run();
781
782        join_handle.join().unwrap();
783    }
784
785    #[test]
786    fn test_block_on() {
787        let c = MainContext::new();
788
789        let mut v = None;
790        {
791            let v = &mut v;
792
793            let future = futures_util::future::lazy(|_ctx| {
794                *v = Some(123);
795                Ok::<i32, ()>(123)
796            });
797
798            let res = c.block_on(future);
799            assert_eq!(res, Ok(123));
800        }
801
802        assert_eq!(v, Some(123));
803    }
804
805    #[test]
806    fn test_spawn_return() {
807        let c = MainContext::new();
808        c.block_on(async {
809            let val = 1;
810            let ret = c
811                .spawn(async move { futures_util::future::ready(2).await + val })
812                .await;
813            assert_eq!(ret.unwrap(), 3);
814        });
815    }
816
817    #[test]
818    fn test_spawn_panic() {
819        let c = MainContext::new();
820        c.block_on(async {
821            let ret = c
822                .spawn(async {
823                    panic!("failed");
824                })
825                .await;
826            assert_eq!(
827                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
828                "failed"
829            );
830        });
831    }
832
833    #[test]
834    fn test_spawn_abort() {
835        let c = MainContext::new();
836        let v = std::sync::Arc::new(1);
837        let v_clone = v.clone();
838        let c_ref = &c;
839        c.block_on(async move {
840            let handle = c_ref.spawn(async move {
841                let _v = v_clone;
842                let test: u128 = std::future::pending().await;
843                println!("{test}");
844                unreachable!();
845            });
846
847            handle.abort();
848        });
849        drop(c);
850
851        // Make sure the inner future is actually freed.
852        assert_eq!(std::sync::Arc::strong_count(&v), 1);
853    }
854}