bevy_tasks/
single_threaded_task_pool.rs

1use alloc::{string::String, vec::Vec};
2use bevy_platform::sync::Arc;
3use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};
4
5use crate::executor::LocalExecutor;
6use crate::{block_on, Task};
7
8crate::cfg::std! {
9    if {
10        use std::thread_local;
11
12        use crate::executor::LocalExecutor as Executor;
13
14        thread_local! {
15            static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
16        }
17    } else {
18
19        // Because we do not have thread-locals without std, we cannot use LocalExecutor here.
20        use crate::executor::Executor;
21
22        static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
23    }
24}
25
26/// Used to create a [`TaskPool`].
27#[derive(Debug, Default, Clone)]
28pub struct TaskPoolBuilder {}
29
30/// This is a dummy struct for wasm support to provide the same api as with the multithreaded
31/// task pool. In the case of the multithreaded task pool this struct is used to spawn
32/// tasks on a specific thread. But the wasm task pool just calls
33/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread
34/// and so the [`ThreadExecutor`] does nothing.
35#[derive(Default)]
36pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);
37impl<'a> ThreadExecutor<'a> {
38    /// Creates a new `ThreadExecutor`
39    pub fn new() -> Self {
40        Self::default()
41    }
42}
43
44impl TaskPoolBuilder {
45    /// Creates a new `TaskPoolBuilder` instance
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// No op on the single threaded task pool
51    pub fn num_threads(self, _num_threads: usize) -> Self {
52        self
53    }
54
55    /// No op on the single threaded task pool
56    pub fn stack_size(self, _stack_size: usize) -> Self {
57        self
58    }
59
60    /// No op on the single threaded task pool
61    pub fn thread_name(self, _thread_name: String) -> Self {
62        self
63    }
64
65    /// No op on the single threaded task pool
66    pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
67        self
68    }
69
70    /// No op on the single threaded task pool
71    pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
72        self
73    }
74
75    /// Creates a new [`TaskPool`]
76    pub fn build(self) -> TaskPool {
77        TaskPool::new_internal()
78    }
79}
80
81/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
82/// the pool on threads owned by the pool. In this case - main thread only.
83#[derive(Debug, Default, Clone)]
84pub struct TaskPool {}
85
86impl TaskPool {
87    /// Just create a new `ThreadExecutor` for wasm
88    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
89        Arc::new(ThreadExecutor::new())
90    }
91
92    /// Create a `TaskPool` with the default configuration.
93    pub fn new() -> Self {
94        TaskPoolBuilder::new().build()
95    }
96
97    fn new_internal() -> Self {
98        Self {}
99    }
100
101    /// Return the number of threads owned by the task pool
102    pub fn thread_num(&self) -> usize {
103        1
104    }
105
106    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
107    /// passing a scope object into it. The scope object provided to the callback can be used
108    /// to spawn tasks. This function will await the completion of all tasks before returning.
109    ///
110    /// This is similar to `rayon::scope` and `crossbeam::scope`
111    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
112    where
113        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
114        T: Send + 'static,
115    {
116        self.scope_with_executor(false, None, f)
117    }
118
119    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
120    /// passing a scope object into it. The scope object provided to the callback can be used
121    /// to spawn tasks. This function will await the completion of all tasks before returning.
122    ///
123    /// This is similar to `rayon::scope` and `crossbeam::scope`
124    #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
125    pub fn scope_with_executor<'env, F, T>(
126        &self,
127        _tick_task_pool_executor: bool,
128        _thread_executor: Option<&ThreadExecutor>,
129        f: F,
130    ) -> Vec<T>
131    where
132        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
133        T: Send + 'static,
134    {
135        // SAFETY: This safety comment applies to all references transmuted to 'env.
136        // Any futures spawned with these references need to return before this function completes.
137        // This is guaranteed because we drive all the futures spawned onto the Scope
138        // to completion in this function. However, rust has no way of knowing this so we
139        // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
140        // Any usages of the references passed into `Scope` must be accessed through
141        // the transmuted reference for the rest of this function.
142
143        let executor = LocalExecutor::new();
144        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
145        let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };
146
147        let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
148        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
149        let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };
150
151        let pending_tasks: Cell<usize> = Cell::new(0);
152        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
153        let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };
154
155        let mut scope = Scope {
156            executor_ref,
157            pending_tasks,
158            results_ref,
159            scope: PhantomData,
160            env: PhantomData,
161        };
162
163        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
164        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
165
166        f(scope_ref);
167
168        // Wait until the scope is complete
169        block_on(executor.run(async {
170            while pending_tasks.get() != 0 {
171                futures_lite::future::yield_now().await;
172            }
173        }));
174
175        results
176            .take()
177            .into_iter()
178            .map(|result| result.unwrap())
179            .collect()
180    }
181
182    /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
183    /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
184    /// It can also be "detached", allowing it to continue running without having to be polled by the
185    /// end-user.
186    ///
187    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
188    pub fn spawn<T>(
189        &self,
190        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
191    ) -> Task<T>
192    where
193        T: 'static + MaybeSend + MaybeSync,
194    {
195        crate::cfg::switch! {{
196            crate::cfg::web => {
197                Task::wrap_future(future)
198            }
199            crate::cfg::std => {
200                LOCAL_EXECUTOR.with(|executor| {
201                    let task = executor.spawn(future);
202                    // Loop until all tasks are done
203                    while executor.try_tick() {}
204
205                    Task::new(task)
206                })
207            }
208            _ => {
209                let task = LOCAL_EXECUTOR.spawn(future);
210                // Loop until all tasks are done
211                while LOCAL_EXECUTOR.try_tick() {}
212
213                Task::new(task)
214            }
215        }}
216    }
217
218    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
219    pub fn spawn_local<T>(
220        &self,
221        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
222    ) -> Task<T>
223    where
224        T: 'static + MaybeSend + MaybeSync,
225    {
226        self.spawn(future)
227    }
228
229    /// Runs a function with the local executor. Typically used to tick
230    /// the local executor on the main thread as it needs to share time with
231    /// other things.
232    ///
233    /// ```
234    /// use bevy_tasks::TaskPool;
235    ///
236    /// TaskPool::new().with_local_executor(|local_executor| {
237    ///     local_executor.try_tick();
238    /// });
239    /// ```
240    pub fn with_local_executor<F, R>(&self, f: F) -> R
241    where
242        F: FnOnce(&Executor) -> R,
243    {
244        crate::cfg::switch! {{
245            crate::cfg::std => {
246                LOCAL_EXECUTOR.with(f)
247            }
248            _ => {
249                f(&LOCAL_EXECUTOR)
250            }
251        }}
252    }
253}
254
255/// A `TaskPool` scope for running one or more non-`'static` futures.
256///
257/// For more information, see [`TaskPool::scope`].
258#[derive(Debug)]
259pub struct Scope<'scope, 'env: 'scope, T> {
260    executor_ref: &'scope LocalExecutor<'scope>,
261    // The number of pending tasks spawned on the scope
262    pending_tasks: &'scope Cell<usize>,
263    // Vector to gather results of all futures spawned during scope run
264    results_ref: &'env RefCell<Vec<Option<T>>>,
265
266    // make `Scope` invariant over 'scope and 'env
267    scope: PhantomData<&'scope mut &'scope ()>,
268    env: PhantomData<&'env mut &'env ()>,
269}
270
271impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
272    /// Spawns a scoped future onto the executor. The scope *must* outlive
273    /// the provided future. The results of the future will be returned as a part of
274    /// [`TaskPool::scope`]'s return value.
275    ///
276    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
277    ///
278    /// For more information, see [`TaskPool::scope`].
279    pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
280        self.spawn_on_scope(f);
281    }
282
283    /// Spawns a scoped future onto the executor. The scope *must* outlive
284    /// the provided future. The results of the future will be returned as a part of
285    /// [`TaskPool::scope`]'s return value.
286    ///
287    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
288    ///
289    /// For more information, see [`TaskPool::scope`].
290    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
291        self.spawn_on_scope(f);
292    }
293
294    /// Spawns a scoped future that runs on the thread the scope called from. The
295    /// scope *must* outlive the provided future. The results of the future will be
296    /// returned as a part of [`TaskPool::scope`]'s return value.
297    ///
298    /// For more information, see [`TaskPool::scope`].
299    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
300        // increment the number of pending tasks
301        let pending_tasks = self.pending_tasks;
302        pending_tasks.update(|i| i + 1);
303
304        // add a spot to keep the result, and record the index
305        let results_ref = self.results_ref;
306        let mut results = results_ref.borrow_mut();
307        let task_number = results.len();
308        results.push(None);
309        drop(results);
310
311        // create the job closure
312        let f = async move {
313            let result = f.await;
314
315            // store the result in the allocated slot
316            let mut results = results_ref.borrow_mut();
317            results[task_number] = Some(result);
318            drop(results);
319
320            // decrement the pending tasks count
321            pending_tasks.update(|i| i - 1);
322        };
323
324        // spawn the job itself
325        self.executor_ref.spawn(f).detach();
326    }
327}
328
329crate::cfg::std! {
330    if {
331        pub trait MaybeSend {}
332        impl<T> MaybeSend for T {}
333    
334        pub trait MaybeSync {}
335        impl<T> MaybeSync for T {}
336    } else {
337        pub trait MaybeSend: Send {}
338        impl<T: Send> MaybeSend for T {}
339    
340        pub trait MaybeSync: Sync {}
341        impl<T: Sync> MaybeSync for T {}
342    }
343}
344
345#[cfg(test)]
346mod test {
347    use std::{time, thread};
348
349    use super::*;
350
351    /// This test creates a scope with a single task that goes to sleep for a
352    /// nontrivial amount of time. At one point, the scope would (incorrectly)
353    /// return early under these conditions, causing a crash.
354    ///
355    /// The correct behavior is for the scope to block until the receiver is
356    /// woken by the external thread.
357    #[test]
358    fn scoped_spawn() {
359        let (sender, recever) = async_channel::unbounded();
360        let task_pool = TaskPool {};
361        let thread = thread::spawn(move || {
362            let duration = time::Duration::from_millis(50);
363            thread::sleep(duration);
364            let _ = sender.send(0);
365        });
366        task_pool.scope(|scope| {
367            scope.spawn(async {
368                recever.recv().await
369            });
370        });
371    }
372}