glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
// rustdoc-stripper-ignore-next
/// A pool of reusable worker threads, wrapping GLib's `GThreadPool`.
///
/// A thread pool is useful when you wish to asynchronously fork out the execution of work
/// and continue working in your own thread. If that happens often, the overhead of starting
/// and destroying a thread each time might be too high. In such cases reusing already
/// started threads is a good idea, but implementing this by hand is tedious and error-prone.
///
/// Therefore GLib provides thread pools for your convenience. An added advantage is that the
/// threads can be shared between the different subsystems of your program when they are
/// using GLib.
///
/// To create a new thread pool, use [`shared()`][Self::shared()] or
/// [`exclusive()`][Self::exclusive()]. The underlying `GThreadPool` is released with
/// `g_thread_pool_free()` when the value is dropped.
///
/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
///
/// To get the current number of running threads, call [`num_threads()`][Self::num_threads()].
/// To get the number of still unprocessed tasks, call [`unprocessed()`][Self::unprocessed()].
/// To control the maximum number of threads for a thread pool, use
/// [`max_threads()`][Self::max_threads()] and [`set_max_threads()`][Self::set_max_threads()].
///
/// Finally, you can control the number of unused threads that are kept alive by GLib for
/// future use. The current number can be fetched with
/// [`num_unused_threads()`][Self::num_unused_threads()]. The maximum number can be controlled
/// with [`max_unused_threads()`][Self::max_unused_threads()] and
/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
68
// NOTE(review): these impls assert that a pool may be moved to, and used from, several
// threads at once. Nothing in this file enforces that; it presumably relies on the
// `g_thread_pool_*` functions doing their own locking — confirm against the GLib docs.
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
71
// rustdoc-stripper-ignore-next
/// Handle to a single task queued on a [`ThreadPool`].
///
/// This plays the role of [`std::thread::JoinHandle`] for a GLib pool thread: call
/// [`ThreadHandle::join`] to obtain the value returned by the task. Dropping the handle
/// "detaches" the task, which is still allowed to complete but has its return value
/// discarded.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving half of the channel on which the task reports its outcome.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks until the associated task has finished and returns its outcome.
    ///
    /// Returns `Ok` with the value produced by the task, or `Err` carrying the panic
    /// payload if the task panicked. If the task has already finished, this returns
    /// immediately.
    ///
    /// # Panics
    ///
    /// Panics if the sending half of the channel was dropped without reporting an outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
95
96impl ThreadPool {
97 #[doc(alias = "g_thread_pool_new")]
98 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
99 unsafe {
100 let mut err = ptr::null_mut();
101 let pool = ffi::g_thread_pool_new(
102 Some(spawn_func),
103 ptr::null_mut(),
104 max_threads.map(|v| v as i32).unwrap_or(-1),
105 ffi::GFALSE,
106 &mut err,
107 );
108 if pool.is_null() {
109 Err(from_glib_full(err))
110 } else {
111 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
112 }
113 }
114 }
115
116 #[doc(alias = "g_thread_pool_new")]
117 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
118 unsafe {
119 let mut err = ptr::null_mut();
120 let pool = ffi::g_thread_pool_new(
121 Some(spawn_func),
122 ptr::null_mut(),
123 max_threads as i32,
124 ffi::GTRUE,
125 &mut err,
126 );
127 if pool.is_null() {
128 Err(from_glib_full(err))
129 } else {
130 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
131 }
132 }
133 }
134
135 /// Inserts @data into the list of tasks to be executed by @self.
136 ///
137 /// When the number of currently running threads is lower than the
138 /// maximal allowed number of threads, a new thread is started (or
139 /// reused) with the properties given to g_thread_pool_new().
140 /// Otherwise, @data stays in the queue until a thread in this pool
141 /// finishes its previous task and processes @data.
142 ///
143 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
144 /// errors. An error can only occur when a new thread couldn't be
145 /// created. In that case @data is simply appended to the queue of
146 /// work to do.
147 ///
148 /// Before version 2.32, this function did not return a success status.
149 ///
150 /// # Returns
151 ///
152 /// [`true`] on success, [`false`] if an error occurred
153 // rustdoc-stripper-ignore-next-stop
154 /// Inserts @data into the list of tasks to be executed by @self.
155 ///
156 /// When the number of currently running threads is lower than the
157 /// maximal allowed number of threads, a new thread is started (or
158 /// reused) with the properties given to g_thread_pool_new().
159 /// Otherwise, @data stays in the queue until a thread in this pool
160 /// finishes its previous task and processes @data.
161 ///
162 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
163 /// errors. An error can only occur when a new thread couldn't be
164 /// created. In that case @data is simply appended to the queue of
165 /// work to do.
166 ///
167 /// Before version 2.32, this function did not return a success status.
168 ///
169 /// # Returns
170 ///
171 /// [`true`] on success, [`false`] if an error occurred
172 #[doc(alias = "g_thread_pool_push")]
173 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
174 &self,
175 func: F,
176 ) -> Result<ThreadHandle<T>, crate::Error> {
177 let (tx, rx) = std::sync::mpsc::sync_channel(1);
178 unsafe {
179 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
180 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
181 });
182 let func = Box::new(func);
183 let mut err = ptr::null_mut();
184
185 let func = Box::into_raw(func);
186 let ret: bool = from_glib(ffi::g_thread_pool_push(
187 self.0.as_ptr(),
188 func as *mut _,
189 &mut err,
190 ));
191 if ret {
192 Ok(ThreadHandle { rx })
193 } else {
194 let _ = Box::from_raw(func);
195 Err(from_glib_full(err))
196 }
197 }
198 }
199
200 #[cfg(feature = "futures")]
201 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
202 &self,
203 func: F,
204 ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
205 {
206 let (sender, receiver) = oneshot::channel();
207
208 self.push(move || {
209 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
210 })?;
211
212 Ok(async move { receiver.await.expect("Dropped before executing") })
213 }
214
215 /// Sets the maximal allowed number of threads for @self.
216 /// A value of -1 means that the maximal number of threads
217 /// is unlimited. If @self is an exclusive thread pool, setting
218 /// the maximal number of threads to -1 is not allowed.
219 ///
220 /// Setting @max_threads to 0 means stopping all work for @self.
221 /// It is effectively frozen until @max_threads is set to a non-zero
222 /// value again.
223 ///
224 /// A thread is never terminated while calling @func, as supplied by
225 /// g_thread_pool_new(). Instead the maximal number of threads only
226 /// has effect for the allocation of new threads in g_thread_pool_push().
227 /// A new thread is allocated, whenever the number of currently
228 /// running threads in @self is smaller than the maximal number.
229 ///
230 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
231 /// errors. An error can only occur when a new thread couldn't be
232 /// created.
233 ///
234 /// Before version 2.32, this function did not return a success status.
235 /// ## `max_threads`
236 /// a new maximal number of threads for @self,
237 /// or -1 for unlimited
238 ///
239 /// # Returns
240 ///
241 /// [`true`] on success, [`false`] if an error occurred
242 // rustdoc-stripper-ignore-next-stop
243 /// Sets the maximal allowed number of threads for @self.
244 /// A value of -1 means that the maximal number of threads
245 /// is unlimited. If @self is an exclusive thread pool, setting
246 /// the maximal number of threads to -1 is not allowed.
247 ///
248 /// Setting @max_threads to 0 means stopping all work for @self.
249 /// It is effectively frozen until @max_threads is set to a non-zero
250 /// value again.
251 ///
252 /// A thread is never terminated while calling @func, as supplied by
253 /// g_thread_pool_new(). Instead the maximal number of threads only
254 /// has effect for the allocation of new threads in g_thread_pool_push().
255 /// A new thread is allocated, whenever the number of currently
256 /// running threads in @self is smaller than the maximal number.
257 ///
258 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
259 /// errors. An error can only occur when a new thread couldn't be
260 /// created.
261 ///
262 /// Before version 2.32, this function did not return a success status.
263 /// ## `max_threads`
264 /// a new maximal number of threads for @self,
265 /// or -1 for unlimited
266 ///
267 /// # Returns
268 ///
269 /// [`true`] on success, [`false`] if an error occurred
270 #[doc(alias = "g_thread_pool_set_max_threads")]
271 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
272 unsafe {
273 let mut err = ptr::null_mut();
274 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
275 self.0.as_ptr(),
276 max_threads.map(|v| v as i32).unwrap_or(-1),
277 &mut err,
278 ));
279 if ret {
280 Ok(())
281 } else {
282 Err(from_glib_full(err))
283 }
284 }
285 }
286
287 /// Returns the maximal number of threads for @self.
288 ///
289 /// # Returns
290 ///
291 /// the maximal number of threads
292 // rustdoc-stripper-ignore-next-stop
293 /// Returns the maximal number of threads for @self.
294 ///
295 /// # Returns
296 ///
297 /// the maximal number of threads
298 #[doc(alias = "g_thread_pool_get_max_threads")]
299 #[doc(alias = "get_max_threads")]
300 pub fn max_threads(&self) -> Option<u32> {
301 unsafe {
302 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
303 if max_threads == -1 {
304 None
305 } else {
306 Some(max_threads as u32)
307 }
308 }
309 }
310
311 /// Returns the number of threads currently running in @self.
312 ///
313 /// # Returns
314 ///
315 /// the number of threads currently running
316 // rustdoc-stripper-ignore-next-stop
317 /// Returns the number of threads currently running in @self.
318 ///
319 /// # Returns
320 ///
321 /// the number of threads currently running
322 #[doc(alias = "g_thread_pool_get_num_threads")]
323 #[doc(alias = "get_num_threads")]
324 pub fn num_threads(&self) -> u32 {
325 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
326 }
327
328 /// Returns the number of tasks still unprocessed in @self.
329 ///
330 /// # Returns
331 ///
332 /// the number of unprocessed tasks
333 // rustdoc-stripper-ignore-next-stop
334 /// Returns the number of tasks still unprocessed in @self.
335 ///
336 /// # Returns
337 ///
338 /// the number of unprocessed tasks
339 #[doc(alias = "g_thread_pool_unprocessed")]
340 #[doc(alias = "get_unprocessed")]
341 pub fn unprocessed(&self) -> u32 {
342 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
343 }
344
345 /// Sets the maximal number of unused threads to @max_threads.
346 /// If @max_threads is -1, no limit is imposed on the number
347 /// of unused threads.
348 ///
349 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
350 /// ## `max_threads`
351 /// maximal number of unused threads
352 // rustdoc-stripper-ignore-next-stop
353 /// Sets the maximal number of unused threads to @max_threads.
354 /// If @max_threads is -1, no limit is imposed on the number
355 /// of unused threads.
356 ///
357 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
358 /// ## `max_threads`
359 /// maximal number of unused threads
360 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
361 pub fn set_max_unused_threads(max_threads: Option<u32>) {
362 unsafe {
363 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
364 }
365 }
366
367 /// Returns the maximal allowed number of unused threads.
368 ///
369 /// # Returns
370 ///
371 /// the maximal number of unused threads
372 // rustdoc-stripper-ignore-next-stop
373 /// Returns the maximal allowed number of unused threads.
374 ///
375 /// # Returns
376 ///
377 /// the maximal number of unused threads
378 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
379 #[doc(alias = "get_max_unused_threads")]
380 pub fn max_unused_threads() -> Option<u32> {
381 unsafe {
382 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
383 if max_unused_threads == -1 {
384 None
385 } else {
386 Some(max_unused_threads as u32)
387 }
388 }
389 }
390
391 /// Returns the number of currently unused threads.
392 ///
393 /// # Returns
394 ///
395 /// the number of currently unused threads
396 // rustdoc-stripper-ignore-next-stop
397 /// Returns the number of currently unused threads.
398 ///
399 /// # Returns
400 ///
401 /// the number of currently unused threads
402 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
403 #[doc(alias = "get_num_unused_threads")]
404 pub fn num_unused_threads() -> u32 {
405 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
406 }
407
408 /// Stops all currently unused threads. This does not change the
409 /// maximal number of unused threads. This function can be used to
410 /// regularly stop all unused threads e.g. from g_timeout_add().
411 // rustdoc-stripper-ignore-next-stop
412 /// Stops all currently unused threads. This does not change the
413 /// maximal number of unused threads. This function can be used to
414 /// regularly stop all unused threads e.g. from g_timeout_add().
415 #[doc(alias = "g_thread_pool_stop_unused_threads")]
416 pub fn stop_unused_threads() {
417 unsafe {
418 ffi::g_thread_pool_stop_unused_threads();
419 }
420 }
421
422 /// This function will set the maximum @interval that a thread
423 /// waiting in the pool for new tasks can be idle for before
424 /// being stopped. This function is similar to calling
425 /// g_thread_pool_stop_unused_threads() on a regular timeout,
426 /// except this is done on a per thread basis.
427 ///
428 /// By setting @interval to 0, idle threads will not be stopped.
429 ///
430 /// The default value is 15000 (15 seconds).
431 /// ## `interval`
432 /// the maximum @interval (in milliseconds)
433 /// a thread can be idle
434 // rustdoc-stripper-ignore-next-stop
435 /// This function will set the maximum @interval that a thread
436 /// waiting in the pool for new tasks can be idle for before
437 /// being stopped. This function is similar to calling
438 /// g_thread_pool_stop_unused_threads() on a regular timeout,
439 /// except this is done on a per thread basis.
440 ///
441 /// By setting @interval to 0, idle threads will not be stopped.
442 ///
443 /// The default value is 15000 (15 seconds).
444 /// ## `interval`
445 /// the maximum @interval (in milliseconds)
446 /// a thread can be idle
447 #[doc(alias = "g_thread_pool_set_max_idle_time")]
448 pub fn set_max_idle_time(max_idle_time: u32) {
449 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
450 }
451
452 /// This function will return the maximum @interval that a
453 /// thread will wait in the thread pool for new tasks before
454 /// being stopped.
455 ///
456 /// If this function returns 0, threads waiting in the thread
457 /// pool for new work are not stopped.
458 ///
459 /// # Returns
460 ///
461 /// the maximum @interval (milliseconds) to wait
462 /// for new tasks in the thread pool before stopping the
463 /// thread
464 // rustdoc-stripper-ignore-next-stop
465 /// This function will return the maximum @interval that a
466 /// thread will wait in the thread pool for new tasks before
467 /// being stopped.
468 ///
469 /// If this function returns 0, threads waiting in the thread
470 /// pool for new work are not stopped.
471 ///
472 /// # Returns
473 ///
474 /// the maximum @interval (milliseconds) to wait
475 /// for new tasks in the thread pool before stopping the
476 /// thread
477 #[doc(alias = "g_thread_pool_get_max_idle_time")]
478 #[doc(alias = "get_max_idle_time")]
479 pub fn max_idle_time() -> u32 {
480 unsafe { ffi::g_thread_pool_get_max_idle_time() }
481 }
482}
483
484impl Drop for ThreadPool {
485 #[inline]
486 fn drop(&mut self) {
487 unsafe {
488 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
489 }
490 }
491}
492
493unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
494 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
495 func()
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed to an exclusive single-thread pool runs, and its return value is
    // delivered through the handle.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (sender, receiver) = std::sync::mpsc::channel();

        let task = move || {
            sender.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(receiver.recv(), Ok(true));
    }

    // A task pushed as a future resolves to the closure's return value when driven by
    // a main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let pending = pool.push_future(|| true).unwrap();
        let outcome = ctx.block_on(pending);

        assert!(outcome.unwrap());
    }
}