Skip to main content

glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12/// The `GThreadPool` struct represents a thread pool.
13///
14/// A thread pool is useful when you wish to asynchronously fork out the execution of work
15/// and continue working in your own thread. If that will happen often, the overhead of starting
16/// and destroying a thread each time might be too high. In such cases reusing already started
17/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
18/// and error-prone.
19///
20/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
21/// threads can be shared between the different subsystems of your program, when they are using GLib.
22///
23/// To create a new thread pool, you use [`new()`][Self::new()].
24/// It is destroyed by `GLib::ThreadPool::free()`.
25///
26/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
27///
28/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
29/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
30/// To control the maximum number of threads for a thread pool, you use
31/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
32///
33/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
34/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
35/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
36/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
37/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
38// rustdoc-stripper-ignore-next-stop
39/// The `GThreadPool` struct represents a thread pool.
40///
41/// A thread pool is useful when you wish to asynchronously fork out the execution of work
42/// and continue working in your own thread. If that will happen often, the overhead of starting
43/// and destroying a thread each time might be too high. In such cases reusing already started
44/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
45/// and error-prone.
46///
47/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
48/// threads can be shared between the different subsystems of your program, when they are using GLib.
49///
50/// To create a new thread pool, you use [`new()`][Self::new()].
51/// It is destroyed by `GLib::ThreadPool::free()`.
52///
53/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
54///
55/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
56/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
57/// To control the maximum number of threads for a thread pool, you use
58/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
59///
60/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
61/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
62/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
63/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
64/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
65#[derive(Debug)]
66#[doc(alias = "GThreadPool")]
67pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
68
69unsafe impl Send for ThreadPool {}
70unsafe impl Sync for ThreadPool {}
71
72// rustdoc-stripper-ignore-next
73/// A handle to a thread running on a [`ThreadPool`].
74///
75/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
76/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
77/// allowing it to complete but discarding the return value.
78#[derive(Debug)]
79pub struct ThreadHandle<T> {
80    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
81}
82
83impl<T> ThreadHandle<T> {
84    // rustdoc-stripper-ignore-next
85    /// Waits for the associated thread to finish.
86    ///
87    /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
88    /// thread, or `Err` if the thread panicked. This function will return immediately if the
89    /// associated thread has already finished.
90    #[inline]
91    pub fn join(self) -> std::thread::Result<T> {
92        self.rx.recv().unwrap()
93    }
94}
95
96impl ThreadPool {
97    #[doc(alias = "g_thread_pool_new")]
98    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
99        unsafe {
100            let mut err = ptr::null_mut();
101            let pool = ffi::g_thread_pool_new(
102                Some(spawn_func),
103                ptr::null_mut(),
104                max_threads.map(|v| v as i32).unwrap_or(-1),
105                ffi::GFALSE,
106                &mut err,
107            );
108            if pool.is_null() {
109                Err(from_glib_full(err))
110            } else {
111                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
112            }
113        }
114    }
115
116    #[doc(alias = "g_thread_pool_new")]
117    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
118        unsafe {
119            let mut err = ptr::null_mut();
120            let pool = ffi::g_thread_pool_new(
121                Some(spawn_func),
122                ptr::null_mut(),
123                max_threads as i32,
124                ffi::GTRUE,
125                &mut err,
126            );
127            if pool.is_null() {
128                Err(from_glib_full(err))
129            } else {
130                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
131            }
132        }
133    }
134
135    /// Inserts @data into the list of tasks to be executed by @self.
136    ///
137    /// When the number of currently running threads is lower than the
138    /// maximal allowed number of threads, a new thread is started (or
139    /// reused) with the properties given to g_thread_pool_new().
140    /// Otherwise, @data stays in the queue until a thread in this pool
141    /// finishes its previous task and processes @data.
142    ///
143    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
144    /// errors. An error can only occur when a new thread couldn't be
145    /// created. In that case @data is simply appended to the queue of
146    /// work to do.
147    ///
148    /// Before version 2.32, this function did not return a success status.
149    ///
150    /// # Returns
151    ///
152    /// [`true`] on success, [`false`] if an error occurred
153    // rustdoc-stripper-ignore-next-stop
154    /// Inserts @data into the list of tasks to be executed by @self.
155    ///
156    /// When the number of currently running threads is lower than the
157    /// maximal allowed number of threads, a new thread is started (or
158    /// reused) with the properties given to g_thread_pool_new().
159    /// Otherwise, @data stays in the queue until a thread in this pool
160    /// finishes its previous task and processes @data.
161    ///
162    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
163    /// errors. An error can only occur when a new thread couldn't be
164    /// created. In that case @data is simply appended to the queue of
165    /// work to do.
166    ///
167    /// Before version 2.32, this function did not return a success status.
168    ///
169    /// # Returns
170    ///
171    /// [`true`] on success, [`false`] if an error occurred
172    #[doc(alias = "g_thread_pool_push")]
173    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
174        &self,
175        func: F,
176    ) -> Result<ThreadHandle<T>, crate::Error> {
177        let (tx, rx) = std::sync::mpsc::sync_channel(1);
178        unsafe {
179            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
180                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
181            });
182            let func = Box::new(func);
183            let mut err = ptr::null_mut();
184
185            let func = Box::into_raw(func);
186            let ret: bool = from_glib(ffi::g_thread_pool_push(
187                self.0.as_ptr(),
188                func as *mut _,
189                &mut err,
190            ));
191            if ret {
192                Ok(ThreadHandle { rx })
193            } else {
194                let _ = Box::from_raw(func);
195                Err(from_glib_full(err))
196            }
197        }
198    }
199
200    #[cfg(feature = "futures")]
201    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
202        &self,
203        func: F,
204    ) -> Result<
205        impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
206        crate::Error,
207    > {
208        let (sender, receiver) = oneshot::channel();
209
210        self.push(move || {
211            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
212        })?;
213
214        Ok(async move { receiver.await.expect("Dropped before executing") })
215    }
216
217    /// Sets the maximal allowed number of threads for @self.
218    /// A value of -1 means that the maximal number of threads
219    /// is unlimited. If @self is an exclusive thread pool, setting
220    /// the maximal number of threads to -1 is not allowed.
221    ///
222    /// Setting @max_threads to 0 means stopping all work for @self.
223    /// It is effectively frozen until @max_threads is set to a non-zero
224    /// value again.
225    ///
226    /// A thread is never terminated while calling @func, as supplied by
227    /// g_thread_pool_new(). Instead the maximal number of threads only
228    /// has effect for the allocation of new threads in g_thread_pool_push().
229    /// A new thread is allocated, whenever the number of currently
230    /// running threads in @self is smaller than the maximal number.
231    ///
232    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
233    /// errors. An error can only occur when a new thread couldn't be
234    /// created.
235    ///
236    /// Before version 2.32, this function did not return a success status.
237    /// ## `max_threads`
238    /// a new maximal number of threads for @self,
239    ///     or -1 for unlimited
240    ///
241    /// # Returns
242    ///
243    /// [`true`] on success, [`false`] if an error occurred
244    // rustdoc-stripper-ignore-next-stop
245    /// Sets the maximal allowed number of threads for @self.
246    /// A value of -1 means that the maximal number of threads
247    /// is unlimited. If @self is an exclusive thread pool, setting
248    /// the maximal number of threads to -1 is not allowed.
249    ///
250    /// Setting @max_threads to 0 means stopping all work for @self.
251    /// It is effectively frozen until @max_threads is set to a non-zero
252    /// value again.
253    ///
254    /// A thread is never terminated while calling @func, as supplied by
255    /// g_thread_pool_new(). Instead the maximal number of threads only
256    /// has effect for the allocation of new threads in g_thread_pool_push().
257    /// A new thread is allocated, whenever the number of currently
258    /// running threads in @self is smaller than the maximal number.
259    ///
260    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
261    /// errors. An error can only occur when a new thread couldn't be
262    /// created.
263    ///
264    /// Before version 2.32, this function did not return a success status.
265    /// ## `max_threads`
266    /// a new maximal number of threads for @self,
267    ///     or -1 for unlimited
268    ///
269    /// # Returns
270    ///
271    /// [`true`] on success, [`false`] if an error occurred
272    #[doc(alias = "g_thread_pool_set_max_threads")]
273    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
274        unsafe {
275            let mut err = ptr::null_mut();
276            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
277                self.0.as_ptr(),
278                max_threads.map(|v| v as i32).unwrap_or(-1),
279                &mut err,
280            ));
281            if ret {
282                Ok(())
283            } else {
284                Err(from_glib_full(err))
285            }
286        }
287    }
288
289    /// Returns the maximal number of threads for @self.
290    ///
291    /// # Returns
292    ///
293    /// the maximal number of threads
294    // rustdoc-stripper-ignore-next-stop
295    /// Returns the maximal number of threads for @self.
296    ///
297    /// # Returns
298    ///
299    /// the maximal number of threads
300    #[doc(alias = "g_thread_pool_get_max_threads")]
301    #[doc(alias = "get_max_threads")]
302    pub fn max_threads(&self) -> Option<u32> {
303        unsafe {
304            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
305            if max_threads == -1 {
306                None
307            } else {
308                Some(max_threads as u32)
309            }
310        }
311    }
312
313    /// Returns the number of threads currently running in @self.
314    ///
315    /// # Returns
316    ///
317    /// the number of threads currently running
318    // rustdoc-stripper-ignore-next-stop
319    /// Returns the number of threads currently running in @self.
320    ///
321    /// # Returns
322    ///
323    /// the number of threads currently running
324    #[doc(alias = "g_thread_pool_get_num_threads")]
325    #[doc(alias = "get_num_threads")]
326    pub fn num_threads(&self) -> u32 {
327        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
328    }
329
330    /// Returns the number of tasks still unprocessed in @self.
331    ///
332    /// # Returns
333    ///
334    /// the number of unprocessed tasks
335    // rustdoc-stripper-ignore-next-stop
336    /// Returns the number of tasks still unprocessed in @self.
337    ///
338    /// # Returns
339    ///
340    /// the number of unprocessed tasks
341    #[doc(alias = "g_thread_pool_unprocessed")]
342    #[doc(alias = "get_unprocessed")]
343    pub fn unprocessed(&self) -> u32 {
344        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
345    }
346
347    /// Sets the maximal number of unused threads to @max_threads.
348    /// If @max_threads is -1, no limit is imposed on the number
349    /// of unused threads.
350    ///
351    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
352    /// ## `max_threads`
353    /// maximal number of unused threads
354    // rustdoc-stripper-ignore-next-stop
355    /// Sets the maximal number of unused threads to @max_threads.
356    /// If @max_threads is -1, no limit is imposed on the number
357    /// of unused threads.
358    ///
359    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
360    /// ## `max_threads`
361    /// maximal number of unused threads
362    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
363    pub fn set_max_unused_threads(max_threads: Option<u32>) {
364        unsafe {
365            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
366        }
367    }
368
369    /// Returns the maximal allowed number of unused threads.
370    ///
371    /// # Returns
372    ///
373    /// the maximal number of unused threads
374    // rustdoc-stripper-ignore-next-stop
375    /// Returns the maximal allowed number of unused threads.
376    ///
377    /// # Returns
378    ///
379    /// the maximal number of unused threads
380    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
381    #[doc(alias = "get_max_unused_threads")]
382    pub fn max_unused_threads() -> Option<u32> {
383        unsafe {
384            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
385            if max_unused_threads == -1 {
386                None
387            } else {
388                Some(max_unused_threads as u32)
389            }
390        }
391    }
392
393    /// Returns the number of currently unused threads.
394    ///
395    /// # Returns
396    ///
397    /// the number of currently unused threads
398    // rustdoc-stripper-ignore-next-stop
399    /// Returns the number of currently unused threads.
400    ///
401    /// # Returns
402    ///
403    /// the number of currently unused threads
404    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
405    #[doc(alias = "get_num_unused_threads")]
406    pub fn num_unused_threads() -> u32 {
407        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
408    }
409
410    /// Stops all currently unused threads. This does not change the
411    /// maximal number of unused threads. This function can be used to
412    /// regularly stop all unused threads e.g. from g_timeout_add().
413    // rustdoc-stripper-ignore-next-stop
414    /// Stops all currently unused threads. This does not change the
415    /// maximal number of unused threads. This function can be used to
416    /// regularly stop all unused threads e.g. from g_timeout_add().
417    #[doc(alias = "g_thread_pool_stop_unused_threads")]
418    pub fn stop_unused_threads() {
419        unsafe {
420            ffi::g_thread_pool_stop_unused_threads();
421        }
422    }
423
424    /// This function will set the maximum @interval that a thread
425    /// waiting in the pool for new tasks can be idle for before
426    /// being stopped. This function is similar to calling
427    /// g_thread_pool_stop_unused_threads() on a regular timeout,
428    /// except this is done on a per thread basis.
429    ///
430    /// By setting @interval to 0, idle threads will not be stopped.
431    ///
432    /// The default value is 15000 (15 seconds).
433    /// ## `interval`
434    /// the maximum @interval (in milliseconds)
435    ///     a thread can be idle
436    // rustdoc-stripper-ignore-next-stop
437    /// This function will set the maximum @interval that a thread
438    /// waiting in the pool for new tasks can be idle for before
439    /// being stopped. This function is similar to calling
440    /// g_thread_pool_stop_unused_threads() on a regular timeout,
441    /// except this is done on a per thread basis.
442    ///
443    /// By setting @interval to 0, idle threads will not be stopped.
444    ///
445    /// The default value is 15000 (15 seconds).
446    /// ## `interval`
447    /// the maximum @interval (in milliseconds)
448    ///     a thread can be idle
449    #[doc(alias = "g_thread_pool_set_max_idle_time")]
450    pub fn set_max_idle_time(max_idle_time: u32) {
451        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
452    }
453
454    /// This function will return the maximum @interval that a
455    /// thread will wait in the thread pool for new tasks before
456    /// being stopped.
457    ///
458    /// If this function returns 0, threads waiting in the thread
459    /// pool for new work are not stopped.
460    ///
461    /// # Returns
462    ///
463    /// the maximum @interval (milliseconds) to wait
464    ///     for new tasks in the thread pool before stopping the
465    ///     thread
466    // rustdoc-stripper-ignore-next-stop
467    /// This function will return the maximum @interval that a
468    /// thread will wait in the thread pool for new tasks before
469    /// being stopped.
470    ///
471    /// If this function returns 0, threads waiting in the thread
472    /// pool for new work are not stopped.
473    ///
474    /// # Returns
475    ///
476    /// the maximum @interval (milliseconds) to wait
477    ///     for new tasks in the thread pool before stopping the
478    ///     thread
479    #[doc(alias = "g_thread_pool_get_max_idle_time")]
480    #[doc(alias = "get_max_idle_time")]
481    pub fn max_idle_time() -> u32 {
482        unsafe { ffi::g_thread_pool_get_max_idle_time() }
483    }
484}
485
486impl Drop for ThreadPool {
487    #[inline]
488    fn drop(&mut self) {
489        unsafe {
490            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
491        }
492    }
493}
494
495unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
496    unsafe {
497        let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
498        func()
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505
506    #[test]
507    fn test_push() {
508        use std::sync::mpsc;
509
510        let p = ThreadPool::exclusive(1).unwrap();
511        let (sender, receiver) = mpsc::channel();
512
513        let handle = p
514            .push(move || {
515                sender.send(true).unwrap();
516                123
517            })
518            .unwrap();
519
520        assert_eq!(handle.join().unwrap(), 123);
521        assert_eq!(receiver.recv(), Ok(true));
522    }
523
524    #[cfg(feature = "futures")]
525    #[test]
526    fn test_push_future() {
527        let c = crate::MainContext::new();
528        let p = ThreadPool::shared(None).unwrap();
529
530        let fut = p.push_future(|| true).unwrap();
531
532        let res = c.block_on(fut);
533        assert!(res.unwrap());
534    }
535}