//! Gossip agent with UDP transport use crate::broadcast::ActualStateBroadcast; use crate::identity::GossipId; use crate::membership::{MembershipChange, MembershipState}; use crate::runtime::GossipRuntime; use crate::GossipError; use foca::{Config as FocaConfig, Foca, NoCustomBroadcast, PostcardCodec, Timer}; use futures::stream::FuturesUnordered; use futures::StreamExt; use rand::rngs::SmallRng; use rand::SeedableRng; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::mpsc; use tracing::{error, info, trace, warn}; /// Default gossip configuration pub fn default_config() -> FocaConfig { FocaConfig::simple() } /// Gossip agent managing the SWIM protocol pub struct GossipAgent { /// Our identity identity: GossipId, /// UDP socket for gossip socket: Arc, /// Membership state membership: Arc, /// Actual state broadcast handler broadcast: Arc, /// Channel for receiving membership changes membership_rx: mpsc::Receiver, /// Channel for receiving outgoing packets outgoing_rx: mpsc::Receiver<(SocketAddr, Vec)>, /// Channel for receiving timer events timer_rx: mpsc::Receiver<(Timer, Duration)>, /// Foca instance foca: Foca, /// Runtime for callbacks runtime: GossipRuntime, } impl GossipAgent { /// Create a new gossip agent pub async fn new(identity: GossipId, config: FocaConfig) -> Result { let socket = UdpSocket::bind(identity.addr) .await .map_err(|e| GossipError::BindFailed(e.to_string()))?; info!(addr = %identity.addr, node_id = identity.node_id, "Gossip agent bound"); let (outgoing_tx, outgoing_rx) = mpsc::channel(1024); let (timer_tx, timer_rx) = mpsc::channel(256); let (membership_tx, membership_rx) = mpsc::channel(256); let runtime = GossipRuntime::new(outgoing_tx, timer_tx, membership_tx); let rng = SmallRng::from_os_rng(); let foca = Foca::new(identity.clone(), config, rng, PostcardCodec); Ok(Self { identity, socket: Arc::new(socket), membership: Arc::new(MembershipState::new()), broadcast: Arc::new(ActualStateBroadcast::new()), membership_rx, outgoing_rx, timer_rx, foca, runtime, }) } /// Get the identity pub fn identity(&self) -> &GossipId { &self.identity } /// Get the membership state pub fn membership(&self) -> &Arc { &self.membership } /// Get the broadcast handler pub fn broadcast(&self) -> &Arc { &self.broadcast } /// Announce to a known cluster member to join pub fn announce(&mut self, addr: SocketAddr) -> Result<(), GossipError> { // Create a probe identity for the target let probe = GossipId::worker(0, addr); self.foca .announce(probe, &mut self.runtime) .map_err(|e| GossipError::JoinFailed(format!("{:?}", e)))?; info!(addr = %addr, "Announced to cluster"); Ok(()) } /// Get current members pub fn members(&self) -> Vec { self.foca.iter_members().map(|m| m.id().clone()).collect() } /// Run the gossip agent pub async fn run(&mut self) -> Result<(), GossipError> { let mut buf = vec![0u8; 65536]; let mut timer_handles = FuturesUnordered::new(); info!(identity = %self.identity, "Starting gossip agent"); loop { tokio::select! { // Handle incoming UDP packets result = self.socket.recv_from(&mut buf) => { match result { Ok((len, addr)) => { trace!(from = %addr, len, "Received gossip packet"); if let Err(e) = self.foca.handle_data(&buf[..len], &mut self.runtime) { warn!(error = ?e, "Failed to handle gossip data"); } } Err(e) => { error!(error = %e, "Failed to receive UDP packet"); } } } // Send outgoing packets Some((addr, data)) = self.outgoing_rx.recv() => { trace!(to = %addr, len = data.len(), "Sending gossip packet"); if let Err(e) = self.socket.send_to(&data, addr).await { warn!(error = %e, to = %addr, "Failed to send UDP packet"); } } // Schedule timers Some((timer, duration)) = self.timer_rx.recv() => { let timer_clone = timer.clone(); timer_handles.push(async move { tokio::time::sleep(duration).await; timer_clone }); } // Fire timers Some(timer) = timer_handles.next() => { if let Err(e) = self.foca.handle_timer(timer, &mut self.runtime) { warn!(error = ?e, "Failed to handle timer"); } } // Handle membership changes Some(change) = self.membership_rx.recv() => { // Also remove state on member down if let MembershipChange::MemberDown(ref id) = change { self.broadcast.remove_state(id.node_id); } self.membership.handle_change(change); } } } } /// Run the agent with graceful shutdown pub async fn run_until_shutdown( mut self, mut shutdown: tokio::sync::broadcast::Receiver<()>, ) -> Result<(), GossipError> { tokio::select! { result = self.run() => result, _ = shutdown.recv() => { info!("Gossip agent shutting down"); Ok(()) } } } } #[cfg(test)] mod tests { use super::*; use chainfire_types::node::NodeRole; async fn create_test_agent(port: u16) -> GossipAgent { let id = GossipId::new( port as u64, format!("127.0.0.1:{}", port).parse().unwrap(), NodeRole::Worker, ); GossipAgent::new(id, default_config()).await.unwrap() } #[tokio::test] async fn test_agent_creation() { let agent = create_test_agent(15000).await; assert_eq!(agent.identity().node_id, 15000); } #[tokio::test] async fn test_membership_empty() { let agent = create_test_agent(15001).await; assert_eq!(agent.membership().count(), 0); } // Note: Full gossip tests require multiple agents communicating // which is complex to set up in unit tests. Integration tests // would be more appropriate for testing actual gossip behavior. }