glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::{ffi, translate::*};
8
9/// The `GThreadPool` struct represents a thread pool.
10///
11/// A thread pool is useful when you wish to asynchronously fork out the execution of work
12/// and continue working in your own thread. If that will happen often, the overhead of starting
13/// and destroying a thread each time might be too high. In such cases reusing already started
14/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
15/// and error-prone.
16///
17/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
18/// threads can be shared between the different subsystems of your program, when they are using GLib.
19///
20/// To create a new thread pool, you use [`new()`][Self::new()].
21/// It is destroyed by `GLib::ThreadPool::free()`.
22///
23/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
24///
25/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
26/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
27/// To control the maximum number of threads for a thread pool, you use
28/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
29///
30/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
31/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
32/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
33/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
34/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
35// rustdoc-stripper-ignore-next-stop
36/// The `GThreadPool` struct represents a thread pool.
37///
38/// A thread pool is useful when you wish to asynchronously fork out the execution of work
39/// and continue working in your own thread. If that will happen often, the overhead of starting
40/// and destroying a thread each time might be too high. In such cases reusing already started
41/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
42/// and error-prone.
43///
44/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
45/// threads can be shared between the different subsystems of your program, when they are using GLib.
46///
47/// To create a new thread pool, you use [`new()`][Self::new()].
48/// It is destroyed by `GLib::ThreadPool::free()`.
49///
50/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
51///
52/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
53/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
54/// To control the maximum number of threads for a thread pool, you use
55/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
56///
57/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
58/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
59/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
60/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
61/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
62#[derive(Debug)]
63#[doc(alias = "GThreadPool")]
64pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
65
66unsafe impl Send for ThreadPool {}
67unsafe impl Sync for ThreadPool {}
68
69// rustdoc-stripper-ignore-next
70/// A handle to a thread running on a [`ThreadPool`].
71///
72/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
73/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
74/// allowing it to complete but discarding the return value.
75#[derive(Debug)]
76pub struct ThreadHandle<T> {
77 rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
78}
79
80impl<T> ThreadHandle<T> {
81 // rustdoc-stripper-ignore-next
82 /// Waits for the associated thread to finish.
83 ///
84 /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
85 /// thread, or `Err` if the thread panicked. This function will return immediately if the
86 /// associated thread has already finished.
87 #[inline]
88 pub fn join(self) -> std::thread::Result<T> {
89 self.rx.recv().unwrap()
90 }
91}
92
93impl ThreadPool {
94 #[doc(alias = "g_thread_pool_new")]
95 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
96 unsafe {
97 let mut err = ptr::null_mut();
98 let pool = ffi::g_thread_pool_new(
99 Some(spawn_func),
100 ptr::null_mut(),
101 max_threads.map(|v| v as i32).unwrap_or(-1),
102 ffi::GFALSE,
103 &mut err,
104 );
105 if pool.is_null() {
106 Err(from_glib_full(err))
107 } else {
108 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
109 }
110 }
111 }
112
113 #[doc(alias = "g_thread_pool_new")]
114 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
115 unsafe {
116 let mut err = ptr::null_mut();
117 let pool = ffi::g_thread_pool_new(
118 Some(spawn_func),
119 ptr::null_mut(),
120 max_threads as i32,
121 ffi::GTRUE,
122 &mut err,
123 );
124 if pool.is_null() {
125 Err(from_glib_full(err))
126 } else {
127 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
128 }
129 }
130 }
131
132 /// Inserts @data into the list of tasks to be executed by @self.
133 ///
134 /// When the number of currently running threads is lower than the
135 /// maximal allowed number of threads, a new thread is started (or
136 /// reused) with the properties given to g_thread_pool_new().
137 /// Otherwise, @data stays in the queue until a thread in this pool
138 /// finishes its previous task and processes @data.
139 ///
140 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
141 /// errors. An error can only occur when a new thread couldn't be
142 /// created. In that case @data is simply appended to the queue of
143 /// work to do.
144 ///
145 /// Before version 2.32, this function did not return a success status.
146 ///
147 /// # Returns
148 ///
149 /// [`true`] on success, [`false`] if an error occurred
150 // rustdoc-stripper-ignore-next-stop
151 /// Inserts @data into the list of tasks to be executed by @self.
152 ///
153 /// When the number of currently running threads is lower than the
154 /// maximal allowed number of threads, a new thread is started (or
155 /// reused) with the properties given to g_thread_pool_new().
156 /// Otherwise, @data stays in the queue until a thread in this pool
157 /// finishes its previous task and processes @data.
158 ///
159 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
160 /// errors. An error can only occur when a new thread couldn't be
161 /// created. In that case @data is simply appended to the queue of
162 /// work to do.
163 ///
164 /// Before version 2.32, this function did not return a success status.
165 ///
166 /// # Returns
167 ///
168 /// [`true`] on success, [`false`] if an error occurred
169 #[doc(alias = "g_thread_pool_push")]
170 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
171 &self,
172 func: F,
173 ) -> Result<ThreadHandle<T>, crate::Error> {
174 let (tx, rx) = std::sync::mpsc::sync_channel(1);
175 unsafe {
176 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
177 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
178 });
179 let func = Box::new(func);
180 let mut err = ptr::null_mut();
181
182 let func = Box::into_raw(func);
183 let ret: bool = from_glib(ffi::g_thread_pool_push(
184 self.0.as_ptr(),
185 func as *mut _,
186 &mut err,
187 ));
188 if ret {
189 Ok(ThreadHandle { rx })
190 } else {
191 let _ = Box::from_raw(func);
192 Err(from_glib_full(err))
193 }
194 }
195 }
196
197 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
198 &self,
199 func: F,
200 ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
201 {
202 let (sender, receiver) = oneshot::channel();
203
204 self.push(move || {
205 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
206 })?;
207
208 Ok(async move { receiver.await.expect("Dropped before executing") })
209 }
210
211 /// Sets the maximal allowed number of threads for @self.
212 /// A value of -1 means that the maximal number of threads
213 /// is unlimited. If @self is an exclusive thread pool, setting
214 /// the maximal number of threads to -1 is not allowed.
215 ///
216 /// Setting @max_threads to 0 means stopping all work for @self.
217 /// It is effectively frozen until @max_threads is set to a non-zero
218 /// value again.
219 ///
220 /// A thread is never terminated while calling @func, as supplied by
221 /// g_thread_pool_new(). Instead the maximal number of threads only
222 /// has effect for the allocation of new threads in g_thread_pool_push().
223 /// A new thread is allocated, whenever the number of currently
224 /// running threads in @self is smaller than the maximal number.
225 ///
226 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
227 /// errors. An error can only occur when a new thread couldn't be
228 /// created.
229 ///
230 /// Before version 2.32, this function did not return a success status.
231 /// ## `max_threads`
232 /// a new maximal number of threads for @self,
233 /// or -1 for unlimited
234 ///
235 /// # Returns
236 ///
237 /// [`true`] on success, [`false`] if an error occurred
238 // rustdoc-stripper-ignore-next-stop
239 /// Sets the maximal allowed number of threads for @self.
240 /// A value of -1 means that the maximal number of threads
241 /// is unlimited. If @self is an exclusive thread pool, setting
242 /// the maximal number of threads to -1 is not allowed.
243 ///
244 /// Setting @max_threads to 0 means stopping all work for @self.
245 /// It is effectively frozen until @max_threads is set to a non-zero
246 /// value again.
247 ///
248 /// A thread is never terminated while calling @func, as supplied by
249 /// g_thread_pool_new(). Instead the maximal number of threads only
250 /// has effect for the allocation of new threads in g_thread_pool_push().
251 /// A new thread is allocated, whenever the number of currently
252 /// running threads in @self is smaller than the maximal number.
253 ///
254 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
255 /// errors. An error can only occur when a new thread couldn't be
256 /// created.
257 ///
258 /// Before version 2.32, this function did not return a success status.
259 /// ## `max_threads`
260 /// a new maximal number of threads for @self,
261 /// or -1 for unlimited
262 ///
263 /// # Returns
264 ///
265 /// [`true`] on success, [`false`] if an error occurred
266 #[doc(alias = "g_thread_pool_set_max_threads")]
267 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
268 unsafe {
269 let mut err = ptr::null_mut();
270 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
271 self.0.as_ptr(),
272 max_threads.map(|v| v as i32).unwrap_or(-1),
273 &mut err,
274 ));
275 if ret {
276 Ok(())
277 } else {
278 Err(from_glib_full(err))
279 }
280 }
281 }
282
283 /// Returns the maximal number of threads for @self.
284 ///
285 /// # Returns
286 ///
287 /// the maximal number of threads
288 // rustdoc-stripper-ignore-next-stop
289 /// Returns the maximal number of threads for @self.
290 ///
291 /// # Returns
292 ///
293 /// the maximal number of threads
294 #[doc(alias = "g_thread_pool_get_max_threads")]
295 #[doc(alias = "get_max_threads")]
296 pub fn max_threads(&self) -> Option<u32> {
297 unsafe {
298 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
299 if max_threads == -1 {
300 None
301 } else {
302 Some(max_threads as u32)
303 }
304 }
305 }
306
307 /// Returns the number of threads currently running in @self.
308 ///
309 /// # Returns
310 ///
311 /// the number of threads currently running
312 // rustdoc-stripper-ignore-next-stop
313 /// Returns the number of threads currently running in @self.
314 ///
315 /// # Returns
316 ///
317 /// the number of threads currently running
318 #[doc(alias = "g_thread_pool_get_num_threads")]
319 #[doc(alias = "get_num_threads")]
320 pub fn num_threads(&self) -> u32 {
321 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
322 }
323
324 /// Returns the number of tasks still unprocessed in @self.
325 ///
326 /// # Returns
327 ///
328 /// the number of unprocessed tasks
329 // rustdoc-stripper-ignore-next-stop
330 /// Returns the number of tasks still unprocessed in @self.
331 ///
332 /// # Returns
333 ///
334 /// the number of unprocessed tasks
335 #[doc(alias = "g_thread_pool_unprocessed")]
336 #[doc(alias = "get_unprocessed")]
337 pub fn unprocessed(&self) -> u32 {
338 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
339 }
340
341 /// Sets the maximal number of unused threads to @max_threads.
342 /// If @max_threads is -1, no limit is imposed on the number
343 /// of unused threads.
344 ///
345 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
346 /// ## `max_threads`
347 /// maximal number of unused threads
348 // rustdoc-stripper-ignore-next-stop
349 /// Sets the maximal number of unused threads to @max_threads.
350 /// If @max_threads is -1, no limit is imposed on the number
351 /// of unused threads.
352 ///
353 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
354 /// ## `max_threads`
355 /// maximal number of unused threads
356 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
357 pub fn set_max_unused_threads(max_threads: Option<u32>) {
358 unsafe {
359 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
360 }
361 }
362
363 /// Returns the maximal allowed number of unused threads.
364 ///
365 /// # Returns
366 ///
367 /// the maximal number of unused threads
368 // rustdoc-stripper-ignore-next-stop
369 /// Returns the maximal allowed number of unused threads.
370 ///
371 /// # Returns
372 ///
373 /// the maximal number of unused threads
374 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
375 #[doc(alias = "get_max_unused_threads")]
376 pub fn max_unused_threads() -> Option<u32> {
377 unsafe {
378 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
379 if max_unused_threads == -1 {
380 None
381 } else {
382 Some(max_unused_threads as u32)
383 }
384 }
385 }
386
387 /// Returns the number of currently unused threads.
388 ///
389 /// # Returns
390 ///
391 /// the number of currently unused threads
392 // rustdoc-stripper-ignore-next-stop
393 /// Returns the number of currently unused threads.
394 ///
395 /// # Returns
396 ///
397 /// the number of currently unused threads
398 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
399 #[doc(alias = "get_num_unused_threads")]
400 pub fn num_unused_threads() -> u32 {
401 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
402 }
403
404 /// Stops all currently unused threads. This does not change the
405 /// maximal number of unused threads. This function can be used to
406 /// regularly stop all unused threads e.g. from g_timeout_add().
407 // rustdoc-stripper-ignore-next-stop
408 /// Stops all currently unused threads. This does not change the
409 /// maximal number of unused threads. This function can be used to
410 /// regularly stop all unused threads e.g. from g_timeout_add().
411 #[doc(alias = "g_thread_pool_stop_unused_threads")]
412 pub fn stop_unused_threads() {
413 unsafe {
414 ffi::g_thread_pool_stop_unused_threads();
415 }
416 }
417
418 /// This function will set the maximum @interval that a thread
419 /// waiting in the pool for new tasks can be idle for before
420 /// being stopped. This function is similar to calling
421 /// g_thread_pool_stop_unused_threads() on a regular timeout,
422 /// except this is done on a per thread basis.
423 ///
424 /// By setting @interval to 0, idle threads will not be stopped.
425 ///
426 /// The default value is 15000 (15 seconds).
427 /// ## `interval`
428 /// the maximum @interval (in milliseconds)
429 /// a thread can be idle
430 // rustdoc-stripper-ignore-next-stop
431 /// This function will set the maximum @interval that a thread
432 /// waiting in the pool for new tasks can be idle for before
433 /// being stopped. This function is similar to calling
434 /// g_thread_pool_stop_unused_threads() on a regular timeout,
435 /// except this is done on a per thread basis.
436 ///
437 /// By setting @interval to 0, idle threads will not be stopped.
438 ///
439 /// The default value is 15000 (15 seconds).
440 /// ## `interval`
441 /// the maximum @interval (in milliseconds)
442 /// a thread can be idle
443 #[doc(alias = "g_thread_pool_set_max_idle_time")]
444 pub fn set_max_idle_time(max_idle_time: u32) {
445 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
446 }
447
448 /// This function will return the maximum @interval that a
449 /// thread will wait in the thread pool for new tasks before
450 /// being stopped.
451 ///
452 /// If this function returns 0, threads waiting in the thread
453 /// pool for new work are not stopped.
454 ///
455 /// # Returns
456 ///
457 /// the maximum @interval (milliseconds) to wait
458 /// for new tasks in the thread pool before stopping the
459 /// thread
460 // rustdoc-stripper-ignore-next-stop
461 /// This function will return the maximum @interval that a
462 /// thread will wait in the thread pool for new tasks before
463 /// being stopped.
464 ///
465 /// If this function returns 0, threads waiting in the thread
466 /// pool for new work are not stopped.
467 ///
468 /// # Returns
469 ///
470 /// the maximum @interval (milliseconds) to wait
471 /// for new tasks in the thread pool before stopping the
472 /// thread
473 #[doc(alias = "g_thread_pool_get_max_idle_time")]
474 #[doc(alias = "get_max_idle_time")]
475 pub fn max_idle_time() -> u32 {
476 unsafe { ffi::g_thread_pool_get_max_idle_time() }
477 }
478}
479
480impl Drop for ThreadPool {
481 #[inline]
482 fn drop(&mut self) {
483 unsafe {
484 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
485 }
486 }
487}
488
489unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
490 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
491 func()
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497
498 #[test]
499 fn test_push() {
500 use std::sync::mpsc;
501
502 let p = ThreadPool::exclusive(1).unwrap();
503 let (sender, receiver) = mpsc::channel();
504
505 let handle = p
506 .push(move || {
507 sender.send(true).unwrap();
508 123
509 })
510 .unwrap();
511
512 assert_eq!(handle.join().unwrap(), 123);
513 assert_eq!(receiver.recv(), Ok(true));
514 }
515
516 #[test]
517 fn test_push_future() {
518 let c = crate::MainContext::new();
519 let p = ThreadPool::shared(None).unwrap();
520
521 let fut = p.push_future(|| true).unwrap();
522
523 let res = c.block_on(fut);
524 assert!(res.unwrap());
525 }
526}