Skip to main content

bevy_utils/
buffered_channel.rs

1use crate::Parallel;
2use alloc::vec::Vec;
3use async_channel::{Receiver, Sender};
4use core::ops::{Deref, DerefMut};
5
/// An asynchronous MPSC channel that buffers messages and reuses allocations with thread locals.
///
/// This is a building block for efficient parallel worker tasks.
///
/// Cache this channel in a system's `Local` to reuse allocated memory.
///
/// This is faster than sending each message individually into a channel when communicating between
/// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are
/// concurrently sending data into the channel, enabling you to run a serial processing consumer
/// at the same time as many parallel processing producers.
///
/// Each [`BufferedSender`] accumulates messages into a `Vec<T>` chunk and sends whole chunks;
/// each [`BufferedReceiver`] hands chunks out as [`RecycledVec`]s, which give their allocation
/// back to this channel's pool when dropped.
pub struct BufferedChannel<T: Send> {
    /// The minimum length of a `Vec` of buffered data before it is sent through the channel.
    ///
    /// A shorter chunk can still be sent when a sender is flushed or dropped. This value is
    /// also the capacity requested for newly allocated chunks, and chunks whose capacity is
    /// below it are dropped instead of being returned to the pool.
    pub chunk_size: usize,
    /// A pool of reusable vectors to minimize allocations.
    ///
    /// Chunks are pushed to and popped from the pool obtained via `borrow_local_mut`, which
    /// is capped at `MAX_POOL_SIZE` entries.
    pool: Parallel<Vec<Vec<T>>>,
}
22
23impl<T: Send> Default for BufferedChannel<T> {
24    fn default() -> Self {
25        Self {
26            // This was tuned based on benchmarks across a wide range of sizes.
27            chunk_size: 1024,
28            pool: Parallel::default(),
29        }
30    }
31}
32
33impl<T: Send> BufferedChannel<T> {
34    const MAX_POOL_SIZE: usize = 8;
35
36    fn recycle(&self, mut chunk: Vec<T>) {
37        if chunk.capacity() < self.chunk_size {
38            return;
39        }
40        chunk.clear();
41        let mut pool = self.pool.borrow_local_mut();
42        if pool.len() < Self::MAX_POOL_SIZE {
43            // Only push to the pool if it's not full
44            // Avoids memory leak if the sender and receiver never switch threads
45            pool.push(chunk);
46        }
47    }
48}
49
/// A wrapper around a [`Receiver`] that returns [`RecycledVec`]s to automatically return
/// buffers to the [`BufferedChannel`] pool.
///
/// Every item received is a whole chunk (`Vec<T>`) of messages rather than a single message.
pub struct BufferedReceiver<'a, T: Send> {
    /// The channel whose pool received chunks are recycled into.
    channel: &'a BufferedChannel<T>,
    /// Receiving end of the underlying channel of message chunks.
    rx: Receiver<Vec<T>>,
}
56
57impl<'a, T: Send> BufferedReceiver<'a, T> {
58    /// Receive a message asynchronously.
59    ///
60    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
61    pub async fn recv(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
62        let buffer = self.rx.recv().await?;
63        Ok(RecycledVec {
64            buffer: Some(buffer),
65            channel: self.channel,
66        })
67    }
68
69    /// Receive a message blocking.
70    ///
71    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
72    pub fn recv_blocking(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
73        #[cfg(all(feature = "std", not(target_family = "wasm")))]
74        let buffer = self.rx.recv_blocking()?;
75        #[cfg(any(not(feature = "std"), target_family = "wasm"))]
76        let buffer = bevy_platform::future::block_on(self.rx.recv())?;
77
78        Ok(RecycledVec {
79            buffer: Some(buffer),
80            channel: self.channel,
81        })
82    }
83}
84
85impl<'a, T: Send> Clone for BufferedReceiver<'a, T> {
86    fn clone(&self) -> Self {
87        Self {
88            channel: self.channel,
89            rx: self.rx.clone(),
90        }
91    }
92}
93
/// A wrapper around a `Vec<T>` that automatically returns it to the [`BufferedChannel`]'s pool when
/// dropped.
///
/// It dereferences to a slice of the received messages and can be iterated by reference.
/// Use `drain` to take the messages by value while keeping the allocation for recycling.
pub struct RecycledVec<'a, T: Send> {
    /// The received chunk. The receiver code in this file always stores `Some`; the value
    /// is only taken out in `Drop`, where it is handed to the pool.
    buffer: Option<Vec<T>>,
    /// The channel whose pool the chunk is returned to.
    channel: &'a BufferedChannel<T>,
}
100
101impl<'a, T: Send> RecycledVec<'a, T> {
102    /// Drains the elements from the buffer as an iterator, keeping the allocation
103    /// so it can be recycled when this [`RecycledVec`] is dropped.
104    pub fn drain(&mut self) -> alloc::vec::Drain<'_, T> {
105        self.buffer.as_mut().unwrap().drain(..)
106    }
107}
108
109impl<'a, T: Send> Deref for RecycledVec<'a, T> {
110    type Target = [T];
111    fn deref(&self) -> &Self::Target {
112        self.buffer.as_ref().unwrap()
113    }
114}
115
116impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
117    fn deref_mut(&mut self) -> &mut Self::Target {
118        self.buffer.as_mut().unwrap()
119    }
120}
121
122impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
123    type Item = &'b T;
124    type IntoIter = core::slice::Iter<'b, T>;
125
126    fn into_iter(self) -> Self::IntoIter {
127        self.buffer.as_ref().unwrap().iter()
128    }
129}
130
131impl<'a, 'b, T: Send> IntoIterator for &'b mut RecycledVec<'a, T> {
132    type Item = &'b mut T;
133    type IntoIter = core::slice::IterMut<'b, T>;
134
135    fn into_iter(self) -> Self::IntoIter {
136        self.buffer.as_mut().unwrap().iter_mut()
137    }
138}
139
140impl<'a, T: Send> Drop for RecycledVec<'a, T> {
141    fn drop(&mut self) {
142        if let Some(buffer) = self.buffer.take() {
143            self.channel.recycle(buffer);
144        }
145    }
146}
147
/// A [`BufferedChannel`] sender that buffers messages locally, flushing it when the sender is
/// dropped or [`BufferedChannel::chunk_size`] is reached.
///
/// Cloning a sender produces a new sender that starts without a local buffer.
pub struct BufferedSender<'a, T: Send> {
    /// The channel that supplies `chunk_size` and the pool of reusable buffers.
    channel: &'a BufferedChannel<T>,
    /// We use an `Option` to lazily allocate the buffer or pull from the channel's buffer pool.
    /// It is `None` until the first message is buffered, and again after a chunk is sent or
    /// flushed.
    buffer: Option<Vec<T>>,
    /// Sending end of the underlying channel of message chunks.
    tx: Sender<Vec<T>>,
}
156
157impl<T: Send> BufferedChannel<T> {
158    fn get_buffer(&self) -> Vec<T> {
159        self.pool
160            .borrow_local_mut()
161            .pop()
162            .unwrap_or_else(|| Vec::with_capacity(self.chunk_size))
163    }
164
165    /// Create an unbounded channel and return the receiver and sender.
166    ///
167    /// The created channel can hold an unlimited number of messages.
168    pub fn unbounded(&self) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
169        let (tx, rx) = async_channel::unbounded();
170        (
171            BufferedReceiver { channel: self, rx },
172            BufferedSender {
173                channel: self,
174                buffer: None,
175                tx,
176            },
177        )
178    }
179
180    /// Create a bounded channel and return the receiver and sender.
181    ///
182    /// The created channel has space to hold at most `cap` messages at a time.
183    ///
184    /// # Panics
185    ///
186    /// Capacity must be a positive number. If `cap` is zero, this function will panic.
187    pub fn bounded(&self, cap: usize) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
188        let (tx, rx) = async_channel::bounded(cap);
189        (
190            BufferedReceiver { channel: self, rx },
191            BufferedSender {
192                channel: self,
193                buffer: None,
194                tx,
195            },
196        )
197    }
198}
199
200impl<'a, T: Send> BufferedSender<'a, T> {
201    /// Send a message asynchronously.
202    ///
203    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
204    /// messages are accumulated or the sender is dropped.
205    pub async fn send(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
206        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
207        buffer.push(msg);
208        if buffer.len() >= self.channel.chunk_size {
209            let full_buffer = self.buffer.take().unwrap();
210            self.tx.send(full_buffer).await?;
211        }
212        Ok(())
213    }
214
215    /// Send an item blocking.
216    ///
217    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
218    /// messages are accumulated or the sender is dropped.
219    pub fn send_blocking(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
220        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
221        buffer.push(msg);
222        if buffer.len() >= self.channel.chunk_size {
223            let full_buffer = self.buffer.take().unwrap();
224            #[cfg(all(feature = "std", not(target_family = "wasm")))]
225            self.tx.send_blocking(full_buffer)?;
226            #[cfg(any(not(feature = "std"), target_family = "wasm"))]
227            bevy_platform::future::block_on(self.tx.send(full_buffer))?;
228        }
229        Ok(())
230    }
231
232    /// Flush any remaining messages in the local buffer, sending them into the channel.
233    pub fn flush(&mut self) {
234        if let Some(buffer) = self.buffer.take() {
235            if !buffer.is_empty() {
236                // The allocation is sent through the channel and will be reused when dropped.
237                #[cfg(all(feature = "std", not(target_family = "wasm")))]
238                let _ = self.tx.send_blocking(buffer);
239                #[cfg(any(not(feature = "std"), target_family = "wasm"))]
240                let _ = bevy_platform::future::block_on(self.tx.send(buffer));
241            } else {
242                // If it's empty, just return it to the pool.
243                self.channel.recycle(buffer);
244            }
245        }
246    }
247}
248
249impl<'a, T: Send> Clone for BufferedSender<'a, T> {
250    fn clone(&self) -> Self {
251        Self {
252            channel: self.channel,
253            buffer: None,
254            tx: self.tx.clone(),
255        }
256    }
257}
258
259/// Automatically flush the buffer when a sender is dropped.
260impl<'a, T: Send> Drop for BufferedSender<'a, T> {
261    fn drop(&mut self) {
262        self.flush();
263    }
264}