rapier3d/data/
pubsub.rs

1//! Publish-subscribe mechanism for internal events.
2
3use std::collections::VecDeque;
4use std::marker::PhantomData;
5
6/// A permanent subscription to a pub-sub queue.
7#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
8#[derive(Clone)]
9pub struct Subscription<T> {
10    // Position on the cursor array.
11    id: u32,
12    _phantom: PhantomData<T>,
13}
14
15#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
16#[derive(Clone)]
17struct PubSubCursor {
18    // Position on the offset array.
19    id: u32,
20    // Index of the next message to read.
21    // NOTE: Having this here is not actually necessary because
22    // this value is supposed to be equal to `offsets[self.id]`.
23    // However, we keep it because it lets us avoid one lookup
24    // on the `offsets` array inside of message-polling loops
25    // based on `read_ith`.
26    next: u32,
27}
28
29impl PubSubCursor {
30    fn id(&self, num_deleted: u32) -> usize {
31        (self.id - num_deleted) as usize
32    }
33
34    fn next(&self, num_deleted: u32) -> usize {
35        (self.next - num_deleted) as usize
36    }
37}
38
39/// A pub-sub queue.
40#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
41#[derive(Clone, Default)]
42pub struct PubSub<T> {
43    deleted_messages: u32,
44    deleted_offsets: u32,
45    messages: VecDeque<T>,
46    offsets: VecDeque<u32>,
47    cursors: Vec<PubSubCursor>,
48}
49
50impl<T> PubSub<T> {
51    /// Create a new empty pub-sub queue.
52    pub fn new() -> Self {
53        Self {
54            deleted_offsets: 0,
55            deleted_messages: 0,
56            messages: VecDeque::new(),
57            offsets: VecDeque::new(),
58            cursors: Vec::new(),
59        }
60    }
61
62    fn reset_shifts(&mut self) {
63        for offset in &mut self.offsets {
64            *offset -= self.deleted_messages;
65        }
66
67        for cursor in &mut self.cursors {
68            cursor.id -= self.deleted_offsets;
69            cursor.next -= self.deleted_messages;
70        }
71
72        self.deleted_offsets = 0;
73        self.deleted_messages = 0;
74    }
75
76    /// Publish a new message.
77    pub fn publish(&mut self, message: T) {
78        if self.offsets.is_empty() {
79            // No subscribers, drop the message.
80            return;
81        }
82
83        self.messages.push_back(message);
84    }
85
86    /// Subscribe to the queue.
87    ///
88    /// A subscription cannot be cancelled.
89    #[must_use]
90    pub fn subscribe(&mut self) -> Subscription<T> {
91        let cursor = PubSubCursor {
92            next: self.messages.len() as u32 + self.deleted_messages,
93            id: self.offsets.len() as u32 + self.deleted_offsets,
94        };
95
96        let subscription = Subscription {
97            id: self.cursors.len() as u32,
98            _phantom: PhantomData,
99        };
100
101        self.offsets.push_back(cursor.next);
102        self.cursors.push(cursor);
103        subscription
104    }
105
106    /// Read the i-th message not yet read by the given subscriber.
107    pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
108        let cursor = &self.cursors[sub.id as usize];
109        self.messages.get(cursor.next(self.deleted_messages) + i)
110    }
111
112    /// Get the messages not yet read by the given subscriber.
113    pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
114        let cursor = &self.cursors[sub.id as usize];
115        let next = cursor.next(self.deleted_messages);
116
117        self.messages.range(next..)
118    }
119
120    /// Makes the given subscribe acknowledge all the messages in the queue.
121    ///
122    /// A subscriber cannot read acknowledged messages any more.
123    pub fn ack(&mut self, sub: &Subscription<T>) {
124        // Update the cursor.
125        let cursor = &mut self.cursors[sub.id as usize];
126
127        self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
128        cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
129
130        cursor.next = self.messages.len() as u32 + self.deleted_messages;
131        self.offsets.push_back(cursor.next);
132
133        // Now clear the messages we don't need to
134        // maintain in memory anymore.
135        while self.offsets.front() == Some(&u32::MAX) {
136            self.offsets.pop_front();
137            self.deleted_offsets += 1;
138        }
139
140        // There must be at least one offset otherwise
141        // that would mean we have no subscribers.
142        let next = self.offsets.front().unwrap();
143        let num_to_delete = *next - self.deleted_messages;
144
145        for _ in 0..num_to_delete {
146            self.messages.pop_front();
147        }
148
149        self.deleted_messages += num_to_delete;
150
151        if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
152            // Don't let the deleted_* shifts grow indefinitely otherwise
153            // they will end up overflowing, breaking everything.
154            self.reset_shifts();
155        }
156    }
157}