glib/
source_futures.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{pin, pin::Pin, time::Duration};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::{
7    future::{FusedFuture, Future},
8    stream::{FusedStream, Stream},
9    task,
10    task::Poll,
11};
12
13use crate::{ControlFlow, MainContext, Priority, Source};
14
15// rustdoc-stripper-ignore-next
16/// Represents a `Future` around a `glib::Source`. The future will
17/// be resolved once the source has provided a value
18pub struct SourceFuture<F, T> {
19    create_source: Option<F>,
20    source: Option<(Source, oneshot::Receiver<T>)>,
21}
22
23impl<F, T: 'static> SourceFuture<F, T>
24where
25    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
26{
27    // rustdoc-stripper-ignore-next
28    /// Create a new `SourceFuture`
29    ///
30    /// The provided closure should return a newly created `glib::Source` when called
31    /// and pass the value provided by the source to the oneshot sender that is passed
32    /// to the closure.
33    pub fn new(create_source: F) -> SourceFuture<F, T> {
34        SourceFuture {
35            create_source: Some(create_source),
36            source: None,
37        }
38    }
39}
40
41impl<F, T> Unpin for SourceFuture<F, T> {}
42
43impl<F, T> Future for SourceFuture<F, T>
44where
45    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
46{
47    type Output = T;
48
49    fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
50        let SourceFuture {
51            ref mut create_source,
52            ref mut source,
53            ..
54        } = *self;
55
56        if let Some(create_source) = create_source.take() {
57            let main_context = MainContext::ref_thread_default();
58            assert!(
59                main_context.is_owner(),
60                "Spawning futures only allowed if the thread is owning the MainContext"
61            );
62
63            // Channel for sending back the Source result to our future here.
64            //
65            // In theory, we could directly continue polling the
66            // corresponding task from the Source callback,
67            // however this would break at the very least
68            // the g_main_current_source() API.
69            let (send, recv) = oneshot::channel();
70
71            let s = create_source(send);
72
73            s.attach(Some(&main_context));
74            *source = Some((s, recv));
75        }
76
77        // At this point we must have a receiver
78        let res = {
79            let &mut (_, ref mut receiver) = source.as_mut().unwrap();
80            Pin::new(receiver).poll(ctx)
81        };
82        #[allow(clippy::match_wild_err_arm)]
83        match res {
84            Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
85            Poll::Ready(Ok(v)) => {
86                // Get rid of the reference to the source, it triggered
87                let _ = source.take();
88                Poll::Ready(v)
89            }
90            Poll::Pending => Poll::Pending,
91        }
92    }
93}
94
95impl<F, T> FusedFuture for SourceFuture<F, T>
96where
97    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
98{
99    fn is_terminated(&self) -> bool {
100        self.create_source.is_none()
101            && self
102                .source
103                .as_ref()
104                .is_none_or(|(_, receiver)| receiver.is_terminated())
105    }
106}
107
108impl<T, F> Drop for SourceFuture<T, F> {
109    fn drop(&mut self) {
110        // Get rid of the source, we don't care anymore if it still triggers
111        if let Some((source, _)) = self.source.take() {
112            source.destroy();
113        }
114    }
115}
116
117// rustdoc-stripper-ignore-next
118/// Create a `Future` that will resolve after the given number of milliseconds.
119///
120/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
121pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
122    timeout_future_with_priority(crate::Priority::default(), value)
123}
124
125// rustdoc-stripper-ignore-next
126/// Create a `Future` that will resolve after the given number of milliseconds.
127///
128/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
129pub fn timeout_future_with_priority(
130    priority: Priority,
131    value: Duration,
132) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
133    Box::pin(SourceFuture::new(move |send| {
134        let mut send = Some(send);
135        crate::timeout_source_new(value, None, priority, move || {
136            let _ = send.take().unwrap().send(());
137            ControlFlow::Break
138        })
139    }))
140}
141
142// rustdoc-stripper-ignore-next
143/// Create a `Future` that will resolve after the given number of seconds.
144///
145/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
146pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
147    timeout_future_seconds_with_priority(crate::Priority::default(), value)
148}
149
150// rustdoc-stripper-ignore-next
151/// Create a `Future` that will resolve after the given number of seconds.
152///
153/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
154pub fn timeout_future_seconds_with_priority(
155    priority: Priority,
156    value: u32,
157) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
158    Box::pin(SourceFuture::new(move |send| {
159        let mut send = Some(send);
160        crate::timeout_source_new_seconds(value, None, priority, move || {
161            let _ = send.take().unwrap().send(());
162            ControlFlow::Break
163        })
164    }))
165}
166
167// rustdoc-stripper-ignore-next
168/// Create a `Future` that will resolve once the child process with the given pid exits
169///
170/// The `Future` will resolve to the pid of the child process and the exit code.
171///
172/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
173pub fn child_watch_future(
174    pid: crate::Pid,
175) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
176    child_watch_future_with_priority(crate::Priority::default(), pid)
177}
178
179// rustdoc-stripper-ignore-next
180/// Create a `Future` that will resolve once the child process with the given pid exits
181///
182/// The `Future` will resolve to the pid of the child process and the exit code.
183///
184/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
185pub fn child_watch_future_with_priority(
186    priority: Priority,
187    pid: crate::Pid,
188) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
189    Box::pin(SourceFuture::new(move |send| {
190        let mut send = Some(send);
191        crate::child_watch_source_new(pid, None, priority, move |pid, code| {
192            let _ = send.take().unwrap().send((pid, code));
193        })
194    }))
195}
196
197// rustdoc-stripper-ignore-next
198/// Represents a `Stream` around a `glib::Source`. The stream will
199/// be provide all values that are provided by the source
200pub struct SourceStream<F, T> {
201    create_source: Option<F>,
202    source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
203}
204
205impl<F, T> Unpin for SourceStream<F, T> {}
206
207impl<F, T: 'static> SourceStream<F, T>
208where
209    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
210{
211    // rustdoc-stripper-ignore-next
212    /// Create a new `SourceStream`
213    ///
214    /// The provided closure should return a newly created `glib::Source` when called
215    /// and pass the values provided by the source to the sender that is passed
216    /// to the closure.
217    pub fn new(create_source: F) -> SourceStream<F, T> {
218        SourceStream {
219            create_source: Some(create_source),
220            source: None,
221        }
222    }
223}
224
225impl<F, T> Stream for SourceStream<F, T>
226where
227    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
228{
229    type Item = T;
230
231    fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
232        let SourceStream {
233            ref mut create_source,
234            ref mut source,
235            ..
236        } = *self;
237
238        if let Some(create_source) = create_source.take() {
239            let main_context = MainContext::ref_thread_default();
240            assert!(
241                main_context.is_owner(),
242                "Spawning futures only allowed if the thread is owning the MainContext"
243            );
244
245            // Channel for sending back the Source result to our future here.
246            //
247            // In theory we could directly continue polling the
248            // corresponding task from the Source callback,
249            // however this would break at the very least
250            // the g_main_current_source() API.
251            let (send, recv) = mpsc::unbounded();
252
253            let s = create_source(send);
254
255            s.attach(Some(&main_context));
256            *source = Some((s, recv));
257        }
258
259        // At this point we must have a receiver
260        let res = {
261            let &mut (_, ref mut receiver) = source.as_mut().unwrap();
262            Pin::new(receiver).poll_next(ctx)
263        };
264        #[allow(clippy::match_wild_err_arm)]
265        match res {
266            Poll::Ready(v) => {
267                if v.is_none() {
268                    // Get rid of the reference to the source, it triggered
269                    let _ = source.take();
270                }
271                Poll::Ready(v)
272            }
273            Poll::Pending => Poll::Pending,
274        }
275    }
276}
277
278impl<F, T> FusedStream for SourceStream<F, T>
279where
280    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
281{
282    fn is_terminated(&self) -> bool {
283        self.create_source.is_none()
284            && self
285                .source
286                .as_ref()
287                .is_none_or(|(_, receiver)| receiver.is_terminated())
288    }
289}
290
291impl<T, F> Drop for SourceStream<T, F> {
292    fn drop(&mut self) {
293        // Get rid of the source, we don't care anymore if it still triggers
294        if let Some((source, _)) = self.source.take() {
295            source.destroy();
296        }
297    }
298}
299
300// rustdoc-stripper-ignore-next
301/// Create a `Stream` that will provide a value every given number of milliseconds.
302///
303/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
304pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
305    interval_stream_with_priority(crate::Priority::default(), value)
306}
307
308// rustdoc-stripper-ignore-next
309/// Create a `Stream` that will provide a value every given number of milliseconds.
310///
311/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
312pub fn interval_stream_with_priority(
313    priority: Priority,
314    value: Duration,
315) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
316    Box::pin(SourceStream::new(move |send| {
317        crate::timeout_source_new(value, None, priority, move || {
318            if send.unbounded_send(()).is_err() {
319                ControlFlow::Break
320            } else {
321                ControlFlow::Continue
322            }
323        })
324    }))
325}
326
327// rustdoc-stripper-ignore-next
328/// Create a `Stream` that will provide a value every given number of seconds.
329///
330/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
331pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
332    interval_stream_seconds_with_priority(crate::Priority::default(), value)
333}
334
335// rustdoc-stripper-ignore-next
336/// Create a `Stream` that will provide a value every given number of seconds.
337///
338/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
339pub fn interval_stream_seconds_with_priority(
340    priority: Priority,
341    value: u32,
342) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
343    Box::pin(SourceStream::new(move |send| {
344        crate::timeout_source_new_seconds(value, None, priority, move || {
345            if send.unbounded_send(()).is_err() {
346                ControlFlow::Break
347            } else {
348                ControlFlow::Continue
349            }
350        })
351    }))
352}
353
354#[cfg(test)]
355mod tests {
356    use std::{thread, time::Duration};
357
358    use futures_util::{future::FutureExt, stream::StreamExt};
359
360    use super::*;
361
362    #[test]
363    fn test_timeout() {
364        let c = MainContext::new();
365
366        c.block_on(timeout_future(Duration::from_millis(20)));
367    }
368
369    #[test]
370    fn test_timeout_send() {
371        let c = MainContext::new();
372        let l = crate::MainLoop::new(Some(&c), false);
373
374        let l_clone = l.clone();
375        c.spawn(timeout_future(Duration::from_millis(20)).then(move |()| {
376            l_clone.quit();
377            futures_util::future::ready(())
378        }));
379
380        l.run();
381    }
382
383    #[test]
384    fn test_interval() {
385        let c = MainContext::new();
386
387        let mut count = 0;
388
389        {
390            let count = &mut count;
391            c.block_on(
392                interval_stream(Duration::from_millis(20))
393                    .take(2)
394                    .for_each(|()| {
395                        *count += 1;
396
397                        futures_util::future::ready(())
398                    })
399                    .map(|_| ()),
400            );
401        }
402
403        assert_eq!(count, 2);
404    }
405
406    #[test]
407    fn test_timeout_and_channel() {
408        let c = MainContext::new();
409
410        let res = c.block_on(timeout_future(Duration::from_millis(20)).then(|()| {
411            let (sender, receiver) = oneshot::channel();
412
413            thread::spawn(move || {
414                sender.send(1).unwrap();
415            });
416
417            receiver.then(|i| futures_util::future::ready(i.unwrap()))
418        }));
419
420        assert_eq!(res, 1);
421    }
422}