1use std::{panic, ptr};
4
5#[cfg(feature = "futures")]
6use futures_channel::oneshot;
7#[cfg(feature = "futures")]
8use std::future::Future;
9
10use crate::{ffi, translate::*};
11
/// Owning handle to a GLib `GThreadPool`.
///
/// The wrapped pointer is obtained from `g_thread_pool_new` (see
/// [`ThreadPool::shared`] and [`ThreadPool::exclusive`]) and is released with
/// `g_thread_pool_free` when the value is dropped.
#[derive(Debug)]
#[doc(alias = "GThreadPool")]
pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);

// SAFETY: NOTE(review): these impls rely on the GLib thread-pool functions
// being safe to call on the same pool from several threads at once — confirm
// against the GLib documentation when touching this type.
unsafe impl Send for ThreadPool {}
unsafe impl Sync for ThreadPool {}
18
/// Handle to the outcome of a task queued with [`ThreadPool::push`].
///
/// The handle owns the receiving end of the channel on which the task reports
/// its result. Dropping the handle does not cancel the task; the result is
/// simply discarded by the sending side.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Carries `Ok(value)` when the task returned and `Err(payload)` when it
    // panicked (the task body is wrapped in `catch_unwind` by `push`).
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    /// Blocks until the task has reported its outcome and returns it.
    ///
    /// Returns `Ok` with the task's return value, or `Err` with the panic
    /// payload if the task panicked. If the outcome was already sent, this
    /// returns without waiting.
    ///
    /// # Panics
    ///
    /// Panics if the sending side was dropped without ever sending a result.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        let Self { rx } = self;
        let outcome = rx.recv();
        outcome.unwrap()
    }
}
42
43impl ThreadPool {
44 #[doc(alias = "g_thread_pool_new")]
45 pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
46 unsafe {
47 let mut err = ptr::null_mut();
48 let pool = ffi::g_thread_pool_new(
49 Some(spawn_func),
50 ptr::null_mut(),
51 max_threads.map(|v| v as i32).unwrap_or(-1),
52 ffi::GFALSE,
53 &mut err,
54 );
55 if pool.is_null() {
56 Err(from_glib_full(err))
57 } else {
58 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
59 }
60 }
61 }
62
63 #[doc(alias = "g_thread_pool_new")]
64 pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
65 unsafe {
66 let mut err = ptr::null_mut();
67 let pool = ffi::g_thread_pool_new(
68 Some(spawn_func),
69 ptr::null_mut(),
70 max_threads as i32,
71 ffi::GTRUE,
72 &mut err,
73 );
74 if pool.is_null() {
75 Err(from_glib_full(err))
76 } else {
77 Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
78 }
79 }
80 }
81
82 #[doc(alias = "g_thread_pool_push")]
83 pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
84 &self,
85 func: F,
86 ) -> Result<ThreadHandle<T>, crate::Error> {
87 let (tx, rx) = std::sync::mpsc::sync_channel(1);
88 unsafe {
89 let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
90 let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
91 });
92 let func = Box::new(func);
93 let mut err = ptr::null_mut();
94
95 let func = Box::into_raw(func);
96 let ret: bool = from_glib(ffi::g_thread_pool_push(
97 self.0.as_ptr(),
98 func as *mut _,
99 &mut err,
100 ));
101 if ret {
102 Ok(ThreadHandle { rx })
103 } else {
104 let _ = Box::from_raw(func);
105 Err(from_glib_full(err))
106 }
107 }
108 }
109
110 #[cfg(feature = "futures")]
111 pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
112 &self,
113 func: F,
114 ) -> Result<
115 impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static + use<T, F>,
116 crate::Error,
117 > {
118 let (sender, receiver) = oneshot::channel();
119
120 self.push(move || {
121 let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
122 })?;
123
124 Ok(async move { receiver.await.expect("Dropped before executing") })
125 }
126
127 #[doc(alias = "g_thread_pool_set_max_threads")]
128 pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
129 unsafe {
130 let mut err = ptr::null_mut();
131 let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
132 self.0.as_ptr(),
133 max_threads.map(|v| v as i32).unwrap_or(-1),
134 &mut err,
135 ));
136 if ret {
137 Ok(())
138 } else {
139 Err(from_glib_full(err))
140 }
141 }
142 }
143
144 #[doc(alias = "g_thread_pool_get_max_threads")]
145 #[doc(alias = "get_max_threads")]
146 pub fn max_threads(&self) -> Option<u32> {
147 unsafe {
148 let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
149 if max_threads == -1 {
150 None
151 } else {
152 Some(max_threads as u32)
153 }
154 }
155 }
156
157 #[doc(alias = "g_thread_pool_get_num_threads")]
158 #[doc(alias = "get_num_threads")]
159 pub fn num_threads(&self) -> u32 {
160 unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
161 }
162
163 #[doc(alias = "g_thread_pool_unprocessed")]
164 #[doc(alias = "get_unprocessed")]
165 pub fn unprocessed(&self) -> u32 {
166 unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
167 }
168
169 #[doc(alias = "g_thread_pool_set_max_unused_threads")]
170 pub fn set_max_unused_threads(max_threads: Option<u32>) {
171 unsafe {
172 ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
173 }
174 }
175
176 #[doc(alias = "g_thread_pool_get_max_unused_threads")]
177 #[doc(alias = "get_max_unused_threads")]
178 pub fn max_unused_threads() -> Option<u32> {
179 unsafe {
180 let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
181 if max_unused_threads == -1 {
182 None
183 } else {
184 Some(max_unused_threads as u32)
185 }
186 }
187 }
188
189 #[doc(alias = "g_thread_pool_get_num_unused_threads")]
190 #[doc(alias = "get_num_unused_threads")]
191 pub fn num_unused_threads() -> u32 {
192 unsafe { ffi::g_thread_pool_get_num_unused_threads() }
193 }
194
195 #[doc(alias = "g_thread_pool_stop_unused_threads")]
196 pub fn stop_unused_threads() {
197 unsafe {
198 ffi::g_thread_pool_stop_unused_threads();
199 }
200 }
201
202 #[doc(alias = "g_thread_pool_set_max_idle_time")]
203 pub fn set_max_idle_time(max_idle_time: u32) {
204 unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
205 }
206
207 #[doc(alias = "g_thread_pool_get_max_idle_time")]
208 #[doc(alias = "get_max_idle_time")]
209 pub fn max_idle_time() -> u32 {
210 unsafe { ffi::g_thread_pool_get_max_idle_time() }
211 }
212}
213
214impl Drop for ThreadPool {
215 #[inline]
216 fn drop(&mut self) {
217 unsafe {
218 ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
219 }
220 }
221}
222
223unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
224 unsafe {
225 let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
226 func()
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    // A pushed task runs on the pool, its side effect is observable, and its
    // return value comes back through the handle.
    #[test]
    fn test_push() {
        use std::sync::mpsc;

        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = mpsc::channel();

        let task = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // The future returned by `push_future` resolves to the task's result when
    // driven by a main context.
    #[cfg(feature = "futures")]
    #[test]
    fn test_push_future() {
        let ctx = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let fut = pool.push_future(|| true).unwrap();

        let outcome = ctx.block_on(fut);
        assert!(outcome.unwrap());
    }
}