1use alloc::{boxed::Box, vec::Vec};
2use bevy_platform::cell::SyncUnsafeCell;
3use bevy_platform::sync::Arc;
4use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
5use concurrent_queue::ConcurrentQueue;
6use core::{any::Any, panic::AssertUnwindSafe};
7use fixedbitset::FixedBitSet;
8#[cfg(feature = "std")]
9use std::eprintln;
10use std::sync::{Mutex, MutexGuard};
11
12#[cfg(feature = "trace")]
13use tracing::{info_span, Span};
14
15use crate::{
16 error::{ErrorContext, ErrorHandler, Result},
17 prelude::Resource,
18 schedule::{
19 is_apply_deferred, ConditionWithAccess, SystemExecutor, SystemSchedule, SystemWithAccess,
20 },
21 system::{RunSystemError, ScheduleSystem},
22 world::{unsafe_world_cell::UnsafeWorldCell, World},
23};
24#[cfg(feature = "hotpatching")]
25use crate::{prelude::DetectChanges, HotPatchChanges};
26
27use super::__rust_begin_short_backtrace;
28
29struct Environment<'env, 'sys> {
31 executor: &'env MultiThreadedExecutor,
32 systems: &'sys [SyncUnsafeCell<SystemWithAccess>],
33 conditions: SyncUnsafeCell<Conditions<'sys>>,
34 world_cell: UnsafeWorldCell<'env>,
35}
36
37struct Conditions<'a> {
38 system_conditions: &'a mut [Vec<ConditionWithAccess>],
39 set_conditions: &'a mut [Vec<ConditionWithAccess>],
40 sets_with_conditions_of_systems: &'a [FixedBitSet],
41 systems_in_sets_with_conditions: &'a [FixedBitSet],
42}
43
44impl<'env, 'sys> Environment<'env, 'sys> {
45 fn new(
46 executor: &'env MultiThreadedExecutor,
47 schedule: &'sys mut SystemSchedule,
48 world: &'env mut World,
49 ) -> Self {
50 Environment {
51 executor,
52 systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
53 conditions: SyncUnsafeCell::new(Conditions {
54 system_conditions: &mut schedule.system_conditions,
55 set_conditions: &mut schedule.set_conditions,
56 sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
57 systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
58 }),
59 world_cell: world.as_unsafe_world_cell(),
60 }
61 }
62}
63
64struct SystemTaskMetadata {
67 conflicting_systems: FixedBitSet,
69 condition_conflicting_systems: FixedBitSet,
74 dependents: Vec<usize>,
76 is_send: bool,
78 is_exclusive: bool,
80}
81
82struct SystemResult {
84 system_index: usize,
85}
86
87pub struct MultiThreadedExecutor {
89 state: Mutex<ExecutorState>,
91 system_completion: ConcurrentQueue<SystemResult>,
93 apply_final_deferred: bool,
95 panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
97 starting_systems: FixedBitSet,
98 #[cfg(feature = "trace")]
100 executor_span: Span,
101}
102
103pub struct ExecutorState {
105 system_task_metadata: Vec<SystemTaskMetadata>,
107 set_condition_conflicting_systems: Vec<FixedBitSet>,
109 local_thread_running: bool,
111 exclusive_running: bool,
113 num_running_systems: usize,
115 num_dependencies_remaining: Vec<usize>,
117 evaluated_sets: FixedBitSet,
119 ready_systems: FixedBitSet,
121 ready_systems_copy: FixedBitSet,
123 running_systems: FixedBitSet,
125 skipped_systems: FixedBitSet,
127 completed_systems: FixedBitSet,
129 unapplied_systems: FixedBitSet,
131}
132
133#[derive(Copy, Clone)]
138struct Context<'scope, 'env, 'sys> {
139 environment: &'env Environment<'env, 'sys>,
140 scope: &'scope Scope<'scope, 'env, ()>,
141 error_handler: ErrorHandler,
142}
143
144impl Default for MultiThreadedExecutor {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150impl SystemExecutor for MultiThreadedExecutor {
151 fn init(&mut self, schedule: &SystemSchedule) {
152 let state = self.state.get_mut().unwrap();
153 let sys_count = schedule.system_ids.len();
155 let set_count = schedule.set_ids.len();
156
157 self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
158 self.starting_systems = FixedBitSet::with_capacity(sys_count);
159 state.evaluated_sets = FixedBitSet::with_capacity(set_count);
160 state.ready_systems = FixedBitSet::with_capacity(sys_count);
161 state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
162 state.running_systems = FixedBitSet::with_capacity(sys_count);
163 state.completed_systems = FixedBitSet::with_capacity(sys_count);
164 state.skipped_systems = FixedBitSet::with_capacity(sys_count);
165 state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
166
167 state.system_task_metadata = Vec::with_capacity(sys_count);
168 for index in 0..sys_count {
169 state.system_task_metadata.push(SystemTaskMetadata {
170 conflicting_systems: FixedBitSet::with_capacity(sys_count),
171 condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
172 dependents: schedule.system_dependents[index].clone(),
173 is_send: schedule.systems[index].system.is_send(),
174 is_exclusive: schedule.systems[index].system.is_exclusive(),
175 });
176 if schedule.system_dependencies[index] == 0 {
177 self.starting_systems.insert(index);
178 }
179 }
180
181 {
182 #[cfg(feature = "trace")]
183 let _span = info_span!("calculate conflicting systems").entered();
184 for index1 in 0..sys_count {
185 let system1 = &schedule.systems[index1];
186 for index2 in 0..index1 {
187 let system2 = &schedule.systems[index2];
188 if !system2.access.is_compatible(&system1.access) {
189 state.system_task_metadata[index1]
190 .conflicting_systems
191 .insert(index2);
192 state.system_task_metadata[index2]
193 .conflicting_systems
194 .insert(index1);
195 }
196 }
197
198 for index2 in 0..sys_count {
199 let system2 = &schedule.systems[index2];
200 if schedule.system_conditions[index1]
201 .iter()
202 .any(|condition| !system2.access.is_compatible(&condition.access))
203 {
204 state.system_task_metadata[index1]
205 .condition_conflicting_systems
206 .insert(index2);
207 }
208 }
209 }
210
211 state.set_condition_conflicting_systems.clear();
212 state.set_condition_conflicting_systems.reserve(set_count);
213 for set_idx in 0..set_count {
214 let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
215 for sys_index in 0..sys_count {
216 let system = &schedule.systems[sys_index];
217 if schedule.set_conditions[set_idx]
218 .iter()
219 .any(|condition| !system.access.is_compatible(&condition.access))
220 {
221 conflicting_systems.insert(sys_index);
222 }
223 }
224 state
225 .set_condition_conflicting_systems
226 .push(conflicting_systems);
227 }
228 }
229
230 state.num_dependencies_remaining = Vec::with_capacity(sys_count);
231 }
232
233 fn run(
234 &mut self,
235 schedule: &mut SystemSchedule,
236 world: &mut World,
237 _skip_systems: Option<&FixedBitSet>,
238 error_handler: ErrorHandler,
239 ) {
240 let state = self.state.get_mut().unwrap();
241 if schedule.systems.is_empty() {
243 return;
244 }
245 state.num_running_systems = 0;
246 state
247 .num_dependencies_remaining
248 .clone_from(&schedule.system_dependencies);
249 state.ready_systems.clone_from(&self.starting_systems);
250
251 #[cfg(feature = "bevy_debug_stepping")]
254 if let Some(skipped_systems) = _skip_systems {
255 debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
256 state.completed_systems |= skipped_systems;
258
259 for system_index in skipped_systems.ones() {
262 state.signal_dependents(system_index);
263 state.ready_systems.remove(system_index);
264 }
265 }
266
267 let thread_executor = world
268 .get_resource::<MainThreadExecutor>()
269 .map(|e| e.0.clone());
270 let thread_executor = thread_executor.as_deref();
271
272 let environment = &Environment::new(self, schedule, world);
273
274 ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
275 false,
276 thread_executor,
277 |scope| {
278 let context = Context {
279 environment,
280 scope,
281 error_handler,
282 };
283
284 context.tick_executor();
287 },
288 );
289
290 let systems = environment.systems;
292
293 let state = self.state.get_mut().unwrap();
294 if self.apply_final_deferred {
295 let res = apply_deferred(&state.unapplied_systems, systems, world);
298 if let Err(payload) = res {
299 let panic_payload = self.panic_payload.get_mut().unwrap();
300 *panic_payload = Some(payload);
301 }
302 state.unapplied_systems.clear();
303 }
304
305 let payload = self.panic_payload.get_mut().unwrap();
307 if let Some(payload) = payload.take() {
308 std::panic::resume_unwind(payload);
309 }
310
311 debug_assert!(state.ready_systems.is_clear());
312 debug_assert!(state.running_systems.is_clear());
313 state.evaluated_sets.clear();
314 state.skipped_systems.clear();
315 state.completed_systems.clear();
316 }
317
318 fn set_apply_final_deferred(&mut self, value: bool) {
319 self.apply_final_deferred = value;
320 }
321}
322
323impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
324 fn system_completed(
325 &self,
326 system_index: usize,
327 res: Result<(), Box<dyn Any + Send>>,
328 system: &ScheduleSystem,
329 ) {
330 self.environment
332 .executor
333 .system_completion
334 .push(SystemResult { system_index })
335 .unwrap_or_else(|error| unreachable!("{}", error));
336 if let Err(payload) = res {
337 #[cfg(feature = "std")]
338 #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
339 {
340 eprintln!("Encountered a panic in system `{}`!", system.name());
341 }
342 {
344 let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
345 *panic_payload = Some(payload);
346 }
347 }
348 self.tick_executor();
349 }
350
351 #[expect(
352 clippy::mut_from_ref,
353 reason = "Field is only accessed here and is guarded by lock with a documented safety comment"
354 )]
355 fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
356 let guard = self.environment.executor.state.try_lock().ok()?;
357 let conditions = unsafe { &mut *self.environment.conditions.get() };
360 Some((conditions, guard))
361 }
362
363 fn tick_executor(&self) {
364 loop {
370 let Some((conditions, mut guard)) = self.try_lock() else {
371 return;
372 };
373 guard.tick(self, conditions);
374 drop(guard);
376 if self.environment.executor.system_completion.is_empty() {
377 return;
378 }
379 }
380 }
381}
382
383impl MultiThreadedExecutor {
384 pub fn new() -> Self {
388 Self {
389 state: Mutex::new(ExecutorState::new()),
390 system_completion: ConcurrentQueue::unbounded(),
391 starting_systems: FixedBitSet::new(),
392 apply_final_deferred: true,
393 panic_payload: Mutex::new(None),
394 #[cfg(feature = "trace")]
395 executor_span: info_span!("multithreaded executor"),
396 }
397 }
398}
399
400impl ExecutorState {
401 fn new() -> Self {
402 Self {
403 system_task_metadata: Vec::new(),
404 set_condition_conflicting_systems: Vec::new(),
405 num_running_systems: 0,
406 num_dependencies_remaining: Vec::new(),
407 local_thread_running: false,
408 exclusive_running: false,
409 evaluated_sets: FixedBitSet::new(),
410 ready_systems: FixedBitSet::new(),
411 ready_systems_copy: FixedBitSet::new(),
412 running_systems: FixedBitSet::new(),
413 skipped_systems: FixedBitSet::new(),
414 completed_systems: FixedBitSet::new(),
415 unapplied_systems: FixedBitSet::new(),
416 }
417 }
418
419 fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
420 #[cfg(feature = "trace")]
421 let _span = context.environment.executor.executor_span.enter();
422
423 for result in context.environment.executor.system_completion.try_iter() {
424 self.finish_system_and_handle_dependents(result);
425 }
426
427 unsafe {
431 self.spawn_system_tasks(context, conditions);
432 }
433 }
434
435 unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
441 if self.exclusive_running {
442 return;
443 }
444
445 #[cfg(feature = "hotpatching")]
446 #[expect(
447 clippy::undocumented_unsafe_blocks,
448 reason = "This actually could result in UB if a system tries to mutate
449 `HotPatchChanges`. We allow this as the resource only exists with the `hotpatching` feature.
450 and `hotpatching` should never be enabled in release."
451 )]
452 #[cfg(feature = "hotpatching")]
453 let hotpatch_tick = unsafe {
454 context
455 .environment
456 .world_cell
457 .get_resource_ref::<HotPatchChanges>()
458 }
459 .map(|r| r.last_changed())
460 .unwrap_or_default();
461
462 let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);
464
465 let mut check_for_new_ready_systems = true;
468 while check_for_new_ready_systems {
469 check_for_new_ready_systems = false;
470
471 ready_systems.clone_from(&self.ready_systems);
472
473 for system_index in ready_systems.ones() {
474 debug_assert!(!self.running_systems.contains(system_index));
475 let system =
478 &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
479
480 #[cfg(feature = "hotpatching")]
481 if hotpatch_tick.is_newer_than(
482 system.get_last_run(),
483 context.environment.world_cell.change_tick(),
484 ) {
485 system.refresh_hotpatch();
486 }
487
488 if !self.can_run(system_index, conditions) {
489 continue;
494 }
495
496 self.ready_systems.remove(system_index);
497
498 if unsafe {
501 !self.should_run(
502 system_index,
503 system,
504 conditions,
505 context.environment.world_cell,
506 context.error_handler,
507 )
508 } {
509 self.skip_system_and_signal_dependents(system_index);
510 check_for_new_ready_systems = true;
512 continue;
513 }
514
515 self.running_systems.insert(system_index);
516 self.num_running_systems += 1;
517
518 if self.system_task_metadata[system_index].is_exclusive {
519 unsafe {
522 self.spawn_exclusive_system_task(context, system_index);
523 }
524 check_for_new_ready_systems = false;
525 break;
526 }
527
528 unsafe {
534 self.spawn_system_task(context, system_index);
535 }
536 }
537 }
538
539 self.ready_systems_copy = ready_systems;
541 }
542
543 fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {
544 let system_meta = &self.system_task_metadata[system_index];
545 if system_meta.is_exclusive && self.num_running_systems > 0 {
546 return false;
547 }
548
549 if !system_meta.is_send && self.local_thread_running {
550 return false;
551 }
552
553 for set_idx in conditions.sets_with_conditions_of_systems[system_index]
555 .difference(&self.evaluated_sets)
556 {
557 if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
558 return false;
559 }
560 }
561
562 if !system_meta
563 .condition_conflicting_systems
564 .is_disjoint(&self.running_systems)
565 {
566 return false;
567 }
568
569 if !self.skipped_systems.contains(system_index)
570 && !system_meta
571 .conflicting_systems
572 .is_disjoint(&self.running_systems)
573 {
574 return false;
575 }
576
577 true
578 }
579
580 unsafe fn should_run(
585 &mut self,
586 system_index: usize,
587 system: &mut ScheduleSystem,
588 conditions: &mut Conditions,
589 world: UnsafeWorldCell,
590 error_handler: ErrorHandler,
591 ) -> bool {
592 let mut should_run = !self.skipped_systems.contains(system_index);
593
594 for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
595 if self.evaluated_sets.contains(set_idx) {
596 continue;
597 }
598
599 let set_conditions_met = unsafe {
604 evaluate_and_fold_conditions(
605 &mut conditions.set_conditions[set_idx],
606 world,
607 error_handler,
608 system,
609 true,
610 )
611 };
612
613 if !set_conditions_met {
614 self.skipped_systems
615 .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
616 }
617
618 should_run &= set_conditions_met;
619 self.evaluated_sets.insert(set_idx);
620 }
621
622 let system_conditions_met = unsafe {
627 evaluate_and_fold_conditions(
628 &mut conditions.system_conditions[system_index],
629 world,
630 error_handler,
631 system,
632 false,
633 )
634 };
635
636 if !system_conditions_met {
637 self.skipped_systems.insert(system_index);
638 }
639
640 should_run &= system_conditions_met;
641
642 should_run
643 }
644
645 unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
651 let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
653 let context = *context;
655
656 let system_meta = &self.system_task_metadata[system_index];
657
658 let task = async move {
659 let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
660 unsafe {
665 if let Err(RunSystemError::Failed(err)) =
666 __rust_begin_short_backtrace::run_unsafe(
667 system,
668 context.environment.world_cell,
669 )
670 {
671 (context.error_handler)(
672 err,
673 ErrorContext::System {
674 name: system.name(),
675 last_run: system.get_last_run(),
676 },
677 );
678 }
679 };
680 }));
681 context.system_completed(system_index, res, system);
682 };
683
684 if system_meta.is_send {
685 context.scope.spawn(task);
686 } else {
687 self.local_thread_running = true;
688 context.scope.spawn_on_external(task);
689 }
690 }
691
692 unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
695 let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
697 let context = *context;
699
700 if is_apply_deferred(&**system) {
701 let unapplied_systems = self.unapplied_systems.clone();
703 self.unapplied_systems.clear();
704 let task = async move {
705 let world = unsafe { context.environment.world_cell.world_mut() };
708 let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
709 context.system_completed(system_index, res, system);
710 };
711
712 context.scope.spawn_on_scope(task);
713 } else {
714 let task = async move {
715 let world = unsafe { context.environment.world_cell.world_mut() };
718 let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
719 if let Err(RunSystemError::Failed(err)) =
720 __rust_begin_short_backtrace::run(system, world)
721 {
722 (context.error_handler)(
723 err,
724 ErrorContext::System {
725 name: system.name(),
726 last_run: system.get_last_run(),
727 },
728 );
729 }
730 }));
731 context.system_completed(system_index, res, system);
732 };
733
734 context.scope.spawn_on_scope(task);
735 }
736
737 self.exclusive_running = true;
738 self.local_thread_running = true;
739 }
740
741 fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
742 let SystemResult { system_index, .. } = result;
743
744 if self.system_task_metadata[system_index].is_exclusive {
745 self.exclusive_running = false;
746 }
747
748 if !self.system_task_metadata[system_index].is_send {
749 self.local_thread_running = false;
750 }
751
752 debug_assert!(self.num_running_systems >= 1);
753 self.num_running_systems -= 1;
754 self.running_systems.remove(system_index);
755 self.completed_systems.insert(system_index);
756 self.unapplied_systems.insert(system_index);
757
758 self.signal_dependents(system_index);
759 }
760
761 fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
762 self.completed_systems.insert(system_index);
763 self.signal_dependents(system_index);
764 }
765
766 fn signal_dependents(&mut self, system_index: usize) {
767 for &dep_idx in &self.system_task_metadata[system_index].dependents {
768 let remaining = &mut self.num_dependencies_remaining[dep_idx];
769 debug_assert!(*remaining >= 1);
770 *remaining -= 1;
771 if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
772 self.ready_systems.insert(dep_idx);
773 }
774 }
775 }
776}
777
778fn apply_deferred(
779 unapplied_systems: &FixedBitSet,
780 systems: &[SyncUnsafeCell<SystemWithAccess>],
781 world: &mut World,
782) -> Result<(), Box<dyn Any + Send>> {
783 for system_index in unapplied_systems.ones() {
784 let system = &mut unsafe { &mut *systems[system_index].get() }.system;
786 let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
787 system.apply_deferred(world);
788 }));
789 if let Err(payload) = res {
790 #[cfg(feature = "std")]
791 #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
792 {
793 eprintln!(
794 "Encountered a panic when applying buffers for system `{}`!",
795 system.name()
796 );
797 }
798 return Err(payload);
799 }
800 }
801 Ok(())
802}
803
804unsafe fn evaluate_and_fold_conditions(
808 conditions: &mut [ConditionWithAccess],
809 world: UnsafeWorldCell,
810 error_handler: ErrorHandler,
811 for_system: &ScheduleSystem,
812 on_set: bool,
813) -> bool {
814 #[expect(
815 clippy::unnecessary_fold,
816 reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
817 )]
818 conditions
819 .iter_mut()
820 .map(|ConditionWithAccess { condition, .. }| {
821 unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
825 .unwrap_or_else(|err| {
826 if let RunSystemError::Failed(err) = err {
827 error_handler(
828 err,
829 ErrorContext::RunCondition {
830 name: condition.name(),
831 last_run: condition.get_last_run(),
832 system: for_system.name(),
833 on_set,
834 },
835 );
836 };
837 false
838 })
839 })
840 .fold(true, |acc, res| acc && res)
841}
842
843#[derive(Resource, Clone)]
845pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
846
847impl Default for MainThreadExecutor {
848 fn default() -> Self {
849 Self::new()
850 }
851}
852
853impl MainThreadExecutor {
854 pub fn new() -> Self {
856 MainThreadExecutor(TaskPool::get_thread_executor())
857 }
858}
859
860#[cfg(test)]
861mod tests {
862 use crate::{
863 prelude::Resource,
864 schedule::{IntoScheduleConfigs, MultiThreadedExecutor, Schedule},
865 system::Commands,
866 world::World,
867 };
868
869 #[derive(Resource)]
870 struct R;
871
872 #[test]
873 fn skipped_systems_notify_dependents() {
874 let mut world = World::new();
875 let mut schedule = Schedule::default();
876 schedule.set_executor(MultiThreadedExecutor::new());
877 schedule.add_systems(
878 (
879 (|| {}).run_if(|| false),
880 |mut commands: Commands| {
882 commands.insert_resource(R);
883 },
884 )
885 .chain(),
886 );
887 schedule.run(&mut world);
888 assert!(world.get_resource::<R>().is_some());
889 }
890
891 #[test]
895 fn check_spawn_exclusive_system_task_miri() {
896 let mut world = World::new();
897 let mut schedule = Schedule::default();
898 schedule.set_executor(MultiThreadedExecutor::new());
899 schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
900 schedule.run(&mut world);
901 }
902}