glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
12/// The `GThreadPool` struct represents a thread pool.
13///
14/// A thread pool is useful when you wish to asynchronously fork out the execution of work
15/// and continue working in your own thread. If that will happen often, the overhead of starting
16/// and destroying a thread each time might be too high. In such cases reusing already started
17/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
18/// and error-prone.
19///
20/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
21/// threads can be shared between the different subsystems of your program, when they are using GLib.
22///
23/// To create a new thread pool, you use [`new()`][Self::new()].
24/// It is destroyed by `GLib::ThreadPool::free()`.
25///
26/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
27///
28/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
29/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
30/// To control the maximum number of threads for a thread pool, you use
31/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
32///
33/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
34/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
35/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
36/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
37/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
38#[derive(Debug)]
39#[doc(alias = "GThreadPool")]
40pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
41
42unsafe impl Send for ThreadPool {}
43unsafe impl Sync for ThreadPool {}
44
// rustdoc-stripper-ignore-next
/// Handle for a task that was submitted to a [`ThreadPool`].
///
/// It plays the same role as [`std::thread::JoinHandle`] does for a standard thread: call
/// [`ThreadHandle::join`] to obtain the value the task returned. If the handle is dropped
/// instead, the task is "detached": it still runs to completion, but its return value is
/// thrown away.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving end of the channel on which the task reports its outcome.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Blocks the caller until the associated task has reported its outcome.
    ///
    /// Yields `Ok` carrying the task's return value, or `Err` carrying the panic payload if
    /// the task panicked. When the outcome is already available this returns without waiting.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the channel was dropped without ever reporting an
    /// outcome.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
68
69impl ThreadPool {
70 #[doc(alias = "g_thread_pool_new")]
71 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
72 unsafe {
73 let mut err = ptr::null_mut();
74 let pool = ffi::g_thread_pool_new(
75 Some(spawn_func),
76 ptr::null_mut(),
77 max_threads.map(|v| v as i32).unwrap_or(-1),
78 ffi::GFALSE,
79 &mut err,
80 );
81 if pool.is_null() {
82 Err(from_glib_full(err))
83 } else {
84 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
85 }
86 }
87 }
88
89 #[doc(alias = "g_thread_pool_new")]
90 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
91 unsafe {
92 let mut err = ptr::null_mut();
93 let pool = ffi::g_thread_pool_new(
94 Some(spawn_func),
95 ptr::null_mut(),
96 max_threads as i32,
97 ffi::GTRUE,
98 &mut err,
99 );
100 if pool.is_null() {
101 Err(from_glib_full(err))
102 } else {
103 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
104 }
105 }
106 }
107
108 /// Inserts @data into the list of tasks to be executed by @self.
109 ///
110 /// When the number of currently running threads is lower than the
111 /// maximal allowed number of threads, a new thread is started (or
112 /// reused) with the properties given to g_thread_pool_new().
113 /// Otherwise, @data stays in the queue until a thread in this pool
114 /// finishes its previous task and processes @data.
115 ///
116 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
117 /// errors. An error can only occur when a new thread couldn't be
118 /// created. In that case @data is simply appended to the queue of
119 /// work to do.
120 ///
121 /// Before version 2.32, this function did not return a success status.
122 ///
123 /// # Returns
124 ///
125 /// [`true`] on success, [`false`] if an error occurred
126 #[doc(alias = "g_thread_pool_push")]
127 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
128 &self,
129 func: F,
130 ) -> Result<ThreadHandle<T>, crate::Error> {
131 let (tx, rx) = std::sync::mpsc::sync_channel(1);
132 unsafe {
133 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
134 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
135 });
136 let func = Box::new(func);
137 let mut err = ptr::null_mut();
138
139 let func = Box::into_raw(func);
140 let ret: bool = from_glib(ffi::g_thread_pool_push(
141 self.0.as_ptr(),
142 func as *mut _,
143 &mut err,
144 ));
145 if ret {
146 Ok(ThreadHandle { rx })
147 } else {
148 let _ = Box::from_raw(func);
149 Err(from_glib_full(err))
150 }
151 }
152 }
153
154 #[cfg(feature = "futures")]
155 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
156 &self,
157 func: F,
158 ) -> Result<
159 impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
160 crate::Error,
161 > {
162 let (sender, receiver) = oneshot::channel();
163
164 self.push(move || {
165 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
166 })?;
167
168 Ok(async move { receiver.await.expect("Dropped before executing") })
169 }
170
171 /// Sets the maximal allowed number of threads for @self.
172 /// A value of -1 means that the maximal number of threads
173 /// is unlimited. If @self is an exclusive thread pool, setting
174 /// the maximal number of threads to -1 is not allowed.
175 ///
176 /// Setting @max_threads to 0 means stopping all work for @self.
177 /// It is effectively frozen until @max_threads is set to a non-zero
178 /// value again.
179 ///
180 /// A thread is never terminated while calling @func, as supplied by
181 /// g_thread_pool_new(). Instead the maximal number of threads only
182 /// has effect for the allocation of new threads in g_thread_pool_push().
183 /// A new thread is allocated, whenever the number of currently
184 /// running threads in @self is smaller than the maximal number.
185 ///
186 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
187 /// errors. An error can only occur when a new thread couldn't be
188 /// created.
189 ///
190 /// Before version 2.32, this function did not return a success status.
191 /// ## `max_threads`
192 /// a new maximal number of threads for @self,
193 /// or -1 for unlimited
194 ///
195 /// # Returns
196 ///
197 /// [`true`] on success, [`false`] if an error occurred
198 #[doc(alias = "g_thread_pool_set_max_threads")]
199 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
200 unsafe {
201 let mut err = ptr::null_mut();
202 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
203 self.0.as_ptr(),
204 max_threads.map(|v| v as i32).unwrap_or(-1),
205 &mut err,
206 ));
207 if ret {
208 Ok(())
209 } else {
210 Err(from_glib_full(err))
211 }
212 }
213 }
214
215 /// Returns the maximal number of threads for @self.
216 ///
217 /// # Returns
218 ///
219 /// the maximal number of threads
220 #[doc(alias = "g_thread_pool_get_max_threads")]
221 #[doc(alias = "get_max_threads")]
222 pub fn max_threads(&self) -> Option<u32> {
223 unsafe {
224 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
225 if max_threads == -1 {
226 None
227 } else {
228 Some(max_threads as u32)
229 }
230 }
231 }
232
233 /// Returns the number of threads currently running in @self.
234 ///
235 /// # Returns
236 ///
237 /// the number of threads currently running
238 #[doc(alias = "g_thread_pool_get_num_threads")]
239 #[doc(alias = "get_num_threads")]
240 pub fn num_threads(&self) -> u32 {
241 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
242 }
243
244 /// Returns the number of tasks still unprocessed in @self.
245 ///
246 /// # Returns
247 ///
248 /// the number of unprocessed tasks
249 #[doc(alias = "g_thread_pool_unprocessed")]
250 #[doc(alias = "get_unprocessed")]
251 pub fn unprocessed(&self) -> u32 {
252 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
253 }
254
255 /// Sets the maximal number of unused threads to @max_threads.
256 /// If @max_threads is -1, no limit is imposed on the number
257 /// of unused threads.
258 ///
259 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
260 /// ## `max_threads`
261 /// maximal number of unused threads
262 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
263 pub fn set_max_unused_threads(max_threads: Option<u32>) {
264 unsafe {
265 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
266 }
267 }
268
269 /// Returns the maximal allowed number of unused threads.
270 ///
271 /// # Returns
272 ///
273 /// the maximal number of unused threads
274 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
275 #[doc(alias = "get_max_unused_threads")]
276 pub fn max_unused_threads() -> Option<u32> {
277 unsafe {
278 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
279 if max_unused_threads == -1 {
280 None
281 } else {
282 Some(max_unused_threads as u32)
283 }
284 }
285 }
286
287 /// Returns the number of currently unused threads.
288 ///
289 /// # Returns
290 ///
291 /// the number of currently unused threads
292 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
293 #[doc(alias = "get_num_unused_threads")]
294 pub fn num_unused_threads() -> u32 {
295 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
296 }
297
298 /// Stops all currently unused threads. This does not change the
299 /// maximal number of unused threads. This function can be used to
300 /// regularly stop all unused threads e.g. from g_timeout_add().
301 #[doc(alias = "g_thread_pool_stop_unused_threads")]
302 pub fn stop_unused_threads() {
303 unsafe {
304 ffi::g_thread_pool_stop_unused_threads();
305 }
306 }
307
308 /// This function will set the maximum @interval that a thread
309 /// waiting in the pool for new tasks can be idle for before
310 /// being stopped. This function is similar to calling
311 /// g_thread_pool_stop_unused_threads() on a regular timeout,
312 /// except this is done on a per thread basis.
313 ///
314 /// By setting @interval to 0, idle threads will not be stopped.
315 ///
316 /// The default value is 15000 (15 seconds).
317 /// ## `interval`
318 /// the maximum @interval (in milliseconds)
319 /// a thread can be idle
320 #[doc(alias = "g_thread_pool_set_max_idle_time")]
321 pub fn set_max_idle_time(max_idle_time: u32) {
322 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
323 }
324
325 /// This function will return the maximum @interval that a
326 /// thread will wait in the thread pool for new tasks before
327 /// being stopped.
328 ///
329 /// If this function returns 0, threads waiting in the thread
330 /// pool for new work are not stopped.
331 ///
332 /// # Returns
333 ///
334 /// the maximum @interval (milliseconds) to wait
335 /// for new tasks in the thread pool before stopping the
336 /// thread
337 #[doc(alias = "g_thread_pool_get_max_idle_time")]
338 #[doc(alias = "get_max_idle_time")]
339 pub fn max_idle_time() -> u32 {
340 unsafe { ffi::g_thread_pool_get_max_idle_time() }
341 }
342}
343
344impl Drop for ThreadPool {
345 #[inline]
346 fn drop(&mut self) {
347 unsafe {
348 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
349 }
350 }
351}
352
353unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
354 unsafe {
355 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
356 func()
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    // A task pushed to an exclusive single-threaded pool runs, its side effect is observable
    // and its return value arrives through the handle.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = std::sync::mpsc::channel();

        let job = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(job).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // The future returned by `push_future` resolves to the task's return value when driven
    // by a main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let task_future = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(task_future);
        assert!(outcome.unwrap());
    }
}
393}