glib/thread_pool.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
// Take a look at the license at the top of the repository in the LICENSE file.
use std::{future::Future, panic, ptr};
use futures_channel::oneshot;
use crate::{ffi, translate::*};
/// The `GThreadPool` struct represents a thread pool.
///
/// A thread pool is useful when you wish to asynchronously fork out the execution of work
/// and continue working in your own thread. If that will happen often, the overhead of starting
/// and destroying a thread each time might be too high. In such cases reusing already started
/// threads seems like a good idea. And it indeed is, but implementing this can be tedious
/// and error-prone.
///
/// Therefore GLib provides thread pools for your convenience. An added advantage is, that the
/// threads can be shared between the different subsystems of your program, when they are using GLib.
///
/// To create a new thread pool, you use [`new()`][Self::new()].
/// It is destroyed by `GLib::ThreadPool::free()`.
///
/// If you want to execute a certain task within a thread pool, use [`push()`][Self::push()].
///
/// To get the current number of running threads you call [`num_threads()`][Self::num_threads()].
/// To get the number of still unprocessed tasks you call [`unprocessed()`][Self::unprocessed()].
/// To control the maximum number of threads for a thread pool, you use
/// [`max_threads()`][Self::max_threads()]. and [`set_max_threads()`][Self::set_max_threads()].
///
/// Finally you can control the number of unused threads, that are kept alive by GLib for future use.
/// The current number can be fetched with [`num_unused_threads()`][Self::num_unused_threads()].
/// The maximum number can be controlled by [`max_unused_threads()`][Self::max_unused_threads()] and
/// [`set_max_unused_threads()`][Self::set_max_unused_threads()]. All currently unused threads
/// can be stopped by calling [`stop_unused_threads()`][Self::stop_unused_threads()].
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
// rustdoc-stripper-ignore-next
/// A handle to a thread running on a [`ThreadPool`].
///
/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct ThreadHandle<T> {
rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}
impl<T> ThreadHandle<T> {
// rustdoc-stripper-ignore-next
/// Waits for the associated thread to finish.
///
/// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
/// thread, or `Err` if the thread panicked. This function will return immediately if the
/// associated thread has already finished.
#[inline]
pub fn join(self) -> std::thread::Result<T> {
self.rx.recv().unwrap()
}
}
impl ThreadPool {
#[doc(alias = "g_thread_pool_new")]
pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
unsafe {
let mut err = ptr::null_mut();
let pool = ffi::g_thread_pool_new(
Some(spawn_func),
ptr::null_mut(),
max_threads.map(|v| v as i32).unwrap_or(-1),
ffi::GFALSE,
&mut err,
);
if pool.is_null() {
Err(from_glib_full(err))
} else {
Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
}
}
}
#[doc(alias = "g_thread_pool_new")]
pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
unsafe {
let mut err = ptr::null_mut();
let pool = ffi::g_thread_pool_new(
Some(spawn_func),
ptr::null_mut(),
max_threads as i32,
ffi::GTRUE,
&mut err,
);
if pool.is_null() {
Err(from_glib_full(err))
} else {
Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
}
}
}
/// Inserts @data into the list of tasks to be executed by @self.
///
/// When the number of currently running threads is lower than the
/// maximal allowed number of threads, a new thread is started (or
/// reused) with the properties given to g_thread_pool_new().
/// Otherwise, @data stays in the queue until a thread in this pool
/// finishes its previous task and processes @data.
///
/// @error can be [`None`] to ignore errors, or non-[`None`] to report
/// errors. An error can only occur when a new thread couldn't be
/// created. In that case @data is simply appended to the queue of
/// work to do.
///
/// Before version 2.32, this function did not return a success status.
///
/// # Returns
///
/// [`true`] on success, [`false`] if an error occurred
#[doc(alias = "g_thread_pool_push")]
pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
&self,
func: F,
) -> Result<ThreadHandle<T>, crate::Error> {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
unsafe {
let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
});
let func = Box::new(func);
let mut err = ptr::null_mut();
let func = Box::into_raw(func);
let ret: bool = from_glib(ffi::g_thread_pool_push(
self.0.as_ptr(),
func as *mut _,
&mut err,
));
if ret {
Ok(ThreadHandle { rx })
} else {
let _ = Box::from_raw(func);
Err(from_glib_full(err))
}
}
}
pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
&self,
func: F,
) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
{
let (sender, receiver) = oneshot::channel();
self.push(move || {
let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
})?;
Ok(async move { receiver.await.expect("Dropped before executing") })
}
/// Sets the maximal allowed number of threads for @self.
/// A value of -1 means that the maximal number of threads
/// is unlimited. If @self is an exclusive thread pool, setting
/// the maximal number of threads to -1 is not allowed.
///
/// Setting @max_threads to 0 means stopping all work for @self.
/// It is effectively frozen until @max_threads is set to a non-zero
/// value again.
///
/// A thread is never terminated while calling @func, as supplied by
/// g_thread_pool_new(). Instead the maximal number of threads only
/// has effect for the allocation of new threads in g_thread_pool_push().
/// A new thread is allocated, whenever the number of currently
/// running threads in @self is smaller than the maximal number.
///
/// @error can be [`None`] to ignore errors, or non-[`None`] to report
/// errors. An error can only occur when a new thread couldn't be
/// created.
///
/// Before version 2.32, this function did not return a success status.
/// ## `max_threads`
/// a new maximal number of threads for @self,
/// or -1 for unlimited
///
/// # Returns
///
/// [`true`] on success, [`false`] if an error occurred
#[doc(alias = "g_thread_pool_set_max_threads")]
pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
unsafe {
let mut err = ptr::null_mut();
let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
self.0.as_ptr(),
max_threads.map(|v| v as i32).unwrap_or(-1),
&mut err,
));
if ret {
Ok(())
} else {
Err(from_glib_full(err))
}
}
}
/// Returns the maximal number of threads for @self.
///
/// # Returns
///
/// the maximal number of threads
#[doc(alias = "g_thread_pool_get_max_threads")]
#[doc(alias = "get_max_threads")]
pub fn max_threads(&self) -> Option<u32> {
unsafe {
let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
if max_threads == -1 {
None
} else {
Some(max_threads as u32)
}
}
}
/// Returns the number of threads currently running in @self.
///
/// # Returns
///
/// the number of threads currently running
#[doc(alias = "g_thread_pool_get_num_threads")]
#[doc(alias = "get_num_threads")]
pub fn num_threads(&self) -> u32 {
unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
}
/// Returns the number of tasks still unprocessed in @self.
///
/// # Returns
///
/// the number of unprocessed tasks
#[doc(alias = "g_thread_pool_unprocessed")]
#[doc(alias = "get_unprocessed")]
pub fn unprocessed(&self) -> u32 {
unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
}
/// Sets the maximal number of unused threads to @max_threads.
/// If @max_threads is -1, no limit is imposed on the number
/// of unused threads.
///
/// The default value is 8 since GLib 2.84. Previously the default value was 2.
/// ## `max_threads`
/// maximal number of unused threads
#[doc(alias = "g_thread_pool_set_max_unused_threads")]
pub fn set_max_unused_threads(max_threads: Option<u32>) {
unsafe {
ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
}
}
/// Returns the maximal allowed number of unused threads.
///
/// # Returns
///
/// the maximal number of unused threads
#[doc(alias = "g_thread_pool_get_max_unused_threads")]
#[doc(alias = "get_max_unused_threads")]
pub fn max_unused_threads() -> Option<u32> {
unsafe {
let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
if max_unused_threads == -1 {
None
} else {
Some(max_unused_threads as u32)
}
}
}
/// Returns the number of currently unused threads.
///
/// # Returns
///
/// the number of currently unused threads
#[doc(alias = "g_thread_pool_get_num_unused_threads")]
#[doc(alias = "get_num_unused_threads")]
pub fn num_unused_threads() -> u32 {
unsafe { ffi::g_thread_pool_get_num_unused_threads() }
}
/// Stops all currently unused threads. This does not change the
/// maximal number of unused threads. This function can be used to
/// regularly stop all unused threads e.g. from g_timeout_add().
#[doc(alias = "g_thread_pool_stop_unused_threads")]
pub fn stop_unused_threads() {
unsafe {
ffi::g_thread_pool_stop_unused_threads();
}
}
/// This function will set the maximum @interval that a thread
/// waiting in the pool for new tasks can be idle for before
/// being stopped. This function is similar to calling
/// g_thread_pool_stop_unused_threads() on a regular timeout,
/// except this is done on a per thread basis.
///
/// By setting @interval to 0, idle threads will not be stopped.
///
/// The default value is 15000 (15 seconds).
/// ## `interval`
/// the maximum @interval (in milliseconds)
/// a thread can be idle
#[doc(alias = "g_thread_pool_set_max_idle_time")]
pub fn set_max_idle_time(max_idle_time: u32) {
unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
}
/// This function will return the maximum @interval that a
/// thread will wait in the thread pool for new tasks before
/// being stopped.
///
/// If this function returns 0, threads waiting in the thread
/// pool for new work are not stopped.
///
/// # Returns
///
/// the maximum @interval (milliseconds) to wait
/// for new tasks in the thread pool before stopping the
/// thread
#[doc(alias = "g_thread_pool_get_max_idle_time")]
#[doc(alias = "get_max_idle_time")]
pub fn max_idle_time() -> u32 {
unsafe { ffi::g_thread_pool_get_max_idle_time() }
}
}
impl Drop for ThreadPool {
#[inline]
fn drop(&mut self) {
unsafe {
ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
}
}
}
unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
func()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_push() {
use std::sync::mpsc;
let p = ThreadPool::exclusive(1).unwrap();
let (sender, receiver) = mpsc::channel();
let handle = p
.push(move || {
sender.send(true).unwrap();
123
})
.unwrap();
assert_eq!(handle.join().unwrap(), 123);
assert_eq!(receiver.recv(), Ok(true));
}
#[test]
fn test_push_future() {
let c = crate::MainContext::new();
let p = ThreadPool::shared(None).unwrap();
let fut = p.push_future(|| true).unwrap();
let res = c.block_on(fut);
assert!(res.unwrap());
}
}