glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12#[derive(Debug)]
13#[doc(alias = "GThreadPool")]
14pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
15
16unsafe impl Send for ThreadPool {}
17unsafe impl Sync for ThreadPool {}
18
// rustdoc-stripper-ignore-next
/// A handle to a thread running on a [`ThreadPool`].
///
/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving end of the channel on which the task reports its outcome: `Ok` with the
    // task's return value, or `Err` with the panic payload if the task panicked.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Waits for the associated thread to finish.
    ///
    /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
    /// thread, or `Err` if the thread panicked. This function will return immediately if the
    /// associated thread has already finished.
    ///
    /// # Panics
    ///
    /// Panics if the sending half of the result channel was dropped without a result ever
    /// being sent, i.e. the task was destroyed without having been run.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        // A task always sends exactly one message (its result or its panic payload) before
        // its sender is dropped, so a receive error means that invariant was broken.
        self.rx
            .recv()
            .expect("ThreadHandle::join: task was dropped without reporting a result")
    }
}
42
43impl ThreadPool {
44    #[doc(alias = "g_thread_pool_new")]
45    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
46        unsafe {
47            let mut err = ptr::null_mut();
48            let pool = ffi::g_thread_pool_new(
49                Some(spawn_func),
50                ptr::null_mut(),
51                max_threads.map(|v| v as i32).unwrap_or(-1),
52                ffi::GFALSE,
53                &mut err,
54            );
55            if pool.is_null() {
56                Err(from_glib_full(err))
57            } else {
58                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
59            }
60        }
61    }
62
63    #[doc(alias = "g_thread_pool_new")]
64    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
65        unsafe {
66            let mut err = ptr::null_mut();
67            let pool = ffi::g_thread_pool_new(
68                Some(spawn_func),
69                ptr::null_mut(),
70                max_threads as i32,
71                ffi::GTRUE,
72                &mut err,
73            );
74            if pool.is_null() {
75                Err(from_glib_full(err))
76            } else {
77                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
78            }
79        }
80    }
81
82    #[doc(alias = "g_thread_pool_push")]
83    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
84        &self,
85        func: F,
86    ) -> Result<ThreadHandle<T>, crate::Error> {
87        let (tx, rx) = std::sync::mpsc::sync_channel(1);
88        unsafe {
89            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
90                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
91            });
92            let func = Box::new(func);
93            let mut err = ptr::null_mut();
94
95            let func = Box::into_raw(func);
96            let ret: bool = from_glib(ffi::g_thread_pool_push(
97                self.0.as_ptr(),
98                func as *mut _,
99                &mut err,
100            ));
101            if ret {
102                Ok(ThreadHandle { rx })
103            } else {
104                let _ = Box::from_raw(func);
105                Err(from_glib_full(err))
106            }
107        }
108    }
109
110    #[cfg(feature = "futures")]
111    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
112        &self,
113        func: F,
114    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
115    {
116        let (sender, receiver) = oneshot::channel();
117
118        self.push(move || {
119            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
120        })?;
121
122        Ok(async move { receiver.await.expect("Dropped before executing") })
123    }
124
125    #[doc(alias = "g_thread_pool_set_max_threads")]
126    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
127        unsafe {
128            let mut err = ptr::null_mut();
129            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
130                self.0.as_ptr(),
131                max_threads.map(|v| v as i32).unwrap_or(-1),
132                &mut err,
133            ));
134            if ret {
135                Ok(())
136            } else {
137                Err(from_glib_full(err))
138            }
139        }
140    }
141
142    #[doc(alias = "g_thread_pool_get_max_threads")]
143    #[doc(alias = "get_max_threads")]
144    pub fn max_threads(&self) -> Option<u32> {
145        unsafe {
146            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
147            if max_threads == -1 {
148                None
149            } else {
150                Some(max_threads as u32)
151            }
152        }
153    }
154
155    #[doc(alias = "g_thread_pool_get_num_threads")]
156    #[doc(alias = "get_num_threads")]
157    pub fn num_threads(&self) -> u32 {
158        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
159    }
160
161    #[doc(alias = "g_thread_pool_unprocessed")]
162    #[doc(alias = "get_unprocessed")]
163    pub fn unprocessed(&self) -> u32 {
164        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
165    }
166
167    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
168    pub fn set_max_unused_threads(max_threads: Option<u32>) {
169        unsafe {
170            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
171        }
172    }
173
174    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
175    #[doc(alias = "get_max_unused_threads")]
176    pub fn max_unused_threads() -> Option<u32> {
177        unsafe {
178            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
179            if max_unused_threads == -1 {
180                None
181            } else {
182                Some(max_unused_threads as u32)
183            }
184        }
185    }
186
187    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
188    #[doc(alias = "get_num_unused_threads")]
189    pub fn num_unused_threads() -> u32 {
190        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
191    }
192
193    #[doc(alias = "g_thread_pool_stop_unused_threads")]
194    pub fn stop_unused_threads() {
195        unsafe {
196            ffi::g_thread_pool_stop_unused_threads();
197        }
198    }
199
200    #[doc(alias = "g_thread_pool_set_max_idle_time")]
201    pub fn set_max_idle_time(max_idle_time: u32) {
202        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
203    }
204
205    #[doc(alias = "g_thread_pool_get_max_idle_time")]
206    #[doc(alias = "get_max_idle_time")]
207    pub fn max_idle_time() -> u32 {
208        unsafe { ffi::g_thread_pool_get_max_idle_time() }
209    }
210}
211
212impl Drop for ThreadPool {
213    #[inline]
214    fn drop(&mut self) {
215        unsafe {
216            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
217        }
218    }
219}
220
221unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
222    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
223    func()
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed onto an exclusive single-thread pool runs to completion: its return
    // value arrives through the handle and its side effect through the channel.
    #[test]
    fn test_push() {
        use std::sync::mpsc;

        let pool = ThreadPool::exclusive(1).unwrap();
        let (sender, receiver) = mpsc::channel();

        let task = move || {
            sender.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        let joined = handle.join();
        assert_eq!(joined.unwrap(), 123);
        assert_eq!(receiver.recv(), Ok(true));
    }

    // A task pushed as a future onto a shared, unlimited pool resolves to its return value
    // when driven by a main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let pending = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(pending);
        assert!(outcome.unwrap());
    }
}