glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12/// The `GThreadPool` struct represents a thread pool.
13///
14/// A thread pool is useful when you wish to asynchronously fork out the execution of work
15/// and continue working in your own thread. If that will happen often, the overhead of starting
16/// and destroying a thread each time might be too high. In such cases reusing already started
17/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
18/// and error-prone.
19///
20/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
21/// threads can be shared between the different subsystems of your program, when they are using GLib.
22///
23/// To create a new thread pool, you use [`new()`][Self::new()].
24/// It is destroyed by `GLib::ThreadPool::free()`.
25///
26/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
27///
28/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
29/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
30/// To control the maximum number of threads for a thread pool, you use
31/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
32///
33/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
34/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
35/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
36/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
37/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
38// rustdoc-stripper-ignore-next-stop
39/// The `GThreadPool` struct represents a thread pool.
40///
41/// A thread pool is useful when you wish to asynchronously fork out the execution of work
42/// and continue working in your own thread. If that will happen often, the overhead of starting
43/// and destroying a thread each time might be too high. In such cases reusing already started
44/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
45/// and error-prone.
46///
47/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
48/// threads can be shared between the different subsystems of your program, when they are using GLib.
49///
50/// To create a new thread pool, you use [`new()`][Self::new()].
51/// It is destroyed by `GLib::ThreadPool::free()`.
52///
53/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
54///
55/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
56/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
57/// To control the maximum number of threads for a thread pool, you use
58/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
59///
60/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
61/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
62/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
63/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
64/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
65#[derive(Debug)]
66#[doc(alias = "GThreadPool")]
67pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
68
69unsafe impl Send for ThreadPool {}
70unsafe impl Sync for ThreadPool {}
71
72// rustdoc-stripper-ignore-next
73/// A handle to a thread running on a [`ThreadPool`].
74///
75/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
76/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
77/// allowing it to complete but discarding the return value.
78#[derive(Debug)]
79pub struct ThreadHandle<T> {
80    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
81}
82
83impl<T> ThreadHandle<T> {
84    // rustdoc-stripper-ignore-next
85    /// Waits for the associated thread to finish.
86    ///
87    /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
88    /// thread, or `Err` if the thread panicked. This function will return immediately if the
89    /// associated thread has already finished.
90    #[inline]
91    pub fn join(self) -> std::thread::Result<T> {
92        self.rx.recv().unwrap()
93    }
94}
95
96impl ThreadPool {
97    #[doc(alias = "g_thread_pool_new")]
98    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
99        unsafe {
100            let mut err = ptr::null_mut();
101            let pool = ffi::g_thread_pool_new(
102                Some(spawn_func),
103                ptr::null_mut(),
104                max_threads.map(|v| v as i32).unwrap_or(-1),
105                ffi::GFALSE,
106                &mut err,
107            );
108            if pool.is_null() {
109                Err(from_glib_full(err))
110            } else {
111                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
112            }
113        }
114    }
115
116    #[doc(alias = "g_thread_pool_new")]
117    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
118        unsafe {
119            let mut err = ptr::null_mut();
120            let pool = ffi::g_thread_pool_new(
121                Some(spawn_func),
122                ptr::null_mut(),
123                max_threads as i32,
124                ffi::GTRUE,
125                &mut err,
126            );
127            if pool.is_null() {
128                Err(from_glib_full(err))
129            } else {
130                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
131            }
132        }
133    }
134
135    /// Inserts @data into the list of tasks to be executed by @self.
136    ///
137    /// When the number of currently running threads is lower than the
138    /// maximal allowed number of threads, a new thread is started (or
139    /// reused) with the properties given to g_thread_pool_new().
140    /// Otherwise, @data stays in the queue until a thread in this pool
141    /// finishes its previous task and processes @data.
142    ///
143    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
144    /// errors. An error can only occur when a new thread couldn't be
145    /// created. In that case @data is simply appended to the queue of
146    /// work to do.
147    ///
148    /// Before version 2.32, this function did not return a success status.
149    ///
150    /// # Returns
151    ///
152    /// [`true`] on success, [`false`] if an error occurred
153    // rustdoc-stripper-ignore-next-stop
154    /// Inserts @data into the list of tasks to be executed by @self.
155    ///
156    /// When the number of currently running threads is lower than the
157    /// maximal allowed number of threads, a new thread is started (or
158    /// reused) with the properties given to g_thread_pool_new().
159    /// Otherwise, @data stays in the queue until a thread in this pool
160    /// finishes its previous task and processes @data.
161    ///
162    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
163    /// errors. An error can only occur when a new thread couldn't be
164    /// created. In that case @data is simply appended to the queue of
165    /// work to do.
166    ///
167    /// Before version 2.32, this function did not return a success status.
168    ///
169    /// # Returns
170    ///
171    /// [`true`] on success, [`false`] if an error occurred
172    #[doc(alias = "g_thread_pool_push")]
173    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
174        &self,
175        func: F,
176    ) -> Result<ThreadHandle<T>, crate::Error> {
177        let (tx, rx) = std::sync::mpsc::sync_channel(1);
178        unsafe {
179            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
180                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
181            });
182            let func = Box::new(func);
183            let mut err = ptr::null_mut();
184
185            let func = Box::into_raw(func);
186            let ret: bool = from_glib(ffi::g_thread_pool_push(
187                self.0.as_ptr(),
188                func as *mut _,
189                &mut err,
190            ));
191            if ret {
192                Ok(ThreadHandle { rx })
193            } else {
194                let _ = Box::from_raw(func);
195                Err(from_glib_full(err))
196            }
197        }
198    }
199
200    #[cfg(feature = "futures")]
201    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
202        &self,
203        func: F,
204    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
205    {
206        let (sender, receiver) = oneshot::channel();
207
208        self.push(move || {
209            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
210        })?;
211
212        Ok(async move { receiver.await.expect("Dropped before executing") })
213    }
214
215    /// Sets the maximal allowed number of threads for @self.
216    /// A value of -1 means that the maximal number of threads
217    /// is unlimited. If @self is an exclusive thread pool, setting
218    /// the maximal number of threads to -1 is not allowed.
219    ///
220    /// Setting @max_threads to 0 means stopping all work for @self.
221    /// It is effectively frozen until @max_threads is set to a non-zero
222    /// value again.
223    ///
224    /// A thread is never terminated while calling @func, as supplied by
225    /// g_thread_pool_new(). Instead the maximal number of threads only
226    /// has effect for the allocation of new threads in g_thread_pool_push().
227    /// A new thread is allocated, whenever the number of currently
228    /// running threads in @self is smaller than the maximal number.
229    ///
230    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
231    /// errors. An error can only occur when a new thread couldn't be
232    /// created.
233    ///
234    /// Before version 2.32, this function did not return a success status.
235    /// ## `max_threads`
236    /// a new maximal number of threads for @self,
237    ///     or -1 for unlimited
238    ///
239    /// # Returns
240    ///
241    /// [`true`] on success, [`false`] if an error occurred
242    // rustdoc-stripper-ignore-next-stop
243    /// Sets the maximal allowed number of threads for @self.
244    /// A value of -1 means that the maximal number of threads
245    /// is unlimited. If @self is an exclusive thread pool, setting
246    /// the maximal number of threads to -1 is not allowed.
247    ///
248    /// Setting @max_threads to 0 means stopping all work for @self.
249    /// It is effectively frozen until @max_threads is set to a non-zero
250    /// value again.
251    ///
252    /// A thread is never terminated while calling @func, as supplied by
253    /// g_thread_pool_new(). Instead the maximal number of threads only
254    /// has effect for the allocation of new threads in g_thread_pool_push().
255    /// A new thread is allocated, whenever the number of currently
256    /// running threads in @self is smaller than the maximal number.
257    ///
258    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
259    /// errors. An error can only occur when a new thread couldn't be
260    /// created.
261    ///
262    /// Before version 2.32, this function did not return a success status.
263    /// ## `max_threads`
264    /// a new maximal number of threads for @self,
265    ///     or -1 for unlimited
266    ///
267    /// # Returns
268    ///
269    /// [`true`] on success, [`false`] if an error occurred
270    #[doc(alias = "g_thread_pool_set_max_threads")]
271    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
272        unsafe {
273            let mut err = ptr::null_mut();
274            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
275                self.0.as_ptr(),
276                max_threads.map(|v| v as i32).unwrap_or(-1),
277                &mut err,
278            ));
279            if ret {
280                Ok(())
281            } else {
282                Err(from_glib_full(err))
283            }
284        }
285    }
286
287    /// Returns the maximal number of threads for @self.
288    ///
289    /// # Returns
290    ///
291    /// the maximal number of threads
292    // rustdoc-stripper-ignore-next-stop
293    /// Returns the maximal number of threads for @self.
294    ///
295    /// # Returns
296    ///
297    /// the maximal number of threads
298    #[doc(alias = "g_thread_pool_get_max_threads")]
299    #[doc(alias = "get_max_threads")]
300    pub fn max_threads(&self) -> Option<u32> {
301        unsafe {
302            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
303            if max_threads == -1 {
304                None
305            } else {
306                Some(max_threads as u32)
307            }
308        }
309    }
310
311    /// Returns the number of threads currently running in @self.
312    ///
313    /// # Returns
314    ///
315    /// the number of threads currently running
316    // rustdoc-stripper-ignore-next-stop
317    /// Returns the number of threads currently running in @self.
318    ///
319    /// # Returns
320    ///
321    /// the number of threads currently running
322    #[doc(alias = "g_thread_pool_get_num_threads")]
323    #[doc(alias = "get_num_threads")]
324    pub fn num_threads(&self) -> u32 {
325        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
326    }
327
328    /// Returns the number of tasks still unprocessed in @self.
329    ///
330    /// # Returns
331    ///
332    /// the number of unprocessed tasks
333    // rustdoc-stripper-ignore-next-stop
334    /// Returns the number of tasks still unprocessed in @self.
335    ///
336    /// # Returns
337    ///
338    /// the number of unprocessed tasks
339    #[doc(alias = "g_thread_pool_unprocessed")]
340    #[doc(alias = "get_unprocessed")]
341    pub fn unprocessed(&self) -> u32 {
342        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
343    }
344
345    /// Sets the maximal number of unused threads to @max_threads.
346    /// If @max_threads is -1, no limit is imposed on the number
347    /// of unused threads.
348    ///
349    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
350    /// ## `max_threads`
351    /// maximal number of unused threads
352    // rustdoc-stripper-ignore-next-stop
353    /// Sets the maximal number of unused threads to @max_threads.
354    /// If @max_threads is -1, no limit is imposed on the number
355    /// of unused threads.
356    ///
357    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
358    /// ## `max_threads`
359    /// maximal number of unused threads
360    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
361    pub fn set_max_unused_threads(max_threads: Option<u32>) {
362        unsafe {
363            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
364        }
365    }
366
367    /// Returns the maximal allowed number of unused threads.
368    ///
369    /// # Returns
370    ///
371    /// the maximal number of unused threads
372    // rustdoc-stripper-ignore-next-stop
373    /// Returns the maximal allowed number of unused threads.
374    ///
375    /// # Returns
376    ///
377    /// the maximal number of unused threads
378    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
379    #[doc(alias = "get_max_unused_threads")]
380    pub fn max_unused_threads() -> Option<u32> {
381        unsafe {
382            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
383            if max_unused_threads == -1 {
384                None
385            } else {
386                Some(max_unused_threads as u32)
387            }
388        }
389    }
390
391    /// Returns the number of currently unused threads.
392    ///
393    /// # Returns
394    ///
395    /// the number of currently unused threads
396    // rustdoc-stripper-ignore-next-stop
397    /// Returns the number of currently unused threads.
398    ///
399    /// # Returns
400    ///
401    /// the number of currently unused threads
402    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
403    #[doc(alias = "get_num_unused_threads")]
404    pub fn num_unused_threads() -> u32 {
405        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
406    }
407
408    /// Stops all currently unused threads. This does not change the
409    /// maximal number of unused threads. This function can be used to
410    /// regularly stop all unused threads e.g. from g_timeout_add().
411    // rustdoc-stripper-ignore-next-stop
412    /// Stops all currently unused threads. This does not change the
413    /// maximal number of unused threads. This function can be used to
414    /// regularly stop all unused threads e.g. from g_timeout_add().
415    #[doc(alias = "g_thread_pool_stop_unused_threads")]
416    pub fn stop_unused_threads() {
417        unsafe {
418            ffi::g_thread_pool_stop_unused_threads();
419        }
420    }
421
422    /// This function will set the maximum @interval that a thread
423    /// waiting in the pool for new tasks can be idle for before
424    /// being stopped. This function is similar to calling
425    /// g_thread_pool_stop_unused_threads() on a regular timeout,
426    /// except this is done on a per thread basis.
427    ///
428    /// By setting @interval to 0, idle threads will not be stopped.
429    ///
430    /// The default value is 15000 (15 seconds).
431    /// ## `interval`
432    /// the maximum @interval (in milliseconds)
433    ///     a thread can be idle
434    // rustdoc-stripper-ignore-next-stop
435    /// This function will set the maximum @interval that a thread
436    /// waiting in the pool for new tasks can be idle for before
437    /// being stopped. This function is similar to calling
438    /// g_thread_pool_stop_unused_threads() on a regular timeout,
439    /// except this is done on a per thread basis.
440    ///
441    /// By setting @interval to 0, idle threads will not be stopped.
442    ///
443    /// The default value is 15000 (15 seconds).
444    /// ## `interval`
445    /// the maximum @interval (in milliseconds)
446    ///     a thread can be idle
447    #[doc(alias = "g_thread_pool_set_max_idle_time")]
448    pub fn set_max_idle_time(max_idle_time: u32) {
449        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
450    }
451
452    /// This function will return the maximum @interval that a
453    /// thread will wait in the thread pool for new tasks before
454    /// being stopped.
455    ///
456    /// If this function returns 0, threads waiting in the thread
457    /// pool for new work are not stopped.
458    ///
459    /// # Returns
460    ///
461    /// the maximum @interval (milliseconds) to wait
462    ///     for new tasks in the thread pool before stopping the
463    ///     thread
464    // rustdoc-stripper-ignore-next-stop
465    /// This function will return the maximum @interval that a
466    /// thread will wait in the thread pool for new tasks before
467    /// being stopped.
468    ///
469    /// If this function returns 0, threads waiting in the thread
470    /// pool for new work are not stopped.
471    ///
472    /// # Returns
473    ///
474    /// the maximum @interval (milliseconds) to wait
475    ///     for new tasks in the thread pool before stopping the
476    ///     thread
477    #[doc(alias = "g_thread_pool_get_max_idle_time")]
478    #[doc(alias = "get_max_idle_time")]
479    pub fn max_idle_time() -> u32 {
480        unsafe { ffi::g_thread_pool_get_max_idle_time() }
481    }
482}
483
484impl Drop for ThreadPool {
485    #[inline]
486    fn drop(&mut self) {
487        unsafe {
488            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
489        }
490    }
491}
492
493unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
494    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
495    func()
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501
502    #[test]
503    fn test_push() {
504        use std::sync::mpsc;
505
506        let p = ThreadPool::exclusive(1).unwrap();
507        let (sender, receiver) = mpsc::channel();
508
509        let handle = p
510            .push(move || {
511                sender.send(true).unwrap();
512                123
513            })
514            .unwrap();
515
516        assert_eq!(handle.join().unwrap(), 123);
517        assert_eq!(receiver.recv(), Ok(true));
518    }
519
520    #[cfg(feature = "futures")]
521    #[test]
522    fn test_push_future() {
523        let c = crate::MainContext::new();
524        let p = ThreadPool::shared(None).unwrap();
525
526        let fut = p.push_future(|| true).unwrap();
527
528        let res = c.block_on(fut);
529        assert!(res.unwrap());
530    }
531}