gio/socket.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3#[cfg(unix)]
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
5#[cfg(windows)]
6use std::os::windows::io::{
7 AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, OwnedSocket, RawSocket,
8};
9#[cfg(feature = "v2_60")]
10use std::time::Duration;
11use std::{cell::RefCell, marker::PhantomData, mem::transmute, pin::Pin, ptr};
12
13use futures_core::stream::Stream;
14use glib::{Slice, prelude::*, translate::*};
15
16#[cfg(feature = "v2_60")]
17use crate::PollableReturn;
18use crate::{Cancellable, Socket, SocketAddress, SocketControlMessage, ffi};
19
20impl Socket {
21 /// Creates a new #GSocket from a native file descriptor
22 /// or winsock SOCKET handle.
23 ///
24 /// This reads all the settings from the file descriptor so that
25 /// all properties should work. Note that the file descriptor
26 /// will be set to non-blocking mode, independent on the blocking
27 /// mode of the #GSocket.
28 ///
29 /// On success, the returned #GSocket takes ownership of @fd. On failure, the
30 /// caller must close @fd themselves.
31 ///
32 /// Since GLib 2.46, it is no longer a fatal error to call this on a non-socket
33 /// descriptor. Instead, a GError will be set with code [`IOErrorEnum::Failed`][crate::IOErrorEnum::Failed]
34 /// ## `fd`
35 /// a native socket file descriptor.
36 ///
37 /// # Returns
38 ///
39 /// a #GSocket or [`None`] on error.
40 /// Free the returned object with g_object_unref().
41 #[cfg(unix)]
42 #[cfg_attr(docsrs, doc(cfg(unix)))]
43 #[doc(alias = "g_socket_new_from_fd")]
44 pub fn from_fd(fd: OwnedFd) -> Result<Socket, glib::Error> {
45 unsafe { Self::from_raw_fd(fd) }
46 }
47
48 #[cfg(unix)]
49 #[cfg_attr(docsrs, doc(cfg(unix)))]
50 #[doc(alias = "g_socket_new_from_fd")]
51 pub unsafe fn from_raw_fd(fd: impl IntoRawFd) -> Result<Socket, glib::Error> {
52 let fd = fd.into_raw_fd();
53 let mut error = ptr::null_mut();
54 unsafe {
55 let ret = ffi::g_socket_new_from_fd(fd, &mut error);
56 if error.is_null() {
57 Ok(from_glib_full(ret))
58 } else {
59 let _ = OwnedFd::from_raw_fd(fd);
60 Err(from_glib_full(error))
61 }
62 }
63 }
64
65 #[cfg(windows)]
66 #[cfg_attr(docsrs, doc(cfg(windows)))]
67 pub fn from_socket(socket: OwnedSocket) -> Result<Socket, glib::Error> {
68 unsafe { Self::from_raw_socket(socket) }
69 }
70
71 #[cfg(windows)]
72 #[cfg_attr(docsrs, doc(cfg(windows)))]
73 pub unsafe fn from_raw_socket(socket: impl IntoRawSocket) -> Result<Socket, glib::Error> {
74 let socket = socket.into_raw_socket();
75 let mut error = ptr::null_mut();
76 unsafe {
77 let ret = ffi::g_socket_new_from_fd(socket as i32, &mut error);
78 if error.is_null() {
79 Ok(from_glib_full(ret))
80 } else {
81 let _ = OwnedSocket::from_raw_socket(socket);
82 Err(from_glib_full(error))
83 }
84 }
85 }
86}
87
88#[cfg(unix)]
89#[cfg_attr(docsrs, doc(cfg(unix)))]
90impl AsRawFd for Socket {
91 fn as_raw_fd(&self) -> RawFd {
92 unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
93 }
94}
95
96#[cfg(unix)]
97#[cfg_attr(docsrs, doc(cfg(unix)))]
98impl AsFd for Socket {
99 fn as_fd(&self) -> BorrowedFd<'_> {
100 unsafe {
101 let raw_fd = self.as_raw_fd();
102 BorrowedFd::borrow_raw(raw_fd)
103 }
104 }
105}
106
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsRawSocket for Socket {
    /// Returns whatever `g_socket_get_fd()` reports, converted to a
    /// `RawSocket`.
    fn as_raw_socket(&self) -> RawSocket {
        let handle = unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) };
        handle as RawSocket
    }
}
114
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsSocket for Socket {
    /// Borrows the socket's handle for as long as `self` is borrowed.
    fn as_socket(&self) -> BorrowedSocket<'_> {
        let raw_socket = self.as_raw_socket();
        // NOTE(review): assumes the handle stays open while the borrow is
        // alive — confirm behaviour for a closed socket.
        unsafe { BorrowedSocket::borrow_raw(raw_socket) }
    }
}
125
126#[doc(alias = "GInputVector")]
127#[repr(transparent)]
128#[derive(Debug)]
129pub struct InputVector<'v> {
130 vector: ffi::GInputVector,
131 buffer: PhantomData<&'v mut [u8]>,
132}
133
134impl<'v> InputVector<'v> {
135 #[inline]
136 pub fn new(buffer: &'v mut [u8]) -> Self {
137 Self {
138 vector: ffi::GInputVector {
139 buffer: buffer.as_mut_ptr() as *mut _,
140 size: buffer.len(),
141 },
142 buffer: PhantomData,
143 }
144 }
145}
146
147unsafe impl Send for InputVector<'_> {}
148unsafe impl Sync for InputVector<'_> {}
149
150impl std::ops::Deref for InputVector<'_> {
151 type Target = [u8];
152
153 #[inline]
154 fn deref(&self) -> &Self::Target {
155 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
156 }
157}
158
159impl std::ops::DerefMut for InputVector<'_> {
160 #[inline]
161 fn deref_mut(&mut self) -> &mut Self::Target {
162 unsafe { std::slice::from_raw_parts_mut(self.vector.buffer as *mut _, self.vector.size) }
163 }
164}
165
166#[derive(Debug)]
167pub struct SocketControlMessages {
168 ptr: *mut *mut ffi::GSocketControlMessage,
169 len: u32,
170}
171
172impl SocketControlMessages {
173 #[inline]
174 pub const fn new() -> Self {
175 Self {
176 ptr: ptr::null_mut(),
177 len: 0,
178 }
179 }
180}
181
182impl AsRef<[SocketControlMessage]> for SocketControlMessages {
183 #[inline]
184 fn as_ref(&self) -> &[SocketControlMessage] {
185 unsafe { std::slice::from_raw_parts(self.ptr as *const _, self.len as usize) }
186 }
187}
188
189impl std::ops::Deref for SocketControlMessages {
190 type Target = [SocketControlMessage];
191
192 #[inline]
193 fn deref(&self) -> &Self::Target {
194 self.as_ref()
195 }
196}
197
198impl Default for SocketControlMessages {
199 #[inline]
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205impl Drop for SocketControlMessages {
206 #[inline]
207 fn drop(&mut self) {
208 unsafe {
209 let _: Slice<SocketControlMessage> =
210 Slice::from_glib_full_num(self.ptr as *mut _, self.len as usize);
211 }
212 }
213}
214
215/// Structure used for scatter/gather data input when receiving multiple
216/// messages or packets in one go. You generally pass in an array of empty
217/// #GInputVectors and the operation will use all the buffers as if they
218/// were one buffer, and will set @bytes_received to the total number of bytes
219/// received across all #GInputVectors.
220///
221/// This structure closely mirrors `struct mmsghdr` and `struct msghdr` from
222/// the POSIX sockets API (see `man 2 recvmmsg`).
223///
224/// If @address is non-[`None`] then it is set to the source address the message
225/// was received from, and the caller must free it afterwards.
226///
227/// If @control_messages is non-[`None`] then it is set to an array of control
228/// messages received with the message (if any), and the caller must free it
229/// afterwards. @num_control_messages is set to the number of elements in
230/// this array, which may be zero.
231///
232/// Flags relevant to this message will be returned in @flags. For example,
233/// `MSG_EOR` or `MSG_TRUNC`.
234#[doc(alias = "GInputMessage")]
235#[repr(transparent)]
236#[derive(Debug)]
237pub struct InputMessage<'m> {
238 message: ffi::GInputMessage,
239 address: PhantomData<Option<&'m mut Option<SocketAddress>>>,
240 vectors: PhantomData<&'m mut [InputVector<'m>]>,
241 control_messages: PhantomData<Option<&'m mut SocketControlMessages>>,
242}
243
244impl<'m> InputMessage<'m> {
245 pub fn new(
246 mut address: Option<&'m mut Option<SocketAddress>>,
247 vectors: &'m mut [InputVector<'m>],
248 control_messages: Option<&'m mut SocketControlMessages>,
249 ) -> Self {
250 let address = address
251 .as_mut()
252 .map(|a| {
253 assert!(a.is_none());
254 *a as *mut _ as *mut _
255 })
256 .unwrap_or_else(ptr::null_mut);
257 let (control_messages, num_control_messages) = control_messages
258 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _))
259 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
260 Self {
261 message: ffi::GInputMessage {
262 address,
263 vectors: vectors.as_mut_ptr() as *mut ffi::GInputVector,
264 num_vectors: vectors.len().try_into().unwrap(),
265 bytes_received: 0,
266 flags: 0,
267 control_messages,
268 num_control_messages,
269 },
270 address: PhantomData,
271 vectors: PhantomData,
272 control_messages: PhantomData,
273 }
274 }
275 #[inline]
276 pub fn vectors(&mut self) -> &'m mut [InputVector<'m>] {
277 unsafe {
278 std::slice::from_raw_parts_mut(
279 self.message.vectors as *mut _,
280 self.message.num_vectors as usize,
281 )
282 }
283 }
284 #[inline]
285 pub const fn flags(&self) -> i32 {
286 self.message.flags
287 }
288 #[inline]
289 pub const fn bytes_received(&self) -> usize {
290 self.message.bytes_received
291 }
292}
293
294#[doc(alias = "GOutputVector")]
295#[repr(transparent)]
296#[derive(Debug)]
297pub struct OutputVector<'v> {
298 vector: ffi::GOutputVector,
299 buffer: PhantomData<&'v [u8]>,
300}
301
302impl<'v> OutputVector<'v> {
303 #[inline]
304 pub const fn new(buffer: &'v [u8]) -> Self {
305 Self {
306 vector: ffi::GOutputVector {
307 buffer: buffer.as_ptr() as *const _,
308 size: buffer.len(),
309 },
310 buffer: PhantomData,
311 }
312 }
313}
314
315unsafe impl Send for OutputVector<'_> {}
316unsafe impl Sync for OutputVector<'_> {}
317
318impl std::ops::Deref for OutputVector<'_> {
319 type Target = [u8];
320
321 #[inline]
322 fn deref(&self) -> &Self::Target {
323 unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
324 }
325}
326
327/// Structure used for scatter/gather data output when sending multiple
328/// messages or packets in one go. You generally pass in an array of
329/// #GOutputVectors and the operation will use all the buffers as if they
330/// were one buffer.
331///
332/// If @address is [`None`] then the message is sent to the default receiver
333/// (as previously set by g_socket_connect()).
334#[doc(alias = "GOutputMessage")]
335#[repr(transparent)]
336#[derive(Debug)]
337pub struct OutputMessage<'m> {
338 message: ffi::GOutputMessage,
339 address: PhantomData<Option<&'m SocketAddress>>,
340 vectors: PhantomData<&'m [OutputVector<'m>]>,
341 control_messages: PhantomData<&'m [SocketControlMessage]>,
342}
343
344impl<'m> OutputMessage<'m> {
345 pub fn new<A: IsA<SocketAddress>>(
346 address: Option<&'m A>,
347 vectors: &'m [OutputVector<'m>],
348 control_messages: &'m [SocketControlMessage],
349 ) -> Self {
350 Self {
351 message: ffi::GOutputMessage {
352 address: address
353 .map(|a| a.upcast_ref::<SocketAddress>().as_ptr())
354 .unwrap_or_else(ptr::null_mut),
355 vectors: mut_override(vectors.as_ptr() as *const ffi::GOutputVector),
356 num_vectors: vectors.len().try_into().unwrap(),
357 bytes_sent: 0,
358 control_messages: control_messages.as_ptr() as *mut _,
359 num_control_messages: control_messages.len().try_into().unwrap(),
360 },
361 address: PhantomData,
362 vectors: PhantomData,
363 control_messages: PhantomData,
364 }
365 }
366 #[inline]
367 pub fn vectors(&self) -> &'m [OutputVector<'m>] {
368 unsafe {
369 std::slice::from_raw_parts(
370 self.message.vectors as *const _,
371 self.message.num_vectors as usize,
372 )
373 }
374 }
375 #[inline]
376 pub fn bytes_sent(&self) -> u32 {
377 self.message.bytes_sent
378 }
379}
380
381pub trait SocketExtManual: IsA<Socket> + Sized {
382 /// Receive data (up to @size bytes) from a socket. This is mainly used by
383 /// connection-oriented sockets; it is identical to g_socket_receive_from()
384 /// with @address set to [`None`].
385 ///
386 /// For [`SocketType::Datagram`][crate::SocketType::Datagram] and [`SocketType::Seqpacket`][crate::SocketType::Seqpacket] sockets,
387 /// g_socket_receive() will always read either 0 or 1 complete messages from
388 /// the socket. If the received message is too large to fit in @buffer, then
389 /// the data beyond @size bytes will be discarded, without any explicit
390 /// indication that this has occurred.
391 ///
392 /// For [`SocketType::Stream`][crate::SocketType::Stream] sockets, g_socket_receive() can return any
393 /// number of bytes, up to @size. If more than @size bytes have been
394 /// received, the additional data will be returned in future calls to
395 /// g_socket_receive().
396 ///
397 /// If the socket is in blocking mode the call will block until there
398 /// is some data to receive, the connection is closed, or there is an
399 /// error. If there is no data available and the socket is in
400 /// non-blocking mode, a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error will be
401 /// returned. To be notified when data is available, wait for the
402 /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
403 ///
404 /// On error -1 is returned and @error is set accordingly.
405 /// ## `cancellable`
406 /// a `GCancellable` or [`None`]
407 ///
408 /// # Returns
409 ///
    /// Number of bytes read, or 0 if the connection was closed by
    /// the peer. Failures are reported as `Err` rather than as a -1 value.
412 ///
413 /// ## `buffer`
414 ///
415 /// a buffer to read data into (which should be at least @size bytes long).
416 #[doc(alias = "g_socket_receive")]
417 fn receive<B: AsMut<[u8]>, C: IsA<Cancellable>>(
418 &self,
419 mut buffer: B,
420 cancellable: Option<&C>,
421 ) -> Result<usize, glib::Error> {
422 let cancellable = cancellable.map(|c| c.as_ref());
423 let gcancellable = cancellable.to_glib_none();
424 let buffer = buffer.as_mut();
425 let buffer_ptr = buffer.as_mut_ptr();
426 let count = buffer.len();
427 unsafe {
428 let mut error = ptr::null_mut();
429 let ret = ffi::g_socket_receive(
430 self.as_ref().to_glib_none().0,
431 buffer_ptr,
432 count,
433 gcancellable.0,
434 &mut error,
435 );
436 if error.is_null() {
437 Ok(ret as usize)
438 } else {
439 Err(from_glib_full(error))
440 }
441 }
442 }
443 /// Receive data (up to @size bytes) from a socket.
444 ///
445 /// If @address is non-[`None`] then @address will be set equal to the
446 /// source address of the received packet.
447 /// @address is owned by the caller.
448 ///
449 /// See g_socket_receive() for additional information.
450 /// ## `cancellable`
451 /// a `GCancellable` or [`None`]
452 ///
453 /// # Returns
454 ///
    /// Number of bytes read, or 0 if the connection was closed by
    /// the peer. Failures are reported as `Err` rather than as a -1 value.
457 ///
458 /// ## `address`
459 /// a pointer to a #GSocketAddress
460 /// pointer, or [`None`]
461 ///
462 /// ## `buffer`
463 ///
464 /// a buffer to read data into (which should be at least @size bytes long).
465 #[doc(alias = "g_socket_receive_from")]
466 fn receive_from<B: AsMut<[u8]>, C: IsA<Cancellable>>(
467 &self,
468 mut buffer: B,
469 cancellable: Option<&C>,
470 ) -> Result<(usize, SocketAddress), glib::Error> {
471 let cancellable = cancellable.map(|c| c.as_ref());
472 let gcancellable = cancellable.to_glib_none();
473 let buffer = buffer.as_mut();
474 let buffer_ptr = buffer.as_mut_ptr();
475 let count = buffer.len();
476 unsafe {
477 let mut error = ptr::null_mut();
478 let mut addr_ptr = ptr::null_mut();
479
480 let ret = ffi::g_socket_receive_from(
481 self.as_ref().to_glib_none().0,
482 &mut addr_ptr,
483 buffer_ptr,
484 count,
485 gcancellable.0,
486 &mut error,
487 );
488 if error.is_null() {
489 Ok((ret as usize, from_glib_full(addr_ptr)))
490 } else {
491 Err(from_glib_full(error))
492 }
493 }
494 }
495 /// Receive data from a socket. For receiving multiple messages, see
496 /// g_socket_receive_messages(); for easier use, see
497 /// g_socket_receive() and g_socket_receive_from().
498 ///
499 /// If @address is non-[`None`] then @address will be set equal to the
500 /// source address of the received packet.
501 /// @address is owned by the caller.
502 ///
503 /// @vector must point to an array of #GInputVector structs and
504 /// @num_vectors must be the length of this array. These structs
505 /// describe the buffers that received data will be scattered into.
506 /// If @num_vectors is -1, then @vectors is assumed to be terminated
507 /// by a #GInputVector with a [`None`] buffer pointer.
508 ///
509 /// As a special case, if @num_vectors is 0 (in which case, @vectors
510 /// may of course be [`None`]), then a single byte is received and
511 /// discarded. This is to facilitate the common practice of sending a
512 /// single '\0' byte for the purposes of transferring ancillary data.
513 ///
514 /// @messages, if non-[`None`], will be set to point to a newly-allocated
515 /// array of #GSocketControlMessage instances or [`None`] if no such
516 /// messages was received. These correspond to the control messages
517 /// received from the kernel, one #GSocketControlMessage per message
518 /// from the kernel. This array is [`None`]-terminated and must be freed
519 /// by the caller using g_free() after calling g_object_unref() on each
520 /// element. If @messages is [`None`], any control messages received will
521 /// be discarded.
522 ///
523 /// @num_messages, if non-[`None`], will be set to the number of control
524 /// messages received.
525 ///
526 /// If both @messages and @num_messages are non-[`None`], then
527 /// @num_messages gives the number of #GSocketControlMessage instances
528 /// in @messages (ie: not including the [`None`] terminator).
529 ///
530 /// @flags is an in/out parameter. The commonly available arguments
531 /// for this are available in the #GSocketMsgFlags enum, but the
532 /// values there are the same as the system values, and the flags
533 /// are passed in as-is, so you can pass in system-specific flags too
534 /// (and g_socket_receive_message() may pass system-specific flags out).
535 /// Flags passed in to the parameter affect the receive operation; flags returned
536 /// out of it are relevant to the specific returned message.
537 ///
538 /// As with g_socket_receive(), data may be discarded if @self is
539 /// [`SocketType::Datagram`][crate::SocketType::Datagram] or [`SocketType::Seqpacket`][crate::SocketType::Seqpacket] and you do not
540 /// provide enough buffer space to read a complete message. You can pass
541 /// [`SocketMsgFlags::PEEK`][crate::SocketMsgFlags::PEEK] in @flags to peek at the current message without
542 /// removing it from the receive queue, but there is no portable way to find
543 /// out the length of the message other than by reading it into a
544 /// sufficiently-large buffer.
545 ///
546 /// If the socket is in blocking mode the call will block until there
547 /// is some data to receive, the connection is closed, or there is an
548 /// error. If there is no data available and the socket is in
549 /// non-blocking mode, a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error will be
550 /// returned. To be notified when data is available, wait for the
551 /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
552 ///
553 /// On error -1 is returned and @error is set accordingly.
554 /// ## `vectors`
555 /// an array of #GInputVector structs
556 /// ## `cancellable`
557 /// a `GCancellable` or [`None`]
558 ///
559 /// # Returns
560 ///
    /// Number of bytes read, or 0 if the connection was closed by
    /// the peer. Failures are reported as `Err` rather than as a -1 value.
563 ///
564 /// ## `address`
565 /// a pointer to a #GSocketAddress
566 /// pointer, or [`None`]
567 ///
568 /// ## `messages`
569 /// a pointer
570 /// which may be filled with an array of #GSocketControlMessages, or [`None`]
571 ///
572 /// ## `flags`
573 /// a pointer to an int containing #GSocketMsgFlags flags,
574 /// which may additionally contain
575 /// [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
576 #[doc(alias = "g_socket_receive_message")]
577 fn receive_message<C: IsA<Cancellable>>(
578 &self,
579 mut address: Option<&mut Option<SocketAddress>>,
580 vectors: &mut [InputVector],
581 control_messages: Option<&mut SocketControlMessages>,
582 mut flags: i32,
583 cancellable: Option<&C>,
584 ) -> Result<(usize, i32), glib::Error> {
585 let cancellable = cancellable.map(|c| c.as_ref());
586 let address = address
587 .as_mut()
588 .map(|a| {
589 assert!(a.is_none());
590 *a as *mut _ as *mut _
591 })
592 .unwrap_or_else(ptr::null_mut);
593 let (control_messages, num_control_messages) = control_messages
594 .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _ as *mut _))
595 .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
596 unsafe {
597 let mut error = ptr::null_mut();
598
599 let received = ffi::g_socket_receive_message(
600 self.as_ref().to_glib_none().0,
601 address,
602 vectors.as_mut_ptr() as *mut ffi::GInputVector,
603 vectors.len().try_into().unwrap(),
604 control_messages,
605 num_control_messages,
606 &mut flags,
607 cancellable.to_glib_none().0,
608 &mut error,
609 );
610 if error.is_null() {
611 Ok((received as usize, flags))
612 } else {
613 Err(from_glib_full(error))
614 }
615 }
616 }
617 /// Receive multiple data messages from @self in one go. This is the most
618 /// complicated and fully-featured version of this call. For easier use, see
619 /// g_socket_receive(), g_socket_receive_from(), and g_socket_receive_message().
620 ///
621 /// @messages must point to an array of #GInputMessage structs and
622 /// @num_messages must be the length of this array. Each #GInputMessage
623 /// contains a pointer to an array of #GInputVector structs describing the
624 /// buffers that the data received in each message will be written to. Using
625 /// multiple #GInputVectors is more memory-efficient than manually copying data
626 /// out of a single buffer to multiple sources, and more system-call-efficient
627 /// than making multiple calls to g_socket_receive(), such as in scenarios where
628 /// a lot of data packets need to be received (e.g. high-bandwidth video
629 /// streaming over RTP/UDP).
630 ///
631 /// @flags modify how all messages are received. The commonly available
632 /// arguments for this are available in the #GSocketMsgFlags enum, but the
633 /// values there are the same as the system values, and the flags
634 /// are passed in as-is, so you can pass in system-specific flags too. These
635 /// flags affect the overall receive operation. Flags affecting individual
636 /// messages are returned in #GInputMessage.flags.
637 ///
638 /// The other members of #GInputMessage are treated as described in its
639 /// documentation.
640 ///
641 /// If #GSocket:blocking is [`true`] the call will block until @num_messages have
642 /// been received, or the end of the stream is reached.
643 ///
644 /// If #GSocket:blocking is [`false`] the call will return up to @num_messages
645 /// without blocking, or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the
646 /// operating system to be received.
647 ///
648 /// In blocking mode, if #GSocket:timeout is positive and is reached before any
649 /// messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise up to
650 /// @num_messages are returned. (Note: This is effectively the
651 /// behaviour of `MSG_WAITFORONE` with recvmmsg().)
652 ///
653 /// To be notified when messages are available, wait for the
654 /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition. Note though that you may still receive
655 /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_receive_messages() even if you were
656 /// previously notified of a [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
657 ///
658 /// If the remote peer closes the connection, any messages queued in the
659 /// operating system will be returned, and subsequent calls to
660 /// g_socket_receive_messages() will return 0 (with no error set).
661 ///
662 /// On error -1 is returned and @error is set accordingly. An error will only
663 /// be returned if zero messages could be received; otherwise the number of
664 /// messages successfully received before the error will be returned.
665 /// ## `messages`
666 /// an array of #GInputMessage structs
667 /// ## `flags`
668 /// an int containing #GSocketMsgFlags flags for the overall operation,
669 /// which may additionally contain
670 /// [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
671 /// ## `cancellable`
672 /// a `GCancellable` or [`None`]
673 ///
674 /// # Returns
675 ///
    /// number of messages received; failures are reported as `Err` rather than -1. Note that the number
677 /// of messages received may be smaller than @num_messages if in non-blocking
678 /// mode, if the peer closed the connection, or if @num_messages
679 /// was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
680 /// to receive the remaining messages.
681 #[doc(alias = "g_socket_receive_messages")]
682 fn receive_messages<C: IsA<Cancellable>>(
683 &self,
684 messages: &mut [InputMessage],
685 flags: i32,
686 cancellable: Option<&C>,
687 ) -> Result<usize, glib::Error> {
688 let cancellable = cancellable.map(|c| c.as_ref());
689 unsafe {
690 let mut error = ptr::null_mut();
691
692 let count = ffi::g_socket_receive_messages(
693 self.as_ref().to_glib_none().0,
694 messages.as_mut_ptr() as *mut _,
695 messages.len().try_into().unwrap(),
696 flags,
697 cancellable.to_glib_none().0,
698 &mut error,
699 );
700 if error.is_null() {
701 Ok(count as usize)
702 } else {
703 Err(from_glib_full(error))
704 }
705 }
706 }
707 /// This behaves exactly the same as g_socket_receive(), except that
708 /// the choice of blocking or non-blocking behavior is determined by
709 /// the @blocking argument rather than by @self's properties.
710 /// ## `blocking`
711 /// whether to do blocking or non-blocking I/O
712 /// ## `cancellable`
713 /// a `GCancellable` or [`None`]
714 ///
715 /// # Returns
716 ///
    /// Number of bytes read, or 0 if the connection was closed by
    /// the peer. Failures are reported as `Err` rather than as a -1 value.
719 ///
720 /// ## `buffer`
721 ///
722 /// a buffer to read data into (which should be at least @size bytes long).
723 #[doc(alias = "g_socket_receive_with_blocking")]
724 fn receive_with_blocking<B: AsMut<[u8]>, C: IsA<Cancellable>>(
725 &self,
726 mut buffer: B,
727 blocking: bool,
728 cancellable: Option<&C>,
729 ) -> Result<usize, glib::Error> {
730 let cancellable = cancellable.map(|c| c.as_ref());
731 let gcancellable = cancellable.to_glib_none();
732 let buffer = buffer.as_mut();
733 let buffer_ptr = buffer.as_mut_ptr();
734 let count = buffer.len();
735 unsafe {
736 let mut error = ptr::null_mut();
737 let ret = ffi::g_socket_receive_with_blocking(
738 self.as_ref().to_glib_none().0,
739 buffer_ptr,
740 count,
741 blocking.into_glib(),
742 gcancellable.0,
743 &mut error,
744 );
745 if error.is_null() {
746 Ok(ret as usize)
747 } else {
748 Err(from_glib_full(error))
749 }
750 }
751 }
752
753 /// Tries to send @size bytes from @buffer on the socket. This is
754 /// mainly used by connection-oriented sockets; it is identical to
755 /// g_socket_send_to() with @address set to [`None`].
756 ///
757 /// If the socket is in blocking mode the call will block until there is
758 /// space for the data in the socket queue. If there is no space available
759 /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
760 /// will be returned. To be notified when space is available, wait for the
761 /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
762 /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
763 /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
764 /// very common due to the way the underlying APIs work.)
765 ///
766 /// On error -1 is returned and @error is set accordingly.
767 /// ## `buffer`
768 /// the buffer
769 /// containing the data to send.
770 /// ## `cancellable`
771 /// a `GCancellable` or [`None`]
772 ///
773 /// # Returns
774 ///
    /// Number of bytes written (which may be less than @size). Failures
    /// are reported as `Err` rather than as a -1 value.
777 #[doc(alias = "g_socket_send")]
778 fn send<B: AsRef<[u8]>, C: IsA<Cancellable>>(
779 &self,
780 buffer: B,
781 cancellable: Option<&C>,
782 ) -> Result<usize, glib::Error> {
783 let cancellable = cancellable.map(|c| c.as_ref());
784 let gcancellable = cancellable.to_glib_none();
785 let (count, buffer_ptr) = {
786 let slice = buffer.as_ref();
787 (slice.len(), slice.as_ptr())
788 };
789 unsafe {
790 let mut error = ptr::null_mut();
791 let ret = ffi::g_socket_send(
792 self.as_ref().to_glib_none().0,
793 mut_override(buffer_ptr),
794 count,
795 gcancellable.0,
796 &mut error,
797 );
798 if error.is_null() {
799 Ok(ret as usize)
800 } else {
801 Err(from_glib_full(error))
802 }
803 }
804 }
805 /// Send data to @address on @self. For sending multiple messages see
806 /// g_socket_send_messages(); for easier use, see
807 /// g_socket_send() and g_socket_send_to().
808 ///
809 /// If @address is [`None`] then the message is sent to the default receiver
810 /// (set by g_socket_connect()).
811 ///
812 /// @vectors must point to an array of #GOutputVector structs and
813 /// @num_vectors must be the length of this array. (If @num_vectors is -1,
814 /// then @vectors is assumed to be terminated by a #GOutputVector with a
815 /// [`None`] buffer pointer.) The #GOutputVector structs describe the buffers
816 /// that the sent data will be gathered from. Using multiple
817 /// #GOutputVectors is more memory-efficient than manually copying
818 /// data from multiple sources into a single buffer, and more
819 /// network-efficient than making multiple calls to g_socket_send().
820 ///
821 /// @messages, if non-[`None`], is taken to point to an array of @num_messages
822 /// #GSocketControlMessage instances. These correspond to the control
823 /// messages to be sent on the socket.
824 /// If @num_messages is -1 then @messages is treated as a [`None`]-terminated
825 /// array.
826 ///
827 /// @flags modify how the message is sent. The commonly available arguments
828 /// for this are available in the #GSocketMsgFlags enum, but the
829 /// values there are the same as the system values, and the flags
830 /// are passed in as-is, so you can pass in system-specific flags too.
831 ///
832 /// If the socket is in blocking mode the call will block until there is
833 /// space for the data in the socket queue. If there is no space available
834 /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
835 /// will be returned. To be notified when space is available, wait for the
836 /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
837 /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
838 /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
839 /// very common due to the way the underlying APIs work.)
840 ///
841 /// The sum of the sizes of each #GOutputVector in vectors must not be
842 /// greater than `G_MAXSSIZE`. If the message can be larger than this,
843 /// then it is mandatory to use the g_socket_send_message_with_timeout()
844 /// function.
845 ///
846 /// On error -1 is returned and @error is set accordingly.
847 /// ## `address`
848 /// a #GSocketAddress, or [`None`]
849 /// ## `vectors`
850 /// an array of #GOutputVector structs
851 /// ## `messages`
852 /// a pointer to an
853 /// array of #GSocketControlMessages, or [`None`].
854 /// ## `flags`
855 /// an int containing #GSocketMsgFlags flags, which may additionally
856 /// contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
857 /// ## `cancellable`
858 /// a `GCancellable` or [`None`]
859 ///
860 /// # Returns
861 ///
    /// Number of bytes written (which may be less than @size). Failures
    /// are reported as `Err` rather than as a -1 value.
864 #[doc(alias = "g_socket_send_message")]
865 fn send_message<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
866 &self,
867 address: Option<&P>,
868 vectors: &[OutputVector],
869 messages: &[SocketControlMessage],
870 flags: i32,
871 cancellable: Option<&C>,
872 ) -> Result<usize, glib::Error> {
873 let cancellable = cancellable.map(|c| c.as_ref());
874 unsafe {
875 let mut error = ptr::null_mut();
876 let ret = ffi::g_socket_send_message(
877 self.as_ref().to_glib_none().0,
878 address.map(|p| p.as_ref()).to_glib_none().0,
879 vectors.as_ptr() as *mut ffi::GOutputVector,
880 vectors.len().try_into().unwrap(),
881 messages.as_ptr() as *mut _,
882 messages.len().try_into().unwrap(),
883 flags,
884 cancellable.to_glib_none().0,
885 &mut error,
886 );
887 if error.is_null() {
888 Ok(ret as usize)
889 } else {
890 Err(from_glib_full(error))
891 }
892 }
893 }
/// Behaves exactly like g_socket_send_message(), except that how long the
/// call may wait is decided by the `timeout` argument rather than by the
/// socket's own properties.
///
/// If the socket is currently not writable,
/// [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] is
/// reported and the number of bytes written is 0.
/// ## `address`
/// a #GSocketAddress, or [`None`]
/// ## `vectors`
/// an array of #GOutputVector structs
/// ## `messages`
/// an array of #GSocketControlMessages; may be empty
/// ## `flags`
/// an int containing #GSocketMsgFlags flags, which may additionally
/// contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
/// ## `timeout`
/// the maximum time to wait. The duration is handed to GLib in whole
/// microseconds; durations too large for a signed 64-bit microsecond count
/// are clamped to the largest representable value instead of panicking.
/// [`None`] is passed to GLib as `-1`.
/// ## `cancellable`
/// a `GCancellable` or [`None`]
///
/// # Returns
///
/// On success, a pair of the status reported by GLib
/// ([`PollableReturn::Ok`][crate::PollableReturn::Ok] if all data was
/// successfully written, or
/// [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] if the
/// socket is currently not writable) and the number of bytes that were
/// written to the socket. If GLib sets an error, it is returned as `Err`.
///
/// # Panics
///
/// Panics if the number of vectors or control messages does not fit the
/// integer type expected by the C function.
#[cfg(feature = "v2_60")]
#[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
#[doc(alias = "g_socket_send_message_with_timeout")]
fn send_message_with_timeout<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
    &self,
    address: Option<&P>,
    vectors: &[OutputVector],
    messages: &[SocketControlMessage],
    flags: i32,
    timeout: Option<Duration>,
    cancellable: Option<&C>,
) -> Result<(PollableReturn, usize), glib::Error> {
    // `Duration::as_micros` yields a `u128`, which can exceed what the C API
    // accepts (e.g. for `Duration::MAX`). Saturate instead of unwrapping so
    // that a "practically forever" timeout does not abort the caller.
    let timeout_us = timeout.map_or(-1, |limit| {
        i64::try_from(limit.as_micros()).unwrap_or(i64::MAX)
    });
    let cancellable = cancellable.map(|c| c.as_ref());
    unsafe {
        // Out-parameters filled in by the C call.
        let mut error = ptr::null_mut();
        let mut bytes_written = 0;

        let ret = ffi::g_socket_send_message_with_timeout(
            self.as_ref().to_glib_none().0,
            address.map(|p| p.as_ref()).to_glib_none().0,
            vectors.as_ptr() as *mut ffi::GOutputVector,
            vectors.len().try_into().unwrap(),
            messages.as_ptr() as *mut _,
            messages.len().try_into().unwrap(),
            flags,
            timeout_us,
            &mut bytes_written,
            cancellable.to_glib_none().0,
            &mut error,
        );
        if error.is_null() {
            Ok((from_glib(ret), bytes_written))
        } else {
            Err(from_glib_full(error))
        }
    }
}
963 /// Send multiple data messages from @self in one go. This is the most
964 /// complicated and fully-featured version of this call. For easier use, see
965 /// g_socket_send(), g_socket_send_to(), and g_socket_send_message().
966 ///
967 /// @messages must point to an array of #GOutputMessage structs and
968 /// @num_messages must be the length of this array. Each #GOutputMessage
969 /// contains an address to send the data to, and a pointer to an array of
970 /// #GOutputVector structs to describe the buffers that the data to be sent
971 /// for each message will be gathered from. Using multiple #GOutputVectors is
972 /// more memory-efficient than manually copying data from multiple sources
973 /// into a single buffer, and more network-efficient than making multiple
974 /// calls to g_socket_send(). Sending multiple messages in one go avoids the
975 /// overhead of making a lot of syscalls in scenarios where a lot of data
976 /// packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP),
977 /// or where the same data needs to be sent to multiple recipients.
978 ///
979 /// @flags modify how the message is sent. The commonly available arguments
980 /// for this are available in the #GSocketMsgFlags enum, but the
981 /// values there are the same as the system values, and the flags
982 /// are passed in as-is, so you can pass in system-specific flags too.
983 ///
984 /// If the socket is in blocking mode the call will block until there is
985 /// space for all the data in the socket queue. If there is no space available
986 /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
987 /// will be returned if no data was written at all, otherwise the number of
988 /// messages sent will be returned. To be notified when space is available,
989 /// wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
990 /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
991 /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
992 /// very common due to the way the underlying APIs work.)
993 ///
994 /// On error -1 is returned and @error is set accordingly. An error will only
995 /// be returned if zero messages could be sent; otherwise the number of messages
996 /// successfully sent before the error will be returned.
997 /// ## `messages`
998 /// an array of #GOutputMessage structs
999 /// ## `flags`
1000 /// an int containing #GSocketMsgFlags flags, which may additionally
1001 /// contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
1002 /// ## `cancellable`
1003 /// a `GCancellable` or [`None`]
1004 ///
1005 /// # Returns
1006 ///
1007 /// number of messages sent, or -1 on error. Note that the number of
1008 /// messages sent may be smaller than @num_messages if the socket is
1009 /// non-blocking or if @num_messages was larger than UIO_MAXIOV (1024),
1010 /// in which case the caller may re-try to send the remaining messages.
1011 #[doc(alias = "g_socket_send_messages")]
1012 fn send_messages<C: IsA<Cancellable>>(
1013 &self,
1014 messages: &mut [OutputMessage],
1015 flags: i32,
1016 cancellable: Option<&C>,
1017 ) -> Result<usize, glib::Error> {
1018 let cancellable = cancellable.map(|c| c.as_ref());
1019 unsafe {
1020 let mut error = ptr::null_mut();
1021 let count = ffi::g_socket_send_messages(
1022 self.as_ref().to_glib_none().0,
1023 messages.as_mut_ptr() as *mut _,
1024 messages.len().try_into().unwrap(),
1025 flags,
1026 cancellable.to_glib_none().0,
1027 &mut error,
1028 );
1029 if error.is_null() {
1030 Ok(count as usize)
1031 } else {
1032 Err(from_glib_full(error))
1033 }
1034 }
1035 }
1036 /// Tries to send @size bytes from @buffer to @address. If @address is
1037 /// [`None`] then the message is sent to the default receiver (set by
1038 /// g_socket_connect()).
1039 ///
1040 /// See g_socket_send() for additional information.
1041 /// ## `address`
1042 /// a #GSocketAddress, or [`None`]
1043 /// ## `buffer`
1044 /// the buffer
1045 /// containing the data to send.
1046 /// ## `cancellable`
1047 /// a `GCancellable` or [`None`]
1048 ///
1049 /// # Returns
1050 ///
1051 /// Number of bytes written (which may be less than @size), or -1
1052 /// on error
1053 #[doc(alias = "g_socket_send_to")]
1054 fn send_to<B: AsRef<[u8]>, P: IsA<SocketAddress>, C: IsA<Cancellable>>(
1055 &self,
1056 address: Option<&P>,
1057 buffer: B,
1058 cancellable: Option<&C>,
1059 ) -> Result<usize, glib::Error> {
1060 let cancellable = cancellable.map(|c| c.as_ref());
1061 let gcancellable = cancellable.to_glib_none();
1062 let (count, buffer_ptr) = {
1063 let slice = buffer.as_ref();
1064 (slice.len(), slice.as_ptr())
1065 };
1066 unsafe {
1067 let mut error = ptr::null_mut();
1068
1069 let ret = ffi::g_socket_send_to(
1070 self.as_ref().to_glib_none().0,
1071 address.map(|p| p.as_ref()).to_glib_none().0,
1072 mut_override(buffer_ptr),
1073 count,
1074 gcancellable.0,
1075 &mut error,
1076 );
1077 if error.is_null() {
1078 Ok(ret as usize)
1079 } else {
1080 Err(from_glib_full(error))
1081 }
1082 }
1083 }
1084 /// This behaves exactly the same as g_socket_send(), except that
1085 /// the choice of blocking or non-blocking behavior is determined by
1086 /// the @blocking argument rather than by @self's properties.
1087 /// ## `buffer`
1088 /// the buffer
1089 /// containing the data to send.
1090 /// ## `blocking`
1091 /// whether to do blocking or non-blocking I/O
1092 /// ## `cancellable`
1093 /// a `GCancellable` or [`None`]
1094 ///
1095 /// # Returns
1096 ///
1097 /// Number of bytes written (which may be less than @size), or -1
1098 /// on error
1099 #[doc(alias = "g_socket_send_with_blocking")]
1100 fn send_with_blocking<B: AsRef<[u8]>, C: IsA<Cancellable>>(
1101 &self,
1102 buffer: B,
1103 blocking: bool,
1104 cancellable: Option<&C>,
1105 ) -> Result<usize, glib::Error> {
1106 let cancellable = cancellable.map(|c| c.as_ref());
1107 let gcancellable = cancellable.to_glib_none();
1108 let (count, buffer_ptr) = {
1109 let slice = buffer.as_ref();
1110 (slice.len(), slice.as_ptr())
1111 };
1112 unsafe {
1113 let mut error = ptr::null_mut();
1114 let ret = ffi::g_socket_send_with_blocking(
1115 self.as_ref().to_glib_none().0,
1116 mut_override(buffer_ptr),
1117 count,
1118 blocking.into_glib(),
1119 gcancellable.0,
1120 &mut error,
1121 );
1122 if error.is_null() {
1123 Ok(ret as usize)
1124 } else {
1125 Err(from_glib_full(error))
1126 }
1127 }
1128 }
1129
1130 /// Returns the underlying OS socket object. On unix this
1131 /// is a socket file descriptor, and on Windows this is
1132 /// a Winsock2 SOCKET handle. This may be useful for
1133 /// doing platform specific or otherwise unusual operations
1134 /// on the socket.
1135 ///
1136 /// # Returns
1137 ///
1138 /// the file descriptor of the socket.
1139 #[cfg(unix)]
1140 #[cfg_attr(docsrs, doc(cfg(unix)))]
1141 #[doc(alias = "get_fd")]
1142 #[doc(alias = "g_socket_get_fd")]
1143 fn fd(&self) -> BorrowedFd<'_> {
1144 self.as_ref().as_fd()
1145 }
1146
1147 #[cfg(windows)]
1148 #[cfg_attr(docsrs, doc(cfg(windows)))]
1149 #[doc(alias = "get_socket")]
1150 #[doc(alias = "g_socket_get_fd")]
1151 fn socket(&self) -> BorrowedSocket<'_> {
1152 self.as_ref().as_socket()
1153 }
1154
1155 #[doc(alias = "g_socket_create_source")]
1156 fn create_source<F, C>(
1157 &self,
1158 condition: glib::IOCondition,
1159 cancellable: Option<&C>,
1160 name: Option<&str>,
1161 priority: glib::Priority,
1162 func: F,
1163 ) -> glib::Source
1164 where
1165 F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
1166 C: IsA<Cancellable>,
1167 {
1168 unsafe extern "C" fn trampoline<
1169 O: IsA<Socket>,
1170 F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
1171 >(
1172 socket: *mut ffi::GSocket,
1173 condition: glib::ffi::GIOCondition,
1174 func: glib::ffi::gpointer,
1175 ) -> glib::ffi::gboolean {
1176 unsafe {
1177 let func: &RefCell<F> = &*(func as *const RefCell<F>);
1178 let mut func = func.borrow_mut();
1179 (*func)(
1180 Socket::from_glib_borrow(socket).unsafe_cast_ref(),
1181 from_glib(condition),
1182 )
1183 .into_glib()
1184 }
1185 }
1186 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
1187 unsafe {
1188 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
1189 }
1190 }
1191 let cancellable = cancellable.map(|c| c.as_ref());
1192 let gcancellable = cancellable.to_glib_none();
1193 unsafe {
1194 let source = ffi::g_socket_create_source(
1195 self.as_ref().to_glib_none().0,
1196 condition.into_glib(),
1197 gcancellable.0,
1198 );
1199 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
1200 glib::ffi::g_source_set_callback(
1201 source,
1202 Some(transmute::<
1203 glib::ffi::gpointer,
1204 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
1205 >(trampoline)),
1206 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
1207 Some(destroy_closure::<F>),
1208 );
1209 glib::ffi::g_source_set_priority(source, priority.into_glib());
1210
1211 if let Some(name) = name {
1212 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
1213 }
1214
1215 from_glib_full(source)
1216 }
1217 }
1218
1219 fn create_source_future<C: IsA<Cancellable>>(
1220 &self,
1221 condition: glib::IOCondition,
1222 cancellable: Option<&C>,
1223 priority: glib::Priority,
1224 ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
1225 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
1226
1227 let obj = self.clone();
1228 Box::pin(glib::SourceFuture::new(move |send| {
1229 let mut send = Some(send);
1230 obj.create_source(
1231 condition,
1232 cancellable.as_ref(),
1233 None,
1234 priority,
1235 move |_, condition| {
1236 let _ = send.take().unwrap().send(condition);
1237 glib::ControlFlow::Break
1238 },
1239 )
1240 }))
1241 }
1242
1243 fn create_source_stream<C: IsA<Cancellable>>(
1244 &self,
1245 condition: glib::IOCondition,
1246 cancellable: Option<&C>,
1247 priority: glib::Priority,
1248 ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
1249 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
1250
1251 let obj = self.clone();
1252 Box::pin(glib::SourceStream::new(move |send| {
1253 let send = Some(send);
1254 obj.create_source(
1255 condition,
1256 cancellable.as_ref(),
1257 None,
1258 priority,
1259 move |_, condition| {
1260 if send.as_ref().unwrap().unbounded_send(condition).is_err() {
1261 glib::ControlFlow::Break
1262 } else {
1263 glib::ControlFlow::Continue
1264 }
1265 },
1266 )
1267 }))
1268 }
1269}
1270
1271impl<O: IsA<Socket>> SocketExtManual for O {}
1272
/// Raw file descriptor type used by the [`AsRawFd`] stand-in below.
///
/// Only compiled when `docsrs` is set on a non-unix target.
/// NOTE(review): presumably exists so that unix-only signatures still resolve
/// when documentation is built on other platforms — confirm.
#[cfg(all(docsrs, not(unix)))]
pub type RawFd = libc::c_int;

/// Stand-in with the same shape as the standard library's unix-only
/// `AsRawFd` trait.
///
/// Only compiled when `docsrs` is set on a non-unix target.
#[cfg(all(docsrs, not(unix)))]
pub trait AsRawFd {
    /// Returns the raw file descriptor.
    fn as_raw_fd(&self) -> RawFd;
}
1280
/// Raw socket handle type used by the [`AsRawSocket`] stand-in below.
///
/// Only compiled when `docsrs` is set on a non-Windows target.
/// NOTE(review): presumably exists so that Windows-only signatures still
/// resolve when documentation is built on other platforms — confirm.
#[cfg(all(docsrs, not(windows)))]
pub type RawSocket = *mut std::os::raw::c_void;

/// Stand-in with the same shape as the standard library's Windows-only
/// `AsRawSocket` trait.
///
/// Only compiled when `docsrs` is set on a non-Windows target.
#[cfg(all(docsrs, not(windows)))]
pub trait AsRawSocket {
    /// Returns the raw socket handle.
    fn as_raw_socket(&self) -> RawSocket;
}
1288
#[cfg(test)]
mod tests {
    /// Sends one UDP datagram over the loopback interface with
    /// `send_messages` and reads it back with `receive_messages`, checking
    /// the message counts, byte counts, payload and the IP address of the
    /// reported peer.
    #[test]
    #[cfg(unix)]
    fn dgram_socket_messages() {
        use super::Socket;
        use crate::{Cancellable, prelude::*};

        let out_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        let in_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();

        // Bind to port 0 so the OS assigns a free port, then read the actual
        // address back. A hard-coded port makes the test fail whenever that
        // port happens to be in use (e.g. by a parallel test run).
        let any_port = crate::InetSocketAddress::from_string("127.0.0.1", 0).unwrap();
        in_sock.bind(&any_port, true).unwrap();
        let addr = in_sock
            .local_address()
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();

        // Payload: the big-endian bytes of a fixed u64.
        const DATA: [u8; std::mem::size_of::<u64>()] = 1234u64.to_be_bytes();
        let out_vec = DATA;
        let out_vecs = [super::OutputVector::new(out_vec.as_slice())];
        let mut out_msg = [super::OutputMessage::new(
            Some(&addr),
            out_vecs.as_slice(),
            &[],
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msg[0].bytes_sent() as usize, out_vec.len());

        let mut in_addr = None;
        let mut in_vec = [0u8; DATA.len()];
        let mut in_vecs = [super::InputVector::new(in_vec.as_mut_slice())];
        let mut in_msg = [super::InputMessage::new(
            Some(&mut in_addr),
            in_vecs.as_mut_slice(),
            None,
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msg[0].bytes_received(), in_vec.len());
        assert_eq!(in_vec, out_vec);
        let in_addr = in_addr
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();
        // Only the IP part is compared: the sending socket was never bound
        // explicitly, so its port is not known in advance.
        assert_eq!(in_addr.address().to_str(), addr.address().to_str());
    }
}