use std::collections::VecDeque;
use std::marker::PhantomData;
/// Handle identifying one subscriber of a `PubSub<T>` queue.
///
/// Values are created by `PubSub::subscribe` and passed back to
/// `PubSub::read`, `PubSub::read_ith` and `PubSub::ack`.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
pub struct Subscription<T> {
// Index of this subscriber's cursor inside `PubSub::cursors`.
id: u32,
// Zero-sized marker carrying the message type `T`; no `T` value is stored.
_phantom: PhantomData<T>,
}
/// Bookkeeping for one subscriber of a `PubSub` queue.
///
/// Both fields are absolute counters: they keep counting across entries that
/// have already been removed from the front of the queues they index, so the
/// number of removed entries must be subtracted before using them as indices.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
struct PubSubCursor {
    /// Absolute position of this subscriber's entry in `PubSub::offsets`.
    id: u32,
    /// Absolute position of the next message this subscriber will read.
    next: u32,
}

impl PubSubCursor {
    /// Index of this cursor's entry in the offsets queue once `num_deleted`
    /// entries have been removed from its front.
    fn id(&self, num_deleted: u32) -> usize {
        let live_index = self.id - num_deleted;
        live_index as usize
    }

    /// Index of the next unread message in the message queue once
    /// `num_deleted` messages have been removed from its front.
    fn next(&self, num_deleted: u32) -> usize {
        let live_index = self.next - num_deleted;
        live_index as usize
    }
}
/// A publish-subscribe queue that keeps each message until every subscriber
/// has acknowledged it.
///
/// Positions handed to subscribers are absolute counters. The `deleted_*`
/// fields record how many entries have been removed from the front of the
/// corresponding queue, so an absolute position minus the matching counter
/// gives the current index into that queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
#[derive(Clone)]
pub struct PubSub<T> {
    /// Number of messages removed from the front of `messages` so far.
    deleted_messages: u32,
    /// Number of entries removed from the front of `offsets` so far.
    deleted_offsets: u32,
    /// Messages that at least one subscriber has not acknowledged yet.
    messages: VecDeque<T>,
    /// One entry is appended per `subscribe`/`ack` call, holding the absolute
    /// position of that subscriber's next message. An entry superseded by a
    /// later `ack` is overwritten with `Self::STALE_OFFSET`.
    offsets: VecDeque<u32>,
    /// One cursor per subscription, indexed by `Subscription::id`.
    cursors: Vec<PubSubCursor>,
}

impl<T> Default for PubSub<T> {
    /// Creates an empty queue; equivalent to [`PubSub::new`].
    ///
    /// Implemented by hand so that `T` does not need to implement `Default`.
    fn default() -> Self {
        Self::new()
    }
}

impl<T> PubSub<T> {
    /// Marker stored in `offsets` for an entry that no longer belongs to any
    /// subscriber and can be dropped once it reaches the front.
    const STALE_OFFSET: u32 = u32::MAX;

    /// Creates an empty queue with no subscribers.
    pub fn new() -> Self {
        Self {
            deleted_offsets: 0,
            deleted_messages: 0,
            messages: VecDeque::new(),
            offsets: VecDeque::new(),
            cursors: Vec::new(),
        }
    }

    /// Rebases every stored absolute position so that both `deleted_*`
    /// counters can restart from zero.
    fn reset_shifts(&mut self) {
        let deleted_messages = self.deleted_messages;
        let deleted_offsets = self.deleted_offsets;

        for offset in self.offsets.iter_mut() {
            // Stale markers are not positions: shifting them would turn them
            // into bogus live offsets that `ack` can no longer recognize.
            if *offset != Self::STALE_OFFSET {
                *offset -= deleted_messages;
            }
        }

        for cursor in &mut self.cursors {
            cursor.id -= deleted_offsets;
            cursor.next -= deleted_messages;
        }

        self.deleted_offsets = 0;
        self.deleted_messages = 0;
    }

    /// Appends `message` to the queue.
    ///
    /// The message is dropped immediately when `offsets` is empty, i.e. when
    /// nobody has subscribed yet.
    pub fn publish(&mut self, message: T) {
        if self.offsets.is_empty() {
            return;
        }
        self.messages.push_back(message);
    }

    /// Registers a new subscriber and returns its handle.
    ///
    /// The subscriber starts after the messages currently queued, so it only
    /// sees messages published after this call.
    #[must_use]
    pub fn subscribe(&mut self) -> Subscription<T> {
        let cursor = PubSubCursor {
            next: self.messages.len() as u32 + self.deleted_messages,
            id: self.offsets.len() as u32 + self.deleted_offsets,
        };
        let subscription = Subscription {
            id: self.cursors.len() as u32,
            _phantom: PhantomData,
        };
        self.offsets.push_back(cursor.next);
        self.cursors.push(cursor);
        subscription
    }

    /// Returns the `i`-th message not yet acknowledged by `sub`, or `None`
    /// when fewer than `i + 1` such messages are queued.
    ///
    /// # Panics
    ///
    /// Panics if `sub` does not index an existing cursor of this queue.
    pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
        let cursor = &self.cursors[sub.id as usize];
        self.messages.get(cursor.next(self.deleted_messages) + i)
    }

    /// Iterates, oldest first, over the messages not yet acknowledged by `sub`.
    ///
    /// # Panics
    ///
    /// Panics if `sub` does not index an existing cursor of this queue.
    pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
        let cursor = &self.cursors[sub.id as usize];
        let next = cursor.next(self.deleted_messages);
        self.messages.range(next..)
    }

    /// Marks every currently queued message as read by `sub` and frees the
    /// messages that no subscriber still needs.
    ///
    /// # Panics
    ///
    /// Panics if `sub` does not index an existing cursor of this queue.
    pub fn ack(&mut self, sub: &Subscription<T>) {
        // Retire the subscriber's old offset entry and append a fresh one
        // pointing just past the last queued message.
        let cursor = &mut self.cursors[sub.id as usize];
        self.offsets[cursor.id(self.deleted_offsets)] = Self::STALE_OFFSET;
        cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
        cursor.next = self.messages.len() as u32 + self.deleted_messages;
        self.offsets.push_back(cursor.next);

        // Drop stale entries that have reached the front of the queue.
        while self.offsets.front() == Some(&Self::STALE_OFFSET) {
            self.offsets.pop_front();
            self.deleted_offsets += 1;
        }

        // The front entry is now the oldest position any subscriber still
        // needs; everything before it can be released.
        let oldest = *self
            .offsets
            .front()
            .expect("the acknowledging subscriber's offset was just pushed");
        let num_to_delete = oldest - self.deleted_messages;
        let drained = (num_to_delete as usize).min(self.messages.len());
        self.messages.drain(..drained);
        self.deleted_messages += num_to_delete;

        // Rebase before the absolute counters get anywhere near overflowing.
        if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
            self.reset_shifts();
        }
    }
}