1use std::{pin, pin::Pin, time::Duration};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::{
7 future::{FusedFuture, Future},
8 stream::{FusedStream, Stream},
9 task,
10 task::Poll,
11};
12
13use crate::{ControlFlow, MainContext, Priority, Source};
14
15pub struct SourceFuture<F, T> {
19 create_source: Option<F>,
20 source: Option<(Source, oneshot::Receiver<T>)>,
21}
22
23impl<F, T: 'static> SourceFuture<F, T>
24where
25 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
26{
27 pub fn new(create_source: F) -> SourceFuture<F, T> {
34 SourceFuture {
35 create_source: Some(create_source),
36 source: None,
37 }
38 }
39}
40
41impl<F, T> Unpin for SourceFuture<F, T> {}
42
43impl<F, T> Future for SourceFuture<F, T>
44where
45 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
46{
47 type Output = T;
48
49 fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
50 let SourceFuture {
51 ref mut create_source,
52 ref mut source,
53 ..
54 } = *self;
55
56 if let Some(create_source) = create_source.take() {
57 let main_context = MainContext::ref_thread_default();
58 assert!(
59 main_context.is_owner(),
60 "Spawning futures only allowed if the thread is owning the MainContext"
61 );
62
63 let (send, recv) = oneshot::channel();
70
71 let s = create_source(send);
72
73 s.attach(Some(&main_context));
74 *source = Some((s, recv));
75 }
76
77 let res = {
79 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
80 Pin::new(receiver).poll(ctx)
81 };
82 #[allow(clippy::match_wild_err_arm)]
83 match res {
84 Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
85 Poll::Ready(Ok(v)) => {
86 let _ = source.take();
88 Poll::Ready(v)
89 }
90 Poll::Pending => Poll::Pending,
91 }
92 }
93}
94
95impl<F, T> FusedFuture for SourceFuture<F, T>
96where
97 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
98{
99 fn is_terminated(&self) -> bool {
100 self.create_source.is_none()
101 && self
102 .source
103 .as_ref()
104 .is_none_or(|(_, receiver)| receiver.is_terminated())
105 }
106}
107
108impl<T, F> Drop for SourceFuture<T, F> {
109 fn drop(&mut self) {
110 if let Some((source, _)) = self.source.take() {
112 source.destroy();
113 }
114 }
115}
116
117pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
122 timeout_future_with_priority(crate::Priority::default(), value)
123}
124
125pub fn timeout_future_with_priority(
130 priority: Priority,
131 value: Duration,
132) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
133 Box::pin(SourceFuture::new(move |send| {
134 let mut send = Some(send);
135 crate::timeout_source_new(value, None, priority, move || {
136 let _ = send.take().unwrap().send(());
137 ControlFlow::Break
138 })
139 }))
140}
141
142pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
147 timeout_future_seconds_with_priority(crate::Priority::default(), value)
148}
149
150pub fn timeout_future_seconds_with_priority(
155 priority: Priority,
156 value: u32,
157) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
158 Box::pin(SourceFuture::new(move |send| {
159 let mut send = Some(send);
160 crate::timeout_source_new_seconds(value, None, priority, move || {
161 let _ = send.take().unwrap().send(());
162 ControlFlow::Break
163 })
164 }))
165}
166
167pub fn child_watch_future(
174 pid: crate::Pid,
175) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
176 child_watch_future_with_priority(crate::Priority::default(), pid)
177}
178
179pub fn child_watch_future_with_priority(
186 priority: Priority,
187 pid: crate::Pid,
188) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
189 Box::pin(SourceFuture::new(move |send| {
190 let mut send = Some(send);
191 crate::child_watch_source_new(pid, None, priority, move |pid, code| {
192 let _ = send.take().unwrap().send((pid, code));
193 })
194 }))
195}
196
197pub struct SourceStream<F, T> {
201 create_source: Option<F>,
202 source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
203}
204
205impl<F, T> Unpin for SourceStream<F, T> {}
206
207impl<F, T: 'static> SourceStream<F, T>
208where
209 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
210{
211 pub fn new(create_source: F) -> SourceStream<F, T> {
218 SourceStream {
219 create_source: Some(create_source),
220 source: None,
221 }
222 }
223}
224
225impl<F, T> Stream for SourceStream<F, T>
226where
227 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
228{
229 type Item = T;
230
231 fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
232 let SourceStream {
233 ref mut create_source,
234 ref mut source,
235 ..
236 } = *self;
237
238 if let Some(create_source) = create_source.take() {
239 let main_context = MainContext::ref_thread_default();
240 assert!(
241 main_context.is_owner(),
242 "Spawning futures only allowed if the thread is owning the MainContext"
243 );
244
245 let (send, recv) = mpsc::unbounded();
252
253 let s = create_source(send);
254
255 s.attach(Some(&main_context));
256 *source = Some((s, recv));
257 }
258
259 let res = {
261 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
262 Pin::new(receiver).poll_next(ctx)
263 };
264 #[allow(clippy::match_wild_err_arm)]
265 match res {
266 Poll::Ready(v) => {
267 if v.is_none() {
268 let _ = source.take();
270 }
271 Poll::Ready(v)
272 }
273 Poll::Pending => Poll::Pending,
274 }
275 }
276}
277
278impl<F, T> FusedStream for SourceStream<F, T>
279where
280 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
281{
282 fn is_terminated(&self) -> bool {
283 self.create_source.is_none()
284 && self
285 .source
286 .as_ref()
287 .is_none_or(|(_, receiver)| receiver.is_terminated())
288 }
289}
290
291impl<T, F> Drop for SourceStream<T, F> {
292 fn drop(&mut self) {
293 if let Some((source, _)) = self.source.take() {
295 source.destroy();
296 }
297 }
298}
299
300pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
305 interval_stream_with_priority(crate::Priority::default(), value)
306}
307
308pub fn interval_stream_with_priority(
313 priority: Priority,
314 value: Duration,
315) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
316 Box::pin(SourceStream::new(move |send| {
317 crate::timeout_source_new(value, None, priority, move || {
318 if send.unbounded_send(()).is_err() {
319 ControlFlow::Break
320 } else {
321 ControlFlow::Continue
322 }
323 })
324 }))
325}
326
327pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
332 interval_stream_seconds_with_priority(crate::Priority::default(), value)
333}
334
335pub fn interval_stream_seconds_with_priority(
340 priority: Priority,
341 value: u32,
342) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
343 Box::pin(SourceStream::new(move |send| {
344 crate::timeout_source_new_seconds(value, None, priority, move || {
345 if send.unbounded_send(()).is_err() {
346 ControlFlow::Break
347 } else {
348 ControlFlow::Continue
349 }
350 })
351 }))
352}
353
#[cfg(test)]
mod tests {
    use std::{thread, time::Duration};

    use futures_util::{future::FutureExt, stream::StreamExt};

    use super::*;

    /// Timeout used by every test; short enough to keep the suite fast.
    const TICK: Duration = Duration::from_millis(20);

    // A timeout future can be driven to completion with `block_on`.
    #[test]
    fn test_timeout() {
        let ctx = MainContext::new();

        ctx.block_on(timeout_future(TICK));
    }

    // A timeout future can be spawned on a context and quit the main loop
    // once it fires.
    #[test]
    fn test_timeout_send() {
        let ctx = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&ctx), false);

        let quitter = main_loop.clone();
        ctx.spawn(timeout_future(TICK).map(move |()| quitter.quit()));

        main_loop.run();
    }

    // Taking two items from an interval stream runs the handler exactly twice.
    #[test]
    fn test_interval() {
        let ctx = MainContext::new();

        let mut ticks = 0;
        let counting = interval_stream(TICK).take(2).for_each(|()| {
            ticks += 1;

            futures_util::future::ready(())
        });
        ctx.block_on(counting);

        assert_eq!(ticks, 2);
    }

    // A timeout future can be chained with a channel fed from another thread.
    #[test]
    fn test_timeout_and_channel() {
        let ctx = MainContext::new();

        let chained = timeout_future(TICK).then(|()| {
            let (sender, receiver) = oneshot::channel();

            thread::spawn(move || {
                sender.send(1).unwrap();
            });

            receiver.map(|received| received.unwrap())
        });
        let res = ctx.block_on(chained);

        assert_eq!(res, 1);
    }
}