mojentic/tracer/
event_store.rs1use super::tracer_events::TracerEvent;
7use std::sync::{Arc, Mutex};
8
9pub type EventCallback = Arc<dyn Fn(&dyn TracerEvent) + Send + Sync>;
11
12pub struct EventStore {
21 events: Arc<Mutex<Vec<Box<dyn TracerEvent>>>>,
22 on_store_callback: Option<EventCallback>,
23}
24
25impl EventStore {
26 pub fn new(on_store_callback: Option<EventCallback>) -> Self {
32 Self {
33 events: Arc::new(Mutex::new(Vec::new())),
34 on_store_callback,
35 }
36 }
37
38 pub fn store(&self, event: Box<dyn TracerEvent>) {
46 if let Some(callback) = &self.on_store_callback {
48 callback(event.as_ref());
49 }
50
51 let mut events = self.events.lock().unwrap();
53 events.push(event);
54 }
55
56 pub fn count_events(
68 &self,
69 start_time: Option<f64>,
70 end_time: Option<f64>,
71 filter_func: Option<&dyn super::EventFilterFn>,
72 ) -> usize {
73 let events = self.events.lock().unwrap();
74 let mut count = 0;
75
76 for event in events.iter() {
77 let event_ref = event.as_ref();
78
79 if let Some(start) = start_time {
81 if event_ref.timestamp() < start {
82 continue;
83 }
84 }
85
86 if let Some(end) = end_time {
87 if event_ref.timestamp() > end {
88 continue;
89 }
90 }
91
92 if let Some(filter) = filter_func {
94 if !filter.matches(event_ref) {
95 continue;
96 }
97 }
98
99 count += 1;
100 }
101
102 count
103 }
104
105 pub fn get_event_summaries(
119 &self,
120 start_time: Option<f64>,
121 end_time: Option<f64>,
122 filter_func: Option<&dyn super::EventFilterFn>,
123 ) -> Vec<String> {
124 let events = self.events.lock().unwrap();
125 let mut result = Vec::new();
126
127 for event in events.iter() {
128 let event_ref = event.as_ref();
129
130 if let Some(start) = start_time {
132 if event_ref.timestamp() < start {
133 continue;
134 }
135 }
136
137 if let Some(end) = end_time {
138 if event_ref.timestamp() > end {
139 continue;
140 }
141 }
142
143 if let Some(filter) = filter_func {
145 if !filter.matches(event_ref) {
146 continue;
147 }
148 }
149
150 result.push(event_ref.printable_summary());
151 }
152
153 result
154 }
155
156 pub fn get_last_n_summaries(
167 &self,
168 n: usize,
169 filter_func: Option<&dyn super::EventFilterFn>,
170 ) -> Vec<String> {
171 let events = self.events.lock().unwrap();
172
173 let filtered: Vec<_> = if let Some(filter) = filter_func {
174 events.iter().filter(|e| filter.matches(e.as_ref())).collect()
175 } else {
176 events.iter().collect()
177 };
178
179 let start_idx = if n < filtered.len() {
180 filtered.len() - n
181 } else {
182 0
183 };
184
185 filtered[start_idx..].iter().map(|e| e.as_ref().printable_summary()).collect()
186 }
187
188 pub fn clear(&self) {
190 let mut events = self.events.lock().unwrap();
191 events.clear();
192 }
193
194 pub fn len(&self) -> usize {
196 let events = self.events.lock().unwrap();
197 events.len()
198 }
199
200 pub fn is_empty(&self) -> bool {
202 let events = self.events.lock().unwrap();
203 events.is_empty()
204 }
205}
206
207impl Default for EventStore {
208 fn default() -> Self {
209 Self::new(None)
210 }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tracer::tracer_events::LlmCallTracerEvent;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Current wall-clock time as fractional seconds since the Unix epoch.
    fn current_timestamp() -> f64 {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        since_epoch.as_secs_f64()
    }

    /// Builds the LLM-call event fixture shared by every test; only the
    /// correlation id varies between callers.
    fn sample_event(correlation_id: &str) -> Box<LlmCallTracerEvent> {
        Box::new(LlmCallTracerEvent {
            timestamp: current_timestamp(),
            correlation_id: correlation_id.to_string(),
            source: "test".to_string(),
            model: "llama3.2".to_string(),
            messages: vec![],
            temperature: 1.0,
            tools: None,
        })
    }

    /// Storing a single event makes it visible through `len`.
    #[test]
    fn test_store_event() {
        let store = EventStore::default();

        store.store(sample_event("test-123"));

        assert_eq!(store.len(), 1);
    }

    /// The "on store" callback fires exactly once per stored event.
    #[test]
    fn test_callback_triggered() {
        let invocations = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&invocations);

        let callback: EventCallback = Arc::new(move |_event| {
            counter.fetch_add(1, Ordering::SeqCst);
        });

        let store = EventStore::new(Some(callback));
        store.store(sample_event("test-123"));

        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    /// `clear` drops everything that was stored.
    #[test]
    fn test_clear() {
        let store = EventStore::default();

        store.store(sample_event("test-123"));
        assert_eq!(store.len(), 1);

        store.clear();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    /// Several events accumulate rather than replacing one another.
    #[test]
    fn test_multiple_events() {
        let store = EventStore::default();

        for index in 0..5 {
            let correlation_id = format!("test-{}", index);
            store.store(sample_event(&correlation_id));
        }

        assert_eq!(store.len(), 5);
    }

    /// `len` and `is_empty` agree before and after the first insert.
    #[test]
    fn test_len_and_is_empty() {
        let store = EventStore::default();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());

        store.store(sample_event("test-123"));

        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());
    }
}