//! L7 (HTTP/HTTPS) Data Plane //! //! Provides HTTP-aware load balancing with content-based routing, TLS termination, //! and session persistence. use axum::{ body::Body, extract::{Request, State}, http::{header, HeaderValue, StatusCode, Uri}, response::{IntoResponse, Response}, routing::any, Router, }; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::Client; use hyper_util::rt::TokioExecutor; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::RwLock; use tokio::task::JoinHandle; use crate::l7_router::{L7Router, RequestInfo, RoutingResult}; use crate::metadata::LbMetadataStore; use fiberlb_types::{ Backend, BackendAdminState, BackendStatus, Listener, ListenerId, ListenerProtocol, PoolAlgorithm, PoolId, }; type Result = std::result::Result; #[derive(Debug, thiserror::Error)] pub enum L7Error { #[error("Listener not found: {0}")] ListenerNotFound(String), #[error("Invalid protocol: expected HTTP/HTTPS")] InvalidProtocol, #[error("TLS config missing for HTTPS listener")] TlsConfigMissing, #[error("TLS termination not implemented for HTTPS listeners")] TlsNotImplemented, #[error("Backend unavailable: {0}")] BackendUnavailable(String), #[error("Proxy error: {0}")] ProxyError(String), #[error("Metadata error: {0}")] Metadata(String), } /// Handle for a running L7 listener struct L7ListenerHandle { task: JoinHandle<()>, } /// L7 HTTP/HTTPS Data Plane pub struct L7DataPlane { metadata: Arc, router: Arc, http_client: Client, listeners: Arc>>, pool_counters: Arc>>, } impl L7DataPlane { /// Create a new L7 data plane pub fn new(metadata: Arc) -> Self { let http_client = Client::builder(TokioExecutor::new()) .pool_max_idle_per_host(32) .build_http(); Self { metadata: metadata.clone(), router: Arc::new(L7Router::new(metadata)), http_client, listeners: Arc::new(RwLock::new(HashMap::new())), pool_counters: Arc::new(RwLock::new(HashMap::new())), } } /// Start an HTTP/HTTPS listener pub async fn start_listener(&self, listener_id: ListenerId) -> Result<()> { let listener = self.find_listener(&listener_id).await?; // Validate protocol if !matches!(listener.protocol, ListenerProtocol::Http | ListenerProtocol::Https | ListenerProtocol::TerminatedHttps) { return Err(L7Error::InvalidProtocol); } let app = self.build_router(&listener).await?; let bind_addr: SocketAddr = format!("0.0.0.0:{}", listener.port) .parse() .map_err(|e| L7Error::ProxyError(format!("Invalid bind address: {}", e)))?; // For now, only implement HTTP (HTTPS/TLS in Phase 3) match listener.protocol { ListenerProtocol::Http => { self.start_http_server(listener_id, bind_addr, app).await } ListenerProtocol::Https | ListenerProtocol::TerminatedHttps => { // TODO: Phase 3 - TLS termination Err(L7Error::TlsNotImplemented) } _ => Err(L7Error::InvalidProtocol), } } /// Stop a listener pub async fn stop_listener(&self, listener_id: &ListenerId) -> Result<()> { let mut listeners = self.listeners.write().await; if let Some(handle) = listeners.remove(listener_id) { handle.task.abort(); tracing::info!(listener_id = %listener_id, "Stopped L7 listener"); Ok(()) } else { Err(L7Error::ListenerNotFound(listener_id.to_string())) } } /// Find listener in metadata async fn find_listener(&self, listener_id: &ListenerId) -> Result { match self .metadata .find_listener_by_id(listener_id) .await .map_err(|e| L7Error::Metadata(e.to_string()))? { Some(listener) => Ok(listener), None => Err(L7Error::ListenerNotFound(listener_id.to_string())), } } /// Build axum router for a listener async fn build_router(&self, listener: &Listener) -> Result { let state = ProxyState { metadata: self.metadata.clone(), router: self.router.clone(), http_client: self.http_client.clone(), listener_id: listener.id, default_pool_id: listener.default_pool_id.clone(), pool_counters: self.pool_counters.clone(), }; Ok(Router::new() .route("/*path", any(proxy_handler)) .route("/", any(proxy_handler)) .with_state(state)) } /// Start HTTP server (no TLS) async fn start_http_server( &self, listener_id: ListenerId, bind_addr: SocketAddr, app: Router, ) -> Result<()> { tracing::info!( listener_id = %listener_id, addr = %bind_addr, "Starting L7 HTTP listener" ); let tcp_listener = tokio::net::TcpListener::bind(bind_addr) .await .map_err(|e| L7Error::ProxyError(format!("Failed to bind: {}", e)))?; let task = tokio::spawn(async move { if let Err(e) = axum::serve(tcp_listener, app).await { tracing::error!("HTTP server error: {}", e); } }); let mut listeners = self.listeners.write().await; listeners.insert(listener_id, L7ListenerHandle { task }); Ok(()) } } /// Shared state for proxy handlers #[derive(Clone)] struct ProxyState { metadata: Arc, router: Arc, http_client: Client, listener_id: ListenerId, default_pool_id: Option, pool_counters: Arc>>, } /// Main proxy request handler #[axum::debug_handler] async fn proxy_handler( State(state): State, request: Request, ) -> impl IntoResponse { // Extract routing info before async operations (Request body is not Send) let request_info = RequestInfo::from_request(&request); // 1. Evaluate L7 policies to determine target pool let routing_result = state.router .evaluate(&state.listener_id, &request_info) .await; match routing_result { RoutingResult::Pool(pool_id) => { proxy_to_pool(&state, pool_id, request).await } RoutingResult::Redirect { url, status } => { // HTTP redirect let status_code = StatusCode::from_u16(status as u16) .unwrap_or(StatusCode::FOUND); Response::builder() .status(status_code) .header("Location", url) .body(Body::empty()) .unwrap() .into_response() } RoutingResult::Reject { status } => { // Reject with status code StatusCode::from_u16(status as u16) .unwrap_or(StatusCode::FORBIDDEN) .into_response() } RoutingResult::Default => { // Use default pool if configured match state.default_pool_id { Some(pool_id) => proxy_to_pool(&state, pool_id, request).await, None => StatusCode::SERVICE_UNAVAILABLE.into_response(), } } } } /// Proxy request to a backend pool async fn proxy_to_pool( state: &ProxyState, pool_id: PoolId, request: Request, ) -> Response { let request_hash = stable_request_hash(&request); let backend = match select_backend(state, pool_id, request_hash).await { Ok(backend) => backend, Err(error) => { tracing::warn!(pool_id = %pool_id, error = %error, "no backend available for L7 pool"); return text_response(StatusCode::SERVICE_UNAVAILABLE, error.to_string()); } }; let path_and_query = request .uri() .path_and_query() .map(|value| value.as_str()) .unwrap_or("/"); let backend_host = format!("{}:{}", backend.address, backend.port); let target_uri: Uri = match format!("http://{}{}", backend_host, path_and_query).parse() { Ok(uri) => uri, Err(error) => { tracing::error!( pool_id = %pool_id, backend = %backend_host, error = %error, "failed to build backend URI" ); return text_response(StatusCode::BAD_GATEWAY, "invalid backend URI"); } }; let (mut parts, body) = request.into_parts(); parts.uri = target_uri; rewrite_proxy_headers(&mut parts.headers, &backend_host); match state.http_client.request(Request::from_parts(parts, body)).await { Ok(response) => { let (parts, body) = response.into_parts(); Response::from_parts(parts, Body::new(body)) } Err(error) => { tracing::warn!( pool_id = %pool_id, backend = %backend_host, error = %error, "L7 backend request failed" ); text_response(StatusCode::BAD_GATEWAY, "upstream request failed") } } } async fn select_backend( state: &ProxyState, pool_id: PoolId, request_hash: usize, ) -> Result { let pool = state .metadata .load_pool_by_id(&pool_id) .await .map_err(|error| L7Error::Metadata(error.to_string()))? .ok_or_else(|| L7Error::BackendUnavailable(format!("pool {pool_id} not found")))?; let mut backends = state .metadata .list_backends(&pool_id) .await .map_err(|error| L7Error::Metadata(error.to_string()))? .into_iter() .filter(backend_is_available) .collect::>(); if backends.is_empty() { return Err(L7Error::BackendUnavailable(format!( "pool {pool_id} has no healthy backends" ))); } backends.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name)); let index = match pool.algorithm { PoolAlgorithm::IpHash | PoolAlgorithm::Maglev => request_hash % backends.len(), PoolAlgorithm::WeightedRoundRobin => weighted_round_robin_index(state, pool_id, &backends).await, PoolAlgorithm::Random => next_counter(state, pool_id).await % backends.len(), PoolAlgorithm::LeastConnections | PoolAlgorithm::RoundRobin => { next_counter(state, pool_id).await % backends.len() } }; Ok(backends[index].clone()) } fn backend_is_available(backend: &Backend) -> bool { backend.admin_state == BackendAdminState::Enabled && matches!(backend.status, BackendStatus::Online | BackendStatus::Unknown) } async fn next_counter(state: &ProxyState, pool_id: PoolId) -> usize { let mut counters = state.pool_counters.write().await; let counter = counters.entry(pool_id).or_insert(0); let current = *counter; *counter = counter.wrapping_add(1); current } async fn weighted_round_robin_index( state: &ProxyState, pool_id: PoolId, backends: &[Backend], ) -> usize { let total_weight = backends .iter() .map(|backend| backend.weight.max(1) as usize) .sum::(); if total_weight == 0 { return 0; } let mut offset = next_counter(state, pool_id).await % total_weight; for (index, backend) in backends.iter().enumerate() { let weight = backend.weight.max(1) as usize; if offset < weight { return index; } offset -= weight; } 0 } fn stable_request_hash(request: &Request) -> usize { use std::hash::{Hash, Hasher}; let mut hasher = std::collections::hash_map::DefaultHasher::new(); request.method().hash(&mut hasher); request.uri().path_and_query().map(|value| value.as_str()).hash(&mut hasher); request .headers() .get(header::HOST) .and_then(|value| value.to_str().ok()) .hash(&mut hasher); hasher.finish() as usize } fn rewrite_proxy_headers(headers: &mut axum::http::HeaderMap, backend_host: &str) { headers.remove(header::CONNECTION); headers.remove("proxy-connection"); headers.remove("keep-alive"); headers.remove(header::TE); headers.remove(header::TRAILER); headers.remove(header::TRANSFER_ENCODING); headers.remove(header::UPGRADE); if let Ok(host) = HeaderValue::from_str(backend_host) { headers.insert(header::HOST, host); } } fn text_response(status: StatusCode, body: impl Into) -> Response { Response::builder() .status(status) .body(body.into()) .unwrap() }