Skip to main content

bevy_asset/processor/
mod.rs

1//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
2//!
3//! You can think of the asset processing system as a "build system" for assets.
4//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
5//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
6//!
7//! Its core values are:
8//!
9//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
10//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
11//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
12//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
13//!
14//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
15//! While it may be possible to manually edit the final asset, this should be discouraged.
16//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
17//!
18//! # Usage
19//!
20//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
21//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
22//!
23//! To register a new asset processor, use [`AssetProcessor::register_processor`].
24//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
25//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
26//!
27//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
28//!
29//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
30//!
//! # Defining asset processors
32//!
33//! Bevy provides two different ways to define new asset processors:
34//!
35//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
36//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
37//!
38//! In most cases, [`LoadTransformAndSave`] should be sufficient.
39
40mod log;
41mod process;
42
43use async_lock::RwLockReadGuardArc;
44pub use log::*;
45pub use process::*;
46
47use crate::{
48    io::{
49        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
50        AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
51    },
52    meta::{
53        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
54        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
55    },
56    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
57    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
58};
59use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
60use bevy_ecs::prelude::*;
61use bevy_platform::{
62    collections::{hash_map::Entry, HashMap, HashSet},
63    sync::{PoisonError, RwLock},
64};
65use bevy_tasks::IoTaskPool;
66use futures_io::ErrorKind;
67use futures_lite::{AsyncWriteExt, StreamExt};
68use futures_util::{select_biased, FutureExt};
69use std::{
70    path::{Path, PathBuf},
71    sync::Mutex,
72};
73use thiserror::Error;
74use tracing::{debug, error, trace, warn};
75
76#[cfg(feature = "trace")]
77use {
78    alloc::string::ToString,
79    tracing::{info_span, instrument::Instrument},
80};
81
82/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
83/// processes them in some way, and writes them to a destination [`AssetSource`].
84///
85/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets
86/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
87///
88/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
89///
90/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
91/// asset, which is used to determine if the asset source has actually changed.
92///
93/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
94/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
95///
96/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    /// The processor's own [`AssetServer`], created in [`AssetProcessor::new`] and exposed through
    /// [`AssetProcessor::server`].
    server: AssetServer,
    /// The state shared (through the [`Arc`]) between every clone of this processor.
    pub(crate) data: Arc<AssetProcessorData>,
}
102
/// Internal data stored inside an [`AssetProcessor`].
pub struct AssetProcessorData {
    /// The state of processing.
    pub(crate) processing_state: Arc<ProcessingState>,
    /// The factory that creates the transaction log.
    ///
    /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set
    /// this once, and before the asset processor starts - there is no reason to await (and it
    /// avoids needing to use [`block_on`](bevy_tasks::block_on) to set the factory).
    log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
    /// The transaction log that processing steps are recorded in.
    ///
    /// The logging helpers on [`AssetProcessor`] unwrap this option, so it must hold a log before
    /// any of them run.
    log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
    /// The processors that will be used to process assets.
    processors: RwLock<Processors>,
    /// The asset sources this processor operates on. The same [`Arc`] is handed back to the caller
    /// of [`AssetProcessor::new`].
    sources: Arc<AssetSources>,
}
118
/// The current state of processing, including the overall state and the state of all assets.
pub(crate) struct ProcessingState {
    /// The overall state of processing.
    state: async_lock::RwLock<ProcessorState>,
    /// The channel to broadcast when the processor has completed initialization.
    initialized_sender: async_broadcast::Sender<()>,
    /// The receiving half of the initialization channel.
    // NOTE(review): presumably retained so that waiters can obtain their own receiver from it -
    // confirm against the code that waits for initialization.
    initialized_receiver: async_broadcast::Receiver<()>,
    /// The channel to broadcast when the processor has completed processing.
    finished_sender: async_broadcast::Sender<()>,
    /// The receiving half of the finished channel.
    // NOTE(review): presumably retained so that waiters can obtain their own receiver from it -
    // confirm against the code that waits for processing to finish.
    finished_receiver: async_broadcast::Receiver<()>,
    /// The current state of the assets.
    asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
}
132
/// The processors held by an [`AssetProcessorData`], together with the lookup tables used to find
/// them by full type path, by short type path, or by file extension.
#[derive(Default)]
struct Processors {
    /// Maps the type path of the processor to its instance.
    type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
    /// Maps the short type path of the processor to its instance.
    short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
    /// Maps the file extension of an asset to the type path of the processor we should use to
    /// process it by default.
    file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
}
143
/// The value stored for each short type path in `Processors::short_type_path_to_processor`.
///
/// A short type path either identifies exactly one processor, or is shared by several processors
/// and therefore cannot be used on its own to pick one.
enum ShortTypeProcessorEntry {
    /// There is a unique processor with the given short type path.
    Unique {
        /// The full type path of the processor.
        type_path: &'static str,
        /// The processor itself.
        processor: Arc<dyn ErasedProcessor>,
    },
    /// There are (at least) two processors with the same short type path (storing the full type
    /// paths of all conflicting processors). Users must fully specify the type path in order to
    /// disambiguate.
    Ambiguous(Vec<&'static str>),
}
157
158impl AssetProcessor {
159    /// Creates a new [`AssetProcessor`] instance.
160    pub fn new(
161        sources: &mut AssetSourceBuilders,
162        watch_processed: bool,
163    ) -> (Self, Arc<AssetSources>) {
164        let state = Arc::new(ProcessingState::new());
165        let mut sources = sources.build_sources(true, watch_processed);
166        sources.gate_on_processor(state.clone());
167        let sources = Arc::new(sources);
168
169        let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
170        // The asset processor uses its own asset server with its own id space
171        let server = AssetServer::new_with_meta_check(
172            sources.clone(),
173            AssetServerMode::Processed,
174            AssetMetaCheck::Always,
175            false,
176            UnapprovedPathMode::default(),
177        );
178        (Self { server, data }, sources)
179    }
180
    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
    ///
    /// This is the same [`Arc`] that every clone of this [`AssetProcessor`] shares.
    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }
185
    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset server used by
    /// the main App. It has different processor-specific configuration and a different ID space.
    pub fn server(&self) -> &AssetServer {
        &self.server
    }
191
    /// Retrieves the current [`ProcessorState`].
    ///
    /// The value is read from the shared [`ProcessingState`], so every clone of this processor
    /// observes the same state.
    pub async fn get_state(&self) -> ProcessorState {
        self.data.processing_state.get_state().await
    }
196
197    /// Retrieves the [`AssetSource`] for this processor
198    #[inline]
199    pub fn get_source<'a>(
200        &self,
201        id: impl Into<AssetSourceId<'a>>,
202    ) -> Result<&AssetSource, MissingAssetSourceError> {
203        self.data.sources.get(id.into())
204    }
205
    /// Returns the [`AssetSources`] this processor operates on.
    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }
210
211    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
212    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
213    async fn log_unrecoverable(&self) {
214        let mut log = self.data.log.write().await;
215        let log = log.as_mut().unwrap();
216        log.unrecoverable()
217            .await
218            .map_err(|error| WriteLogError {
219                log_entry: LogEntry::UnrecoverableError,
220                error,
221            })
222            .unwrap();
223    }
224
225    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
226    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
227    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
228        let mut log = self.data.log.write().await;
229        let log = log.as_mut().unwrap();
230        log.begin_processing(path)
231            .await
232            .map_err(|error| WriteLogError {
233                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
234                error,
235            })
236            .unwrap();
237    }
238
239    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
240    async fn log_end_processing(&self, path: &AssetPath<'_>) {
241        let mut log = self.data.log.write().await;
242        let log = log.as_mut().unwrap();
243        log.end_processing(path)
244            .await
245            .map_err(|error| WriteLogError {
246                log_entry: LogEntry::EndProcessing(path.clone_owned()),
247                error,
248            })
249            .unwrap();
250    }
251
252    /// Starts the processor in a background thread.
253    pub fn start(processor: Res<Self>) {
254        let processor = processor.clone();
255        IoTaskPool::get()
256            .spawn(async move {
257                let start_time = std::time::Instant::now();
258                debug!("Processing Assets");
259
260                processor.initialize().await.unwrap();
261
262                let (new_task_sender, new_task_receiver) = async_channel::unbounded();
263                processor
264                    .queue_initial_processing_tasks(&new_task_sender)
265                    .await;
266
267                // Once all the tasks are queued for the initial processing, start actually
268                // executing the tasks.
269                {
270                    let processor = processor.clone();
271                    let new_task_sender = new_task_sender.clone();
272                    IoTaskPool::get()
273                        .spawn(async move {
274                            processor
275                                .execute_processing_tasks(new_task_sender, new_task_receiver)
276                                .await;
277                        })
278                        .detach();
279                }
280
281                processor.data.wait_until_finished().await;
282
283                let end_time = std::time::Instant::now();
284                debug!("Processing finished in {:?}", end_time - start_time);
285
286                debug!("Listening for changes to source assets");
287                processor.spawn_source_change_event_listeners(&new_task_sender);
288            })
289            .detach();
290    }
291
292    /// Sends start task events for all assets in all processed sources into `sender`.
293    async fn queue_initial_processing_tasks(
294        &self,
295        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
296    ) {
297        for source in self.sources().iter_processed() {
298            self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
299                .await
300                .unwrap();
301        }
302    }
303
304    /// Spawns listeners of change events for all asset sources which will start processor tasks in
305    /// response.
306    fn spawn_source_change_event_listeners(
307        &self,
308        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
309    ) {
310        for source in self.data.sources.iter_processed() {
311            let Some(receiver) = source.event_receiver().cloned() else {
312                continue;
313            };
314            let source_id = source.id();
315            let processor = self.clone();
316            let sender = sender.clone();
317            IoTaskPool::get()
318                .spawn(async move {
319                    while let Ok(event) = receiver.recv().await {
320                        let Ok(source) = processor.get_source(source_id.clone()) else {
321                            return;
322                        };
323                        processor
324                            .handle_asset_source_event(source, event, &sender)
325                            .await;
326                    }
327                })
328                .detach();
329        }
330    }
331
332    /// Executes all tasks that come through `receiver`, and updates the processor's overall state
333    /// based on task starts and ends.
334    ///
335    /// This future does not terminate until the channel is closed (not when the channel is empty).
336    /// This means that in [`AssetProcessor::start`], this execution will continue even after all
337    /// the initial tasks are processed.
338    async fn execute_processing_tasks(
339        &self,
340        new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
341        new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
342    ) {
343        // Convert the Sender into a WeakSender so that once all task producers terminate (and drop
344        // their sender), this task doesn't keep itself alive. We still however need a way to get
345        // the sender since processing tasks can start the tasks of dependent assets.
346        let new_task_sender = {
347            let weak_sender = new_task_sender.downgrade();
348            drop(new_task_sender);
349            weak_sender
350        };
351
352        // If there aren't any tasks in the channel the first time around, we should immediately go
353        // to the finished state (otherwise we'd be sitting around stuck in the `Initialized`
354        // state).
355        if new_task_receiver.is_empty() {
356            self.data
357                .processing_state
358                .set_state(ProcessorState::Finished)
359                .await;
360        }
361        enum ProcessorTaskEvent {
362            Start(AssetSourceId<'static>, PathBuf),
363            Finished,
364        }
365        let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();
366
367        let mut pending_tasks = 0;
368        while let Ok(event) = {
369            // It's ok to use `select_biased` since we prefer to start task rather than finish tasks
370            // anyway - since otherwise we might mark the processor as finished before all queued
371            // tasks are done. `select_biased` also doesn't depend on `std` which is nice!
372            select_biased! {
373                result = new_task_receiver.recv().fuse() => {
374                    result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
375                },
376                result = task_finished_receiver.recv().fuse() => {
377                    result.map(|()| ProcessorTaskEvent::Finished)
378                }
379            }
380        } {
381            match event {
382                ProcessorTaskEvent::Start(source_id, path) => {
383                    let Some(new_task_sender) = new_task_sender.upgrade() else {
384                        // If we can't upgrade the task sender, that means all sources of tasks
385                        // (like the source event listeners) have been dropped. That means that the
386                        // sources are no longer in the app, so reading/writing to them will
387                        // probably not work, so ignoring the task is fine. This also likely means
388                        // that the whole app is being dropped, so we can recover on the next
389                        // initialization.
390                        continue;
391                    };
392                    let processor = self.clone();
393                    let task_finished_sender = task_finished_sender.clone();
394                    pending_tasks += 1;
395                    IoTaskPool::get()
396                        .spawn(async move {
397                            let Ok(source) = processor.get_source(source_id) else {
398                                return;
399                            };
400                            processor.process_asset(source, path, new_task_sender).await;
401                            // If the channel gets closed, that's ok. Just ignore it.
402                            let _ = task_finished_sender.send(()).await;
403                        })
404                        .detach();
405                    self.data
406                        .processing_state
407                        .set_state(ProcessorState::Processing)
408                        .await;
409                }
410                ProcessorTaskEvent::Finished => {
411                    pending_tasks -= 1;
412                    if pending_tasks == 0 {
413                        // clean up metadata in asset server
414                        self.server.write_infos().consume_handle_drop_events();
415                        self.data
416                            .processing_state
417                            .set_state(ProcessorState::Finished)
418                            .await;
419                    }
420                }
421            }
422        }
423    }
424
425    /// Writes the default meta file for the provided `path`.
426    ///
427    /// This function generates the appropriate meta file to process `path` with the default
428    /// processor. If there is no default processor, it falls back to the default loader.
429    ///
430    /// Note if there is already a meta file for `path`, this function returns
431    /// `Err(WriteDefaultMetaError::MetaAlreadyExists)`.
432    pub async fn write_default_meta_file_for_path(
433        &self,
434        path: impl Into<AssetPath<'_>>,
435    ) -> Result<(), WriteDefaultMetaError> {
436        let path = path.into();
437        let Some(processor) = path
438            .get_full_extension()
439            .and_then(|extension| self.get_default_processor(extension))
440        else {
441            return self
442                .server
443                .write_default_loader_meta_file_for_path(path)
444                .await;
445        };
446
447        let short_type_path = processor.short_type_path();
448        // Try to get the processor using the short type - if it fails, that must mean that the
449        // short type path is insufficient, so we'll have to fall back to the long path.
450        let processor_path_kind = if self.get_processor(short_type_path).is_ok() {
451            MetaTypePathKind::Short
452        } else {
453            MetaTypePathKind::Long
454        };
455
456        let meta = processor.default_meta(processor_path_kind);
457        let serialized_meta = meta.serialize();
458
459        let source = self.get_source(path.source())?;
460
461        // Note: we get the reader rather than the processed reader, since we want to write the meta
462        // file for the unprocessed version of that asset (so it will be processed by the default
463        // processor).
464        let reader = source.reader();
465        match reader.read_meta_bytes(path.path()).await {
466            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
467            Err(AssetReaderError::NotFound(_)) => {
468                // The meta file couldn't be found so just fall through.
469            }
470            Err(AssetReaderError::Io(err)) => {
471                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
472            }
473            Err(AssetReaderError::HttpError(err)) => {
474                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
475            }
476        }
477
478        let writer = source.writer()?;
479        writer
480            .write_meta_bytes(path.path(), &serialized_meta)
481            .await?;
482
483        Ok(())
484    }
485
486    async fn handle_asset_source_event(
487        &self,
488        source: &AssetSource,
489        event: AssetSourceEvent,
490        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
491    ) {
492        trace!("{event:?}");
493        match event {
494            AssetSourceEvent::AddedAsset(path)
495            | AssetSourceEvent::AddedMeta(path)
496            | AssetSourceEvent::ModifiedAsset(path)
497            | AssetSourceEvent::ModifiedMeta(path) => {
498                let _ = new_task_sender.send((source.id(), path)).await;
499            }
500            AssetSourceEvent::RemovedAsset(path) => {
501                self.handle_removed_asset(source, path).await;
502            }
503            AssetSourceEvent::RemovedMeta(path) => {
504                self.handle_removed_meta(source, path, new_task_sender)
505                    .await;
506            }
507            AssetSourceEvent::AddedFolder(path) => {
508                self.handle_added_folder(source, path, new_task_sender)
509                    .await;
510            }
511            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
512            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
513            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
514            AssetSourceEvent::RemovedFolder(path) => {
515                self.handle_removed_folder(source, &path).await;
516            }
517            AssetSourceEvent::RenamedAsset { old, new } => {
518                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
519                // Sometimes this event is returned when an asset is moved "back" into the asset folder
520                if old == new {
521                    let _ = new_task_sender.send((source.id(), new)).await;
522                } else {
523                    self.handle_renamed_asset(source, old, new, new_task_sender)
524                        .await;
525                }
526            }
527            AssetSourceEvent::RenamedMeta { old, new } => {
528                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
529                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
530                if old == new {
531                    let _ = new_task_sender.send((source.id(), new)).await;
532                } else {
533                    debug!("Meta renamed from {old:?} to {new:?}");
534                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
535                    // if they should be re-imported (and/or have new meta generated)
536                    let _ = new_task_sender.send((source.id(), old)).await;
537                    let _ = new_task_sender.send((source.id(), new)).await;
538                }
539            }
540            AssetSourceEvent::RenamedFolder { old, new } => {
541                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
542                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
543                if old == new {
544                    self.handle_added_folder(source, new, new_task_sender).await;
545                } else {
546                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
547                    // requires some nuance when it comes to path handling.
548                    self.handle_removed_folder(source, &old).await;
549                    self.handle_added_folder(source, new, new_task_sender).await;
550                }
551            }
552            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
553                let processed_reader = source.ungated_processed_reader().unwrap();
554                match processed_reader.is_directory(&path).await {
555                    Ok(is_directory) => {
556                        if is_directory {
557                            self.handle_removed_folder(source, &path).await;
558                        } else if is_meta {
559                            self.handle_removed_meta(source, path, new_task_sender)
560                                .await;
561                        } else {
562                            self.handle_removed_asset(source, path).await;
563                        }
564                    }
565                    Err(err) => {
566                        match err {
567                            AssetReaderError::NotFound(_) => {
568                                // if the path is not found, a processed version does not exist
569                            }
570                            AssetReaderError::Io(err) => {
571                                error!(
572                                    "Path '{}' was removed, but the destination reader could not determine if it \
573                                    was a folder or a file due to the following error: {err}",
574                                    AssetPath::from_path(&path).with_source(source.id())
575                                );
576                            }
577                            AssetReaderError::HttpError(status) => {
578                                error!(
579                                    "Path '{}' was removed, but the destination reader could not determine if it \
580                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
581                                    AssetPath::from_path(&path).with_source(source.id())
582                                );
583                            }
584                        }
585                    }
586                }
587            }
588        }
589    }
590
591    async fn handle_added_folder(
592        &self,
593        source: &AssetSource,
594        path: PathBuf,
595        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
596    ) {
597        debug!(
598            "Folder {} was added. Attempting to re-process",
599            AssetPath::from_path(&path).with_source(source.id())
600        );
601        self.queue_processing_tasks_for_folder(source, path, new_task_sender)
602            .await
603            .unwrap();
604    }
605
606    /// Responds to a removed meta event by reprocessing the asset at the given path.
607    async fn handle_removed_meta(
608        &self,
609        source: &AssetSource,
610        path: PathBuf,
611        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
612    ) {
613        // If meta was removed, we might need to regenerate it.
614        // Likewise, the user might be manually re-adding the asset.
615        // Therefore, we shouldn't automatically delete the asset ... that is a
616        // user-initiated action.
617        debug!(
618            "Meta for asset {} was removed. Attempting to re-process",
619            AssetPath::from_path(&path).with_source(source.id())
620        );
621        let _ = new_task_sender.send((source.id(), path)).await;
622    }
623
624    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
625    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
626        debug!(
627            "Removing folder {} because source was removed",
628            path.display()
629        );
630        let processed_reader = source.ungated_processed_reader().unwrap();
631        match processed_reader.read_directory(path).await {
632            Ok(mut path_stream) => {
633                while let Some(child_path) = path_stream.next().await {
634                    self.handle_removed_asset(source, child_path).await;
635                }
636            }
637            Err(err) => match err {
638                AssetReaderError::NotFound(_err) => {
639                    // The processed folder does not exist. No need to update anything
640                }
641                AssetReaderError::HttpError(status) => {
642                    self.log_unrecoverable().await;
643                    error!(
644                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
645                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
646                    );
647                }
648                AssetReaderError::Io(err) => {
649                    self.log_unrecoverable().await;
650                    error!(
651                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
652                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
653                    );
654                }
655            },
656        }
657        let processed_writer = source.processed_writer().unwrap();
658        if let Err(err) = processed_writer.remove_directory(path).await {
659            match err {
660                AssetWriterError::Io(err) => {
661                    // we can ignore NotFound because if the "final" file in a folder was removed
662                    // then we automatically clean up this folder
663                    if err.kind() != ErrorKind::NotFound {
664                        let asset_path = AssetPath::from_path(path).with_source(source.id());
665                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
666                    }
667                }
668            }
669        }
670    }
671
672    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
673    /// asset have finished, thanks to the `file_transaction_lock`.
674    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
675        let asset_path = AssetPath::from(path).with_source(source.id());
676        debug!("Removing processed {asset_path} because source was removed");
677        let lock = {
678            // Scope the infos lock so we don't hold up other processing for too long.
679            let mut infos = self.data.processing_state.asset_infos.write().await;
680            infos.remove(&asset_path).await
681        };
682        let Some(lock) = lock else {
683            return;
684        };
685
686        // we must wait for uncontested write access to the asset source to ensure existing
687        // readers/writers can finish their operations
688        let _write_lock = lock.write();
689        self.remove_processed_asset_and_meta(source, asset_path.path())
690            .await;
691    }
692
693    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
694    /// This will cause direct path dependencies to break.
695    async fn handle_renamed_asset(
696        &self,
697        source: &AssetSource,
698        old: PathBuf,
699        new: PathBuf,
700        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
701    ) {
702        let old = AssetPath::from(old).with_source(source.id());
703        let new = AssetPath::from(new).with_source(source.id());
704        let processed_writer = source.processed_writer().unwrap();
705        let result = {
706            // Scope the infos lock so we don't hold up other processing for too long.
707            let mut infos = self.data.processing_state.asset_infos.write().await;
708            infos.rename(&old, &new, new_task_sender).await
709        };
710        let Some((old_lock, new_lock)) = result else {
711            return;
712        };
713        // we must wait for uncontested write access to both assets to ensure existing
714        // readers/writers can finish their operations
715        let _old_write_lock = old_lock.write();
716        let _new_write_lock = new_lock.write();
717        processed_writer
718            .rename(old.path(), new.path())
719            .await
720            .unwrap();
721        processed_writer
722            .rename_meta(old.path(), new.path())
723            .await
724            .unwrap();
725    }
726
727    async fn queue_processing_tasks_for_folder(
728        &self,
729        source: &AssetSource,
730        path: PathBuf,
731        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
732    ) -> Result<(), AssetReaderError> {
733        if source.reader().is_directory(&path).await? {
734            let mut path_stream = source.reader().read_directory(&path).await?;
735            while let Some(path) = path_stream.next().await {
736                Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
737                    .await?;
738            }
739        } else {
740            let _ = new_task_sender.send((source.id(), path)).await;
741        }
742        Ok(())
743    }
744
745    /// Register a new asset processor.
746    pub fn register_processor<P: Process>(&self, processor: P) {
747        let mut processors = self
748            .data
749            .processors
750            .write()
751            .unwrap_or_else(PoisonError::into_inner);
752        let processor = Arc::new(processor);
753        processors
754            .type_path_to_processor
755            .insert(P::type_path(), processor.clone());
756        match processors
757            .short_type_path_to_processor
758            .entry(P::short_type_path())
759        {
760            Entry::Vacant(entry) => {
761                entry.insert(ShortTypeProcessorEntry::Unique {
762                    type_path: P::type_path(),
763                    processor,
764                });
765            }
766            Entry::Occupied(mut entry) => match entry.get_mut() {
767                ShortTypeProcessorEntry::Unique { type_path, .. } => {
768                    let type_path = *type_path;
769                    *entry.get_mut() =
770                        ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
771                }
772                ShortTypeProcessorEntry::Ambiguous(type_paths) => {
773                    type_paths.push(P::type_path());
774                }
775            },
776        }
777    }
778
779    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
780    pub fn set_default_processor<P: Process>(&self, extension: &str) {
781        let mut processors = self
782            .data
783            .processors
784            .write()
785            .unwrap_or_else(PoisonError::into_inner);
786        processors
787            .file_extension_to_default_processor
788            .insert(extension.into(), P::type_path());
789    }
790
791    /// Returns the default processor for the given `extension`, if it exists.
792    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
793        let processors = self
794            .data
795            .processors
796            .read()
797            .unwrap_or_else(PoisonError::into_inner);
798        let key = processors
799            .file_extension_to_default_processor
800            .get(extension)?;
801        processors.type_path_to_processor.get(key).cloned()
802    }
803
804    /// Returns the processor with the given `processor_type_name`, if it exists.
805    pub fn get_processor(
806        &self,
807        processor_type_name: &str,
808    ) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
809        let processors = self
810            .data
811            .processors
812            .read()
813            .unwrap_or_else(PoisonError::into_inner);
814        if let Some(short_type_processor) = processors
815            .short_type_path_to_processor
816            .get(processor_type_name)
817        {
818            return match short_type_processor {
819                ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
820                ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
821                    processor_short_name: processor_type_name.to_owned(),
822                    ambiguous_processor_names: examples.clone(),
823                }),
824            };
825        }
826        processors
827            .type_path_to_processor
828            .get(processor_type_name)
829            .cloned()
830            .ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
831    }
832
    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
    /// This info will later be used to determine whether or not to re-process an asset
    ///
    /// This will validate transactions and recover failed transactions when necessary.
    ///
    /// For every processed source this:
    /// 1. collects all unprocessed (source) paths and registers an info entry for each,
    /// 2. collects all processed paths, removing empty processed directories on a best-effort basis,
    /// 3. for each processed path, either loads its `processed_info` from the processed meta
    ///    (recording its process dependencies), or removes the processed asset + meta when the
    ///    source asset no longer exists or the meta cannot be read/parsed.
    ///
    /// Finally the processor state is set to [`ProcessorState::Processing`].
    ///
    /// # Errors
    ///
    /// Returns [`InitializeError::FailedToReadSourcePaths`] or
    /// [`InitializeError::FailedToReadDestinationPaths`] if scanning the respective folder fails.
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        // Note: this write lock is held for the remainder of the function.
        let mut asset_infos = self.data.processing_state.asset_infos.write().await;

        /// Retrieves asset paths recursively, pushing every non-directory path into `paths`.
        ///
        /// If `empty_dirs` is `Some`, every directory that contains no files (directly or
        /// transitively) and that has a parent is pushed into it, so the caller can clean
        /// those directories up afterwards.
        ///
        /// Returns whether `path` is a file or a directory containing at least one file.
        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
            mut empty_dirs: Option<&mut Vec<PathBuf>>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    // The recursive call is boxed because an async fn cannot directly
                    // contain its own future.
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        child_path,
                        paths,
                        empty_dirs.as_deref_mut(),
                    ))
                    .await?;
                }
                // Add the current directory after all its subdirectories so we delete any empty
                // subdirectories before the current directory.
                // The `path.parent().is_some()` check keeps the root path ("") out of the list.
                if !contains_files
                    && path.parent().is_some()
                    && let Some(empty_dirs) = empty_dirs
                {
                    empty_dirs.push(path);
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            // Sources without a processed reader or writer are skipped.
            let Some(processed_reader) = source.ungated_processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                PathBuf::from(""),
                &mut unprocessed_paths,
                None,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            let mut empty_dirs = Vec::new();
            get_asset_paths(
                processed_reader,
                PathBuf::from(""),
                &mut processed_paths,
                Some(&mut empty_dirs),
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            // Remove any empty directories from the processed path. Note: this has to happen after
            // we fetch all the paths, otherwise the path stream can skip over paths
            // (we're modifying a collection while iterating through it).
            for empty_dir in empty_dirs {
                // We don't care if this succeeds, since it's just a cleanup task. It is best-effort
                let _ = processed_writer.remove_empty_directory(&empty_dir).await;
            }

            // Register an info entry for every asset that exists in the source.
            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                // Only processed assets that still have a source asset keep their processed data.
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                // Dependencies are registered after the `info` borrow above has ended.
                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.data
            .processing_state
            .set_state(ProcessorState::Processing)
            .await;

        Ok(())
    }
971
972    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
973    /// does it remove existing in-memory metadata.
974    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
975        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
976            warn!("Failed to remove non-existent asset {path:?}: {err}");
977        }
978
979        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
980            warn!("Failed to remove non-existent meta {path:?}: {err}");
981        }
982
983        self.clean_empty_processed_ancestor_folders(source, path)
984            .await;
985    }
986
987    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, mut path: &Path) {
988        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
989        if path.is_absolute() {
990            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
991            return;
992        }
993        while let Some(parent) = path.parent() {
994            path = parent;
995            if parent == Path::new("") {
996                break;
997            }
998            if source
999                .processed_writer()
1000                .unwrap()
1001                .remove_empty_directory(parent)
1002                .await
1003                .is_err()
1004            {
1005                // if we fail to delete a folder, stop walking up the tree
1006                break;
1007            }
1008        }
1009    }
1010
1011    /// Processes the asset (if it has not already been processed or the asset source has changed).
1012    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
1013    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
1014    /// to block reads until the asset is processed).
1015    ///
1016    /// [`LoadContext`]: crate::loader::LoadContext
1017    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
1018    async fn process_asset(
1019        &self,
1020        source: &AssetSource,
1021        path: PathBuf,
1022        processor_task_event: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1023    ) {
1024        let asset_path = AssetPath::from(path).with_source(source.id());
1025        let result = self.process_asset_internal(source, &asset_path).await;
1026        let mut infos = self.data.processing_state.asset_infos.write().await;
1027        infos
1028            .finish_processing(asset_path, result, processor_task_event)
1029            .await;
1030    }
1031
    /// Performs the actual processing of a single asset and returns the outcome.
    ///
    /// The steps are, in order:
    /// 1. Resolve the asset's meta and (optional) processor, either from the source meta file
    ///    or, when no meta file exists, from defaults.
    /// 2. Hash the meta bytes together with the asset contents.
    /// 3. Skip processing if the hash matches the recorded processed info and none of the
    ///    recorded process dependencies have a different `full_hash`.
    /// 4. Acquire the asset's file transaction lock for writing, log the beginning of
    ///    processing, write the processed asset and its meta, then log the end of processing.
    ///
    /// # Returns
    ///
    /// - [`ProcessResult::Ignored`] if the source meta's action is `Ignore`.
    /// - [`ProcessResult::SkippedNotChanged`] if nothing relevant has changed.
    /// - [`ProcessResult::Processed`] with the new processed info otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcessError`] if reading, deserializing, processing, or writing fails. Any
    /// error returned after processing has begun leaves the "begin" log entry without a
    /// matching "end" entry.
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        // Helpers that attach the asset path to reader/writer errors.
        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            // A meta file exists: its action decides whether we load, process, or ignore.
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self.get_processor(&processor)?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            // No meta file exists: fall back to the default processor for the extension, then
            // to the default meta of the loader for this path, and finally to an `Ignore` meta.
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(ext))
                {
                    // Note: It doesn't matter whether we use the Long or Short kind, since we're
                    // returning the processor here anyway, and we're only using this meta to pass
                    // along the processor settings.
                    let meta = processor.default_meta(MetaTypePathKind::Long);
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let new_hash = {
            // Create a reader just for computing the hash. Keep this scoped here so that we drop it
            // as soon as the hash is computed.
            let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;

            get_asset_hash(&meta_bytes, &mut reader_for_hash)
                .await
                .map_err(reader_err)?
        };
        // `full_hash` starts out equal to `hash`; it is only recomputed below when a processor
        // runs.
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            // Scoped read lock: decide whether processing can be skipped entirely.
            let infos = self.data.processing_state.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
                && current_processed_info.hash == new_hash
            {
                // The asset itself is unchanged; check whether any recorded process dependency
                // now has a different (or missing) full hash.
                let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    let live_hash = infos
                        .get(&current_dep_info.path)
                        .and_then(|i| i.processed_info.as_ref())
                        .map(|i| i.full_hash);
                    if live_hash != Some(current_dep_info.full_hash) {
                        dependency_changed = true;
                        break;
                    }
                }
                if !dependency_changed {
                    return Ok(ProcessResult::SkippedNotChanged);
                }
            }
        }

        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessedAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let lock = {
                let mut infos = self.data.processing_state.asset_infos.write().await;
                let info = infos.get_or_insert(asset_path.clone());
                // Clone out the transaction lock first and then lock after we've dropped the
                // asset_infos. Otherwise, trying to lock a single path can block all other paths too
                // (leading to deadlocks).
                info.file_transaction_lock.clone()
            };
            lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            // Unwrap is ok since we have a processor, so the `AssetAction` must have been
            // `AssetAction::Process` (which includes its settings).
            let settings = source_meta.process_settings().unwrap();

            // Create a reader just for the actual process. Note: this means that we're performing
            // two reads for the same file (but we avoid having to load the whole file into memory).
            // For some sources (like local file systems), this is not a big deal, but for other
            // sources like an HTTP asset sources, this could be an entire additional download (if
            // the asset source doesn't do any caching). In practice, most sources being processed
            // are likely to be local, and processing in general is a publish-time operation, so
            // it's not likely to be too big a deal. If in the future, we decide we want to avoid
            // this repeated read, we could "ask" the asset source if it prefers avoiding repeated
            // reads or not.
            let reader_for_process = reader.read(path).await.map_err(reader_err)?;

            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context = ProcessContext::new(
                    self,
                    asset_path,
                    reader_for_process,
                    &mut new_processed_info,
                );
                let process = processor.process(&mut context, settings, &mut *writer);
                // With the `trace` feature enabled, the processing future is wrapped in a span.
                #[cfg(feature = "trace")]
                let process = {
                    let span = info_span!(
                        "asset processing",
                        processor = processor.type_path(),
                        asset = asset_path.to_string(),
                    );
                    process.instrument(span)
                };
                process.await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            // Combine the asset's own hash with the full hashes of its process dependencies.
            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();

            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            // No processor: the source bytes are copied as-is and the source meta (with the
            // processed info attached) is written next to them.
            // See the reasoning for processing why it's ok to do a second read here.
            let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            futures_lite::io::copy(&mut reader_for_copy, &mut writer)
                .await
                .map_err(|err| ProcessError::AssetWriterError {
                    path: asset_path.clone_owned(),
                    err: err.into(),
                })?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }
1245
1246    async fn validate_transaction_log_and_recover(&self) {
1247        let log_factory = self
1248            .data
1249            .log_factory
1250            .lock()
1251            .unwrap_or_else(PoisonError::into_inner)
1252            // Take the log factory to indicate we've started and this should disable setting a new
1253            // log factory.
1254            .take()
1255            .expect("the asset processor only starts once");
1256        if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
1257            let state_is_valid = match err {
1258                ValidateLogError::ReadLogError(err) => {
1259                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
1260                    false
1261                }
1262                ValidateLogError::UnrecoverableError => {
1263                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
1264                    false
1265                }
1266                ValidateLogError::EntryErrors(entry_errors) => {
1267                    let mut state_is_valid = true;
1268                    for entry_error in entry_errors {
1269                        match entry_error {
1270                            LogEntryError::DuplicateTransaction(_)
1271                            | LogEntryError::EndedMissingTransaction(_) => {
1272                                error!("{}", entry_error);
1273                                state_is_valid = false;
1274                                break;
1275                            }
1276                            LogEntryError::UnfinishedTransaction(path) => {
1277                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
1278                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
1279                                    error!("Failed to remove asset {path:?}: {message}");
1280                                    state_is_valid = false;
1281                                };
1282                                let Ok(source) = self.get_source(path.source()) else {
1283                                    unrecoverable_err(&"AssetSource does not exist");
1284                                    continue;
1285                                };
1286                                let Ok(processed_writer) = source.processed_writer() else {
1287                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
1288                                    continue;
1289                                };
1290
1291                                if let Err(err) = processed_writer.remove(path.path()).await {
1292                                    match err {
1293                                        AssetWriterError::Io(err) => {
1294                                            // any error but NotFound means we could be in a bad state
1295                                            if err.kind() != ErrorKind::NotFound {
1296                                                unrecoverable_err(&err);
1297                                            }
1298                                        }
1299                                    }
1300                                }
1301                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
1302                                    match err {
1303                                        AssetWriterError::Io(err) => {
1304                                            // any error but NotFound means we could be in a bad state
1305                                            if err.kind() != ErrorKind::NotFound {
1306                                                unrecoverable_err(&err);
1307                                            }
1308                                        }
1309                                    }
1310                                }
1311                            }
1312                        }
1313                    }
1314                    state_is_valid
1315                }
1316            };
1317
1318            if !state_is_valid {
1319                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
1320                for source in self.sources().iter_processed() {
1321                    let Ok(processed_writer) = source.processed_writer() else {
1322                        continue;
1323                    };
1324                    if let Err(err) = processed_writer
1325                        .remove_assets_in_directory(Path::new(""))
1326                        .await
1327                    {
1328                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
1329                    }
1330                }
1331            }
1332        }
1333        let mut log = self.data.log.write().await;
1334        *log = match log_factory.create_new_log().await {
1335            Ok(log) => Some(log),
1336            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
1337        };
1338    }
1339}
1340
1341impl AssetProcessorData {
1342    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
1343    pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
1344        AssetProcessorData {
1345            processing_state,
1346            sources,
1347            log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
1348            log: Default::default(),
1349            processors: Default::default(),
1350        }
1351    }
1352
1353    /// Sets the transaction log factory for the processor.
1354    ///
1355    /// If this is called after asset processing has begun (in the `Startup` schedule), it will
1356    /// return an error. If not called, the default transaction log will be used.
1357    pub fn set_log_factory(
1358        &self,
1359        factory: Box<dyn ProcessorTransactionLogFactory>,
1360    ) -> Result<(), SetTransactionLogFactoryError> {
1361        let mut log_factory = self
1362            .log_factory
1363            .lock()
1364            .unwrap_or_else(PoisonError::into_inner);
1365        if log_factory.is_none() {
1366            // This indicates the asset processor has already started, so setting the factory does
1367            // nothing here.
1368            return Err(SetTransactionLogFactoryError::AlreadyInUse);
1369        }
1370
1371        *log_factory = Some(factory);
1372        Ok(())
1373    }
1374
1375    /// Returns a future that will not finish until the path has been processed.
1376    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1377        self.processing_state.wait_until_processed(path).await
1378    }
1379
1380    /// Returns a future that will not finish until the processor has been initialized.
1381    pub async fn wait_until_initialized(&self) {
1382        self.processing_state.wait_until_initialized().await;
1383    }
1384
1385    /// Returns a future that will not finish until processing has finished.
1386    pub async fn wait_until_finished(&self) {
1387        self.processing_state.wait_until_finished().await;
1388    }
1389}
1390
1391impl ProcessingState {
1392    /// Creates a new empty processing state.
1393    fn new() -> Self {
1394        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1395        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1396        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1397        // not block if there was older state present.
1398        initialized_sender.set_overflow(true);
1399        finished_sender.set_overflow(true);
1400
1401        Self {
1402            state: async_lock::RwLock::new(ProcessorState::Initializing),
1403            initialized_sender,
1404            initialized_receiver,
1405            finished_sender,
1406            finished_receiver,
1407            asset_infos: Default::default(),
1408        }
1409    }
1410
1411    /// Sets the overall state of processing and broadcasts appropriate events.
1412    async fn set_state(&self, state: ProcessorState) {
1413        let mut state_guard = self.state.write().await;
1414        let last_state = *state_guard;
1415        *state_guard = state;
1416        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
1417            self.finished_sender.broadcast(()).await.unwrap();
1418        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
1419            self.initialized_sender.broadcast(()).await.unwrap();
1420        }
1421    }
1422
1423    /// Retrieves the current [`ProcessorState`]
1424    pub(crate) async fn get_state(&self) -> ProcessorState {
1425        *self.state.read().await
1426    }
1427
1428    /// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
1429    /// while it is held.
1430    pub(crate) async fn get_transaction_lock(
1431        &self,
1432        path: &AssetPath<'static>,
1433    ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
1434        let lock = {
1435            let infos = self.asset_infos.read().await;
1436            let info = infos
1437                .get(path)
1438                .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1439            // Clone out the transaction lock first and then lock after we've dropped the
1440            // asset_infos. Otherwise, trying to lock a single path can block all other paths to
1441            // (leading to deadlocks).
1442            info.file_transaction_lock.clone()
1443        };
1444        Ok(lock.read_arc().await)
1445    }
1446
1447    /// Returns a future that will not finish until the path has been processed.
1448    pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1449        self.wait_until_initialized().await;
1450        let mut receiver = {
1451            let infos = self.asset_infos.write().await;
1452            let info = infos.get(&path);
1453            match info {
1454                Some(info) => match info.status {
1455                    Some(result) => return result,
1456                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1457                    None => info.status_receiver.clone(),
1458                },
1459                None => return ProcessStatus::NonExistent,
1460            }
1461        };
1462        receiver.recv().await.unwrap()
1463    }
1464
1465    /// Returns a future that will not finish until the processor has been initialized.
1466    pub(crate) async fn wait_until_initialized(&self) {
1467        let receiver = {
1468            let state = self.state.read().await;
1469            match *state {
1470                ProcessorState::Initializing => {
1471                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1472                    Some(self.initialized_receiver.clone())
1473                }
1474                _ => None,
1475            }
1476        };
1477
1478        if let Some(mut receiver) = receiver {
1479            receiver.recv().await.unwrap();
1480        }
1481    }
1482
1483    /// Returns a future that will not finish until processing has finished.
1484    pub(crate) async fn wait_until_finished(&self) {
1485        let receiver = {
1486            let state = self.state.read().await;
1487            match *state {
1488                ProcessorState::Initializing | ProcessorState::Processing => {
1489                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1490                    Some(self.finished_receiver.clone())
1491                }
1492                ProcessorState::Finished => None,
1493            }
1494        };
1495
1496        if let Some(mut receiver) = receiver {
1497            receiver.recv().await.unwrap();
1498        }
1499    }
1500}
1501
/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// The asset was processed; carries the resulting [`ProcessedInfo`].
    Processed(ProcessedInfo),
    /// Processing was skipped because the asset was unchanged.
    SkippedNotChanged,
    /// Processing was skipped because the asset is ignored.
    Ignored,
}
1509
/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    /// The asset was processed, or processing was skipped because it was unchanged.
    Processed,
    /// Processing the asset failed with an error.
    Failed,
    /// There is no info for the asset path (it was never tracked, or it was removed/renamed away).
    NonExistent,
}
1517
// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
/// The processor's in-memory bookkeeping for a single asset path.
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    /// The [`ProcessedInfo`] most recently recorded for this asset, if any.
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependents: HashSet<AssetPath<'static>>,
    /// The last recorded [`ProcessStatus`], or `None` if no status has been recorded yet.
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs_
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    ///     * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written, and
    /// that writers don't write while processed files are being read.
    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    /// Broadcasts status changes for this asset to anything waiting on it.
    status_sender: async_broadcast::Sender<ProcessStatus>,
    /// Receiving end of `status_sender`; waiters clone it to listen for the next status.
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
1538
1539impl Default for ProcessorAssetInfo {
1540    fn default() -> Self {
1541        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1542        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1543        // not block if there was older state present.
1544        status_sender.set_overflow(true);
1545        Self {
1546            processed_info: Default::default(),
1547            dependents: Default::default(),
1548            file_transaction_lock: Default::default(),
1549            status: None,
1550            status_sender,
1551            status_receiver,
1552        }
1553    }
1554}
1555
1556impl ProcessorAssetInfo {
1557    async fn update_status(&mut self, status: ProcessStatus) {
1558        if self.status != Some(status) {
1559            self.status = Some(status);
1560            self.status_sender.broadcast(status).await.unwrap();
1561        }
1562    }
1563}
1564
/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if a path does not exist in this, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependents` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the dependent asset is added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependencies and add them. If an asset is removed, it should update the dependents here.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
}
1581
1582impl ProcessorAssetInfos {
1583    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1584        self.infos.entry(asset_path.clone()).or_insert_with(|| {
1585            let mut info = ProcessorAssetInfo::default();
1586            // track existing dependents by resolving existing "hanging" dependents.
1587            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1588                info.dependents = dependents;
1589            }
1590            info
1591        })
1592    }
1593
1594    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1595        self.infos.get(asset_path)
1596    }
1597
1598    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1599        self.infos.get_mut(asset_path)
1600    }
1601
1602    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1603        if let Some(info) = self.get_mut(asset_path) {
1604            info.dependents.insert(dependent);
1605        } else {
1606            let dependents = self
1607                .non_existent_dependents
1608                .entry(asset_path.clone())
1609                .or_default();
1610            dependents.insert(dependent);
1611        }
1612    }
1613
1614    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1615    async fn finish_processing(
1616        &mut self,
1617        asset_path: AssetPath<'static>,
1618        result: Result<ProcessResult, ProcessError>,
1619        reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1620    ) {
1621        match result {
1622            Ok(ProcessResult::Processed(processed_info)) => {
1623                debug!("Finished processing \"{}\"", asset_path);
1624                // clean up old dependents
1625                let old_processed_info = self
1626                    .infos
1627                    .get_mut(&asset_path)
1628                    .and_then(|i| i.processed_info.take());
1629                if let Some(old_processed_info) = old_processed_info {
1630                    self.clear_dependencies(&asset_path, old_processed_info);
1631                }
1632
1633                // populate new dependents
1634                for process_dependency_info in &processed_info.process_dependencies {
1635                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
1636                }
1637                let info = self.get_or_insert(asset_path);
1638                info.processed_info = Some(processed_info);
1639                info.update_status(ProcessStatus::Processed).await;
1640                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
1641                for path in dependents {
1642                    let _ = reprocess_sender
1643                        .send((path.source().clone_owned(), path.path().to_owned()))
1644                        .await;
1645                }
1646            }
1647            Ok(ProcessResult::SkippedNotChanged) => {
1648                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
1649                let info = self.get_mut(&asset_path).expect("info should exist");
1650                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
1651                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
1652                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
1653                // Therefore this relies on hot-reloading in the app to pickup the "latest" version of the asset
1654                // If "block until latest state is reflected" is required, we can easily add a less granular
1655                // "block until first pass finished" mode
1656                info.update_status(ProcessStatus::Processed).await;
1657            }
1658            Ok(ProcessResult::Ignored) => {
1659                debug!("Skipping processing (ignored) \"{}\"", asset_path);
1660            }
1661            Err(ProcessError::ExtensionRequired) => {
1662                // Skip assets without extensions
1663            }
1664            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1665                trace!("No loader found for {asset_path}");
1666            }
1667            Err(ProcessError::AssetReaderError {
1668                err: AssetReaderError::NotFound(_),
1669                ..
1670            }) => {
1671                // if there is no asset source, no processing can be done
1672                trace!("No need to process asset {asset_path} because it does not exist");
1673            }
1674            Err(err) => {
1675                error!("Failed to process asset {asset_path}: {err}");
1676                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
1677                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1678                    err
1679                {
1680                    let info = self.get_mut(&asset_path).expect("info should exist");
1681                    info.processed_info = Some(ProcessedInfo {
1682                        hash: AssetHash::default(),
1683                        full_hash: AssetHash::default(),
1684                        process_dependencies: vec![],
1685                    });
1686                    self.add_dependent(dependency.path(), asset_path.to_owned());
1687                }
1688
1689                let info = self.get_mut(&asset_path).expect("info should exist");
1690                info.update_status(ProcessStatus::Failed).await;
1691            }
1692        }
1693    }
1694
1695    /// Remove the info for the given path. This should only happen if an asset's source is
1696    /// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset
1697    /// path does not exist.
1698    async fn remove(
1699        &mut self,
1700        asset_path: &AssetPath<'static>,
1701    ) -> Option<Arc<async_lock::RwLock<()>>> {
1702        let info = self.infos.remove(asset_path)?;
1703        if let Some(processed_info) = info.processed_info {
1704            self.clear_dependencies(asset_path, processed_info);
1705        }
1706        // Tell all listeners this asset does not exist
1707        info.status_sender
1708            .broadcast(ProcessStatus::NonExistent)
1709            .await
1710            .unwrap();
1711        if !info.dependents.is_empty() {
1712            error!(
1713                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1714                    info.dependents
1715                );
1716            self.non_existent_dependents
1717                .insert(asset_path.clone(), info.dependents);
1718        }
1719
1720        Some(info.file_transaction_lock)
1721    }
1722
1723    /// Remove the info for the old path, and move over its info to the new path. This should only
1724    /// happen if an asset's source is removed/non-existent. Returns the transaction locks for the
1725    /// old and new assets respectively, or [`None`] if the old path does not exist.
1726    async fn rename(
1727        &mut self,
1728        old: &AssetPath<'static>,
1729        new: &AssetPath<'static>,
1730        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1731    ) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
1732        let mut info = self.infos.remove(old)?;
1733        if !info.dependents.is_empty() {
1734            // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1735            // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1736            // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1737            // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1738            // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1739            // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1740            // (see the remove impl).
1741            error!(
1742                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1743                    info.dependents
1744                );
1745            self.non_existent_dependents
1746                .insert(old.clone(), core::mem::take(&mut info.dependents));
1747        }
1748        if let Some(processed_info) = &info.processed_info {
1749            // Update "dependent" lists for this asset's "process dependencies" to use new path.
1750            for dep in &processed_info.process_dependencies {
1751                if let Some(info) = self.infos.get_mut(&dep.path) {
1752                    info.dependents.remove(old);
1753                    info.dependents.insert(new.clone());
1754                } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
1755                    dependents.remove(old);
1756                    dependents.insert(new.clone());
1757                }
1758            }
1759        }
1760        // Tell all listeners this asset no longer exists
1761        info.status_sender
1762            .broadcast(ProcessStatus::NonExistent)
1763            .await
1764            .unwrap();
1765        let new_info = self.get_or_insert(new.clone());
1766        new_info.processed_info = info.processed_info;
1767        new_info.status = info.status;
1768        // Ensure things waiting on the new path are informed of the status of this asset
1769        if let Some(status) = new_info.status {
1770            new_info.status_sender.broadcast(status).await.unwrap();
1771        }
1772        let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
1773        // Queue the asset for a reprocess check, in case it needs new meta.
1774        let _ = new_task_sender
1775            .send((new.source().clone_owned(), new.path().to_owned()))
1776            .await;
1777        for dependent in dependents {
1778            // Queue dependents for reprocessing because they might have been waiting for this asset.
1779            let _ = new_task_sender
1780                .send((
1781                    dependent.source().clone_owned(),
1782                    dependent.path().to_owned(),
1783                ))
1784                .await;
1785        }
1786
1787        Some((
1788            info.file_transaction_lock,
1789            new_info.file_transaction_lock.clone(),
1790        ))
1791    }
1792
1793    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1794        for old_load_dep in removed_info.process_dependencies {
1795            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1796                info.dependents.remove(asset_path);
1797            } else if let Some(dependents) =
1798                self.non_existent_dependents.get_mut(&old_load_dep.path)
1799            {
1800                dependents.remove(asset_path);
1801            }
1802        }
1803    }
1804}
1805
/// The current state of the [`AssetProcessor`].
///
/// Implements [`Debug`](core::fmt::Debug) so the state can be logged and used in assertions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}
1818
/// An error that occurs when initializing the [`AssetProcessor`].
#[derive(Error, Debug)]
pub enum InitializeError {
    /// Wraps an [`AssetReaderError`] and displays it transparently.
    /// NOTE(review): going by the name, raised while reading source asset paths; the
    /// construction site is not visible in this part of the file.
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    /// Wraps an [`AssetReaderError`] and displays it transparently.
    /// NOTE(review): going by the name, raised while reading destination (processed) asset
    /// paths; the construction site is not visible in this part of the file.
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    /// Validating the asset transaction log failed. Convertible from [`ValidateLogError`].
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}
1829
/// An error when attempting to set the transaction log factory.
#[derive(Error, Debug)]
pub enum SetTransactionLogFactoryError {
    /// The factory slot was already empty, which indicates the asset processor has already
    /// started; setting a new factory would have no effect.
    #[error("Transaction log is already in use so setting the factory does nothing")]
    AlreadyInUse,
}
1836
/// An error when retrieving an asset processor.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum GetProcessorError {
    /// No processor with the given name exists. Carries the requested name.
    #[error("The processor '{0}' does not exist")]
    Missing(String),
    /// The requested short name matches more than one processor.
    #[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
    Ambiguous {
        /// The short name that was requested.
        processor_short_name: String,
        /// The names of the processors the short name is ambiguous between.
        ambiguous_processor_names: Vec<&'static str>,
    },
}
1848
1849impl From<GetProcessorError> for ProcessError {
1850    fn from(err: GetProcessorError) -> Self {
1851        match err {
1852            GetProcessorError::Missing(name) => Self::MissingProcessor(name),
1853            GetProcessorError::Ambiguous {
1854                processor_short_name,
1855                ambiguous_processor_names,
1856            } => Self::AmbiguousProcessor {
1857                processor_short_name,
1858                ambiguous_processor_names,
1859            },
1860        }
1861    }
1862}
1863
1864#[cfg(test)]
1865mod tests;