gio/datagram_based.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, mem::transmute, pin::Pin, ptr, time::Duration};
4
5use futures_core::stream::Stream;
6use glib::{prelude::*, translate::*};
7
8use crate::{Cancellable, DatagramBased, InputMessage, OutputMessage, ffi};
9
10pub trait DatagramBasedExtManual: IsA<DatagramBased> + Sized {
11 /// Creates a #GSource that can be attached to a #GMainContext to monitor for
12 /// the availability of the specified @condition on the #GDatagramBased. The
13 /// #GSource keeps a reference to the @self.
14 ///
15 /// The callback on the source is of the #GDatagramBasedSourceFunc type.
16 ///
17 /// It is meaningless to specify [`glib::IOCondition::ERR`][crate::glib::IOCondition::ERR] or [`glib::IOCondition::HUP`][crate::glib::IOCondition::HUP] in @condition; these
18 /// conditions will always be reported in the callback if they are true.
19 ///
20 /// If non-[`None`], @cancellable can be used to cancel the source, which will
21 /// cause the source to trigger, reporting the current condition (which is
22 /// likely 0 unless cancellation happened at the same time as a condition
23 /// change). You can check for this in the callback using
24 /// g_cancellable_is_cancelled().
25 /// ## `condition`
26 /// a #GIOCondition mask to monitor
27 /// ## `cancellable`
28 /// a #GCancellable
29 ///
30 /// # Returns
31 ///
32 /// a newly allocated #GSource
33 #[doc(alias = "g_datagram_based_create_source")]
34 fn create_source<F, C>(
35 &self,
36 condition: glib::IOCondition,
37 cancellable: Option<&C>,
38 name: Option<&str>,
39 priority: glib::Priority,
40 func: F,
41 ) -> glib::Source
42 where
43 F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
44 C: IsA<Cancellable>,
45 {
46 unsafe extern "C" fn trampoline<
47 O: IsA<DatagramBased>,
48 F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
49 >(
50 datagram_based: *mut ffi::GDatagramBased,
51 condition: glib::ffi::GIOCondition,
52 func: glib::ffi::gpointer,
53 ) -> glib::ffi::gboolean {
54 unsafe {
55 let func: &RefCell<F> = &*(func as *const RefCell<F>);
56 let mut func = func.borrow_mut();
57 (*func)(
58 DatagramBased::from_glib_borrow(datagram_based).unsafe_cast_ref(),
59 from_glib(condition),
60 )
61 .into_glib()
62 }
63 }
64 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
65 unsafe {
66 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
67 }
68 }
69 let cancellable = cancellable.map(|c| c.as_ref());
70 let gcancellable = cancellable.to_glib_none();
71 unsafe {
72 let source = ffi::g_datagram_based_create_source(
73 self.as_ref().to_glib_none().0,
74 condition.into_glib(),
75 gcancellable.0,
76 );
77 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
78 glib::ffi::g_source_set_callback(
79 source,
80 Some(transmute::<
81 glib::ffi::gpointer,
82 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
83 >(trampoline)),
84 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
85 Some(destroy_closure::<F>),
86 );
87 glib::ffi::g_source_set_priority(source, priority.into_glib());
88
89 if let Some(name) = name {
90 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
91 }
92
93 from_glib_full(source)
94 }
95 }
96
97 fn create_source_future<C: IsA<Cancellable>>(
98 &self,
99 condition: glib::IOCondition,
100 cancellable: Option<&C>,
101 priority: glib::Priority,
102 ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
103 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
104
105 let obj = self.clone();
106 Box::pin(glib::SourceFuture::new(move |send| {
107 let mut send = Some(send);
108 obj.create_source(
109 condition,
110 cancellable.as_ref(),
111 None,
112 priority,
113 move |_, condition| {
114 let _ = send.take().unwrap().send(condition);
115 glib::ControlFlow::Break
116 },
117 )
118 }))
119 }
120
121 fn create_source_stream<C: IsA<Cancellable>>(
122 &self,
123 condition: glib::IOCondition,
124 cancellable: Option<&C>,
125 priority: glib::Priority,
126 ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
127 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
128
129 let obj = self.clone();
130 Box::pin(glib::SourceStream::new(move |send| {
131 let send = Some(send);
132 obj.create_source(
133 condition,
134 cancellable.as_ref(),
135 None,
136 priority,
137 move |_, condition| {
138 if send.as_ref().unwrap().unbounded_send(condition).is_err() {
139 glib::ControlFlow::Break
140 } else {
141 glib::ControlFlow::Continue
142 }
143 },
144 )
145 }))
146 }
147
148 /// Waits for up to @timeout microseconds for condition to become true on
149 /// @self. If the condition is met, [`true`] is returned.
150 ///
151 /// If @cancellable is cancelled before the condition is met, or if @timeout is
152 /// reached before the condition is met, then [`false`] is returned and @error is
153 /// set appropriately ([`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] or [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut]).
154 /// ## `condition`
155 /// a #GIOCondition mask to wait for
156 /// ## `timeout`
157 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
158 /// to block indefinitely
159 /// ## `cancellable`
160 /// a #GCancellable
161 ///
162 /// # Returns
163 ///
164 /// [`true`] if the condition was met, [`false`] otherwise
165 #[doc(alias = "g_datagram_based_condition_wait")]
166 fn condition_wait(
167 &self,
168 condition: glib::IOCondition,
169 timeout: Option<Duration>,
170 cancellable: Option<&impl IsA<Cancellable>>,
171 ) -> Result<(), glib::Error> {
172 unsafe {
173 let mut error = ptr::null_mut();
174 let is_ok = ffi::g_datagram_based_condition_wait(
175 self.as_ref().to_glib_none().0,
176 condition.into_glib(),
177 timeout
178 .map(|t| t.as_micros().try_into().unwrap())
179 .unwrap_or(-1),
180 cancellable.map(|p| p.as_ref()).to_glib_none().0,
181 &mut error,
182 );
183 debug_assert_eq!(is_ok == glib::ffi::GFALSE, !error.is_null());
184 if error.is_null() {
185 Ok(())
186 } else {
187 Err(from_glib_full(error))
188 }
189 }
190 }
191
192 /// Receive one or more data messages from @self in one go.
193 ///
194 /// @messages must point to an array of #GInputMessage structs and
195 /// @num_messages must be the length of this array. Each #GInputMessage
196 /// contains a pointer to an array of #GInputVector structs describing the
197 /// buffers that the data received in each message will be written to.
198 ///
199 /// @flags modify how all messages are received. The commonly available
200 /// arguments for this are available in the #GSocketMsgFlags enum, but the
201 /// values there are the same as the system values, and the flags
202 /// are passed in as-is, so you can pass in system-specific flags too. These
203 /// flags affect the overall receive operation. Flags affecting individual
204 /// messages are returned in #GInputMessage.flags.
205 ///
206 /// The other members of #GInputMessage are treated as described in its
207 /// documentation.
208 ///
209 /// If @timeout is negative the call will block until @num_messages have been
210 /// received, the connection is closed remotely (EOS), @cancellable is cancelled,
211 /// or an error occurs.
212 ///
213 /// If @timeout is 0 the call will return up to @num_messages without blocking,
214 /// or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the operating system
215 /// to be received.
216 ///
217 /// If @timeout is positive the call will block on the same conditions as if
218 /// @timeout were negative. If the timeout is reached
219 /// before any messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned,
220 /// otherwise it will return the number of messages received before timing out.
221 /// (Note: This is effectively the behaviour of `MSG_WAITFORONE` with
222 /// recvmmsg().)
223 ///
224 /// To be notified when messages are available, wait for the [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
225 /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
226 /// g_datagram_based_receive_messages() even if you were previously notified of a
227 /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
228 ///
229 /// If the remote peer closes the connection, any messages queued in the
230 /// underlying receive buffer will be returned, and subsequent calls to
231 /// g_datagram_based_receive_messages() will return 0 (with no error set).
232 ///
233 /// If the connection is shut down or closed (by calling g_socket_close() or
234 /// g_socket_shutdown() with @shutdown_read set, if it’s a #GSocket, for
235 /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
236 ///
237 /// On error -1 is returned and @error is set accordingly. An error will only
238 /// be returned if zero messages could be received; otherwise the number of
239 /// messages successfully received before the error will be returned. If
240 /// @cancellable is cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any
241 /// other error.
242 /// ## `messages`
243 /// an array of #GInputMessage structs
244 /// ## `flags`
245 /// an int containing #GSocketMsgFlags flags for the overall operation
246 /// ## `timeout`
247 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
248 /// to block indefinitely
249 /// ## `cancellable`
250 /// a `GCancellable`
251 ///
252 /// # Returns
253 ///
254 /// number of messages received, or -1 on error. Note that the number
255 /// of messages received may be smaller than @num_messages if @timeout is
256 /// zero or positive, if the peer closed the connection, or if @num_messages
257 /// was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
258 /// to receive the remaining messages.
259 #[doc(alias = "g_datagram_based_receive_messages")]
260 fn receive_messages<'v, V: IntoIterator<Item = &'v mut [&'v mut [u8]]>, C: IsA<Cancellable>>(
261 &self,
262 messages: &mut [InputMessage],
263 flags: i32,
264 timeout: Option<Duration>,
265 cancellable: Option<&C>,
266 ) -> Result<usize, glib::Error> {
267 let cancellable = cancellable.map(|c| c.as_ref());
268 unsafe {
269 let mut error = ptr::null_mut();
270
271 let count = ffi::g_datagram_based_receive_messages(
272 self.as_ref().to_glib_none().0,
273 messages.as_mut_ptr() as *mut _,
274 messages.len().try_into().unwrap(),
275 flags,
276 timeout
277 .map(|t| t.as_micros().try_into().unwrap())
278 .unwrap_or(-1),
279 cancellable.to_glib_none().0,
280 &mut error,
281 );
282 if error.is_null() {
283 Ok(count as usize)
284 } else {
285 Err(from_glib_full(error))
286 }
287 }
288 }
289
290 /// Send one or more data messages from @self in one go.
291 ///
292 /// @messages must point to an array of #GOutputMessage structs and
293 /// @num_messages must be the length of this array. Each #GOutputMessage
294 /// contains an address to send the data to, and a pointer to an array of
295 /// #GOutputVector structs to describe the buffers that the data to be sent
296 /// for each message will be gathered from.
297 ///
298 /// @flags modify how the message is sent. The commonly available arguments
299 /// for this are available in the #GSocketMsgFlags enum, but the
300 /// values there are the same as the system values, and the flags
301 /// are passed in as-is, so you can pass in system-specific flags too.
302 ///
303 /// The other members of #GOutputMessage are treated as described in its
304 /// documentation.
305 ///
306 /// If @timeout is negative the call will block until @num_messages have been
307 /// sent, @cancellable is cancelled, or an error occurs.
308 ///
309 /// If @timeout is 0 the call will send up to @num_messages without blocking,
310 /// or will return [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if there is no space to send messages.
311 ///
312 /// If @timeout is positive the call will block on the same conditions as if
313 /// @timeout were negative. If the timeout is reached before any messages are
314 /// sent, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise it will return the number
315 /// of messages sent before timing out.
316 ///
317 /// To be notified when messages can be sent, wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition.
318 /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
319 /// g_datagram_based_send_messages() even if you were previously notified of a
320 /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is very common due to
321 /// the way the underlying APIs work.)
322 ///
323 /// If the connection is shut down or closed (by calling g_socket_close() or
324 /// g_socket_shutdown() with @shutdown_write set, if it’s a #GSocket, for
325 /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
326 ///
327 /// On error -1 is returned and @error is set accordingly. An error will only
328 /// be returned if zero messages could be sent; otherwise the number of messages
329 /// successfully sent before the error will be returned. If @cancellable is
330 /// cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any other error.
331 /// ## `messages`
332 /// an array of #GOutputMessage structs
333 /// ## `flags`
334 /// an int containing #GSocketMsgFlags flags
335 /// ## `timeout`
336 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
337 /// to block indefinitely
338 /// ## `cancellable`
339 /// a `GCancellable`
340 ///
341 /// # Returns
342 ///
343 /// number of messages sent, or -1 on error. Note that the number of
344 /// messages sent may be smaller than @num_messages if @timeout is zero
345 /// or positive, or if @num_messages was larger than `UIO_MAXIOV` (1024), in
346 /// which case the caller may re-try to send the remaining messages.
347 #[doc(alias = "g_datagram_based_send_messages")]
348 fn send_messages<C: IsA<Cancellable>>(
349 &self,
350 messages: &mut [OutputMessage],
351 flags: i32,
352 timeout: Option<Duration>,
353 cancellable: Option<&C>,
354 ) -> Result<usize, glib::Error> {
355 let cancellable = cancellable.map(|c| c.as_ref());
356 unsafe {
357 let mut error = ptr::null_mut();
358 let count = ffi::g_datagram_based_send_messages(
359 self.as_ref().to_glib_none().0,
360 messages.as_mut_ptr() as *mut _,
361 messages.len().try_into().unwrap(),
362 flags,
363 timeout
364 .map(|t| t.as_micros().try_into().unwrap())
365 .unwrap_or(-1),
366 cancellable.to_glib_none().0,
367 &mut error,
368 );
369 if error.is_null() {
370 Ok(count as usize)
371 } else {
372 Err(from_glib_full(error))
373 }
374 }
375 }
376}
377
378impl<O: IsA<DatagramBased>> DatagramBasedExtManual for O {}