Skip to main content

gio/
datagram_based.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, mem::transmute, pin::Pin, ptr, time::Duration};
4
5use futures_core::stream::Stream;
6use glib::{prelude::*, translate::*};
7
8use crate::{Cancellable, DatagramBased, InputMessage, OutputMessage, ffi};
9
10pub trait DatagramBasedExtManual: IsA<DatagramBased> + Sized {
11    /// Creates a #GSource that can be attached to a #GMainContext to monitor for
12    /// the availability of the specified @condition on the #GDatagramBased. The
13    /// #GSource keeps a reference to the @self.
14    ///
15    /// The callback on the source is of the #GDatagramBasedSourceFunc type.
16    ///
17    /// It is meaningless to specify [`glib::IOCondition::ERR`][crate::glib::IOCondition::ERR] or [`glib::IOCondition::HUP`][crate::glib::IOCondition::HUP] in @condition; these
18    /// conditions will always be reported in the callback if they are true.
19    ///
20    /// If non-[`None`], @cancellable can be used to cancel the source, which will
21    /// cause the source to trigger, reporting the current condition (which is
22    /// likely 0 unless cancellation happened at the same time as a condition
23    /// change). You can check for this in the callback using
24    /// g_cancellable_is_cancelled().
25    /// ## `condition`
26    /// a #GIOCondition mask to monitor
27    /// ## `cancellable`
28    /// a #GCancellable
29    ///
30    /// # Returns
31    ///
32    /// a newly allocated #GSource
33    #[doc(alias = "g_datagram_based_create_source")]
34    fn create_source<F, C>(
35        &self,
36        condition: glib::IOCondition,
37        cancellable: Option<&C>,
38        name: Option<&str>,
39        priority: glib::Priority,
40        func: F,
41    ) -> glib::Source
42    where
43        F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
44        C: IsA<Cancellable>,
45    {
46        unsafe extern "C" fn trampoline<
47            O: IsA<DatagramBased>,
48            F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
49        >(
50            datagram_based: *mut ffi::GDatagramBased,
51            condition: glib::ffi::GIOCondition,
52            func: glib::ffi::gpointer,
53        ) -> glib::ffi::gboolean {
54            unsafe {
55                let func: &RefCell<F> = &*(func as *const RefCell<F>);
56                let mut func = func.borrow_mut();
57                (*func)(
58                    DatagramBased::from_glib_borrow(datagram_based).unsafe_cast_ref(),
59                    from_glib(condition),
60                )
61                .into_glib()
62            }
63        }
64        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
65            unsafe {
66                let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
67            }
68        }
69        let cancellable = cancellable.map(|c| c.as_ref());
70        let gcancellable = cancellable.to_glib_none();
71        unsafe {
72            let source = ffi::g_datagram_based_create_source(
73                self.as_ref().to_glib_none().0,
74                condition.into_glib(),
75                gcancellable.0,
76            );
77            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
78            glib::ffi::g_source_set_callback(
79                source,
80                Some(transmute::<
81                    glib::ffi::gpointer,
82                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
83                >(trampoline)),
84                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
85                Some(destroy_closure::<F>),
86            );
87            glib::ffi::g_source_set_priority(source, priority.into_glib());
88
89            if let Some(name) = name {
90                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
91            }
92
93            from_glib_full(source)
94        }
95    }
96
97    fn create_source_future<C: IsA<Cancellable>>(
98        &self,
99        condition: glib::IOCondition,
100        cancellable: Option<&C>,
101        priority: glib::Priority,
102    ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
103        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
104
105        let obj = self.clone();
106        Box::pin(glib::SourceFuture::new(move |send| {
107            let mut send = Some(send);
108            obj.create_source(
109                condition,
110                cancellable.as_ref(),
111                None,
112                priority,
113                move |_, condition| {
114                    let _ = send.take().unwrap().send(condition);
115                    glib::ControlFlow::Break
116                },
117            )
118        }))
119    }
120
121    fn create_source_stream<C: IsA<Cancellable>>(
122        &self,
123        condition: glib::IOCondition,
124        cancellable: Option<&C>,
125        priority: glib::Priority,
126    ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
127        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
128
129        let obj = self.clone();
130        Box::pin(glib::SourceStream::new(move |send| {
131            let send = Some(send);
132            obj.create_source(
133                condition,
134                cancellable.as_ref(),
135                None,
136                priority,
137                move |_, condition| {
138                    if send.as_ref().unwrap().unbounded_send(condition).is_err() {
139                        glib::ControlFlow::Break
140                    } else {
141                        glib::ControlFlow::Continue
142                    }
143                },
144            )
145        }))
146    }
147
148    /// Waits for up to @timeout microseconds for condition to become true on
149    /// @self. If the condition is met, [`true`] is returned.
150    ///
151    /// If @cancellable is cancelled before the condition is met, or if @timeout is
152    /// reached before the condition is met, then [`false`] is returned and @error is
153    /// set appropriately ([`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] or [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut]).
154    /// ## `condition`
155    /// a #GIOCondition mask to wait for
156    /// ## `timeout`
157    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
158    ///   to block indefinitely
159    /// ## `cancellable`
160    /// a #GCancellable
161    ///
162    /// # Returns
163    ///
164    /// [`true`] if the condition was met, [`false`] otherwise
165    #[doc(alias = "g_datagram_based_condition_wait")]
166    fn condition_wait(
167        &self,
168        condition: glib::IOCondition,
169        timeout: Option<Duration>,
170        cancellable: Option<&impl IsA<Cancellable>>,
171    ) -> Result<(), glib::Error> {
172        unsafe {
173            let mut error = ptr::null_mut();
174            let is_ok = ffi::g_datagram_based_condition_wait(
175                self.as_ref().to_glib_none().0,
176                condition.into_glib(),
177                timeout
178                    .map(|t| t.as_micros().try_into().unwrap())
179                    .unwrap_or(-1),
180                cancellable.map(|p| p.as_ref()).to_glib_none().0,
181                &mut error,
182            );
183            debug_assert_eq!(is_ok == glib::ffi::GFALSE, !error.is_null());
184            if error.is_null() {
185                Ok(())
186            } else {
187                Err(from_glib_full(error))
188            }
189        }
190    }
191
192    /// Receive one or more data messages from @self in one go.
193    ///
194    /// @messages must point to an array of #GInputMessage structs and
195    /// @num_messages must be the length of this array. Each #GInputMessage
196    /// contains a pointer to an array of #GInputVector structs describing the
197    /// buffers that the data received in each message will be written to.
198    ///
199    /// @flags modify how all messages are received. The commonly available
200    /// arguments for this are available in the #GSocketMsgFlags enum, but the
201    /// values there are the same as the system values, and the flags
202    /// are passed in as-is, so you can pass in system-specific flags too. These
203    /// flags affect the overall receive operation. Flags affecting individual
204    /// messages are returned in #GInputMessage.flags.
205    ///
206    /// The other members of #GInputMessage are treated as described in its
207    /// documentation.
208    ///
209    /// If @timeout is negative the call will block until @num_messages have been
210    /// received, the connection is closed remotely (EOS), @cancellable is cancelled,
211    /// or an error occurs.
212    ///
213    /// If @timeout is 0 the call will return up to @num_messages without blocking,
214    /// or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the operating system
215    /// to be received.
216    ///
217    /// If @timeout is positive the call will block on the same conditions as if
218    /// @timeout were negative. If the timeout is reached
219    /// before any messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned,
220    /// otherwise it will return the number of messages received before timing out.
221    /// (Note: This is effectively the behaviour of `MSG_WAITFORONE` with
222    /// recvmmsg().)
223    ///
224    /// To be notified when messages are available, wait for the [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
225    /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
226    /// g_datagram_based_receive_messages() even if you were previously notified of a
227    /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
228    ///
229    /// If the remote peer closes the connection, any messages queued in the
230    /// underlying receive buffer will be returned, and subsequent calls to
231    /// g_datagram_based_receive_messages() will return 0 (with no error set).
232    ///
233    /// If the connection is shut down or closed (by calling g_socket_close() or
234    /// g_socket_shutdown() with @shutdown_read set, if it’s a #GSocket, for
235    /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
236    ///
237    /// On error -1 is returned and @error is set accordingly. An error will only
238    /// be returned if zero messages could be received; otherwise the number of
239    /// messages successfully received before the error will be returned. If
240    /// @cancellable is cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any
241    /// other error.
242    /// ## `messages`
243    /// an array of #GInputMessage structs
244    /// ## `flags`
245    /// an int containing #GSocketMsgFlags flags for the overall operation
246    /// ## `timeout`
247    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
248    ///   to block indefinitely
249    /// ## `cancellable`
250    /// a `GCancellable`
251    ///
252    /// # Returns
253    ///
254    /// number of messages received, or -1 on error. Note that the number
255    ///     of messages received may be smaller than @num_messages if @timeout is
256    ///     zero or positive, if the peer closed the connection, or if @num_messages
257    ///     was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
258    ///     to receive the remaining messages.
259    #[doc(alias = "g_datagram_based_receive_messages")]
260    fn receive_messages<'v, V: IntoIterator<Item = &'v mut [&'v mut [u8]]>, C: IsA<Cancellable>>(
261        &self,
262        messages: &mut [InputMessage],
263        flags: i32,
264        timeout: Option<Duration>,
265        cancellable: Option<&C>,
266    ) -> Result<usize, glib::Error> {
267        let cancellable = cancellable.map(|c| c.as_ref());
268        unsafe {
269            let mut error = ptr::null_mut();
270
271            let count = ffi::g_datagram_based_receive_messages(
272                self.as_ref().to_glib_none().0,
273                messages.as_mut_ptr() as *mut _,
274                messages.len().try_into().unwrap(),
275                flags,
276                timeout
277                    .map(|t| t.as_micros().try_into().unwrap())
278                    .unwrap_or(-1),
279                cancellable.to_glib_none().0,
280                &mut error,
281            );
282            if error.is_null() {
283                Ok(count as usize)
284            } else {
285                Err(from_glib_full(error))
286            }
287        }
288    }
289
290    /// Send one or more data messages from @self in one go.
291    ///
292    /// @messages must point to an array of #GOutputMessage structs and
293    /// @num_messages must be the length of this array. Each #GOutputMessage
294    /// contains an address to send the data to, and a pointer to an array of
295    /// #GOutputVector structs to describe the buffers that the data to be sent
296    /// for each message will be gathered from.
297    ///
298    /// @flags modify how the message is sent. The commonly available arguments
299    /// for this are available in the #GSocketMsgFlags enum, but the
300    /// values there are the same as the system values, and the flags
301    /// are passed in as-is, so you can pass in system-specific flags too.
302    ///
303    /// The other members of #GOutputMessage are treated as described in its
304    /// documentation.
305    ///
306    /// If @timeout is negative the call will block until @num_messages have been
307    /// sent, @cancellable is cancelled, or an error occurs.
308    ///
309    /// If @timeout is 0 the call will send up to @num_messages without blocking,
310    /// or will return [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if there is no space to send messages.
311    ///
312    /// If @timeout is positive the call will block on the same conditions as if
313    /// @timeout were negative. If the timeout is reached before any messages are
314    /// sent, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise it will return the number
315    /// of messages sent before timing out.
316    ///
317    /// To be notified when messages can be sent, wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition.
318    /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
319    /// g_datagram_based_send_messages() even if you were previously notified of a
320    /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is very common due to
321    /// the way the underlying APIs work.)
322    ///
323    /// If the connection is shut down or closed (by calling g_socket_close() or
324    /// g_socket_shutdown() with @shutdown_write set, if it’s a #GSocket, for
325    /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
326    ///
327    /// On error -1 is returned and @error is set accordingly. An error will only
328    /// be returned if zero messages could be sent; otherwise the number of messages
329    /// successfully sent before the error will be returned. If @cancellable is
330    /// cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any other error.
331    /// ## `messages`
332    /// an array of #GOutputMessage structs
333    /// ## `flags`
334    /// an int containing #GSocketMsgFlags flags
335    /// ## `timeout`
336    /// the maximum time (in microseconds) to wait, 0 to not block, or -1
337    ///   to block indefinitely
338    /// ## `cancellable`
339    /// a `GCancellable`
340    ///
341    /// # Returns
342    ///
343    /// number of messages sent, or -1 on error. Note that the number of
344    ///     messages sent may be smaller than @num_messages if @timeout is zero
345    ///     or positive, or if @num_messages was larger than `UIO_MAXIOV` (1024), in
346    ///     which case the caller may re-try to send the remaining messages.
347    #[doc(alias = "g_datagram_based_send_messages")]
348    fn send_messages<C: IsA<Cancellable>>(
349        &self,
350        messages: &mut [OutputMessage],
351        flags: i32,
352        timeout: Option<Duration>,
353        cancellable: Option<&C>,
354    ) -> Result<usize, glib::Error> {
355        let cancellable = cancellable.map(|c| c.as_ref());
356        unsafe {
357            let mut error = ptr::null_mut();
358            let count = ffi::g_datagram_based_send_messages(
359                self.as_ref().to_glib_none().0,
360                messages.as_mut_ptr() as *mut _,
361                messages.len().try_into().unwrap(),
362                flags,
363                timeout
364                    .map(|t| t.as_micros().try_into().unwrap())
365                    .unwrap_or(-1),
366                cancellable.to_glib_none().0,
367                &mut error,
368            );
369            if error.is_null() {
370                Ok(count as usize)
371            } else {
372                Err(from_glib_full(error))
373            }
374        }
375    }
376}
377
378impl<O: IsA<DatagramBased>> DatagramBasedExtManual for O {}