photoncloud-monorepo/fiberlb/crates/fiberlb-server/src/dataplane.rs
centra a7ec7e2158 Add T026 practical test + k8shost to flake + workspace files
- Created T026-practical-test task.yaml for MVP smoke testing
- Added k8shost-server to flake.nix (packages, apps, overlays)
- Staged all workspace directories for nix flake build
- Updated flake.nix shellHook to include k8shost

Resolves: T026.S1 blocker (R8 - nix submodule visibility)
2025-12-09 06:07:50 +09:00

331 lines
11 KiB
Rust

//! L4 TCP Data Plane for FiberLB
//!
//! Handles TCP proxy functionality with round-robin backend selection.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use crate::metadata::LbMetadataStore;
use fiberlb_types::{Backend, BackendStatus, ListenerId, Listener, PoolId, BackendAdminState};
/// Result type for data plane operations.
///
/// Convenience alias: every fallible data-plane API returns this with
/// [`DataPlaneError`] as the error type.
pub type Result<T> = std::result::Result<T, DataPlaneError>;
/// Data plane error types.
///
/// Derives `thiserror::Error`; `IoError` converts automatically from
/// `std::io::Error` via `#[from]`, so `?` works on socket operations.
#[derive(Debug, thiserror::Error)]
pub enum DataPlaneError {
/// No listener with the given ID exists (see `find_listener`).
#[error("Listener not found: {0}")]
ListenerNotFound(String),
/// Listener has no default pool configured, or pool lookup failed.
#[error("Pool not found: {0}")]
PoolNotFound(String),
/// The pool has no backend that is both admin-enabled and online/unknown.
#[error("No healthy backends available")]
NoHealthyBackends,
/// `start_listener` was called for an ID that is already active.
#[error("Listener already running: {0}")]
ListenerAlreadyRunning(String),
/// Address parse or TCP bind failure when starting a listener.
#[error("Bind error: {0}")]
BindError(String),
/// Underlying socket I/O failure (connect, proxy copy, etc.).
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// Error surfaced from the metadata store, stringified.
#[error("Metadata error: {0}")]
MetadataError(String),
}
/// Handle for a running listener.
///
/// Stored in the `DataPlane` listener map; `stop_listener` removes it,
/// fires `shutdown`, then awaits `task`.
struct ListenerHandle {
/// Join handle for the spawned accept-loop task.
task: JoinHandle<()>,
/// One-shot signal that tells the accept loop to exit.
/// Dropping the sender also resolves the receiver, so losing the
/// handle tears the loop down as well.
shutdown: oneshot::Sender<()>,
}
/// L4 TCP Data Plane.
///
/// Owns the set of running TCP listeners and proxies each accepted
/// connection to a pool backend chosen round-robin (`select_backend`).
pub struct DataPlane {
/// Metadata store queried for listener, pool, and backend config.
metadata: Arc<LbMetadataStore>,
/// Running listeners keyed by ID; the `RwLock` serializes start/stop
/// against the read-mostly status queries.
listeners: Arc<RwLock<HashMap<ListenerId, ListenerHandle>>>,
}
impl DataPlane {
/// Create a data plane with no active listeners.
///
/// The listener map starts empty; call `start_listener` to bring
/// individual listeners up.
pub fn new(metadata: Arc<LbMetadataStore>) -> Self {
    let listeners = Arc::new(RwLock::new(HashMap::new()));
    Self { metadata, listeners }
}
/// Start a listener by ID.
///
/// Resolves the listener's config from the metadata store, binds a TCP
/// socket on `0.0.0.0:<port>`, and spawns an accept loop that hands each
/// connection to `handle_connection` against the listener's default pool.
///
/// # Errors
/// - `ListenerAlreadyRunning` if this ID is already active.
/// - `ListenerNotFound` / `PoolNotFound` if the config is missing.
/// - `BindError` if the address cannot be parsed or bound.
pub async fn start_listener(&self, listener_id: ListenerId) -> Result<()> {
    // Fast-path check. This is re-checked under the write lock below:
    // two concurrent calls for the same ID could both pass this read-lock
    // check, and without the re-check the second insert would silently
    // overwrite (and leak) the first listener's handle.
    {
        let listeners = self.listeners.read().await;
        if listeners.contains_key(&listener_id) {
            return Err(DataPlaneError::ListenerAlreadyRunning(listener_id.to_string()));
        }
    }
    // Find the listener config - need to scan all LBs.
    let listener = self.find_listener(&listener_id).await?;
    // Get the default pool.
    let pool_id = listener
        .default_pool_id
        .ok_or_else(|| DataPlaneError::PoolNotFound("no default pool configured".into()))?;
    // Bind to listener address (all interfaces).
    let bind_addr: SocketAddr = format!("0.0.0.0:{}", listener.port)
        .parse()
        .map_err(|e| DataPlaneError::BindError(format!("invalid port: {}", e)))?;
    let tcp_listener = TcpListener::bind(bind_addr)
        .await
        .map_err(|e| DataPlaneError::BindError(format!("bind failed: {}", e)))?;
    tracing::info!("Listener {} started on {}", listener_id, bind_addr);
    // Shutdown channel: firing *or dropping* the sender stops the loop.
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
    // Clone required state for the task.
    let metadata = self.metadata.clone();
    let listener_id_clone = listener_id;
    // Spawn the accept loop.
    let task = tokio::spawn(async move {
        loop {
            tokio::select! {
                accept_result = tcp_listener.accept() => {
                    match accept_result {
                        Ok((stream, peer_addr)) => {
                            tracing::debug!("Accepted connection from {}", peer_addr);
                            let metadata = metadata.clone();
                            let pool_id = pool_id;
                            // One task per connection; errors are logged,
                            // never propagated to the accept loop.
                            tokio::spawn(async move {
                                if let Err(e) = Self::handle_connection(stream, metadata, pool_id).await {
                                    tracing::debug!("Connection handler error: {}", e);
                                }
                            });
                        }
                        Err(e) => {
                            tracing::error!("Accept error: {}", e);
                            // Back off briefly so a persistent accept failure
                            // (e.g. fd exhaustion) doesn't spin a hot loop.
                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                        }
                    }
                }
                // Resolves on send() or on sender drop, so an aborted
                // registration below also tears the loop down.
                _ = &mut shutdown_rx => {
                    tracing::info!("Listener {} shutting down", listener_id_clone);
                    break;
                }
            }
        }
    });
    // Register the handle, re-checking for a concurrent start of this ID.
    {
        let mut listeners = self.listeners.write().await;
        if listeners.contains_key(&listener_id) {
            // Lost the race: stop the loop we just spawned and report it.
            task.abort();
            return Err(DataPlaneError::ListenerAlreadyRunning(listener_id.to_string()));
        }
        listeners.insert(listener_id, ListenerHandle {
            task,
            shutdown: shutdown_tx,
        });
    }
    Ok(())
}
/// Stop a listener by ID.
///
/// Removes the handle from the map, signals the accept loop to exit, and
/// waits up to five seconds for the task to finish. A task that outlives
/// the timeout is left to wind down on its own.
///
/// # Errors
/// Returns `ListenerNotFound` if the ID is not currently running.
pub async fn stop_listener(&self, listener_id: &ListenerId) -> Result<()> {
    let mut listeners = self.listeners.write().await;
    let handle = listeners
        .remove(listener_id)
        .ok_or_else(|| DataPlaneError::ListenerNotFound(listener_id.to_string()))?;
    drop(listeners); // release the lock before awaiting the task

    // Tell the accept loop to stop; ignore the result — a finished task
    // has already dropped its receiver.
    let _ = handle.shutdown.send(());

    // Bounded wait for the task to drain.
    let five_secs = std::time::Duration::from_secs(5);
    let _ = tokio::time::timeout(five_secs, handle.task).await;

    tracing::info!("Listener {} stopped", listener_id);
    Ok(())
}
/// Check whether a listener with this ID is currently running.
pub async fn is_listener_running(&self, listener_id: &ListenerId) -> bool {
    self.listeners.read().await.contains_key(listener_id)
}
/// Number of listeners currently running.
pub async fn running_listener_count(&self) -> usize {
    self.listeners.read().await.len()
}
/// Find a listener by ID by scanning every LB in the metadata store.
///
/// Note: linear scan over all LBs — in production this would be served
/// by a listener-ID index instead.
///
/// # Errors
/// `MetadataError` if the LB list cannot be loaded; `ListenerNotFound`
/// if no LB owns a listener with this ID.
async fn find_listener(&self, listener_id: &ListenerId) -> Result<Listener> {
    let lbs = self
        .metadata
        .list_lbs("", None)
        .await
        .map_err(|e| DataPlaneError::MetadataError(e.to_string()))?;

    for lb in &lbs {
        // Load failures for one LB are treated as "not here"; keep scanning.
        match self.metadata.load_listener(&lb.id, listener_id).await {
            Ok(Some(listener)) => return Ok(listener),
            _ => continue,
        }
    }
    Err(DataPlaneError::ListenerNotFound(listener_id.to_string()))
}
/// Handle a single client connection
async fn handle_connection(
client: TcpStream,
metadata: Arc<LbMetadataStore>,
pool_id: PoolId,
) -> Result<()> {
// Select a backend
let backend = Self::select_backend(&metadata, &pool_id).await?;
// Build backend address
let backend_addr: SocketAddr = format!("{}:{}", backend.address, backend.port)
.parse()
.map_err(|e| DataPlaneError::IoError(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("invalid backend address: {}", e),
)))?;
tracing::debug!("Proxying to backend {}", backend_addr);
// Connect to backend
let backend_stream = TcpStream::connect(backend_addr).await?;
// Proxy bidirectionally
Self::proxy_bidirectional(client, backend_stream).await
}
/// Select a backend from the pool using round-robin.
///
/// A backend is eligible when it is admin-enabled and its health status
/// is `Online` or `Unknown` (unknown is given the benefit of the doubt
/// so traffic flows before the first health check completes).
///
/// # Errors
/// `MetadataError` if the backend list cannot be loaded;
/// `NoHealthyBackends` if the filter leaves nothing to pick.
async fn select_backend(
    metadata: &Arc<LbMetadataStore>,
    pool_id: &PoolId,
) -> Result<Backend> {
    // Get all backends for the pool.
    let backends = metadata
        .list_backends(pool_id)
        .await
        .map_err(|e| DataPlaneError::MetadataError(e.to_string()))?;
    // Filter to healthy/enabled backends.
    let mut healthy: Vec<_> = backends
        .into_iter()
        .filter(|b| {
            b.admin_state == BackendAdminState::Enabled &&
            (b.status == BackendStatus::Online || b.status == BackendStatus::Unknown)
        })
        .collect();
    if healthy.is_empty() {
        return Err(DataPlaneError::NoHealthyBackends);
    }
    // Round-robin via a process-wide atomic counter. Note this counter is
    // shared by ALL pools; in production it would be per-pool.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    let idx = COUNTER.fetch_add(1, Ordering::Relaxed) % healthy.len();
    // idx < healthy.len() by construction, so swap_remove cannot panic;
    // O(1) extraction instead of walking the iterator with nth().
    Ok(healthy.swap_remove(idx))
}
/// Proxy data bidirectionally between client and backend
async fn proxy_bidirectional(
mut client: TcpStream,
mut backend: TcpStream,
) -> Result<()> {
let (mut client_read, mut client_write) = client.split();
let (mut backend_read, mut backend_write) = backend.split();
// Use tokio::io::copy for efficient proxying
let client_to_backend = tokio::io::copy(&mut client_read, &mut backend_write);
let backend_to_client = tokio::io::copy(&mut backend_read, &mut client_write);
// Run both directions concurrently, complete when either finishes
tokio::select! {
result = client_to_backend => {
if let Err(e) = result {
tracing::debug!("Client to backend copy ended: {}", e);
}
}
result = backend_to_client => {
if let Err(e) = result {
tracing::debug!("Backend to client copy ended: {}", e);
}
}
}
Ok(())
}
/// Shut down every running listener.
///
/// Snapshots the ID set under a read lock first, then stops each one via
/// `stop_listener` (which takes the write lock itself). Failures are
/// logged and do not abort the remaining shutdowns.
pub async fn shutdown(&self) {
    let ids: Vec<ListenerId> = self
        .listeners
        .read()
        .await
        .keys()
        .cloned()
        .collect();
    for id in ids {
        if let Err(e) = self.stop_listener(&id).await {
            tracing::warn!("Error stopping listener {}: {}", id, e);
        }
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh data plane starts with zero running listeners.
    #[tokio::test]
    async fn test_dataplane_creation() {
        let metadata = Arc::new(LbMetadataStore::new_in_memory());
        let dataplane = DataPlane::new(metadata);
        assert_eq!(dataplane.running_listener_count().await, 0);
    }

    /// Starting an unknown listener ID fails with ListenerNotFound.
    #[tokio::test]
    async fn test_listener_not_found() {
        let metadata = Arc::new(LbMetadataStore::new_in_memory());
        let dataplane = DataPlane::new(metadata);
        let fake_id = ListenerId::new();
        let result = dataplane.start_listener(fake_id).await;
        assert!(result.is_err());
        match result {
            Err(DataPlaneError::ListenerNotFound(_)) => {}
            _ => panic!("Expected ListenerNotFound error"),
        }
    }

    /// Selecting from a pool with no backends yields NoHealthyBackends.
    #[tokio::test]
    async fn test_backend_selection_empty() {
        let metadata = Arc::new(LbMetadataStore::new_in_memory());
        let pool_id = PoolId::new();
        // Fix: use the store bound above instead of constructing a second,
        // unrelated in-memory store inline (which left `metadata` unused).
        let result = DataPlane::select_backend(&metadata, &pool_id).await;
        assert!(result.is_err());
        match result {
            Err(DataPlaneError::NoHealthyBackends) => {}
            _ => panic!("Expected NoHealthyBackends error"),
        }
    }
}