glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
/// The `GThreadPool` struct represents a thread pool.
///
/// A thread pool is useful when you wish to asynchronously fork out the execution of work
/// and continue working in your own thread. If that will happen often, the overhead of starting
/// and destroying a thread each time might be too high. In such cases reusing already started
/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
/// and error-prone.
///
/// Therefore GLib provides thread pools for your convenience. An added advantage is that the
/// threads can be shared between the different subsystems of your program when they are using
/// GLib.
///
/// To create a new thread pool, use [`shared()`][Self::shared()] or
/// [`exclusive()`][Self::exclusive()]. The underlying `GThreadPool` is freed when the value is
/// dropped.
///
/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
///
/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
/// To control the maximum number of threads for a thread pool, you use
/// [`max_threads()`][Self::max_threads()] and [`set_max_threads()`][Self::set_max_threads()].
///
/// Finally you can control the number of unused threads that are kept alive by GLib for future
/// use. The current number can be fetched with
/// [`num_unused_threads()`][Self::num_unused_threads()].
/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);

// NOTE(review): these impls presumably rely on the GLib thread pool functions being callable
// from any thread on the same pool pointer — confirm against the GLib documentation.
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
44
// rustdoc-stripper-ignore-next
/// Handle to a task that was queued on a [`ThreadPool`].
///
/// This plays the same role as [`std::thread::JoinHandle`] does for a standard thread: call
/// [`ThreadHandle::join`] to obtain the value produced by the task. If the handle is dropped
/// instead, the task is "detached" — it still runs to completion, but its return value is
/// thrown away.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks the caller until the associated task has finished and yields its outcome.
    ///
    /// The result is `Ok` carrying the task's return value, or `Err` carrying the panic payload
    /// if the task panicked. When the task has already finished, this returns without waiting.
    ///
    /// # Panics
    ///
    /// Panics if the sending half of the result channel was dropped without ever delivering an
    /// outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
68
69impl ThreadPool {
70    #[doc(alias = "g_thread_pool_new")]
71    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
72        unsafe {
73            let mut err = ptr::null_mut();
74            let pool = ffi::g_thread_pool_new(
75                Some(spawn_func),
76                ptr::null_mut(),
77                max_threads.map(|v| v as i32).unwrap_or(-1),
78                ffi::GFALSE,
79                &mut err,
80            );
81            if pool.is_null() {
82                Err(from_glib_full(err))
83            } else {
84                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
85            }
86        }
87    }
88
89    #[doc(alias = "g_thread_pool_new")]
90    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
91        unsafe {
92            let mut err = ptr::null_mut();
93            let pool = ffi::g_thread_pool_new(
94                Some(spawn_func),
95                ptr::null_mut(),
96                max_threads as i32,
97                ffi::GTRUE,
98                &mut err,
99            );
100            if pool.is_null() {
101                Err(from_glib_full(err))
102            } else {
103                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
104            }
105        }
106    }
107
108    /// Inserts @data into the list of tasks to be executed by @self.
109    ///
110    /// When the number of currently running threads is lower than the
111    /// maximal allowed number of threads, a new thread is started (or
112    /// reused) with the properties given to g_thread_pool_new().
113    /// Otherwise, @data stays in the queue until a thread in this pool
114    /// finishes its previous task and processes @data.
115    ///
116    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
117    /// errors. An error can only occur when a new thread couldn't be
118    /// created. In that case @data is simply appended to the queue of
119    /// work to do.
120    ///
121    /// Before version 2.32, this function did not return a success status.
122    ///
123    /// # Returns
124    ///
125    /// [`true`] on success, [`false`] if an error occurred
126    #[doc(alias = "g_thread_pool_push")]
127    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
128        &self,
129        func: F,
130    ) -> Result<ThreadHandle<T>, crate::Error> {
131        let (tx, rx) = std::sync::mpsc::sync_channel(1);
132        unsafe {
133            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
134                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
135            });
136            let func = Box::new(func);
137            let mut err = ptr::null_mut();
138
139            let func = Box::into_raw(func);
140            let ret: bool = from_glib(ffi::g_thread_pool_push(
141                self.0.as_ptr(),
142                func as *mut _,
143                &mut err,
144            ));
145            if ret {
146                Ok(ThreadHandle { rx })
147            } else {
148                let _ = Box::from_raw(func);
149                Err(from_glib_full(err))
150            }
151        }
152    }
153
154    #[cfg(feature = "futures")]
155    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
156        &self,
157        func: F,
158    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
159    {
160        let (sender, receiver) = oneshot::channel();
161
162        self.push(move || {
163            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
164        })?;
165
166        Ok(async move { receiver.await.expect("Dropped before executing") })
167    }
168
169    /// Sets the maximal allowed number of threads for @self.
170    /// A value of -1 means that the maximal number of threads
171    /// is unlimited. If @self is an exclusive thread pool, setting
172    /// the maximal number of threads to -1 is not allowed.
173    ///
174    /// Setting @max_threads to 0 means stopping all work for @self.
175    /// It is effectively frozen until @max_threads is set to a non-zero
176    /// value again.
177    ///
178    /// A thread is never terminated while calling @func, as supplied by
179    /// g_thread_pool_new(). Instead the maximal number of threads only
180    /// has effect for the allocation of new threads in g_thread_pool_push().
181    /// A new thread is allocated, whenever the number of currently
182    /// running threads in @self is smaller than the maximal number.
183    ///
184    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
185    /// errors. An error can only occur when a new thread couldn't be
186    /// created.
187    ///
188    /// Before version 2.32, this function did not return a success status.
189    /// ## `max_threads`
190    /// a new maximal number of threads for @self,
191    ///     or -1 for unlimited
192    ///
193    /// # Returns
194    ///
195    /// [`true`] on success, [`false`] if an error occurred
196    #[doc(alias = "g_thread_pool_set_max_threads")]
197    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
198        unsafe {
199            let mut err = ptr::null_mut();
200            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
201                self.0.as_ptr(),
202                max_threads.map(|v| v as i32).unwrap_or(-1),
203                &mut err,
204            ));
205            if ret {
206                Ok(())
207            } else {
208                Err(from_glib_full(err))
209            }
210        }
211    }
212
213    /// Returns the maximal number of threads for @self.
214    ///
215    /// # Returns
216    ///
217    /// the maximal number of threads
218    #[doc(alias = "g_thread_pool_get_max_threads")]
219    #[doc(alias = "get_max_threads")]
220    pub fn max_threads(&self) -> Option<u32> {
221        unsafe {
222            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
223            if max_threads == -1 {
224                None
225            } else {
226                Some(max_threads as u32)
227            }
228        }
229    }
230
231    /// Returns the number of threads currently running in @self.
232    ///
233    /// # Returns
234    ///
235    /// the number of threads currently running
236    #[doc(alias = "g_thread_pool_get_num_threads")]
237    #[doc(alias = "get_num_threads")]
238    pub fn num_threads(&self) -> u32 {
239        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
240    }
241
242    /// Returns the number of tasks still unprocessed in @self.
243    ///
244    /// # Returns
245    ///
246    /// the number of unprocessed tasks
247    #[doc(alias = "g_thread_pool_unprocessed")]
248    #[doc(alias = "get_unprocessed")]
249    pub fn unprocessed(&self) -> u32 {
250        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
251    }
252
253    /// Sets the maximal number of unused threads to @max_threads.
254    /// If @max_threads is -1, no limit is imposed on the number
255    /// of unused threads.
256    ///
257    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
258    /// ## `max_threads`
259    /// maximal number of unused threads
260    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
261    pub fn set_max_unused_threads(max_threads: Option<u32>) {
262        unsafe {
263            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
264        }
265    }
266
267    /// Returns the maximal allowed number of unused threads.
268    ///
269    /// # Returns
270    ///
271    /// the maximal number of unused threads
272    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
273    #[doc(alias = "get_max_unused_threads")]
274    pub fn max_unused_threads() -> Option<u32> {
275        unsafe {
276            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
277            if max_unused_threads == -1 {
278                None
279            } else {
280                Some(max_unused_threads as u32)
281            }
282        }
283    }
284
285    /// Returns the number of currently unused threads.
286    ///
287    /// # Returns
288    ///
289    /// the number of currently unused threads
290    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
291    #[doc(alias = "get_num_unused_threads")]
292    pub fn num_unused_threads() -> u32 {
293        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
294    }
295
296    /// Stops all currently unused threads. This does not change the
297    /// maximal number of unused threads. This function can be used to
298    /// regularly stop all unused threads e.g. from g_timeout_add().
299    #[doc(alias = "g_thread_pool_stop_unused_threads")]
300    pub fn stop_unused_threads() {
301        unsafe {
302            ffi::g_thread_pool_stop_unused_threads();
303        }
304    }
305
306    /// This function will set the maximum @interval that a thread
307    /// waiting in the pool for new tasks can be idle for before
308    /// being stopped. This function is similar to calling
309    /// g_thread_pool_stop_unused_threads() on a regular timeout,
310    /// except this is done on a per thread basis.
311    ///
312    /// By setting @interval to 0, idle threads will not be stopped.
313    ///
314    /// The default value is 15000 (15 seconds).
315    /// ## `interval`
316    /// the maximum @interval (in milliseconds)
317    ///     a thread can be idle
318    #[doc(alias = "g_thread_pool_set_max_idle_time")]
319    pub fn set_max_idle_time(max_idle_time: u32) {
320        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
321    }
322
323    /// This function will return the maximum @interval that a
324    /// thread will wait in the thread pool for new tasks before
325    /// being stopped.
326    ///
327    /// If this function returns 0, threads waiting in the thread
328    /// pool for new work are not stopped.
329    ///
330    /// # Returns
331    ///
332    /// the maximum @interval (milliseconds) to wait
333    ///     for new tasks in the thread pool before stopping the
334    ///     thread
335    #[doc(alias = "g_thread_pool_get_max_idle_time")]
336    #[doc(alias = "get_max_idle_time")]
337    pub fn max_idle_time() -> u32 {
338        unsafe { ffi::g_thread_pool_get_max_idle_time() }
339    }
340}
341
342impl Drop for ThreadPool {
343    #[inline]
344    fn drop(&mut self) {
345        unsafe {
346            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
347        }
348    }
349}
350
351unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
352    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
353    func()
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed onto an exclusive pool runs, and its return value is delivered by `join()`.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = std::sync::mpsc::channel();

        let task = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        let returned = handle.join().unwrap();
        assert_eq!(returned, 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // A task pushed as a future resolves to the closure's return value.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let context = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let pending = pool.push_future(|| true).unwrap();
        let outcome = context.block_on(pending);

        assert!(outcome.unwrap());
    }
}