1use crate::Parallel;
2use alloc::vec::Vec;
3use async_channel::{Receiver, Sender};
4use core::ops::{Deref, DerefMut};
5
6pub struct BufferedChannel<T: Send> {
17 pub chunk_size: usize,
19 pool: Parallel<Vec<Vec<T>>>,
21}
22
23impl<T: Send> Default for BufferedChannel<T> {
24 fn default() -> Self {
25 Self {
26 chunk_size: 1024,
28 pool: Parallel::default(),
29 }
30 }
31}
32
33impl<T: Send> BufferedChannel<T> {
34 const MAX_POOL_SIZE: usize = 8;
35
36 fn recycle(&self, mut chunk: Vec<T>) {
37 if chunk.capacity() < self.chunk_size {
38 return;
39 }
40 chunk.clear();
41 let mut pool = self.pool.borrow_local_mut();
42 if pool.len() < Self::MAX_POOL_SIZE {
43 pool.push(chunk);
46 }
47 }
48}
49
50pub struct BufferedReceiver<'a, T: Send> {
53 channel: &'a BufferedChannel<T>,
54 rx: Receiver<Vec<T>>,
55}
56
57impl<'a, T: Send> BufferedReceiver<'a, T> {
58 pub async fn recv(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
62 let buffer = self.rx.recv().await?;
63 Ok(RecycledVec {
64 buffer: Some(buffer),
65 channel: self.channel,
66 })
67 }
68
69 pub fn recv_blocking(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
73 #[cfg(all(feature = "std", not(target_family = "wasm")))]
74 let buffer = self.rx.recv_blocking()?;
75 #[cfg(any(not(feature = "std"), target_family = "wasm"))]
76 let buffer = bevy_platform::future::block_on(self.rx.recv())?;
77
78 Ok(RecycledVec {
79 buffer: Some(buffer),
80 channel: self.channel,
81 })
82 }
83}
84
85impl<'a, T: Send> Clone for BufferedReceiver<'a, T> {
86 fn clone(&self) -> Self {
87 Self {
88 channel: self.channel,
89 rx: self.rx.clone(),
90 }
91 }
92}
93
94pub struct RecycledVec<'a, T: Send> {
97 buffer: Option<Vec<T>>,
98 channel: &'a BufferedChannel<T>,
99}
100
101impl<'a, T: Send> RecycledVec<'a, T> {
102 pub fn drain(&mut self) -> alloc::vec::Drain<'_, T> {
105 self.buffer.as_mut().unwrap().drain(..)
106 }
107}
108
109impl<'a, T: Send> Deref for RecycledVec<'a, T> {
110 type Target = [T];
111 fn deref(&self) -> &Self::Target {
112 self.buffer.as_ref().unwrap()
113 }
114}
115
116impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
117 fn deref_mut(&mut self) -> &mut Self::Target {
118 self.buffer.as_mut().unwrap()
119 }
120}
121
122impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
123 type Item = &'b T;
124 type IntoIter = core::slice::Iter<'b, T>;
125
126 fn into_iter(self) -> Self::IntoIter {
127 self.buffer.as_ref().unwrap().iter()
128 }
129}
130
131impl<'a, 'b, T: Send> IntoIterator for &'b mut RecycledVec<'a, T> {
132 type Item = &'b mut T;
133 type IntoIter = core::slice::IterMut<'b, T>;
134
135 fn into_iter(self) -> Self::IntoIter {
136 self.buffer.as_mut().unwrap().iter_mut()
137 }
138}
139
140impl<'a, T: Send> Drop for RecycledVec<'a, T> {
141 fn drop(&mut self) {
142 if let Some(buffer) = self.buffer.take() {
143 self.channel.recycle(buffer);
144 }
145 }
146}
147
148pub struct BufferedSender<'a, T: Send> {
151 channel: &'a BufferedChannel<T>,
152 buffer: Option<Vec<T>>,
154 tx: Sender<Vec<T>>,
155}
156
157impl<T: Send> BufferedChannel<T> {
158 fn get_buffer(&self) -> Vec<T> {
159 self.pool
160 .borrow_local_mut()
161 .pop()
162 .unwrap_or_else(|| Vec::with_capacity(self.chunk_size))
163 }
164
165 pub fn unbounded(&self) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
169 let (tx, rx) = async_channel::unbounded();
170 (
171 BufferedReceiver { channel: self, rx },
172 BufferedSender {
173 channel: self,
174 buffer: None,
175 tx,
176 },
177 )
178 }
179
180 pub fn bounded(&self, cap: usize) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
188 let (tx, rx) = async_channel::bounded(cap);
189 (
190 BufferedReceiver { channel: self, rx },
191 BufferedSender {
192 channel: self,
193 buffer: None,
194 tx,
195 },
196 )
197 }
198}
199
200impl<'a, T: Send> BufferedSender<'a, T> {
201 pub async fn send(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
206 let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
207 buffer.push(msg);
208 if buffer.len() >= self.channel.chunk_size {
209 let full_buffer = self.buffer.take().unwrap();
210 self.tx.send(full_buffer).await?;
211 }
212 Ok(())
213 }
214
215 pub fn send_blocking(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
220 let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
221 buffer.push(msg);
222 if buffer.len() >= self.channel.chunk_size {
223 let full_buffer = self.buffer.take().unwrap();
224 #[cfg(all(feature = "std", not(target_family = "wasm")))]
225 self.tx.send_blocking(full_buffer)?;
226 #[cfg(any(not(feature = "std"), target_family = "wasm"))]
227 bevy_platform::future::block_on(self.tx.send(full_buffer))?;
228 }
229 Ok(())
230 }
231
232 pub fn flush(&mut self) {
234 if let Some(buffer) = self.buffer.take() {
235 if !buffer.is_empty() {
236 #[cfg(all(feature = "std", not(target_family = "wasm")))]
238 let _ = self.tx.send_blocking(buffer);
239 #[cfg(any(not(feature = "std"), target_family = "wasm"))]
240 let _ = bevy_platform::future::block_on(self.tx.send(buffer));
241 } else {
242 self.channel.recycle(buffer);
244 }
245 }
246 }
247}
248
249impl<'a, T: Send> Clone for BufferedSender<'a, T> {
250 fn clone(&self) -> Self {
251 Self {
252 channel: self.channel,
253 buffer: None,
254 tx: self.tx.clone(),
255 }
256 }
257}
258
259impl<'a, T: Send> Drop for BufferedSender<'a, T> {
261 fn drop(&mut self) {
262 self.flush();
263 }
264}