From ce4bab07d6bf9769759f37a388170fd89dbe7f4d Mon Sep 17 00:00:00 2001 From: centra Date: Mon, 30 Mar 2026 16:13:14 +0900 Subject: [PATCH] fiberlb: add native BGP speaker and VM smoke test --- fiberlb/crates/fiberlb-server/build.rs | 2 +- .../crates/fiberlb-server/src/bgp_client.rs | 1232 ++++++++++++++--- fiberlb/crates/fiberlb-server/src/config.rs | 171 ++- .../crates/fiberlb-server/src/healthcheck.rs | 38 +- fiberlb/crates/fiberlb-server/src/lib.rs | 8 +- fiberlb/crates/fiberlb-server/src/maglev.rs | 26 +- fiberlb/crates/fiberlb-server/src/main.rs | 104 +- .../crates/fiberlb-server/src/vip_manager.rs | 21 +- flake.nix | 9 + nix-nos/modules/bgp/gobgp.nix | 4 +- nix/modules/fiberlb.nix | 168 ++- nix/modules/iam.nix | 5 +- nix/modules/plasmacloud-network.nix | 28 +- nix/tests/fiberlb-native-bgp-vm-smoke.nix | 378 +++++ 14 files changed, 1919 insertions(+), 275 deletions(-) create mode 100644 nix/tests/fiberlb-native-bgp-vm-smoke.nix diff --git a/fiberlb/crates/fiberlb-server/build.rs b/fiberlb/crates/fiberlb-server/build.rs index 9df21fd..9def0f5 100644 --- a/fiberlb/crates/fiberlb-server/build.rs +++ b/fiberlb/crates/fiberlb-server/build.rs @@ -5,6 +5,6 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(false) .build_client(true) - .compile(&["proto/api/gobgp.proto"], &["proto"])?; + .compile_protos(&["proto/api/gobgp.proto"], &["proto"])?; Ok(()) } diff --git a/fiberlb/crates/fiberlb-server/src/bgp_client.rs b/fiberlb/crates/fiberlb-server/src/bgp_client.rs index 5568e62..a8cf511 100644 --- a/fiberlb/crates/fiberlb-server/src/bgp_client.rs +++ b/fiberlb/crates/fiberlb-server/src/bgp_client.rs @@ -1,228 +1,1102 @@ -//! BGP client for GoBGP gRPC integration +//! Native BGP speaker for FiberLB VIP advertisement. //! -//! Provides a Rust wrapper around the GoBGP gRPC API to advertise -//! and withdraw VIP routes for Anycast load balancing. +//! This module owns the minimal eBGP control plane needed for FiberLB: +//! 
outbound TCP sessions to static peers, IPv4 unicast OPEN/KEEPALIVE/UPDATE, +//! and `/32` VIP advertise/withdraw driven by the VIP manager. -use std::net::IpAddr; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; -use thiserror::Error; -use tonic::transport::Channel; -use tracing::{debug, error, info, warn}; +use std::time::{Duration, Instant}; -/// Result type for BGP operations +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch, RwLock}; +use tokio::time::sleep; +use tracing::{debug, info, warn}; + +use crate::config::{BgpConfig, BgpPeerConfig}; + +const BGP_MARKER: [u8; 16] = [0xff; 16]; +const BGP_VERSION: u8 = 4; +const BGP_TYPE_OPEN: u8 = 1; +const BGP_TYPE_UPDATE: u8 = 2; +const BGP_TYPE_NOTIFICATION: u8 = 3; +const BGP_TYPE_KEEPALIVE: u8 = 4; +const BGP_MAX_MESSAGE_SIZE: usize = 4096; +const BGP_ORIGIN_IGP: u8 = 0; +const ATTR_FLAG_TRANSITIVE: u8 = 0x40; +const ATTR_FLAG_EXTENDED_LEN: u8 = 0x10; +const ATTR_TYPE_ORIGIN: u8 = 1; +const ATTR_TYPE_AS_PATH: u8 = 2; +const ATTR_TYPE_NEXT_HOP: u8 = 3; +const AS_PATH_SEGMENT_SEQUENCE: u8 = 2; + +/// Result type for BGP operations. pub type Result = std::result::Result; -/// BGP client errors -#[derive(Debug, Error)] +/// BGP client errors. 
+#[derive(Debug, thiserror::Error)] pub enum BgpError { - #[error("gRPC transport error: {0}")] - Transport(String), - #[error("BGP route operation failed: {0}")] - RouteOperation(String), - #[error("Invalid IP address: {0}")] + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("invalid configuration: {0}")] + Config(String), + #[error("invalid IP address: {0}")] InvalidAddress(String), - #[error("GoBGP not reachable at {0}")] + #[error("connection failed: {0}")] ConnectionFailed(String), + #[error("protocol error: {0}")] + Protocol(String), + #[error("unsupported operation: {0}")] + Unsupported(String), } -/// BGP client configuration -#[derive(Debug, Clone)] -pub struct BgpConfig { - /// GoBGP gRPC server address (e.g., "127.0.0.1:50051") - pub gobgp_address: String, - /// Local AS number - pub local_as: u32, - /// Router ID in dotted decimal format - pub router_id: String, - /// Whether BGP integration is enabled - pub enabled: bool, -} - -impl Default for BgpConfig { - fn default() -> Self { - Self { - gobgp_address: "127.0.0.1:50051".to_string(), - local_as: 65001, - router_id: "10.0.0.1".to_string(), - enabled: false, - } - } -} - -/// BGP client trait for VIP advertisement -/// -/// Abstracts the BGP speaker interface to allow for different implementations -/// (GoBGP, RustyBGP, mock for testing) +/// BGP client trait for VIP advertisement. #[tonic::async_trait] pub trait BgpClient: Send + Sync { - /// Advertise a VIP route to BGP peers + /// Advertise a VIP route to BGP peers. async fn announce_route(&self, prefix: IpAddr, next_hop: IpAddr) -> Result<()>; - /// Withdraw a VIP route from BGP peers + /// Withdraw a VIP route from BGP peers. async fn withdraw_route(&self, prefix: IpAddr) -> Result<()>; - /// Check if client is connected to BGP daemon + /// Check if at least one peer session is established. 
async fn is_connected(&self) -> bool; } -/// GoBGP client implementation -/// -/// Connects to GoBGP daemon via gRPC and manages route advertisements -pub struct GobgpClient { - config: BgpConfig, - _channel: Option, -} - -impl GobgpClient { - /// Create a new GoBGP client - pub async fn new(config: BgpConfig) -> Result { - if !config.enabled { - info!("BGP is disabled in configuration"); - return Ok(Self { - config, - _channel: None, - }); - } - - info!( - "Connecting to GoBGP at {} (AS {})", - config.gobgp_address, config.local_as - ); - - // TODO: Connect to GoBGP gRPC server - // For now, we create a client that logs operations but doesn't actually connect - // Real implementation would use tonic::transport::Channel::connect() - // and the GoBGP protobuf service stubs - - Ok(Self { - config, - _channel: None, - }) - } - - /// Get local router address for use as next hop - fn get_next_hop(&self) -> Result { - self.config - .router_id - .parse() - .map_err(|e| BgpError::InvalidAddress(format!("Invalid router_id: {}", e))) - } - - /// Format prefix as CIDR string (always /32 for VIP) - fn format_prefix(addr: IpAddr) -> String { - match addr { - IpAddr::V4(_) => format!("{}/32", addr), - IpAddr::V6(_) => format!("{}/128", addr), - } - } -} +/// A disabled client that keeps FiberLB runtime behavior uniform. 
+struct NullBgpClient; #[tonic::async_trait] -impl BgpClient for GobgpClient { - async fn announce_route(&self, prefix: IpAddr, next_hop: IpAddr) -> Result<()> { - if !self.config.enabled { - debug!("BGP disabled, skipping route announcement for {}", prefix); - return Ok(()); - } - - let prefix_str = Self::format_prefix(prefix); - info!( - "Announcing BGP route: {} via {} (AS {})", - prefix_str, next_hop, self.config.local_as - ); - - // TODO: Actual GoBGP gRPC call - // This would be something like: - // - // let mut client = gobgp_client::GobgpApiClient::new(self.channel.clone()); - // let path = Path { - // nlri: Some(IpAddressPrefix { - // prefix_len: 32, - // prefix: prefix.to_string(), - // }), - // pattrs: vec![ - // PathAttribute::origin(Origin::Igp), - // PathAttribute::next_hop(next_hop.to_string()), - // PathAttribute::local_pref(100), - // ], - // }; - // client.add_path(AddPathRequest { path: Some(path) }).await?; - - debug!("BGP route announced successfully: {}", prefix_str); +impl BgpClient for NullBgpClient { + async fn announce_route(&self, _prefix: IpAddr, _next_hop: IpAddr) -> Result<()> { Ok(()) } - async fn withdraw_route(&self, prefix: IpAddr) -> Result<()> { - if !self.config.enabled { - debug!("BGP disabled, skipping route withdrawal for {}", prefix); - return Ok(()); - } - - let prefix_str = Self::format_prefix(prefix); - info!("Withdrawing BGP route: {} (AS {})", prefix_str, self.config.local_as); - - // TODO: Actual GoBGP gRPC call - // This would be something like: - // - // let mut client = gobgp_client::GobgpApiClient::new(self.channel.clone()); - // let path = Path { - // nlri: Some(IpAddressPrefix { - // prefix_len: 32, - // prefix: prefix.to_string(), - // }), - // is_withdraw: true, - // // ... 
other fields - // }; - // client.delete_path(DeletePathRequest { path: Some(path) }).await?; - - debug!("BGP route withdrawn successfully: {}", prefix_str); + async fn withdraw_route(&self, _prefix: IpAddr) -> Result<()> { Ok(()) } async fn is_connected(&self) -> bool { - if !self.config.enabled { - return false; - } - - // TODO: Check GoBGP connection health - // For now, always return true if enabled - true + false } } -/// Create a BGP client from configuration +#[derive(Debug)] +struct BgpSharedState { + desired_routes: RwLock>, + route_version: AtomicU64, + route_updates: watch::Sender, + connected_sessions: AtomicUsize, +} + +/// Native outbound-only BGP speaker. +pub struct NativeBgpSpeaker { + shared: Arc, +} + +impl NativeBgpSpeaker { + /// Create a new native BGP speaker and spawn peer sessions. + pub async fn new(config: BgpConfig) -> Result { + validate_config(&config)?; + + let (route_updates, _rx) = watch::channel(0u64); + let shared = Arc::new(BgpSharedState { + desired_routes: RwLock::new(HashMap::new()), + route_version: AtomicU64::new(0), + route_updates, + connected_sessions: AtomicUsize::new(0), + }); + + for peer in config.peers.clone() { + tokio::spawn(run_peer_loop(config.clone(), peer, shared.clone())); + } + + Ok(Self { shared }) + } + + async fn update_route(&self, prefix: Ipv4Addr, next_hop: Ipv4Addr) { + let mut desired = self.shared.desired_routes.write().await; + let changed = desired.insert(prefix, next_hop) != Some(next_hop); + drop(desired); + + if changed { + publish_route_change(&self.shared); + } + } + + async fn remove_route(&self, prefix: Ipv4Addr) { + let mut desired = self.shared.desired_routes.write().await; + let changed = desired.remove(&prefix).is_some(); + drop(desired); + + if changed { + publish_route_change(&self.shared); + } + } +} + +#[tonic::async_trait] +impl BgpClient for NativeBgpSpeaker { + async fn announce_route(&self, prefix: IpAddr, next_hop: IpAddr) -> Result<()> { + let prefix = to_ipv4(prefix, "VIP 
prefix")?; + let next_hop = to_ipv4(next_hop, "BGP next hop")?; + + self.update_route(prefix, next_hop).await; + Ok(()) + } + + async fn withdraw_route(&self, prefix: IpAddr) -> Result<()> { + let prefix = to_ipv4(prefix, "VIP prefix")?; + self.remove_route(prefix).await; + Ok(()) + } + + async fn is_connected(&self) -> bool { + self.shared.connected_sessions.load(Ordering::Relaxed) > 0 + } +} + +/// Create a BGP client from configuration. pub async fn create_bgp_client(config: BgpConfig) -> Result> { - let client = GobgpClient::new(config).await?; - Ok(Arc::new(client)) + if !config.enabled { + info!("FiberLB native BGP speaker disabled"); + return Ok(Arc::new(NullBgpClient)); + } + + let speaker = NativeBgpSpeaker::new(config).await?; + Ok(Arc::new(speaker)) +} + +async fn run_peer_loop(config: BgpConfig, peer: BgpPeerConfig, shared: Arc) { + let peer_name = peer_name(&peer); + let connect_retry = Duration::from_secs(config.connect_retry_secs.max(1)); + + loop { + match establish_peer_session(&config, &peer, shared.clone()).await { + Ok(()) => warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting"), + Err(error) => warn!(peer = %peer_name, error = %error, "BGP peer session failed"), + } + + sleep(connect_retry).await; + } +} + +async fn establish_peer_session( + config: &BgpConfig, + peer: &BgpPeerConfig, + shared: Arc, +) -> Result<()> { + let endpoint = format!("{}:{}", peer.address, peer.port); + info!( + peer = %peer_name(peer), + local_as = config.local_as, + router_id = %config.router_id, + "Connecting FiberLB BGP session", + ); + + let mut stream = TcpStream::connect(&endpoint) + .await + .map_err(|error| BgpError::ConnectionFailed(format!("{} ({})", endpoint, error)))?; + stream.set_nodelay(true)?; + + let handshake = perform_handshake(&mut stream, config, peer).await?; + let keepalive_interval = + negotiated_keepalive_interval(config.keepalive_secs, handshake.hold_time); + let hold_deadline = if handshake.hold_time == 0 { + None + } else 
{ + Some(Duration::from_secs(handshake.hold_time as u64)) + }; + + info!( + peer = %peer_name(peer), + hold_time_secs = handshake.hold_time, + keepalive_secs = keepalive_interval.as_secs(), + peer_router_id = %handshake.router_id, + "FiberLB BGP session established", + ); + + shared.connected_sessions.fetch_add(1, Ordering::Relaxed); + let session_result = run_established_session( + stream, + config, + peer, + shared.clone(), + keepalive_interval, + hold_deadline, + ) + .await; + shared.connected_sessions.fetch_sub(1, Ordering::Relaxed); + session_result +} + +async fn perform_handshake( + stream: &mut TcpStream, + config: &BgpConfig, + peer: &BgpPeerConfig, +) -> Result { + let local_as = u16::try_from(config.local_as) + .map_err(|_| BgpError::Config(format!("local AS {} is out of range", config.local_as)))?; + let router_id = config.router_id_addr().map_err(|error| { + BgpError::InvalidAddress(format!( + "invalid router_id '{}': {}", + config.router_id, error + )) + })?; + + send_open(stream, local_as, config.hold_time_secs, router_id).await?; + + let remote_open = match read_bgp_message(stream).await? { + BgpMessage::Open(open) => open, + message => { + return Err(BgpError::Protocol(format!( + "expected OPEN from {}, received {:?}", + peer_name(peer), + message + ))); + } + }; + + let peer_as = u16::try_from(peer.asn) + .map_err(|_| BgpError::Config(format!("peer AS {} is out of range", peer.asn)))?; + if remote_open.asn != peer_as { + return Err(BgpError::Protocol(format!( + "peer {} reported AS {}, expected {}", + peer_name(peer), + remote_open.asn, + peer_as + ))); + } + + send_keepalive(stream).await?; + + loop { + match read_bgp_message(stream).await? 
{ + BgpMessage::Keepalive => break, + BgpMessage::Notification(notification) => { + return Err(BgpError::Protocol(format!( + "peer {} rejected OPEN with notification {}:{}", + peer_name(peer), + notification.code, + notification.subcode + ))); + } + BgpMessage::Update(_) => { + debug!(peer = %peer_name(peer), "Ignoring UPDATE during handshake"); + } + BgpMessage::Unknown { msg_type, .. } => { + debug!(peer = %peer_name(peer), msg_type, "Ignoring unknown BGP message during handshake"); + } + BgpMessage::Open(_) => { + return Err(BgpError::Protocol(format!( + "peer {} sent unexpected OPEN after OPEN exchange", + peer_name(peer) + ))); + } + } + } + + Ok(remote_open) +} + +async fn run_established_session( + stream: TcpStream, + config: &BgpConfig, + peer: &BgpPeerConfig, + shared: Arc, + keepalive_interval: Duration, + hold_deadline: Option, +) -> Result<()> { + let local_as = u16::try_from(config.local_as) + .map_err(|_| BgpError::Config(format!("local AS {} is out of range", config.local_as)))?; + let (mut reader, mut writer) = stream.into_split(); + let (event_tx, mut event_rx) = mpsc::channel(16); + let peer_name = peer_name(peer); + + let reader_task = tokio::spawn(async move { + loop { + let message = read_bgp_message(&mut reader).await; + let terminal = matches!(message, Err(_) | Ok(BgpMessage::Notification(_))); + + if event_tx.send(message).await.is_err() { + break; + } + + if terminal { + break; + } + } + }); + + let mut advertised = HashMap::new(); + reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; + + let mut keepalive = tokio::time::interval(keepalive_interval); + keepalive.tick().await; + + let mut hold_monitor = tokio::time::interval(Duration::from_secs(1)); + hold_monitor.tick().await; + + let mut route_updates = shared.route_updates.subscribe(); + let mut last_rx = Instant::now(); + + loop { + tokio::select! 
{ + maybe_message = event_rx.recv() => { + let Some(message) = maybe_message else { + reader_task.abort(); + return Err(BgpError::ConnectionFailed(format!("peer {} closed the session", peer_name))); + }; + + match message { + Ok(BgpMessage::Keepalive) => { + last_rx = Instant::now(); + } + Ok(BgpMessage::Update(update)) => { + last_rx = Instant::now(); + debug!( + peer = %peer_name, + announced = update.announced_routes.len(), + withdrawn = update.withdrawn_routes.len(), + next_hop = ?update.next_hop, + as_path = ?update.as_path, + "Ignoring inbound UPDATE from peer" + ); + } + Ok(BgpMessage::Notification(notification)) => { + reader_task.abort(); + return Err(BgpError::Protocol(format!( + "peer {} sent notification {}:{} ({} data bytes)", + peer_name, + notification.code, + notification.subcode, + notification.data.len() + ))); + } + Ok(BgpMessage::Unknown { msg_type, payload }) => { + last_rx = Instant::now(); + debug!( + peer = %peer_name, + msg_type, + payload_len = payload.len(), + "Ignoring unsupported BGP message" + ); + } + Ok(BgpMessage::Open(_)) => { + reader_task.abort(); + return Err(BgpError::Protocol(format!( + "peer {} sent OPEN after session establishment", + peer_name + ))); + } + Err(error) => { + reader_task.abort(); + return Err(error); + } + } + } + _ = keepalive.tick() => { + send_keepalive(&mut writer).await?; + } + changed = route_updates.changed() => { + if changed.is_err() { + reader_task.abort(); + return Err(BgpError::ConnectionFailed(format!("peer {} route update channel closed", peer_name))); + } + route_updates.borrow_and_update(); + reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; + } + _ = hold_monitor.tick(), if hold_deadline.is_some() => { + if last_rx.elapsed() >= hold_deadline.unwrap() { + reader_task.abort(); + return Err(BgpError::Protocol(format!("peer {} hold timer expired", peer_name))); + } + } + } + } +} + +async fn reconcile_routes( + shared: &BgpSharedState, + writer: &mut W, + advertised: &mut 
HashMap, + local_as: u16, +) -> Result<()> { + let desired = shared.desired_routes.read().await.clone(); + + let stale: Vec = advertised + .keys() + .filter(|prefix| !desired.contains_key(prefix)) + .copied() + .collect(); + + for prefix in stale { + send_withdraw(writer, prefix).await?; + advertised.remove(&prefix); + } + + for (prefix, next_hop) in desired { + if advertised.get(&prefix) == Some(&next_hop) { + continue; + } + + send_announce(writer, prefix, next_hop, local_as).await?; + advertised.insert(prefix, next_hop); + } + + Ok(()) +} + +fn publish_route_change(shared: &BgpSharedState) { + let version = shared.route_version.fetch_add(1, Ordering::Relaxed) + 1; + let _ = shared.route_updates.send(version); +} + +fn validate_config(config: &BgpConfig) -> Result<()> { + if !config.enabled { + return Ok(()); + } + + if config.peers.is_empty() { + return Err(BgpError::Config( + "BGP is enabled but no peers are configured".to_string(), + )); + } + + if config.local_as == 0 || config.local_as > u16::MAX as u32 { + return Err(BgpError::Config(format!( + "FiberLB native BGP currently supports 16-bit ASNs only (got {})", + config.local_as + ))); + } + + config.router_id_addr().map_err(|error| { + BgpError::InvalidAddress(format!( + "invalid router_id '{}': {}", + config.router_id, error + )) + })?; + + let next_hop = config.next_hop_addr().map_err(|error| { + let rendered = config.next_hop.as_deref().unwrap_or(&config.router_id); + BgpError::InvalidAddress(format!("invalid BGP next hop '{}': {}", rendered, error)) + })?; + to_ipv4(next_hop, "BGP next hop")?; + + if config.hold_time_secs != 0 && config.hold_time_secs < 3 { + return Err(BgpError::Config( + "hold_time_secs must be 0 or at least 3 seconds".to_string(), + )); + } + + if config.keepalive_secs == 0 { + return Err(BgpError::Config( + "keepalive_secs must be at least 1 second".to_string(), + )); + } + + for peer in &config.peers { + if peer.address.trim().is_empty() { + return Err(BgpError::Config( + "BGP peer 
address must not be empty".to_string(), + )); + } + if peer.port == 0 { + return Err(BgpError::Config(format!( + "BGP peer {} has invalid TCP port 0", + peer_name(peer) + ))); + } + if peer.asn == 0 || peer.asn > u16::MAX as u32 { + return Err(BgpError::Config(format!( + "FiberLB native BGP currently supports 16-bit peer ASNs only (peer {} has AS {})", + peer_name(peer), + peer.asn + ))); + } + if peer.asn == config.local_as { + return Err(BgpError::Config(format!( + "FiberLB native BGP currently supports eBGP only; peer {} uses the same AS {}", + peer_name(peer), + peer.asn + ))); + } + } + + Ok(()) +} + +fn peer_name(peer: &BgpPeerConfig) -> String { + if peer.description.trim().is_empty() { + format!("{}:{}", peer.address, peer.port) + } else { + format!("{} ({}:{})", peer.description, peer.address, peer.port) + } +} + +fn negotiated_keepalive_interval( + requested_keepalive_secs: u16, + negotiated_hold_time_secs: u16, +) -> Duration { + let keepalive_secs = if negotiated_hold_time_secs == 0 { + requested_keepalive_secs.max(1) + } else { + requested_keepalive_secs + .max(1) + .min((negotiated_hold_time_secs / 3).max(1)) + }; + + Duration::from_secs(keepalive_secs as u64) +} + +fn to_ipv4(addr: IpAddr, label: &str) -> Result { + match addr { + IpAddr::V4(addr) => Ok(addr), + IpAddr::V6(_) => Err(BgpError::Unsupported(format!( + "{} must be IPv4 for native BGP speaker", + label + ))), + } +} + +async fn send_open( + writer: &mut W, + local_as: u16, + hold_time_secs: u16, + router_id: Ipv4Addr, +) -> Result<()> { + let mut payload = Vec::with_capacity(10); + payload.push(BGP_VERSION); + payload.extend_from_slice(&local_as.to_be_bytes()); + payload.extend_from_slice(&hold_time_secs.to_be_bytes()); + payload.extend_from_slice(&router_id.octets()); + payload.push(0); // no optional parameters + + write_bgp_message(writer, BGP_TYPE_OPEN, &payload).await +} + +async fn send_keepalive(writer: &mut W) -> Result<()> { + write_bgp_message(writer, BGP_TYPE_KEEPALIVE, 
&[]).await +} + +async fn send_announce( + writer: &mut W, + prefix: Ipv4Addr, + next_hop: Ipv4Addr, + local_as: u16, +) -> Result<()> { + let mut path_attributes = Vec::new(); + + path_attributes.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_ORIGIN, 1, BGP_ORIGIN_IGP]); + + path_attributes.extend_from_slice(&[ + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_AS_PATH, + 4, + AS_PATH_SEGMENT_SEQUENCE, + 1, + ]); + path_attributes.extend_from_slice(&local_as.to_be_bytes()); + + path_attributes.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_NEXT_HOP, 4]); + path_attributes.extend_from_slice(&next_hop.octets()); + + let nlri = encode_ipv4_prefix(prefix); + let mut payload = Vec::with_capacity(2 + 2 + path_attributes.len() + nlri.len()); + payload.extend_from_slice(&0u16.to_be_bytes()); + payload.extend_from_slice(&(path_attributes.len() as u16).to_be_bytes()); + payload.extend_from_slice(&path_attributes); + payload.extend_from_slice(&nlri); + + write_bgp_message(writer, BGP_TYPE_UPDATE, &payload).await +} + +async fn send_withdraw(writer: &mut W, prefix: Ipv4Addr) -> Result<()> { + let withdrawn = encode_ipv4_prefix(prefix); + let mut payload = Vec::with_capacity(2 + withdrawn.len() + 2); + payload.extend_from_slice(&(withdrawn.len() as u16).to_be_bytes()); + payload.extend_from_slice(&withdrawn); + payload.extend_from_slice(&0u16.to_be_bytes()); + + write_bgp_message(writer, BGP_TYPE_UPDATE, &payload).await +} + +fn encode_ipv4_prefix(prefix: Ipv4Addr) -> Vec { + let mut bytes = Vec::with_capacity(5); + bytes.push(32); + bytes.extend_from_slice(&prefix.octets()); + bytes +} + +async fn write_bgp_message( + writer: &mut W, + msg_type: u8, + payload: &[u8], +) -> Result<()> { + let length = 19 + payload.len(); + if length > BGP_MAX_MESSAGE_SIZE { + return Err(BgpError::Protocol(format!( + "BGP message too large: {} bytes", + length + ))); + } + + let mut header = Vec::with_capacity(19); + header.extend_from_slice(&BGP_MARKER); + header.extend_from_slice(&(length as 
u16).to_be_bytes()); + header.push(msg_type); + + writer.write_all(&header).await?; + writer.write_all(payload).await?; + writer.flush().await?; + Ok(()) +} + +async fn read_bgp_message(reader: &mut R) -> Result { + let mut header = [0u8; 19]; + reader.read_exact(&mut header).await?; + + if header[..16] != BGP_MARKER { + return Err(BgpError::Protocol("invalid BGP marker".to_string())); + } + + let length = u16::from_be_bytes([header[16], header[17]]) as usize; + if !(19..=BGP_MAX_MESSAGE_SIZE).contains(&length) { + return Err(BgpError::Protocol(format!( + "invalid BGP message length {}", + length + ))); + } + + let msg_type = header[18]; + let body_len = length - 19; + let mut body = vec![0u8; body_len]; + if body_len > 0 { + reader.read_exact(&mut body).await?; + } + + match msg_type { + BGP_TYPE_OPEN => Ok(BgpMessage::Open(parse_open_message(&body)?)), + BGP_TYPE_UPDATE => Ok(BgpMessage::Update(parse_update_message(&body)?)), + BGP_TYPE_NOTIFICATION => Ok(BgpMessage::Notification(parse_notification_message(&body)?)), + BGP_TYPE_KEEPALIVE => { + if !body.is_empty() { + return Err(BgpError::Protocol( + "KEEPALIVE message must not carry a payload".to_string(), + )); + } + Ok(BgpMessage::Keepalive) + } + _ => Ok(BgpMessage::Unknown { + msg_type, + payload: body, + }), + } +} + +fn parse_open_message(body: &[u8]) -> Result { + if body.len() < 10 { + return Err(BgpError::Protocol(format!( + "OPEN message too short: {} bytes", + body.len() + ))); + } + + if body[0] != BGP_VERSION { + return Err(BgpError::Protocol(format!( + "unsupported BGP version {}", + body[0] + ))); + } + + let optional_len = body[9] as usize; + if body.len() != 10 + optional_len { + return Err(BgpError::Protocol(format!( + "OPEN optional parameter length mismatch: body={}, optional={}", + body.len(), + optional_len + ))); + } + + Ok(OpenMessage { + asn: u16::from_be_bytes([body[1], body[2]]), + hold_time: u16::from_be_bytes([body[3], body[4]]), + router_id: Ipv4Addr::new(body[5], body[6], body[7], 
body[8]), + }) +} + +fn parse_notification_message(body: &[u8]) -> Result { + if body.len() < 2 { + return Err(BgpError::Protocol(format!( + "NOTIFICATION message too short: {} bytes", + body.len() + ))); + } + + Ok(NotificationMessage { + code: body[0], + subcode: body[1], + data: body[2..].to_vec(), + }) +} + +fn parse_update_message(body: &[u8]) -> Result { + if body.len() < 4 { + return Err(BgpError::Protocol(format!( + "UPDATE message too short: {} bytes", + body.len() + ))); + } + + let withdrawn_len = u16::from_be_bytes([body[0], body[1]]) as usize; + if body.len() < 2 + withdrawn_len + 2 { + return Err(BgpError::Protocol( + "UPDATE withdrawn-routes section exceeds message length".to_string(), + )); + } + + let mut cursor = 2usize; + let withdrawn = parse_prefix_list(&body[cursor..cursor + withdrawn_len])?; + cursor += withdrawn_len; + + let attrs_len = u16::from_be_bytes([body[cursor], body[cursor + 1]]) as usize; + cursor += 2; + if body.len() < cursor + attrs_len { + return Err(BgpError::Protocol( + "UPDATE path attribute section exceeds message length".to_string(), + )); + } + + let attrs = &body[cursor..cursor + attrs_len]; + cursor += attrs_len; + + let announced = parse_prefix_list(&body[cursor..])?; + let (next_hop, as_path) = parse_path_attributes(attrs)?; + + Ok(UpdateMessage { + withdrawn_routes: withdrawn, + announced_routes: announced, + next_hop, + as_path, + }) +} + +fn parse_prefix_list(data: &[u8]) -> Result> { + let mut prefixes = Vec::new(); + let mut cursor = 0usize; + + while cursor < data.len() { + let prefix_len = data[cursor] as usize; + cursor += 1; + + if prefix_len > 32 { + return Err(BgpError::Protocol(format!( + "unsupported IPv4 prefix length {}", + prefix_len + ))); + } + + let octet_len = prefix_len.div_ceil(8); + if data.len() < cursor + octet_len { + return Err(BgpError::Protocol( + "prefix length exceeds UPDATE payload".to_string(), + )); + } + + let mut octets = [0u8; 4]; + 
octets[..octet_len].copy_from_slice(&data[cursor..cursor + octet_len]); + cursor += octet_len; + + if prefix_len != 32 { + return Err(BgpError::Protocol(format!( + "FiberLB native BGP currently supports /32 only (got /{})", + prefix_len + ))); + } + + prefixes.push(Ipv4Addr::from(octets)); + } + + Ok(prefixes) +} + +fn parse_path_attributes(attrs: &[u8]) -> Result<(Option, Vec)> { + let mut cursor = 0usize; + let mut next_hop = None; + let mut as_path = Vec::new(); + + while cursor < attrs.len() { + if attrs.len() < cursor + 3 { + return Err(BgpError::Protocol( + "path attribute header truncated".to_string(), + )); + } + + let flags = attrs[cursor]; + let attr_type = attrs[cursor + 1]; + cursor += 2; + + let (attr_len, len_width) = if flags & ATTR_FLAG_EXTENDED_LEN != 0 { + if attrs.len() < cursor + 2 { + return Err(BgpError::Protocol( + "extended path attribute length truncated".to_string(), + )); + } + ( + u16::from_be_bytes([attrs[cursor], attrs[cursor + 1]]) as usize, + 2usize, + ) + } else { + (attrs[cursor] as usize, 1usize) + }; + cursor += len_width; + + if attrs.len() < cursor + attr_len { + return Err(BgpError::Protocol( + "path attribute value truncated".to_string(), + )); + } + let value = &attrs[cursor..cursor + attr_len]; + cursor += attr_len; + + match attr_type { + ATTR_TYPE_NEXT_HOP => { + if value.len() == 4 { + next_hop = Some(Ipv4Addr::new(value[0], value[1], value[2], value[3])); + } + } + ATTR_TYPE_AS_PATH => { + as_path = parse_as_path(value)?; + } + _ => {} + } + } + + Ok((next_hop, as_path)) +} + +fn parse_as_path(data: &[u8]) -> Result> { + let mut cursor = 0usize; + let mut path = Vec::new(); + + while cursor < data.len() { + if data.len() < cursor + 2 { + return Err(BgpError::Protocol("AS_PATH segment truncated".to_string())); + } + + let _segment_type = data[cursor]; + let segment_len = data[cursor + 1] as usize; + cursor += 2; + + let bytes_len = segment_len * 2; + if data.len() < cursor + bytes_len { + return 
Err(BgpError::Protocol("AS_PATH payload truncated".to_string())); + } + + for index in 0..segment_len { + let start = cursor + (index * 2); + path.push(u16::from_be_bytes([data[start], data[start + 1]])); + } + + cursor += bytes_len; + } + + Ok(path) +} + +#[derive(Debug)] +enum BgpMessage { + Open(OpenMessage), + Update(UpdateMessage), + Notification(NotificationMessage), + Keepalive, + Unknown { msg_type: u8, payload: Vec }, +} + +#[derive(Debug)] +struct OpenMessage { + asn: u16, + hold_time: u16, + router_id: Ipv4Addr, +} + +#[derive(Debug)] +struct UpdateMessage { + withdrawn_routes: Vec, + announced_routes: Vec, + next_hop: Option, + as_path: Vec, +} + +#[derive(Debug)] +struct NotificationMessage { + code: u8, + subcode: u8, + data: Vec, } #[cfg(test)] mod tests { use super::*; + use tokio::net::TcpListener; + use tokio::sync::oneshot; + #[tokio::test] - async fn test_bgp_client_disabled() { - let config = BgpConfig { - enabled: false, - ..Default::default() - }; + async fn test_disabled_bgp_client_is_noop() { + let client = create_bgp_client(BgpConfig::default()).await.unwrap(); - let client = GobgpClient::new(config).await.unwrap(); - assert!(!client.is_connected().await); - - // Operations should succeed but do nothing - let vip = "10.0.1.100".parse().unwrap(); - let next_hop = "10.0.0.1".parse().unwrap(); + let vip: IpAddr = "203.0.113.10".parse().unwrap(); + let next_hop: IpAddr = "192.0.2.10".parse().unwrap(); assert!(client.announce_route(vip, next_hop).await.is_ok()); assert!(client.withdraw_route(vip).await.is_ok()); + assert!(!client.is_connected().await); } #[test] - fn test_format_prefix() { - let ipv4: IpAddr = "10.0.1.100".parse().unwrap(); - assert_eq!(GobgpClient::format_prefix(ipv4), "10.0.1.100/32"); + fn test_validate_config_rejects_same_as_peer() { + let config = BgpConfig { + enabled: true, + local_as: 65010, + peers: vec![BgpPeerConfig { + address: "127.0.0.1".to_string(), + port: 179, + asn: 65010, + description: String::new(), + }], + 
..BgpConfig::default() + }; - let ipv6: IpAddr = "2001:db8::1".parse().unwrap(); - assert_eq!(GobgpClient::format_prefix(ipv6), "2001:db8::1/128"); + let error = validate_config(&config).unwrap_err(); + assert!(error.to_string().contains("currently supports eBGP only")); + } + + #[tokio::test] + async fn test_native_speaker_announces_and_withdraws_routes() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let (announced_tx, announced_rx) = oneshot::channel(); + + let peer_task = tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + + let open = match read_bgp_message(&mut socket).await.unwrap() { + BgpMessage::Open(open) => open, + other => panic!("expected OPEN, got {:?}", other), + }; + assert_eq!(open.asn, 65010); + assert_eq!(open.router_id, "192.0.2.10".parse::().unwrap()); + + send_open( + &mut socket, + 65020, + 90, + "192.0.2.20".parse::().unwrap(), + ) + .await + .unwrap(); + + match read_bgp_message(&mut socket).await.unwrap() { + BgpMessage::Keepalive => {} + other => panic!("expected KEEPALIVE, got {:?}", other), + } + + send_keepalive(&mut socket).await.unwrap(); + + let announcement = match read_bgp_message(&mut socket).await.unwrap() { + BgpMessage::Update(update) => update, + other => panic!("expected UPDATE announce, got {:?}", other), + }; + assert_eq!(announcement.withdrawn_routes, Vec::::new()); + assert_eq!( + announcement.announced_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + assert_eq!( + announcement.next_hop, + Some("192.0.2.10".parse::().unwrap()) + ); + assert_eq!(announcement.as_path, vec![65010]); + let _ = announced_tx.send(()); + + let withdrawal = match read_bgp_message(&mut socket).await.unwrap() { + BgpMessage::Update(update) => update, + other => panic!("expected UPDATE withdraw, got {:?}", other), + }; + assert_eq!( + withdrawal.withdrawn_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + 
assert_eq!(withdrawal.announced_routes, Vec::::new()); + }); + + let client = create_bgp_client(BgpConfig { + enabled: true, + local_as: 65010, + router_id: "192.0.2.10".to_string(), + peers: vec![BgpPeerConfig { + address: "127.0.0.1".to_string(), + port, + asn: 65020, + description: "test-peer".to_string(), + }], + ..BgpConfig::default() + }) + .await + .unwrap(); + + for _ in 0..20 { + if client.is_connected().await { + break; + } + sleep(Duration::from_millis(50)).await; + } + assert!(client.is_connected().await); + + let vip: IpAddr = "203.0.113.10".parse().unwrap(); + let next_hop: IpAddr = "192.0.2.10".parse().unwrap(); + client.announce_route(vip, next_hop).await.unwrap(); + announced_rx.await.unwrap(); + client.withdraw_route(vip).await.unwrap(); + + peer_task.await.unwrap(); + } + + #[test] + fn test_parse_update_message_extracts_routes() { + let mut attrs = Vec::new(); + attrs.extend_from_slice(&[ + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_ORIGIN, + 1, + BGP_ORIGIN_IGP, + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_AS_PATH, + 4, + AS_PATH_SEGMENT_SEQUENCE, + 1, + ]); + attrs.extend_from_slice(&65010u16.to_be_bytes()); + attrs.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_NEXT_HOP, 4, 192, 0, 2, 10]); + + let mut payload = Vec::new(); + payload.extend_from_slice(&0u16.to_be_bytes()); + payload.extend_from_slice(&(attrs.len() as u16).to_be_bytes()); + payload.extend_from_slice(&attrs); + payload.extend_from_slice(&encode_ipv4_prefix("203.0.113.7".parse().unwrap())); + + let update = parse_update_message(&payload).unwrap(); + assert_eq!( + update.announced_routes, + vec!["203.0.113.7".parse::().unwrap()] + ); + assert_eq!(update.as_path, vec![65010]); + assert_eq!( + update.next_hop, + Some("192.0.2.10".parse::().unwrap()) + ); } } diff --git a/fiberlb/crates/fiberlb-server/src/config.rs b/fiberlb/crates/fiberlb-server/src/config.rs index 52b9c45..cbc40f4 100644 --- a/fiberlb/crates/fiberlb-server/src/config.rs +++ b/fiberlb/crates/fiberlb-server/src/config.rs @@ -1,7 
+1,7 @@ //! Server configuration use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; /// TLS configuration #[derive(Debug, Clone, Serialize, Deserialize)] @@ -70,6 +70,18 @@ pub struct ServerConfig { /// Authentication configuration #[serde(default)] pub auth: AuthConfig, + + /// Backend health checker configuration + #[serde(default)] + pub health: HealthRuntimeConfig, + + /// VIP advertisement reconciliation configuration + #[serde(default)] + pub vip_advertisement: VipAdvertisementConfig, + + /// Native BGP speaker configuration + #[serde(default)] + pub bgp: BgpConfig, } /// Authentication configuration @@ -84,6 +96,160 @@ fn default_iam_server_addr() -> String { "127.0.0.1:50051".to_string() } +/// Backend health checker runtime configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthRuntimeConfig { + /// Interval between backend health check sweeps. + #[serde(default = "default_health_check_interval_secs")] + pub interval_secs: u64, + + /// Timeout for individual backend checks. + #[serde(default = "default_health_check_timeout_secs")] + pub timeout_secs: u64, +} + +fn default_health_check_interval_secs() -> u64 { + 5 +} + +fn default_health_check_timeout_secs() -> u64 { + 5 +} + +impl Default for HealthRuntimeConfig { + fn default() -> Self { + Self { + interval_secs: default_health_check_interval_secs(), + timeout_secs: default_health_check_timeout_secs(), + } + } +} + +/// VIP advertisement reconciliation runtime configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VipAdvertisementConfig { + /// Interval between BGP advertisement reconciliation sweeps. 
+ #[serde(default = "default_vip_check_interval_secs")] + pub interval_secs: u64, +} + +fn default_vip_check_interval_secs() -> u64 { + 3 +} + +impl Default for VipAdvertisementConfig { + fn default() -> Self { + Self { + interval_secs: default_vip_check_interval_secs(), + } + } +} + +/// Static BGP peer configuration. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct BgpPeerConfig { + /// Peer IP address or hostname. + pub address: String, + + /// Peer TCP port. + #[serde(default = "default_bgp_peer_port")] + pub port: u16, + + /// Peer AS number. + pub asn: u32, + + /// Optional operator-visible description. + #[serde(default)] + pub description: String, +} + +fn default_bgp_peer_port() -> u16 { + 179 +} + +/// Native BGP speaker configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BgpConfig { + /// Whether FiberLB should originate VIP routes itself. + #[serde(default)] + pub enabled: bool, + + /// Local AS number. + #[serde(default = "default_bgp_local_as")] + pub local_as: u32, + + /// BGP router ID. Must be IPv4. + #[serde(default = "default_bgp_router_id")] + pub router_id: String, + + /// Optional explicit next-hop address. Falls back to router_id. + #[serde(default)] + pub next_hop: Option, + + /// Requested hold time in seconds. + #[serde(default = "default_bgp_hold_time_secs")] + pub hold_time_secs: u16, + + /// Keepalive interval in seconds. + #[serde(default = "default_bgp_keepalive_secs")] + pub keepalive_secs: u16, + + /// Delay before reconnecting to a failed peer. + #[serde(default = "default_bgp_connect_retry_secs")] + pub connect_retry_secs: u64, + + /// Static peers for outbound eBGP sessions. 
+ #[serde(default)] + pub peers: Vec, +} + +fn default_bgp_local_as() -> u32 { + 65001 +} + +fn default_bgp_router_id() -> String { + Ipv4Addr::new(127, 0, 0, 1).to_string() +} + +fn default_bgp_hold_time_secs() -> u16 { + 90 +} + +fn default_bgp_keepalive_secs() -> u16 { + 30 +} + +fn default_bgp_connect_retry_secs() -> u64 { + 5 +} + +impl BgpConfig { + /// Effective next hop advertised in UPDATE messages. + pub fn next_hop_addr(&self) -> std::result::Result { + self.next_hop.as_deref().unwrap_or(&self.router_id).parse() + } + + /// Parsed router ID as IPv4. + pub fn router_id_addr(&self) -> std::result::Result { + self.router_id.parse() + } +} + +impl Default for BgpConfig { + fn default() -> Self { + Self { + enabled: false, + local_as: default_bgp_local_as(), + router_id: default_bgp_router_id(), + next_hop: None, + hold_time_secs: default_bgp_hold_time_secs(), + keepalive_secs: default_bgp_keepalive_secs(), + connect_retry_secs: default_bgp_connect_retry_secs(), + peers: Vec::new(), + } + } +} + impl Default for AuthConfig { fn default() -> Self { Self { @@ -104,6 +270,9 @@ impl Default for ServerConfig { log_level: "info".to_string(), tls: None, auth: AuthConfig::default(), + health: HealthRuntimeConfig::default(), + vip_advertisement: VipAdvertisementConfig::default(), + bgp: BgpConfig::default(), } } } diff --git a/fiberlb/crates/fiberlb-server/src/healthcheck.rs b/fiberlb/crates/fiberlb-server/src/healthcheck.rs index 08a3a15..2a6c69a 100644 --- a/fiberlb/crates/fiberlb-server/src/healthcheck.rs +++ b/fiberlb/crates/fiberlb-server/src/healthcheck.rs @@ -104,8 +104,8 @@ impl HealthChecker { .await .map_err(|e| HealthCheckError::MetadataError(e.to_string()))?; - // Use first health check config, or default TCP check - let hc_config = health_checks.into_iter().next(); + // Use the first enabled health check config, or default TCP check. 
+ let hc_config = health_checks.into_iter().find(|check| check.enabled); // Check all backends in the pool let backends = self @@ -210,9 +210,10 @@ impl HealthChecker { ); // Write request - stream.writable().await.map_err(|e| { - HealthCheckError::HttpError(format!("stream not writable: {}", e)) - })?; + stream + .writable() + .await + .map_err(|e| HealthCheckError::HttpError(format!("stream not writable: {}", e)))?; match stream.try_write(request.as_bytes()) { Ok(_) => {} @@ -223,18 +224,19 @@ impl HealthChecker { // Read response (just first line for status code) let mut buf = [0u8; 128]; - stream.readable().await.map_err(|e| { - HealthCheckError::HttpError(format!("stream not readable: {}", e)) - })?; + stream + .readable() + .await + .map_err(|e| HealthCheckError::HttpError(format!("stream not readable: {}", e)))?; let n = match stream.try_read(&mut buf) { Ok(n) => n, Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // Wait a bit and try again tokio::time::sleep(Duration::from_millis(100)).await; - stream.try_read(&mut buf).map_err(|e| { - HealthCheckError::HttpError(format!("read failed: {}", e)) - })? + stream + .try_read(&mut buf) + .map_err(|e| HealthCheckError::HttpError(format!("read failed: {}", e)))? 
} Err(e) => { return Err(HealthCheckError::HttpError(format!("read failed: {}", e))); @@ -250,8 +252,11 @@ impl HealthChecker { let status_line = response.lines().next().unwrap_or(""); // Check for 2xx status code - if status_line.contains(" 200 ") || status_line.contains(" 201 ") || - status_line.contains(" 202 ") || status_line.contains(" 204 ") { + if status_line.contains(" 200 ") + || status_line.contains(" 201 ") + || status_line.contains(" 202 ") + || status_line.contains(" 204 ") + { Ok(()) } else { Err(HealthCheckError::HttpError(format!( @@ -266,11 +271,13 @@ impl HealthChecker { pub fn spawn_health_checker( metadata: Arc, check_interval: Duration, + check_timeout: Duration, ) -> (tokio::task::JoinHandle<()>, watch::Sender) { let (shutdown_tx, shutdown_rx) = watch::channel(false); let handle = tokio::spawn(async move { - let mut checker = HealthChecker::new(metadata, check_interval, shutdown_rx); + let mut checker = + HealthChecker::new(metadata, check_interval, shutdown_rx).with_timeout(check_timeout); checker.run().await; }); @@ -321,7 +328,8 @@ mod tests { #[tokio::test] async fn test_spawn_health_checker() { let metadata = Arc::new(LbMetadataStore::new_in_memory()); - let (handle, shutdown_tx) = spawn_health_checker(metadata, Duration::from_secs(60)); + let (handle, shutdown_tx) = + spawn_health_checker(metadata, Duration::from_secs(60), Duration::from_secs(5)); // Verify it started assert!(!handle.is_finished()); diff --git a/fiberlb/crates/fiberlb-server/src/lib.rs b/fiberlb/crates/fiberlb-server/src/lib.rs index 4b79bbb..20b253f 100644 --- a/fiberlb/crates/fiberlb-server/src/lib.rs +++ b/fiberlb/crates/fiberlb-server/src/lib.rs @@ -1,5 +1,6 @@ //! 
FiberLB server implementation +pub mod bgp_client; pub mod config; pub mod dataplane; pub mod healthcheck; @@ -9,13 +10,16 @@ pub mod maglev; pub mod metadata; pub mod services; pub mod tls; +pub mod vip_manager; +pub use bgp_client::{create_bgp_client, BgpClient, BgpError, NativeBgpSpeaker}; pub use config::ServerConfig; pub use dataplane::DataPlane; -pub use healthcheck::{HealthChecker, spawn_health_checker}; +pub use healthcheck::{spawn_health_checker, HealthChecker}; pub use l7_dataplane::L7DataPlane; pub use l7_router::L7Router; -pub use maglev::{MaglevTable, ConnectionTracker}; +pub use maglev::{ConnectionTracker, MaglevTable}; pub use metadata::LbMetadataStore; pub use services::*; pub use tls::{build_tls_config, CertificateStore, SniCertResolver}; +pub use vip_manager::VipManager; diff --git a/fiberlb/crates/fiberlb-server/src/maglev.rs b/fiberlb/crates/fiberlb-server/src/maglev.rs index 0776e37..4b45d60 100644 --- a/fiberlb/crates/fiberlb-server/src/maglev.rs +++ b/fiberlb/crates/fiberlb-server/src/maglev.rs @@ -3,9 +3,9 @@ //! Implementation of Google's Maglev consistent hashing algorithm for L4 load balancing. //! 
Reference: https://research.google/pubs/pub44824/ +use fiberlb_types::Backend; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -use fiberlb_types::Backend; /// Default lookup table size (prime number for better distribution) /// Google's paper uses 65537, but we use a smaller prime for memory efficiency @@ -116,9 +116,7 @@ impl MaglevTable { let offset = Self::hash_offset(backend, size); let skip = Self::hash_skip(backend, size); - (0..size) - .map(|j| (offset + j * skip) % size) - .collect() + (0..size).map(|j| (offset + j * skip) % size).collect() } /// Hash function for offset calculation @@ -134,10 +132,26 @@ impl MaglevTable { let mut hasher = DefaultHasher::new(); backend.hash(&mut hasher); "skip".hash(&mut hasher); - let skip = (hasher.finish() as usize) % (size - 1) + 1; + let mut skip = (hasher.finish() as usize) % (size - 1) + 1; + + // For non-prime table sizes we still need a full permutation, so force + // the step to be coprime with the table size. 
+ while Self::gcd(skip, size) != 1 { + skip = (skip % (size - 1)) + 1; + } + skip } + fn gcd(mut a: usize, mut b: usize) -> usize { + while b != 0 { + let remainder = a % b; + a = b; + b = remainder; + } + a + } + /// Hash a connection key (e.g., "192.168.1.1:54321") fn hash_key(key: &str) -> u64 { let mut hasher = DefaultHasher::new(); @@ -291,11 +305,9 @@ mod tests { // Count how many keys map to the same backend let mut unchanged = 0; - let mut total = 0; for (key, old_backend) in &mappings { if let Some(idx) = table2.lookup(key) { if let Some(new_backend) = table2.backend_id(idx) { - total += 1; // Only keys that were on removed backend should change if old_backend != "10.0.0.2:8080" { if old_backend == new_backend { diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs index ea2d752..f38442d 100644 --- a/fiberlb/crates/fiberlb-server/src/main.rs +++ b/fiberlb/crates/fiberlb-server/src/main.rs @@ -4,31 +4,30 @@ use std::sync::Arc; use chainfire_client::Client as ChainFireClient; use clap::Parser; -use metrics_exporter_prometheus::PrometheusBuilder; use fiberlb_api::{ + backend_service_server::BackendServiceServer, + certificate_service_server::CertificateServiceServer, + health_check_service_server::HealthCheckServiceServer, + l7_policy_service_server::L7PolicyServiceServer, l7_rule_service_server::L7RuleServiceServer, + listener_service_server::ListenerServiceServer, load_balancer_service_server::LoadBalancerServiceServer, pool_service_server::PoolServiceServer, - backend_service_server::BackendServiceServer, - listener_service_server::ListenerServiceServer, - health_check_service_server::HealthCheckServiceServer, - l7_policy_service_server::L7PolicyServiceServer, - l7_rule_service_server::L7RuleServiceServer, - certificate_service_server::CertificateServiceServer, }; use fiberlb_server::{ - config::MetadataBackend, - LbMetadataStore, LoadBalancerServiceImpl, PoolServiceImpl, BackendServiceImpl, - 
ListenerServiceImpl, HealthCheckServiceImpl, L7PolicyServiceImpl, L7RuleServiceImpl, - CertificateServiceImpl, DataPlane, L7DataPlane, ServerConfig, + config::MetadataBackend, create_bgp_client, spawn_health_checker, BackendServiceImpl, + CertificateServiceImpl, DataPlane, HealthCheckServiceImpl, L7DataPlane, L7PolicyServiceImpl, + L7RuleServiceImpl, LbMetadataStore, ListenerServiceImpl, LoadBalancerServiceImpl, + PoolServiceImpl, ServerConfig, VipManager, }; use iam_service_auth::AuthService; +use metrics_exporter_prometheus::PrometheusBuilder; use std::net::SocketAddr; use std::path::PathBuf; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Status}; use tonic_health::server::health_reporter; use tracing_subscriber::EnvFilter; -use std::time::{SystemTime, UNIX_EPOCH}; /// FiberLB load balancer server #[derive(Parser, Debug)] @@ -113,8 +112,7 @@ async fn main() -> Result<(), Box> { // Initialize tracing tracing_subscriber::fmt() .with_env_filter( - EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new(&config.log_level)), + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)), ) .init(); @@ -158,8 +156,8 @@ async fn main() -> Result<(), Box> { config.flaredb_endpoint.clone(), config.chainfire_endpoint.clone(), ) - .await - .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?, + .await + .map_err(|e| format!("Failed to initialize FlareDB metadata store: {}", e))?, ) } MetadataBackend::Postgres | MetadataBackend::Sqlite => { @@ -187,7 +185,10 @@ async fn main() -> Result<(), Box> { }; // Initialize IAM authentication service - tracing::info!("Connecting to IAM server at {}", config.auth.iam_server_addr); + tracing::info!( + "Connecting to IAM server at {}", + config.auth.iam_server_addr + ); let auth_service = AuthService::new(&config.auth.iam_server_addr) .await .map_err(|e| format!("Failed to 
connect to IAM server: {}", e))?; @@ -228,6 +229,34 @@ async fn main() -> Result<(), Box> { restore_runtime_listeners(metadata.clone(), dataplane.clone(), l7_dataplane.clone()).await?; + let (_health_task, health_shutdown_tx) = spawn_health_checker( + metadata.clone(), + Duration::from_secs(config.health.interval_secs.max(1)), + Duration::from_secs(config.health.timeout_secs.max(1)), + ); + + let vip_manager = if config.bgp.enabled { + let next_hop = config.bgp.next_hop_addr().map_err(|error| { + format!( + "failed to parse FiberLB BGP next hop '{}': {}", + config + .bgp + .next_hop + .as_deref() + .unwrap_or(&config.bgp.router_id), + error + ) + })?; + let bgp = create_bgp_client(config.bgp.clone()).await?; + let manager = Arc::new(VipManager::new(bgp, metadata.clone(), next_hop)); + let _vip_task = manager.clone().spawn(Duration::from_secs( + config.vip_advertisement.interval_secs.max(1), + )); + Some(manager) + } else { + None + }; + // Setup health service let (mut health_reporter, health_service) = health_reporter(); health_reporter @@ -289,7 +318,7 @@ async fn main() -> Result<(), Box> { // Start gRPC server tracing::info!("gRPC server listening on {}", grpc_addr); - server + let server_result = server .add_service(health_service) .add_service(tonic::codegen::InterceptedService::new( LoadBalancerServiceServer::new(lb_service), @@ -324,8 +353,17 @@ async fn main() -> Result<(), Box> { make_interceptor(auth_service.clone()), )) .serve(grpc_addr) - .await?; + .await; + let _ = health_shutdown_tx.send(true); + + if let Some(vip_manager) = vip_manager { + if let Err(error) = vip_manager.shutdown().await { + tracing::warn!(error = %error, "FiberLB VIP manager shutdown failed"); + } + } + + server_result?; Ok(()) } @@ -389,31 +427,29 @@ async fn register_chainfire_membership( let value = format!(r#"{{"addr":"{}","ts":{}}}"#, addr, ts); let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120); let mut attempt = 0usize; - let mut last_error = 
String::new(); - - loop { + let last_error = loop { attempt += 1; - match ChainFireClient::connect(endpoint).await { + let current_error = match ChainFireClient::connect(endpoint).await { Ok(mut client) => match client.put_str(&key, &value).await { Ok(_) => return Ok(()), - Err(error) => last_error = format!("put failed: {}", error), + Err(error) => format!("put failed: {}", error), }, - Err(error) => last_error = format!("connect failed: {}", error), - } + Err(error) => format!("connect failed: {}", error), + }; if tokio::time::Instant::now() >= deadline { - break; + break current_error; } tracing::warn!( attempt, endpoint, service, - error = %last_error, + error = %current_error, "retrying ChainFire membership registration" ); tokio::time::sleep(std::time::Duration::from_secs(2)).await; - } + }; Err(std::io::Error::other(format!( "failed to register ChainFire membership for {} via {} after {} attempts: {}", @@ -435,9 +471,15 @@ async fn restore_runtime_listeners( } let result = if listener.is_l7() { - l7_dataplane.start_listener(listener.id).await.map_err(|e| e.to_string()) + l7_dataplane + .start_listener(listener.id) + .await + .map_err(|e| e.to_string()) } else { - dataplane.start_listener(listener.id).await.map_err(|e| e.to_string()) + dataplane + .start_listener(listener.id) + .await + .map_err(|e| e.to_string()) }; if let Err(err) = result { diff --git a/fiberlb/crates/fiberlb-server/src/vip_manager.rs b/fiberlb/crates/fiberlb-server/src/vip_manager.rs index 903f253..a43a02f 100644 --- a/fiberlb/crates/fiberlb-server/src/vip_manager.rs +++ b/fiberlb/crates/fiberlb-server/src/vip_manager.rs @@ -12,7 +12,7 @@ use tokio::sync::RwLock; use tokio::time::sleep; use tracing::{debug, error, info, warn}; -use crate::bgp_client::{BgpClient, BgpConfig}; +use crate::bgp_client::BgpClient; use crate::metadata::LbMetadataStore; use fiberlb_types::LoadBalancerId; @@ -43,11 +43,7 @@ pub struct VipManager { impl VipManager { /// Create a new VIP manager - pub fn new( - bgp: 
Arc, - metadata: Arc, - next_hop: IpAddr, - ) -> Self { + pub fn new(bgp: Arc, metadata: Arc, next_hop: IpAddr) -> Self { Self { bgp, metadata, @@ -119,7 +115,10 @@ impl VipManager { } /// Check if a load balancer has any healthy backends - async fn has_healthy_backends(&self, lb_id: &LoadBalancerId) -> Result> { + async fn has_healthy_backends( + &self, + lb_id: &LoadBalancerId, + ) -> Result> { // Get all pools for this load balancer let pools = self.metadata.list_pools(lb_id).await?; @@ -129,8 +128,7 @@ impl VipManager { // Check if any backend is healthy for backend in backends { - use fiberlb_types::BackendStatus; - if backend.status == BackendStatus::Online { + if backend.is_available() { return Ok(true); } } @@ -144,7 +142,10 @@ impl VipManager { /// Compares current advertisements with desired active VIPs and: /// - Announces new VIPs that should be active /// - Withdraws VIPs that should no longer be active - async fn reconcile_advertisements(&self, active_vips: &HashSet) -> Result<(), Box> { + async fn reconcile_advertisements( + &self, + active_vips: &HashSet, + ) -> Result<(), Box> { let mut state = self.vip_state.write().await; // Find VIPs to announce (active but not yet advertised) diff --git a/flake.nix b/flake.nix index 0b1f15a..d12d6c4 100644 --- a/flake.nix +++ b/flake.nix @@ -918,6 +918,15 @@ } ); + fiberlb-native-bgp-vm-smoke = pkgs.testers.runNixOSTest ( + import ./nix/tests/fiberlb-native-bgp-vm-smoke.nix { + inherit pkgs; + photoncloudPackages = self.packages.${system}; + photoncloudModule = self.nixosModules.default; + nixNosModule = nix-nos.nixosModules.default; + } + ); + deployer-bootstrap-e2e = pkgs.runCommand "deployer-bootstrap-e2e" { nativeBuildInputs = with pkgs; [ bash diff --git a/nix-nos/modules/bgp/gobgp.nix b/nix-nos/modules/bgp/gobgp.nix index e4db0ce..b02266c 100644 --- a/nix-nos/modules/bgp/gobgp.nix +++ b/nix-nos/modules/bgp/gobgp.nix @@ -55,7 +55,7 @@ let in { config = mkIf (config.nix-nos.enable && cfg.enable && 
cfg.backend == "gobgp") { # Install GoBGP package - environment.systemPackages = [ pkgs.gobgp ]; + environment.systemPackages = [ pkgs.gobgp pkgs.gobgpd ]; # GoBGP systemd service systemd.services.gobgpd = { @@ -65,7 +65,7 @@ in { serviceConfig = { Type = "simple"; - ExecStart = "${pkgs.gobgp}/bin/gobgpd -f ${gobgpConfig}"; + ExecStart = "${pkgs.gobgpd}/bin/gobgpd -f ${gobgpConfig}"; Restart = "on-failure"; RestartSec = "5s"; }; diff --git a/nix/modules/fiberlb.nix b/nix/modules/fiberlb.nix index 184bf0f..ff1d769 100644 --- a/nix/modules/fiberlb.nix +++ b/nix/modules/fiberlb.nix @@ -3,7 +3,35 @@ let cfg = config.services.fiberlb; tomlFormat = pkgs.formats.toml { }; - fiberlbConfigFile = tomlFormat.generate "fiberlb.toml" { + bgpPeerType = lib.types.submodule { + options = { + address = lib.mkOption { + type = lib.types.str; + description = "BGP peer IP address or hostname."; + example = "192.0.2.1"; + }; + + port = lib.mkOption { + type = lib.types.port; + default = 179; + description = "BGP peer TCP port."; + }; + + asn = lib.mkOption { + type = lib.types.ints.positive; + description = "Peer AS number."; + example = 65020; + }; + + description = lib.mkOption { + type = lib.types.str; + default = ""; + description = "Optional description used for logs and operators."; + }; + }; + }; + + fiberlbBaseConfig = { grpc_addr = "0.0.0.0:${toString cfg.port}"; log_level = "info"; auth = { @@ -12,7 +40,52 @@ let then cfg.iamAddr else "127.0.0.1:50080"; }; + health = { + interval_secs = cfg.healthCheckIntervalSecs; + timeout_secs = cfg.healthCheckTimeoutSecs; + }; + vip_advertisement = { + interval_secs = cfg.vipCheckIntervalSecs; + }; + } // lib.optionalAttrs cfg.bgp.enable { + bgp = + { + enabled = true; + local_as = cfg.bgp.localAs; + router_id = + if cfg.bgp.routerId != null + then cfg.bgp.routerId + else "127.0.0.1"; + hold_time_secs = cfg.bgp.holdTimeSecs; + keepalive_secs = cfg.bgp.keepaliveSecs; + connect_retry_secs = cfg.bgp.connectRetrySecs; + peers = map + (peer: { 
+ inherit (peer) address port asn description; + }) + cfg.bgp.peers; + } + // lib.optionalAttrs (cfg.bgp.nextHop != null) { + next_hop = cfg.bgp.nextHop; + }; }; + fiberlbConfigFile = tomlFormat.generate "fiberlb.toml" (lib.recursiveUpdate fiberlbBaseConfig cfg.settings); + flaredbDependencies = lib.optional (cfg.metadataBackend == "flaredb") "flaredb.service"; + normalizedDatabaseUrl = + let + sqliteUrl = + if cfg.databaseUrl != null + && cfg.metadataBackend == "sqlite" + && lib.hasPrefix "sqlite:/" cfg.databaseUrl + && !(lib.hasPrefix "sqlite://" cfg.databaseUrl) + then "sqlite://${lib.removePrefix "sqlite:" cfg.databaseUrl}" + else cfg.databaseUrl; + in + if sqliteUrl != null + && cfg.metadataBackend == "sqlite" + && !(lib.hasInfix "?" sqliteUrl) + then "${sqliteUrl}?mode=rwc" + else sqliteUrl; in { options.services.fiberlb = { @@ -64,6 +137,72 @@ in description = "Enable single-node mode (required when metadata backend is SQLite)"; }; + healthCheckIntervalSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 5; + description = "Interval between FiberLB backend health sweeps."; + }; + + healthCheckTimeoutSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 5; + description = "Timeout for each FiberLB backend health probe."; + }; + + vipCheckIntervalSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 3; + description = "Interval between FiberLB VIP-to-BGP reconciliation sweeps."; + }; + + bgp = { + enable = lib.mkEnableOption "FiberLB native BGP VIP advertisement"; + + localAs = lib.mkOption { + type = lib.types.ints.positive; + default = 65001; + description = "Local AS number used by FiberLB's native BGP speaker."; + }; + + routerId = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "IPv4 router ID used by FiberLB's native BGP speaker."; + example = "192.0.2.10"; + }; + + nextHop = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = 
"Explicit BGP NEXT_HOP address. Defaults to routerId when unset."; + example = "192.0.2.10"; + }; + + holdTimeSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 90; + description = "Requested BGP hold time in seconds."; + }; + + keepaliveSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 30; + description = "BGP keepalive interval in seconds."; + }; + + connectRetrySecs = lib.mkOption { + type = lib.types.ints.positive; + default = 5; + description = "Delay before FiberLB reconnects to a failed BGP peer."; + }; + + peers = lib.mkOption { + type = lib.types.listOf bgpPeerType; + default = [ ]; + description = "Static BGP peers for FiberLB's native speaker."; + }; + }; + dataDir = lib.mkOption { type = lib.types.path; default = "/var/lib/fiberlb"; @@ -84,6 +223,25 @@ in }; config = lib.mkIf cfg.enable { + assertions = [ + { + assertion = cfg.metadataBackend != "sqlite" || cfg.singleNode; + message = "services.fiberlb.singleNode must be true when metadataBackend is sqlite"; + } + { + assertion = cfg.metadataBackend == "flaredb" || cfg.databaseUrl != null; + message = "services.fiberlb.databaseUrl is required when metadataBackend is postgres or sqlite"; + } + { + assertion = (!cfg.bgp.enable) || cfg.bgp.routerId != null; + message = "services.fiberlb.bgp.routerId must be set when native BGP is enabled"; + } + { + assertion = (!cfg.bgp.enable) || ((builtins.length cfg.bgp.peers) > 0); + message = "services.fiberlb.bgp.peers must contain at least one peer when native BGP is enabled"; + } + ]; + # Create system user users.users.fiberlb = { isSystemUser = true; @@ -98,8 +256,8 @@ in systemd.services.fiberlb = { description = "FiberLB Load Balancing Service"; wantedBy = [ "multi-user.target" ]; - after = [ "network.target" "iam.service" "flaredb.service" ]; - requires = [ "iam.service" "flaredb.service" ]; + after = [ "network.target" "iam.service" ] ++ flaredbDependencies; + requires = [ "iam.service" ] ++ flaredbDependencies; 
serviceConfig = { Type = "simple"; @@ -124,8 +282,8 @@ in "RUST_LOG=info" "FIBERLB_FLAREDB_ENDPOINT=${if cfg.flaredbAddr != null then cfg.flaredbAddr else "127.0.0.1:2479"}" "FIBERLB_METADATA_BACKEND=${cfg.metadataBackend}" - ] ++ lib.optional (cfg.databaseUrl != null) "FIBERLB_METADATA_DATABASE_URL=${cfg.databaseUrl}" - ++ lib.optional cfg.singleNode "FIBERLB_SINGLE_NODE=1" + ] ++ lib.optional (normalizedDatabaseUrl != null) "FIBERLB_METADATA_DATABASE_URL=${normalizedDatabaseUrl}" + ++ lib.optional cfg.singleNode "FIBERLB_SINGLE_NODE=true" ++ lib.optional (cfg.chainfireAddr != null) "FIBERLB_CHAINFIRE_ENDPOINT=http://${cfg.chainfireAddr}"; # Start command diff --git a/nix/modules/iam.nix b/nix/modules/iam.nix index dec32ab..c2251b4 100644 --- a/nix/modules/iam.nix +++ b/nix/modules/iam.nix @@ -122,7 +122,10 @@ in IAM_DATABASE_URL = cfg.databaseUrl; }) (lib.mkIf cfg.singleNode { - IAM_SINGLE_NODE = "1"; + IAM_SINGLE_NODE = "true"; + }) + (lib.mkIf (cfg.storeBackend == "memory") { + IAM_ALLOW_MEMORY_BACKEND = "1"; }) ]; diff --git a/nix/modules/plasmacloud-network.nix b/nix/modules/plasmacloud-network.nix index 445f50e..0b767ba 100644 --- a/nix/modules/plasmacloud-network.nix +++ b/nix/modules/plasmacloud-network.nix @@ -38,7 +38,7 @@ in { vips = mkOption { type = types.listOf types.str; default = []; - description = "VIPs to advertise via BGP (CIDR notation)"; + description = "Legacy static VIP hints. 
FiberLB native BGP ignores this list and advertises active load balancer VIPs dynamically."; example = [ "203.0.113.10/32" "203.0.113.11/32" ]; }; @@ -75,44 +75,30 @@ in { assertion = clusterCfg.bgp.asn > 0; message = "plasmacloud.cluster.bgp.asn must be configured for FiberLB BGP"; } - { - assertion = (length cfg.fiberlbBgp.vips) > 0; - message = "plasmacloud.network.fiberlbBgp.vips must contain at least one VIP"; - } { assertion = (length cfg.fiberlbBgp.peers) > 0; message = "plasmacloud.network.fiberlbBgp.peers must contain at least one BGP peer"; } + { + assertion = config.services.fiberlb.enable or false; + message = "plasmacloud.network.fiberlbBgp.enable requires services.fiberlb.enable"; + } ]; - # Wire to nix-nos.bgp (Layer 1) - nix-nos.enable = true; - nix-nos.bgp = { + services.fiberlb.bgp = { enable = true; - backend = "gobgp"; # FiberLB uses GoBGP - asn = clusterCfg.bgp.asn; - - # Auto-detect router ID from primary IP or use configured value + localAs = clusterCfg.bgp.asn; routerId = if cfg.fiberlbBgp.routerId != null then cfg.fiberlbBgp.routerId else - # Fallback to a simple IP extraction from node config let hostname = config.networking.hostName; node = clusterCfg.nodes.${hostname} or null; in if node != null then node.ip else "127.0.0.1"; - peers = cfg.fiberlbBgp.peers; - - # Convert VIPs to BGP announcements - announcements = map (vip: { prefix = vip; }) cfg.fiberlbBgp.vips; }; - - # FiberLB service configuration (if FiberLB is enabled) - # Note: This assumes fiberlb service is defined elsewhere - # services.fiberlb.bgp.gobgpAddress = mkIf (config.services.fiberlb.enable or false) "127.0.0.1:50051"; }) # PrismNET OVN integration
diff --git a/nix/tests/fiberlb-native-bgp-vm-smoke.nix b/nix/tests/fiberlb-native-bgp-vm-smoke.nix
new file mode 100644
index 0000000..d238fe2
--- /dev/null
+++ b/nix/tests/fiberlb-native-bgp-vm-smoke.nix
@@ -0,0 +1,425 @@
+# NixOS VM smoke test for FiberLB's native BGP speaker.
+#
+# Two machines share VLAN 1 (192.168.100.0/24):
+#   router (192.168.100.1, AS 65020) runs gobgpd as the BGP peer.
+#   lb     (192.168.100.2, AS 65010) runs IAM, FiberLB and a mock HTTP backend.
+#
+# The test script creates a load balancer with one health-checked backend and
+# checks that the VIP /32 is listed in the GoBGP RIB while the backend is
+# online, is removed once the backend is stopped, and is listed again after
+# the backend is restarted.
+{
+  pkgs,
+  photoncloudPackages,
+  photoncloudModule,
+  nixNosModule,
+}:
+
+let
+  # gobgpd configuration for the router node: local AS 65020, with the FiberLB
+  # host (192.168.100.2, AS 65010) as its only neighbor.
+  gobgpdConfig = pkgs.writeText "fiberlb-native-bgp-peer.json" (builtins.toJSON {
+    global = {
+      config = {
+        as = 65020;
+        router-id = "192.168.100.1";
+      };
+    };
+
+    neighbors = [
+      {
+        config = {
+          neighbor-address = "192.168.100.2";
+          peer-as = 65010;
+          description = "fiberlb-under-test";
+        };
+      }
+    ];
+  });
+
+  # Proto sources interpolated into the test script for grpcurl's
+  # -import-path / -proto arguments.
+  iamProtoDir = ../../iam/proto;
+  iamProto = "iam.proto";
+  fiberlbProtoDir = ../../fiberlb/crates/fiberlb-api/proto;
+  fiberlbProto = "fiberlb.proto";
+in
+{
+  name = "fiberlb-native-bgp-vm-smoke";
+
+  nodes = {
+    # BGP peer whose global RIB the test script inspects.
+    router =
+      { ... }:
+      {
+        networking.hostName = "router";
+        networking.useDHCP = false;
+        networking.firewall.enable = false;
+        virtualisation.vlans = [ 1 ];
+        networking.interfaces.eth1.ipv4.addresses = [
+          {
+            address = "192.168.100.1";
+            prefixLength = 24;
+          }
+        ];
+
+        environment.systemPackages = with pkgs; [
+          gobgp
+          gobgpd
+          jq
+        ];
+
+        systemd.services.gobgpd-peer = {
+          description = "GoBGP test peer for FiberLB native BGP smoke";
+          wantedBy = [ "multi-user.target" ];
+          after = [ "network.target" ];
+          serviceConfig = {
+            Type = "simple";
+            ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${gobgpdConfig} --api-hosts 127.0.0.1:50051 -p";
+            Restart = "on-failure";
+            RestartSec = "2s";
+          };
+        };
+
+        system.stateVersion = "24.11";
+      };
+
+    # System under test: FiberLB with native BGP enabled, plus IAM and a
+    # local HTTP server that serves as the health-checked backend.
+    lb =
+      { ... }:
+      {
+        imports = [
+          nixNosModule
+          photoncloudModule
+        ];
+
+        networking.hostName = "lb";
+        networking.useDHCP = false;
+        networking.firewall.enable = false;
+        virtualisation.vlans = [ 1 ];
+        networking.interfaces.eth1.ipv4.addresses = [
+          {
+            address = "192.168.100.2";
+            prefixLength = 24;
+          }
+        ];
+
+        environment.systemPackages = with pkgs; [
+          grpcurl
+          jq
+          python3
+        ];
+
+        services.iam = {
+          enable = true;
+          package = photoncloudPackages.iam-server;
+          port = 50080;
+          httpPort = 8083;
+          storeBackend = "memory";
+        };
+
+        systemd.services.iam.environment = {
+          IAM_ALLOW_RANDOM_SIGNING_KEY = "1";
+        };
+
+        services.fiberlb = {
+          enable = true;
+          package = photoncloudPackages.fiberlb-server;
+          port = 50085;
+          iamAddr = "192.168.100.2:50080";
+          metadataBackend = "sqlite";
+          databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db";
+          singleNode = true;
+          healthCheckIntervalSecs = 1;
+          healthCheckTimeoutSecs = 1;
+          vipCheckIntervalSecs = 1;
+          bgp = {
+            enable = true;
+            localAs = 65010;
+            routerId = "192.168.100.2";
+            nextHop = "192.168.100.2";
+            holdTimeSecs = 9;
+            keepaliveSecs = 3;
+            peers = [
+              {
+                address = "192.168.100.1";
+                port = 179;
+                asn = 65020;
+                description = "router-peer";
+              }
+            ];
+          };
+        };
+
+        systemd.services.mock-backend = {
+          description = "FiberLB health-check backend";
+          wantedBy = [ "multi-user.target" ];
+          after = [ "network.target" ];
+          serviceConfig = {
+            Type = "simple";
+            ExecStart = "${pkgs.python3}/bin/python -m http.server 18081 --bind 127.0.0.1";
+            Restart = "always";
+            RestartSec = "1s";
+          };
+        };
+
+        system.stateVersion = "24.11";
+      };
+  };
+
+  testScript = ''
+    import json
+    import shlex
+    import time
+
+    IAM_PROTO_DIR = "${iamProtoDir}"
+    IAM_PROTO = "${iamProto}"
+    FIBERLB_PROTO_DIR = "${fiberlbProtoDir}"
+    FIBERLB_PROTO = "${fiberlbProto}"
+
+    def grpcurl_json(machine, endpoint, import_path, proto, service, payload, headers=None):
+        """Run one grpcurl call on `machine` and return the parsed JSON reply.
+
+        `headers` is an optional list of "name: value" strings, each passed
+        with -H. stderr is merged into stdout; a non-zero exit status raises
+        AssertionError carrying that output.
+        """
+        header_args = ""
+        for header in headers or []:
+            header_args += f" -H {shlex.quote(header)}"
+        command = (
+            f"grpcurl -plaintext{header_args} "
+            f"-import-path {shlex.quote(import_path)} "
+            f"-proto {shlex.quote(proto)} "
+            f"-d {shlex.quote(json.dumps(payload))} "
+            f"{shlex.quote(endpoint)} {shlex.quote(service)}"
+        )
+        status, output = machine.execute(f"timeout 15 sh -lc {shlex.quote(command + ' 2>&1')}")
+        if status != 0:
+            raise AssertionError(
+                "grpcurl failed"
+                f" service={service}"
+                f" status={status}"
+                f" payload={json.dumps(payload, sort_keys=True)}"
+                f" output={output}"
+            )
+        return json.loads(output)
+
+    def issue_project_admin_token(machine, org_id, project_id):
+        """Create a service-account principal, bind roles/ProjectAdmin to it
+        for the given project, and return a freshly issued token.
+
+        Every IAM call is retried at 2 second intervals; the three calls share
+        a single 120 second deadline.
+        """
+        principal_id = f"fiberlb-smoke-{int(time.time())}"
+        deadline = time.time() + 120
+
+        def retry(action):
+            # Re-run `action` until it succeeds or the shared deadline passes.
+            last_error = None
+            while time.time() < deadline:
+                try:
+                    return action()
+                except Exception as exc:
+                    last_error = exc
+                    time.sleep(2)
+            raise AssertionError(f"IAM bootstrap timed out: {last_error}")
+
+        retry(lambda: grpcurl_json(
+            machine,
+            "127.0.0.1:50080",
+            IAM_PROTO_DIR,
+            IAM_PROTO,
+            "iam.v1.IamAdmin/CreatePrincipal",
+            {
+                "id": principal_id,
+                "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+                "name": principal_id,
+                "orgId": org_id,
+                "projectId": project_id,
+            },
+        ))
+        retry(lambda: grpcurl_json(
+            machine,
+            "127.0.0.1:50080",
+            IAM_PROTO_DIR,
+            IAM_PROTO,
+            "iam.v1.IamAdmin/CreateBinding",
+            {
+                "principal": {
+                    "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+                    "id": principal_id,
+                },
+                "role": "roles/ProjectAdmin",
+                "scope": {
+                    "project": {
+                        "id": project_id,
+                        "orgId": org_id,
+                    }
+                },
+            },
+        ))
+        token_response = retry(lambda: grpcurl_json(
+            machine,
+            "127.0.0.1:50080",
+            IAM_PROTO_DIR,
+            IAM_PROTO,
+            "iam.v1.IamToken/IssueToken",
+            {
+                "principalId": principal_id,
+                "principalKind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+                "scope": {
+                    "project": {
+                        "id": project_id,
+                        "orgId": org_id,
+                    }
+                },
+                "ttlSeconds": 3600,
+            },
+        ))
+        return token_response["token"]
+
+    def wait_for_backend_status(status, backend_id, token):
+        """Poll GetBackend on the lb node until the backend reports `status`."""
+        # Build the jq filter as its own expression: backslash escapes inside a
+        # nested f-string expression are a SyntaxError before Python 3.12.
+        jq_filter = f'.backend.status == "{status}"'
+        lb.wait_until_succeeds(
+            "grpcurl -plaintext "
+            f"-H {shlex.quote('authorization: Bearer ' + token)} "
+            f"-import-path {shlex.quote(FIBERLB_PROTO_DIR)} "
+            f"-proto {shlex.quote(FIBERLB_PROTO)} "
+            f"-d {shlex.quote(json.dumps({'id': backend_id}))} "
+            "127.0.0.1:50085 fiberlb.v1.BackendService/GetBackend "
+            f"| jq -e {shlex.quote(jq_filter)}"
+        )
+
+    def wait_for_route(prefix, present):
+        """Wait until `prefix` is listed in the router's GoBGP global RIB
+        (present=True) or is no longer listed there (present=False).
+
+        The absence check sleeps one second between polls, for up to 60 seconds.
+        """
+        if present:
+            router.wait_until_succeeds(
+                f"gobgp -u 127.0.0.1 -p 50051 global rib | grep -F {shlex.quote(prefix)}"
+            )
+        else:
+            deadline = time.time() + 60
+            while time.time() < deadline:
+                # `|| true` keeps succeed() from raising on a non-zero gobgp exit.
+                output = router.succeed("gobgp -u 127.0.0.1 -p 50051 global rib || true")
+                if prefix not in output:
+                    return
+                time.sleep(1)
+            raise AssertionError(f"route {prefix} still present in GoBGP RIB")
+
+    start_all()
+    serial_stdout_off()
+
+    # Wait for the units to start and for the BGP, IAM and FiberLB ports to listen.
+    router.wait_for_unit("gobgpd-peer.service")
+    router.wait_until_succeeds("ss -ltnH '( sport = :179 )' | grep -q LISTEN")
+    lb.wait_for_unit("iam.service")
+    lb.wait_until_succeeds("ss -ltnH '( sport = :50080 )' | grep -q LISTEN")
+    lb.wait_for_unit("mock-backend.service")
+    lb.wait_for_unit("fiberlb.service")
+    lb.wait_until_succeeds("ss -ltnH '( sport = :50085 )' | grep -q LISTEN")
+
+    # Only checks that 192.168.100.2 appears in the neighbor listing; the
+    # session state is not inspected here.
+    router.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2")
+
+    token = issue_project_admin_token(lb, "bgp-smoke-org", "bgp-smoke-project")
+
+    # Create load balancer -> pool -> backend -> HTTP health check.
+    lb_response = grpcurl_json(
+        lb,
+        "127.0.0.1:50085",
+        FIBERLB_PROTO_DIR,
+        FIBERLB_PROTO,
+        "fiberlb.v1.LoadBalancerService/CreateLoadBalancer",
+        {
+            "name": "bgp-smoke-lb",
+            "orgId": "bgp-smoke-org",
+            "projectId": "bgp-smoke-project",
+            "description": "native bgp smoke",
+        },
+        headers=[f"authorization: Bearer {token}"],
+    )
+    loadbalancer = lb_response["loadbalancer"]
+    lb_id = loadbalancer["id"]
+    vip_prefix = f"{loadbalancer['vipAddress']}/32"
+
+    pool_id = grpcurl_json(
+        lb,
+        "127.0.0.1:50085",
+        FIBERLB_PROTO_DIR,
+        FIBERLB_PROTO,
+        "fiberlb.v1.PoolService/CreatePool",
+        {
+            "name": "bgp-smoke-pool",
+            "loadbalancerId": lb_id,
+            "algorithm": "POOL_ALGORITHM_ROUND_ROBIN",
+            "protocol": "POOL_PROTOCOL_TCP",
+        },
+        headers=[f"authorization: Bearer {token}"],
+    )["pool"]["id"]
+
+    backend_id = grpcurl_json(
+        lb,
+        "127.0.0.1:50085",
+        FIBERLB_PROTO_DIR,
+        FIBERLB_PROTO,
+        "fiberlb.v1.BackendService/CreateBackend",
+        {
+            "name": "bgp-smoke-backend",
+            "poolId": pool_id,
+            "address": "127.0.0.1",
+            "port": 18081,
+            "weight": 1,
+        },
+        headers=[f"authorization: Bearer {token}"],
+    )["backend"]["id"]
+
+    grpcurl_json(
+        lb,
+        "127.0.0.1:50085",
+        FIBERLB_PROTO_DIR,
+        FIBERLB_PROTO,
+        "fiberlb.v1.HealthCheckService/CreateHealthCheck",
+        {
+            "name": "bgp-smoke-health",
+            "poolId": pool_id,
+            "type": "HEALTH_CHECK_TYPE_HTTP",
+            "intervalSeconds": 1,
+            "timeoutSeconds": 1,
+            "healthyThreshold": 1,
+            "unhealthyThreshold": 1,
+            "httpConfig": {
+                "method": "GET",
+                "path": "/",
+                "expectedCodes": [200],
+            },
+        },
+        headers=[f"authorization: Bearer {token}"],
+    )
+
+    # Backend online: the VIP /32 must be listed in the router's RIB.
+    wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token)
+    wait_for_route(vip_prefix, True)
+
+    # Backend stopped: the route must no longer be listed.
+    lb.succeed("systemctl stop mock-backend.service")
+    wait_for_backend_status("BACKEND_STATUS_OFFLINE", backend_id, token)
+    wait_for_route(vip_prefix, False)
+
+    # Backend restarted: the route must be listed again.
+    lb.succeed("systemctl start mock-backend.service")
+    lb.wait_for_unit("mock-backend.service")
+    wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token)
+    wait_for_route(vip_prefix, True)
+  '';
+}