//! Watch stream management use crate::WatchRegistry; use chainfire_types::watch::{WatchRequest, WatchResponse}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, trace}; /// Manages watch subscriptions for a single client stream pub struct WatchStream { /// Reference to the global registry registry: Arc, /// Watch IDs owned by this stream active_watches: HashSet, /// Channel for sending events to the client event_tx: mpsc::Sender, } impl WatchStream { /// Create a new watch stream pub fn new(registry: Arc, event_tx: mpsc::Sender) -> Self { Self { registry, active_watches: HashSet::new(), event_tx, } } /// Handle a create watch request pub fn create_watch(&mut self, req: WatchRequest) -> WatchResponse { let watch_id = self.registry.create_watch(req, self.event_tx.clone()); self.active_watches.insert(watch_id); debug!(watch_id, "Stream created watch"); WatchResponse::created(watch_id) } /// Handle a cancel watch request pub fn cancel_watch(&mut self, watch_id: i64) -> WatchResponse { let canceled = if self.active_watches.remove(&watch_id) { self.registry.cancel_watch(watch_id) } else { false }; debug!(watch_id, canceled, "Stream canceled watch"); WatchResponse::canceled(watch_id) } /// Get the number of active watches in this stream pub fn watch_count(&self) -> usize { self.active_watches.len() } /// Get active watch IDs pub fn watch_ids(&self) -> impl Iterator + '_ { self.active_watches.iter().copied() } } impl Drop for WatchStream { fn drop(&mut self) { // Clean up all watches when stream closes for watch_id in self.active_watches.drain() { self.registry.cancel_watch(watch_id); trace!(watch_id, "Cleaned up watch on stream close"); } } } /// Handle for spawning watch event processor pub struct WatchEventHandler { registry: Arc, } impl WatchEventHandler { /// Create a new event handler pub fn new(registry: Arc) -> Self { Self { registry } } /// Spawn a background task that processes watch events pub fn spawn_dispatcher( self, mut event_rx: mpsc::UnboundedReceiver, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Some(event) = event_rx.recv().await { self.registry.dispatch_event(event).await; } debug!("Watch event dispatcher stopped"); }) } /// Spawn a background task for progress notifications pub fn spawn_progress_notifier( registry: Arc, interval: std::time::Duration, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut ticker = tokio::time::interval(interval); loop { ticker.tick().await; registry.send_progress().await; } }) } } #[cfg(test)] mod tests { use super::*; use chainfire_types::kv::KvEntry; use chainfire_types::watch::{WatchEvent, WatchEventType}; #[tokio::test] async fn test_watch_stream_lifecycle() { let registry = Arc::new(WatchRegistry::new()); let (tx, mut rx) = mpsc::channel(10); let mut stream = WatchStream::new(Arc::clone(®istry), tx); // Create watch let req = WatchRequest::key(0, b"/test"); let response = stream.create_watch(req); assert!(response.created); let watch_id = response.watch_id; assert_eq!(stream.watch_count(), 1); assert_eq!(registry.watch_count(), 1); // Cancel watch let response = stream.cancel_watch(watch_id); assert!(response.canceled); assert_eq!(stream.watch_count(), 0); assert_eq!(registry.watch_count(), 0); } #[tokio::test] async fn test_watch_stream_cleanup_on_drop() { let registry = Arc::new(WatchRegistry::new()); let (tx, _rx) = mpsc::channel(10); { let mut stream = WatchStream::new(Arc::clone(®istry), tx); stream.create_watch(WatchRequest::key(0, b"/a")); stream.create_watch(WatchRequest::key(0, b"/b")); stream.create_watch(WatchRequest::key(0, b"/c")); assert_eq!(registry.watch_count(), 3); } // Stream dropped here // Registry should be cleaned up assert_eq!(registry.watch_count(), 0); } #[tokio::test] async fn test_event_handler() { let registry = Arc::new(WatchRegistry::new()); let (event_tx, event_rx) = mpsc::unbounded_channel(); let (watch_tx, mut watch_rx) = mpsc::channel(10); // Create a watch let req = WatchRequest::key(1, b"/test"); registry.create_watch(req, watch_tx); // Start event handler let handler = WatchEventHandler::new(Arc::clone(®istry)); let handle = handler.spawn_dispatcher(event_rx); // Send an event event_tx .send(WatchEvent { event_type: WatchEventType::Put, kv: KvEntry::new(b"/test".to_vec(), b"value".to_vec(), 1), prev_kv: None, }) .unwrap(); // Should receive the event let response = watch_rx.recv().await.unwrap(); assert_eq!(response.events.len(), 1); // Cleanup drop(event_tx); handle.await.unwrap(); } }