glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::{ffi, translate::*};
8
9/// The `GThreadPool` struct represents a thread pool.
10///
11/// A thread pool is useful when you wish to asynchronously fork out the execution of work
12/// and continue working in your own thread. If that will happen often, the overhead of starting
13/// and destroying a thread each time might be too high. In such cases reusing already started
14/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
15/// and error-prone.
16///
17/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
18/// threads can be shared between the different subsystems of your program, when they are using GLib.
19///
20/// To create a new thread pool, you use [`new()`][Self::new()].
21/// It is destroyed by `GLib::ThreadPool::free()`.
22///
23/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
24///
25/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
26/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
27/// To control the maximum number of threads for a thread pool, you use
28/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
29///
30/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
31/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
32/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
33/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
34/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
35#[derive(Debug)]
36#[doc(alias = "GThreadPool")]
37pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
38
39unsafe impl Send for ThreadPool {}
40unsafe impl Sync for ThreadPool {}
41
// rustdoc-stripper-ignore-next
/// Handle to a task that was queued on a [`ThreadPool`].
///
/// This plays the same role as [`std::thread::JoinHandle`] does for standard threads: call
/// [`ThreadHandle::join`] to obtain the value produced by the task. If the handle is dropped
/// instead, the task is "detached": it still runs to completion, but its return value is
/// thrown away.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving end of the channel on which the task reports its outcome.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks until the associated task has finished and returns its outcome.
    ///
    /// The result is `Ok` with the task's return value, or `Err` carrying the panic payload
    /// if the task panicked. If the task has already finished, this returns right away.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the result channel was dropped without ever reporting
    /// an outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let ThreadHandle { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
65
66impl ThreadPool {
67    #[doc(alias = "g_thread_pool_new")]
68    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
69        unsafe {
70            let mut err = ptr::null_mut();
71            let pool = ffi::g_thread_pool_new(
72                Some(spawn_func),
73                ptr::null_mut(),
74                max_threads.map(|v| v as i32).unwrap_or(-1),
75                ffi::GFALSE,
76                &mut err,
77            );
78            if pool.is_null() {
79                Err(from_glib_full(err))
80            } else {
81                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
82            }
83        }
84    }
85
86    #[doc(alias = "g_thread_pool_new")]
87    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
88        unsafe {
89            let mut err = ptr::null_mut();
90            let pool = ffi::g_thread_pool_new(
91                Some(spawn_func),
92                ptr::null_mut(),
93                max_threads as i32,
94                ffi::GTRUE,
95                &mut err,
96            );
97            if pool.is_null() {
98                Err(from_glib_full(err))
99            } else {
100                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
101            }
102        }
103    }
104
105    /// Inserts @data into the list of tasks to be executed by @self.
106    ///
107    /// When the number of currently running threads is lower than the
108    /// maximal allowed number of threads, a new thread is started (or
109    /// reused) with the properties given to g_thread_pool_new().
110    /// Otherwise, @data stays in the queue until a thread in this pool
111    /// finishes its previous task and processes @data.
112    ///
113    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
114    /// errors. An error can only occur when a new thread couldn't be
115    /// created. In that case @data is simply appended to the queue of
116    /// work to do.
117    ///
118    /// Before version 2.32, this function did not return a success status.
119    ///
120    /// # Returns
121    ///
122    /// [`true`] on success, [`false`] if an error occurred
123    #[doc(alias = "g_thread_pool_push")]
124    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
125        &self,
126        func: F,
127    ) -> Result<ThreadHandle<T>, crate::Error> {
128        let (tx, rx) = std::sync::mpsc::sync_channel(1);
129        unsafe {
130            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
131                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
132            });
133            let func = Box::new(func);
134            let mut err = ptr::null_mut();
135
136            let func = Box::into_raw(func);
137            let ret: bool = from_glib(ffi::g_thread_pool_push(
138                self.0.as_ptr(),
139                func as *mut _,
140                &mut err,
141            ));
142            if ret {
143                Ok(ThreadHandle { rx })
144            } else {
145                let _ = Box::from_raw(func);
146                Err(from_glib_full(err))
147            }
148        }
149    }
150
151    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
152        &self,
153        func: F,
154    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
155    {
156        let (sender, receiver) = oneshot::channel();
157
158        self.push(move || {
159            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
160        })?;
161
162        Ok(async move { receiver.await.expect("Dropped before executing") })
163    }
164
165    /// Sets the maximal allowed number of threads for @self.
166    /// A value of -1 means that the maximal number of threads
167    /// is unlimited. If @self is an exclusive thread pool, setting
168    /// the maximal number of threads to -1 is not allowed.
169    ///
170    /// Setting @max_threads to 0 means stopping all work for @self.
171    /// It is effectively frozen until @max_threads is set to a non-zero
172    /// value again.
173    ///
174    /// A thread is never terminated while calling @func, as supplied by
175    /// g_thread_pool_new(). Instead the maximal number of threads only
176    /// has effect for the allocation of new threads in g_thread_pool_push().
177    /// A new thread is allocated, whenever the number of currently
178    /// running threads in @self is smaller than the maximal number.
179    ///
180    /// @error can be [`None`] to ignore errors, or non-[`None`] to report
181    /// errors. An error can only occur when a new thread couldn't be
182    /// created.
183    ///
184    /// Before version 2.32, this function did not return a success status.
185    /// ## `max_threads`
186    /// a new maximal number of threads for @self,
187    ///     or -1 for unlimited
188    ///
189    /// # Returns
190    ///
191    /// [`true`] on success, [`false`] if an error occurred
192    #[doc(alias = "g_thread_pool_set_max_threads")]
193    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
194        unsafe {
195            let mut err = ptr::null_mut();
196            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
197                self.0.as_ptr(),
198                max_threads.map(|v| v as i32).unwrap_or(-1),
199                &mut err,
200            ));
201            if ret {
202                Ok(())
203            } else {
204                Err(from_glib_full(err))
205            }
206        }
207    }
208
209    /// Returns the maximal number of threads for @self.
210    ///
211    /// # Returns
212    ///
213    /// the maximal number of threads
214    #[doc(alias = "g_thread_pool_get_max_threads")]
215    #[doc(alias = "get_max_threads")]
216    pub fn max_threads(&self) -> Option<u32> {
217        unsafe {
218            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
219            if max_threads == -1 {
220                None
221            } else {
222                Some(max_threads as u32)
223            }
224        }
225    }
226
227    /// Returns the number of threads currently running in @self.
228    ///
229    /// # Returns
230    ///
231    /// the number of threads currently running
232    #[doc(alias = "g_thread_pool_get_num_threads")]
233    #[doc(alias = "get_num_threads")]
234    pub fn num_threads(&self) -> u32 {
235        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
236    }
237
238    /// Returns the number of tasks still unprocessed in @self.
239    ///
240    /// # Returns
241    ///
242    /// the number of unprocessed tasks
243    #[doc(alias = "g_thread_pool_unprocessed")]
244    #[doc(alias = "get_unprocessed")]
245    pub fn unprocessed(&self) -> u32 {
246        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
247    }
248
249    /// Sets the maximal number of unused threads to @max_threads.
250    /// If @max_threads is -1, no limit is imposed on the number
251    /// of unused threads.
252    ///
253    /// The default value is 8 since GLib 2.84. Previously the default value was 2.
254    /// ## `max_threads`
255    /// maximal number of unused threads
256    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
257    pub fn set_max_unused_threads(max_threads: Option<u32>) {
258        unsafe {
259            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
260        }
261    }
262
263    /// Returns the maximal allowed number of unused threads.
264    ///
265    /// # Returns
266    ///
267    /// the maximal number of unused threads
268    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
269    #[doc(alias = "get_max_unused_threads")]
270    pub fn max_unused_threads() -> Option<u32> {
271        unsafe {
272            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
273            if max_unused_threads == -1 {
274                None
275            } else {
276                Some(max_unused_threads as u32)
277            }
278        }
279    }
280
281    /// Returns the number of currently unused threads.
282    ///
283    /// # Returns
284    ///
285    /// the number of currently unused threads
286    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
287    #[doc(alias = "get_num_unused_threads")]
288    pub fn num_unused_threads() -> u32 {
289        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
290    }
291
292    /// Stops all currently unused threads. This does not change the
293    /// maximal number of unused threads. This function can be used to
294    /// regularly stop all unused threads e.g. from g_timeout_add().
295    #[doc(alias = "g_thread_pool_stop_unused_threads")]
296    pub fn stop_unused_threads() {
297        unsafe {
298            ffi::g_thread_pool_stop_unused_threads();
299        }
300    }
301
302    /// This function will set the maximum @interval that a thread
303    /// waiting in the pool for new tasks can be idle for before
304    /// being stopped. This function is similar to calling
305    /// g_thread_pool_stop_unused_threads() on a regular timeout,
306    /// except this is done on a per thread basis.
307    ///
308    /// By setting @interval to 0, idle threads will not be stopped.
309    ///
310    /// The default value is 15000 (15 seconds).
311    /// ## `interval`
312    /// the maximum @interval (in milliseconds)
313    ///     a thread can be idle
314    #[doc(alias = "g_thread_pool_set_max_idle_time")]
315    pub fn set_max_idle_time(max_idle_time: u32) {
316        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
317    }
318
319    /// This function will return the maximum @interval that a
320    /// thread will wait in the thread pool for new tasks before
321    /// being stopped.
322    ///
323    /// If this function returns 0, threads waiting in the thread
324    /// pool for new work are not stopped.
325    ///
326    /// # Returns
327    ///
328    /// the maximum @interval (milliseconds) to wait
329    ///     for new tasks in the thread pool before stopping the
330    ///     thread
331    #[doc(alias = "g_thread_pool_get_max_idle_time")]
332    #[doc(alias = "get_max_idle_time")]
333    pub fn max_idle_time() -> u32 {
334        unsafe { ffi::g_thread_pool_get_max_idle_time() }
335    }
336}
337
338impl Drop for ThreadPool {
339    #[inline]
340    fn drop(&mut self) {
341        unsafe {
342            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
343        }
344    }
345}
346
347unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
348    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
349    func()
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed onto an exclusive pool runs, and its return value is available through
    // the handle.
    #[test]
    fn test_push() {
        let p = ThreadPool::exclusive(1).unwrap();
        let (sender, receiver) = std::sync::mpsc::channel();

        let task = move || {
            sender.send(true).unwrap();
            123
        };
        let handle = p.push(task).unwrap();

        let returned = handle.join().unwrap();
        assert_eq!(returned, 123);
        assert_eq!(receiver.recv(), Ok(true));
    }

    // A task pushed as a future onto a shared pool resolves to the task's return value.
    #[test]
    fn test_push_future() {
        let c = crate::MainContext::new();
        let p = ThreadPool::shared(None).unwrap();

        let fut = p.push_future(|| true).unwrap();
        let res = c.block_on(fut);

        assert!(res.unwrap());
    }
}
384}