//! L4 TCP Data Plane for FiberLB //! //! Handles TCP proxy functionality with round-robin backend selection. use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{oneshot, RwLock}; use tokio::task::JoinHandle; use crate::maglev::MaglevTable; use crate::metadata::LbMetadataStore; use fiberlb_types::{Backend, BackendStatus, ListenerId, Listener, PoolId, PoolAlgorithm, BackendAdminState}; /// Result type for data plane operations pub type Result = std::result::Result; /// Data plane error types #[derive(Debug, thiserror::Error)] pub enum DataPlaneError { #[error("Listener not found: {0}")] ListenerNotFound(String), #[error("Pool not found: {0}")] PoolNotFound(String), #[error("No healthy backends available")] NoHealthyBackends, #[error("Listener already running: {0}")] ListenerAlreadyRunning(String), #[error("Bind error: {0}")] BindError(String), #[error("IO error: {0}")] IoError(#[from] std::io::Error), #[error("Metadata error: {0}")] MetadataError(String), } /// Handle for a running listener struct ListenerHandle { task: JoinHandle<()>, shutdown: oneshot::Sender<()>, } #[derive(Clone)] struct CachedPool { algorithm: PoolAlgorithm, healthy_backends: Vec, cached_at: Instant, } /// L4 TCP Data Plane pub struct DataPlane { metadata: Arc, listeners: Arc>>, pool_cache: Arc>>, } impl DataPlane { const POOL_CACHE_TTL: Duration = Duration::from_secs(1); /// Create a new data plane pub fn new(metadata: Arc) -> Self { Self { metadata, listeners: Arc::new(RwLock::new(HashMap::new())), pool_cache: Arc::new(RwLock::new(HashMap::new())), } } /// Start a listener by ID pub async fn start_listener(&self, listener_id: ListenerId) -> Result<()> { // Check if already running { let listeners = self.listeners.read().await; if listeners.contains_key(&listener_id) { return Err(DataPlaneError::ListenerAlreadyRunning(listener_id.to_string())); } } // Find the listener config - need to scan all LBs let listener = self.find_listener(&listener_id).await?; // Get the default pool let pool_id = listener .default_pool_id .ok_or_else(|| DataPlaneError::PoolNotFound("no default pool configured".into()))?; // Bind to listener address let bind_addr: SocketAddr = format!("0.0.0.0:{}", listener.port) .parse() .map_err(|e| DataPlaneError::BindError(format!("invalid port: {}", e)))?; let tcp_listener = TcpListener::bind(bind_addr) .await .map_err(|e| DataPlaneError::BindError(format!("bind failed: {}", e)))?; tracing::info!("Listener {} started on {}", listener_id, bind_addr); // Create shutdown channel let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); // Clone required state for the task let metadata = self.metadata.clone(); let pool_cache = self.pool_cache.clone(); let listener_id_clone = listener_id; // Spawn listener task let task = tokio::spawn(async move { loop { tokio::select! { accept_result = tcp_listener.accept() => { match accept_result { Ok((stream, peer_addr)) => { tracing::debug!("Accepted connection from {}", peer_addr); let metadata = metadata.clone(); let pool_cache = pool_cache.clone(); let pool_id = pool_id; // Spawn connection handler tokio::spawn(async move { if let Err(e) = Self::handle_connection( stream, peer_addr, metadata, pool_cache, pool_id, ).await { tracing::debug!("Connection handler error: {}", e); } }); } Err(e) => { tracing::error!("Accept error: {}", e); } } } _ = &mut shutdown_rx => { tracing::info!("Listener {} shutting down", listener_id_clone); break; } } } }); // Store handle { let mut listeners = self.listeners.write().await; listeners.insert(listener_id, ListenerHandle { task, shutdown: shutdown_tx, }); } Ok(()) } /// Stop a listener by ID pub async fn stop_listener(&self, listener_id: &ListenerId) -> Result<()> { let handle = { let mut listeners = self.listeners.write().await; listeners.remove(listener_id) .ok_or_else(|| DataPlaneError::ListenerNotFound(listener_id.to_string()))? }; // Send shutdown signal let _ = handle.shutdown.send(()); // Wait for task to complete (with timeout) let _ = tokio::time::timeout( std::time::Duration::from_secs(5), handle.task, ).await; tracing::info!("Listener {} stopped", listener_id); Ok(()) } /// Check if a listener is running pub async fn is_listener_running(&self, listener_id: &ListenerId) -> bool { let listeners = self.listeners.read().await; listeners.contains_key(listener_id) } /// Get count of running listeners pub async fn running_listener_count(&self) -> usize { let listeners = self.listeners.read().await; listeners.len() } /// Find a listener by ID (scans all LBs) async fn find_listener(&self, listener_id: &ListenerId) -> Result { self.metadata .load_listener_by_id(listener_id) .await .map_err(|e| DataPlaneError::MetadataError(e.to_string()))? .ok_or_else(|| DataPlaneError::ListenerNotFound(listener_id.to_string())) } /// Handle a single client connection async fn handle_connection( client: TcpStream, peer_addr: SocketAddr, metadata: Arc, pool_cache: Arc>>, pool_id: PoolId, ) -> Result<()> { // Select a backend using client address for consistent hashing let connection_key = peer_addr.to_string(); let backend = Self::select_backend(&metadata, &pool_cache, &pool_id, &connection_key, false).await?; // Build backend address let backend_stream = match Self::connect_backend(&backend).await { Ok(stream) => stream, Err(error) => { Self::invalidate_pool_cache(&pool_cache, &pool_id).await; let fallback = Self::select_backend(&metadata, &pool_cache, &pool_id, &connection_key, true).await?; if fallback.id == backend.id { return Err(error); } tracing::debug!( failed_backend = %backend.id, fallback_backend = %fallback.id, "Retrying FiberLB backend connection after cache refresh" ); Self::connect_backend(&fallback).await? } }; // Proxy bidirectionally Self::proxy_bidirectional(client, backend_stream).await } async fn connect_backend(backend: &Backend) -> Result { let backend_addr: SocketAddr = format!("{}:{}", backend.address, backend.port) .parse() .map_err(|e| DataPlaneError::IoError(std::io::Error::new( std::io::ErrorKind::InvalidInput, format!("invalid backend address: {}", e), )))?; tracing::debug!("Proxying to backend {}", backend_addr); TcpStream::connect(backend_addr).await.map_err(DataPlaneError::from) } /// Select a backend using configured algorithm (round-robin or Maglev) async fn select_backend( metadata: &Arc, pool_cache: &Arc>>, pool_id: &PoolId, connection_key: &str, force_refresh: bool, ) -> Result { let snapshot = Self::get_pool_snapshot(metadata, pool_cache, pool_id, force_refresh).await?; let healthy = snapshot.healthy_backends; // Select based on algorithm match snapshot.algorithm { PoolAlgorithm::Maglev => { // Use Maglev consistent hashing let table = MaglevTable::new(&healthy, None); let idx = table.lookup(connection_key) .ok_or(DataPlaneError::NoHealthyBackends)?; Ok(healthy[idx].clone()) } _ => { // Default: Round-robin for all other algorithms // TODO: Implement LeastConnections, IpHash, WeightedRoundRobin, Random static COUNTER: AtomicUsize = AtomicUsize::new(0); let idx = COUNTER.fetch_add(1, Ordering::Relaxed) % healthy.len(); Ok(healthy.into_iter().nth(idx).unwrap()) } } } async fn get_pool_snapshot( metadata: &Arc, pool_cache: &Arc>>, pool_id: &PoolId, force_refresh: bool, ) -> Result { if !force_refresh { let cache = pool_cache.read().await; if let Some(snapshot) = cache.get(pool_id) { if snapshot.cached_at.elapsed() < Self::POOL_CACHE_TTL { return Ok(snapshot.clone()); } } } let pool = metadata .load_pool_by_id(pool_id) .await .map_err(|e| DataPlaneError::MetadataError(e.to_string()))? .ok_or_else(|| DataPlaneError::PoolNotFound(pool_id.to_string()))?; let backends = metadata .list_backends(pool_id) .await .map_err(|e| DataPlaneError::MetadataError(e.to_string()))?; let healthy_backends: Vec<_> = backends .into_iter() .filter(|b| { b.admin_state == BackendAdminState::Enabled && (b.status == BackendStatus::Online || b.status == BackendStatus::Unknown) }) .collect(); if healthy_backends.is_empty() { Self::invalidate_pool_cache(pool_cache, pool_id).await; return Err(DataPlaneError::NoHealthyBackends); } let snapshot = CachedPool { algorithm: pool.algorithm, healthy_backends, cached_at: Instant::now(), }; let mut cache = pool_cache.write().await; cache.insert(*pool_id, snapshot.clone()); Ok(snapshot) } async fn invalidate_pool_cache( pool_cache: &Arc>>, pool_id: &PoolId, ) { let mut cache = pool_cache.write().await; cache.remove(pool_id); } /// Proxy data bidirectionally between client and backend async fn proxy_bidirectional( mut client: TcpStream, mut backend: TcpStream, ) -> Result<()> { let (mut client_read, mut client_write) = client.split(); let (mut backend_read, mut backend_write) = backend.split(); // Use tokio::io::copy for efficient proxying let client_to_backend = tokio::io::copy(&mut client_read, &mut backend_write); let backend_to_client = tokio::io::copy(&mut backend_read, &mut client_write); // Run both directions concurrently, complete when either finishes tokio::select! { result = client_to_backend => { if let Err(e) = result { tracing::debug!("Client to backend copy ended: {}", e); } } result = backend_to_client => { if let Err(e) = result { tracing::debug!("Backend to client copy ended: {}", e); } } } Ok(()) } /// Shutdown all listeners pub async fn shutdown(&self) { let listener_ids: Vec = { let listeners = self.listeners.read().await; listeners.keys().cloned().collect() }; for id in listener_ids { if let Err(e) = self.stop_listener(&id).await { tracing::warn!("Error stopping listener {}: {}", id, e); } } } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_dataplane_creation() { let metadata = Arc::new(LbMetadataStore::new_in_memory()); let dataplane = DataPlane::new(metadata); assert_eq!(dataplane.running_listener_count().await, 0); } #[tokio::test] async fn test_listener_not_found() { let metadata = Arc::new(LbMetadataStore::new_in_memory()); let dataplane = DataPlane::new(metadata); let fake_id = ListenerId::new(); let result = dataplane.start_listener(fake_id).await; assert!(result.is_err()); match result { Err(DataPlaneError::ListenerNotFound(_)) => {} _ => panic!("Expected ListenerNotFound error"), } } #[tokio::test] async fn test_backend_selection_empty() { let metadata = Arc::new(LbMetadataStore::new_in_memory()); let pool_cache = Arc::new(RwLock::new(HashMap::new())); let pool_id = PoolId::new(); let result = DataPlane::select_backend( &metadata, &pool_cache, &pool_id, "192.168.1.1:54321", false, ).await; assert!(result.is_err()); // Expecting PoolNotFound since pool doesn't exist } }