gio/
socket_listener.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{pin::Pin, task::ready};
4
5use futures_core::{
6    stream::Stream,
7    task::{Context, Poll},
8    Future,
9};
10
11use crate::{prelude::SocketListenerExt, SocketConnection, SocketListener};
12use glib::{prelude::*, Error, Object};
13
14pub struct Incoming {
15    listener: SocketListener,
16    fut: Option<Pin<Box<dyn Future<Output = Result<(SocketConnection, Option<Object>), Error>>>>>,
17}
18
19impl Stream for Incoming {
20    type Item = Result<(SocketConnection, Option<Object>), Error>;
21
22    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
23        if self.fut.is_none() {
24            self.fut = Some(self.listener.accept_future());
25        }
26
27        let fut = self.fut.as_mut().unwrap();
28        let res = ready!(Pin::new(fut).poll(ctx));
29        self.fut.take();
30
31        Poll::Ready(Some(res))
32    }
33}
34
35pub trait SocketListenerExtManual: SocketListenerExt {
36    // rustdoc-stripper-ignore-next
37    /// Returns a stream of incoming connections
38    ///
39    /// Iterating over this stream is equivalent to calling [`SocketListenerExt::accept_future`] in a
40    /// loop. The stream of connections is infinite, i.e awaiting the next
41    /// connection will never result in [`None`].
42    fn incoming(
43        &self,
44    ) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>>;
45}
46
47impl<O: IsA<SocketListener>> SocketListenerExtManual for O {
48    fn incoming(
49        &self,
50    ) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>> {
51        Box::pin(Incoming {
52            listener: self.as_ref().clone(),
53            fut: None,
54        })
55    }
56}