glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
/// The `GThreadPool` struct represents a thread pool.
///
/// A thread pool is useful when you wish to asynchronously fork out the execution of work
/// and continue working in your own thread. If that will happen often, the overhead of starting
/// and destroying a thread each time might be too high. In such cases reusing already started
/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
/// and error-prone.
///
/// Therefore GLib provides thread pools for your convenience. An added advantage is that the
/// threads can be shared between the different subsystems of your program, when they are using GLib.
///
/// To create a new thread pool, you use [`shared()`][Self::shared()] or
/// [`exclusive()`][Self::exclusive()].
/// It is destroyed when the value is dropped, which calls `g_thread_pool_free()`.
///
/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
///
/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
/// To control the maximum number of threads for a thread pool, you use
/// [`max_threads()`][Self::max_threads()] and [`set_max_threads()`][Self::set_max_threads()].
///
/// Finally you can control the number of unused threads that are kept alive by GLib for future use.
/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
41
// SAFETY: `ThreadPool` only stores a pointer to the `GThreadPool`, and every method that uses it
// forwards to a `g_thread_pool_*` function.
// NOTE(review): this relies on GLib allowing those functions to be called on the same pool from
// several threads at once — confirm against the GLib documentation.
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
44
// rustdoc-stripper-ignore-next
/// Handle to a single task that was queued on a [`ThreadPool`].
///
/// It plays the same role as [`std::thread::JoinHandle`] does for a standard thread: call
/// [`ThreadHandle::join`] to obtain the value returned by the task. Dropping the handle
/// "detaches" the task, so it still runs to completion but its return value is discarded.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving end of the channel on which the task delivers its outcome: `Ok(value)` on
    // normal return, `Err(payload)` if the task panicked.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks until the associated task has finished and returns its outcome.
    ///
    /// Returns `Ok` with the task's return value, or `Err` with the panic payload if the task
    /// panicked. If the task has already finished, this returns immediately.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the result channel was dropped without ever delivering
    /// an outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        rx.recv().unwrap()
    }
}
68
69impl ThreadPool {
70 #[doc(alias = "g_thread_pool_new")]
71 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
72 unsafe {
73 let mut err = ptr::null_mut();
74 let pool = ffi::g_thread_pool_new(
75 Some(spawn_func),
76 ptr::null_mut(),
77 max_threads.map(|v| v as i32).unwrap_or(-1),
78 ffi::GFALSE,
79 &mut err,
80 );
81 if pool.is_null() {
82 Err(from_glib_full(err))
83 } else {
84 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
85 }
86 }
87 }
88
89 #[doc(alias = "g_thread_pool_new")]
90 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
91 unsafe {
92 let mut err = ptr::null_mut();
93 let pool = ffi::g_thread_pool_new(
94 Some(spawn_func),
95 ptr::null_mut(),
96 max_threads as i32,
97 ffi::GTRUE,
98 &mut err,
99 );
100 if pool.is_null() {
101 Err(from_glib_full(err))
102 } else {
103 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
104 }
105 }
106 }
107
108 /// Inserts @data into the list of tasks to be executed by @self.
109 ///
110 /// When the number of currently running threads is lower than the
111 /// maximal allowed number of threads, a new thread is started (or
112 /// reused) with the properties given to g_thread_pool_new().
113 /// Otherwise, @data stays in the queue until a thread in this pool
114 /// finishes its previous task and processes @data.
115 ///
116 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
117 /// errors. An error can only occur when a new thread couldn't be
118 /// created. In that case @data is simply appended to the queue of
119 /// work to do.
120 ///
121 /// Before version 2.32, this function did not return a success status.
122 ///
123 /// # Returns
124 ///
125 /// [`true`] on success, [`false`] if an error occurred
126 #[doc(alias = "g_thread_pool_push")]
127 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
128 &self,
129 func: F,
130 ) -> Result<ThreadHandle<T>, crate::Error> {
131 let (tx, rx) = std::sync::mpsc::sync_channel(1);
132 unsafe {
133 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
134 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
135 });
136 let func = Box::new(func);
137 let mut err = ptr::null_mut();
138
139 let func = Box::into_raw(func);
140 let ret: bool = from_glib(ffi::g_thread_pool_push(
141 self.0.as_ptr(),
142 func as *mut _,
143 &mut err,
144 ));
145 if ret {
146 Ok(ThreadHandle { rx })
147 } else {
148 let _ = Box::from_raw(func);
149 Err(from_glib_full(err))
150 }
151 }
152 }
153
154 #[cfg(feature = "futures")]
155 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
156 &self,
157 func: F,
158 ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
159 {
160 let (sender, receiver) = oneshot::channel();
161
162 self.push(move || {
163 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
164 })?;
165
166 Ok(async move { receiver.await.expect("Dropped before executing") })
167 }
168
169 /// Sets the maximal allowed number of threads for @self.
170 /// A value of -1 means that the maximal number of threads
171 /// is unlimited. If @self is an exclusive thread pool, setting
172 /// the maximal number of threads to -1 is not allowed.
173 ///
174 /// Setting @max_threads to 0 means stopping all work for @self.
175 /// It is effectively frozen until @max_threads is set to a non-zero
176 /// value again.
177 ///
178 /// A thread is never terminated while calling @func, as supplied by
179 /// g_thread_pool_new(). Instead the maximal number of threads only
180 /// has effect for the allocation of new threads in g_thread_pool_push().
181 /// A new thread is allocated, whenever the number of currently
182 /// running threads in @self is smaller than the maximal number.
183 ///
184 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
185 /// errors. An error can only occur when a new thread couldn't be
186 /// created.
187 ///
188 /// Before version 2.32, this function did not return a success status.
189 /// ## `max_threads`
190 /// a new maximal number of threads for @self,
191 /// or -1 for unlimited
192 ///
193 /// # Returns
194 ///
195 /// [`true`] on success, [`false`] if an error occurred
196 #[doc(alias = "g_thread_pool_set_max_threads")]
197 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
198 unsafe {
199 let mut err = ptr::null_mut();
200 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
201 self.0.as_ptr(),
202 max_threads.map(|v| v as i32).unwrap_or(-1),
203 &mut err,
204 ));
205 if ret {
206 Ok(())
207 } else {
208 Err(from_glib_full(err))
209 }
210 }
211 }
212
213 /// Returns the maximal number of threads for @self.
214 ///
215 /// # Returns
216 ///
217 /// the maximal number of threads
218 #[doc(alias = "g_thread_pool_get_max_threads")]
219 #[doc(alias = "get_max_threads")]
220 pub fn max_threads(&self) -> Option<u32> {
221 unsafe {
222 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
223 if max_threads == -1 {
224 None
225 } else {
226 Some(max_threads as u32)
227 }
228 }
229 }
230
231 /// Returns the number of threads currently running in @self.
232 ///
233 /// # Returns
234 ///
235 /// the number of threads currently running
236 #[doc(alias = "g_thread_pool_get_num_threads")]
237 #[doc(alias = "get_num_threads")]
238 pub fn num_threads(&self) -> u32 {
239 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
240 }
241
242 /// Returns the number of tasks still unprocessed in @self.
243 ///
244 /// # Returns
245 ///
246 /// the number of unprocessed tasks
247 #[doc(alias = "g_thread_pool_unprocessed")]
248 #[doc(alias = "get_unprocessed")]
249 pub fn unprocessed(&self) -> u32 {
250 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
251 }
252
253 /// Sets the maximal number of unused threads to @max_threads.
254 /// If @max_threads is -1, no limit is imposed on the number
255 /// of unused threads.
256 ///
257 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
258 /// ## `max_threads`
259 /// maximal number of unused threads
260 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
261 pub fn set_max_unused_threads(max_threads: Option<u32>) {
262 unsafe {
263 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
264 }
265 }
266
267 /// Returns the maximal allowed number of unused threads.
268 ///
269 /// # Returns
270 ///
271 /// the maximal number of unused threads
272 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
273 #[doc(alias = "get_max_unused_threads")]
274 pub fn max_unused_threads() -> Option<u32> {
275 unsafe {
276 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
277 if max_unused_threads == -1 {
278 None
279 } else {
280 Some(max_unused_threads as u32)
281 }
282 }
283 }
284
285 /// Returns the number of currently unused threads.
286 ///
287 /// # Returns
288 ///
289 /// the number of currently unused threads
290 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
291 #[doc(alias = "get_num_unused_threads")]
292 pub fn num_unused_threads() -> u32 {
293 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
294 }
295
296 /// Stops all currently unused threads. This does not change the
297 /// maximal number of unused threads. This function can be used to
298 /// regularly stop all unused threads e.g. from g_timeout_add().
299 #[doc(alias = "g_thread_pool_stop_unused_threads")]
300 pub fn stop_unused_threads() {
301 unsafe {
302 ffi::g_thread_pool_stop_unused_threads();
303 }
304 }
305
306 /// This function will set the maximum @interval that a thread
307 /// waiting in the pool for new tasks can be idle for before
308 /// being stopped. This function is similar to calling
309 /// g_thread_pool_stop_unused_threads() on a regular timeout,
310 /// except this is done on a per thread basis.
311 ///
312 /// By setting @interval to 0, idle threads will not be stopped.
313 ///
314 /// The default value is 15000 (15 seconds).
315 /// ## `interval`
316 /// the maximum @interval (in milliseconds)
317 /// a thread can be idle
318 #[doc(alias = "g_thread_pool_set_max_idle_time")]
319 pub fn set_max_idle_time(max_idle_time: u32) {
320 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
321 }
322
323 /// This function will return the maximum @interval that a
324 /// thread will wait in the thread pool for new tasks before
325 /// being stopped.
326 ///
327 /// If this function returns 0, threads waiting in the thread
328 /// pool for new work are not stopped.
329 ///
330 /// # Returns
331 ///
332 /// the maximum @interval (milliseconds) to wait
333 /// for new tasks in the thread pool before stopping the
334 /// thread
335 #[doc(alias = "g_thread_pool_get_max_idle_time")]
336 #[doc(alias = "get_max_idle_time")]
337 pub fn max_idle_time() -> u32 {
338 unsafe { ffi::g_thread_pool_get_max_idle_time() }
339 }
340}
341
342impl Drop for ThreadPool {
343 #[inline]
344 fn drop(&mut self) {
345 unsafe {
346 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
347 }
348 }
349}
350
351unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
352 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
353 func()
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    // Pushes one task onto an exclusive single-thread pool and checks both the value returned
    // through the handle and a side effect performed inside the task.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (side_tx, side_rx) = std::sync::mpsc::channel();

        let task = move || {
            side_tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        let returned = handle.join().unwrap();
        assert_eq!(returned, 123);
        assert_eq!(side_rx.recv(), Ok(true));
    }

    // Pushes a task as a future onto a shared pool and drives it to completion on a fresh
    // main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let pending = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(pending);
        assert!(outcome.unwrap());
    }
}
389}