gio/
datagram_based.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, mem::transmute, pin::Pin, ptr, time::Duration};
4
5use futures_core::stream::Stream;
6use glib::{prelude::*, translate::*};
7
8use crate::{ffi, Cancellable, DatagramBased, InputMessage, OutputMessage};
9
10mod sealed {
11    pub trait Sealed {}
12    impl<T: super::IsA<super::DatagramBased>> Sealed for T {}
13}
14
15pub trait DatagramBasedExtManual: sealed::Sealed + IsA<DatagramBased> + Sized {
16    /// Creates a #GSource that can be attached to a #GMainContext to monitor for
17    /// the availability of the specified @condition on the #GDatagramBased. The
18    /// #GSource keeps a reference to the @self.
19    ///
20    /// The callback on the source is of the #GDatagramBasedSourceFunc type.
21    ///
22    /// It is meaningless to specify [`glib::IOCondition::ERR`][crate::glib::IOCondition::ERR] or [`glib::IOCondition::HUP`][crate::glib::IOCondition::HUP] in @condition; these
23    /// conditions will always be reported in the callback if they are true.
24    ///
25    /// If non-[`None`], @cancellable can be used to cancel the source, which will
26    /// cause the source to trigger, reporting the current condition (which is
27    /// likely 0 unless cancellation happened at the same time as a condition
28    /// change). You can check for this in the callback using
29    /// g_cancellable_is_cancelled().
30    /// ## `condition`
31    /// a #GIOCondition mask to monitor
32    /// ## `cancellable`
33    /// a #GCancellable
34    ///
35    /// # Returns
36    ///
37    /// a newly allocated #GSource
38    #[doc(alias = "g_datagram_based_create_source")]
39    fn create_source<F, C>(
40        &self,
41        condition: glib::IOCondition,
42        cancellable: Option<&C>,
43        name: Option<&str>,
44        priority: glib::Priority,
45        func: F,
46    ) -> glib::Source
47    where
48        F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
49        C: IsA<Cancellable>,
50    {
51        unsafe extern "C" fn trampoline<
52            O: IsA<DatagramBased>,
53            F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
54        >(
55            datagram_based: *mut ffi::GDatagramBased,
56            condition: glib::ffi::GIOCondition,
57            func: glib::ffi::gpointer,
58        ) -> glib::ffi::gboolean {
59            let func: &RefCell<F> = &*(func as *const RefCell<F>);
60            let mut func = func.borrow_mut();
61            (*func)(
62                DatagramBased::from_glib_borrow(datagram_based).unsafe_cast_ref(),
63                from_glib(condition),
64            )
65            .into_glib()
66        }
67        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
68            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
69        }
70        let cancellable = cancellable.map(|c| c.as_ref());
71        let gcancellable = cancellable.to_glib_none();
72        unsafe {
73            let source = ffi::g_datagram_based_create_source(
74                self.as_ref().to_glib_none().0,
75                condition.into_glib(),
76                gcancellable.0,
77            );
78            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
79            glib::ffi::g_source_set_callback(
80                source,
81                Some(transmute::<
82                    glib::ffi::gpointer,
83                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
84                >(trampoline)),
85                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
86                Some(destroy_closure::<F>),
87            );
88            glib::ffi::g_source_set_priority(source, priority.into_glib());
89
90            if let Some(name) = name {
91                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
92            }
93
94            from_glib_full(source)
95        }
96    }
97
98    fn create_source_future<C: IsA<Cancellable>>(
99        &self,
100        condition: glib::IOCondition,
101        cancellable: Option<&C>,
102        priority: glib::Priority,
103    ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
104        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
105
106        let obj = self.clone();
107        Box::pin(glib::SourceFuture::new(move |send| {
108            let mut send = Some(send);
109            obj.create_source(
110                condition,
111                cancellable.as_ref(),
112                None,
113                priority,
114                move |_, condition| {
115                    let _ = send.take().unwrap().send(condition);
116                    glib::ControlFlow::Break
117                },
118            )
119        }))
120    }
121
122    fn create_source_stream<C: IsA<Cancellable>>(
123        &self,
124        condition: glib::IOCondition,
125        cancellable: Option<&C>,
126        priority: glib::Priority,
127    ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
128        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
129
130        let obj = self.clone();
131        Box::pin(glib::SourceStream::new(move |send| {
132            let send = Some(send);
133            obj.create_source(
134                condition,
135                cancellable.as_ref(),
136                None,
137                priority,
138                move |_, condition| {
139                    if send.as_ref().unwrap().unbounded_send(condition).is_err() {
140                        glib::ControlFlow::Break
141                    } else {
142                        glib::ControlFlow::Continue
143                    }
144                },
145            )
146        }))
147    }
148
149    /// Waits for up to @timeout microseconds for condition to become true on
150    /// @self. If the condition is met, [`true`] is returned.
151    ///
152    /// If @cancellable is cancelled before the condition is met, or if @timeout is
153    /// reached before the condition is met, then [`false`] is returned and @error is
154    /// set appropriately ([`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] or [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut]).
155    /// ## `condition`
156    /// a #GIOCondition mask to wait for
157    /// ## `timeout`
158    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
159    ///   to block indefinitely
160    /// ## `cancellable`
161    /// a #GCancellable
162    ///
163    /// # Returns
164    ///
165    /// [`true`] if the condition was met, [`false`] otherwise
166    #[doc(alias = "g_datagram_based_condition_wait")]
167    fn condition_wait(
168        &self,
169        condition: glib::IOCondition,
170        timeout: Option<Duration>,
171        cancellable: Option<&impl IsA<Cancellable>>,
172    ) -> Result<(), glib::Error> {
173        unsafe {
174            let mut error = ptr::null_mut();
175            let is_ok = ffi::g_datagram_based_condition_wait(
176                self.as_ref().to_glib_none().0,
177                condition.into_glib(),
178                timeout
179                    .map(|t| t.as_micros().try_into().unwrap())
180                    .unwrap_or(-1),
181                cancellable.map(|p| p.as_ref()).to_glib_none().0,
182                &mut error,
183            );
184            debug_assert_eq!(is_ok == glib::ffi::GFALSE, !error.is_null());
185            if error.is_null() {
186                Ok(())
187            } else {
188                Err(from_glib_full(error))
189            }
190        }
191    }
192
193    /// Receive one or more data messages from @self in one go.
194    ///
195    /// @messages must point to an array of #GInputMessage structs and
196    /// @num_messages must be the length of this array. Each #GInputMessage
197    /// contains a pointer to an array of #GInputVector structs describing the
198    /// buffers that the data received in each message will be written to.
199    ///
200    /// @flags modify how all messages are received. The commonly available
201    /// arguments for this are available in the #GSocketMsgFlags enum, but the
202    /// values there are the same as the system values, and the flags
203    /// are passed in as-is, so you can pass in system-specific flags too. These
204    /// flags affect the overall receive operation. Flags affecting individual
205    /// messages are returned in #GInputMessage.flags.
206    ///
207    /// The other members of #GInputMessage are treated as described in its
208    /// documentation.
209    ///
210    /// If @timeout is negative the call will block until @num_messages have been
211    /// received, the connection is closed remotely (EOS), @cancellable is cancelled,
212    /// or an error occurs.
213    ///
214    /// If @timeout is 0 the call will return up to @num_messages without blocking,
215    /// or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the operating system
216    /// to be received.
217    ///
218    /// If @timeout is positive the call will block on the same conditions as if
219    /// @timeout were negative. If the timeout is reached
220    /// before any messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned,
221    /// otherwise it will return the number of messages received before timing out.
222    /// (Note: This is effectively the behaviour of `MSG_WAITFORONE` with
223    /// recvmmsg().)
224    ///
225    /// To be notified when messages are available, wait for the [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
226    /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
227    /// g_datagram_based_receive_messages() even if you were previously notified of a
228    /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
229    ///
230    /// If the remote peer closes the connection, any messages queued in the
231    /// underlying receive buffer will be returned, and subsequent calls to
232    /// g_datagram_based_receive_messages() will return 0 (with no error set).
233    ///
234    /// If the connection is shut down or closed (by calling g_socket_close() or
235    /// g_socket_shutdown() with @shutdown_read set, if it’s a #GSocket, for
236    /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
237    ///
238    /// On error -1 is returned and @error is set accordingly. An error will only
239    /// be returned if zero messages could be received; otherwise the number of
240    /// messages successfully received before the error will be returned. If
241    /// @cancellable is cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any
242    /// other error.
243    /// ## `messages`
244    /// an array of #GInputMessage structs
245    /// ## `flags`
246    /// an int containing #GSocketMsgFlags flags for the overall operation
247    /// ## `timeout`
248    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
249    ///   to block indefinitely
250    /// ## `cancellable`
251    /// a `GCancellable`
252    ///
253    /// # Returns
254    ///
255    /// number of messages received, or -1 on error. Note that the number
256    ///     of messages received may be smaller than @num_messages if @timeout is
257    ///     zero or positive, if the peer closed the connection, or if @num_messages
258    ///     was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
259    ///     to receive the remaining messages.
260    #[doc(alias = "g_datagram_based_receive_messages")]
261    fn receive_messages<'v, V: IntoIterator<Item = &'v mut [&'v mut [u8]]>, C: IsA<Cancellable>>(
262        &self,
263        messages: &mut [InputMessage],
264        flags: i32,
265        timeout: Option<Duration>,
266        cancellable: Option<&C>,
267    ) -> Result<usize, glib::Error> {
268        let cancellable = cancellable.map(|c| c.as_ref());
269        unsafe {
270            let mut error = ptr::null_mut();
271
272            let count = ffi::g_datagram_based_receive_messages(
273                self.as_ref().to_glib_none().0,
274                messages.as_mut_ptr() as *mut _,
275                messages.len().try_into().unwrap(),
276                flags,
277                timeout
278                    .map(|t| t.as_micros().try_into().unwrap())
279                    .unwrap_or(-1),
280                cancellable.to_glib_none().0,
281                &mut error,
282            );
283            if error.is_null() {
284                Ok(count as usize)
285            } else {
286                Err(from_glib_full(error))
287            }
288        }
289    }
290
291    /// Send one or more data messages from @self in one go.
292    ///
293    /// @messages must point to an array of #GOutputMessage structs and
294    /// @num_messages must be the length of this array. Each #GOutputMessage
295    /// contains an address to send the data to, and a pointer to an array of
296    /// #GOutputVector structs to describe the buffers that the data to be sent
297    /// for each message will be gathered from.
298    ///
299    /// @flags modify how the message is sent. The commonly available arguments
300    /// for this are available in the #GSocketMsgFlags enum, but the
301    /// values there are the same as the system values, and the flags
302    /// are passed in as-is, so you can pass in system-specific flags too.
303    ///
304    /// The other members of #GOutputMessage are treated as described in its
305    /// documentation.
306    ///
307    /// If @timeout is negative the call will block until @num_messages have been
308    /// sent, @cancellable is cancelled, or an error occurs.
309    ///
310    /// If @timeout is 0 the call will send up to @num_messages without blocking,
311    /// or will return [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if there is no space to send messages.
312    ///
313    /// If @timeout is positive the call will block on the same conditions as if
314    /// @timeout were negative. If the timeout is reached before any messages are
315    /// sent, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise it will return the number
316    /// of messages sent before timing out.
317    ///
318    /// To be notified when messages can be sent, wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition.
319    /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
320    /// g_datagram_based_send_messages() even if you were previously notified of a
321    /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is very common due to
322    /// the way the underlying APIs work.)
323    ///
324    /// If the connection is shut down or closed (by calling g_socket_close() or
325    /// g_socket_shutdown() with @shutdown_write set, if it’s a #GSocket, for
326    /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
327    ///
328    /// On error -1 is returned and @error is set accordingly. An error will only
329    /// be returned if zero messages could be sent; otherwise the number of messages
330    /// successfully sent before the error will be returned. If @cancellable is
331    /// cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any other error.
332    /// ## `messages`
333    /// an array of #GOutputMessage structs
334    /// ## `flags`
335    /// an int containing #GSocketMsgFlags flags
336    /// ## `timeout`
337    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
338    ///   to block indefinitely
339    /// ## `cancellable`
340    /// a `GCancellable`
341    ///
342    /// # Returns
343    ///
344    /// number of messages sent, or -1 on error. Note that the number of
345    ///     messages sent may be smaller than @num_messages if @timeout is zero
346    ///     or positive, or if @num_messages was larger than `UIO_MAXIOV` (1024), in
347    ///     which case the caller may re-try to send the remaining messages.
348    #[doc(alias = "g_datagram_based_send_messages")]
349    fn send_messages<C: IsA<Cancellable>>(
350        &self,
351        messages: &mut [OutputMessage],
352        flags: i32,
353        timeout: Option<Duration>,
354        cancellable: Option<&C>,
355    ) -> Result<usize, glib::Error> {
356        let cancellable = cancellable.map(|c| c.as_ref());
357        unsafe {
358            let mut error = ptr::null_mut();
359            let count = ffi::g_datagram_based_send_messages(
360                self.as_ref().to_glib_none().0,
361                messages.as_mut_ptr() as *mut _,
362                messages.len().try_into().unwrap(),
363                flags,
364                timeout
365                    .map(|t| t.as_micros().try_into().unwrap())
366                    .unwrap_or(-1),
367                cancellable.to_glib_none().0,
368                &mut error,
369            );
370            if error.is_null() {
371                Ok(count as usize)
372            } else {
373                Err(from_glib_full(error))
374            }
375        }
376    }
377}
378
379impl<O: IsA<DatagramBased>> DatagramBasedExtManual for O {}