glib/thread_pool.rs
1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::{ffi, translate::*};
8
9/// The `GThreadPool` struct represents a thread pool.
10///
11/// A thread pool is useful when you wish to asynchronously fork out the execution of work
12/// and continue working in your own thread. If that will happen often, the overhead of starting
13/// and destroying a thread each time might be too high. In such cases reusing already started
14/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
15/// and error-prone.
16///
17/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
18/// threads can be shared between the different subsystems of your program, when they are using GLib.
19///
20/// To create a new thread pool, you use [`new()`][Self::new()].
21/// It is destroyed by `GLib::ThreadPool::free()`.
22///
23/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
24///
25/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
26/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
27/// To control the maximum number of threads for a thread pool, you use
28/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
29///
30/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
31/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
32/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
33/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
34/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
35#[derive(Debug)]
36#[doc(alias = "GThreadPool")]
37pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
38
39unsafe impl Send for ThreadPool {}
40unsafe impl Sync for ThreadPool {}
41
42// rustdoc-stripper-ignore-next
43/// A handle to a thread running on a [`ThreadPool`].
44///
45/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
46/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
47/// allowing it to complete but discarding the return value.
48#[derive(Debug)]
49pub struct ThreadHandle<T> {
50 rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
51}
52
53impl<T> ThreadHandle<T> {
54 // rustdoc-stripper-ignore-next
55 /// Waits for the associated thread to finish.
56 ///
57 /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
58 /// thread, or `Err` if the thread panicked. This function will return immediately if the
59 /// associated thread has already finished.
60 #[inline]
61 pub fn join(self) -> std::thread::Result<T> {
62 self.rx.recv().unwrap()
63 }
64}
65
66impl ThreadPool {
67 #[doc(alias = "g_thread_pool_new")]
68 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
69 unsafe {
70 let mut err = ptr::null_mut();
71 let pool = ffi::g_thread_pool_new(
72 Some(spawn_func),
73 ptr::null_mut(),
74 max_threads.map(|v| v as i32).unwrap_or(-1),
75 ffi::GFALSE,
76 &mut err,
77 );
78 if pool.is_null() {
79 Err(from_glib_full(err))
80 } else {
81 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
82 }
83 }
84 }
85
86 #[doc(alias = "g_thread_pool_new")]
87 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
88 unsafe {
89 let mut err = ptr::null_mut();
90 let pool = ffi::g_thread_pool_new(
91 Some(spawn_func),
92 ptr::null_mut(),
93 max_threads as i32,
94 ffi::GTRUE,
95 &mut err,
96 );
97 if pool.is_null() {
98 Err(from_glib_full(err))
99 } else {
100 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
101 }
102 }
103 }
104
105 /// Inserts @data into the list of tasks to be executed by @self.
106 ///
107 /// When the number of currently running threads is lower than the
108 /// maximal allowed number of threads, a new thread is started (or
109 /// reused) with the properties given to g_thread_pool_new().
110 /// Otherwise, @data stays in the queue until a thread in this pool
111 /// finishes its previous task and processes @data.
112 ///
113 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
114 /// errors. An error can only occur when a new thread couldn't be
115 /// created. In that case @data is simply appended to the queue of
116 /// work to do.
117 ///
118 /// Before version 2.32, this function did not return a success status.
119 ///
120 /// # Returns
121 ///
122 /// [`true`] on success, [`false`] if an error occurred
123 #[doc(alias = "g_thread_pool_push")]
124 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
125 &self,
126 func: F,
127 ) -> Result<ThreadHandle<T>, crate::Error> {
128 let (tx, rx) = std::sync::mpsc::sync_channel(1);
129 unsafe {
130 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
131 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
132 });
133 let func = Box::new(func);
134 let mut err = ptr::null_mut();
135
136 let func = Box::into_raw(func);
137 let ret: bool = from_glib(ffi::g_thread_pool_push(
138 self.0.as_ptr(),
139 func as *mut _,
140 &mut err,
141 ));
142 if ret {
143 Ok(ThreadHandle { rx })
144 } else {
145 let _ = Box::from_raw(func);
146 Err(from_glib_full(err))
147 }
148 }
149 }
150
151 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
152 &self,
153 func: F,
154 ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
155 {
156 let (sender, receiver) = oneshot::channel();
157
158 self.push(move || {
159 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
160 })?;
161
162 Ok(async move { receiver.await.expect("Dropped before executing") })
163 }
164
165 /// Sets the maximal allowed number of threads for @self.
166 /// A value of -1 means that the maximal number of threads
167 /// is unlimited. If @self is an exclusive thread pool, setting
168 /// the maximal number of threads to -1 is not allowed.
169 ///
170 /// Setting @max_threads to 0 means stopping all work for @self.
171 /// It is effectively frozen until @max_threads is set to a non-zero
172 /// value again.
173 ///
174 /// A thread is never terminated while calling @func, as supplied by
175 /// g_thread_pool_new(). Instead the maximal number of threads only
176 /// has effect for the allocation of new threads in g_thread_pool_push().
177 /// A new thread is allocated, whenever the number of currently
178 /// running threads in @self is smaller than the maximal number.
179 ///
180 /// @error can be [`None`] to ignore errors, or non-[`None`] to report
181 /// errors. An error can only occur when a new thread couldn't be
182 /// created.
183 ///
184 /// Before version 2.32, this function did not return a success status.
185 /// ## `max_threads`
186 /// a new maximal number of threads for @self,
187 /// or -1 for unlimited
188 ///
189 /// # Returns
190 ///
191 /// [`true`] on success, [`false`] if an error occurred
192 #[doc(alias = "g_thread_pool_set_max_threads")]
193 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
194 unsafe {
195 let mut err = ptr::null_mut();
196 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
197 self.0.as_ptr(),
198 max_threads.map(|v| v as i32).unwrap_or(-1),
199 &mut err,
200 ));
201 if ret {
202 Ok(())
203 } else {
204 Err(from_glib_full(err))
205 }
206 }
207 }
208
209 /// Returns the maximal number of threads for @self.
210 ///
211 /// # Returns
212 ///
213 /// the maximal number of threads
214 #[doc(alias = "g_thread_pool_get_max_threads")]
215 #[doc(alias = "get_max_threads")]
216 pub fn max_threads(&self) -> Option<u32> {
217 unsafe {
218 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
219 if max_threads == -1 {
220 None
221 } else {
222 Some(max_threads as u32)
223 }
224 }
225 }
226
227 /// Returns the number of threads currently running in @self.
228 ///
229 /// # Returns
230 ///
231 /// the number of threads currently running
232 #[doc(alias = "g_thread_pool_get_num_threads")]
233 #[doc(alias = "get_num_threads")]
234 pub fn num_threads(&self) -> u32 {
235 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
236 }
237
238 /// Returns the number of tasks still unprocessed in @self.
239 ///
240 /// # Returns
241 ///
242 /// the number of unprocessed tasks
243 #[doc(alias = "g_thread_pool_unprocessed")]
244 #[doc(alias = "get_unprocessed")]
245 pub fn unprocessed(&self) -> u32 {
246 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
247 }
248
249 /// Sets the maximal number of unused threads to @max_threads.
250 /// If @max_threads is -1, no limit is imposed on the number
251 /// of unused threads.
252 ///
253 /// The default value is 8 since GLib 2.84. Previously the default value was 2.
254 /// ## `max_threads`
255 /// maximal number of unused threads
256 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
257 pub fn set_max_unused_threads(max_threads: Option<u32>) {
258 unsafe {
259 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
260 }
261 }
262
263 /// Returns the maximal allowed number of unused threads.
264 ///
265 /// # Returns
266 ///
267 /// the maximal number of unused threads
268 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
269 #[doc(alias = "get_max_unused_threads")]
270 pub fn max_unused_threads() -> Option<u32> {
271 unsafe {
272 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
273 if max_unused_threads == -1 {
274 None
275 } else {
276 Some(max_unused_threads as u32)
277 }
278 }
279 }
280
281 /// Returns the number of currently unused threads.
282 ///
283 /// # Returns
284 ///
285 /// the number of currently unused threads
286 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
287 #[doc(alias = "get_num_unused_threads")]
288 pub fn num_unused_threads() -> u32 {
289 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
290 }
291
292 /// Stops all currently unused threads. This does not change the
293 /// maximal number of unused threads. This function can be used to
294 /// regularly stop all unused threads e.g. from g_timeout_add().
295 #[doc(alias = "g_thread_pool_stop_unused_threads")]
296 pub fn stop_unused_threads() {
297 unsafe {
298 ffi::g_thread_pool_stop_unused_threads();
299 }
300 }
301
302 /// This function will set the maximum @interval that a thread
303 /// waiting in the pool for new tasks can be idle for before
304 /// being stopped. This function is similar to calling
305 /// g_thread_pool_stop_unused_threads() on a regular timeout,
306 /// except this is done on a per thread basis.
307 ///
308 /// By setting @interval to 0, idle threads will not be stopped.
309 ///
310 /// The default value is 15000 (15 seconds).
311 /// ## `interval`
312 /// the maximum @interval (in milliseconds)
313 /// a thread can be idle
314 #[doc(alias = "g_thread_pool_set_max_idle_time")]
315 pub fn set_max_idle_time(max_idle_time: u32) {
316 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
317 }
318
319 /// This function will return the maximum @interval that a
320 /// thread will wait in the thread pool for new tasks before
321 /// being stopped.
322 ///
323 /// If this function returns 0, threads waiting in the thread
324 /// pool for new work are not stopped.
325 ///
326 /// # Returns
327 ///
328 /// the maximum @interval (milliseconds) to wait
329 /// for new tasks in the thread pool before stopping the
330 /// thread
331 #[doc(alias = "g_thread_pool_get_max_idle_time")]
332 #[doc(alias = "get_max_idle_time")]
333 pub fn max_idle_time() -> u32 {
334 unsafe { ffi::g_thread_pool_get_max_idle_time() }
335 }
336}
337
338impl Drop for ThreadPool {
339 #[inline]
340 fn drop(&mut self) {
341 unsafe {
342 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
343 }
344 }
345}
346
347unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
348 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
349 func()
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[test]
357 fn test_push() {
358 use std::sync::mpsc;
359
360 let p = ThreadPool::exclusive(1).unwrap();
361 let (sender, receiver) = mpsc::channel();
362
363 let handle = p
364 .push(move || {
365 sender.send(true).unwrap();
366 123
367 })
368 .unwrap();
369
370 assert_eq!(handle.join().unwrap(), 123);
371 assert_eq!(receiver.recv(), Ok(true));
372 }
373
374 #[test]
375 fn test_push_future() {
376 let c = crate::MainContext::new();
377 let p = ThreadPool::shared(None).unwrap();
378
379 let fut = p.push_future(|| true).unwrap();
380
381 let res = c.block_on(fut);
382 assert!(res.unwrap());
383 }
384}