glib/
source_futures.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{pin, pin::Pin, time::Duration};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::{
7    future::{FusedFuture, Future},
8    stream::{FusedStream, Stream},
9    task,
10    task::Poll,
11};
12
13use crate::{ControlFlow, MainContext, Priority, Source};
14
15// rustdoc-stripper-ignore-next
16/// Represents a `Future` around a `glib::Source`. The future will
17/// be resolved once the source has provided a value
18pub struct SourceFuture<F, T> {
19    create_source: Option<F>,
20    source: Option<(Source, oneshot::Receiver<T>)>,
21}
22
23impl<F, T: 'static> SourceFuture<F, T>
24where
25    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
26{
27    // rustdoc-stripper-ignore-next
28    /// Create a new `SourceFuture`
29    ///
30    /// The provided closure should return a newly created `glib::Source` when called
31    /// and pass the value provided by the source to the oneshot sender that is passed
32    /// to the closure.
33    pub fn new(create_source: F) -> SourceFuture<F, T> {
34        SourceFuture {
35            create_source: Some(create_source),
36            source: None,
37        }
38    }
39}
40
41impl<F, T> Unpin for SourceFuture<F, T> {}
42
43impl<F, T> Future for SourceFuture<F, T>
44where
45    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
46{
47    type Output = T;
48
49    fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
50        let SourceFuture {
51            ref mut create_source,
52            ref mut source,
53            ..
54        } = *self;
55
56        if let Some(create_source) = create_source.take() {
57            let main_context = MainContext::ref_thread_default();
58            assert!(
59                main_context.is_owner(),
60                "Spawning futures only allowed if the thread is owning the MainContext"
61            );
62
63            // Channel for sending back the Source result to our future here.
64            //
65            // In theory, we could directly continue polling the
66            // corresponding task from the Source callback,
67            // however this would break at the very least
68            // the g_main_current_source() API.
69            let (send, recv) = oneshot::channel();
70
71            let s = create_source(send);
72
73            s.attach(Some(&main_context));
74            *source = Some((s, recv));
75        }
76
77        // At this point we must have a receiver
78        let res = {
79            let &mut (_, ref mut receiver) = source.as_mut().unwrap();
80            Pin::new(receiver).poll(ctx)
81        };
82        #[allow(clippy::match_wild_err_arm)]
83        match res {
84            Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
85            Poll::Ready(Ok(v)) => {
86                // Get rid of the reference to the source, it triggered
87                let _ = source.take();
88                Poll::Ready(v)
89            }
90            Poll::Pending => Poll::Pending,
91        }
92    }
93}
94
95impl<F, T> FusedFuture for SourceFuture<F, T>
96where
97    F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
98{
99    fn is_terminated(&self) -> bool {
100        self.create_source.is_none()
101            && self
102                .source
103                .as_ref()
104                .map_or(true, |(_, receiver)| receiver.is_terminated())
105    }
106}
107
108impl<T, F> Drop for SourceFuture<T, F> {
109    fn drop(&mut self) {
110        // Get rid of the source, we don't care anymore if it still triggers
111        if let Some((source, _)) = self.source.take() {
112            source.destroy();
113        }
114    }
115}
116
117// rustdoc-stripper-ignore-next
118/// Create a `Future` that will resolve after the given number of milliseconds.
119///
120/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
121pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
122    timeout_future_with_priority(crate::Priority::default(), value)
123}
124
125// rustdoc-stripper-ignore-next
126/// Create a `Future` that will resolve after the given number of milliseconds.
127///
128/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
129pub fn timeout_future_with_priority(
130    priority: Priority,
131    value: Duration,
132) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
133    Box::pin(SourceFuture::new(move |send| {
134        let mut send = Some(send);
135        crate::timeout_source_new(value, None, priority, move || {
136            let _ = send.take().unwrap().send(());
137            ControlFlow::Break
138        })
139    }))
140}
141
142// rustdoc-stripper-ignore-next
143/// Create a `Future` that will resolve after the given number of seconds.
144///
145/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
146pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
147    timeout_future_seconds_with_priority(crate::Priority::default(), value)
148}
149
150// rustdoc-stripper-ignore-next
151/// Create a `Future` that will resolve after the given number of seconds.
152///
153/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
154pub fn timeout_future_seconds_with_priority(
155    priority: Priority,
156    value: u32,
157) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
158    Box::pin(SourceFuture::new(move |send| {
159        let mut send = Some(send);
160        crate::timeout_source_new_seconds(value, None, priority, move || {
161            let _ = send.take().unwrap().send(());
162            ControlFlow::Break
163        })
164    }))
165}
166
167// rustdoc-stripper-ignore-next
168/// Create a `Future` that will resolve once the child process with the given pid exits
169///
170/// The `Future` will resolve to the pid of the child process and the exit code.
171///
172/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
173pub fn child_watch_future(
174    pid: crate::Pid,
175) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
176    child_watch_future_with_priority(crate::Priority::default(), pid)
177}
178
179// rustdoc-stripper-ignore-next
180/// Create a `Future` that will resolve once the child process with the given pid exits
181///
182/// The `Future` will resolve to the pid of the child process and the exit code.
183///
184/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
185pub fn child_watch_future_with_priority(
186    priority: Priority,
187    pid: crate::Pid,
188) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
189    Box::pin(SourceFuture::new(move |send| {
190        let mut send = Some(send);
191        crate::child_watch_source_new(pid, None, priority, move |pid, code| {
192            let _ = send.take().unwrap().send((pid, code));
193        })
194    }))
195}
196
197#[cfg(unix)]
198#[cfg_attr(docsrs, doc(cfg(unix)))]
199// rustdoc-stripper-ignore-next
200/// Create a `Future` that will resolve once the given UNIX signal is raised
201///
202/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
203pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
204    unix_signal_future_with_priority(crate::Priority::default(), signum)
205}
206
207#[cfg(unix)]
208#[cfg_attr(docsrs, doc(cfg(unix)))]
209// rustdoc-stripper-ignore-next
210/// Create a `Future` that will resolve once the given UNIX signal is raised
211///
212/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
213pub fn unix_signal_future_with_priority(
214    priority: Priority,
215    signum: i32,
216) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
217    Box::pin(SourceFuture::new(move |send| {
218        let mut send = Some(send);
219        crate::unix_signal_source_new(signum, None, priority, move || {
220            let _ = send.take().unwrap().send(());
221            ControlFlow::Break
222        })
223    }))
224}
225
226// rustdoc-stripper-ignore-next
227/// Represents a `Stream` around a `glib::Source`. The stream will
228/// be provide all values that are provided by the source
229pub struct SourceStream<F, T> {
230    create_source: Option<F>,
231    source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
232}
233
234impl<F, T> Unpin for SourceStream<F, T> {}
235
236impl<F, T: 'static> SourceStream<F, T>
237where
238    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
239{
240    // rustdoc-stripper-ignore-next
241    /// Create a new `SourceStream`
242    ///
243    /// The provided closure should return a newly created `glib::Source` when called
244    /// and pass the values provided by the source to the sender that is passed
245    /// to the closure.
246    pub fn new(create_source: F) -> SourceStream<F, T> {
247        SourceStream {
248            create_source: Some(create_source),
249            source: None,
250        }
251    }
252}
253
254impl<F, T> Stream for SourceStream<F, T>
255where
256    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
257{
258    type Item = T;
259
260    fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
261        let SourceStream {
262            ref mut create_source,
263            ref mut source,
264            ..
265        } = *self;
266
267        if let Some(create_source) = create_source.take() {
268            let main_context = MainContext::ref_thread_default();
269            assert!(
270                main_context.is_owner(),
271                "Spawning futures only allowed if the thread is owning the MainContext"
272            );
273
274            // Channel for sending back the Source result to our future here.
275            //
276            // In theory we could directly continue polling the
277            // corresponding task from the Source callback,
278            // however this would break at the very least
279            // the g_main_current_source() API.
280            let (send, recv) = mpsc::unbounded();
281
282            let s = create_source(send);
283
284            s.attach(Some(&main_context));
285            *source = Some((s, recv));
286        }
287
288        // At this point we must have a receiver
289        let res = {
290            let &mut (_, ref mut receiver) = source.as_mut().unwrap();
291            Pin::new(receiver).poll_next(ctx)
292        };
293        #[allow(clippy::match_wild_err_arm)]
294        match res {
295            Poll::Ready(v) => {
296                if v.is_none() {
297                    // Get rid of the reference to the source, it triggered
298                    let _ = source.take();
299                }
300                Poll::Ready(v)
301            }
302            Poll::Pending => Poll::Pending,
303        }
304    }
305}
306
307impl<F, T> FusedStream for SourceStream<F, T>
308where
309    F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
310{
311    fn is_terminated(&self) -> bool {
312        self.create_source.is_none()
313            && self
314                .source
315                .as_ref()
316                .map_or(true, |(_, receiver)| receiver.is_terminated())
317    }
318}
319
320impl<T, F> Drop for SourceStream<T, F> {
321    fn drop(&mut self) {
322        // Get rid of the source, we don't care anymore if it still triggers
323        if let Some((source, _)) = self.source.take() {
324            source.destroy();
325        }
326    }
327}
328
329// rustdoc-stripper-ignore-next
330/// Create a `Stream` that will provide a value every given number of milliseconds.
331///
332/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
333pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
334    interval_stream_with_priority(crate::Priority::default(), value)
335}
336
337// rustdoc-stripper-ignore-next
338/// Create a `Stream` that will provide a value every given number of milliseconds.
339///
340/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
341pub fn interval_stream_with_priority(
342    priority: Priority,
343    value: Duration,
344) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
345    Box::pin(SourceStream::new(move |send| {
346        crate::timeout_source_new(value, None, priority, move || {
347            if send.unbounded_send(()).is_err() {
348                ControlFlow::Break
349            } else {
350                ControlFlow::Continue
351            }
352        })
353    }))
354}
355
356// rustdoc-stripper-ignore-next
357/// Create a `Stream` that will provide a value every given number of seconds.
358///
359/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
360pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
361    interval_stream_seconds_with_priority(crate::Priority::default(), value)
362}
363
364// rustdoc-stripper-ignore-next
365/// Create a `Stream` that will provide a value every given number of seconds.
366///
367/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
368pub fn interval_stream_seconds_with_priority(
369    priority: Priority,
370    value: u32,
371) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
372    Box::pin(SourceStream::new(move |send| {
373        crate::timeout_source_new_seconds(value, None, priority, move || {
374            if send.unbounded_send(()).is_err() {
375                ControlFlow::Break
376            } else {
377                ControlFlow::Continue
378            }
379        })
380    }))
381}
382
383#[cfg(unix)]
384#[cfg_attr(docsrs, doc(cfg(unix)))]
385// rustdoc-stripper-ignore-next
386/// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
387///
388/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
389pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
390    unix_signal_stream_with_priority(crate::Priority::default(), signum)
391}
392
393#[cfg(unix)]
394#[cfg_attr(docsrs, doc(cfg(unix)))]
395// rustdoc-stripper-ignore-next
396/// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
397///
398/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
399pub fn unix_signal_stream_with_priority(
400    priority: Priority,
401    signum: i32,
402) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
403    Box::pin(SourceStream::new(move |send| {
404        crate::unix_signal_source_new(signum, None, priority, move || {
405            if send.unbounded_send(()).is_err() {
406                ControlFlow::Break
407            } else {
408                ControlFlow::Continue
409            }
410        })
411    }))
412}
413
#[cfg(test)]
mod tests {
    use std::{thread, time::Duration};

    use futures_util::{future::FutureExt, stream::StreamExt};

    use super::*;

    // A timeout future can be driven to completion with `block_on`.
    #[test]
    fn test_timeout() {
        let context = MainContext::new();
        let timeout = timeout_future(Duration::from_millis(20));

        context.block_on(timeout);
    }

    // A timeout future spawned on the context completes while the main loop
    // runs, and quits that loop from its continuation.
    #[test]
    fn test_timeout_send() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        let quit_handle = main_loop.clone();
        let task = timeout_future(Duration::from_millis(20)).map(move |()| quit_handle.quit());
        context.spawn(task);

        main_loop.run();
    }

    // Taking two items from an interval stream yields exactly two ticks.
    #[test]
    fn test_interval() {
        let context = MainContext::new();

        let ticks = interval_stream(Duration::from_millis(20))
            .take(2)
            .fold(0, |seen, ()| futures_util::future::ready(seen + 1));
        let count = context.block_on(ticks);

        assert_eq!(count, 2);
    }

    // A timeout future can be chained with a oneshot channel that is
    // completed from another thread.
    #[test]
    fn test_timeout_and_channel() {
        let context = MainContext::new();

        let chained = timeout_future(Duration::from_millis(20)).then(|()| {
            let (sender, receiver) = oneshot::channel();

            thread::spawn(move || {
                sender.send(1).unwrap();
            });

            receiver.map(|received| received.unwrap())
        });
        let res = context.block_on(chained);

        assert_eq!(res, 1);
    }
}