1use std::collections::VecDeque;
4use std::marker::PhantomData;
5
/// Handle identifying one subscriber of a [`PubSub`].
///
/// The handle stores only the index of the subscriber's cursor. The type
/// parameter `T` ties the handle to the queue's message type without storing
/// a `T`.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct Subscription<T> {
    /// Index of this subscriber's cursor inside the queue that created it.
    id: u32,
    /// Zero-sized marker binding the handle to the message type `T`.
    _phantom: PhantomData<T>,
}

// Implemented by hand: `#[derive(Clone)]` would add a `T: Clone` bound even
// though the handle never contains a `T`.
impl<T> Clone for Subscription<T> {
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            _phantom: PhantomData,
        }
    }
}
14
/// Read position of a single subscriber.
///
/// Both fields are running counters that also include entries already removed
/// from the front of the owning queue's deques; use [`PubSubCursor::id`] and
/// [`PubSubCursor::next`] to turn them into in-deque positions.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
struct PubSubCursor {
    /// Counter-based index of this cursor's slot in the queue's offset deque.
    id: u32,
    /// Counter-based index of the first message this cursor has not acknowledged.
    next: u32,
}
28
29impl PubSubCursor {
30 fn id(&self, num_deleted: u32) -> usize {
31 (self.id - num_deleted) as usize
32 }
33
34 fn next(&self, num_deleted: u32) -> usize {
35 (self.next - num_deleted) as usize
36 }
37}
38
39#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
41#[derive(Clone, Default)]
42pub struct PubSub<T> {
43 deleted_messages: u32,
44 deleted_offsets: u32,
45 messages: VecDeque<T>,
46 offsets: VecDeque<u32>,
47 cursors: Vec<PubSubCursor>,
48}
49
50impl<T> PubSub<T> {
51 pub fn new() -> Self {
53 Self {
54 deleted_offsets: 0,
55 deleted_messages: 0,
56 messages: VecDeque::new(),
57 offsets: VecDeque::new(),
58 cursors: Vec::new(),
59 }
60 }
61
62 fn reset_shifts(&mut self) {
63 for offset in &mut self.offsets {
64 *offset -= self.deleted_messages;
65 }
66
67 for cursor in &mut self.cursors {
68 cursor.id -= self.deleted_offsets;
69 cursor.next -= self.deleted_messages;
70 }
71
72 self.deleted_offsets = 0;
73 self.deleted_messages = 0;
74 }
75
76 pub fn publish(&mut self, message: T) {
78 if self.offsets.is_empty() {
79 return;
81 }
82
83 self.messages.push_back(message);
84 }
85
86 #[must_use]
90 pub fn subscribe(&mut self) -> Subscription<T> {
91 let cursor = PubSubCursor {
92 next: self.messages.len() as u32 + self.deleted_messages,
93 id: self.offsets.len() as u32 + self.deleted_offsets,
94 };
95
96 let subscription = Subscription {
97 id: self.cursors.len() as u32,
98 _phantom: PhantomData,
99 };
100
101 self.offsets.push_back(cursor.next);
102 self.cursors.push(cursor);
103 subscription
104 }
105
106 pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
108 let cursor = &self.cursors[sub.id as usize];
109 self.messages.get(cursor.next(self.deleted_messages) + i)
110 }
111
112 pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
114 let cursor = &self.cursors[sub.id as usize];
115 let next = cursor.next(self.deleted_messages);
116
117 self.messages.range(next..)
118 }
119
120 pub fn ack(&mut self, sub: &Subscription<T>) {
124 let cursor = &mut self.cursors[sub.id as usize];
126
127 self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
128 cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
129
130 cursor.next = self.messages.len() as u32 + self.deleted_messages;
131 self.offsets.push_back(cursor.next);
132
133 while self.offsets.front() == Some(&u32::MAX) {
136 self.offsets.pop_front();
137 self.deleted_offsets += 1;
138 }
139
140 let next = self.offsets.front().unwrap();
143 let num_to_delete = *next - self.deleted_messages;
144
145 for _ in 0..num_to_delete {
146 self.messages.pop_front();
147 }
148
149 self.deleted_messages += num_to_delete;
150
151 if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
152 self.reset_shifts();
155 }
156 }
157}