bevy_ecs/message/message_reader.rs
1#[cfg(feature = "multi_threaded")]
2use crate::message::MessageParIter;
3use crate::{
4 message::{Message, MessageCursor, MessageIterator, MessageIteratorWithId, Messages},
5 system::{Local, Res, SystemParam, SystemParamValidationError},
6};
7
8/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.
9///
10/// Use [`PopulatedMessageReader<T>`] to skip the system if there are no messages.
11///
12/// # Usage
13/// ```
14/// # use bevy_ecs::prelude::*;
15///
16/// #[derive(Message)]
17/// pub struct MyMessage(String); // Custom message type.
18/// fn my_system(mut reader: MessageReader<MyMessage>) {
19/// for msg in reader.read() {
20/// println!("{}", msg.0)
21/// }
22/// }
23///
24/// # bevy_ecs::system::assert_is_system(my_system);
25/// ```
26///
27/// # Concurrency
28///
29/// Unlike [`MessageWriter<T>`], systems with `MessageReader<T>` param can be executed concurrently
30/// (but not concurrently with `MessageWriter<T>` or `MessageMutator<T>` systems for the same message type).
31///
32/// [`MessageWriter<T>`]: super::MessageWriter
33#[derive(SystemParam, Debug)]
34pub struct MessageReader<'w, 's, M: Message> {
35 pub(super) reader: Local<'s, MessageCursor<M>>,
36 #[system_param(validation_message = "Message not initialized")]
37 messages: Res<'w, Messages<M>>,
38}
39
40impl<'w, 's, M: Message> MessageReader<'w, 's, M> {
41 /// Iterates over the messages this [`MessageReader`] has not seen yet. This updates the
42 /// [`MessageReader`]'s message counter, which means subsequent message reads will not include messages
43 /// that happened before now.
44 pub fn read(&mut self) -> MessageIterator<'_, M> {
45 self.reader.read(&self.messages)
46 }
47
48 /// Like [`read`](Self::read), except also returning the [`MessageId`](super::MessageId) of the messages.
49 pub fn read_with_id(&mut self) -> MessageIteratorWithId<'_, M> {
50 self.reader.read_with_id(&self.messages)
51 }
52
53 /// Returns a parallel iterator over the messages this [`MessageReader`] has not seen yet.
54 /// See also [`for_each`](MessageParIter::for_each).
55 ///
56 /// # Example
57 /// ```
58 /// # use bevy_ecs::prelude::*;
59 /// # use std::sync::atomic::{AtomicUsize, Ordering};
60 ///
61 /// #[derive(Message)]
62 /// struct MyMessage {
63 /// value: usize,
64 /// }
65 ///
66 /// #[derive(Resource, Default)]
67 /// struct Counter(AtomicUsize);
68 ///
69 /// // setup
70 /// let mut world = World::new();
71 /// world.init_resource::<Messages<MyMessage>>();
72 /// world.insert_resource(Counter::default());
73 ///
74 /// let mut schedule = Schedule::default();
75 /// schedule.add_systems(|mut messages: MessageReader<MyMessage>, counter: Res<Counter>| {
76 /// messages.par_read().for_each(|MyMessage { value }| {
77 /// counter.0.fetch_add(*value, Ordering::Relaxed);
78 /// });
79 /// });
80 /// for value in 0..100 {
81 /// world.write_message(MyMessage { value });
82 /// }
83 /// schedule.run(&mut world);
84 /// let Counter(counter) = world.remove_resource::<Counter>().unwrap();
85 /// // all messages were processed
86 /// assert_eq!(counter.into_inner(), 4950);
87 /// ```
88 #[cfg(feature = "multi_threaded")]
89 pub fn par_read(&mut self) -> MessageParIter<'_, M> {
90 self.reader.par_read(&self.messages)
91 }
92
93 /// Determines the number of messages available to be read from this [`MessageReader`] without consuming any.
94 pub fn len(&self) -> usize {
95 self.reader.len(&self.messages)
96 }
97
98 /// Returns `true` if there are no messages available to read.
99 ///
100 /// # Example
101 ///
102 /// The following example shows a useful pattern where some behavior is triggered if new messages are available.
103 /// [`MessageReader::clear()`] is used so the same messages don't re-trigger the behavior the next time the system runs.
104 ///
105 /// ```
106 /// # use bevy_ecs::prelude::*;
107 /// #
108 /// #[derive(Message)]
109 /// struct Collision;
110 ///
111 /// fn play_collision_sound(mut messages: MessageReader<Collision>) {
112 /// if !messages.is_empty() {
113 /// messages.clear();
114 /// // Play a sound
115 /// }
116 /// }
117 /// # bevy_ecs::system::assert_is_system(play_collision_sound);
118 /// ```
119 pub fn is_empty(&self) -> bool {
120 self.reader.is_empty(&self.messages)
121 }
122
123 /// Consumes all available messages.
124 ///
125 /// This means these messages will not appear in calls to [`MessageReader::read()`] or
126 /// [`MessageReader::read_with_id()`] and [`MessageReader::is_empty()`] will return `true`.
127 ///
128 /// For usage, see [`MessageReader::is_empty()`].
129 pub fn clear(&mut self) {
130 self.reader.clear(&self.messages);
131 }
132}
133
134/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.
135/// Skips the system if there no messages.
136///
137/// Use [`MessageReader<T>`] to run the system even if there are no messages.
138///
139/// Use the [`on_message`](crate::prelude::on_message) run condition to skip the system based on messages that it doesn't read.
140#[derive(Debug)]
141pub struct PopulatedMessageReader<'w, 's, M: Message>(MessageReader<'w, 's, M>);
142
143impl<'w, 's, M: Message> core::ops::Deref for PopulatedMessageReader<'w, 's, M> {
144 type Target = MessageReader<'w, 's, M>;
145
146 fn deref(&self) -> &Self::Target {
147 &self.0
148 }
149}
150
151impl<'w, 's, M: Message> core::ops::DerefMut for PopulatedMessageReader<'w, 's, M> {
152 fn deref_mut(&mut self) -> &mut Self::Target {
153 &mut self.0
154 }
155}
156
157// SAFETY: relies on MessageReader to uphold soundness requirements
158unsafe impl<'w, 's, M: Message> SystemParam for PopulatedMessageReader<'w, 's, M> {
159 type State = <MessageReader<'w, 's, M> as SystemParam>::State;
160 type Item<'world, 'state> = PopulatedMessageReader<'world, 'state, M>;
161
162 fn init_state(world: &mut crate::prelude::World) -> Self::State {
163 MessageReader::<M>::init_state(world)
164 }
165
166 fn init_access(
167 state: &Self::State,
168 system_meta: &mut crate::system::SystemMeta,
169 component_access_set: &mut crate::query::FilteredAccessSet,
170 world: &mut crate::prelude::World,
171 ) {
172 MessageReader::<M>::init_access(state, system_meta, component_access_set, world);
173 }
174
175 unsafe fn get_param<'world, 'state>(
176 state: &'state mut Self::State,
177 system_meta: &crate::system::SystemMeta,
178 world: crate::world::unsafe_world_cell::UnsafeWorldCell<'world>,
179 change_tick: crate::change_detection::Tick,
180 ) -> Result<Self::Item<'world, 'state>, SystemParamValidationError> {
181 // SAFETY: requirements are upheld by MessageReader's implementation
182 let reader = unsafe { MessageReader::get_param(state, system_meta, world, change_tick)? };
183 if reader.is_empty() {
184 Err(SystemParamValidationError::skipped::<Self>(
185 "message queue is empty",
186 ))
187 } else {
188 Ok(PopulatedMessageReader(reader))
189 }
190 }
191}
192
#[cfg(test)]
mod tests {
    use core::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::message::MessageRegistry;
    use crate::prelude::*;
    use bevy_platform::sync::Arc;

    /// A system taking `PopulatedMessageReader` must be skipped while no message is pending
    /// and must run once a message has been written.
    #[test]
    fn test_populated_message_reader() {
        #[derive(Message)]
        struct TheMessage;

        // Shared flag flipped by the system body; lets the test observe whether it ran.
        let ran_flag = Arc::new(AtomicBool::new(false));
        let flag_for_system = Arc::clone(&ran_flag);

        let mut world = World::new();
        MessageRegistry::register_message::<TheMessage>(&mut world);

        let mut schedule = Schedule::default();
        schedule.add_systems(move |_reader: PopulatedMessageReader<TheMessage>| {
            flag_for_system.store(true, Ordering::SeqCst);
        });

        // First run: nothing has been written yet.
        schedule.run(&mut world);
        let ran_without_messages = ran_flag.load(Ordering::SeqCst);
        assert!(
            !ran_without_messages,
            "system with PopulatedMessageReader should have been skipped"
        );

        // Second run: one message is pending.
        world.write_message(TheMessage);
        schedule.run(&mut world);
        let ran_with_message = ran_flag.load(Ordering::SeqCst);
        assert!(
            ran_with_message,
            "system with PopulatedMessageReader should NOT have been skipped"
        );
    }
}