Skip to main content

bevy_ecs/message/
message_reader.rs

1#[cfg(feature = "multi_threaded")]
2use crate::message::MessageParIter;
3use crate::{
4    message::{Message, MessageCursor, MessageIterator, MessageIteratorWithId, Messages},
5    system::{Local, Res, SystemParam, SystemParamValidationError},
6};
7
/// Reads [`Message`]s of type `M` in order and tracks which messages have already been read.
///
/// Use [`PopulatedMessageReader<M>`] to skip the system if there are no messages.
///
/// # Usage
/// ```
/// # use bevy_ecs::prelude::*;
///
/// #[derive(Message)]
/// pub struct MyMessage(String); // Custom message type.
/// fn my_system(mut reader: MessageReader<MyMessage>) {
///     for msg in reader.read() {
///         println!("{}", msg.0)
///     }
/// }
///
/// # bevy_ecs::system::assert_is_system(my_system);
/// ```
///
/// # Concurrency
///
/// Unlike [`MessageWriter<M>`], systems with `MessageReader<M>` param can be executed concurrently
/// (but not concurrently with `MessageWriter<M>` or `MessageMutator<M>` systems for the same message type).
///
/// [`MessageWriter<M>`]: super::MessageWriter
#[derive(SystemParam, Debug)]
pub struct MessageReader<'w, 's, M: Message> {
    /// Cursor that tracks which messages have already been read.
    /// Every method on this type delegates to it.
    pub(super) reader: Local<'s, MessageCursor<M>>,
    /// The [`Messages<M>`] resource the cursor is read against.
    ///
    /// The `validation_message` attribute supplies the text reported when validating this
    /// parameter fails (presumably because the resource was never initialized; confirm
    /// against the `SystemParam` derive).
    #[system_param(validation_message = "Message not initialized")]
    messages: Res<'w, Messages<M>>,
}
39
impl<'w, 's, M: Message> MessageReader<'w, 's, M> {
    /// Iterates over the messages this [`MessageReader`] has not seen yet. This updates the
    /// [`MessageReader`]'s message counter, which means subsequent message reads will not include messages
    /// that happened before now.
    ///
    /// Use [`read_with_id`](Self::read_with_id) if the [`MessageId`](super::MessageId) of each
    /// message is also needed.
    pub fn read(&mut self) -> MessageIterator<'_, M> {
        self.reader.read(&self.messages)
    }

    /// Like [`read`](Self::read), except also returning the [`MessageId`](super::MessageId) of the messages.
    pub fn read_with_id(&mut self) -> MessageIteratorWithId<'_, M> {
        self.reader.read_with_id(&self.messages)
    }

    /// Returns a parallel iterator over the messages this [`MessageReader`] has not seen yet.
    /// See also [`for_each`](MessageParIter::for_each).
    ///
    /// This method is only compiled when the `multi_threaded` feature is enabled.
    ///
    /// # Example
    /// ```
    /// # use bevy_ecs::prelude::*;
    /// # use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// #[derive(Message)]
    /// struct MyMessage {
    ///     value: usize,
    /// }
    ///
    /// #[derive(Resource, Default)]
    /// struct Counter(AtomicUsize);
    ///
    /// // setup
    /// let mut world = World::new();
    /// world.init_resource::<Messages<MyMessage>>();
    /// world.insert_resource(Counter::default());
    ///
    /// let mut schedule = Schedule::default();
    /// schedule.add_systems(|mut messages: MessageReader<MyMessage>, counter: Res<Counter>| {
    ///     messages.par_read().for_each(|MyMessage { value }| {
    ///         counter.0.fetch_add(*value, Ordering::Relaxed);
    ///     });
    /// });
    /// for value in 0..100 {
    ///     world.write_message(MyMessage { value });
    /// }
    /// schedule.run(&mut world);
    /// let Counter(counter) = world.remove_resource::<Counter>().unwrap();
    /// // all messages were processed
    /// assert_eq!(counter.into_inner(), 4950);
    /// ```
    #[cfg(feature = "multi_threaded")]
    pub fn par_read(&mut self) -> MessageParIter<'_, M> {
        self.reader.par_read(&self.messages)
    }

    /// Determines the number of messages available to be read from this [`MessageReader`] without consuming any.
    ///
    /// Use [`read`](Self::read) or [`clear`](Self::clear) to actually consume them.
    pub fn len(&self) -> usize {
        self.reader.len(&self.messages)
    }

    /// Returns `true` if there are no messages available to read.
    ///
    /// # Example
    ///
    /// The following example shows a useful pattern where some behavior is triggered if new messages are available.
    /// [`MessageReader::clear()`] is used so the same messages don't re-trigger the behavior the next time the system runs.
    ///
    /// ```
    /// # use bevy_ecs::prelude::*;
    /// #
    /// #[derive(Message)]
    /// struct Collision;
    ///
    /// fn play_collision_sound(mut messages: MessageReader<Collision>) {
    ///     if !messages.is_empty() {
    ///         messages.clear();
    ///         // Play a sound
    ///     }
    /// }
    /// # bevy_ecs::system::assert_is_system(play_collision_sound);
    /// ```
    pub fn is_empty(&self) -> bool {
        self.reader.is_empty(&self.messages)
    }

    /// Consumes all available messages.
    ///
    /// This means these messages will not appear in calls to [`MessageReader::read()`] or
    /// [`MessageReader::read_with_id()`] and [`MessageReader::is_empty()`] will return `true`.
    ///
    /// For usage, see [`MessageReader::is_empty()`].
    pub fn clear(&mut self) {
        self.reader.clear(&self.messages);
    }
}
133
/// Reads [`Message`]s of type `M` in order and tracks which messages have already been read.
/// Skips the system if there are no messages.
///
/// The parameter is only produced when the wrapped [`MessageReader`] reports that it is not
/// empty at the time the parameter is fetched. The reader's methods are reachable through
/// `Deref`/`DerefMut`.
///
/// Use [`MessageReader<M>`] to run the system even if there are no messages.
///
/// Use the [`on_message`](crate::prelude::on_message) run condition to skip the system based on messages that it doesn't read.
#[derive(Debug)]
pub struct PopulatedMessageReader<'w, 's, M: Message>(MessageReader<'w, 's, M>);
142
143impl<'w, 's, M: Message> core::ops::Deref for PopulatedMessageReader<'w, 's, M> {
144    type Target = MessageReader<'w, 's, M>;
145
146    fn deref(&self) -> &Self::Target {
147        &self.0
148    }
149}
150
151impl<'w, 's, M: Message> core::ops::DerefMut for PopulatedMessageReader<'w, 's, M> {
152    fn deref_mut(&mut self) -> &mut Self::Target {
153        &mut self.0
154    }
155}
156
157// SAFETY: relies on MessageReader to uphold soundness requirements
158unsafe impl<'w, 's, M: Message> SystemParam for PopulatedMessageReader<'w, 's, M> {
159    type State = <MessageReader<'w, 's, M> as SystemParam>::State;
160    type Item<'world, 'state> = PopulatedMessageReader<'world, 'state, M>;
161
162    fn init_state(world: &mut crate::prelude::World) -> Self::State {
163        MessageReader::<M>::init_state(world)
164    }
165
166    fn init_access(
167        state: &Self::State,
168        system_meta: &mut crate::system::SystemMeta,
169        component_access_set: &mut crate::query::FilteredAccessSet,
170        world: &mut crate::prelude::World,
171    ) {
172        MessageReader::<M>::init_access(state, system_meta, component_access_set, world);
173    }
174
175    unsafe fn get_param<'world, 'state>(
176        state: &'state mut Self::State,
177        system_meta: &crate::system::SystemMeta,
178        world: crate::world::unsafe_world_cell::UnsafeWorldCell<'world>,
179        change_tick: crate::change_detection::Tick,
180    ) -> Result<Self::Item<'world, 'state>, SystemParamValidationError> {
181        // SAFETY: requirements are upheld by MessageReader's implementation
182        let reader = unsafe { MessageReader::get_param(state, system_meta, world, change_tick)? };
183        if reader.is_empty() {
184            Err(SystemParamValidationError::skipped::<Self>(
185                "message queue is empty",
186            ))
187        } else {
188            Ok(PopulatedMessageReader(reader))
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use core::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::message::MessageRegistry;
    use crate::prelude::*;
    use bevy_platform::sync::Arc;

    /// A system taking `PopulatedMessageReader` must be skipped while no message is pending
    /// and must run once a message has been written.
    #[test]
    fn test_populated_message_reader() {
        #[derive(Message)]
        struct TheMessage;

        const ORDER: Ordering = Ordering::SeqCst;

        let mut world = World::new();
        MessageRegistry::register_message::<TheMessage>(&mut world);

        // Flag shared between the test body and the system closure.
        let system_ran = Arc::new(AtomicBool::new(false));
        let ran_flag = Arc::clone(&system_ran);

        let mut schedule = Schedule::default();
        schedule.add_systems(move |_reader: PopulatedMessageReader<TheMessage>| {
            ran_flag.store(true, ORDER);
        });

        // First pass: nothing has been written yet, so the system must not run.
        schedule.run(&mut world);
        let ran_without_messages = system_ran.load(ORDER);
        assert!(
            !ran_without_messages,
            "system with PopulatedMessageReader should have been skipped"
        );

        // Second pass: one message is pending, so the system must run.
        world.write_message(TheMessage);
        schedule.run(&mut world);
        let ran_with_message = system_ran.load(ORDER);
        assert!(
            ran_with_message,
            "system with PopulatedMessageReader should NOT have been skipped"
        );
    }
}