gio/
gio_future.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future::Future,
5    pin::{self, Pin},
6};
7
8use futures_channel::oneshot;
9use futures_core::{
10    task::{Context, Poll},
11    FusedFuture,
12};
13
14use crate::{prelude::*, Cancellable};
15
16pub struct GioFuture<F, O, T> {
17    obj: O,
18    schedule_operation: Option<F>,
19    cancellable: Option<Cancellable>,
20    receiver: Option<oneshot::Receiver<T>>,
21}
22
23pub struct GioFutureResult<T> {
24    sender: oneshot::Sender<T>,
25}
26
27impl<T> GioFutureResult<T> {
28    pub fn resolve(self, res: T) {
29        let _ = self.sender.send(res);
30    }
31}
32
33impl<F, O, T: 'static> GioFuture<F, O, T>
34where
35    O: Clone + 'static,
36    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
37{
38    pub fn new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T> {
39        GioFuture {
40            obj: obj.clone(),
41            schedule_operation: Some(schedule_operation),
42            cancellable: Some(Cancellable::new()),
43            receiver: None,
44        }
45    }
46}
47
48impl<F, O, T> Future for GioFuture<F, O, T>
49where
50    O: Clone + 'static,
51    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
52{
53    type Output = T;
54
55    fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<T> {
56        let GioFuture {
57            ref obj,
58            ref mut schedule_operation,
59            ref mut cancellable,
60            ref mut receiver,
61            ..
62        } = *self;
63
64        if let Some(schedule_operation) = schedule_operation.take() {
65            let main_context = glib::MainContext::ref_thread_default();
66            assert!(
67                main_context.is_owner(),
68                "Spawning futures only allowed if the thread is owning the MainContext"
69            );
70
71            // Channel for sending back the GIO async operation
72            // result to our future here.
73            //
74            // In theory, we could directly continue polling the
75            // corresponding task from the GIO async operation
76            // callback, however this would break at the very
77            // least the g_main_current_source() API.
78            let (send, recv) = oneshot::channel();
79
80            schedule_operation(
81                obj,
82                cancellable.as_ref().unwrap(),
83                GioFutureResult { sender: send },
84            );
85
86            *receiver = Some(recv);
87        }
88
89        // At this point we must have a receiver
90        let res = {
91            let receiver = receiver.as_mut().unwrap();
92            Pin::new(receiver).poll(ctx)
93        };
94
95        match res {
96            Poll::Pending => Poll::Pending,
97            Poll::Ready(Err(_)) => panic!("Async operation sender was unexpectedly closed"),
98            Poll::Ready(Ok(v)) => {
99                // Get rid of the reference to the cancellable and receiver
100                let _ = cancellable.take();
101                let _ = receiver.take();
102                Poll::Ready(v)
103            }
104        }
105    }
106}
107
108impl<F, O, T> FusedFuture for GioFuture<F, O, T>
109where
110    O: Clone + 'static,
111    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
112{
113    fn is_terminated(&self) -> bool {
114        self.schedule_operation.is_none()
115            && self
116                .receiver
117                .as_ref()
118                .map_or(true, |receiver| receiver.is_terminated())
119    }
120}
121
122impl<F, O, T> Drop for GioFuture<F, O, T> {
123    fn drop(&mut self) {
124        if let Some(cancellable) = self.cancellable.take() {
125            cancellable.cancel();
126        }
127        let _ = self.receiver.take();
128    }
129}
130
131impl<F, O, T> Unpin for GioFuture<F, O, T> {}