glib/
thread_pool.rs

// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12#[derive(Debug)]
13#[doc(alias = "GThreadPool")]
14pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
15
16unsafe impl Send for ThreadPool {}
17unsafe impl Sync for ThreadPool {}
18
// rustdoc-stripper-ignore-next
/// Handle for a task that was submitted to a [`ThreadPool`].
///
/// It plays the same role as [`std::thread::JoinHandle`] does for standard threads: call
/// [`ThreadHandle::join`] to obtain the task's return value. Dropping the handle instead
/// "detaches" the task, which still runs to completion but has its return value discarded.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving end of the channel on which the task reports its outcome: the return value
    // on success, or the panic payload if the task panicked.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks until the associated task has finished and returns its outcome.
    ///
    /// Yields `Ok` with the task's return value, or `Err` carrying the panic payload if the
    /// task panicked. If the task has already finished, this returns without waiting.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the channel was dropped without ever delivering an
    /// outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
42
43impl ThreadPool {
44    #[doc(alias = "g_thread_pool_new")]
45    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
46        unsafe {
47            let mut err = ptr::null_mut();
48            let pool = ffi::g_thread_pool_new(
49                Some(spawn_func),
50                ptr::null_mut(),
51                max_threads.map(|v| v as i32).unwrap_or(-1),
52                ffi::GFALSE,
53                &mut err,
54            );
55            if pool.is_null() {
56                Err(from_glib_full(err))
57            } else {
58                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
59            }
60        }
61    }
62
63    #[doc(alias = "g_thread_pool_new")]
64    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
65        unsafe {
66            let mut err = ptr::null_mut();
67            let pool = ffi::g_thread_pool_new(
68                Some(spawn_func),
69                ptr::null_mut(),
70                max_threads as i32,
71                ffi::GTRUE,
72                &mut err,
73            );
74            if pool.is_null() {
75                Err(from_glib_full(err))
76            } else {
77                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
78            }
79        }
80    }
81
82    #[doc(alias = "g_thread_pool_push")]
83    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
84        &self,
85        func: F,
86    ) -> Result<ThreadHandle<T>, crate::Error> {
87        let (tx, rx) = std::sync::mpsc::sync_channel(1);
88        unsafe {
89            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
90                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
91            });
92            let func = Box::new(func);
93            let mut err = ptr::null_mut();
94
95            let func = Box::into_raw(func);
96            let ret: bool = from_glib(ffi::g_thread_pool_push(
97                self.0.as_ptr(),
98                func as *mut _,
99                &mut err,
100            ));
101            if ret {
102                Ok(ThreadHandle { rx })
103            } else {
104                let _ = Box::from_raw(func);
105                Err(from_glib_full(err))
106            }
107        }
108    }
109
110    #[cfg(feature = "futures")]
111    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
112        &self,
113        func: F,
114    ) -> Result<
115        impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
116        crate::Error,
117    > {
118        let (sender, receiver) = oneshot::channel();
119
120        self.push(move || {
121            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
122        })?;
123
124        Ok(async move { receiver.await.expect("Dropped before executing") })
125    }
126
127    #[doc(alias = "g_thread_pool_set_max_threads")]
128    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
129        unsafe {
130            let mut err = ptr::null_mut();
131            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
132                self.0.as_ptr(),
133                max_threads.map(|v| v as i32).unwrap_or(-1),
134                &mut err,
135            ));
136            if ret {
137                Ok(())
138            } else {
139                Err(from_glib_full(err))
140            }
141        }
142    }
143
144    #[doc(alias = "g_thread_pool_get_max_threads")]
145    #[doc(alias = "get_max_threads")]
146    pub fn max_threads(&self) -> Option<u32> {
147        unsafe {
148            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
149            if max_threads == -1 {
150                None
151            } else {
152                Some(max_threads as u32)
153            }
154        }
155    }
156
157    #[doc(alias = "g_thread_pool_get_num_threads")]
158    #[doc(alias = "get_num_threads")]
159    pub fn num_threads(&self) -> u32 {
160        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
161    }
162
163    #[doc(alias = "g_thread_pool_unprocessed")]
164    #[doc(alias = "get_unprocessed")]
165    pub fn unprocessed(&self) -> u32 {
166        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
167    }
168
169    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
170    pub fn set_max_unused_threads(max_threads: Option<u32>) {
171        unsafe {
172            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
173        }
174    }
175
176    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
177    #[doc(alias = "get_max_unused_threads")]
178    pub fn max_unused_threads() -> Option<u32> {
179        unsafe {
180            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
181            if max_unused_threads == -1 {
182                None
183            } else {
184                Some(max_unused_threads as u32)
185            }
186        }
187    }
188
189    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
190    #[doc(alias = "get_num_unused_threads")]
191    pub fn num_unused_threads() -> u32 {
192        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
193    }
194
195    #[doc(alias = "g_thread_pool_stop_unused_threads")]
196    pub fn stop_unused_threads() {
197        unsafe {
198            ffi::g_thread_pool_stop_unused_threads();
199        }
200    }
201
202    #[doc(alias = "g_thread_pool_set_max_idle_time")]
203    pub fn set_max_idle_time(max_idle_time: u32) {
204        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
205    }
206
207    #[doc(alias = "g_thread_pool_get_max_idle_time")]
208    #[doc(alias = "get_max_idle_time")]
209    pub fn max_idle_time() -> u32 {
210        unsafe { ffi::g_thread_pool_get_max_idle_time() }
211    }
212}
213
214impl Drop for ThreadPool {
215    #[inline]
216    fn drop(&mut self) {
217        unsafe {
218            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
219        }
220    }
221}
222
223unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
224    unsafe {
225        let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
226        func()
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed onto an exclusive single-thread pool runs, its side effect is observable
    // through a channel, and its return value comes back through the join handle.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = std::sync::mpsc::channel();

        let task = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // The future returned by `push_future` resolves to the closure's return value when driven
    // by a main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let fut = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(fut);
        assert!(outcome.unwrap());
    }
}