Skip to main content

glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
/// The `GThreadPool` struct represents a thread pool.
///
/// A thread pool is useful when you wish to asynchronously fork out the execution of work
/// and continue working in your own thread. If that will happen often, the overhead of starting
/// and destroying a thread each time might be too high. In such cases reusing already started
/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
/// and error-prone.
///
/// Therefore GLib provides thread pools for your convenience. An added advantage is that the
/// threads can be shared between the different subsystems of your program when they are using GLib.
///
/// To create a new thread pool, you use [`shared()`][Self::shared()] or
/// [`exclusive()`][Self::exclusive()].
/// It is released with `g_thread_pool_free()` when the value is dropped.
///
/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
///
/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
/// To control the maximum number of threads for a thread pool, you use
/// [`max_threads()`][Self::max_threads()] and [`set_max_threads()`][Self::set_max_threads()].
///
/// Finally you can control the number of unused threads that are kept alive by GLib for future use.
/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);

// NOTE(review): these impls assert that the wrapped `GThreadPool` pointer may be moved to and
// used from other threads; presumably this relies on the `g_thread_pool_*` functions doing their
// own locking -- confirm against the GLib documentation.
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
44
// rustdoc-stripper-ignore-next
/// Handle to a task that was queued on a [`ThreadPool`].
///
/// This plays the same role as [`std::thread::JoinHandle`], but for a task running on a GLib
/// pool thread. Call [`ThreadHandle::join`] to obtain the task's return value. Dropping the
/// handle "detaches" the task: it is still allowed to complete, but its return value is
/// discarded.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receives exactly one message: the task's result, or the payload of its panic.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks until the associated task has finished and returns its outcome.
    ///
    /// Yields `Ok` with the value returned by the task, or `Err` with the panic payload if the
    /// task panicked. If the task has already finished, this returns immediately.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the result channel was dropped without ever sending a
    /// result.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
68
69impl ThreadPool {
70    #[doc(alias = "g_thread_pool_new")]
71    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
72        unsafe {
73            let mut err = ptr::null_mut();
74            let pool = ffi::g_thread_pool_new(
75                Some(spawn_func),
76                ptr::null_mut(),
77                max_threads.map(|v| v as i32).unwrap_or(-1),
78                ffi::GFALSE,
79                &mut err,
80            );
81            if pool.is_null() {
82                Err(from_glib_full(err))
83            } else {
84                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
85            }
86        }
87    }
88
89    #[doc(alias = "g_thread_pool_new")]
90    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
91        unsafe {
92            let mut err = ptr::null_mut();
93            let pool = ffi::g_thread_pool_new(
94                Some(spawn_func),
95                ptr::null_mut(),
96                max_threads as i32,
97                ffi::GTRUE,
98                &mut err,
99            );
100            if pool.is_null() {
101                Err(from_glib_full(err))
102            } else {
103                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
104            }
105        }
106    }
107
108    /// Inserts @data into the list of tasks to be executed by @self.
109    ///
110    /// When the number of currently running threads is lower than the
111    /// maximal allowed number of threads, a new thread is started (or
112    /// reused) with the properties given to g_thread_pool_new().
113    /// Otherwise, @data stays in the queue until a thread in this pool
114    /// finishes its previous task and processes @data.
115    ///
116    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
117    /// errors. An error can only occur when a new thread couldn't be
118    /// created. In that case @data is simply appended to the queue of
119    /// work to do.
120    ///
121    /// Before version 2.32, this function did not return a success status.
122    ///
123    /// # Returns
124    ///
125    /// [`true`] on success, [`false`] if an error occurred
126    #[doc(alias = "g_thread_pool_push")]
127    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
128        &self,
129        func: F,
130    ) -> Result<ThreadHandle<T>, crate::Error> {
131        let (tx, rx) = std::sync::mpsc::sync_channel(1);
132        unsafe {
133            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
134                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
135            });
136            let func = Box::new(func);
137            let mut err = ptr::null_mut();
138
139            let func = Box::into_raw(func);
140            let ret: bool = from_glib(ffi::g_thread_pool_push(
141                self.0.as_ptr(),
142                func as *mut _,
143                &mut err,
144            ));
145            if ret {
146                Ok(ThreadHandle { rx })
147            } else {
148                let _ = Box::from_raw(func);
149                Err(from_glib_full(err))
150            }
151        }
152    }
153
154    #[cfg(feature = "futures")]
155    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
156        &self,
157        func: F,
158    ) -> Result<
159        impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
160        crate::Error,
161    > {
162        let (sender, receiver) = oneshot::channel();
163
164        self.push(move || {
165            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
166        })?;
167
168        Ok(async move { receiver.await.expect("Dropped before executing") })
169    }
170
171    /// Sets the maximal allowed number of threads for @self.
172    /// A value of -1 means that the maximal number of threads
173    /// is unlimited. If @self is an exclusive thread pool, setting
174    /// the maximal number of threads to -1 is not allowed.
175    ///
176    /// Setting @max_threads to 0 means stopping all work for @self.
177    /// It is effectively frozen until @max_threads is set to a non-zero
178    /// value again.
179    ///
180    /// A thread is never terminated while calling @func, as supplied by
181    /// g_thread_pool_new(). Instead the maximal number of threads only
182    /// has effect for the allocation of new threads in g_thread_pool_push().
183    /// A new thread is allocated, whenever the number of currently
184    /// running threads in @self is smaller than the maximal number.
185    ///
186    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
187    /// errors. An error can only occur when a new thread couldn't be
188    /// created.
189    ///
190    /// Before version 2.32, this function did not return a success status.
191    /// ## `max_threads`
192    /// a new maximal number of threads for @self,
193    ///     or -1 for unlimited
194    ///
195    /// # Returns
196    ///
197    /// [`true`] on success, [`false`] if an error occurred
198    #[doc(alias = "g_thread_pool_set_max_threads")]
199    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
200        unsafe {
201            let mut err = ptr::null_mut();
202            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
203                self.0.as_ptr(),
204                max_threads.map(|v| v as i32).unwrap_or(-1),
205                &mut err,
206            ));
207            if ret {
208                Ok(())
209            } else {
210                Err(from_glib_full(err))
211            }
212        }
213    }
214
215    /// Returns the maximal number of threads for @self.
216    ///
217    /// # Returns
218    ///
219    /// the maximal number of threads
220    #[doc(alias = "g_thread_pool_get_max_threads")]
221    #[doc(alias = "get_max_threads")]
222    pub fn max_threads(&self) -> Option<u32> {
223        unsafe {
224            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
225            if max_threads == -1 {
226                None
227            } else {
228                Some(max_threads as u32)
229            }
230        }
231    }
232
233    /// Returns the number of threads currently running in @self.
234    ///
235    /// # Returns
236    ///
237    /// the number of threads currently running
238    #[doc(alias = "g_thread_pool_get_num_threads")]
239    #[doc(alias = "get_num_threads")]
240    pub fn num_threads(&self) -> u32 {
241        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
242    }
243
244    /// Returns the number of tasks still unprocessed in @self.
245    ///
246    /// # Returns
247    ///
248    /// the number of unprocessed tasks
249    #[doc(alias = "g_thread_pool_unprocessed")]
250    #[doc(alias = "get_unprocessed")]
251    pub fn unprocessed(&self) -> u32 {
252        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
253    }
254
255    /// Sets the maximal number of unused threads to @max_threads.
256    /// If @max_threads is -1, no limit is imposed on the number
257    /// of unused threads.
258    ///
259    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
260    /// ## `max_threads`
261    /// maximal number of unused threads
262    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
263    pub fn set_max_unused_threads(max_threads: Option<u32>) {
264        unsafe {
265            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
266        }
267    }
268
269    /// Returns the maximal allowed number of unused threads.
270    ///
271    /// # Returns
272    ///
273    /// the maximal number of unused threads
274    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
275    #[doc(alias = "get_max_unused_threads")]
276    pub fn max_unused_threads() -> Option<u32> {
277        unsafe {
278            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
279            if max_unused_threads == -1 {
280                None
281            } else {
282                Some(max_unused_threads as u32)
283            }
284        }
285    }
286
287    /// Returns the number of currently unused threads.
288    ///
289    /// # Returns
290    ///
291    /// the number of currently unused threads
292    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
293    #[doc(alias = "get_num_unused_threads")]
294    pub fn num_unused_threads() -> u32 {
295        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
296    }
297
298    /// Stops all currently unused threads. This does not change the
299    /// maximal number of unused threads. This function can be used to
300    /// regularly stop all unused threads e.g. from g_timeout_add().
301    #[doc(alias = "g_thread_pool_stop_unused_threads")]
302    pub fn stop_unused_threads() {
303        unsafe {
304            ffi::g_thread_pool_stop_unused_threads();
305        }
306    }
307
308    /// This function will set the maximum @interval that a thread
309    /// waiting in the pool for new tasks can be idle for before
310    /// being stopped. This function is similar to calling
311    /// g_thread_pool_stop_unused_threads() on a regular timeout,
312    /// except this is done on a per thread basis.
313    ///
314    /// By setting @interval to 0, idle threads will not be stopped.
315    ///
316    /// The default value is 15000 (15 seconds).
317    /// ## `interval`
318    /// the maximum @interval (in milliseconds)
319    ///     a thread can be idle
320    #[doc(alias = "g_thread_pool_set_max_idle_time")]
321    pub fn set_max_idle_time(max_idle_time: u32) {
322        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
323    }
324
325    /// This function will return the maximum @interval that a
326    /// thread will wait in the thread pool for new tasks before
327    /// being stopped.
328    ///
329    /// If this function returns 0, threads waiting in the thread
330    /// pool for new work are not stopped.
331    ///
332    /// # Returns
333    ///
334    /// the maximum @interval (milliseconds) to wait
335    ///     for new tasks in the thread pool before stopping the
336    ///     thread
337    #[doc(alias = "g_thread_pool_get_max_idle_time")]
338    #[doc(alias = "get_max_idle_time")]
339    pub fn max_idle_time() -> u32 {
340        unsafe { ffi::g_thread_pool_get_max_idle_time() }
341    }
342}
343
344impl Drop for ThreadPool {
345    #[inline]
346    fn drop(&mut self) {
347        unsafe {
348            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
349        }
350    }
351}
352
353unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
354    unsafe {
355        let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
356        func()
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    // A pushed task runs on the pool, its side effect is observable and its return value is
    // delivered through the handle.
    #[test]
    fn test_push() {
        use std::sync::mpsc;

        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = mpsc::channel();

        let task = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // The future returned by `push_future` resolves to the task's return value.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let fut = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(fut);
        assert!(outcome.unwrap());
    }
}