gio/
gio_future.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Take a look at the license at the top of the repository in the LICENSE file.

use std::{
    future::Future,
    pin::{self, Pin},
};

use futures_channel::oneshot;
use futures_core::{
    task::{Context, Poll},
    FusedFuture,
};

use crate::{prelude::*, Cancellable};

pub struct GioFuture<F, O, T> {
    obj: O,
    schedule_operation: Option<F>,
    cancellable: Option<Cancellable>,
    receiver: Option<oneshot::Receiver<T>>,
}

pub struct GioFutureResult<T> {
    sender: oneshot::Sender<T>,
}

impl<T> GioFutureResult<T> {
    pub fn resolve(self, res: T) {
        let _ = self.sender.send(res);
    }
}

impl<F, O, T: 'static> GioFuture<F, O, T>
where
    O: Clone + 'static,
    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
{
    pub fn new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T> {
        GioFuture {
            obj: obj.clone(),
            schedule_operation: Some(schedule_operation),
            cancellable: Some(Cancellable::new()),
            receiver: None,
        }
    }
}

impl<F, O, T> Future for GioFuture<F, O, T>
where
    O: Clone + 'static,
    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
{
    type Output = T;

    fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<T> {
        let GioFuture {
            ref obj,
            ref mut schedule_operation,
            ref mut cancellable,
            ref mut receiver,
            ..
        } = *self;

        if let Some(schedule_operation) = schedule_operation.take() {
            let main_context = glib::MainContext::ref_thread_default();
            assert!(
                main_context.is_owner(),
                "Spawning futures only allowed if the thread is owning the MainContext"
            );

            // Channel for sending back the GIO async operation
            // result to our future here.
            //
            // In theory, we could directly continue polling the
            // corresponding task from the GIO async operation
            // callback, however this would break at the very
            // least the g_main_current_source() API.
            let (send, recv) = oneshot::channel();

            schedule_operation(
                obj,
                cancellable.as_ref().unwrap(),
                GioFutureResult { sender: send },
            );

            *receiver = Some(recv);
        }

        // At this point we must have a receiver
        let res = {
            let receiver = receiver.as_mut().unwrap();
            Pin::new(receiver).poll(ctx)
        };

        match res {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(_)) => panic!("Async operation sender was unexpectedly closed"),
            Poll::Ready(Ok(v)) => {
                // Get rid of the reference to the cancellable and receiver
                let _ = cancellable.take();
                let _ = receiver.take();
                Poll::Ready(v)
            }
        }
    }
}

impl<F, O, T> FusedFuture for GioFuture<F, O, T>
where
    O: Clone + 'static,
    F: FnOnce(&O, &Cancellable, GioFutureResult<T>) + 'static,
{
    fn is_terminated(&self) -> bool {
        self.schedule_operation.is_none()
            && self
                .receiver
                .as_ref()
                .map_or(true, |receiver| receiver.is_terminated())
    }
}

impl<F, O, T> Drop for GioFuture<F, O, T> {
    fn drop(&mut self) {
        if let Some(cancellable) = self.cancellable.take() {
            cancellable.cancel();
        }
        let _ = self.receiver.take();
    }
}

impl<F, O, T> Unpin for GioFuture<F, O, T> {}