glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::{ffi, translate::*};
8
9/// The `GThreadPool` struct represents a thread pool.
10///
11/// A thread pool is useful when you wish to asynchronously fork out the execution of work
12/// and continue working in your own thread. If that will happen often, the overhead of starting
13/// and destroying a thread each time might be too high. In such cases reusing already started
14/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
15/// and error-prone.
16///
17/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
18/// threads can be shared between the different subsystems of your program, when they are using GLib.
19///
20/// To create a new thread pool, you use [`new()`][Self::new()].
21/// It is destroyed by `GLib::ThreadPool::free()`.
22///
23/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
24///
25/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
26/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
27/// To control the maximum number of threads for a thread pool, you use
28/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
29///
30/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
31/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
32/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
33/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
34/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
35// rustdoc-stripper-ignore-next-stop
36/// The `GThreadPool` struct represents a thread pool.
37///
38/// A thread pool is useful when you wish to asynchronously fork out the execution of work
39/// and continue working in your own thread. If that will happen often, the overhead of starting
40/// and destroying a thread each time might be too high. In such cases reusing already started
41/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
42/// and error-prone.
43///
44/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
45/// threads can be shared between the different subsystems of your program, when they are using GLib.
46///
47/// To create a new thread pool, you use [`new()`][Self::new()].
48/// It is destroyed by `GLib::ThreadPool::free()`.
49///
50/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
51///
52/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
53/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
54/// To control the maximum number of threads for a thread pool, you use
55/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
56///
57/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
58/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
59/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
60/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
61/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
62#[derive(Debug)]
63#[doc(alias = "GThreadPool")]
64pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
65
66unsafe impl Send for ThreadPool {}
67unsafe impl Sync for ThreadPool {}
68
69// rustdoc-stripper-ignore-next
70/// A handle to a thread running on a [`ThreadPool`].
71///
72/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
73/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
74/// allowing it to complete but discarding the return value.
75#[derive(Debug)]
76pub struct ThreadHandle<T> {
77    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
78}
79
80impl<T> ThreadHandle<T> {
81    // rustdoc-stripper-ignore-next
82    /// Waits for the associated thread to finish.
83    ///
84    /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
85    /// thread, or `Err` if the thread panicked. This function will return immediately if the
86    /// associated thread has already finished.
87    #[inline]
88    pub fn join(self) -> std::thread::Result<T> {
89        self.rx.recv().unwrap()
90    }
91}
92
93impl ThreadPool {
94    #[doc(alias = "g_thread_pool_new")]
95    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
96        unsafe {
97            let mut err = ptr::null_mut();
98            let pool = ffi::g_thread_pool_new(
99                Some(spawn_func),
100                ptr::null_mut(),
101                max_threads.map(|v| v as i32).unwrap_or(-1),
102                ffi::GFALSE,
103                &mut err,
104            );
105            if pool.is_null() {
106                Err(from_glib_full(err))
107            } else {
108                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
109            }
110        }
111    }
112
113    #[doc(alias = "g_thread_pool_new")]
114    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
115        unsafe {
116            let mut err = ptr::null_mut();
117            let pool = ffi::g_thread_pool_new(
118                Some(spawn_func),
119                ptr::null_mut(),
120                max_threads as i32,
121                ffi::GTRUE,
122                &mut err,
123            );
124            if pool.is_null() {
125                Err(from_glib_full(err))
126            } else {
127                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
128            }
129        }
130    }
131
132    /// Inserts @data into the list of tasks to be executed by @self.
133    ///
134    /// When the number of currently running threads is lower than the
135    /// maximal allowed number of threads, a new thread is started (or
136    /// reused) with the properties given to g_thread_pool_new().
137    /// Otherwise, @data stays in the queue until a thread in this pool
138    /// finishes its previous task and processes @data.
139    ///
140    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
141    /// errors. An error can only occur when a new thread couldn't be
142    /// created. In that case @data is simply appended to the queue of
143    /// work to do.
144    ///
145    /// Before version 2.32, this function did not return a success status.
146    ///
147    /// # Returns
148    ///
149    /// [`true`] on success, [`false`] if an error occurred
150    // rustdoc-stripper-ignore-next-stop
151    /// Inserts @data into the list of tasks to be executed by @self.
152    ///
153    /// When the number of currently running threads is lower than the
154    /// maximal allowed number of threads, a new thread is started (or
155    /// reused) with the properties given to g_thread_pool_new().
156    /// Otherwise, @data stays in the queue until a thread in this pool
157    /// finishes its previous task and processes @data.
158    ///
159    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
160    /// errors. An error can only occur when a new thread couldn't be
161    /// created. In that case @data is simply appended to the queue of
162    /// work to do.
163    ///
164    /// Before version 2.32, this function did not return a success status.
165    ///
166    /// # Returns
167    ///
168    /// [`true`] on success, [`false`] if an error occurred
169    #[doc(alias = "g_thread_pool_push")]
170    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
171        &self,
172        func: F,
173    ) -> Result<ThreadHandle<T>, crate::Error> {
174        let (tx, rx) = std::sync::mpsc::sync_channel(1);
175        unsafe {
176            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
177                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
178            });
179            let func = Box::new(func);
180            let mut err = ptr::null_mut();
181
182            let func = Box::into_raw(func);
183            let ret: bool = from_glib(ffi::g_thread_pool_push(
184                self.0.as_ptr(),
185                func as *mut _,
186                &mut err,
187            ));
188            if ret {
189                Ok(ThreadHandle { rx })
190            } else {
191                let _ = Box::from_raw(func);
192                Err(from_glib_full(err))
193            }
194        }
195    }
196
197    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
198        &self,
199        func: F,
200    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
201    {
202        let (sender, receiver) = oneshot::channel();
203
204        self.push(move || {
205            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
206        })?;
207
208        Ok(async move { receiver.await.expect("Dropped before executing") })
209    }
210
211    /// Sets the maximal allowed number of threads for @self.
212    /// A value of -1 means that the maximal number of threads
213    /// is unlimited. If @self is an exclusive thread pool, setting
214    /// the maximal number of threads to -1 is not allowed.
215    ///
216    /// Setting @max_threads to 0 means stopping all work for @self.
217    /// It is effectively frozen until @max_threads is set to a non-zero
218    /// value again.
219    ///
220    /// A thread is never terminated while calling @func, as supplied by
221    /// g_thread_pool_new(). Instead the maximal number of threads only
222    /// has effect for the allocation of new threads in g_thread_pool_push().
223    /// A new thread is allocated, whenever the number of currently
224    /// running threads in @self is smaller than the maximal number.
225    ///
226    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
227    /// errors. An error can only occur when a new thread couldn't be
228    /// created.
229    ///
230    /// Before version 2.32, this function did not return a success status.
231    /// ## `max_threads`
232    /// a new maximal number of threads for @self,
233    ///     or -1 for unlimited
234    ///
235    /// # Returns
236    ///
237    /// [`true`] on success, [`false`] if an error occurred
238    // rustdoc-stripper-ignore-next-stop
239    /// Sets the maximal allowed number of threads for @self.
240    /// A value of -1 means that the maximal number of threads
241    /// is unlimited. If @self is an exclusive thread pool, setting
242    /// the maximal number of threads to -1 is not allowed.
243    ///
244    /// Setting @max_threads to 0 means stopping all work for @self.
245    /// It is effectively frozen until @max_threads is set to a non-zero
246    /// value again.
247    ///
248    /// A thread is never terminated while calling @func, as supplied by
249    /// g_thread_pool_new(). Instead the maximal number of threads only
250    /// has effect for the allocation of new threads in g_thread_pool_push().
251    /// A new thread is allocated, whenever the number of currently
252    /// running threads in @self is smaller than the maximal number.
253    ///
254    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
255    /// errors. An error can only occur when a new thread couldn't be
256    /// created.
257    ///
258    /// Before version 2.32, this function did not return a success status.
259    /// ## `max_threads`
260    /// a new maximal number of threads for @self,
261    ///     or -1 for unlimited
262    ///
263    /// # Returns
264    ///
265    /// [`true`] on success, [`false`] if an error occurred
266    #[doc(alias = "g_thread_pool_set_max_threads")]
267    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
268        unsafe {
269            let mut err = ptr::null_mut();
270            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
271                self.0.as_ptr(),
272                max_threads.map(|v| v as i32).unwrap_or(-1),
273                &mut err,
274            ));
275            if ret {
276                Ok(())
277            } else {
278                Err(from_glib_full(err))
279            }
280        }
281    }
282
283    /// Returns the maximal number of threads for @self.
284    ///
285    /// # Returns
286    ///
287    /// the maximal number of threads
288    // rustdoc-stripper-ignore-next-stop
289    /// Returns the maximal number of threads for @self.
290    ///
291    /// # Returns
292    ///
293    /// the maximal number of threads
294    #[doc(alias = "g_thread_pool_get_max_threads")]
295    #[doc(alias = "get_max_threads")]
296    pub fn max_threads(&self) -> Option<u32> {
297        unsafe {
298            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
299            if max_threads == -1 {
300                None
301            } else {
302                Some(max_threads as u32)
303            }
304        }
305    }
306
307    /// Returns the number of threads currently running in @self.
308    ///
309    /// # Returns
310    ///
311    /// the number of threads currently running
312    // rustdoc-stripper-ignore-next-stop
313    /// Returns the number of threads currently running in @self.
314    ///
315    /// # Returns
316    ///
317    /// the number of threads currently running
318    #[doc(alias = "g_thread_pool_get_num_threads")]
319    #[doc(alias = "get_num_threads")]
320    pub fn num_threads(&self) -> u32 {
321        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
322    }
323
324    /// Returns the number of tasks still unprocessed in @self.
325    ///
326    /// # Returns
327    ///
328    /// the number of unprocessed tasks
329    // rustdoc-stripper-ignore-next-stop
330    /// Returns the number of tasks still unprocessed in @self.
331    ///
332    /// # Returns
333    ///
334    /// the number of unprocessed tasks
335    #[doc(alias = "g_thread_pool_unprocessed")]
336    #[doc(alias = "get_unprocessed")]
337    pub fn unprocessed(&self) -> u32 {
338        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
339    }
340
341    /// Sets the maximal number of unused threads to @max_threads.
342    /// If @max_threads is -1, no limit is imposed on the number
343    /// of unused threads.
344    ///
345    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
346    /// ## `max_threads`
347    /// maximal number of unused threads
348    // rustdoc-stripper-ignore-next-stop
349    /// Sets the maximal number of unused threads to @max_threads.
350    /// If @max_threads is -1, no limit is imposed on the number
351    /// of unused threads.
352    ///
353    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
354    /// ## `max_threads`
355    /// maximal number of unused threads
356    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
357    pub fn set_max_unused_threads(max_threads: Option<u32>) {
358        unsafe {
359            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
360        }
361    }
362
363    /// Returns the maximal allowed number of unused threads.
364    ///
365    /// # Returns
366    ///
367    /// the maximal number of unused threads
368    // rustdoc-stripper-ignore-next-stop
369    /// Returns the maximal allowed number of unused threads.
370    ///
371    /// # Returns
372    ///
373    /// the maximal number of unused threads
374    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
375    #[doc(alias = "get_max_unused_threads")]
376    pub fn max_unused_threads() -> Option<u32> {
377        unsafe {
378            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
379            if max_unused_threads == -1 {
380                None
381            } else {
382                Some(max_unused_threads as u32)
383            }
384        }
385    }
386
387    /// Returns the number of currently unused threads.
388    ///
389    /// # Returns
390    ///
391    /// the number of currently unused threads
392    // rustdoc-stripper-ignore-next-stop
393    /// Returns the number of currently unused threads.
394    ///
395    /// # Returns
396    ///
397    /// the number of currently unused threads
398    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
399    #[doc(alias = "get_num_unused_threads")]
400    pub fn num_unused_threads() -> u32 {
401        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
402    }
403
404    /// Stops all currently unused threads. This does not change the
405    /// maximal number of unused threads. This function can be used to
406    /// regularly stop all unused threads e.g. from g_timeout_add().
407    // rustdoc-stripper-ignore-next-stop
408    /// Stops all currently unused threads. This does not change the
409    /// maximal number of unused threads. This function can be used to
410    /// regularly stop all unused threads e.g. from g_timeout_add().
411    #[doc(alias = "g_thread_pool_stop_unused_threads")]
412    pub fn stop_unused_threads() {
413        unsafe {
414            ffi::g_thread_pool_stop_unused_threads();
415        }
416    }
417
418    /// This function will set the maximum @interval that a thread
419    /// waiting in the pool for new tasks can be idle for before
420    /// being stopped. This function is similar to calling
421    /// g_thread_pool_stop_unused_threads() on a regular timeout,
422    /// except this is done on a per thread basis.
423    ///
424    /// By setting @interval to 0, idle threads will not be stopped.
425    ///
426    /// The default value is 15000 (15 seconds).
427    /// ## `interval`
428    /// the maximum @interval (in milliseconds)
429    ///     a thread can be idle
430    // rustdoc-stripper-ignore-next-stop
431    /// This function will set the maximum @interval that a thread
432    /// waiting in the pool for new tasks can be idle for before
433    /// being stopped. This function is similar to calling
434    /// g_thread_pool_stop_unused_threads() on a regular timeout,
435    /// except this is done on a per thread basis.
436    ///
437    /// By setting @interval to 0, idle threads will not be stopped.
438    ///
439    /// The default value is 15000 (15 seconds).
440    /// ## `interval`
441    /// the maximum @interval (in milliseconds)
442    ///     a thread can be idle
443    #[doc(alias = "g_thread_pool_set_max_idle_time")]
444    pub fn set_max_idle_time(max_idle_time: u32) {
445        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
446    }
447
448    /// This function will return the maximum @interval that a
449    /// thread will wait in the thread pool for new tasks before
450    /// being stopped.
451    ///
452    /// If this function returns 0, threads waiting in the thread
453    /// pool for new work are not stopped.
454    ///
455    /// # Returns
456    ///
457    /// the maximum @interval (milliseconds) to wait
458    ///     for new tasks in the thread pool before stopping the
459    ///     thread
460    // rustdoc-stripper-ignore-next-stop
461    /// This function will return the maximum @interval that a
462    /// thread will wait in the thread pool for new tasks before
463    /// being stopped.
464    ///
465    /// If this function returns 0, threads waiting in the thread
466    /// pool for new work are not stopped.
467    ///
468    /// # Returns
469    ///
470    /// the maximum @interval (milliseconds) to wait
471    ///     for new tasks in the thread pool before stopping the
472    ///     thread
473    #[doc(alias = "g_thread_pool_get_max_idle_time")]
474    #[doc(alias = "get_max_idle_time")]
475    pub fn max_idle_time() -> u32 {
476        unsafe { ffi::g_thread_pool_get_max_idle_time() }
477    }
478}
479
480impl Drop for ThreadPool {
481    #[inline]
482    fn drop(&mut self) {
483        unsafe {
484            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
485        }
486    }
487}
488
489unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
490    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
491    func()
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497
498    #[test]
499    fn test_push() {
500        use std::sync::mpsc;
501
502        let p = ThreadPool::exclusive(1).unwrap();
503        let (sender, receiver) = mpsc::channel();
504
505        let handle = p
506            .push(move || {
507                sender.send(true).unwrap();
508                123
509            })
510            .unwrap();
511
512        assert_eq!(handle.join().unwrap(), 123);
513        assert_eq!(receiver.recv(), Ok(true));
514    }
515
516    #[test]
517    fn test_push_future() {
518        let c = crate::MainContext::new();
519        let p = ThreadPool::shared(None).unwrap();
520
521        let fut = p.push_future(|| true).unwrap();
522
523        let res = c.block_on(fut);
524        assert!(res.unwrap());
525    }
526}