gio/
socket.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3#[cfg(unix)]
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
5#[cfg(windows)]
6use std::os::windows::io::{
7    AsRawSocket, AsSocket, BorrowedSocket, FromRawSocket, IntoRawSocket, OwnedSocket, RawSocket,
8};
9#[cfg(feature = "v2_60")]
10use std::time::Duration;
11use std::{cell::RefCell, marker::PhantomData, mem::transmute, pin::Pin, ptr};
12
13use futures_core::stream::Stream;
14use glib::{prelude::*, translate::*, Slice};
15
16#[cfg(feature = "v2_60")]
17use crate::PollableReturn;
18use crate::{ffi, Cancellable, Socket, SocketAddress, SocketControlMessage};
19
20impl Socket {
21    /// Creates a new #GSocket from a native file descriptor
22    /// or winsock SOCKET handle.
23    ///
24    /// This reads all the settings from the file descriptor so that
25    /// all properties should work. Note that the file descriptor
26    /// will be set to non-blocking mode, independent on the blocking
27    /// mode of the #GSocket.
28    ///
29    /// On success, the returned #GSocket takes ownership of @fd. On failure, the
30    /// caller must close @fd themselves.
31    ///
32    /// Since GLib 2.46, it is no longer a fatal error to call this on a non-socket
33    /// descriptor.  Instead, a GError will be set with code [`IOErrorEnum::Failed`][crate::IOErrorEnum::Failed]
34    /// ## `fd`
35    /// a native socket file descriptor.
36    ///
37    /// # Returns
38    ///
39    /// a #GSocket or [`None`] on error.
40    ///     Free the returned object with g_object_unref().
41    #[cfg(unix)]
42    #[cfg_attr(docsrs, doc(cfg(unix)))]
43    #[doc(alias = "g_socket_new_from_fd")]
44    pub fn from_fd(fd: OwnedFd) -> Result<Socket, glib::Error> {
45        let fd = fd.into_raw_fd();
46        let mut error = ptr::null_mut();
47        unsafe {
48            let ret = ffi::g_socket_new_from_fd(fd, &mut error);
49            if error.is_null() {
50                Ok(from_glib_full(ret))
51            } else {
52                let _ = OwnedFd::from_raw_fd(fd);
53                Err(from_glib_full(error))
54            }
55        }
56    }
57    #[cfg(windows)]
58    #[cfg_attr(docsrs, doc(cfg(windows)))]
59    pub fn from_socket(socket: OwnedSocket) -> Result<Socket, glib::Error> {
60        let socket = socket.into_raw_socket();
61        let mut error = ptr::null_mut();
62        unsafe {
63            let ret = ffi::g_socket_new_from_fd(socket as i32, &mut error);
64            if error.is_null() {
65                Ok(from_glib_full(ret))
66            } else {
67                let _ = OwnedSocket::from_raw_socket(socket);
68                Err(from_glib_full(error))
69            }
70        }
71    }
72}
73
74#[cfg(unix)]
75#[cfg_attr(docsrs, doc(cfg(unix)))]
76impl AsRawFd for Socket {
77    fn as_raw_fd(&self) -> RawFd {
78        unsafe { ffi::g_socket_get_fd(self.to_glib_none().0) as _ }
79    }
80}
81
82#[cfg(unix)]
83#[cfg_attr(docsrs, doc(cfg(unix)))]
84impl AsFd for Socket {
85    fn as_fd(&self) -> BorrowedFd<'_> {
86        unsafe {
87            let raw_fd = self.as_raw_fd();
88            BorrowedFd::borrow_raw(raw_fd)
89        }
90    }
91}
92
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsRawSocket for Socket {
    /// Returns the handle reported by `g_socket_get_fd()` for this socket.
    fn as_raw_socket(&self) -> RawSocket {
        let socket = self.to_glib_none();
        unsafe { ffi::g_socket_get_fd(socket.0) as _ }
    }
}
100
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
impl AsSocket for Socket {
    /// Borrows the socket's native handle for as long as `self` is borrowed.
    fn as_socket(&self) -> BorrowedSocket<'_> {
        let handle = self.as_raw_socket();
        unsafe { BorrowedSocket::borrow_raw(handle) }
    }
}
111
/// A mutable byte buffer that received data can be written into.
///
/// Wraps a `GInputVector` (a pointer/length pair) created from a
/// `&'v mut [u8]`. The type is `#[repr(transparent)]` so that a slice of
/// `InputVector` can be handed to the C API as an array of `GInputVector`.
#[doc(alias = "GInputVector")]
#[repr(transparent)]
#[derive(Debug)]
pub struct InputVector<'v> {
    // Raw pointer and size of the borrowed buffer, in the C layout.
    vector: ffi::GInputVector,
    // Ties the raw pointer above to the lifetime of the borrowed buffer.
    buffer: PhantomData<&'v mut [u8]>,
}
119
120impl<'v> InputVector<'v> {
121    #[inline]
122    pub fn new(buffer: &'v mut [u8]) -> Self {
123        Self {
124            vector: ffi::GInputVector {
125                buffer: buffer.as_mut_ptr() as *mut _,
126                size: buffer.len(),
127            },
128            buffer: PhantomData,
129        }
130    }
131}
132
// SAFETY: the only non-auto-`Send`/`Sync` part of `InputVector` is the raw
// pointer inside `GInputVector`. That pointer is created from a
// `&'v mut [u8]` in `InputVector::new` and the borrow is tracked through
// `PhantomData`, so the wrapper is treated like the mutable slice it stands
// in for.
unsafe impl Send for InputVector<'_> {}
unsafe impl Sync for InputVector<'_> {}
135
136impl std::ops::Deref for InputVector<'_> {
137    type Target = [u8];
138
139    #[inline]
140    fn deref(&self) -> &Self::Target {
141        unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
142    }
143}
144
145impl std::ops::DerefMut for InputVector<'_> {
146    #[inline]
147    fn deref_mut(&mut self) -> &mut Self::Target {
148        unsafe { std::slice::from_raw_parts_mut(self.vector.buffer as *mut _, self.vector.size) }
149    }
150}
151
/// An owned array of control messages filled in by the receive functions.
///
/// Starts out empty (null pointer, zero length). The receive functions are
/// given pointers to both fields so the C side can store the array and its
/// element count; the array is released when this value is dropped.
#[derive(Debug)]
pub struct SocketControlMessages {
    // Array of `GSocketControlMessage` pointers, or null when empty.
    ptr: *mut *mut ffi::GSocketControlMessage,
    // Number of elements in `ptr`.
    len: u32,
}
157
158impl SocketControlMessages {
159    #[inline]
160    pub const fn new() -> Self {
161        Self {
162            ptr: ptr::null_mut(),
163            len: 0,
164        }
165    }
166}
167
168impl AsRef<[SocketControlMessage]> for SocketControlMessages {
169    #[inline]
170    fn as_ref(&self) -> &[SocketControlMessage] {
171        unsafe { std::slice::from_raw_parts(self.ptr as *const _, self.len as usize) }
172    }
173}
174
175impl std::ops::Deref for SocketControlMessages {
176    type Target = [SocketControlMessage];
177
178    #[inline]
179    fn deref(&self) -> &Self::Target {
180        self.as_ref()
181    }
182}
183
184impl Default for SocketControlMessages {
185    #[inline]
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191impl Drop for SocketControlMessages {
192    #[inline]
193    fn drop(&mut self) {
194        unsafe {
195            let _: Slice<SocketControlMessage> =
196                Slice::from_glib_full_num(self.ptr as *mut _, self.len as usize);
197        }
198    }
199}
200
/// Structure used for scatter/gather data input when receiving multiple
/// messages or packets in one go. You generally pass in an array of empty
/// [`InputVector`]s and the operation will use all the buffers as if they
/// were one buffer, and will set the number of bytes received to the total
/// number of bytes received across all vectors.
///
/// This structure closely mirrors `struct mmsghdr` and `struct msghdr` from
/// the POSIX sockets API (see `man 2 recvmmsg`).
///
/// If an address slot is supplied then it is set to the source address the
/// message was received from.
///
/// If a [`SocketControlMessages`] container is supplied then it is set to
/// the array of control messages received with the message (if any); the
/// array may be empty.
///
/// Flags relevant to this message are available from `flags()` after the
/// receive. For example, `MSG_EOR` or `MSG_TRUNC`.
#[doc(alias = "GInputMessage")]
#[repr(transparent)]
#[derive(Debug)]
pub struct InputMessage<'m> {
    // The C structure handed to GLib; holds raw pointers into the borrows
    // tracked by the marker fields below.
    message: ffi::GInputMessage,
    // Marker for the optional address slot borrowed for `'m`.
    address: PhantomData<Option<&'m mut Option<SocketAddress>>>,
    // Marker for the vector array borrowed for `'m`.
    vectors: PhantomData<&'m mut [InputVector<'m>]>,
    // Marker for the optional control-message container borrowed for `'m`.
    control_messages: PhantomData<Option<&'m mut SocketControlMessages>>,
}
229
230impl<'m> InputMessage<'m> {
231    pub fn new(
232        mut address: Option<&'m mut Option<SocketAddress>>,
233        vectors: &'m mut [InputVector<'m>],
234        control_messages: Option<&'m mut SocketControlMessages>,
235    ) -> Self {
236        let address = address
237            .as_mut()
238            .map(|a| {
239                assert!(a.is_none());
240                *a as *mut _ as *mut _
241            })
242            .unwrap_or_else(ptr::null_mut);
243        let (control_messages, num_control_messages) = control_messages
244            .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _))
245            .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
246        Self {
247            message: ffi::GInputMessage {
248                address,
249                vectors: vectors.as_mut_ptr() as *mut ffi::GInputVector,
250                num_vectors: vectors.len().try_into().unwrap(),
251                bytes_received: 0,
252                flags: 0,
253                control_messages,
254                num_control_messages,
255            },
256            address: PhantomData,
257            vectors: PhantomData,
258            control_messages: PhantomData,
259        }
260    }
261    #[inline]
262    pub fn vectors(&mut self) -> &'m mut [InputVector<'m>] {
263        unsafe {
264            std::slice::from_raw_parts_mut(
265                self.message.vectors as *mut _,
266                self.message.num_vectors as usize,
267            )
268        }
269    }
270    #[inline]
271    pub const fn flags(&self) -> i32 {
272        self.message.flags
273    }
274    #[inline]
275    pub const fn bytes_received(&self) -> usize {
276        self.message.bytes_received
277    }
278}
279
/// An immutable byte buffer that outgoing data is gathered from.
///
/// Wraps a `GOutputVector` (a pointer/length pair) created from a
/// `&'v [u8]`. The type is `#[repr(transparent)]` so that a slice of
/// `OutputVector` can be handed to the C API as an array of `GOutputVector`.
#[doc(alias = "GOutputVector")]
#[repr(transparent)]
#[derive(Debug)]
pub struct OutputVector<'v> {
    // Raw pointer and size of the borrowed buffer, in the C layout.
    vector: ffi::GOutputVector,
    // Ties the raw pointer above to the lifetime of the borrowed buffer.
    buffer: PhantomData<&'v [u8]>,
}
287
288impl<'v> OutputVector<'v> {
289    #[inline]
290    pub const fn new(buffer: &'v [u8]) -> Self {
291        Self {
292            vector: ffi::GOutputVector {
293                buffer: buffer.as_ptr() as *const _,
294                size: buffer.len(),
295            },
296            buffer: PhantomData,
297        }
298    }
299}
300
// SAFETY: the only non-auto-`Send`/`Sync` part of `OutputVector` is the raw
// pointer inside `GOutputVector`. That pointer is created from a
// `&'v [u8]` in `OutputVector::new` and the borrow is tracked through
// `PhantomData`, so the wrapper is treated like the shared slice it stands
// in for.
unsafe impl Send for OutputVector<'_> {}
unsafe impl Sync for OutputVector<'_> {}
303
304impl std::ops::Deref for OutputVector<'_> {
305    type Target = [u8];
306
307    #[inline]
308    fn deref(&self) -> &Self::Target {
309        unsafe { std::slice::from_raw_parts(self.vector.buffer as *const _, self.vector.size) }
310    }
311}
312
/// Structure used for scatter/gather data output when sending multiple
/// messages or packets in one go. You generally pass in an array of
/// [`OutputVector`]s and the operation will use all the buffers as if they
/// were one buffer.
///
/// If the address is [`None`] then the message is sent to the default
/// receiver (as previously set by g_socket_connect()).
#[doc(alias = "GOutputMessage")]
#[repr(transparent)]
#[derive(Debug)]
pub struct OutputMessage<'m> {
    // The C structure handed to GLib; holds raw pointers into the borrows
    // tracked by the marker fields below.
    message: ffi::GOutputMessage,
    // Marker for the optional destination address borrowed for `'m`.
    address: PhantomData<Option<&'m SocketAddress>>,
    // Marker for the vector array borrowed for `'m`.
    vectors: PhantomData<&'m [OutputVector<'m>]>,
    // Marker for the control-message array borrowed for `'m`.
    control_messages: PhantomData<&'m [SocketControlMessage]>,
}
329
330impl<'m> OutputMessage<'m> {
331    pub fn new<A: IsA<SocketAddress>>(
332        address: Option<&'m A>,
333        vectors: &'m [OutputVector<'m>],
334        control_messages: &'m [SocketControlMessage],
335    ) -> Self {
336        Self {
337            message: ffi::GOutputMessage {
338                address: address
339                    .map(|a| a.upcast_ref::<SocketAddress>().as_ptr())
340                    .unwrap_or_else(ptr::null_mut),
341                vectors: mut_override(vectors.as_ptr() as *const ffi::GOutputVector),
342                num_vectors: vectors.len().try_into().unwrap(),
343                bytes_sent: 0,
344                control_messages: control_messages.as_ptr() as *mut _,
345                num_control_messages: control_messages.len().try_into().unwrap(),
346            },
347            address: PhantomData,
348            vectors: PhantomData,
349            control_messages: PhantomData,
350        }
351    }
352    #[inline]
353    pub fn vectors(&self) -> &'m [OutputVector<'m>] {
354        unsafe {
355            std::slice::from_raw_parts(
356                self.message.vectors as *const _,
357                self.message.num_vectors as usize,
358            )
359        }
360    }
361    #[inline]
362    pub fn bytes_sent(&self) -> u32 {
363        self.message.bytes_sent
364    }
365}
366
367pub trait SocketExtManual: IsA<Socket> + Sized {
368    /// Receive data (up to @size bytes) from a socket. This is mainly used by
369    /// connection-oriented sockets; it is identical to g_socket_receive_from()
370    /// with @address set to [`None`].
371    ///
372    /// For [`SocketType::Datagram`][crate::SocketType::Datagram] and [`SocketType::Seqpacket`][crate::SocketType::Seqpacket] sockets,
373    /// g_socket_receive() will always read either 0 or 1 complete messages from
374    /// the socket. If the received message is too large to fit in @buffer, then
375    /// the data beyond @size bytes will be discarded, without any explicit
376    /// indication that this has occurred.
377    ///
378    /// For [`SocketType::Stream`][crate::SocketType::Stream] sockets, g_socket_receive() can return any
379    /// number of bytes, up to @size. If more than @size bytes have been
380    /// received, the additional data will be returned in future calls to
381    /// g_socket_receive().
382    ///
383    /// If the socket is in blocking mode the call will block until there
384    /// is some data to receive, the connection is closed, or there is an
385    /// error. If there is no data available and the socket is in
386    /// non-blocking mode, a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error will be
387    /// returned. To be notified when data is available, wait for the
388    /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
389    ///
390    /// On error -1 is returned and @error is set accordingly.
391    /// ## `cancellable`
392    /// a `GCancellable` or [`None`]
393    ///
394    /// # Returns
395    ///
396    /// Number of bytes read, or 0 if the connection was closed by
397    /// the peer, or -1 on error
398    ///
399    /// ## `buffer`
400    ///
401    ///     a buffer to read data into (which should be at least @size bytes long).
402    #[doc(alias = "g_socket_receive")]
403    fn receive<B: AsMut<[u8]>, C: IsA<Cancellable>>(
404        &self,
405        mut buffer: B,
406        cancellable: Option<&C>,
407    ) -> Result<usize, glib::Error> {
408        let cancellable = cancellable.map(|c| c.as_ref());
409        let gcancellable = cancellable.to_glib_none();
410        let buffer = buffer.as_mut();
411        let buffer_ptr = buffer.as_mut_ptr();
412        let count = buffer.len();
413        unsafe {
414            let mut error = ptr::null_mut();
415            let ret = ffi::g_socket_receive(
416                self.as_ref().to_glib_none().0,
417                buffer_ptr,
418                count,
419                gcancellable.0,
420                &mut error,
421            );
422            if error.is_null() {
423                Ok(ret as usize)
424            } else {
425                Err(from_glib_full(error))
426            }
427        }
428    }
429    /// Receive data (up to @size bytes) from a socket.
430    ///
431    /// If @address is non-[`None`] then @address will be set equal to the
432    /// source address of the received packet.
433    /// @address is owned by the caller.
434    ///
435    /// See g_socket_receive() for additional information.
436    /// ## `cancellable`
437    /// a `GCancellable` or [`None`]
438    ///
439    /// # Returns
440    ///
441    /// Number of bytes read, or 0 if the connection was closed by
442    /// the peer, or -1 on error
443    ///
444    /// ## `address`
445    /// a pointer to a #GSocketAddress
446    ///     pointer, or [`None`]
447    ///
448    /// ## `buffer`
449    ///
450    ///     a buffer to read data into (which should be at least @size bytes long).
451    #[doc(alias = "g_socket_receive_from")]
452    fn receive_from<B: AsMut<[u8]>, C: IsA<Cancellable>>(
453        &self,
454        mut buffer: B,
455        cancellable: Option<&C>,
456    ) -> Result<(usize, SocketAddress), glib::Error> {
457        let cancellable = cancellable.map(|c| c.as_ref());
458        let gcancellable = cancellable.to_glib_none();
459        let buffer = buffer.as_mut();
460        let buffer_ptr = buffer.as_mut_ptr();
461        let count = buffer.len();
462        unsafe {
463            let mut error = ptr::null_mut();
464            let mut addr_ptr = ptr::null_mut();
465
466            let ret = ffi::g_socket_receive_from(
467                self.as_ref().to_glib_none().0,
468                &mut addr_ptr,
469                buffer_ptr,
470                count,
471                gcancellable.0,
472                &mut error,
473            );
474            if error.is_null() {
475                Ok((ret as usize, from_glib_full(addr_ptr)))
476            } else {
477                Err(from_glib_full(error))
478            }
479        }
480    }
481    /// Receive data from a socket.  For receiving multiple messages, see
482    /// g_socket_receive_messages(); for easier use, see
483    /// g_socket_receive() and g_socket_receive_from().
484    ///
485    /// If @address is non-[`None`] then @address will be set equal to the
486    /// source address of the received packet.
487    /// @address is owned by the caller.
488    ///
489    /// @vector must point to an array of #GInputVector structs and
490    /// @num_vectors must be the length of this array.  These structs
491    /// describe the buffers that received data will be scattered into.
492    /// If @num_vectors is -1, then @vectors is assumed to be terminated
493    /// by a #GInputVector with a [`None`] buffer pointer.
494    ///
495    /// As a special case, if @num_vectors is 0 (in which case, @vectors
496    /// may of course be [`None`]), then a single byte is received and
497    /// discarded. This is to facilitate the common practice of sending a
498    /// single '\0' byte for the purposes of transferring ancillary data.
499    ///
500    /// @messages, if non-[`None`], will be set to point to a newly-allocated
501    /// array of #GSocketControlMessage instances or [`None`] if no such
502    /// messages was received. These correspond to the control messages
503    /// received from the kernel, one #GSocketControlMessage per message
504    /// from the kernel. This array is [`None`]-terminated and must be freed
505    /// by the caller using g_free() after calling g_object_unref() on each
506    /// element. If @messages is [`None`], any control messages received will
507    /// be discarded.
508    ///
509    /// @num_messages, if non-[`None`], will be set to the number of control
510    /// messages received.
511    ///
512    /// If both @messages and @num_messages are non-[`None`], then
513    /// @num_messages gives the number of #GSocketControlMessage instances
514    /// in @messages (ie: not including the [`None`] terminator).
515    ///
516    /// @flags is an in/out parameter. The commonly available arguments
517    /// for this are available in the #GSocketMsgFlags enum, but the
518    /// values there are the same as the system values, and the flags
519    /// are passed in as-is, so you can pass in system-specific flags too
520    /// (and g_socket_receive_message() may pass system-specific flags out).
521    /// Flags passed in to the parameter affect the receive operation; flags returned
522    /// out of it are relevant to the specific returned message.
523    ///
524    /// As with g_socket_receive(), data may be discarded if @self is
525    /// [`SocketType::Datagram`][crate::SocketType::Datagram] or [`SocketType::Seqpacket`][crate::SocketType::Seqpacket] and you do not
526    /// provide enough buffer space to read a complete message. You can pass
527    /// [`SocketMsgFlags::PEEK`][crate::SocketMsgFlags::PEEK] in @flags to peek at the current message without
528    /// removing it from the receive queue, but there is no portable way to find
529    /// out the length of the message other than by reading it into a
530    /// sufficiently-large buffer.
531    ///
532    /// If the socket is in blocking mode the call will block until there
533    /// is some data to receive, the connection is closed, or there is an
534    /// error. If there is no data available and the socket is in
535    /// non-blocking mode, a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error will be
536    /// returned. To be notified when data is available, wait for the
537    /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
538    ///
539    /// On error -1 is returned and @error is set accordingly.
540    /// ## `vectors`
541    /// an array of #GInputVector structs
542    /// ## `flags`
543    /// a pointer to an int containing #GSocketMsgFlags flags,
544    ///    which may additionally contain
545    ///    [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
546    /// ## `cancellable`
547    /// a `GCancellable` or [`None`]
548    ///
549    /// # Returns
550    ///
551    /// Number of bytes read, or 0 if the connection was closed by
552    /// the peer, or -1 on error
553    ///
554    /// ## `address`
555    /// a pointer to a #GSocketAddress
556    ///     pointer, or [`None`]
557    ///
558    /// ## `messages`
559    /// a pointer
560    ///    which may be filled with an array of #GSocketControlMessages, or [`None`]
561    #[doc(alias = "g_socket_receive_message")]
562    fn receive_message<C: IsA<Cancellable>>(
563        &self,
564        mut address: Option<&mut Option<SocketAddress>>,
565        vectors: &mut [InputVector],
566        control_messages: Option<&mut SocketControlMessages>,
567        mut flags: i32,
568        cancellable: Option<&C>,
569    ) -> Result<(usize, i32), glib::Error> {
570        let cancellable = cancellable.map(|c| c.as_ref());
571        let address = address
572            .as_mut()
573            .map(|a| {
574                assert!(a.is_none());
575                *a as *mut _ as *mut _
576            })
577            .unwrap_or_else(ptr::null_mut);
578        let (control_messages, num_control_messages) = control_messages
579            .map(|c| (&mut c.ptr as *mut _, &mut c.len as *mut _ as *mut _))
580            .unwrap_or_else(|| (ptr::null_mut(), ptr::null_mut()));
581        unsafe {
582            let mut error = ptr::null_mut();
583
584            let received = ffi::g_socket_receive_message(
585                self.as_ref().to_glib_none().0,
586                address,
587                vectors.as_mut_ptr() as *mut ffi::GInputVector,
588                vectors.len().try_into().unwrap(),
589                control_messages,
590                num_control_messages,
591                &mut flags,
592                cancellable.to_glib_none().0,
593                &mut error,
594            );
595            if error.is_null() {
596                Ok((received as usize, flags))
597            } else {
598                Err(from_glib_full(error))
599            }
600        }
601    }
602    /// Receive multiple data messages from @self in one go.  This is the most
603    /// complicated and fully-featured version of this call. For easier use, see
604    /// g_socket_receive(), g_socket_receive_from(), and g_socket_receive_message().
605    ///
606    /// @messages must point to an array of #GInputMessage structs and
607    /// @num_messages must be the length of this array. Each #GInputMessage
608    /// contains a pointer to an array of #GInputVector structs describing the
609    /// buffers that the data received in each message will be written to. Using
610    /// multiple #GInputVectors is more memory-efficient than manually copying data
611    /// out of a single buffer to multiple sources, and more system-call-efficient
612    /// than making multiple calls to g_socket_receive(), such as in scenarios where
613    /// a lot of data packets need to be received (e.g. high-bandwidth video
614    /// streaming over RTP/UDP).
615    ///
616    /// @flags modify how all messages are received. The commonly available
617    /// arguments for this are available in the #GSocketMsgFlags enum, but the
618    /// values there are the same as the system values, and the flags
619    /// are passed in as-is, so you can pass in system-specific flags too. These
620    /// flags affect the overall receive operation. Flags affecting individual
621    /// messages are returned in #GInputMessage.flags.
622    ///
623    /// The other members of #GInputMessage are treated as described in its
624    /// documentation.
625    ///
626    /// If #GSocket:blocking is [`true`] the call will block until @num_messages have
627    /// been received, or the end of the stream is reached.
628    ///
629    /// If #GSocket:blocking is [`false`] the call will return up to @num_messages
630    /// without blocking, or [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] if no messages are queued in the
631    /// operating system to be received.
632    ///
633    /// In blocking mode, if #GSocket:timeout is positive and is reached before any
634    /// messages are received, [`IOErrorEnum::TimedOut`][crate::IOErrorEnum::TimedOut] is returned, otherwise up to
635    /// @num_messages are returned. (Note: This is effectively the
636    /// behaviour of `MSG_WAITFORONE` with recvmmsg().)
637    ///
638    /// To be notified when messages are available, wait for the
639    /// [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition. Note though that you may still receive
640    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_receive_messages() even if you were
641    /// previously notified of a [`glib::IOCondition::IN`][crate::glib::IOCondition::IN] condition.
642    ///
643    /// If the remote peer closes the connection, any messages queued in the
644    /// operating system will be returned, and subsequent calls to
645    /// g_socket_receive_messages() will return 0 (with no error set).
646    ///
647    /// On error -1 is returned and @error is set accordingly. An error will only
648    /// be returned if zero messages could be received; otherwise the number of
649    /// messages successfully received before the error will be returned.
650    /// ## `messages`
651    /// an array of #GInputMessage structs
652    /// ## `flags`
653    /// an int containing #GSocketMsgFlags flags for the overall operation,
654    ///    which may additionally contain
655    ///    [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
656    /// ## `cancellable`
657    /// a `GCancellable` or [`None`]
658    ///
659    /// # Returns
660    ///
661    /// number of messages received, or -1 on error. Note that the number
662    ///     of messages received may be smaller than @num_messages if in non-blocking
663    ///     mode, if the peer closed the connection, or if @num_messages
664    ///     was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
665    ///     to receive the remaining messages.
666    #[doc(alias = "g_socket_receive_messages")]
667    fn receive_messages<C: IsA<Cancellable>>(
668        &self,
669        messages: &mut [InputMessage],
670        flags: i32,
671        cancellable: Option<&C>,
672    ) -> Result<usize, glib::Error> {
673        let cancellable = cancellable.map(|c| c.as_ref());
674        unsafe {
675            let mut error = ptr::null_mut();
676
677            let count = ffi::g_socket_receive_messages(
678                self.as_ref().to_glib_none().0,
679                messages.as_mut_ptr() as *mut _,
680                messages.len().try_into().unwrap(),
681                flags,
682                cancellable.to_glib_none().0,
683                &mut error,
684            );
685            if error.is_null() {
686                Ok(count as usize)
687            } else {
688                Err(from_glib_full(error))
689            }
690        }
691    }
692    /// This behaves exactly the same as g_socket_receive(), except that
693    /// the choice of blocking or non-blocking behavior is determined by
694    /// the @blocking argument rather than by @self's properties.
695    /// ## `blocking`
696    /// whether to do blocking or non-blocking I/O
697    /// ## `cancellable`
698    /// a `GCancellable` or [`None`]
699    ///
700    /// # Returns
701    ///
702    /// Number of bytes read, or 0 if the connection was closed by
703    /// the peer, or -1 on error
704    ///
705    /// ## `buffer`
706    ///
707    ///     a buffer to read data into (which should be at least @size bytes long).
708    #[doc(alias = "g_socket_receive_with_blocking")]
709    fn receive_with_blocking<B: AsMut<[u8]>, C: IsA<Cancellable>>(
710        &self,
711        mut buffer: B,
712        blocking: bool,
713        cancellable: Option<&C>,
714    ) -> Result<usize, glib::Error> {
715        let cancellable = cancellable.map(|c| c.as_ref());
716        let gcancellable = cancellable.to_glib_none();
717        let buffer = buffer.as_mut();
718        let buffer_ptr = buffer.as_mut_ptr();
719        let count = buffer.len();
720        unsafe {
721            let mut error = ptr::null_mut();
722            let ret = ffi::g_socket_receive_with_blocking(
723                self.as_ref().to_glib_none().0,
724                buffer_ptr,
725                count,
726                blocking.into_glib(),
727                gcancellable.0,
728                &mut error,
729            );
730            if error.is_null() {
731                Ok(ret as usize)
732            } else {
733                Err(from_glib_full(error))
734            }
735        }
736    }
737
738    /// Tries to send @size bytes from @buffer on the socket. This is
739    /// mainly used by connection-oriented sockets; it is identical to
740    /// g_socket_send_to() with @address set to [`None`].
741    ///
742    /// If the socket is in blocking mode the call will block until there is
743    /// space for the data in the socket queue. If there is no space available
744    /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
745    /// will be returned. To be notified when space is available, wait for the
746    /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
747    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
748    /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
749    /// very common due to the way the underlying APIs work.)
750    ///
751    /// On error -1 is returned and @error is set accordingly.
752    /// ## `buffer`
753    /// the buffer
754    ///     containing the data to send.
755    /// ## `cancellable`
756    /// a `GCancellable` or [`None`]
757    ///
758    /// # Returns
759    ///
760    /// Number of bytes written (which may be less than @size), or -1
761    /// on error
762    #[doc(alias = "g_socket_send")]
763    fn send<B: AsRef<[u8]>, C: IsA<Cancellable>>(
764        &self,
765        buffer: B,
766        cancellable: Option<&C>,
767    ) -> Result<usize, glib::Error> {
768        let cancellable = cancellable.map(|c| c.as_ref());
769        let gcancellable = cancellable.to_glib_none();
770        let (count, buffer_ptr) = {
771            let slice = buffer.as_ref();
772            (slice.len(), slice.as_ptr())
773        };
774        unsafe {
775            let mut error = ptr::null_mut();
776            let ret = ffi::g_socket_send(
777                self.as_ref().to_glib_none().0,
778                mut_override(buffer_ptr),
779                count,
780                gcancellable.0,
781                &mut error,
782            );
783            if error.is_null() {
784                Ok(ret as usize)
785            } else {
786                Err(from_glib_full(error))
787            }
788        }
789    }
790    /// Send data to @address on @self.  For sending multiple messages see
791    /// g_socket_send_messages(); for easier use, see
792    /// g_socket_send() and g_socket_send_to().
793    ///
794    /// If @address is [`None`] then the message is sent to the default receiver
795    /// (set by g_socket_connect()).
796    ///
797    /// @vectors must point to an array of #GOutputVector structs and
798    /// @num_vectors must be the length of this array. (If @num_vectors is -1,
799    /// then @vectors is assumed to be terminated by a #GOutputVector with a
800    /// [`None`] buffer pointer.) The #GOutputVector structs describe the buffers
801    /// that the sent data will be gathered from. Using multiple
802    /// #GOutputVectors is more memory-efficient than manually copying
803    /// data from multiple sources into a single buffer, and more
804    /// network-efficient than making multiple calls to g_socket_send().
805    ///
806    /// @messages, if non-[`None`], is taken to point to an array of @num_messages
807    /// #GSocketControlMessage instances. These correspond to the control
808    /// messages to be sent on the socket.
809    /// If @num_messages is -1 then @messages is treated as a [`None`]-terminated
810    /// array.
811    ///
812    /// @flags modify how the message is sent. The commonly available arguments
813    /// for this are available in the #GSocketMsgFlags enum, but the
814    /// values there are the same as the system values, and the flags
815    /// are passed in as-is, so you can pass in system-specific flags too.
816    ///
817    /// If the socket is in blocking mode the call will block until there is
818    /// space for the data in the socket queue. If there is no space available
819    /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
820    /// will be returned. To be notified when space is available, wait for the
821    /// [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
822    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
823    /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
824    /// very common due to the way the underlying APIs work.)
825    ///
826    /// The sum of the sizes of each #GOutputVector in vectors must not be
827    /// greater than `G_MAXSSIZE`. If the message can be larger than this,
828    /// then it is mandatory to use the g_socket_send_message_with_timeout()
829    /// function.
830    ///
831    /// On error -1 is returned and @error is set accordingly.
832    /// ## `address`
833    /// a #GSocketAddress, or [`None`]
834    /// ## `vectors`
835    /// an array of #GOutputVector structs
836    /// ## `messages`
837    /// a pointer to an
838    ///   array of #GSocketControlMessages, or [`None`].
839    /// ## `flags`
840    /// an int containing #GSocketMsgFlags flags, which may additionally
841    ///    contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
842    /// ## `cancellable`
843    /// a `GCancellable` or [`None`]
844    ///
845    /// # Returns
846    ///
847    /// Number of bytes written (which may be less than @size), or -1
848    /// on error
849    #[doc(alias = "g_socket_send_message")]
850    fn send_message<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
851        &self,
852        address: Option<&P>,
853        vectors: &[OutputVector],
854        messages: &[SocketControlMessage],
855        flags: i32,
856        cancellable: Option<&C>,
857    ) -> Result<usize, glib::Error> {
858        let cancellable = cancellable.map(|c| c.as_ref());
859        unsafe {
860            let mut error = ptr::null_mut();
861            let ret = ffi::g_socket_send_message(
862                self.as_ref().to_glib_none().0,
863                address.map(|p| p.as_ref()).to_glib_none().0,
864                vectors.as_ptr() as *mut ffi::GOutputVector,
865                vectors.len().try_into().unwrap(),
866                messages.as_ptr() as *mut _,
867                messages.len().try_into().unwrap(),
868                flags,
869                cancellable.to_glib_none().0,
870                &mut error,
871            );
872            if error.is_null() {
873                Ok(ret as usize)
874            } else {
875                Err(from_glib_full(error))
876            }
877        }
878    }
879    /// This behaves exactly the same as g_socket_send_message(), except that
880    /// the choice of timeout behavior is determined by the @timeout_us argument
881    /// rather than by @self's properties.
882    ///
883    /// On error [`PollableReturn::Failed`][crate::PollableReturn::Failed] is returned and @error is set accordingly, or
884    /// if the socket is currently not writable [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] is
885    /// returned. @bytes_written will contain 0 in both cases.
886    /// ## `address`
887    /// a #GSocketAddress, or [`None`]
888    /// ## `vectors`
889    /// an array of #GOutputVector structs
890    /// ## `messages`
891    /// a pointer to an
892    ///   array of #GSocketControlMessages, or [`None`].
893    /// ## `flags`
894    /// an int containing #GSocketMsgFlags flags, which may additionally
895    ///    contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
896    /// ## `timeout_us`
897    /// the maximum time (in microseconds) to wait, or -1
898    /// ## `cancellable`
899    /// a `GCancellable` or [`None`]
900    ///
901    /// # Returns
902    ///
903    /// [`PollableReturn::Ok`][crate::PollableReturn::Ok] if all data was successfully written,
904    /// [`PollableReturn::WouldBlock`][crate::PollableReturn::WouldBlock] if the socket is currently not writable, or
905    /// [`PollableReturn::Failed`][crate::PollableReturn::Failed] if an error happened and @error is set.
906    ///
907    /// ## `bytes_written`
908    /// location to store the number of bytes that were written to the socket
#[cfg(feature = "v2_60")]
#[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
#[doc(alias = "g_socket_send_message_with_timeout")]
fn send_message_with_timeout<P: IsA<SocketAddress>, C: IsA<Cancellable>>(
    &self,
    address: Option<&P>,
    vectors: &[OutputVector],
    messages: &[SocketControlMessage],
    flags: i32,
    timeout: Option<Duration>,
    cancellable: Option<&C>,
) -> Result<(PollableReturn, usize), glib::Error> {
    let destination = address.map(|p| p.as_ref());
    let cancellable = cancellable.map(|c| c.as_ref());
    unsafe {
        let mut error = ptr::null_mut();
        // Out-parameter filled in by the C call.
        let mut bytes_written = 0;

        let status = ffi::g_socket_send_message_with_timeout(
            self.as_ref().to_glib_none().0,
            destination.to_glib_none().0,
            vectors.as_ptr() as *mut ffi::GOutputVector,
            vectors.len().try_into().unwrap(),
            messages.as_ptr() as *mut _,
            messages.len().try_into().unwrap(),
            flags,
            // The C API takes microseconds, with -1 standing in for "no
            // timeout given". The conversion panics if the duration does not
            // fit the C integer type.
            match timeout {
                Some(t) => t.as_micros().try_into().unwrap(),
                None => -1,
            },
            &mut bytes_written,
            cancellable.to_glib_none().0,
            &mut error,
        );
        if !error.is_null() {
            return Err(from_glib_full(error));
        }
        Ok((from_glib(status), bytes_written))
    }
}
948    /// Send multiple data messages from @self in one go.  This is the most
949    /// complicated and fully-featured version of this call. For easier use, see
950    /// g_socket_send(), g_socket_send_to(), and g_socket_send_message().
951    ///
952    /// @messages must point to an array of #GOutputMessage structs and
953    /// @num_messages must be the length of this array. Each #GOutputMessage
954    /// contains an address to send the data to, and a pointer to an array of
955    /// #GOutputVector structs to describe the buffers that the data to be sent
956    /// for each message will be gathered from. Using multiple #GOutputVectors is
957    /// more memory-efficient than manually copying data from multiple sources
958    /// into a single buffer, and more network-efficient than making multiple
959    /// calls to g_socket_send(). Sending multiple messages in one go avoids the
960    /// overhead of making a lot of syscalls in scenarios where a lot of data
961    /// packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP),
962    /// or where the same data needs to be sent to multiple recipients.
963    ///
964    /// @flags modify how the message is sent. The commonly available arguments
965    /// for this are available in the #GSocketMsgFlags enum, but the
966    /// values there are the same as the system values, and the flags
967    /// are passed in as-is, so you can pass in system-specific flags too.
968    ///
969    /// If the socket is in blocking mode the call will block until there is
970    /// space for all the data in the socket queue. If there is no space available
971    /// and the socket is in non-blocking mode a [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] error
972    /// will be returned if no data was written at all, otherwise the number of
973    /// messages sent will be returned. To be notified when space is available,
974    /// wait for the [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. Note though that you may still receive
975    /// [`IOErrorEnum::WouldBlock`][crate::IOErrorEnum::WouldBlock] from g_socket_send() even if you were previously
976    /// notified of a [`glib::IOCondition::OUT`][crate::glib::IOCondition::OUT] condition. (On Windows in particular, this is
977    /// very common due to the way the underlying APIs work.)
978    ///
979    /// On error -1 is returned and @error is set accordingly. An error will only
980    /// be returned if zero messages could be sent; otherwise the number of messages
981    /// successfully sent before the error will be returned.
982    /// ## `messages`
983    /// an array of #GOutputMessage structs
984    /// ## `flags`
985    /// an int containing #GSocketMsgFlags flags, which may additionally
986    ///    contain [other platform specific flags](http://man7.org/linux/man-pages/man2/recv.2.html)
987    /// ## `cancellable`
988    /// a `GCancellable` or [`None`]
989    ///
990    /// # Returns
991    ///
992    /// number of messages sent, or -1 on error. Note that the number of
993    ///     messages sent may be smaller than @num_messages if the socket is
994    ///     non-blocking or if @num_messages was larger than UIO_MAXIOV (1024),
995    ///     in which case the caller may re-try to send the remaining messages.
996    #[doc(alias = "g_socket_send_messages")]
997    fn send_messages<C: IsA<Cancellable>>(
998        &self,
999        messages: &mut [OutputMessage],
1000        flags: i32,
1001        cancellable: Option<&C>,
1002    ) -> Result<usize, glib::Error> {
1003        let cancellable = cancellable.map(|c| c.as_ref());
1004        unsafe {
1005            let mut error = ptr::null_mut();
1006            let count = ffi::g_socket_send_messages(
1007                self.as_ref().to_glib_none().0,
1008                messages.as_mut_ptr() as *mut _,
1009                messages.len().try_into().unwrap(),
1010                flags,
1011                cancellable.to_glib_none().0,
1012                &mut error,
1013            );
1014            if error.is_null() {
1015                Ok(count as usize)
1016            } else {
1017                Err(from_glib_full(error))
1018            }
1019        }
1020    }
1021    /// Tries to send @size bytes from @buffer to @address. If @address is
1022    /// [`None`] then the message is sent to the default receiver (set by
1023    /// g_socket_connect()).
1024    ///
1025    /// See g_socket_send() for additional information.
1026    /// ## `address`
1027    /// a #GSocketAddress, or [`None`]
1028    /// ## `buffer`
1029    /// the buffer
1030    ///     containing the data to send.
1031    /// ## `cancellable`
1032    /// a `GCancellable` or [`None`]
1033    ///
1034    /// # Returns
1035    ///
1036    /// Number of bytes written (which may be less than @size), or -1
1037    /// on error
1038    #[doc(alias = "g_socket_send_to")]
1039    fn send_to<B: AsRef<[u8]>, P: IsA<SocketAddress>, C: IsA<Cancellable>>(
1040        &self,
1041        address: Option<&P>,
1042        buffer: B,
1043        cancellable: Option<&C>,
1044    ) -> Result<usize, glib::Error> {
1045        let cancellable = cancellable.map(|c| c.as_ref());
1046        let gcancellable = cancellable.to_glib_none();
1047        let (count, buffer_ptr) = {
1048            let slice = buffer.as_ref();
1049            (slice.len(), slice.as_ptr())
1050        };
1051        unsafe {
1052            let mut error = ptr::null_mut();
1053
1054            let ret = ffi::g_socket_send_to(
1055                self.as_ref().to_glib_none().0,
1056                address.map(|p| p.as_ref()).to_glib_none().0,
1057                mut_override(buffer_ptr),
1058                count,
1059                gcancellable.0,
1060                &mut error,
1061            );
1062            if error.is_null() {
1063                Ok(ret as usize)
1064            } else {
1065                Err(from_glib_full(error))
1066            }
1067        }
1068    }
1069    /// This behaves exactly the same as g_socket_send(), except that
1070    /// the choice of blocking or non-blocking behavior is determined by
1071    /// the @blocking argument rather than by @self's properties.
1072    /// ## `buffer`
1073    /// the buffer
1074    ///     containing the data to send.
1075    /// ## `blocking`
1076    /// whether to do blocking or non-blocking I/O
1077    /// ## `cancellable`
1078    /// a `GCancellable` or [`None`]
1079    ///
1080    /// # Returns
1081    ///
1082    /// Number of bytes written (which may be less than @size), or -1
1083    /// on error
1084    #[doc(alias = "g_socket_send_with_blocking")]
1085    fn send_with_blocking<B: AsRef<[u8]>, C: IsA<Cancellable>>(
1086        &self,
1087        buffer: B,
1088        blocking: bool,
1089        cancellable: Option<&C>,
1090    ) -> Result<usize, glib::Error> {
1091        let cancellable = cancellable.map(|c| c.as_ref());
1092        let gcancellable = cancellable.to_glib_none();
1093        let (count, buffer_ptr) = {
1094            let slice = buffer.as_ref();
1095            (slice.len(), slice.as_ptr())
1096        };
1097        unsafe {
1098            let mut error = ptr::null_mut();
1099            let ret = ffi::g_socket_send_with_blocking(
1100                self.as_ref().to_glib_none().0,
1101                mut_override(buffer_ptr),
1102                count,
1103                blocking.into_glib(),
1104                gcancellable.0,
1105                &mut error,
1106            );
1107            if error.is_null() {
1108                Ok(ret as usize)
1109            } else {
1110                Err(from_glib_full(error))
1111            }
1112        }
1113    }
1114
1115    /// Returns the underlying OS socket object. On unix this
1116    /// is a socket file descriptor, and on Windows this is
1117    /// a Winsock2 SOCKET handle. This may be useful for
1118    /// doing platform specific or otherwise unusual operations
1119    /// on the socket.
1120    ///
1121    /// # Returns
1122    ///
1123    /// the file descriptor of the socket.
1124    #[cfg(unix)]
1125    #[cfg_attr(docsrs, doc(cfg(unix)))]
1126    #[doc(alias = "get_fd")]
1127    #[doc(alias = "g_socket_get_fd")]
1128    fn fd(&self) -> BorrowedFd<'_> {
1129        self.as_ref().as_fd()
1130    }
1131
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
#[doc(alias = "get_socket")]
#[doc(alias = "g_socket_get_fd")]
fn socket(&self) -> BorrowedSocket<'_> {
    // View `self` as the concrete `Socket` and delegate to its `as_socket`;
    // the returned handle borrows from `self`.
    let socket: &Socket = self.as_ref();
    socket.as_socket()
}
1139
1140    #[doc(alias = "g_socket_create_source")]
1141    fn create_source<F, C>(
1142        &self,
1143        condition: glib::IOCondition,
1144        cancellable: Option<&C>,
1145        name: Option<&str>,
1146        priority: glib::Priority,
1147        func: F,
1148    ) -> glib::Source
1149    where
1150        F: FnMut(&Self, glib::IOCondition) -> glib::ControlFlow + 'static,
1151        C: IsA<Cancellable>,
1152    {
1153        unsafe extern "C" fn trampoline<
1154            O: IsA<Socket>,
1155            F: FnMut(&O, glib::IOCondition) -> glib::ControlFlow + 'static,
1156        >(
1157            socket: *mut ffi::GSocket,
1158            condition: glib::ffi::GIOCondition,
1159            func: glib::ffi::gpointer,
1160        ) -> glib::ffi::gboolean {
1161            let func: &RefCell<F> = &*(func as *const RefCell<F>);
1162            let mut func = func.borrow_mut();
1163            (*func)(
1164                Socket::from_glib_borrow(socket).unsafe_cast_ref(),
1165                from_glib(condition),
1166            )
1167            .into_glib()
1168        }
1169        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
1170            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
1171        }
1172        let cancellable = cancellable.map(|c| c.as_ref());
1173        let gcancellable = cancellable.to_glib_none();
1174        unsafe {
1175            let source = ffi::g_socket_create_source(
1176                self.as_ref().to_glib_none().0,
1177                condition.into_glib(),
1178                gcancellable.0,
1179            );
1180            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
1181            glib::ffi::g_source_set_callback(
1182                source,
1183                Some(transmute::<
1184                    glib::ffi::gpointer,
1185                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
1186                >(trampoline)),
1187                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
1188                Some(destroy_closure::<F>),
1189            );
1190            glib::ffi::g_source_set_priority(source, priority.into_glib());
1191
1192            if let Some(name) = name {
1193                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
1194            }
1195
1196            from_glib_full(source)
1197        }
1198    }
1199
1200    fn create_source_future<C: IsA<Cancellable>>(
1201        &self,
1202        condition: glib::IOCondition,
1203        cancellable: Option<&C>,
1204        priority: glib::Priority,
1205    ) -> Pin<Box<dyn std::future::Future<Output = glib::IOCondition> + 'static>> {
1206        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
1207
1208        let obj = self.clone();
1209        Box::pin(glib::SourceFuture::new(move |send| {
1210            let mut send = Some(send);
1211            obj.create_source(
1212                condition,
1213                cancellable.as_ref(),
1214                None,
1215                priority,
1216                move |_, condition| {
1217                    let _ = send.take().unwrap().send(condition);
1218                    glib::ControlFlow::Break
1219                },
1220            )
1221        }))
1222    }
1223
1224    fn create_source_stream<C: IsA<Cancellable>>(
1225        &self,
1226        condition: glib::IOCondition,
1227        cancellable: Option<&C>,
1228        priority: glib::Priority,
1229    ) -> Pin<Box<dyn Stream<Item = glib::IOCondition> + 'static>> {
1230        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
1231
1232        let obj = self.clone();
1233        Box::pin(glib::SourceStream::new(move |send| {
1234            let send = Some(send);
1235            obj.create_source(
1236                condition,
1237                cancellable.as_ref(),
1238                None,
1239                priority,
1240                move |_, condition| {
1241                    if send.as_ref().unwrap().unbounded_send(condition).is_err() {
1242                        glib::ControlFlow::Break
1243                    } else {
1244                        glib::ControlFlow::Continue
1245                    }
1246                },
1247            )
1248        }))
1249    }
1250}
1251
1252impl<O: IsA<Socket>> SocketExtManual for O {}
1253
/// Stand-in declaration of `IntoRawFd` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Unix target, where the
/// `std::os::unix::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(unix)))]
pub trait IntoRawFd {
    /// Consumes `self` and returns the raw descriptor as a C `int`.
    fn into_raw_fd(self) -> libc::c_int;
}
1258
/// Stand-in declaration of `FromRawFd` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Unix target, where the
/// `std::os::unix::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(unix)))]
pub trait FromRawFd {
    /// Builds a value from a raw descriptor given as a C `int`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably
    /// it mirrors the standard library trait of the same name.
    unsafe fn from_raw_fd(fd: libc::c_int) -> Self;
}
1263
/// Stand-in declaration of `AsRawFd` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Unix target, where the
/// `std::os::unix::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(unix)))]
pub trait AsRawFd {
    /// Returns the raw descriptor without consuming `self`.
    fn as_raw_fd(&self) -> RawFd;
}
1268
/// Stand-in `RawFd` alias (a C `int`) for documentation builds on non-Unix
/// targets, where the `std::os::unix::io` alias is not imported by this file.
#[cfg(all(docsrs, not(unix)))]
pub type RawFd = libc::c_int;
1271
/// Stand-in declaration of `IntoRawSocket` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Windows target, where the
/// `std::os::windows::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(windows)))]
pub trait IntoRawSocket {
    /// Consumes `self` and returns the raw socket handle as a `u64`.
    fn into_raw_socket(self) -> u64;
}
1276
/// Stand-in declaration of `FromRawSocket` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Windows target, where the
/// `std::os::windows::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(windows)))]
pub trait FromRawSocket {
    /// Builds a value from a raw socket handle given as a `u64`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file; presumably
    /// it mirrors the standard library trait of the same name.
    unsafe fn from_raw_socket(sock: u64) -> Self;
}
1281
/// Stand-in declaration of `AsRawSocket` for documentation builds.
///
/// Only compiled when `docsrs` is set on a non-Windows target, where the
/// `std::os::windows::io` item of the same name is not imported by this file.
#[cfg(all(docsrs, not(windows)))]
pub trait AsRawSocket {
    /// Returns the raw socket handle without consuming `self`.
    fn as_raw_socket(&self) -> RawSocket;
}
1286
/// Stand-in `RawSocket` alias for documentation builds on non-Windows
/// targets, where the `std::os::windows::io` alias is not imported by this
/// file.
///
/// NOTE(review): this alias is a pointer type, while the stand-in
/// `IntoRawSocket` / `FromRawSocket` traits in this file use `u64` for the
/// same handle; confirm whether the mismatch is intentional.
#[cfg(all(docsrs, not(windows)))]
pub type RawSocket = *mut std::os::raw::c_void;
1289
#[cfg(test)]
mod tests {
    /// Sends one byte plus a `UnixFDMessage` control message across a Unix
    /// socket pair with `send_messages` and checks that `receive_messages`
    /// gets both the byte and the control message back.
    #[test]
    #[cfg(unix)]
    fn socket_messages() {
        use std::{
            io,
            os::unix::io::{AsRawFd, FromRawFd, OwnedFd},
        };

        use super::Socket;
        use crate::{prelude::*, Cancellable, UnixFDMessage};

        // Create a connected pair of stream sockets and wrap each end.
        let mut fds = [0 as libc::c_int; 2];
        let (out_sock, in_sock) = unsafe {
            let ret = libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr());
            if ret != 0 {
                panic!("{}", io::Error::last_os_error());
            }
            (
                Socket::from_fd(OwnedFd::from_raw_fd(fds[0])).unwrap(),
                Socket::from_fd(OwnedFd::from_raw_fd(fds[1])).unwrap(),
            )
        };

        // Outgoing message: a single data byte and one fd-passing control
        // message.
        let fd_msg = UnixFDMessage::new();
        fd_msg.append_fd(out_sock.as_raw_fd()).unwrap();
        let vs = [super::OutputVector::new(&[0])];
        let ctrl_msgs = [fd_msg.upcast()];
        let mut out_msg = [super::OutputMessage::new(
            crate::SocketAddress::NONE,
            vs.as_slice(),
            ctrl_msgs.as_slice(),
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msg[0].bytes_sent(), 1);

        // Receive into a one-byte buffer and collect the control messages.
        let mut v = [0u8];
        let mut vs = [super::InputVector::new(v.as_mut_slice())];
        let mut ctrl_msgs = super::SocketControlMessages::new();
        let mut in_msg = [super::InputMessage::new(
            None,
            vs.as_mut_slice(),
            Some(&mut ctrl_msgs),
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msg[0].bytes_received(), 1);
        assert_eq!(ctrl_msgs.len(), 1);
        let fds = ctrl_msgs[0]
            .downcast_ref::<UnixFDMessage>()
            .unwrap()
            .fd_list();
        assert_eq!(fds.length(), 1);
    }

    /// Sends one datagram between two UDP sockets on the loopback interface
    /// and checks the payload and the reported sender address.
    #[test]
    #[cfg(unix)]
    fn dgram_socket_messages() {
        use super::Socket;
        use crate::{prelude::*, Cancellable};

        let out_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();
        let in_sock = Socket::new(
            crate::SocketFamily::Ipv4,
            crate::SocketType::Datagram,
            crate::SocketProtocol::Udp,
        )
        .unwrap();

        // Bind to port 0 so the OS assigns a free port, then read back the
        // address that was actually bound. A fixed port number can collide
        // with another process or a parallel test run.
        let any_port = crate::InetSocketAddress::from_string("127.0.0.1", 0).unwrap();
        in_sock.bind(&any_port, true).unwrap();
        let addr = in_sock
            .local_address()
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();

        const DATA: [u8; std::mem::size_of::<u64>()] = 1234u64.to_be_bytes();
        let out_vec = DATA;
        let out_vecs = [super::OutputVector::new(out_vec.as_slice())];
        let mut out_msg = [super::OutputMessage::new(
            Some(&addr),
            out_vecs.as_slice(),
            &[],
        )];
        let written = super::SocketExtManual::send_messages(
            &out_sock,
            out_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();
        assert_eq!(written, 1);
        assert_eq!(out_msg[0].bytes_sent() as usize, out_vec.len());

        // Receive the datagram and capture the sender's address.
        let mut in_addr = None;
        let mut in_vec = [0u8; DATA.len()];
        let mut in_vecs = [super::InputVector::new(in_vec.as_mut_slice())];
        let mut in_msg = [super::InputMessage::new(
            Some(&mut in_addr),
            in_vecs.as_mut_slice(),
            None,
        )];
        let received = super::SocketExtManual::receive_messages(
            &in_sock,
            in_msg.as_mut_slice(),
            0,
            Cancellable::NONE,
        )
        .unwrap();

        assert_eq!(received, 1);
        assert_eq!(in_msg[0].bytes_received(), in_vec.len());
        assert_eq!(in_vec, out_vec);
        // Only the IP part is compared: the sender's port is whatever the OS
        // assigned to `out_sock`.
        let in_addr = in_addr
            .unwrap()
            .downcast::<crate::InetSocketAddress>()
            .unwrap();
        assert_eq!(in_addr.address().to_str(), addr.address().to_str());
    }
}