Skip to main content

bevy_ecs/schedule/executor/
multi_threaded.rs

1use alloc::{boxed::Box, vec::Vec};
2use bevy_platform::cell::SyncUnsafeCell;
3use bevy_platform::sync::Arc;
4use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
5use concurrent_queue::ConcurrentQueue;
6use core::{any::Any, panic::AssertUnwindSafe};
7use fixedbitset::FixedBitSet;
8#[cfg(feature = "std")]
9use std::eprintln;
10use std::sync::{Mutex, MutexGuard};
11
12#[cfg(feature = "trace")]
13use tracing::{info_span, Span};
14
15use crate::{
16    error::{ErrorContext, ErrorHandler, Result},
17    prelude::Resource,
18    schedule::{
19        is_apply_deferred, ConditionWithAccess, SystemExecutor, SystemSchedule, SystemWithAccess,
20    },
21    system::{RunSystemError, ScheduleSystem},
22    world::{unsafe_world_cell::UnsafeWorldCell, World},
23};
24#[cfg(feature = "hotpatching")]
25use crate::{prelude::DetectChanges, HotPatchChanges};
26
27use super::__rust_begin_short_backtrace;
28
/// Borrowed data used by the [`MultiThreadedExecutor`].
///
/// Built once per call to `run` and shared by reference with every spawned
/// system task through [`Context`].
struct Environment<'env, 'sys> {
    /// The executor driving the schedule; tasks reach the state mutex, the
    /// completion queue and the panic payload through it.
    executor: &'env MultiThreadedExecutor,
    /// The schedule's systems, each wrapped in its own cell so that a single
    /// system can be mutably borrowed while other systems are in use.
    systems: &'sys [SyncUnsafeCell<SystemWithAccess>],
    /// Run conditions for systems and sets. Only accessed mutably through
    /// `Context::try_lock`, i.e. while the executor state lock is held.
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    /// Unsafe handle to the world that systems and conditions are run against.
    world_cell: UnsafeWorldCell<'env>,
}
36
/// Mutable view of the schedule's run conditions, plus the lookup tables that
/// relate systems to the sets whose conditions apply to them.
struct Conditions<'a> {
    /// Run conditions attached to each system, indexed by system index.
    system_conditions: &'a mut [Vec<ConditionWithAccess>],
    /// Run conditions attached to each system set, indexed by set index.
    set_conditions: &'a mut [Vec<ConditionWithAccess>],
    /// Indexed by system index; each bitset holds the indices of the sets
    /// whose conditions apply to that system.
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    /// Indexed by set index; each bitset holds the indices of the systems
    /// that are skipped together when that set's conditions fail.
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}
43
44impl<'env, 'sys> Environment<'env, 'sys> {
45    fn new(
46        executor: &'env MultiThreadedExecutor,
47        schedule: &'sys mut SystemSchedule,
48        world: &'env mut World,
49    ) -> Self {
50        Environment {
51            executor,
52            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
53            conditions: SyncUnsafeCell::new(Conditions {
54                system_conditions: &mut schedule.system_conditions,
55                set_conditions: &mut schedule.set_conditions,
56                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
57                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
58            }),
59            world_cell: world.as_unsafe_world_cell(),
60        }
61    }
62}
63
/// Per-system data used by the [`MultiThreadedExecutor`].
///
/// Populated once in `init` from the schedule.
// Copied here because it can't be read from the system when it's running.
struct SystemTaskMetadata {
    /// The set of systems whose `component_access_set()` conflicts with this one.
    conflicting_systems: FixedBitSet,
    /// The set of systems whose `component_access_set()` conflicts with this system's conditions.
    /// Note that this is separate from `conflicting_systems` to handle the case where
    /// a system is skipped by an earlier system set condition or system stepping,
    /// and needs access to run its conditions but not for itself.
    condition_conflicting_systems: FixedBitSet,
    /// Indices of the systems that directly depend on the system.
    dependents: Vec<usize>,
    /// Is `true` if the system does not access `!Send` data.
    is_send: bool,
    /// Is `true` if the system is exclusive.
    is_exclusive: bool,
}
81
/// The result of running a system that is sent across a channel.
///
/// Pushed onto the executor's `system_completion` queue by the task that ran
/// the system and drained by `ExecutorState::tick`.
struct SystemResult {
    /// Index of the system that finished.
    system_index: usize,
}
86
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
pub struct MultiThreadedExecutor {
    /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
    state: Mutex<ExecutorState>,
    /// Queue of system completion events.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When `true`, deferred system buffers are applied once more after all systems have run.
    apply_final_deferred: bool,
    /// When set, tells the executor that a thread has panicked.
    /// The payload is re-raised at the end of `run`.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    /// Systems that have no dependencies. Computed in `init` and copied into
    /// `ready_systems` at the start of every run.
    starting_systems: FixedBitSet,
    /// Cached tracing span
    #[cfg(feature = "trace")]
    executor_span: Span,
}
102
/// The state of the executor while running.
pub struct ExecutorState {
    /// Metadata for scheduling and running system tasks.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// For each system set (by set index), the systems whose
    /// `component_access_set()` conflicts with that set's conditions.
    set_condition_conflicting_systems: Vec<FixedBitSet>,
    /// Is `true` while a system with non-`Send` access is running.
    local_thread_running: bool,
    /// Is `true` while an exclusive system is running.
    exclusive_running: bool,
    /// The number of systems that are running.
    num_running_systems: usize,
    /// The number of dependencies each system has that have not completed.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose conditions have been evaluated.
    evaluated_sets: FixedBitSet,
    /// Systems that have no remaining dependencies and are waiting to run.
    ready_systems: FixedBitSet,
    /// Scratch buffer that `spawn_system_tasks` fills with a snapshot of
    /// `ready_systems`, so it can iterate the snapshot while mutating the original.
    ready_systems_copy: FixedBitSet,
    /// Systems that are running.
    running_systems: FixedBitSet,
    /// Systems that got skipped.
    skipped_systems: FixedBitSet,
    /// Systems whose conditions have been evaluated and were run or skipped.
    completed_systems: FixedBitSet,
    /// Systems that have run but have not had their buffers applied.
    unapplied_systems: FixedBitSet,
}
132
/// References to data required by the executor.
/// This is copied to each system task so that they can invoke the executor when they complete.
// These all need to outlive 'scope in order to be sent to new tasks,
// and keeping them all in a struct means we can use lifetime elision.
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    /// The borrowed executor, systems, conditions and world for this run.
    environment: &'env Environment<'env, 'sys>,
    /// The task scope that system tasks are spawned on.
    scope: &'scope Scope<'scope, 'env, ()>,
    /// Called with the error (and an [`ErrorContext`]) when a system returns a failure.
    error_handler: ErrorHandler,
}
143
144impl Default for MultiThreadedExecutor {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl SystemExecutor for MultiThreadedExecutor {
151    fn init(&mut self, schedule: &SystemSchedule) {
152        let state = self.state.get_mut().unwrap();
153        // pre-allocate space
154        let sys_count = schedule.system_ids.len();
155        let set_count = schedule.set_ids.len();
156
157        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
158        self.starting_systems = FixedBitSet::with_capacity(sys_count);
159        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
160        state.ready_systems = FixedBitSet::with_capacity(sys_count);
161        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
162        state.running_systems = FixedBitSet::with_capacity(sys_count);
163        state.completed_systems = FixedBitSet::with_capacity(sys_count);
164        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
165        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
166
167        state.system_task_metadata = Vec::with_capacity(sys_count);
168        for index in 0..sys_count {
169            state.system_task_metadata.push(SystemTaskMetadata {
170                conflicting_systems: FixedBitSet::with_capacity(sys_count),
171                condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
172                dependents: schedule.system_dependents[index].clone(),
173                is_send: schedule.systems[index].system.is_send(),
174                is_exclusive: schedule.systems[index].system.is_exclusive(),
175            });
176            if schedule.system_dependencies[index] == 0 {
177                self.starting_systems.insert(index);
178            }
179        }
180
181        {
182            #[cfg(feature = "trace")]
183            let _span = info_span!("calculate conflicting systems").entered();
184            for index1 in 0..sys_count {
185                let system1 = &schedule.systems[index1];
186                for index2 in 0..index1 {
187                    let system2 = &schedule.systems[index2];
188                    if !system2.access.is_compatible(&system1.access) {
189                        state.system_task_metadata[index1]
190                            .conflicting_systems
191                            .insert(index2);
192                        state.system_task_metadata[index2]
193                            .conflicting_systems
194                            .insert(index1);
195                    }
196                }
197
198                for index2 in 0..sys_count {
199                    let system2 = &schedule.systems[index2];
200                    if schedule.system_conditions[index1]
201                        .iter()
202                        .any(|condition| !system2.access.is_compatible(&condition.access))
203                    {
204                        state.system_task_metadata[index1]
205                            .condition_conflicting_systems
206                            .insert(index2);
207                    }
208                }
209            }
210
211            state.set_condition_conflicting_systems.clear();
212            state.set_condition_conflicting_systems.reserve(set_count);
213            for set_idx in 0..set_count {
214                let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
215                for sys_index in 0..sys_count {
216                    let system = &schedule.systems[sys_index];
217                    if schedule.set_conditions[set_idx]
218                        .iter()
219                        .any(|condition| !system.access.is_compatible(&condition.access))
220                    {
221                        conflicting_systems.insert(sys_index);
222                    }
223                }
224                state
225                    .set_condition_conflicting_systems
226                    .push(conflicting_systems);
227            }
228        }
229
230        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
231    }
232
    /// Runs every system in `schedule` against `world`, spawning system tasks
    /// on the compute task pool.
    ///
    /// `_skip_systems` is only consulted when the `bevy_debug_stepping`
    /// feature is enabled; the listed systems are then treated as already
    /// completed. `error_handler` is handed to every task through [`Context`].
    ///
    /// # Panics
    ///
    /// If a system task or the final deferred application panicked, the
    /// captured payload is re-raised with `resume_unwind` after the task
    /// scope has finished.
    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
        error_handler: ErrorHandler,
    ) {
        let state = self.state.get_mut().unwrap();
        // reset counts
        if schedule.systems.is_empty() {
            return;
        }
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        // Systems without dependencies are ready immediately.
        state.ready_systems.clone_from(&self.starting_systems);

        // If stepping is enabled, make sure we skip those systems that should
        // not be run.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            // mark skipped systems as completed
            state.completed_systems |= skipped_systems;

            // signal the dependencies for each of the skipped systems, as
            // though they had run
            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        // Clone the executor handle out of the resource (if present) so the
        // borrow of `world` ends before `Environment::new` borrows it mutably.
        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context {
                    environment,
                    scope,
                    error_handler,
                };

                // The first tick won't need to process finished systems, but we still need to run the loop in
                // tick_executor() in case a system completes while the first tick still holds the mutex.
                context.tick_executor();
            },
        );

        // End the borrows of self and world in environment by copying out the reference to systems.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            // Do one final apply buffers after all systems have completed
            // Commands should be applied while on the scope's thread, not the executor's thread
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            if let Err(payload) = res {
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // check to see if there was a panic
        // `take()` leaves `None` behind, so the payload is only re-raised once.
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        // Reset the per-run bitsets so the next run starts from a clean state.
        // NOTE(review): this reset is not reached when a panic is re-raised above.
        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }
317
    /// Sets whether `run` applies the deferred buffers of all not-yet-applied
    /// systems once more after every system has completed.
    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
321}
322
323impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    /// Called by a system task once its system has finished (or panicked).
    ///
    /// Queues a completion event for `system_index`, stores the panic payload
    /// if `res` is an error, and then tries to run the executor on this thread.
    /// `system` is only used to name the system in the panic message.
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &ScheduleSystem,
    ) {
        // tell the executor that the system finished
        // This push must happen before `tick_executor()` below; see the
        // ordering argument documented in `tick_executor`.
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            #[cfg(feature = "std")]
            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
            {
                eprintln!("Encountered a panic in system `{}`!", system.name());
            }
            // set the payload to propagate the error
            // The inner block drops the mutex guard before the executor is ticked.
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        self.tick_executor();
    }
350
    /// Tries to acquire the executor state lock without blocking.
    ///
    /// On success returns the guard together with mutable access to the run
    /// conditions. Returns `None` if `Mutex::try_lock` fails, which includes
    /// the lock being held by another thread.
    #[expect(
        clippy::mut_from_ref,
        reason = "Field is only accessed here and is guarded by lock with a documented safety comment"
    )]
    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
        let guard = self.environment.executor.state.try_lock().ok()?;
        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
        // is synchronized by the lock on the executor state.
        let conditions = unsafe { &mut *self.environment.conditions.get() };
        Some((conditions, guard))
    }
362
    /// Runs the executor on the current thread if the state lock is free.
    ///
    /// Keeps ticking until either another thread holds the lock (that thread
    /// then becomes responsible for pending events) or the completion queue
    /// is observed empty after the lock has been released.
    fn tick_executor(&self) {
        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
        // If this thread acquires the lock, the executor runs after the push() and they are processed.
        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
        // after the lock is released, which is after try_lock() failed, which is after the push()
        // on this thread, so the is_empty() check will see the new events and loop.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
381}
382
383impl MultiThreadedExecutor {
384    /// Creates a new `multi_threaded` executor for use with a [`Schedule`].
385    ///
386    /// [`Schedule`]: crate::schedule::Schedule
387    pub fn new() -> Self {
388        Self {
389            state: Mutex::new(ExecutorState::new()),
390            system_completion: ConcurrentQueue::unbounded(),
391            starting_systems: FixedBitSet::new(),
392            apply_final_deferred: true,
393            panic_payload: Mutex::new(None),
394            #[cfg(feature = "trace")]
395            executor_span: info_span!("multithreaded executor"),
396        }
397    }
398}
399
400impl ExecutorState {
401    fn new() -> Self {
402        Self {
403            system_task_metadata: Vec::new(),
404            set_condition_conflicting_systems: Vec::new(),
405            num_running_systems: 0,
406            num_dependencies_remaining: Vec::new(),
407            local_thread_running: false,
408            exclusive_running: false,
409            evaluated_sets: FixedBitSet::new(),
410            ready_systems: FixedBitSet::new(),
411            ready_systems_copy: FixedBitSet::new(),
412            running_systems: FixedBitSet::new(),
413            skipped_systems: FixedBitSet::new(),
414            completed_systems: FixedBitSet::new(),
415            unapplied_systems: FixedBitSet::new(),
416        }
417    }
418
    /// Performs one scheduling pass: drains the completion queue, then tries
    /// to spawn every system that is ready and allowed to run.
    ///
    /// Called from `Context::tick_executor` while the state lock is held.
    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
        #[cfg(feature = "trace")]
        let _span = context.environment.executor.executor_span.enter();

        // Account for every system that has reported completion so far.
        for result in context.environment.executor.system_completion.try_iter() {
            self.finish_system_and_handle_dependents(result);
        }

        // SAFETY:
        // - `finish_system_and_handle_dependents` has updated the currently running systems.
        // - `rebuild_active_access` locks access for all currently running systems.
        // NOTE(review): no `rebuild_active_access` is visible in this part of the
        // file; the second bullet may be stale — confirm.
        unsafe {
            self.spawn_system_tasks(context, conditions);
        }
    }
434
    /// Walks the ready systems and, for each one that may run right now,
    /// evaluates its conditions and either spawns it as a task or marks it
    /// as skipped.
    ///
    /// Does nothing while an exclusive system is running.
    ///
    /// # Safety
    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
    ///   have been mutably borrowed (such as the systems currently running).
    /// - `world_cell` must have permission to access all world data (not counting
    ///   any world data that is claimed by systems currently running on this executor).
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        if self.exclusive_running {
            return;
        }

        #[cfg(feature = "hotpatching")]
        #[expect(
            clippy::undocumented_unsafe_blocks,
            reason = "This actually could result in UB if a system tries to mutate
            `HotPatchChanges`. We allow this as the resource only exists with the `hotpatching` feature.
            and `hotpatching` should never be enabled in release."
        )]
        #[cfg(feature = "hotpatching")]
        let hotpatch_tick = unsafe {
            context
                .environment
                .world_cell
                .get_resource_ref::<HotPatchChanges>()
        }
        .map(|r| r.last_changed())
        .unwrap_or_default();

        // can't borrow since loop mutably borrows `self`
        // The scratch bitset is taken out of `self` here and handed back at the end.
        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);

        // Skipping systems may cause their dependents to become ready immediately.
        // If that happens, we need to run again immediately or we may fail to spawn those dependents.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            // Iterate a snapshot; `self.ready_systems` is modified inside the loop.
            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: Caller assured that these systems are not running.
                // Therefore, no other reference to this system exists and there is no aliasing.
                let system =
                    &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;

                #[cfg(feature = "hotpatching")]
                if hotpatch_tick.is_newer_than(
                    system.get_last_run(),
                    context.environment.world_cell.change_tick(),
                ) {
                    system.refresh_hotpatch();
                }

                if !self.can_run(system_index, conditions) {
                    // NOTE: exclusive systems with ambiguities are susceptible to
                    // being significantly displaced here (compared to single-threaded order)
                    // if systems after them in topological order can run
                    // if that becomes an issue, `break;` if exclusive system
                    continue;
                }

                // From here on the system is either skipped or started, so it
                // is no longer waiting.
                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, which means that:
                // - There can be no systems running whose accesses would conflict with any conditions.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                        context.error_handler,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // signal_dependents may have set more systems to ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` returned true for this system,
                    // which means no systems are currently borrowed.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    // Nothing else may start while the exclusive system runs,
                    // so stop scanning and do not rescan.
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - Caller ensured no other reference to this system exists.
                // - `system_task_metadata[system_index].is_exclusive` is `false`,
                //   so `System::is_exclusive` returned `false` when we called it.
                // - `can_run` returned true, so no systems with conflicting world access are running.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // give back
        self.ready_systems_copy = ready_systems;
    }
542
543    fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {
544        let system_meta = &self.system_task_metadata[system_index];
545        if system_meta.is_exclusive && self.num_running_systems > 0 {
546            return false;
547        }
548
549        if !system_meta.is_send && self.local_thread_running {
550            return false;
551        }
552
553        // TODO: an earlier out if world's archetypes did not change
554        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
555            .difference(&self.evaluated_sets)
556        {
557            if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
558                return false;
559            }
560        }
561
562        if !system_meta
563            .condition_conflicting_systems
564            .is_disjoint(&self.running_systems)
565        {
566            return false;
567        }
568
569        if !self.skipped_systems.contains(system_index)
570            && !system_meta
571                .conflicting_systems
572                .is_disjoint(&self.running_systems)
573        {
574            return false;
575        }
576
577        true
578    }
579
580    /// # Safety
581    /// * `world` must have permission to read any world data required by
582    ///   the system's conditions: this includes conditions for the system
583    ///   itself, and conditions for any of the system's sets.
584    unsafe fn should_run(
585        &mut self,
586        system_index: usize,
587        system: &mut ScheduleSystem,
588        conditions: &mut Conditions,
589        world: UnsafeWorldCell,
590        error_handler: ErrorHandler,
591    ) -> bool {
592        let mut should_run = !self.skipped_systems.contains(system_index);
593
594        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
595            if self.evaluated_sets.contains(set_idx) {
596                continue;
597            }
598
599            // Evaluate the system set's conditions.
600            // SAFETY:
601            // - The caller ensures that `world` has permission to read any data
602            //   required by the conditions.
603            let set_conditions_met = unsafe {
604                evaluate_and_fold_conditions(
605                    &mut conditions.set_conditions[set_idx],
606                    world,
607                    error_handler,
608                    system,
609                    true,
610                )
611            };
612
613            if !set_conditions_met {
614                self.skipped_systems
615                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
616            }
617
618            should_run &= set_conditions_met;
619            self.evaluated_sets.insert(set_idx);
620        }
621
622        // Evaluate the system's conditions.
623        // SAFETY:
624        // - The caller ensures that `world` has permission to read any data
625        //   required by the conditions.
626        let system_conditions_met = unsafe {
627            evaluate_and_fold_conditions(
628                &mut conditions.system_conditions[system_index],
629                world,
630                error_handler,
631                system,
632                false,
633            )
634        };
635
636        if !system_conditions_met {
637            self.skipped_systems.insert(system_index);
638        }
639
640        should_run &= system_conditions_met;
641
642        should_run
643    }
644
    /// Spawns a task that runs the (non-exclusive) system at `system_index`
    /// and then reports its completion through `Context::system_completed`.
    ///
    /// # Safety
    /// - Caller must not alias systems that are running.
    /// - `is_exclusive` must have returned `false` for the specified system.
    /// - `world` must have permission to access the world data
    ///   used by the specified system.
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: this system is not running, no other reference exists
        let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
        // Move the full context object into the new future.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            // Panics are caught so that completion is always reported; the
            // payload is forwarded to `system_completed`.
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY:
                // - The caller ensures that we have permission to
                // access the world data used by the system.
                // - `is_exclusive` returned false
                unsafe {
                    if let Err(RunSystemError::Failed(err)) =
                        __rust_begin_short_backtrace::run_unsafe(
                            system,
                            context.environment.world_cell,
                        )
                    {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                };
            }));
            context.system_completed(system_index, res, system);
        };

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            // Systems with `!Send` access go through `spawn_on_external`; the flag
            // makes `can_run` hold back any other such system until this one finishes.
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }
691
    /// Spawns a task for the exclusive system at `system_index`.
    ///
    /// If the system is an apply-deferred system, the task applies the
    /// deferred buffers of all systems recorded in `unapplied_systems`;
    /// otherwise it runs the system with mutable access to the world.
    /// Both kinds of task are spawned with `spawn_on_scope`.
    ///
    /// # Safety
    /// Caller must ensure no systems are currently borrowed.
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: this system is not running, no other reference exists
        let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
        // Move the full context object into the new future.
        let context = *context;

        if is_apply_deferred(&**system) {
            // TODO: avoid allocation
            // The task gets its own copy of the pending set; the original is
            // cleared in place rather than moved out.
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                // SAFETY: `can_run` returned true for this system, which means
                // that no other systems currently have access to the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                // SAFETY: `can_run` returned true for this system, which means
                // that no other systems currently have access to the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    if let Err(RunSystemError::Failed(err)) =
                        __rust_begin_short_backtrace::run(system, world)
                    {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        // Both flags stay set until `finish_system_and_handle_dependents`
        // processes this system's completion event.
        self.exclusive_running = true;
        self.local_thread_running = true;
    }
740
741    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
742        let SystemResult { system_index, .. } = result;
743
744        if self.system_task_metadata[system_index].is_exclusive {
745            self.exclusive_running = false;
746        }
747
748        if !self.system_task_metadata[system_index].is_send {
749            self.local_thread_running = false;
750        }
751
752        debug_assert!(self.num_running_systems >= 1);
753        self.num_running_systems -= 1;
754        self.running_systems.remove(system_index);
755        self.completed_systems.insert(system_index);
756        self.unapplied_systems.insert(system_index);
757
758        self.signal_dependents(system_index);
759    }
760
761    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
762        self.completed_systems.insert(system_index);
763        self.signal_dependents(system_index);
764    }
765
766    fn signal_dependents(&mut self, system_index: usize) {
767        for &dep_idx in &self.system_task_metadata[system_index].dependents {
768            let remaining = &mut self.num_dependencies_remaining[dep_idx];
769            debug_assert!(*remaining >= 1);
770            *remaining -= 1;
771            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
772                self.ready_systems.insert(dep_idx);
773            }
774        }
775    }
776}
777
778fn apply_deferred(
779    unapplied_systems: &FixedBitSet,
780    systems: &[SyncUnsafeCell<SystemWithAccess>],
781    world: &mut World,
782) -> Result<(), Box<dyn Any + Send>> {
783    for system_index in unapplied_systems.ones() {
784        // SAFETY: none of these systems are running, no other references exist
785        let system = &mut unsafe { &mut *systems[system_index].get() }.system;
786        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
787            system.apply_deferred(world);
788        }));
789        if let Err(payload) = res {
790            #[cfg(feature = "std")]
791            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
792            {
793                eprintln!(
794                    "Encountered a panic when applying buffers for system `{}`!",
795                    system.name()
796                );
797            }
798            return Err(payload);
799        }
800    }
801    Ok(())
802}
803
804/// # Safety
805/// - `world` must have permission to read any world data
806///   required by `conditions`.
807unsafe fn evaluate_and_fold_conditions(
808    conditions: &mut [ConditionWithAccess],
809    world: UnsafeWorldCell,
810    error_handler: ErrorHandler,
811    for_system: &ScheduleSystem,
812    on_set: bool,
813) -> bool {
814    #[expect(
815        clippy::unnecessary_fold,
816        reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
817    )]
818    conditions
819        .iter_mut()
820        .map(|ConditionWithAccess { condition, .. }| {
821            // SAFETY:
822            // - The caller ensures that `world` has permission to read any data
823            //   required by the condition.
824            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
825                .unwrap_or_else(|err| {
826                    if let RunSystemError::Failed(err) = err {
827                        error_handler(
828                            err,
829                            ErrorContext::RunCondition {
830                                name: condition.name(),
831                                last_run: condition.get_last_run(),
832                                system: for_system.name(),
833                                on_set,
834                            },
835                        );
836                    };
837                    false
838                })
839        })
840        .fold(true, |acc, res| acc && res)
841}
842
843/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
844#[derive(Resource, Clone)]
845pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
846
847impl Default for MainThreadExecutor {
848    fn default() -> Self {
849        Self::new()
850    }
851}
852
853impl MainThreadExecutor {
854    /// Creates a new executor that can be used to run systems on the main thread.
855    pub fn new() -> Self {
856        MainThreadExecutor(TaskPool::get_thread_executor())
857    }
858}
859
#[cfg(test)]
mod tests {
    use crate::{
        prelude::Resource,
        schedule::{IntoScheduleConfigs, MultiThreadedExecutor, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource whose presence shows that the inserting system ran.
    #[derive(Resource)]
    struct R;

    /// Builds an empty schedule driven by a fresh [`MultiThreadedExecutor`].
    fn multi_threaded_schedule() -> Schedule {
        let mut schedule = Schedule::default();
        schedule.set_executor(MultiThreadedExecutor::new());
        schedule
    }

    /// A system chained after an always-skipped system must still run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = multi_threaded_schedule();

        schedule.add_systems(
            (
                // Never runs: its run condition is always `false`.
                (|| {}).run_if(|| false),
                // Ordered after the skipped system above, so it only runs if
                // skipping notifies dependents.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);

        assert!(world.contains_resource::<R>());
    }

    /// Regression test for a weird bug flagged by MIRI in
    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
    /// inside an `async` block and somehow remaining alive even after its last use.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = multi_threaded_schedule();

        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}