//! Event types and dispatcher use std::sync::Arc; use tokio::sync::broadcast; use chainfire_types::node::NodeInfo; use crate::callbacks::{ClusterEventHandler, KvEventHandler, LeaveReason}; /// Cluster-level events #[derive(Debug, Clone)] pub enum ClusterEvent { /// A node joined the cluster NodeJoined(NodeInfo), /// A node left the cluster NodeLeft { /// The node ID that left node_id: u64, /// Why the node left reason: LeaveReason, }, /// Leadership changed LeaderChanged { /// Previous leader (None if no previous leader) old: Option, /// New leader new: u64, }, /// This node became the leader BecameLeader, /// This node lost leadership LostLeadership, /// Cluster membership changed MembershipChanged(Vec), /// Network partition detected PartitionDetected { /// Nodes that are reachable reachable: Vec, /// Nodes that are unreachable unreachable: Vec, }, /// Cluster is ready ClusterReady, } /// KV store events #[derive(Debug, Clone)] pub enum KvEvent { /// A key was created or updated KeyChanged { /// Namespace of the key namespace: String, /// The key that changed key: Vec, /// New value value: Vec, /// Revision number revision: u64, }, /// A key was deleted KeyDeleted { /// Namespace of the key namespace: String, /// The key that was deleted key: Vec, /// Revision number revision: u64, }, } /// Event dispatcher that manages callbacks and event broadcasting pub struct EventDispatcher { cluster_handlers: Vec>, kv_handlers: Vec>, event_tx: broadcast::Sender, } impl EventDispatcher { /// Create a new event dispatcher pub fn new() -> Self { let (event_tx, _) = broadcast::channel(1024); Self { cluster_handlers: Vec::new(), kv_handlers: Vec::new(), event_tx, } } /// Add a cluster event handler pub fn add_cluster_handler(&mut self, handler: Arc) { self.cluster_handlers.push(handler); } /// Add a KV event handler pub fn add_kv_handler(&mut self, handler: Arc) { self.kv_handlers.push(handler); } /// Get a subscriber for cluster events pub fn subscribe(&self) -> broadcast::Receiver { self.event_tx.subscribe() } /// Dispatch a cluster event to all handlers pub async fn dispatch_cluster_event(&self, event: ClusterEvent) { // Broadcast to channel subscribers let _ = self.event_tx.send(event.clone()); // Call registered handlers match &event { ClusterEvent::NodeJoined(node) => { for handler in &self.cluster_handlers { handler.on_node_joined(node).await; } } ClusterEvent::NodeLeft { node_id, reason } => { for handler in &self.cluster_handlers { handler.on_node_left(*node_id, *reason).await; } } ClusterEvent::LeaderChanged { old, new } => { for handler in &self.cluster_handlers { handler.on_leader_changed(*old, *new).await; } } ClusterEvent::BecameLeader => { for handler in &self.cluster_handlers { handler.on_became_leader().await; } } ClusterEvent::LostLeadership => { for handler in &self.cluster_handlers { handler.on_lost_leadership().await; } } ClusterEvent::MembershipChanged(members) => { for handler in &self.cluster_handlers { handler.on_membership_changed(members).await; } } ClusterEvent::PartitionDetected { reachable, unreachable, } => { for handler in &self.cluster_handlers { handler.on_partition_detected(reachable, unreachable).await; } } ClusterEvent::ClusterReady => { for handler in &self.cluster_handlers { handler.on_cluster_ready().await; } } } } /// Dispatch a KV event to all handlers pub async fn dispatch_kv_event(&self, event: KvEvent) { match &event { KvEvent::KeyChanged { namespace, key, value, revision, } => { for handler in &self.kv_handlers { handler .on_key_changed(namespace, key, value, *revision) .await; } } KvEvent::KeyDeleted { namespace, key, revision, } => { for handler in &self.kv_handlers { handler.on_key_deleted(namespace, key, *revision).await; } } } } } impl Default for EventDispatcher { fn default() -> Self { Self::new() } }