mojentic/tracer/
event_store.rs

1//! Event storage with callbacks and filtering
2//!
3//! This module provides thread-safe event storage with support for callbacks,
4//! filtering by type, time range, and custom predicates.
5
6use super::tracer_events::TracerEvent;
7use std::sync::{Arc, Mutex};
8
/// Type alias for event callback functions.
///
/// A callback receives a borrowed tracer event and returns nothing. It is held in an
/// [`Arc`] so it can be shared cheaply, and the `Send + Sync` bounds allow it to be
/// invoked from any thread.
pub type EventCallback = Arc<dyn Fn(&dyn TracerEvent) + Send + Sync>;
11
/// Store for capturing and querying tracer events.
///
/// Events are kept in insertion order behind a [`Mutex`], so all methods take `&self`.
/// The store supports:
/// - A callback triggered on each event passed to `store`
/// - Filtering by time range
/// - Custom filter predicates
/// - Querying summaries of the last N events
///
/// NOTE(review): no dedicated "filter by event type" query is defined in this file;
/// presumably that is expressed through a custom filter predicate — confirm against callers.
pub struct EventStore {
    /// Stored events, in the order they were pushed, guarded by a mutex.
    events: Arc<Mutex<Vec<Box<dyn TracerEvent>>>>,
    /// Optional callback invoked for every event handed to the store.
    on_store_callback: Option<EventCallback>,
}
24
25impl EventStore {
26    /// Create a new event store
27    ///
28    /// # Arguments
29    ///
30    /// * `on_store_callback` - Optional callback function called whenever an event is stored
31    pub fn new(on_store_callback: Option<EventCallback>) -> Self {
32        Self {
33            events: Arc::new(Mutex::new(Vec::new())),
34            on_store_callback,
35        }
36    }
37
38    /// Store an event in the event store
39    ///
40    /// If a callback is configured, it will be called with the stored event.
41    ///
42    /// # Arguments
43    ///
44    /// * `event` - The event to store
45    pub fn store(&self, event: Box<dyn TracerEvent>) {
46        // Trigger callback before storing (if exists)
47        if let Some(callback) = &self.on_store_callback {
48            callback(event.as_ref());
49        }
50
51        // Store the event
52        let mut events = self.events.lock().unwrap();
53        events.push(event);
54    }
55
56    /// Count events matching filters
57    ///
58    /// # Arguments
59    ///
60    /// * `start_time` - Include events with timestamp >= start_time
61    /// * `end_time` - Include events with timestamp <= end_time
62    /// * `filter_func` - Custom filter function to apply to events
63    ///
64    /// # Returns
65    ///
66    /// Number of events matching the filter criteria
67    pub fn count_events(
68        &self,
69        start_time: Option<f64>,
70        end_time: Option<f64>,
71        filter_func: Option<&dyn super::EventFilterFn>,
72    ) -> usize {
73        let events = self.events.lock().unwrap();
74        let mut count = 0;
75
76        for event in events.iter() {
77            let event_ref = event.as_ref();
78
79            // Filter by time range
80            if let Some(start) = start_time {
81                if event_ref.timestamp() < start {
82                    continue;
83                }
84            }
85
86            if let Some(end) = end_time {
87                if event_ref.timestamp() > end {
88                    continue;
89                }
90            }
91
92            // Apply custom filter function
93            if let Some(filter) = filter_func {
94                if !filter.matches(event_ref) {
95                    continue;
96                }
97            }
98
99            count += 1;
100        }
101
102        count
103    }
104
105    /// Get summaries of events matching filters
106    ///
107    /// Returns printable summaries instead of cloning events
108    ///
109    /// # Arguments
110    ///
111    /// * `start_time` - Include events with timestamp >= start_time
112    /// * `end_time` - Include events with timestamp <= end_time
113    /// * `filter_func` - Custom filter function to apply to events
114    ///
115    /// # Returns
116    ///
117    /// Vector of event summaries matching the filter criteria
118    pub fn get_event_summaries(
119        &self,
120        start_time: Option<f64>,
121        end_time: Option<f64>,
122        filter_func: Option<&dyn super::EventFilterFn>,
123    ) -> Vec<String> {
124        let events = self.events.lock().unwrap();
125        let mut result = Vec::new();
126
127        for event in events.iter() {
128            let event_ref = event.as_ref();
129
130            // Filter by time range
131            if let Some(start) = start_time {
132                if event_ref.timestamp() < start {
133                    continue;
134                }
135            }
136
137            if let Some(end) = end_time {
138                if event_ref.timestamp() > end {
139                    continue;
140                }
141            }
142
143            // Apply custom filter function
144            if let Some(filter) = filter_func {
145                if !filter.matches(event_ref) {
146                    continue;
147                }
148            }
149
150            result.push(event_ref.printable_summary());
151        }
152
153        result
154    }
155
156    /// Get the last N event summaries, optionally filtered
157    ///
158    /// # Arguments
159    ///
160    /// * `n` - Number of events to return
161    /// * `filter_func` - Optional custom filter function
162    ///
163    /// # Returns
164    ///
165    /// Vector of the last N event summaries matching the filter criteria
166    pub fn get_last_n_summaries(
167        &self,
168        n: usize,
169        filter_func: Option<&dyn super::EventFilterFn>,
170    ) -> Vec<String> {
171        let events = self.events.lock().unwrap();
172
173        let filtered: Vec<_> = if let Some(filter) = filter_func {
174            events.iter().filter(|e| filter.matches(e.as_ref())).collect()
175        } else {
176            events.iter().collect()
177        };
178
179        let start_idx = if n < filtered.len() {
180            filtered.len() - n
181        } else {
182            0
183        };
184
185        filtered[start_idx..].iter().map(|e| e.as_ref().printable_summary()).collect()
186    }
187
188    /// Clear all events from the store
189    pub fn clear(&self) {
190        let mut events = self.events.lock().unwrap();
191        events.clear();
192    }
193
194    /// Get the total number of events in the store
195    pub fn len(&self) -> usize {
196        let events = self.events.lock().unwrap();
197        events.len()
198    }
199
200    /// Check if the event store is empty
201    pub fn is_empty(&self) -> bool {
202        let events = self.events.lock().unwrap();
203        events.is_empty()
204    }
205}
206
207impl Default for EventStore {
208    fn default() -> Self {
209        Self::new(None)
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tracer::tracer_events::LlmCallTracerEvent;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Current wall-clock time as fractional seconds since the Unix epoch.
    fn current_timestamp() -> f64 {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        since_epoch.as_secs_f64()
    }

    /// Build a boxed LLM-call event stamped "now" with the given correlation id.
    fn llm_call_event(correlation_id: &str) -> Box<LlmCallTracerEvent> {
        Box::new(LlmCallTracerEvent {
            timestamp: current_timestamp(),
            correlation_id: correlation_id.to_string(),
            source: "test".to_string(),
            model: "llama3.2".to_string(),
            messages: vec![],
            temperature: 1.0,
            tools: None,
        })
    }

    // Storing a single event makes it countable.
    #[test]
    fn test_store_event() {
        let store = EventStore::default();

        store.store(llm_call_event("test-123"));

        assert_eq!(store.len(), 1);
    }

    // The configured callback fires exactly once per stored event.
    #[test]
    fn test_callback_triggered() {
        let callback_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&callback_count);

        let callback: EventCallback = Arc::new(move |_event| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        let store = EventStore::new(Some(callback));

        store.store(llm_call_event("test-123"));

        assert_eq!(callback_count.load(Ordering::SeqCst), 1);
    }

    // Clearing removes everything that was stored.
    #[test]
    fn test_clear() {
        let store = EventStore::default();

        store.store(llm_call_event("test-123"));
        assert_eq!(store.len(), 1);

        store.clear();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    // Several events accumulate rather than replace each other.
    #[test]
    fn test_multiple_events() {
        let store = EventStore::default();

        (0..5).for_each(|i| {
            let correlation_id = format!("test-{}", i);
            store.store(llm_call_event(&correlation_id));
        });

        assert_eq!(store.len(), 5);
    }

    // `len` and `is_empty` agree before and after the first insert.
    #[test]
    fn test_len_and_is_empty() {
        let store = EventStore::default();
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());

        store.store(llm_call_event("test-123"));

        assert_eq!(store.len(), 1);
        assert!(!store.is_empty());
    }
}