gio/
cancellable_future.rs
1use std::{
4 fmt::{Debug, Display},
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use pin_project_lite::pin_project;
11
12use crate::{prelude::*, Cancellable, CancelledHandlerId, IOErrorEnum};
13
/// Error value reported by [`CancellableFuture`] when its [`Cancellable`] was
/// cancelled before the wrapped future produced an output.
///
/// This is a zero-sized marker, so it derives the cheap value-semantics traits
/// (`Clone`, `Copy`, `PartialEq`, `Eq`); this lets callers copy it freely and
/// compare results with `==`. `Debug` is intentionally not derived because a
/// hand-written implementation exists in this file.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Cancelled;
17
pin_project! {
    /// A future adapter that pairs an inner future with a [`Cancellable`].
    ///
    /// Polling it yields `Ok(output)` when the inner future completes, or
    /// `Err(Cancelled)` when the associated [`Cancellable`] is found to be
    /// cancelled.
    pub struct CancellableFuture<F> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,

        // Id of the `cancelled` handler registered by the most recent poll that
        // returned `Pending`; `None` while no handler is registered.
        #[pin]
        waker_handler_cb: Option<CancelledHandlerId>,

        // The cancellation source that is checked and observed while polling.
        cancellable: Cancellable,
    }
}
45
46impl<F> CancellableFuture<F> {
47 pub fn new(future: F, cancellable: Cancellable) -> Self {
54 Self {
55 future,
56 waker_handler_cb: None,
57 cancellable,
58 }
59 }
60
61 #[inline]
71 pub fn is_cancelled(&self) -> bool {
72 self.cancellable.is_cancelled()
73 }
74
75 #[inline]
78 pub fn cancellable(&self) -> &Cancellable {
79 &self.cancellable
80 }
81}
82
83impl<F> Future for CancellableFuture<F>
84where
85 F: Future,
86{
87 type Output = Result<<F as Future>::Output, Cancelled>;
88
89 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90 if self.is_cancelled() {
91 return Poll::Ready(Err(Cancelled));
92 }
93
94 let mut this = self.as_mut().project();
95
96 match this.future.poll(cx) {
97 Poll::Ready(out) => Poll::Ready(Ok(out)),
98
99 Poll::Pending => {
100 if let Some(prev_handler) = this.waker_handler_cb.take() {
101 this.cancellable.disconnect_cancelled(prev_handler);
102 }
103
104 let canceller_handler_id = this.cancellable.connect_cancelled({
105 let w = cx.waker().clone();
106 move |_| w.wake()
107 });
108
109 match canceller_handler_id {
110 Some(canceller_handler_id) => {
111 *this.waker_handler_cb = Some(canceller_handler_id);
112 Poll::Pending
113 }
114
115 None => Poll::Ready(Err(Cancelled)),
116 }
117 }
118 }
119 }
120}
121
122impl From<Cancelled> for glib::Error {
123 fn from(_: Cancelled) -> Self {
124 glib::Error::new(IOErrorEnum::Cancelled, "Task cancelled")
125 }
126}
127
// Marks `Cancelled` as a standard error type. The body is empty, so only the
// trait's default methods apply; the textual representation comes from the
// `Debug` and `Display` implementations in this file.
impl std::error::Error for Cancelled {}
129
130impl Debug for Cancelled {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 write!(f, "Task cancelled")
133 }
134}
135
136impl Display for Cancelled {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 Debug::fmt(self, f)
139 }
140}
141
#[cfg(test)]
mod tests {
    use futures_channel::oneshot;

    use super::{Cancellable, CancellableFuture, Cancelled};
    use crate::prelude::*;

    // A wrapped future that is immediately ready must resolve to `Ok` with
    // the inner output when the cancellable is never cancelled.
    #[test]
    fn cancellable_future_ok() {
        let main_context = glib::MainContext::new();
        let cancellable = Cancellable::new();
        let (done_tx, done_rx) = oneshot::channel();

        main_context.spawn_local(async move {
            let wrapped = CancellableFuture::new(async { 42 }, cancellable);
            assert!(!wrapped.is_cancelled());

            let outcome = wrapped.await;
            assert!(matches!(outcome, Ok(42)));

            // Signal the blocked test thread that the task ran to the end.
            done_tx.send(()).unwrap();
        });

        main_context.block_on(done_rx).unwrap();
    }

    // A wrapped future that never completes must resolve to `Err(Cancelled)`
    // once the cancellable is cancelled from another thread.
    #[test]
    fn cancellable_future_cancel() {
        let main_context = glib::MainContext::new();
        let cancellable = Cancellable::new();
        let (done_tx, done_rx) = oneshot::channel();

        let task_cancellable = cancellable.clone();
        main_context.spawn_local(async move {
            let wrapped = CancellableFuture::new(std::future::pending::<()>(), task_cancellable);

            let outcome = wrapped.await;
            assert!(matches!(outcome, Err(Cancelled)));

            done_tx.send(()).unwrap();
        });

        // Cancel from a separate thread and wait for that thread to finish
        // before driving the main context.
        std::thread::spawn(move || cancellable.cancel())
            .join()
            .unwrap();

        main_context.block_on(done_rx).unwrap();
    }
}