1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Take a look at the license at the top of the repository in the LICENSE file.

use std::{pin::Pin, task::ready};

use futures_core::{
    stream::Stream,
    task::{Context, Poll},
    Future,
};

use crate::{prelude::SocketListenerExt, SocketConnection, SocketListener};
use glib::{prelude::*, Error, Object};

pub struct Incoming {
    listener: SocketListener,
    fut: Option<Pin<Box<dyn Future<Output = Result<(SocketConnection, Option<Object>), Error>>>>>,
}

impl Stream for Incoming {
    type Item = Result<(SocketConnection, Option<Object>), Error>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.fut.is_none() {
            self.fut = Some(self.listener.accept_future());
        }

        let fut = self.fut.as_mut().unwrap();
        let res = ready!(Pin::new(fut).poll(ctx));
        self.fut.take();

        Poll::Ready(Some(res))
    }
}

pub trait SocketListenerExtManual: SocketListenerExt {
    // rustdoc-stripper-ignore-next
    /// Returns a stream of incoming connections
    ///
    /// Iterating over this stream is equivalent to calling [`SocketListenerExt::accept_future`] in a
    /// loop. The stream of connections is infinite, i.e awaiting the next
    /// connection will never result in [`None`].
    fn incoming(
        &self,
    ) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>>;
}

impl<O: IsA<SocketListener>> SocketListenerExtManual for O {
    fn incoming(
        &self,
    ) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>> {
        Box::pin(Incoming {
            listener: self.as_ref().clone(),
            fut: None,
        })
    }
}