bevy_tasks/single_threaded_task_pool.rs
1use alloc::{string::String, vec::Vec};
2use bevy_platform::sync::Arc;
3use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};
4
5use crate::executor::LocalExecutor;
6use crate::{block_on, Task};
7
8crate::cfg::std! {
9 if {
10 use std::thread_local;
11
12 use crate::executor::LocalExecutor as Executor;
13
14 thread_local! {
15 static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
16 }
17 } else {
18
19 // Because we do not have thread-locals without std, we cannot use LocalExecutor here.
20 use crate::executor::Executor;
21
22 static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
23 }
24}
25
26/// Used to create a [`TaskPool`].
27#[derive(Debug, Default, Clone)]
28pub struct TaskPoolBuilder {}
29
30/// This is a dummy struct for wasm support to provide the same api as with the multithreaded
31/// task pool. In the case of the multithreaded task pool this struct is used to spawn
32/// tasks on a specific thread. But the wasm task pool just calls
33/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread
34/// and so the [`ThreadExecutor`] does nothing.
35#[derive(Default)]
36pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);
37impl<'a> ThreadExecutor<'a> {
38 /// Creates a new `ThreadExecutor`
39 pub fn new() -> Self {
40 Self::default()
41 }
42}
43
44impl TaskPoolBuilder {
45 /// Creates a new `TaskPoolBuilder` instance
46 pub fn new() -> Self {
47 Self::default()
48 }
49
50 /// No op on the single threaded task pool
51 pub fn num_threads(self, _num_threads: usize) -> Self {
52 self
53 }
54
55 /// No op on the single threaded task pool
56 pub fn stack_size(self, _stack_size: usize) -> Self {
57 self
58 }
59
60 /// No op on the single threaded task pool
61 pub fn thread_name(self, _thread_name: String) -> Self {
62 self
63 }
64
65 /// No op on the single threaded task pool
66 pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
67 self
68 }
69
70 /// No op on the single threaded task pool
71 pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
72 self
73 }
74
75 /// Creates a new [`TaskPool`]
76 pub fn build(self) -> TaskPool {
77 TaskPool::new_internal()
78 }
79}
80
81/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
82/// the pool on threads owned by the pool. In this case - main thread only.
83#[derive(Debug, Default, Clone)]
84pub struct TaskPool {}
85
86impl TaskPool {
87 /// Just create a new `ThreadExecutor` for wasm
88 pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
89 Arc::new(ThreadExecutor::new())
90 }
91
92 /// Create a `TaskPool` with the default configuration.
93 pub fn new() -> Self {
94 TaskPoolBuilder::new().build()
95 }
96
97 fn new_internal() -> Self {
98 Self {}
99 }
100
101 /// Return the number of threads owned by the task pool
102 pub fn thread_num(&self) -> usize {
103 1
104 }
105
106 /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
107 /// passing a scope object into it. The scope object provided to the callback can be used
108 /// to spawn tasks. This function will await the completion of all tasks before returning.
109 ///
110 /// This is similar to `rayon::scope` and `crossbeam::scope`
111 pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
112 where
113 F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
114 T: Send + 'static,
115 {
116 self.scope_with_executor(false, None, f)
117 }
118
119 /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
120 /// passing a scope object into it. The scope object provided to the callback can be used
121 /// to spawn tasks. This function will await the completion of all tasks before returning.
122 ///
123 /// This is similar to `rayon::scope` and `crossbeam::scope`
124 #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
125 pub fn scope_with_executor<'env, F, T>(
126 &self,
127 _tick_task_pool_executor: bool,
128 _thread_executor: Option<&ThreadExecutor>,
129 f: F,
130 ) -> Vec<T>
131 where
132 F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
133 T: Send + 'static,
134 {
135 // SAFETY: This safety comment applies to all references transmuted to 'env.
136 // Any futures spawned with these references need to return before this function completes.
137 // This is guaranteed because we drive all the futures spawned onto the Scope
138 // to completion in this function. However, rust has no way of knowing this so we
139 // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
140 // Any usages of the references passed into `Scope` must be accessed through
141 // the transmuted reference for the rest of this function.
142
143 let executor = LocalExecutor::new();
144 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
145 let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };
146
147 let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
148 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
149 let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };
150
151 let pending_tasks: Cell<usize> = Cell::new(0);
152 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
153 let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };
154
155 let mut scope = Scope {
156 executor_ref,
157 pending_tasks,
158 results_ref,
159 scope: PhantomData,
160 env: PhantomData,
161 };
162
163 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
164 let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
165
166 f(scope_ref);
167
168 // Wait until the scope is complete
169 block_on(executor.run(async {
170 while pending_tasks.get() != 0 {
171 futures_lite::future::yield_now().await;
172 }
173 }));
174
175 results
176 .take()
177 .into_iter()
178 .map(|result| result.unwrap())
179 .collect()
180 }
181
182 /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
183 /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
184 /// It can also be "detached", allowing it to continue running without having to be polled by the
185 /// end-user.
186 ///
187 /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
188 pub fn spawn<T>(
189 &self,
190 future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
191 ) -> Task<T>
192 where
193 T: 'static + MaybeSend + MaybeSync,
194 {
195 crate::cfg::switch! {{
196 crate::cfg::web => {
197 Task::wrap_future(future)
198 }
199 crate::cfg::std => {
200 LOCAL_EXECUTOR.with(|executor| {
201 let task = executor.spawn(future);
202 // Loop until all tasks are done
203 while executor.try_tick() {}
204
205 Task::new(task)
206 })
207 }
208 _ => {
209 let task = LOCAL_EXECUTOR.spawn(future);
210 // Loop until all tasks are done
211 while LOCAL_EXECUTOR.try_tick() {}
212
213 Task::new(task)
214 }
215 }}
216 }
217
218 /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
219 pub fn spawn_local<T>(
220 &self,
221 future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
222 ) -> Task<T>
223 where
224 T: 'static + MaybeSend + MaybeSync,
225 {
226 self.spawn(future)
227 }
228
229 /// Runs a function with the local executor. Typically used to tick
230 /// the local executor on the main thread as it needs to share time with
231 /// other things.
232 ///
233 /// ```
234 /// use bevy_tasks::TaskPool;
235 ///
236 /// TaskPool::new().with_local_executor(|local_executor| {
237 /// local_executor.try_tick();
238 /// });
239 /// ```
240 pub fn with_local_executor<F, R>(&self, f: F) -> R
241 where
242 F: FnOnce(&Executor) -> R,
243 {
244 crate::cfg::switch! {{
245 crate::cfg::std => {
246 LOCAL_EXECUTOR.with(f)
247 }
248 _ => {
249 f(&LOCAL_EXECUTOR)
250 }
251 }}
252 }
253}
254
255/// A `TaskPool` scope for running one or more non-`'static` futures.
256///
257/// For more information, see [`TaskPool::scope`].
258#[derive(Debug)]
259pub struct Scope<'scope, 'env: 'scope, T> {
260 executor_ref: &'scope LocalExecutor<'scope>,
261 // The number of pending tasks spawned on the scope
262 pending_tasks: &'scope Cell<usize>,
263 // Vector to gather results of all futures spawned during scope run
264 results_ref: &'env RefCell<Vec<Option<T>>>,
265
266 // make `Scope` invariant over 'scope and 'env
267 scope: PhantomData<&'scope mut &'scope ()>,
268 env: PhantomData<&'env mut &'env ()>,
269}
270
271impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
272 /// Spawns a scoped future onto the executor. The scope *must* outlive
273 /// the provided future. The results of the future will be returned as a part of
274 /// [`TaskPool::scope`]'s return value.
275 ///
276 /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
277 ///
278 /// For more information, see [`TaskPool::scope`].
279 pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
280 self.spawn_on_scope(f);
281 }
282
283 /// Spawns a scoped future onto the executor. The scope *must* outlive
284 /// the provided future. The results of the future will be returned as a part of
285 /// [`TaskPool::scope`]'s return value.
286 ///
287 /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
288 ///
289 /// For more information, see [`TaskPool::scope`].
290 pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
291 self.spawn_on_scope(f);
292 }
293
294 /// Spawns a scoped future that runs on the thread the scope called from. The
295 /// scope *must* outlive the provided future. The results of the future will be
296 /// returned as a part of [`TaskPool::scope`]'s return value.
297 ///
298 /// For more information, see [`TaskPool::scope`].
299 pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
300 // increment the number of pending tasks
301 let pending_tasks = self.pending_tasks;
302 pending_tasks.update(|i| i + 1);
303
304 // add a spot to keep the result, and record the index
305 let results_ref = self.results_ref;
306 let mut results = results_ref.borrow_mut();
307 let task_number = results.len();
308 results.push(None);
309 drop(results);
310
311 // create the job closure
312 let f = async move {
313 let result = f.await;
314
315 // store the result in the allocated slot
316 let mut results = results_ref.borrow_mut();
317 results[task_number] = Some(result);
318 drop(results);
319
320 // decrement the pending tasks count
321 pending_tasks.update(|i| i - 1);
322 };
323
324 // spawn the job itself
325 self.executor_ref.spawn(f).detach();
326 }
327}
328
329crate::cfg::std! {
330 if {
331 pub trait MaybeSend {}
332 impl<T> MaybeSend for T {}
333
334 pub trait MaybeSync {}
335 impl<T> MaybeSync for T {}
336 } else {
337 pub trait MaybeSend: Send {}
338 impl<T: Send> MaybeSend for T {}
339
340 pub trait MaybeSync: Sync {}
341 impl<T: Sync> MaybeSync for T {}
342 }
343}
344
345#[cfg(test)]
346mod test {
347 use std::{time, thread};
348
349 use super::*;
350
351 /// This test creates a scope with a single task that goes to sleep for a
352 /// nontrivial amount of time. At one point, the scope would (incorrectly)
353 /// return early under these conditions, causing a crash.
354 ///
355 /// The correct behavior is for the scope to block until the receiver is
356 /// woken by the external thread.
357 #[test]
358 fn scoped_spawn() {
359 let (sender, recever) = async_channel::unbounded();
360 let task_pool = TaskPool {};
361 let thread = thread::spawn(move || {
362 let duration = time::Duration::from_millis(50);
363 thread::sleep(duration);
364 let _ = sender.send(0);
365 });
366 task_pool.scope(|scope| {
367 scope.spawn(async {
368 recever.recv().await
369 });
370 });
371 }
372}