1mod log;
41mod process;
42
43use async_lock::RwLockReadGuardArc;
44pub use log::*;
45pub use process::*;
46
47use crate::{
48 io::{
49 AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
50 AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
51 },
52 meta::{
53 get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
54 AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
55 },
56 AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
57 MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
58};
59use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
60use bevy_ecs::prelude::*;
61use bevy_platform::{
62 collections::{hash_map::Entry, HashMap, HashSet},
63 sync::{PoisonError, RwLock},
64};
65use bevy_tasks::IoTaskPool;
66use futures_io::ErrorKind;
67use futures_lite::{AsyncWriteExt, StreamExt};
68use futures_util::{select_biased, FutureExt};
69use std::{
70 path::{Path, PathBuf},
71 sync::Mutex,
72};
73use thiserror::Error;
74use tracing::{debug, error, trace, warn};
75
76#[cfg(feature = "trace")]
77use {
78 alloc::string::ToString,
79 tracing::{info_span, instrument::Instrument},
80};
81
82#[derive(Resource, Clone)]
98pub struct AssetProcessor {
99 server: AssetServer,
100 pub(crate) data: Arc<AssetProcessorData>,
101}
102
103pub struct AssetProcessorData {
105 pub(crate) processing_state: Arc<ProcessingState>,
107 log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
113 log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
114 processors: RwLock<Processors>,
116 sources: Arc<AssetSources>,
117}
118
119pub(crate) struct ProcessingState {
121 state: async_lock::RwLock<ProcessorState>,
123 initialized_sender: async_broadcast::Sender<()>,
125 initialized_receiver: async_broadcast::Receiver<()>,
126 finished_sender: async_broadcast::Sender<()>,
128 finished_receiver: async_broadcast::Receiver<()>,
129 asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
131}
132
133#[derive(Default)]
134struct Processors {
135 type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
137 short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
139 file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
142}
143
144enum ShortTypeProcessorEntry {
145 Unique {
147 type_path: &'static str,
149 processor: Arc<dyn ErasedProcessor>,
151 },
152 Ambiguous(Vec<&'static str>),
156}
157
158impl AssetProcessor {
159 pub fn new(
161 sources: &mut AssetSourceBuilders,
162 watch_processed: bool,
163 ) -> (Self, Arc<AssetSources>) {
164 let state = Arc::new(ProcessingState::new());
165 let mut sources = sources.build_sources(true, watch_processed);
166 sources.gate_on_processor(state.clone());
167 let sources = Arc::new(sources);
168
169 let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
170 let server = AssetServer::new_with_meta_check(
172 sources.clone(),
173 AssetServerMode::Processed,
174 AssetMetaCheck::Always,
175 false,
176 UnapprovedPathMode::default(),
177 );
178 (Self { server, data }, sources)
179 }
180
181 pub fn data(&self) -> &Arc<AssetProcessorData> {
183 &self.data
184 }
185
186 pub fn server(&self) -> &AssetServer {
189 &self.server
190 }
191
192 pub async fn get_state(&self) -> ProcessorState {
194 self.data.processing_state.get_state().await
195 }
196
197 #[inline]
199 pub fn get_source<'a>(
200 &self,
201 id: impl Into<AssetSourceId<'a>>,
202 ) -> Result<&AssetSource, MissingAssetSourceError> {
203 self.data.sources.get(id.into())
204 }
205
206 #[inline]
207 pub fn sources(&self) -> &AssetSources {
208 &self.data.sources
209 }
210
211 async fn log_unrecoverable(&self) {
214 let mut log = self.data.log.write().await;
215 let log = log.as_mut().unwrap();
216 log.unrecoverable()
217 .await
218 .map_err(|error| WriteLogError {
219 log_entry: LogEntry::UnrecoverableError,
220 error,
221 })
222 .unwrap();
223 }
224
225 async fn log_begin_processing(&self, path: &AssetPath<'_>) {
228 let mut log = self.data.log.write().await;
229 let log = log.as_mut().unwrap();
230 log.begin_processing(path)
231 .await
232 .map_err(|error| WriteLogError {
233 log_entry: LogEntry::BeginProcessing(path.clone_owned()),
234 error,
235 })
236 .unwrap();
237 }
238
239 async fn log_end_processing(&self, path: &AssetPath<'_>) {
241 let mut log = self.data.log.write().await;
242 let log = log.as_mut().unwrap();
243 log.end_processing(path)
244 .await
245 .map_err(|error| WriteLogError {
246 log_entry: LogEntry::EndProcessing(path.clone_owned()),
247 error,
248 })
249 .unwrap();
250 }
251
252 pub fn start(processor: Res<Self>) {
254 let processor = processor.clone();
255 IoTaskPool::get()
256 .spawn(async move {
257 let start_time = std::time::Instant::now();
258 debug!("Processing Assets");
259
260 processor.initialize().await.unwrap();
261
262 let (new_task_sender, new_task_receiver) = async_channel::unbounded();
263 processor
264 .queue_initial_processing_tasks(&new_task_sender)
265 .await;
266
267 {
270 let processor = processor.clone();
271 let new_task_sender = new_task_sender.clone();
272 IoTaskPool::get()
273 .spawn(async move {
274 processor
275 .execute_processing_tasks(new_task_sender, new_task_receiver)
276 .await;
277 })
278 .detach();
279 }
280
281 processor.data.wait_until_finished().await;
282
283 let end_time = std::time::Instant::now();
284 debug!("Processing finished in {:?}", end_time - start_time);
285
286 debug!("Listening for changes to source assets");
287 processor.spawn_source_change_event_listeners(&new_task_sender);
288 })
289 .detach();
290 }
291
292 async fn queue_initial_processing_tasks(
294 &self,
295 sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
296 ) {
297 for source in self.sources().iter_processed() {
298 self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
299 .await
300 .unwrap();
301 }
302 }
303
304 fn spawn_source_change_event_listeners(
307 &self,
308 sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
309 ) {
310 for source in self.data.sources.iter_processed() {
311 let Some(receiver) = source.event_receiver().cloned() else {
312 continue;
313 };
314 let source_id = source.id();
315 let processor = self.clone();
316 let sender = sender.clone();
317 IoTaskPool::get()
318 .spawn(async move {
319 while let Ok(event) = receiver.recv().await {
320 let Ok(source) = processor.get_source(source_id.clone()) else {
321 return;
322 };
323 processor
324 .handle_asset_source_event(source, event, &sender)
325 .await;
326 }
327 })
328 .detach();
329 }
330 }
331
332 async fn execute_processing_tasks(
339 &self,
340 new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
341 new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
342 ) {
343 let new_task_sender = {
347 let weak_sender = new_task_sender.downgrade();
348 drop(new_task_sender);
349 weak_sender
350 };
351
352 if new_task_receiver.is_empty() {
356 self.data
357 .processing_state
358 .set_state(ProcessorState::Finished)
359 .await;
360 }
361 enum ProcessorTaskEvent {
362 Start(AssetSourceId<'static>, PathBuf),
363 Finished,
364 }
365 let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();
366
367 let mut pending_tasks = 0;
368 while let Ok(event) = {
369 select_biased! {
373 result = new_task_receiver.recv().fuse() => {
374 result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
375 },
376 result = task_finished_receiver.recv().fuse() => {
377 result.map(|()| ProcessorTaskEvent::Finished)
378 }
379 }
380 } {
381 match event {
382 ProcessorTaskEvent::Start(source_id, path) => {
383 let Some(new_task_sender) = new_task_sender.upgrade() else {
384 continue;
391 };
392 let processor = self.clone();
393 let task_finished_sender = task_finished_sender.clone();
394 pending_tasks += 1;
395 IoTaskPool::get()
396 .spawn(async move {
397 let Ok(source) = processor.get_source(source_id) else {
398 return;
399 };
400 processor.process_asset(source, path, new_task_sender).await;
401 let _ = task_finished_sender.send(()).await;
403 })
404 .detach();
405 self.data
406 .processing_state
407 .set_state(ProcessorState::Processing)
408 .await;
409 }
410 ProcessorTaskEvent::Finished => {
411 pending_tasks -= 1;
412 if pending_tasks == 0 {
413 self.server.write_infos().consume_handle_drop_events();
415 self.data
416 .processing_state
417 .set_state(ProcessorState::Finished)
418 .await;
419 }
420 }
421 }
422 }
423 }
424
425 pub async fn write_default_meta_file_for_path(
433 &self,
434 path: impl Into<AssetPath<'_>>,
435 ) -> Result<(), WriteDefaultMetaError> {
436 let path = path.into();
437 let Some(processor) = path
438 .get_full_extension()
439 .and_then(|extension| self.get_default_processor(extension))
440 else {
441 return self
442 .server
443 .write_default_loader_meta_file_for_path(path)
444 .await;
445 };
446
447 let short_type_path = processor.short_type_path();
448 let processor_path_kind = if self.get_processor(short_type_path).is_ok() {
451 MetaTypePathKind::Short
452 } else {
453 MetaTypePathKind::Long
454 };
455
456 let meta = processor.default_meta(processor_path_kind);
457 let serialized_meta = meta.serialize();
458
459 let source = self.get_source(path.source())?;
460
461 let reader = source.reader();
465 match reader.read_meta_bytes(path.path()).await {
466 Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
467 Err(AssetReaderError::NotFound(_)) => {
468 }
470 Err(AssetReaderError::Io(err)) => {
471 return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
472 }
473 Err(AssetReaderError::HttpError(err)) => {
474 return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
475 }
476 }
477
478 let writer = source.writer()?;
479 writer
480 .write_meta_bytes(path.path(), &serialized_meta)
481 .await?;
482
483 Ok(())
484 }
485
486 async fn handle_asset_source_event(
487 &self,
488 source: &AssetSource,
489 event: AssetSourceEvent,
490 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
491 ) {
492 trace!("{event:?}");
493 match event {
494 AssetSourceEvent::AddedAsset(path)
495 | AssetSourceEvent::AddedMeta(path)
496 | AssetSourceEvent::ModifiedAsset(path)
497 | AssetSourceEvent::ModifiedMeta(path) => {
498 let _ = new_task_sender.send((source.id(), path)).await;
499 }
500 AssetSourceEvent::RemovedAsset(path) => {
501 self.handle_removed_asset(source, path).await;
502 }
503 AssetSourceEvent::RemovedMeta(path) => {
504 self.handle_removed_meta(source, path, new_task_sender)
505 .await;
506 }
507 AssetSourceEvent::AddedFolder(path) => {
508 self.handle_added_folder(source, path, new_task_sender)
509 .await;
510 }
511 AssetSourceEvent::RemovedFolder(path) => {
515 self.handle_removed_folder(source, &path).await;
516 }
517 AssetSourceEvent::RenamedAsset { old, new } => {
518 if old == new {
521 let _ = new_task_sender.send((source.id(), new)).await;
522 } else {
523 self.handle_renamed_asset(source, old, new, new_task_sender)
524 .await;
525 }
526 }
527 AssetSourceEvent::RenamedMeta { old, new } => {
528 if old == new {
531 let _ = new_task_sender.send((source.id(), new)).await;
532 } else {
533 debug!("Meta renamed from {old:?} to {new:?}");
534 let _ = new_task_sender.send((source.id(), old)).await;
537 let _ = new_task_sender.send((source.id(), new)).await;
538 }
539 }
540 AssetSourceEvent::RenamedFolder { old, new } => {
541 if old == new {
544 self.handle_added_folder(source, new, new_task_sender).await;
545 } else {
546 self.handle_removed_folder(source, &old).await;
549 self.handle_added_folder(source, new, new_task_sender).await;
550 }
551 }
552 AssetSourceEvent::RemovedUnknown { path, is_meta } => {
553 let processed_reader = source.ungated_processed_reader().unwrap();
554 match processed_reader.is_directory(&path).await {
555 Ok(is_directory) => {
556 if is_directory {
557 self.handle_removed_folder(source, &path).await;
558 } else if is_meta {
559 self.handle_removed_meta(source, path, new_task_sender)
560 .await;
561 } else {
562 self.handle_removed_asset(source, path).await;
563 }
564 }
565 Err(err) => {
566 match err {
567 AssetReaderError::NotFound(_) => {
568 }
570 AssetReaderError::Io(err) => {
571 error!(
572 "Path '{}' was removed, but the destination reader could not determine if it \
573 was a folder or a file due to the following error: {err}",
574 AssetPath::from_path(&path).with_source(source.id())
575 );
576 }
577 AssetReaderError::HttpError(status) => {
578 error!(
579 "Path '{}' was removed, but the destination reader could not determine if it \
580 was a folder or a file due to receiving an unexpected HTTP Status {status}",
581 AssetPath::from_path(&path).with_source(source.id())
582 );
583 }
584 }
585 }
586 }
587 }
588 }
589 }
590
591 async fn handle_added_folder(
592 &self,
593 source: &AssetSource,
594 path: PathBuf,
595 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
596 ) {
597 debug!(
598 "Folder {} was added. Attempting to re-process",
599 AssetPath::from_path(&path).with_source(source.id())
600 );
601 self.queue_processing_tasks_for_folder(source, path, new_task_sender)
602 .await
603 .unwrap();
604 }
605
606 async fn handle_removed_meta(
608 &self,
609 source: &AssetSource,
610 path: PathBuf,
611 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
612 ) {
613 debug!(
618 "Meta for asset {} was removed. Attempting to re-process",
619 AssetPath::from_path(&path).with_source(source.id())
620 );
621 let _ = new_task_sender.send((source.id(), path)).await;
622 }
623
624 async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
626 debug!(
627 "Removing folder {} because source was removed",
628 path.display()
629 );
630 let processed_reader = source.ungated_processed_reader().unwrap();
631 match processed_reader.read_directory(path).await {
632 Ok(mut path_stream) => {
633 while let Some(child_path) = path_stream.next().await {
634 self.handle_removed_asset(source, child_path).await;
635 }
636 }
637 Err(err) => match err {
638 AssetReaderError::NotFound(_err) => {
639 }
641 AssetReaderError::HttpError(status) => {
642 self.log_unrecoverable().await;
643 error!(
644 "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
645 in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
646 );
647 }
648 AssetReaderError::Io(err) => {
649 self.log_unrecoverable().await;
650 error!(
651 "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
652 in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
653 );
654 }
655 },
656 }
657 let processed_writer = source.processed_writer().unwrap();
658 if let Err(err) = processed_writer.remove_directory(path).await {
659 match err {
660 AssetWriterError::Io(err) => {
661 if err.kind() != ErrorKind::NotFound {
664 let asset_path = AssetPath::from_path(path).with_source(source.id());
665 error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
666 }
667 }
668 }
669 }
670 }
671
672 async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
675 let asset_path = AssetPath::from(path).with_source(source.id());
676 debug!("Removing processed {asset_path} because source was removed");
677 let lock = {
678 let mut infos = self.data.processing_state.asset_infos.write().await;
680 infos.remove(&asset_path).await
681 };
682 let Some(lock) = lock else {
683 return;
684 };
685
686 let _write_lock = lock.write();
689 self.remove_processed_asset_and_meta(source, asset_path.path())
690 .await;
691 }
692
693 async fn handle_renamed_asset(
696 &self,
697 source: &AssetSource,
698 old: PathBuf,
699 new: PathBuf,
700 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
701 ) {
702 let old = AssetPath::from(old).with_source(source.id());
703 let new = AssetPath::from(new).with_source(source.id());
704 let processed_writer = source.processed_writer().unwrap();
705 let result = {
706 let mut infos = self.data.processing_state.asset_infos.write().await;
708 infos.rename(&old, &new, new_task_sender).await
709 };
710 let Some((old_lock, new_lock)) = result else {
711 return;
712 };
713 let _old_write_lock = old_lock.write();
716 let _new_write_lock = new_lock.write();
717 processed_writer
718 .rename(old.path(), new.path())
719 .await
720 .unwrap();
721 processed_writer
722 .rename_meta(old.path(), new.path())
723 .await
724 .unwrap();
725 }
726
727 async fn queue_processing_tasks_for_folder(
728 &self,
729 source: &AssetSource,
730 path: PathBuf,
731 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
732 ) -> Result<(), AssetReaderError> {
733 if source.reader().is_directory(&path).await? {
734 let mut path_stream = source.reader().read_directory(&path).await?;
735 while let Some(path) = path_stream.next().await {
736 Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
737 .await?;
738 }
739 } else {
740 let _ = new_task_sender.send((source.id(), path)).await;
741 }
742 Ok(())
743 }
744
745 pub fn register_processor<P: Process>(&self, processor: P) {
747 let mut processors = self
748 .data
749 .processors
750 .write()
751 .unwrap_or_else(PoisonError::into_inner);
752 let processor = Arc::new(processor);
753 processors
754 .type_path_to_processor
755 .insert(P::type_path(), processor.clone());
756 match processors
757 .short_type_path_to_processor
758 .entry(P::short_type_path())
759 {
760 Entry::Vacant(entry) => {
761 entry.insert(ShortTypeProcessorEntry::Unique {
762 type_path: P::type_path(),
763 processor,
764 });
765 }
766 Entry::Occupied(mut entry) => match entry.get_mut() {
767 ShortTypeProcessorEntry::Unique { type_path, .. } => {
768 let type_path = *type_path;
769 *entry.get_mut() =
770 ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
771 }
772 ShortTypeProcessorEntry::Ambiguous(type_paths) => {
773 type_paths.push(P::type_path());
774 }
775 },
776 }
777 }
778
779 pub fn set_default_processor<P: Process>(&self, extension: &str) {
781 let mut processors = self
782 .data
783 .processors
784 .write()
785 .unwrap_or_else(PoisonError::into_inner);
786 processors
787 .file_extension_to_default_processor
788 .insert(extension.into(), P::type_path());
789 }
790
791 pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
793 let processors = self
794 .data
795 .processors
796 .read()
797 .unwrap_or_else(PoisonError::into_inner);
798 let key = processors
799 .file_extension_to_default_processor
800 .get(extension)?;
801 processors.type_path_to_processor.get(key).cloned()
802 }
803
804 pub fn get_processor(
806 &self,
807 processor_type_name: &str,
808 ) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
809 let processors = self
810 .data
811 .processors
812 .read()
813 .unwrap_or_else(PoisonError::into_inner);
814 if let Some(short_type_processor) = processors
815 .short_type_path_to_processor
816 .get(processor_type_name)
817 {
818 return match short_type_processor {
819 ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
820 ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
821 processor_short_name: processor_type_name.to_owned(),
822 ambiguous_processor_names: examples.clone(),
823 }),
824 };
825 }
826 processors
827 .type_path_to_processor
828 .get(processor_type_name)
829 .cloned()
830 .ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
831 }
832
833 async fn initialize(&self) -> Result<(), InitializeError> {
838 self.validate_transaction_log_and_recover().await;
839 let mut asset_infos = self.data.processing_state.asset_infos.write().await;
840
841 async fn get_asset_paths(
844 reader: &dyn ErasedAssetReader,
845 path: PathBuf,
846 paths: &mut Vec<PathBuf>,
847 mut empty_dirs: Option<&mut Vec<PathBuf>>,
848 ) -> Result<bool, AssetReaderError> {
849 if reader.is_directory(&path).await? {
850 let mut path_stream = reader.read_directory(&path).await?;
851 let mut contains_files = false;
852
853 while let Some(child_path) = path_stream.next().await {
854 contains_files |= Box::pin(get_asset_paths(
855 reader,
856 child_path,
857 paths,
858 empty_dirs.as_deref_mut(),
859 ))
860 .await?;
861 }
862 if !contains_files
865 && path.parent().is_some()
866 && let Some(empty_dirs) = empty_dirs
867 {
868 empty_dirs.push(path);
869 }
870 Ok(contains_files)
871 } else {
872 paths.push(path);
873 Ok(true)
874 }
875 }
876
877 for source in self.sources().iter_processed() {
878 let Some(processed_reader) = source.ungated_processed_reader() else {
879 continue;
880 };
881 let Ok(processed_writer) = source.processed_writer() else {
882 continue;
883 };
884 let mut unprocessed_paths = Vec::new();
885 get_asset_paths(
886 source.reader(),
887 PathBuf::from(""),
888 &mut unprocessed_paths,
889 None,
890 )
891 .await
892 .map_err(InitializeError::FailedToReadSourcePaths)?;
893
894 let mut processed_paths = Vec::new();
895 let mut empty_dirs = Vec::new();
896 get_asset_paths(
897 processed_reader,
898 PathBuf::from(""),
899 &mut processed_paths,
900 Some(&mut empty_dirs),
901 )
902 .await
903 .map_err(InitializeError::FailedToReadDestinationPaths)?;
904
905 for empty_dir in empty_dirs {
909 let _ = processed_writer.remove_empty_directory(&empty_dir).await;
911 }
912
913 for path in unprocessed_paths {
914 asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
915 }
916
917 for path in processed_paths {
918 let mut dependencies = Vec::new();
919 let asset_path = AssetPath::from(path).with_source(source.id());
920 if let Some(info) = asset_infos.get_mut(&asset_path) {
921 match processed_reader.read_meta_bytes(asset_path.path()).await {
922 Ok(meta_bytes) => {
923 match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
924 Ok(minimal) => {
925 trace!(
926 "Populated processed info for asset {asset_path} {:?}",
927 minimal.processed_info
928 );
929
930 if let Some(processed_info) = &minimal.processed_info {
931 for process_dependency_info in
932 &processed_info.process_dependencies
933 {
934 dependencies.push(process_dependency_info.path.clone());
935 }
936 }
937 info.processed_info = minimal.processed_info;
938 }
939 Err(err) => {
940 trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
941 self.remove_processed_asset_and_meta(source, asset_path.path())
942 .await;
943 }
944 }
945 }
946 Err(err) => {
947 trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
948 self.remove_processed_asset_and_meta(source, asset_path.path())
949 .await;
950 }
951 }
952 } else {
953 trace!("Removing processed data for non-existent asset {asset_path}");
954 self.remove_processed_asset_and_meta(source, asset_path.path())
955 .await;
956 }
957
958 for dependency in dependencies {
959 asset_infos.add_dependent(&dependency, asset_path.clone());
960 }
961 }
962 }
963
964 self.data
965 .processing_state
966 .set_state(ProcessorState::Processing)
967 .await;
968
969 Ok(())
970 }
971
972 async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
975 if let Err(err) = source.processed_writer().unwrap().remove(path).await {
976 warn!("Failed to remove non-existent asset {path:?}: {err}");
977 }
978
979 if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
980 warn!("Failed to remove non-existent meta {path:?}: {err}");
981 }
982
983 self.clean_empty_processed_ancestor_folders(source, path)
984 .await;
985 }
986
987 async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, mut path: &Path) {
988 if path.is_absolute() {
990 error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
991 return;
992 }
993 while let Some(parent) = path.parent() {
994 path = parent;
995 if parent == Path::new("") {
996 break;
997 }
998 if source
999 .processed_writer()
1000 .unwrap()
1001 .remove_empty_directory(parent)
1002 .await
1003 .is_err()
1004 {
1005 break;
1007 }
1008 }
1009 }
1010
1011 async fn process_asset(
1019 &self,
1020 source: &AssetSource,
1021 path: PathBuf,
1022 processor_task_event: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1023 ) {
1024 let asset_path = AssetPath::from(path).with_source(source.id());
1025 let result = self.process_asset_internal(source, &asset_path).await;
1026 let mut infos = self.data.processing_state.asset_infos.write().await;
1027 infos
1028 .finish_processing(asset_path, result, processor_task_event)
1029 .await;
1030 }
1031
1032 async fn process_asset_internal(
1033 &self,
1034 source: &AssetSource,
1035 asset_path: &AssetPath<'static>,
1036 ) -> Result<ProcessResult, ProcessError> {
1037 debug!("Processing {}", asset_path);
1039 let server = &self.server;
1040 let path = asset_path.path();
1041 let reader = source.reader();
1042
1043 let reader_err = |err| ProcessError::AssetReaderError {
1044 path: asset_path.clone(),
1045 err,
1046 };
1047 let writer_err = |err| ProcessError::AssetWriterError {
1048 path: asset_path.clone(),
1049 err,
1050 };
1051
1052 let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
1053 Ok(meta_bytes) => {
1054 let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
1055 ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
1056 })?;
1057 let (meta, processor) = match minimal.asset {
1058 AssetActionMinimal::Load { loader } => {
1059 let loader = server.get_asset_loader_with_type_name(&loader).await?;
1060 let meta = loader.deserialize_meta(&meta_bytes)?;
1061 (meta, None)
1062 }
1063 AssetActionMinimal::Process { processor } => {
1064 let processor = self.get_processor(&processor)?;
1065 let meta = processor.deserialize_meta(&meta_bytes)?;
1066 (meta, Some(processor))
1067 }
1068 AssetActionMinimal::Ignore => {
1069 return Ok(ProcessResult::Ignored);
1070 }
1071 };
1072 (meta, meta_bytes, processor)
1073 }
1074 Err(AssetReaderError::NotFound(_path)) => {
1075 let (meta, processor) = if let Some(processor) = asset_path
1076 .get_full_extension()
1077 .and_then(|ext| self.get_default_processor(ext))
1078 {
1079 let meta = processor.default_meta(MetaTypePathKind::Long);
1083 (meta, Some(processor))
1084 } else {
1085 match server.get_path_asset_loader(asset_path.clone()).await {
1086 Ok(loader) => (loader.default_meta(), None),
1087 Err(MissingAssetLoaderForExtensionError { .. }) => {
1088 let meta: Box<dyn AssetMetaDyn> =
1089 Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
1090 (meta, None)
1091 }
1092 }
1093 };
1094 let meta_bytes = meta.serialize();
1095 (meta, meta_bytes, processor)
1096 }
1097 Err(err) => {
1098 return Err(ProcessError::ReadAssetMetaError {
1099 path: asset_path.clone(),
1100 err,
1101 })
1102 }
1103 };
1104
1105 let processed_writer = source.processed_writer()?;
1106
1107 let new_hash = {
1108 let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;
1111
1112 get_asset_hash(&meta_bytes, &mut reader_for_hash)
1113 .await
1114 .map_err(reader_err)?
1115 };
1116 let mut new_processed_info = ProcessedInfo {
1117 hash: new_hash,
1118 full_hash: new_hash,
1119 process_dependencies: Vec::new(),
1120 };
1121
1122 {
1123 let infos = self.data.processing_state.asset_infos.read().await;
1124 if let Some(current_processed_info) = infos
1125 .get(asset_path)
1126 .and_then(|i| i.processed_info.as_ref())
1127 && current_processed_info.hash == new_hash
1128 {
1129 let mut dependency_changed = false;
1130 for current_dep_info in ¤t_processed_info.process_dependencies {
1131 let live_hash = infos
1132 .get(¤t_dep_info.path)
1133 .and_then(|i| i.processed_info.as_ref())
1134 .map(|i| i.full_hash);
1135 if live_hash != Some(current_dep_info.full_hash) {
1136 dependency_changed = true;
1137 break;
1138 }
1139 }
1140 if !dependency_changed {
1141 return Ok(ProcessResult::SkippedNotChanged);
1142 }
1143 }
1144 }
1145
1146 let _transaction_lock = {
1149 let lock = {
1150 let mut infos = self.data.processing_state.asset_infos.write().await;
1151 let info = infos.get_or_insert(asset_path.clone());
1152 info.file_transaction_lock.clone()
1156 };
1157 lock.write_arc().await
1158 };
1159
1160 self.log_begin_processing(asset_path).await;
1164 if let Some(processor) = processor {
1165 let settings = source_meta.process_settings().unwrap();
1168
1169 let reader_for_process = reader.read(path).await.map_err(reader_err)?;
1179
1180 let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
1181 let mut processed_meta = {
1182 let mut context = ProcessContext::new(
1183 self,
1184 asset_path,
1185 reader_for_process,
1186 &mut new_processed_info,
1187 );
1188 let process = processor.process(&mut context, settings, &mut *writer);
1189 #[cfg(feature = "trace")]
1190 let process = {
1191 let span = info_span!(
1192 "asset processing",
1193 processor = processor.type_path(),
1194 asset = asset_path.to_string(),
1195 );
1196 process.instrument(span)
1197 };
1198 process.await?
1199 };
1200
1201 writer
1202 .flush()
1203 .await
1204 .map_err(|e| ProcessError::AssetWriterError {
1205 path: asset_path.clone(),
1206 err: AssetWriterError::Io(e),
1207 })?;
1208
1209 let full_hash = get_full_asset_hash(
1210 new_hash,
1211 new_processed_info
1212 .process_dependencies
1213 .iter()
1214 .map(|i| i.full_hash),
1215 );
1216 new_processed_info.full_hash = full_hash;
1217 *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
1218 let meta_bytes = processed_meta.serialize();
1219
1220 processed_writer
1221 .write_meta_bytes(path, &meta_bytes)
1222 .await
1223 .map_err(writer_err)?;
1224 } else {
1225 let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
1227 let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
1228 futures_lite::io::copy(&mut reader_for_copy, &mut writer)
1229 .await
1230 .map_err(|err| ProcessError::AssetWriterError {
1231 path: asset_path.clone_owned(),
1232 err: err.into(),
1233 })?;
1234 *source_meta.processed_info_mut() = Some(new_processed_info.clone());
1235 let meta_bytes = source_meta.serialize();
1236 processed_writer
1237 .write_meta_bytes(path, &meta_bytes)
1238 .await
1239 .map_err(writer_err)?;
1240 }
1241 self.log_end_processing(asset_path).await;
1242
1243 Ok(ProcessResult::Processed(new_processed_info))
1244 }
1245
1246 async fn validate_transaction_log_and_recover(&self) {
1247 let log_factory = self
1248 .data
1249 .log_factory
1250 .lock()
1251 .unwrap_or_else(PoisonError::into_inner)
1252 .take()
1255 .expect("the asset processor only starts once");
1256 if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
1257 let state_is_valid = match err {
1258 ValidateLogError::ReadLogError(err) => {
1259 error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
1260 false
1261 }
1262 ValidateLogError::UnrecoverableError => {
1263 error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
1264 false
1265 }
1266 ValidateLogError::EntryErrors(entry_errors) => {
1267 let mut state_is_valid = true;
1268 for entry_error in entry_errors {
1269 match entry_error {
1270 LogEntryError::DuplicateTransaction(_)
1271 | LogEntryError::EndedMissingTransaction(_) => {
1272 error!("{}", entry_error);
1273 state_is_valid = false;
1274 break;
1275 }
1276 LogEntryError::UnfinishedTransaction(path) => {
1277 debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
1278 let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
1279 error!("Failed to remove asset {path:?}: {message}");
1280 state_is_valid = false;
1281 };
1282 let Ok(source) = self.get_source(path.source()) else {
1283 unrecoverable_err(&"AssetSource does not exist");
1284 continue;
1285 };
1286 let Ok(processed_writer) = source.processed_writer() else {
1287 unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
1288 continue;
1289 };
1290
1291 if let Err(err) = processed_writer.remove(path.path()).await {
1292 match err {
1293 AssetWriterError::Io(err) => {
1294 if err.kind() != ErrorKind::NotFound {
1296 unrecoverable_err(&err);
1297 }
1298 }
1299 }
1300 }
1301 if let Err(err) = processed_writer.remove_meta(path.path()).await {
1302 match err {
1303 AssetWriterError::Io(err) => {
1304 if err.kind() != ErrorKind::NotFound {
1306 unrecoverable_err(&err);
1307 }
1308 }
1309 }
1310 }
1311 }
1312 }
1313 }
1314 state_is_valid
1315 }
1316 };
1317
1318 if !state_is_valid {
1319 error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
1320 for source in self.sources().iter_processed() {
1321 let Ok(processed_writer) = source.processed_writer() else {
1322 continue;
1323 };
1324 if let Err(err) = processed_writer
1325 .remove_assets_in_directory(Path::new(""))
1326 .await
1327 {
1328 panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
1329 }
1330 }
1331 }
1332 }
1333 let mut log = self.data.log.write().await;
1334 *log = match log_factory.create_new_log().await {
1335 Ok(log) => Some(log),
1336 Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
1337 };
1338 }
1339}
1340
1341impl AssetProcessorData {
1342 pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
1344 AssetProcessorData {
1345 processing_state,
1346 sources,
1347 log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
1348 log: Default::default(),
1349 processors: Default::default(),
1350 }
1351 }
1352
1353 pub fn set_log_factory(
1358 &self,
1359 factory: Box<dyn ProcessorTransactionLogFactory>,
1360 ) -> Result<(), SetTransactionLogFactoryError> {
1361 let mut log_factory = self
1362 .log_factory
1363 .lock()
1364 .unwrap_or_else(PoisonError::into_inner);
1365 if log_factory.is_none() {
1366 return Err(SetTransactionLogFactoryError::AlreadyInUse);
1369 }
1370
1371 *log_factory = Some(factory);
1372 Ok(())
1373 }
1374
1375 pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1377 self.processing_state.wait_until_processed(path).await
1378 }
1379
1380 pub async fn wait_until_initialized(&self) {
1382 self.processing_state.wait_until_initialized().await;
1383 }
1384
1385 pub async fn wait_until_finished(&self) {
1387 self.processing_state.wait_until_finished().await;
1388 }
1389}
1390
1391impl ProcessingState {
1392 fn new() -> Self {
1394 let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1395 let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1396 initialized_sender.set_overflow(true);
1399 finished_sender.set_overflow(true);
1400
1401 Self {
1402 state: async_lock::RwLock::new(ProcessorState::Initializing),
1403 initialized_sender,
1404 initialized_receiver,
1405 finished_sender,
1406 finished_receiver,
1407 asset_infos: Default::default(),
1408 }
1409 }
1410
1411 async fn set_state(&self, state: ProcessorState) {
1413 let mut state_guard = self.state.write().await;
1414 let last_state = *state_guard;
1415 *state_guard = state;
1416 if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
1417 self.finished_sender.broadcast(()).await.unwrap();
1418 } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
1419 self.initialized_sender.broadcast(()).await.unwrap();
1420 }
1421 }
1422
1423 pub(crate) async fn get_state(&self) -> ProcessorState {
1425 *self.state.read().await
1426 }
1427
1428 pub(crate) async fn get_transaction_lock(
1431 &self,
1432 path: &AssetPath<'static>,
1433 ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
1434 let lock = {
1435 let infos = self.asset_infos.read().await;
1436 let info = infos
1437 .get(path)
1438 .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1439 info.file_transaction_lock.clone()
1443 };
1444 Ok(lock.read_arc().await)
1445 }
1446
1447 pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1449 self.wait_until_initialized().await;
1450 let mut receiver = {
1451 let infos = self.asset_infos.write().await;
1452 let info = infos.get(&path);
1453 match info {
1454 Some(info) => match info.status {
1455 Some(result) => return result,
1456 None => info.status_receiver.clone(),
1458 },
1459 None => return ProcessStatus::NonExistent,
1460 }
1461 };
1462 receiver.recv().await.unwrap()
1463 }
1464
1465 pub(crate) async fn wait_until_initialized(&self) {
1467 let receiver = {
1468 let state = self.state.read().await;
1469 match *state {
1470 ProcessorState::Initializing => {
1471 Some(self.initialized_receiver.clone())
1473 }
1474 _ => None,
1475 }
1476 };
1477
1478 if let Some(mut receiver) = receiver {
1479 receiver.recv().await.unwrap();
1480 }
1481 }
1482
1483 pub(crate) async fn wait_until_finished(&self) {
1485 let receiver = {
1486 let state = self.state.read().await;
1487 match *state {
1488 ProcessorState::Initializing | ProcessorState::Processing => {
1489 Some(self.finished_receiver.clone())
1491 }
1492 ProcessorState::Finished => None,
1493 }
1494 };
1495
1496 if let Some(mut receiver) = receiver {
1497 receiver.recv().await.unwrap();
1498 }
1499 }
1500}
1501
1502#[derive(Debug, Clone)]
1504pub enum ProcessResult {
1505 Processed(ProcessedInfo),
1506 SkippedNotChanged,
1507 Ignored,
1508}
1509
1510#[derive(Debug, PartialEq, Eq, Copy, Clone)]
1512pub enum ProcessStatus {
1513 Processed,
1514 Failed,
1515 NonExistent,
1516}
1517
1518#[derive(Debug)]
1520pub(crate) struct ProcessorAssetInfo {
1521 processed_info: Option<ProcessedInfo>,
1522 dependents: HashSet<AssetPath<'static>>,
1524 status: Option<ProcessStatus>,
1525 pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
1535 status_sender: async_broadcast::Sender<ProcessStatus>,
1536 status_receiver: async_broadcast::Receiver<ProcessStatus>,
1537}
1538
1539impl Default for ProcessorAssetInfo {
1540 fn default() -> Self {
1541 let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1542 status_sender.set_overflow(true);
1545 Self {
1546 processed_info: Default::default(),
1547 dependents: Default::default(),
1548 file_transaction_lock: Default::default(),
1549 status: None,
1550 status_sender,
1551 status_receiver,
1552 }
1553 }
1554}
1555
1556impl ProcessorAssetInfo {
1557 async fn update_status(&mut self, status: ProcessStatus) {
1558 if self.status != Some(status) {
1559 self.status = Some(status);
1560 self.status_sender.broadcast(status).await.unwrap();
1561 }
1562 }
1563}
1564
1565#[derive(Default, Debug)]
1569pub struct ProcessorAssetInfos {
1570 infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
1575 non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
1580}
1581
1582impl ProcessorAssetInfos {
1583 fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1584 self.infos.entry(asset_path.clone()).or_insert_with(|| {
1585 let mut info = ProcessorAssetInfo::default();
1586 if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1588 info.dependents = dependents;
1589 }
1590 info
1591 })
1592 }
1593
1594 pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1595 self.infos.get(asset_path)
1596 }
1597
1598 fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1599 self.infos.get_mut(asset_path)
1600 }
1601
1602 fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1603 if let Some(info) = self.get_mut(asset_path) {
1604 info.dependents.insert(dependent);
1605 } else {
1606 let dependents = self
1607 .non_existent_dependents
1608 .entry(asset_path.clone())
1609 .or_default();
1610 dependents.insert(dependent);
1611 }
1612 }
1613
1614 async fn finish_processing(
1616 &mut self,
1617 asset_path: AssetPath<'static>,
1618 result: Result<ProcessResult, ProcessError>,
1619 reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1620 ) {
1621 match result {
1622 Ok(ProcessResult::Processed(processed_info)) => {
1623 debug!("Finished processing \"{}\"", asset_path);
1624 let old_processed_info = self
1626 .infos
1627 .get_mut(&asset_path)
1628 .and_then(|i| i.processed_info.take());
1629 if let Some(old_processed_info) = old_processed_info {
1630 self.clear_dependencies(&asset_path, old_processed_info);
1631 }
1632
1633 for process_dependency_info in &processed_info.process_dependencies {
1635 self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
1636 }
1637 let info = self.get_or_insert(asset_path);
1638 info.processed_info = Some(processed_info);
1639 info.update_status(ProcessStatus::Processed).await;
1640 let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
1641 for path in dependents {
1642 let _ = reprocess_sender
1643 .send((path.source().clone_owned(), path.path().to_owned()))
1644 .await;
1645 }
1646 }
1647 Ok(ProcessResult::SkippedNotChanged) => {
1648 debug!("Skipping processing (unchanged) \"{}\"", asset_path);
1649 let info = self.get_mut(&asset_path).expect("info should exist");
1650 info.update_status(ProcessStatus::Processed).await;
1657 }
1658 Ok(ProcessResult::Ignored) => {
1659 debug!("Skipping processing (ignored) \"{}\"", asset_path);
1660 }
1661 Err(ProcessError::ExtensionRequired) => {
1662 }
1664 Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1665 trace!("No loader found for {asset_path}");
1666 }
1667 Err(ProcessError::AssetReaderError {
1668 err: AssetReaderError::NotFound(_),
1669 ..
1670 }) => {
1671 trace!("No need to process asset {asset_path} because it does not exist");
1673 }
1674 Err(err) => {
1675 error!("Failed to process asset {asset_path}: {err}");
1676 if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1678 err
1679 {
1680 let info = self.get_mut(&asset_path).expect("info should exist");
1681 info.processed_info = Some(ProcessedInfo {
1682 hash: AssetHash::default(),
1683 full_hash: AssetHash::default(),
1684 process_dependencies: vec![],
1685 });
1686 self.add_dependent(dependency.path(), asset_path.to_owned());
1687 }
1688
1689 let info = self.get_mut(&asset_path).expect("info should exist");
1690 info.update_status(ProcessStatus::Failed).await;
1691 }
1692 }
1693 }
1694
1695 async fn remove(
1699 &mut self,
1700 asset_path: &AssetPath<'static>,
1701 ) -> Option<Arc<async_lock::RwLock<()>>> {
1702 let info = self.infos.remove(asset_path)?;
1703 if let Some(processed_info) = info.processed_info {
1704 self.clear_dependencies(asset_path, processed_info);
1705 }
1706 info.status_sender
1708 .broadcast(ProcessStatus::NonExistent)
1709 .await
1710 .unwrap();
1711 if !info.dependents.is_empty() {
1712 error!(
1713 "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1714 info.dependents
1715 );
1716 self.non_existent_dependents
1717 .insert(asset_path.clone(), info.dependents);
1718 }
1719
1720 Some(info.file_transaction_lock)
1721 }
1722
1723 async fn rename(
1727 &mut self,
1728 old: &AssetPath<'static>,
1729 new: &AssetPath<'static>,
1730 new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1731 ) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
1732 let mut info = self.infos.remove(old)?;
1733 if !info.dependents.is_empty() {
1734 error!(
1742 "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1743 info.dependents
1744 );
1745 self.non_existent_dependents
1746 .insert(old.clone(), core::mem::take(&mut info.dependents));
1747 }
1748 if let Some(processed_info) = &info.processed_info {
1749 for dep in &processed_info.process_dependencies {
1751 if let Some(info) = self.infos.get_mut(&dep.path) {
1752 info.dependents.remove(old);
1753 info.dependents.insert(new.clone());
1754 } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
1755 dependents.remove(old);
1756 dependents.insert(new.clone());
1757 }
1758 }
1759 }
1760 info.status_sender
1762 .broadcast(ProcessStatus::NonExistent)
1763 .await
1764 .unwrap();
1765 let new_info = self.get_or_insert(new.clone());
1766 new_info.processed_info = info.processed_info;
1767 new_info.status = info.status;
1768 if let Some(status) = new_info.status {
1770 new_info.status_sender.broadcast(status).await.unwrap();
1771 }
1772 let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
1773 let _ = new_task_sender
1775 .send((new.source().clone_owned(), new.path().to_owned()))
1776 .await;
1777 for dependent in dependents {
1778 let _ = new_task_sender
1780 .send((
1781 dependent.source().clone_owned(),
1782 dependent.path().to_owned(),
1783 ))
1784 .await;
1785 }
1786
1787 Some((
1788 info.file_transaction_lock,
1789 new_info.file_transaction_lock.clone(),
1790 ))
1791 }
1792
1793 fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1794 for old_load_dep in removed_info.process_dependencies {
1795 if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1796 info.dependents.remove(asset_path);
1797 } else if let Some(dependents) =
1798 self.non_existent_dependents.get_mut(&old_load_dep.path)
1799 {
1800 dependents.remove(asset_path);
1801 }
1802 }
1803 }
1804}
1805
1806#[derive(Copy, Clone, PartialEq, Eq)]
1808pub enum ProcessorState {
1809 Initializing,
1813 Processing,
1815 Finished,
1817}
1818
1819#[derive(Error, Debug)]
1821pub enum InitializeError {
1822 #[error(transparent)]
1823 FailedToReadSourcePaths(AssetReaderError),
1824 #[error(transparent)]
1825 FailedToReadDestinationPaths(AssetReaderError),
1826 #[error("Failed to validate asset log: {0}")]
1827 ValidateLogError(#[from] ValidateLogError),
1828}
1829
1830#[derive(Error, Debug)]
1832pub enum SetTransactionLogFactoryError {
1833 #[error("Transaction log is already in use so setting the factory does nothing")]
1834 AlreadyInUse,
1835}
1836
1837#[derive(Error, Debug, PartialEq, Eq)]
1839pub enum GetProcessorError {
1840 #[error("The processor '{0}' does not exist")]
1841 Missing(String),
1842 #[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
1843 Ambiguous {
1844 processor_short_name: String,
1845 ambiguous_processor_names: Vec<&'static str>,
1846 },
1847}
1848
1849impl From<GetProcessorError> for ProcessError {
1850 fn from(err: GetProcessorError) -> Self {
1851 match err {
1852 GetProcessorError::Missing(name) => Self::MissingProcessor(name),
1853 GetProcessorError::Ambiguous {
1854 processor_short_name,
1855 ambiguous_processor_names,
1856 } => Self::AmbiguousProcessor {
1857 processor_short_name,
1858 ambiguous_processor_names,
1859 },
1860 }
1861 }
1862}
1863
1864#[cfg(test)]
1865mod tests;