1use std::{pin, pin::Pin, time::Duration};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::{
7 future::{FusedFuture, Future},
8 stream::{FusedStream, Stream},
9 task,
10 task::Poll,
11};
12
13use crate::{ControlFlow, MainContext, Priority, Source};
14
15pub struct SourceFuture<F, T> {
19 create_source: Option<F>,
20 source: Option<(Source, oneshot::Receiver<T>)>,
21}
22
23impl<F, T: 'static> SourceFuture<F, T>
24where
25 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
26{
27 pub fn new(create_source: F) -> SourceFuture<F, T> {
34 SourceFuture {
35 create_source: Some(create_source),
36 source: None,
37 }
38 }
39}
40
41impl<F, T> Unpin for SourceFuture<F, T> {}
42
43impl<F, T> Future for SourceFuture<F, T>
44where
45 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
46{
47 type Output = T;
48
49 fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
50 let SourceFuture {
51 ref mut create_source,
52 ref mut source,
53 ..
54 } = *self;
55
56 if let Some(create_source) = create_source.take() {
57 let main_context = MainContext::ref_thread_default();
58 assert!(
59 main_context.is_owner(),
60 "Spawning futures only allowed if the thread is owning the MainContext"
61 );
62
63 let (send, recv) = oneshot::channel();
70
71 let s = create_source(send);
72
73 s.attach(Some(&main_context));
74 *source = Some((s, recv));
75 }
76
77 let res = {
79 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
80 Pin::new(receiver).poll(ctx)
81 };
82 #[allow(clippy::match_wild_err_arm)]
83 match res {
84 Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
85 Poll::Ready(Ok(v)) => {
86 let _ = source.take();
88 Poll::Ready(v)
89 }
90 Poll::Pending => Poll::Pending,
91 }
92 }
93}
94
95impl<F, T> FusedFuture for SourceFuture<F, T>
96where
97 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
98{
99 fn is_terminated(&self) -> bool {
100 self.create_source.is_none()
101 && self
102 .source
103 .as_ref()
104 .map_or(true, |(_, receiver)| receiver.is_terminated())
105 }
106}
107
108impl<T, F> Drop for SourceFuture<T, F> {
109 fn drop(&mut self) {
110 if let Some((source, _)) = self.source.take() {
112 source.destroy();
113 }
114 }
115}
116
117pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
122 timeout_future_with_priority(crate::Priority::default(), value)
123}
124
125pub fn timeout_future_with_priority(
130 priority: Priority,
131 value: Duration,
132) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
133 Box::pin(SourceFuture::new(move |send| {
134 let mut send = Some(send);
135 crate::timeout_source_new(value, None, priority, move || {
136 let _ = send.take().unwrap().send(());
137 ControlFlow::Break
138 })
139 }))
140}
141
142pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
147 timeout_future_seconds_with_priority(crate::Priority::default(), value)
148}
149
150pub fn timeout_future_seconds_with_priority(
155 priority: Priority,
156 value: u32,
157) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
158 Box::pin(SourceFuture::new(move |send| {
159 let mut send = Some(send);
160 crate::timeout_source_new_seconds(value, None, priority, move || {
161 let _ = send.take().unwrap().send(());
162 ControlFlow::Break
163 })
164 }))
165}
166
167pub fn child_watch_future(
174 pid: crate::Pid,
175) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
176 child_watch_future_with_priority(crate::Priority::default(), pid)
177}
178
179pub fn child_watch_future_with_priority(
186 priority: Priority,
187 pid: crate::Pid,
188) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
189 Box::pin(SourceFuture::new(move |send| {
190 let mut send = Some(send);
191 crate::child_watch_source_new(pid, None, priority, move |pid, code| {
192 let _ = send.take().unwrap().send((pid, code));
193 })
194 }))
195}
196
197#[cfg(unix)]
198#[cfg_attr(docsrs, doc(cfg(unix)))]
199pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
204 unix_signal_future_with_priority(crate::Priority::default(), signum)
205}
206
207#[cfg(unix)]
208#[cfg_attr(docsrs, doc(cfg(unix)))]
209pub fn unix_signal_future_with_priority(
214 priority: Priority,
215 signum: i32,
216) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
217 Box::pin(SourceFuture::new(move |send| {
218 let mut send = Some(send);
219 crate::unix_signal_source_new(signum, None, priority, move || {
220 let _ = send.take().unwrap().send(());
221 ControlFlow::Break
222 })
223 }))
224}
225
226pub struct SourceStream<F, T> {
230 create_source: Option<F>,
231 source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
232}
233
234impl<F, T> Unpin for SourceStream<F, T> {}
235
236impl<F, T: 'static> SourceStream<F, T>
237where
238 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
239{
240 pub fn new(create_source: F) -> SourceStream<F, T> {
247 SourceStream {
248 create_source: Some(create_source),
249 source: None,
250 }
251 }
252}
253
254impl<F, T> Stream for SourceStream<F, T>
255where
256 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
257{
258 type Item = T;
259
260 fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
261 let SourceStream {
262 ref mut create_source,
263 ref mut source,
264 ..
265 } = *self;
266
267 if let Some(create_source) = create_source.take() {
268 let main_context = MainContext::ref_thread_default();
269 assert!(
270 main_context.is_owner(),
271 "Spawning futures only allowed if the thread is owning the MainContext"
272 );
273
274 let (send, recv) = mpsc::unbounded();
281
282 let s = create_source(send);
283
284 s.attach(Some(&main_context));
285 *source = Some((s, recv));
286 }
287
288 let res = {
290 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
291 Pin::new(receiver).poll_next(ctx)
292 };
293 #[allow(clippy::match_wild_err_arm)]
294 match res {
295 Poll::Ready(v) => {
296 if v.is_none() {
297 let _ = source.take();
299 }
300 Poll::Ready(v)
301 }
302 Poll::Pending => Poll::Pending,
303 }
304 }
305}
306
307impl<F, T> FusedStream for SourceStream<F, T>
308where
309 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
310{
311 fn is_terminated(&self) -> bool {
312 self.create_source.is_none()
313 && self
314 .source
315 .as_ref()
316 .map_or(true, |(_, receiver)| receiver.is_terminated())
317 }
318}
319
320impl<T, F> Drop for SourceStream<T, F> {
321 fn drop(&mut self) {
322 if let Some((source, _)) = self.source.take() {
324 source.destroy();
325 }
326 }
327}
328
329pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
334 interval_stream_with_priority(crate::Priority::default(), value)
335}
336
337pub fn interval_stream_with_priority(
342 priority: Priority,
343 value: Duration,
344) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
345 Box::pin(SourceStream::new(move |send| {
346 crate::timeout_source_new(value, None, priority, move || {
347 if send.unbounded_send(()).is_err() {
348 ControlFlow::Break
349 } else {
350 ControlFlow::Continue
351 }
352 })
353 }))
354}
355
356pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
361 interval_stream_seconds_with_priority(crate::Priority::default(), value)
362}
363
364pub fn interval_stream_seconds_with_priority(
369 priority: Priority,
370 value: u32,
371) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
372 Box::pin(SourceStream::new(move |send| {
373 crate::timeout_source_new_seconds(value, None, priority, move || {
374 if send.unbounded_send(()).is_err() {
375 ControlFlow::Break
376 } else {
377 ControlFlow::Continue
378 }
379 })
380 }))
381}
382
383#[cfg(unix)]
384#[cfg_attr(docsrs, doc(cfg(unix)))]
385pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
390 unix_signal_stream_with_priority(crate::Priority::default(), signum)
391}
392
393#[cfg(unix)]
394#[cfg_attr(docsrs, doc(cfg(unix)))]
395pub fn unix_signal_stream_with_priority(
400 priority: Priority,
401 signum: i32,
402) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
403 Box::pin(SourceStream::new(move |send| {
404 crate::unix_signal_source_new(signum, None, priority, move || {
405 if send.unbounded_send(()).is_err() {
406 ControlFlow::Break
407 } else {
408 ControlFlow::Continue
409 }
410 })
411 }))
412}
413
414#[cfg(test)]
415mod tests {
416 use std::{thread, time::Duration};
417
418 use futures_util::{future::FutureExt, stream::StreamExt};
419
420 use super::*;
421
422 #[test]
423 fn test_timeout() {
424 let c = MainContext::new();
425
426 c.block_on(timeout_future(Duration::from_millis(20)));
427 }
428
429 #[test]
430 fn test_timeout_send() {
431 let c = MainContext::new();
432 let l = crate::MainLoop::new(Some(&c), false);
433
434 let l_clone = l.clone();
435 c.spawn(timeout_future(Duration::from_millis(20)).then(move |()| {
436 l_clone.quit();
437 futures_util::future::ready(())
438 }));
439
440 l.run();
441 }
442
443 #[test]
444 fn test_interval() {
445 let c = MainContext::new();
446
447 let mut count = 0;
448
449 {
450 let count = &mut count;
451 c.block_on(
452 interval_stream(Duration::from_millis(20))
453 .take(2)
454 .for_each(|()| {
455 *count += 1;
456
457 futures_util::future::ready(())
458 })
459 .map(|_| ()),
460 );
461 }
462
463 assert_eq!(count, 2);
464 }
465
466 #[test]
467 fn test_timeout_and_channel() {
468 let c = MainContext::new();
469
470 let res = c.block_on(timeout_future(Duration::from_millis(20)).then(|()| {
471 let (sender, receiver) = oneshot::channel();
472
473 thread::spawn(move || {
474 sender.send(1).unwrap();
475 });
476
477 receiver.then(|i| futures_util::future::ready(i.unwrap()))
478 }));
479
480 assert_eq!(res, 1);
481 }
482}