gio/datagram_based.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, mem::transmute, pin::Pin, ptr, time::Duration};
4
5use futures_core::stream::Stream;
6use glib::{prelude::*, translate::*};
7
8use crate::{ffi, Cancellable, DatagramBased, InputMessage, OutputMessage};
9
10mod sealed {
11 pub trait Sealed {}
12 impl<T: super::IsA<super::DatagramBased>> Sealed for T {}
13}
14
15pub trait DatagramBasedExtManual: sealed::Sealed + IsA<DatagramBased> + Sized {
16 /// Creates a #GSource that can be attached to a #GMainContext to monitor for
17 /// the availability of the specified @condition on the #GDatagramBased. The
18 /// #GSource keeps a reference to the @self.
19 ///
20 /// The callback on the source is of the #GDatagramBasedSourceFunc type.
21 ///
22 /// It is meaningless to specify [`glib::IOCondition::ERR`][crate::glib::IOCondition::ERR] or [`glib::IOCondition::HUP`][crate::glib::IOCondition::HUP] in @condition; these
23 /// conditions will always be reported in the callback if they are true.
24 ///
25 /// If non-[`None`], @cancellable can be used to cancel the source, which will
26 /// cause the source to trigger, reporting the current condition (which is
27 /// likely 0 unless cancellation happened at the same time as a condition
28 /// change). You can check for this in the callback using
29 /// g_cancellable_is_cancelled().
30 /// ## `condition`
31 /// a #GIOCondition mask to monitor
32 /// ## `cancellable`
33 /// a #GCancellable
34 ///
35 /// # Returns
36 ///
37 /// a newly allocated #GSource
38 #[doc(alias = "g_datagram_based_create_source")]
39 fn create_source<F, C>(
40 &self,
41 condition: glib::IOCondition,
42 cancellable: Option<&C>,
43 name: Option<&str>,
44 priority: glib::Priority,
45 func: F,
46 ) -> glib::Source
47 where
48 F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
49 C: IsA<Cancellable>,
50 {
51 unsafe extern "C" fn trampoline<
52 O: IsA<DatagramBased>,
53 F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
54 >(
55 datagram_based: *mut ffi::GDatagramBased,
56 condition: glib::ffi::GIOCondition,
57 func: glib::ffi::gpointer,
58 ) -> glib::ffi::gboolean {
59 let func: &RefCell<F> = &*(func as *const RefCell<F>);
60 let mut func = func.borrow_mut();
61 (*func)(
62 DatagramBased::from_glib_borrow(datagram_based).unsafe_cast_ref(),
63 from_glib(condition),
64 )
65 .into_glib()
66 }
67 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
68 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
69 }
70 let cancellable = cancellable.map(|c| c.as_ref());
71 let gcancellable = cancellable.to_glib_none();
72 unsafe {
73 let source = ffi::g_datagram_based_create_source(
74 self.as_ref().to_glib_none().0,
75 condition.into_glib(),
76 gcancellable.0,
77 );
78 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
79 glib::ffi::g_source_set_callback(
80 source,
81 Some(transmute::<
82 glib::ffi::gpointer,
83 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
84 >(trampoline)),
85 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
86 Some(destroy_closure::<F>),
87 );
88 glib::ffi::g_source_set_priority(source, priority.into_glib());
89
90 if let Some(name) = name {
91 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
92 }
93
94 from_glib_full(source)
95 }
96 }
97
98 fn create_source_future<C: IsA<Cancellable>>(
99 &self,
100 condition: glib::IOCondition,
101 cancellable: Option<&C>,
102 priority: glib::Priority,
103 ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
104 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
105
106 let obj = self.clone();
107 Box::pin(glib::SourceFuture::new(move |send| {
108 let mut send = Some(send);
109 obj.create_source(
110 condition,
111 cancellable.as_ref(),
112 None,
113 priority,
114 move |_, condition| {
115 let _ = send.take().unwrap().send(condition);
116 glib::ControlFlow::Break
117 },
118 )
119 }))
120 }
121
122 fn create_source_stream<C: IsA<Cancellable>>(
123 &self,
124 condition: glib::IOCondition,
125 cancellable: Option<&C>,
126 priority: glib::Priority,
127 ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
128 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
129
130 let obj = self.clone();
131 Box::pin(glib::SourceStream::new(move |send| {
132 let send = Some(send);
133 obj.create_source(
134 condition,
135 cancellable.as_ref(),
136 None,
137 priority,
138 move |_, condition| {
139 if send.as_ref().unwrap().unbounded_send(condition).is_err() {
140 glib::ControlFlow::Break
141 } else {
142 glib::ControlFlow::Continue
143 }
144 },
145 )
146 }))
147 }
148
149 /// Waits for up to @timeout microseconds for condition to become true on
150 /// @self. If the condition is met, [`true`] is returned.
151 ///
152 /// If @cancellable is cancelled before the condition is met, or if @timeout is
153 /// reached before the condition is met, then [`false`] is returned and @error is
154 /// set appropriately ([`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] or [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut]).
155 /// ## `condition`
156 /// a #GIOCondition mask to wait for
157 /// ## `timeout`
158 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
159 /// to block indefinitely
160 /// ## `cancellable`
161 /// a #GCancellable
162 ///
163 /// # Returns
164 ///
165 /// [`true`] if the condition was met, [`false`] otherwise
166 #[doc(alias = "g_datagram_based_condition_wait")]
167 fn condition_wait(
168 &self,
169 condition: glib::IOCondition,
170 timeout: Option<Duration>,
171 cancellable: Option<&impl IsA<Cancellable>>,
172 ) -> Result<(), glib::Error> {
173 unsafe {
174 let mut error = ptr::null_mut();
175 let is_ok = ffi::g_datagram_based_condition_wait(
176 self.as_ref().to_glib_none().0,
177 condition.into_glib(),
178 timeout
179 .map(|t| t.as_micros().try_into().unwrap())
180 .unwrap_or(-1),
181 cancellable.map(|p| p.as_ref()).to_glib_none().0,
182 &mut error,
183 );
184 debug_assert_eq!(is_ok == glib::ffi::GFALSE, !error.is_null());
185 if error.is_null() {
186 Ok(())
187 } else {
188 Err(from_glib_full(error))
189 }
190 }
191 }
192
193 /// Receive one or more data messages from @self in one go.
194 ///
195 /// @messages must point to an array of #GInputMessage structs and
196 /// @num_messages must be the length of this array. Each #GInputMessage
197 /// contains a pointer to an array of #GInputVector structs describing the
198 /// buffers that the data received in each message will be written to.
199 ///
200 /// @flags modify how all messages are received. The commonly available
201 /// arguments for this are available in the #GSocketMsgFlags enum, but the
202 /// values there are the same as the system values, and the flags
203 /// are passed in as-is, so you can pass in system-specific flags too. These
204 /// flags affect the overall receive operation. Flags affecting individual
205 /// messages are returned in #GInputMessage.flags.
206 ///
207 /// The other members of #GInputMessage are treated as described in its
208 /// documentation.
209 ///
210 /// If @timeout is negative the call will block until @num_messages have been
211 /// received, the connection is closed remotely (EOS), @cancellable is cancelled,
212 /// or an error occurs.
213 ///
214 /// If @timeout is 0 the call will return up to @num_messages without blocking,
215 /// or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the operating system
216 /// to be received.
217 ///
218 /// If @timeout is positive the call will block on the same conditions as if
219 /// @timeout were negative. If the timeout is reached
220 /// before any messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned,
221 /// otherwise it will return the number of messages received before timing out.
222 /// (Note: This is effectively the behaviour of `MSG_WAITFORONE` with
223 /// recvmmsg().)
224 ///
225 /// To be notified when messages are available, wait for the [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
226 /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
227 /// g_datagram_based_receive_messages() even if you were previously notified of a
228 /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
229 ///
230 /// If the remote peer closes the connection, any messages queued in the
231 /// underlying receive buffer will be returned, and subsequent calls to
232 /// g_datagram_based_receive_messages() will return 0 (with no error set).
233 ///
234 /// If the connection is shut down or closed (by calling g_socket_close() or
235 /// g_socket_shutdown() with @shutdown_read set, if it’s a #GSocket, for
236 /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
237 ///
238 /// On error -1 is returned and @error is set accordingly. An error will only
239 /// be returned if zero messages could be received; otherwise the number of
240 /// messages successfully received before the error will be returned. If
241 /// @cancellable is cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any
242 /// other error.
243 /// ## `messages`
244 /// an array of #GInputMessage structs
245 /// ## `flags`
246 /// an int containing #GSocketMsgFlags flags for the overall operation
247 /// ## `timeout`
248 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
249 /// to block indefinitely
250 /// ## `cancellable`
251 /// a `GCancellable`
252 ///
253 /// # Returns
254 ///
255 /// number of messages received, or -1 on error. Note that the number
256 /// of messages received may be smaller than @num_messages if @timeout is
257 /// zero or positive, if the peer closed the connection, or if @num_messages
258 /// was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
259 /// to receive the remaining messages.
260 #[doc(alias = "g_datagram_based_receive_messages")]
261 fn receive_messages<'v, V: IntoIterator<Item = &'v mut [&'v mut [u8]]>, C: IsA<Cancellable>>(
262 &self,
263 messages: &mut [InputMessage],
264 flags: i32,
265 timeout: Option<Duration>,
266 cancellable: Option<&C>,
267 ) -> Result<usize, glib::Error> {
268 let cancellable = cancellable.map(|c| c.as_ref());
269 unsafe {
270 let mut error = ptr::null_mut();
271
272 let count = ffi::g_datagram_based_receive_messages(
273 self.as_ref().to_glib_none().0,
274 messages.as_mut_ptr() as *mut _,
275 messages.len().try_into().unwrap(),
276 flags,
277 timeout
278 .map(|t| t.as_micros().try_into().unwrap())
279 .unwrap_or(-1),
280 cancellable.to_glib_none().0,
281 &mut error,
282 );
283 if error.is_null() {
284 Ok(count as usize)
285 } else {
286 Err(from_glib_full(error))
287 }
288 }
289 }
290
291 /// Send one or more data messages from @self in one go.
292 ///
293 /// @messages must point to an array of #GOutputMessage structs and
294 /// @num_messages must be the length of this array. Each #GOutputMessage
295 /// contains an address to send the data to, and a pointer to an array of
296 /// #GOutputVector structs to describe the buffers that the data to be sent
297 /// for each message will be gathered from.
298 ///
299 /// @flags modify how the message is sent. The commonly available arguments
300 /// for this are available in the #GSocketMsgFlags enum, but the
301 /// values there are the same as the system values, and the flags
302 /// are passed in as-is, so you can pass in system-specific flags too.
303 ///
304 /// The other members of #GOutputMessage are treated as described in its
305 /// documentation.
306 ///
307 /// If @timeout is negative the call will block until @num_messages have been
308 /// sent, @cancellable is cancelled, or an error occurs.
309 ///
310 /// If @timeout is 0 the call will send up to @num_messages without blocking,
311 /// or will return [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if there is no space to send messages.
312 ///
313 /// If @timeout is positive the call will block on the same conditions as if
314 /// @timeout were negative. If the timeout is reached before any messages are
315 /// sent, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise it will return the number
316 /// of messages sent before timing out.
317 ///
318 /// To be notified when messages can be sent, wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition.
319 /// Note though that you may still receive [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from
320 /// g_datagram_based_send_messages() even if you were previously notified of a
321 /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is very common due to
322 /// the way the underlying APIs work.)
323 ///
324 /// If the connection is shut down or closed (by calling g_socket_close() or
325 /// g_socket_shutdown() with @shutdown_write set, if it’s a #GSocket, for
326 /// example), all calls to this function will return [`IOErrorEnum::Closed`][crate::IOErrorEnum::Closed].
327 ///
328 /// On error -1 is returned and @error is set accordingly. An error will only
329 /// be returned if zero messages could be sent; otherwise the number of messages
330 /// successfully sent before the error will be returned. If @cancellable is
331 /// cancelled, [`IOErrorEnum::Cancelled`][crate::IOErrorEnum::Cancelled] is returned as with any other error.
332 /// ## `messages`
333 /// an array of #GOutputMessage structs
334 /// ## `flags`
335 /// an int containing #GSocketMsgFlags flags
336 /// ## `timeout`
337 /// the maximum time (in microseconds) to wait, 0 to not block, or -1
338 /// to block indefinitely
339 /// ## `cancellable`
340 /// a `GCancellable`
341 ///
342 /// # Returns
343 ///
344 /// number of messages sent, or -1 on error. Note that the number of
345 /// messages sent may be smaller than @num_messages if @timeout is zero
346 /// or positive, or if @num_messages was larger than `UIO_MAXIOV` (1024), in
347 /// which case the caller may re-try to send the remaining messages.
348 #[doc(alias = "g_datagram_based_send_messages")]
349 fn send_messages<C: IsA<Cancellable>>(
350 &self,
351 messages: &mut [OutputMessage],
352 flags: i32,
353 timeout: Option<Duration>,
354 cancellable: Option<&C>,
355 ) -> Result<usize, glib::Error> {
356 let cancellable = cancellable.map(|c| c.as_ref());
357 unsafe {
358 let mut error = ptr::null_mut();
359 let count = ffi::g_datagram_based_send_messages(
360 self.as_ref().to_glib_none().0,
361 messages.as_mut_ptr() as *mut _,
362 messages.len().try_into().unwrap(),
363 flags,
364 timeout
365 .map(|t| t.as_micros().try_into().unwrap())
366 .unwrap_or(-1),
367 cancellable.to_glib_none().0,
368 &mut error,
369 );
370 if error.is_null() {
371 Ok(count as usize)
372 } else {
373 Err(from_glib_full(error))
374 }
375 }
376 }
377}
378
379impl<O: IsA<DatagramBased>> DatagramBasedExtManual for O {}