glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12/// The `GThreadPool` struct represents a thread pool.
13///
14/// A thread pool is useful when you wish to asynchronously fork out the execution of work
15/// and continue working in your own thread. If that will happen often, the overhead of starting
16/// and destroying a thread each time might be too high. In such cases reusing already started
17/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
18/// and error-prone.
19///
20/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
21/// threads can be shared between the different subsystems of your program, when they are using GLib.
22///
23/// To create a new thread pool, you use [`new()`][Self::new()].
24/// It is destroyed by `GLib::ThreadPool::free()`.
25///
26/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
27///
28/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
29/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
30/// To control the maximum number of threads for a thread pool, you use
31/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
32///
33/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
34/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
35/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
36/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
37/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
38// rustdoc-stripper-ignore-next-stop
39/// The `GThreadPool` struct represents a thread pool.
40///
41/// A thread pool is useful when you wish to asynchronously fork out the execution of work
42/// and continue working in your own thread. If that will happen often, the overhead of starting
43/// and destroying a thread each time might be too high. In such cases reusing already started
44/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
45/// and error-prone.
46///
47/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
48/// threads can be shared between the different subsystems of your program, when they are using GLib.
49///
50/// To create a new thread pool, you use [`new()`][Self::new()].
51/// It is destroyed by `GLib::ThreadPool::free()`.
52///
53/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
54///
55/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
56/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
57/// To control the maximum number of threads for a thread pool, you use
58/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
59///
60/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
61/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
62/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
63/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
64/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
65#[derive(Debug)]
66#[doc(alias = "GThreadPool")]
67pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
68
69unsafe impl Send for ThreadPool {}
70unsafe impl Sync for ThreadPool {}
71
72// rustdoc-stripper-ignore-next
73/// A handle to a thread running on a [`ThreadPool`].
74///
75/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
76/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
77/// allowing it to complete but discarding the return value.
78#[derive(Debug)]
79pub struct ThreadHandle<T> {
80 rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
81}
82
83impl<T> ThreadHandle<T> {
84 // rustdoc-stripper-ignore-next
85 /// Waits for the associated thread to finish.
86 ///
87 /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
88 /// thread, or `Err` if the thread panicked. This function will return immediately if the
89 /// associated thread has already finished.
90 #[inline]
91 pub fn join(self) -> std::thread::Result<T> {
92 self.rx.recv().unwrap()
93 }
94}
95
96impl ThreadPool {
97 #[doc(alias = "g_thread_pool_new")]
98 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
99 unsafe {
100 let mut err = ptr::null_mut();
101 let pool = ffi::g_thread_pool_new(
102 Some(spawn_func),
103 ptr::null_mut(),
104 max_threads.map(|v| v as i32).unwrap_or(-1),
105 ffi::GFALSE,
106 &mut err,
107 );
108 if pool.is_null() {
109 Err(from_glib_full(err))
110 } else {
111 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
112 }
113 }
114 }
115
116 #[doc(alias = "g_thread_pool_new")]
117 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
118 unsafe {
119 let mut err = ptr::null_mut();
120 let pool = ffi::g_thread_pool_new(
121 Some(spawn_func),
122 ptr::null_mut(),
123 max_threads as i32,
124 ffi::GTRUE,
125 &mut err,
126 );
127 if pool.is_null() {
128 Err(from_glib_full(err))
129 } else {
130 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
131 }
132 }
133 }
134
135 /// Inserts @data into the list of tasks to be executed by @self.
136 ///
137 /// When the number of currently running threads is lower than the
138 /// maximal allowed number of threads, a new thread is started (or
139 /// reused) with the properties given to g_thread_pool_new().
140 /// Otherwise, @data stays in the queue until a thread in this pool
141 /// finishes its previous task and processes @data.
142 ///
143 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
144 /// errors. An error can only occur when a new thread couldn't be
145 /// created. In that case @data is simply appended to the queue of
146 /// work to do.
147 ///
148 /// Before version 2.32, this function did not return a success status.
149 ///
150 /// # Returns
151 ///
152 /// [`true`] on success, [`false`] if an error occurred
153 // rustdoc-stripper-ignore-next-stop
154 /// Inserts @data into the list of tasks to be executed by @self.
155 ///
156 /// When the number of currently running threads is lower than the
157 /// maximal allowed number of threads, a new thread is started (or
158 /// reused) with the properties given to g_thread_pool_new().
159 /// Otherwise, @data stays in the queue until a thread in this pool
160 /// finishes its previous task and processes @data.
161 ///
162 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
163 /// errors. An error can only occur when a new thread couldn't be
164 /// created. In that case @data is simply appended to the queue of
165 /// work to do.
166 ///
167 /// Before version 2.32, this function did not return a success status.
168 ///
169 /// # Returns
170 ///
171 /// [`true`] on success, [`false`] if an error occurred
172 #[doc(alias = "g_thread_pool_push")]
173 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
174 &self,
175 func: F,
176 ) -> Result<ThreadHandle<T>, crate::Error> {
177 let (tx, rx) = std::sync::mpsc::sync_channel(1);
178 unsafe {
179 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
180 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
181 });
182 let func = Box::new(func);
183 let mut err = ptr::null_mut();
184
185 let func = Box::into_raw(func);
186 let ret: bool = from_glib(ffi::g_thread_pool_push(
187 self.0.as_ptr(),
188 func as *mut _,
189 &mut err,
190 ));
191 if ret {
192 Ok(ThreadHandle { rx })
193 } else {
194 let _ = Box::from_raw(func);
195 Err(from_glib_full(err))
196 }
197 }
198 }
199
200 #[cfg(feature = "futures")]
201 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
202 &self,
203 func: F,
204 ) -> Result<
205 impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
206 crate::Error,
207 > {
208 let (sender, receiver) = oneshot::channel();
209
210 self.push(move || {
211 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
212 })?;
213
214 Ok(async move { receiver.await.expect("Dropped before executing") })
215 }
216
217 /// Sets the maximal allowed number of threads for @self.
218 /// A value of -1 means that the maximal number of threads
219 /// is unlimited. If @self is an exclusive thread pool, setting
220 /// the maximal number of threads to -1 is not allowed.
221 ///
222 /// Setting @max_threads to 0 means stopping all work for @self.
223 /// It is effectively frozen until @max_threads is set to a non-zero
224 /// value again.
225 ///
226 /// A thread is never terminated while calling @func, as supplied by
227 /// g_thread_pool_new(). Instead the maximal number of threads only
228 /// has effect for the allocation of new threads in g_thread_pool_push().
229 /// A new thread is allocated, whenever the number of currently
230 /// running threads in @self is smaller than the maximal number.
231 ///
232 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
233 /// errors. An error can only occur when a new thread couldn't be
234 /// created.
235 ///
236 /// Before version 2.32, this function did not return a success status.
237 /// ## `max_threads`
238 /// a new maximal number of threads for @self,
239 /// or -1 for unlimited
240 ///
241 /// # Returns
242 ///
243 /// [`true`] on success, [`false`] if an error occurred
244 // rustdoc-stripper-ignore-next-stop
245 /// Sets the maximal allowed number of threads for @self.
246 /// A value of -1 means that the maximal number of threads
247 /// is unlimited. If @self is an exclusive thread pool, setting
248 /// the maximal number of threads to -1 is not allowed.
249 ///
250 /// Setting @max_threads to 0 means stopping all work for @self.
251 /// It is effectively frozen until @max_threads is set to a non-zero
252 /// value again.
253 ///
254 /// A thread is never terminated while calling @func, as supplied by
255 /// g_thread_pool_new(). Instead the maximal number of threads only
256 /// has effect for the allocation of new threads in g_thread_pool_push().
257 /// A new thread is allocated, whenever the number of currently
258 /// running threads in @self is smaller than the maximal number.
259 ///
260 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
261 /// errors. An error can only occur when a new thread couldn't be
262 /// created.
263 ///
264 /// Before version 2.32, this function did not return a success status.
265 /// ## `max_threads`
266 /// a new maximal number of threads for @self,
267 /// or -1 for unlimited
268 ///
269 /// # Returns
270 ///
271 /// [`true`] on success, [`false`] if an error occurred
272 #[doc(alias = "g_thread_pool_set_max_threads")]
273 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
274 unsafe {
275 let mut err = ptr::null_mut();
276 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
277 self.0.as_ptr(),
278 max_threads.map(|v| v as i32).unwrap_or(-1),
279 &mut err,
280 ));
281 if ret {
282 Ok(())
283 } else {
284 Err(from_glib_full(err))
285 }
286 }
287 }
288
289 /// Returns the maximal number of threads for @self.
290 ///
291 /// # Returns
292 ///
293 /// the maximal number of threads
294 // rustdoc-stripper-ignore-next-stop
295 /// Returns the maximal number of threads for @self.
296 ///
297 /// # Returns
298 ///
299 /// the maximal number of threads
300 #[doc(alias = "g_thread_pool_get_max_threads")]
301 #[doc(alias = "get_max_threads")]
302 pub fn max_threads(&self) -> Option<u32> {
303 unsafe {
304 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
305 if max_threads == -1 {
306 None
307 } else {
308 Some(max_threads as u32)
309 }
310 }
311 }
312
313 /// Returns the number of threads currently running in @self.
314 ///
315 /// # Returns
316 ///
317 /// the number of threads currently running
318 // rustdoc-stripper-ignore-next-stop
319 /// Returns the number of threads currently running in @self.
320 ///
321 /// # Returns
322 ///
323 /// the number of threads currently running
324 #[doc(alias = "g_thread_pool_get_num_threads")]
325 #[doc(alias = "get_num_threads")]
326 pub fn num_threads(&self) -> u32 {
327 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
328 }
329
330 /// Returns the number of tasks still unprocessed in @self.
331 ///
332 /// # Returns
333 ///
334 /// the number of unprocessed tasks
335 // rustdoc-stripper-ignore-next-stop
336 /// Returns the number of tasks still unprocessed in @self.
337 ///
338 /// # Returns
339 ///
340 /// the number of unprocessed tasks
341 #[doc(alias = "g_thread_pool_unprocessed")]
342 #[doc(alias = "get_unprocessed")]
343 pub fn unprocessed(&self) -> u32 {
344 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
345 }
346
347 /// Sets the maximal number of unused threads to @max_threads.
348 /// If @max_threads is -1, no limit is imposed on the number
349 /// of unused threads.
350 ///
351 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
352 /// ## `max_threads`
353 /// maximal number of unused threads
354 // rustdoc-stripper-ignore-next-stop
355 /// Sets the maximal number of unused threads to @max_threads.
356 /// If @max_threads is -1, no limit is imposed on the number
357 /// of unused threads.
358 ///
359 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
360 /// ## `max_threads`
361 /// maximal number of unused threads
362 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
363 pub fn set_max_unused_threads(max_threads: Option<u32>) {
364 unsafe {
365 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
366 }
367 }
368
369 /// Returns the maximal allowed number of unused threads.
370 ///
371 /// # Returns
372 ///
373 /// the maximal number of unused threads
374 // rustdoc-stripper-ignore-next-stop
375 /// Returns the maximal allowed number of unused threads.
376 ///
377 /// # Returns
378 ///
379 /// the maximal number of unused threads
380 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
381 #[doc(alias = "get_max_unused_threads")]
382 pub fn max_unused_threads() -> Option<u32> {
383 unsafe {
384 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
385 if max_unused_threads == -1 {
386 None
387 } else {
388 Some(max_unused_threads as u32)
389 }
390 }
391 }
392
393 /// Returns the number of currently unused threads.
394 ///
395 /// # Returns
396 ///
397 /// the number of currently unused threads
398 // rustdoc-stripper-ignore-next-stop
399 /// Returns the number of currently unused threads.
400 ///
401 /// # Returns
402 ///
403 /// the number of currently unused threads
404 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
405 #[doc(alias = "get_num_unused_threads")]
406 pub fn num_unused_threads() -> u32 {
407 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
408 }
409
410 /// Stops all currently unused threads. This does not change the
411 /// maximal number of unused threads. This function can be used to
412 /// regularly stop all unused threads e.g. from g_timeout_add().
413 // rustdoc-stripper-ignore-next-stop
414 /// Stops all currently unused threads. This does not change the
415 /// maximal number of unused threads. This function can be used to
416 /// regularly stop all unused threads e.g. from g_timeout_add().
417 #[doc(alias = "g_thread_pool_stop_unused_threads")]
418 pub fn stop_unused_threads() {
419 unsafe {
420 ffi::g_thread_pool_stop_unused_threads();
421 }
422 }
423
424 /// This function will set the maximum @interval that a thread
425 /// waiting in the pool for new tasks can be idle for before
426 /// being stopped. This function is similar to calling
427 /// g_thread_pool_stop_unused_threads() on a regular timeout,
428 /// except this is done on a per thread basis.
429 ///
430 /// By setting @interval to 0, idle threads will not be stopped.
431 ///
432 /// The default value is 15000 (15 seconds).
433 /// ## `interval`
434 /// the maximum @interval (in milliseconds)
435 /// a thread can be idle
436 // rustdoc-stripper-ignore-next-stop
437 /// This function will set the maximum @interval that a thread
438 /// waiting in the pool for new tasks can be idle for before
439 /// being stopped. This function is similar to calling
440 /// g_thread_pool_stop_unused_threads() on a regular timeout,
441 /// except this is done on a per thread basis.
442 ///
443 /// By setting @interval to 0, idle threads will not be stopped.
444 ///
445 /// The default value is 15000 (15 seconds).
446 /// ## `interval`
447 /// the maximum @interval (in milliseconds)
448 /// a thread can be idle
449 #[doc(alias = "g_thread_pool_set_max_idle_time")]
450 pub fn set_max_idle_time(max_idle_time: u32) {
451 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
452 }
453
454 /// This function will return the maximum @interval that a
455 /// thread will wait in the thread pool for new tasks before
456 /// being stopped.
457 ///
458 /// If this function returns 0, threads waiting in the thread
459 /// pool for new work are not stopped.
460 ///
461 /// # Returns
462 ///
463 /// the maximum @interval (milliseconds) to wait
464 /// for new tasks in the thread pool before stopping the
465 /// thread
466 // rustdoc-stripper-ignore-next-stop
467 /// This function will return the maximum @interval that a
468 /// thread will wait in the thread pool for new tasks before
469 /// being stopped.
470 ///
471 /// If this function returns 0, threads waiting in the thread
472 /// pool for new work are not stopped.
473 ///
474 /// # Returns
475 ///
476 /// the maximum @interval (milliseconds) to wait
477 /// for new tasks in the thread pool before stopping the
478 /// thread
479 #[doc(alias = "g_thread_pool_get_max_idle_time")]
480 #[doc(alias = "get_max_idle_time")]
481 pub fn max_idle_time() -> u32 {
482 unsafe { ffi::g_thread_pool_get_max_idle_time() }
483 }
484}
485
486impl Drop for ThreadPool {
487 #[inline]
488 fn drop(&mut self) {
489 unsafe {
490 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
491 }
492 }
493}
494
495unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
496 unsafe {
497 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
498 func()
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505
506 #[test]
507 fn test_push() {
508 use std::sync::mpsc;
509
510 let p = ThreadPool::exclusive(1).unwrap();
511 let (sender, receiver) = mpsc::channel();
512
513 let handle = p
514 .push(move || {
515 sender.send(true).unwrap();
516 123
517 })
518 .unwrap();
519
520 assert_eq!(handle.join().unwrap(), 123);
521 assert_eq!(receiver.recv(), Ok(true));
522 }
523
524 #[cfg(feature = "futures")]
525 #[test]
526 fn test_push_future() {
527 let c = crate::MainContext::new();
528 let p = ThreadPool::shared(None).unwrap();
529
530 let fut = p.push_future(|| true).unwrap();
531
532 let res = c.block_on(fut);
533 assert!(res.unwrap());
534 }
535}