1use std::{
4 any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr,
5 thread,
6};
7
8use futures_channel::oneshot;
9use futures_core::{
10 future::Future,
11 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
14use futures_util::FutureExt;
15
16use crate::{
17 MainContext, MainLoop, Priority, Source, SourceId, ffi, thread_guard::ThreadGuard, translate::*,
18};
19
20enum FutureWrapper {
24 Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
25 NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
26}
27
28unsafe impl Send for FutureWrapper {}
29unsafe impl Sync for FutureWrapper {}
30
31impl Future for FutureWrapper {
32 type Output = Box<dyn Any + 'static>;
33
34 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
35 match self.get_mut() {
36 FutureWrapper::Send(fut) => {
37 Pin::new(fut).poll(ctx).map(|b| b as Box<dyn Any + 'static>)
38 }
39 FutureWrapper::NonSend(fut) => Pin::new(fut.get_mut()).poll(ctx),
40 }
41 }
42}
43
44#[repr(C)]
52struct TaskSource {
53 source: ffi::GSource,
54 future: FutureWrapper,
55 waker: Waker,
56 return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
57}
58
59#[repr(C)]
60struct WakerSource {
61 source: ffi::GSource,
62}
63
64impl TaskSource {
65 unsafe extern "C" fn dispatch(
66 source: *mut ffi::GSource,
67 callback: ffi::GSourceFunc,
68 _user_data: ffi::gpointer,
69 ) -> ffi::gboolean {
70 unsafe {
71 let source = &mut *(source as *mut Self);
72 debug_assert!(callback.is_none());
73
74 if let Poll::Ready(()) = source.poll() {
77 ffi::G_SOURCE_REMOVE
78 } else {
79 ffi::G_SOURCE_CONTINUE
80 }
81 }
82 }
83
84 unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
85 unsafe {
86 let source = source as *mut Self;
87
88 match (*source).future {
95 FutureWrapper::Send(_) => {
96 ptr::drop_in_place(&mut (*source).future);
97 }
98 FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
99 ptr::drop_in_place(&mut (*source).future);
100 }
101 FutureWrapper::NonSend(ref mut future) => {
102 let context = ffi::g_source_get_context(source as *mut ffi::GSource);
103 if !context.is_null() {
104 let future = ptr::read(future);
105 let context = MainContext::from_glib_none(context);
106 context.invoke(move || {
107 drop(future);
108 });
109 } else {
110 ptr::drop_in_place(&mut (*source).future);
112 }
113 }
114 }
115
116 ptr::drop_in_place(&mut (*source).return_tx);
117
118 ptr::drop_in_place(&mut (*source).waker);
120 }
121 }
122}
123
124impl WakerSource {
125 unsafe fn clone_raw(waker: *const ()) -> RawWaker {
126 unsafe {
127 static VTABLE: RawWakerVTable = RawWakerVTable::new(
128 WakerSource::clone_raw,
129 WakerSource::wake_raw,
130 WakerSource::wake_by_ref_raw,
131 WakerSource::drop_raw,
132 );
133
134 let waker = waker as *const ffi::GSource;
135 ffi::g_source_ref(mut_override(waker));
136 RawWaker::new(waker as *const (), &VTABLE)
137 }
138 }
139
140 unsafe fn wake_raw(waker: *const ()) {
141 unsafe {
142 Self::wake_by_ref_raw(waker);
143 Self::drop_raw(waker);
144 }
145 }
146
147 unsafe fn wake_by_ref_raw(waker: *const ()) {
148 unsafe {
149 let waker = waker as *const ffi::GSource;
150 ffi::g_source_set_ready_time(mut_override(waker), 0);
151 }
152 }
153
154 unsafe fn drop_raw(waker: *const ()) {
155 unsafe {
156 let waker = waker as *const ffi::GSource;
157 ffi::g_source_unref(mut_override(waker));
158 }
159 }
160
161 unsafe extern "C" fn dispatch(
162 source: *mut ffi::GSource,
163 _callback: ffi::GSourceFunc,
164 _user_data: ffi::gpointer,
165 ) -> ffi::gboolean {
166 unsafe {
167 ffi::g_source_set_ready_time(mut_override(source), -1);
170 ffi::G_SOURCE_CONTINUE
171 }
172 }
173}
174
175unsafe impl Send for TaskSource {}
176unsafe impl Sync for TaskSource {}
177
178unsafe impl Send for WakerSource {}
179unsafe impl Sync for WakerSource {}
180
181impl TaskSource {
182 #[allow(clippy::new_ret_no_self)]
183 fn new(
185 priority: Priority,
186 future: FutureWrapper,
187 return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
188 ) -> Source {
189 unsafe {
190 static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
191 check: None,
192 prepare: None,
193 dispatch: Some(TaskSource::dispatch),
194 finalize: Some(TaskSource::finalize),
195 closure_callback: None,
196 closure_marshal: None,
197 };
198 static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
199 check: None,
200 prepare: None,
201 dispatch: Some(WakerSource::dispatch),
202 finalize: None,
203 closure_callback: None,
204 closure_marshal: None,
205 };
206
207 let source = ffi::g_source_new(
208 mut_override(&TASK_SOURCE_FUNCS),
209 mem::size_of::<Self>() as u32,
210 );
211
212 let waker_source = ffi::g_source_new(
213 mut_override(&WAKER_SOURCE_FUNCS),
214 mem::size_of::<WakerSource>() as u32,
215 );
216
217 ffi::g_source_set_priority(source, priority.into_glib());
218 ffi::g_source_add_child_source(source, waker_source);
219
220 {
221 let source = &mut *(source as *mut Self);
222 ptr::write(&mut source.future, future);
223 ptr::write(&mut source.return_tx, return_tx);
224
225 let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
227 ptr::write(&mut source.waker, waker);
228 }
229
230 ffi::g_source_set_ready_time(waker_source, 0);
234
235 ffi::g_source_unref(waker_source);
238
239 from_glib_full(source)
240 }
241 }
242
243 fn poll(&mut self) -> Poll<()> {
244 let source = &self.source as *const _;
245 let executor: Borrowed<MainContext> =
246 unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };
247
248 assert!(
249 executor.is_owner(),
250 "Polling futures only allowed if the thread is owning the MainContext"
251 );
252
253 executor
254 .with_thread_default(|| {
255 let _enter = futures_executor::enter().unwrap();
256 let mut context = Context::from_waker(&self.waker);
257
258 match self.return_tx.take() {
261 Some(tx) => {
262 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
263 Pin::new(&mut self.future).poll(&mut context)
264 }));
265 match res {
266 Ok(Poll::Ready(res)) => {
267 let _ = tx.send(Ok(res));
268 Poll::Ready(())
269 }
270 Ok(Poll::Pending) => {
271 self.return_tx.replace(tx);
272 Poll::Pending
273 }
274 Err(e) => {
275 let _ = tx.send(Err(e));
276 Poll::Ready(())
277 }
278 }
279 }
280 _ => Pin::new(&mut self.future).poll(&mut context).map(|_| ()),
281 }
282 })
283 .expect("current thread is not owner of the main context")
284 }
285}
286
287#[derive(Debug)]
294pub struct JoinHandle<T> {
295 rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
296 source: Source,
297 id: Cell<Option<NonZeroU32>>,
298 phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
299}
300
301impl<T> JoinHandle<T> {
302 #[inline]
303 fn new(
304 ctx: &MainContext,
305 source: Source,
306 rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
307 ) -> Self {
308 let id = source.attach(Some(ctx));
309 let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) }));
310 Self {
311 rx,
312 source,
313 id,
314 phantom: PhantomData,
315 }
316 }
317 #[inline]
322 pub fn as_raw_source_id(&self) -> Option<u32> {
323 self.id.get().map(|i| i.get())
324 }
325 #[inline]
328 pub fn abort(&self) {
329 self.source.destroy();
330 self.id.replace(None);
331 }
332 #[inline]
335 pub fn source(&self) -> &Source {
336 &self.source
337 }
338 pub fn into_source_id(self) -> Result<SourceId, Self> {
344 if let Some(id) = self.id.take() {
345 Ok(unsafe { SourceId::from_glib(id.get()) })
346 } else {
347 Err(self)
348 }
349 }
350}
351
352impl<T: 'static> Future for JoinHandle<T> {
353 type Output = Result<T, JoinError>;
354 #[inline]
355 fn poll(
356 mut self: std::pin::Pin<&mut Self>,
357 cx: &mut std::task::Context<'_>,
358 ) -> std::task::Poll<Self::Output> {
359 std::pin::Pin::new(&mut self.rx).poll(cx).map(|r| match r {
360 Err(_) => Err(JoinErrorInner::Cancelled.into()),
361 Ok(Err(e)) => Err(JoinErrorInner::Panic(e).into()),
362 Ok(Ok(r)) => Ok(*r.downcast().unwrap()),
363 })
364 }
365}
366
367impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
368 #[inline]
369 fn is_terminated(&self) -> bool {
370 self.rx.is_terminated()
371 }
372}
373
374unsafe impl<T: Send> Send for JoinHandle<T> {}
378
379#[derive(Debug)]
382pub struct SpawnWithinJoinHandle<T> {
383 rx: Option<oneshot::Receiver<JoinHandle<T>>>,
384 join_handle: Option<JoinHandle<T>>,
385}
386
387impl<T> SpawnWithinJoinHandle<T> {
388 pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
391 if let Some(join_handle) = self.join_handle {
392 return Ok(join_handle);
393 }
394
395 if let Some(rx) = self.rx {
396 match rx.await {
397 Ok(join_handle) => return Ok(join_handle),
398 Err(_) => return Err(JoinErrorInner::Cancelled.into()),
399 }
400 }
401
402 Err(JoinErrorInner::Cancelled.into())
403 }
404}
405
406impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
407 type Output = Result<T, JoinError>;
408 #[inline]
409 fn poll(
410 mut self: std::pin::Pin<&mut Self>,
411 cx: &mut std::task::Context<'_>,
412 ) -> std::task::Poll<Self::Output> {
413 if let Some(ref mut rx) = self.rx {
414 match std::pin::Pin::new(rx).poll(cx) {
415 std::task::Poll::Pending => return std::task::Poll::Pending,
416 std::task::Poll::Ready(Err(_)) => {
417 self.rx = None;
418 return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
419 }
420 std::task::Poll::Ready(Ok(join_handle)) => {
421 self.rx = None;
422 self.join_handle = Some(join_handle);
423 }
424 }
425 }
426
427 if let Some(ref mut join_handle) = self.join_handle {
428 match std::pin::Pin::new(join_handle).poll(cx) {
429 std::task::Poll::Pending => return std::task::Poll::Pending,
430 std::task::Poll::Ready(Err(e)) => {
431 self.join_handle = None;
432 return std::task::Poll::Ready(Err(e));
433 }
434 std::task::Poll::Ready(Ok(r)) => {
435 self.join_handle = None;
436 return std::task::Poll::Ready(Ok(r));
437 }
438 }
439 }
440
441 std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
442 }
443}
444
445impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
446 #[inline]
447 fn is_terminated(&self) -> bool {
448 if let Some(ref rx) = self.rx {
449 rx.is_terminated()
450 } else if let Some(ref join_handle) = self.join_handle {
451 join_handle.is_terminated()
452 } else {
453 true
454 }
455 }
456}
457
/// Error produced when a spawned task does not deliver its result.
#[derive(Debug)]
pub struct JoinError(JoinErrorInner);

impl JoinError {
    /// Returns `true` if the task was cancelled.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        match self.0 {
            JoinErrorInner::Cancelled => true,
            JoinErrorInner::Panic(_) => false,
        }
    }

    /// Returns `true` if the task panicked.
    #[inline]
    pub fn is_panic(&self) -> bool {
        match self.0 {
            JoinErrorInner::Panic(_) => true,
            JoinErrorInner::Cancelled => false,
        }
    }

    /// Consumes the error and returns the panic payload.
    ///
    /// # Panics
    ///
    /// Panics if the error does not represent a panic; use
    /// [`try_into_panic`](Self::try_into_panic) for a non-panicking variant.
    #[inline]
    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
        self.try_into_panic()
            .expect("`JoinError` is not a panic error")
    }

    /// Consumes the error and returns the panic payload, or gives the error
    /// back unchanged if it does not represent a panic.
    #[inline]
    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
        match self.0 {
            JoinErrorInner::Panic(payload) => Ok(payload),
            inner @ JoinErrorInner::Cancelled => Err(Self(inner)),
        }
    }
}

impl std::error::Error for JoinError {}

impl std::fmt::Display for JoinError {
    /// Formats exactly like the wrapped `JoinErrorInner`.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

/// The reason a task failed to deliver its result.
#[derive(Debug)]
enum JoinErrorInner {
    /// No result was ever sent.
    Cancelled,
    /// The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}

impl From<JoinErrorInner> for JoinError {
    #[inline]
    fn from(e: JoinErrorInner) -> Self {
        JoinError(e)
    }
}

impl std::error::Error for JoinErrorInner {}

impl fmt::Display for JoinErrorInner {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Cancelled => "task cancelled",
            Self::Panic(_) => "task panicked",
        };
        fmt.write_str(message)
    }
}
533
534impl MainContext {
535 pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
541 &self,
542 f: F,
543 ) -> JoinHandle<R> {
544 self.spawn_with_priority(crate::Priority::default(), f)
545 }
546
547 pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> {
556 self.spawn_local_with_priority(crate::Priority::default(), f)
557 }
558
559 pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
565 &self,
566 priority: Priority,
567 f: F,
568 ) -> JoinHandle<R> {
569 let f = FutureObj::new(Box::new(async move {
570 Box::new(f.await) as Box<dyn Any + Send + 'static>
571 }));
572 let (tx, rx) = oneshot::channel();
573 let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx));
574 JoinHandle::new(self, source, rx)
575 }
576
577 pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>(
586 &self,
587 priority: Priority,
588 f: F,
589 ) -> JoinHandle<R> {
590 let _acquire = self
591 .acquire()
592 .expect("Spawning local futures only allowed on the thread owning the MainContext");
593 let f = LocalFutureObj::new(Box::new(async move {
594 Box::new(f.await) as Box<dyn Any + 'static>
595 }));
596 let (tx, rx) = oneshot::channel();
597 let source = TaskSource::new(
598 priority,
599 FutureWrapper::NonSend(ThreadGuard::new(f)),
600 Some(tx),
601 );
602 JoinHandle::new(self, source, rx)
603 }
604
605 pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>(
612 &self,
613 func: impl FnOnce() -> F + Send + 'static,
614 ) -> SpawnWithinJoinHandle<R> {
615 self.spawn_from_within_with_priority(crate::Priority::default(), func)
616 }
617
618 pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>(
625 &self,
626 priority: Priority,
627 func: impl FnOnce() -> F + Send + 'static,
628 ) -> SpawnWithinJoinHandle<R> {
629 let ctx = self.clone();
630 let (tx, rx) = oneshot::channel();
631 self.invoke_with_priority(priority, move || {
632 let _ = tx.send(ctx.spawn_local(func()));
633 });
634
635 SpawnWithinJoinHandle {
636 rx: Some(rx),
637 join_handle: None,
638 }
639 }
640
641 #[allow(clippy::transmute_ptr_to_ptr)]
650 pub fn block_on<F: Future>(&self, f: F) -> F::Output {
651 let mut res = None;
652 let l = MainLoop::new(Some(self), false);
653
654 let f = async {
655 res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
656 l.quit();
657 };
658
659 let f = unsafe {
660 let f = LocalFutureObj::new(Box::new(async move {
662 f.await;
663 Box::new(()) as Box<dyn Any + 'static>
664 }));
665 let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
666 f
667 };
668
669 let source = TaskSource::new(
670 crate::Priority::default(),
671 FutureWrapper::NonSend(ThreadGuard::new(f)),
672 None,
673 );
674 source.attach(Some(self));
675
676 l.run();
677
678 match res.unwrap() {
679 Ok(v) => v,
680 Err(e) => panic::resume_unwind(e),
681 }
682 }
683}
684
685impl Spawn for MainContext {
686 fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
687 let (tx, _) = oneshot::channel();
688 let source = TaskSource::new(
689 crate::Priority::default(),
690 FutureWrapper::Send(FutureObj::new(Box::new(async move {
691 f.await;
692 Box::new(()) as Box<dyn Any + Send + 'static>
693 }))),
694 Some(tx),
695 );
696 source.attach(Some(self));
697 Ok(())
698 }
699}
700
701impl LocalSpawn for MainContext {
702 fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
703 let (tx, _) = oneshot::channel();
704 let source = TaskSource::new(
705 crate::Priority::default(),
706 FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new(
707 async move {
708 f.await;
709 Box::new(()) as Box<dyn Any + 'static>
710 },
711 )))),
712 Some(tx),
713 );
714 source.attach(Some(self));
715 Ok(())
716 }
717}
718
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    /// A `Send` future spawned from one thread runs on the thread that runs
    /// the main loop.
    #[test]
    fn test_spawn() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        let (done_tx, done_rx) = mpsc::channel();
        let (start_tx, start_rx) = oneshot::channel();

        let loop_for_task = main_loop.clone();
        let task = start_rx
            .and_then(move |()| {
                done_tx.send(()).unwrap();
                loop_for_task.quit();

                futures_util::future::ok(())
            })
            .then(|res| {
                assert!(res.is_ok());
                futures_util::future::ready(())
            });
        context.spawn(task);

        let runner = thread::spawn(move || {
            main_loop.run();
        });

        // Kick off the task, then wait for it to report back.
        start_tx.send(()).unwrap();
        done_rx.recv().unwrap();

        runner.join().unwrap();
    }

    /// A local future can be spawned while the context is the thread default.
    #[test]
    fn test_spawn_local() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        context
            .with_thread_default(|| {
                let loop_for_task = main_loop.clone();
                context.spawn_local(futures_util::future::lazy(move |_ctx| {
                    loop_for_task.quit();
                }));

                main_loop.run();
            })
            .unwrap();
    }

    /// A non-`Send` future can be created from another thread through a
    /// `Send` closure.
    #[test]
    fn test_spawn_from_within() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        let loop_for_task = main_loop.clone();
        let spawner = std::thread::spawn(move || {
            context.spawn_from_within(move || async move {
                let rc = std::rc::Rc::new(123);
                futures_util::future::ready(()).await;
                assert_eq!(std::rc::Rc::strong_count(&rc), 1);
                loop_for_task.quit();
            });
        });

        main_loop.run();

        spawner.join().unwrap();
    }

    /// `block_on` accepts a future borrowing local state and returns its
    /// output.
    #[test]
    fn test_block_on() {
        let context = MainContext::new();

        let mut slot = None;
        {
            let slot_ref = &mut slot;

            let future = futures_util::future::lazy(|_ctx| {
                *slot_ref = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = context.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(slot, Some(123));
    }

    /// Awaiting a `JoinHandle` yields the task's return value.
    #[test]
    fn test_spawn_return() {
        let context = MainContext::new();
        context.block_on(async {
            let val = 1;
            let handle = context.spawn(async move { futures_util::future::ready(2).await + val });
            let ret = handle.await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    /// A panicking task surfaces its payload through the `JoinError`.
    #[test]
    fn test_spawn_panic() {
        let context = MainContext::new();
        context.block_on(async {
            let handle = context.spawn(async {
                panic!("failed");
            });
            let ret = handle.await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }

    /// Aborting a task releases everything the task's future captured.
    #[test]
    fn test_spawn_abort() {
        let context = MainContext::new();
        let shared = std::sync::Arc::new(1);
        let shared_for_task = shared.clone();
        let context_ref = &context;
        context.block_on(async move {
            let handle = context_ref.spawn(async move {
                let _v = shared_for_task;
                let test: u128 = std::future::pending().await;
                println!("{test}");
                unreachable!();
            });

            handle.abort();
        });
        drop(context);

        // Only the test's own reference may be left.
        assert_eq!(std::sync::Arc::strong_count(&shared), 1);
    }
}