From ce979d8f262bfa25551d6c42b735b22421b6771d Mon Sep 17 00:00:00 2001 From: centra Date: Mon, 30 Mar 2026 20:06:08 +0900 Subject: [PATCH] fiberlb: add BGP interop, drain, and policy validation --- .../crates/fiberlb-api/proto/fiberlb.proto | 1 + .../crates/fiberlb-server/src/bgp_client.rs | 854 +++++++++++++++++- fiberlb/crates/fiberlb-server/src/config.rs | 90 ++ fiberlb/crates/fiberlb-server/src/main.rs | 17 +- .../src/services/loadbalancer.rs | 54 +- .../crates/fiberlb-server/src/vip_manager.rs | 137 ++- flake.nix | 18 + nix/modules/fiberlb.nix | 69 ++ ...fiberlb-native-bgp-ecmp-drain-vm-smoke.nix | 745 +++++++++++++++ .../fiberlb-native-bgp-interop-vm-smoke.nix | 737 +++++++++++++++ 10 files changed, 2667 insertions(+), 55 deletions(-) create mode 100644 nix/tests/fiberlb-native-bgp-ecmp-drain-vm-smoke.nix create mode 100644 nix/tests/fiberlb-native-bgp-interop-vm-smoke.nix diff --git a/fiberlb/crates/fiberlb-api/proto/fiberlb.proto b/fiberlb/crates/fiberlb-api/proto/fiberlb.proto index 08f218f..0ada402 100644 --- a/fiberlb/crates/fiberlb-api/proto/fiberlb.proto +++ b/fiberlb/crates/fiberlb-api/proto/fiberlb.proto @@ -48,6 +48,7 @@ message CreateLoadBalancerRequest { string org_id = 2; string project_id = 3; string description = 4; + string vip_address = 5; } message CreateLoadBalancerResponse { diff --git a/fiberlb/crates/fiberlb-server/src/bgp_client.rs b/fiberlb/crates/fiberlb-server/src/bgp_client.rs index 65dda4f..cc2ccfc 100644 --- a/fiberlb/crates/fiberlb-server/src/bgp_client.rs +++ b/fiberlb/crates/fiberlb-server/src/bgp_client.rs @@ -5,15 +5,15 @@ //! and `/32` VIP advertise/withdraw driven by the VIP manager. 
use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr}; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::net::TcpStream; +use tokio::net::{TcpStream, UdpSocket}; use tokio::sync::{mpsc, watch, RwLock}; -use tokio::time::sleep; +use tokio::time::{sleep, MissedTickBehavior}; use tracing::{debug, info, warn}; use crate::config::{BgpConfig, BgpPeerConfig}; @@ -26,18 +26,31 @@ const BGP_TYPE_NOTIFICATION: u8 = 3; const BGP_TYPE_KEEPALIVE: u8 = 4; const BGP_MAX_MESSAGE_SIZE: usize = 4096; const BGP_ORIGIN_IGP: u8 = 0; +const ATTR_FLAG_OPTIONAL: u8 = 0x80; const ATTR_FLAG_TRANSITIVE: u8 = 0x40; const ATTR_FLAG_EXTENDED_LEN: u8 = 0x10; const ATTR_TYPE_ORIGIN: u8 = 1; const ATTR_TYPE_AS_PATH: u8 = 2; const ATTR_TYPE_NEXT_HOP: u8 = 3; +const ATTR_TYPE_MULTI_EXIT_DISC: u8 = 4; +const ATTR_TYPE_COMMUNITIES: u8 = 8; const AS_PATH_SEGMENT_SEQUENCE: u8 = 2; const METRIC_BGP_CONFIGURED_PEERS: &str = "fiberlb_bgp_configured_peers"; const METRIC_BGP_CONNECTED_PEERS: &str = "fiberlb_bgp_connected_peers"; const METRIC_BGP_DESIRED_ROUTES: &str = "fiberlb_bgp_desired_routes"; const METRIC_BGP_PEER_SESSION_UP: &str = "fiberlb_bgp_peer_session_up"; +const METRIC_BGP_PEER_BFD_UP: &str = "fiberlb_bgp_peer_bfd_up"; const METRIC_BGP_SESSION_ESTABLISHED_TOTAL: &str = "fiberlb_bgp_session_established_total"; const METRIC_BGP_SESSION_ENDS_TOTAL: &str = "fiberlb_bgp_session_ends_total"; +const BFD_CONTROL_PORT: u16 = 3784; +const BFD_VERSION: u8 = 1; +const BFD_DIAGNOSTIC_NONE: u8 = 0; +const BFD_STATE_ADMIN_DOWN: u8 = 0; +const BFD_STATE_DOWN: u8 = 1; +const BFD_STATE_INIT: u8 = 2; +const BFD_STATE_UP: u8 = 3; +const BFD_PACKET_LEN: usize = 24; +static NEXT_BFD_DISCRIMINATOR: AtomicU32 = AtomicU32::new(1); /// Result type 
for BGP operations. pub type Result = std::result::Result; @@ -121,6 +134,9 @@ impl NativeBgpSpeaker { record_desired_routes(0); for peer in &config.peers { set_peer_session_up(peer, false); + if peer.bfd.enabled { + set_peer_bfd_up(peer, false); + } } for peer in config.peers.clone() { @@ -187,6 +203,266 @@ pub async fn create_bgp_client(config: BgpConfig) -> Result> Ok(Arc::new(speaker)) } +#[derive(Debug)] +enum SessionEvent { + Bgp(Result), + Bfd(BfdPeerState), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BfdPeerState { + Up, + Down, +} + +#[derive(Debug)] +struct BfdRuntime { + shutdown: watch::Sender, + task: tokio::task::JoinHandle<()>, +} + +fn maybe_start_bfd_runtime( + stream: &TcpStream, + peer: &BgpPeerConfig, + event_tx: mpsc::Sender, +) -> Result> { + if !peer.bfd.enabled { + return Ok(None); + } + + let local_ip = to_ipv4( + stream.local_addr().map_err(BgpError::Io)?.ip(), + "BFD local address", + )?; + let remote_ip = to_ipv4( + stream.peer_addr().map_err(BgpError::Io)?.ip(), + "BFD peer address", + )?; + let local_discriminator = NEXT_BFD_DISCRIMINATOR.fetch_add(1, Ordering::Relaxed).max(1); + + let std_socket = std::net::UdpSocket::bind(SocketAddr::new(IpAddr::V4(local_ip), 0))?; + std_socket.set_nonblocking(true)?; + std_socket.set_ttl(255)?; + std_socket.connect(SocketAddr::new(IpAddr::V4(remote_ip), BFD_CONTROL_PORT))?; + let socket = UdpSocket::from_std(std_socket)?; + + let (shutdown, shutdown_rx) = watch::channel(false); + let peer = peer.clone(); + let task = tokio::spawn(run_bfd_session( + socket, + peer, + local_discriminator, + event_tx, + shutdown_rx, + )); + + Ok(Some(BfdRuntime { shutdown, task })) +} + +async fn run_bfd_session( + socket: UdpSocket, + peer: BgpPeerConfig, + local_discriminator: u32, + event_tx: mpsc::Sender, + mut shutdown_rx: watch::Receiver, +) { + let desired_min_tx = Duration::from_millis(peer.bfd.desired_min_tx_millis.max(1)); + let required_min_rx = 
Duration::from_millis(peer.bfd.required_min_rx_millis.max(1)); + let detect_multiplier = peer.bfd.detect_multiplier.max(1) as u32; + let mut local_state = BFD_STATE_DOWN; + let mut remote_discriminator = 0u32; + let mut remote_desired_min_tx = desired_min_tx; + let mut remote_required_min_rx = required_min_rx; + let mut session_up = false; + let mut last_rx: Option = None; + let mut recv_buf = [0u8; 512]; + let mut send_tick = tokio::time::interval(desired_min_tx); + let mut watchdog_tick = tokio::time::interval(Duration::from_millis(100)); + send_tick.set_missed_tick_behavior(MissedTickBehavior::Delay); + watchdog_tick.set_missed_tick_behavior(MissedTickBehavior::Delay); + send_tick.tick().await; + watchdog_tick.tick().await; + + loop { + tokio::select! { + _ = send_tick.tick() => { + let effective_tx = desired_min_tx.max(remote_required_min_rx); + let packet = encode_bfd_packet( + local_state, + detect_multiplier as u8, + local_discriminator, + remote_discriminator, + effective_tx, + required_min_rx, + ); + if socket.send(&packet).await.is_err() { + break; + } + } + result = socket.recv(&mut recv_buf) => { + let Ok(len) = result else { + break; + }; + let Ok(packet) = parse_bfd_packet(&recv_buf[..len]) else { + continue; + }; + if packet.your_discriminator != 0 && packet.your_discriminator != local_discriminator { + continue; + } + + last_rx = Some(Instant::now()); + remote_discriminator = packet.my_discriminator; + let remote_state = packet.state; + remote_desired_min_tx = packet.desired_min_tx.max(Duration::from_millis(1)); + remote_required_min_rx = packet.required_min_rx.max(Duration::from_millis(1)); + local_state = next_bfd_local_state(local_state, remote_state); + + let now_up = remote_discriminator != 0 + && matches!(local_state, BFD_STATE_INIT | BFD_STATE_UP) + && matches!(remote_state, BFD_STATE_INIT | BFD_STATE_UP); + if now_up { + local_state = BFD_STATE_UP; + } + + if now_up && !session_up { + session_up = true; + if 
event_tx.send(SessionEvent::Bfd(BfdPeerState::Up)).await.is_err() { + break; + } + } else if !now_up && session_up { + session_up = false; + if event_tx.send(SessionEvent::Bfd(BfdPeerState::Down)).await.is_err() { + break; + } + } + } + _ = watchdog_tick.tick() => { + let Some(last_rx_at) = last_rx else { + continue; + }; + let detect_timeout = + remote_desired_min_tx.max(required_min_rx).mul_f64(detect_multiplier as f64); + if last_rx_at.elapsed() >= detect_timeout { + remote_discriminator = 0; + remote_desired_min_tx = desired_min_tx; + remote_required_min_rx = required_min_rx; + local_state = BFD_STATE_DOWN; + last_rx = None; + if session_up { + session_up = false; + if event_tx.send(SessionEvent::Bfd(BfdPeerState::Down)).await.is_err() { + break; + } + } + } + } + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow_and_update() { + break; + } + } + } + } +} + +fn next_bfd_local_state(current: u8, remote: u8) -> u8 { + match current { + BFD_STATE_DOWN => { + if matches!(remote, BFD_STATE_INIT | BFD_STATE_UP) { + BFD_STATE_UP + } else { + BFD_STATE_INIT + } + } + BFD_STATE_INIT => { + if matches!(remote, BFD_STATE_INIT | BFD_STATE_UP) { + BFD_STATE_UP + } else { + BFD_STATE_INIT + } + } + BFD_STATE_UP => { + if remote == BFD_STATE_DOWN { + BFD_STATE_INIT + } else { + BFD_STATE_UP + } + } + _ => BFD_STATE_DOWN, + } +} + +#[derive(Debug, Clone, Copy)] +struct BfdPacket { + state: u8, + my_discriminator: u32, + your_discriminator: u32, + desired_min_tx: Duration, + required_min_rx: Duration, +} + +fn encode_bfd_packet( + state: u8, + detect_multiplier: u8, + my_discriminator: u32, + your_discriminator: u32, + desired_min_tx: Duration, + required_min_rx: Duration, +) -> [u8; BFD_PACKET_LEN] { + let mut packet = [0u8; BFD_PACKET_LEN]; + packet[0] = (BFD_VERSION << 5) | BFD_DIAGNOSTIC_NONE; + packet[1] = state << 6; + packet[2] = detect_multiplier; + packet[3] = BFD_PACKET_LEN as u8; + 
packet[4..8].copy_from_slice(&my_discriminator.to_be_bytes()); + packet[8..12].copy_from_slice(&your_discriminator.to_be_bytes()); + packet[12..16].copy_from_slice(&duration_to_bfd_micros(desired_min_tx).to_be_bytes()); + packet[16..20].copy_from_slice(&duration_to_bfd_micros(required_min_rx).to_be_bytes()); + packet[20..24].copy_from_slice(&0u32.to_be_bytes()); + packet +} + +fn parse_bfd_packet(data: &[u8]) -> Result { + if data.len() < BFD_PACKET_LEN { + return Err(BgpError::Protocol(format!( + "BFD control packet too short: {} bytes", + data.len() + ))); + } + let version = data[0] >> 5; + if version != BFD_VERSION { + return Err(BgpError::Protocol(format!( + "unsupported BFD version {}", + version + ))); + } + + let state = data[1] >> 6; + if matches!(state, BFD_STATE_ADMIN_DOWN) { + return Err(BgpError::Protocol("peer entered BFD admin-down".to_string())); + } + + Ok(BfdPacket { + state, + my_discriminator: u32::from_be_bytes([data[4], data[5], data[6], data[7]]), + your_discriminator: u32::from_be_bytes([data[8], data[9], data[10], data[11]]), + desired_min_tx: bfd_micros_to_duration(u32::from_be_bytes([ + data[12], data[13], data[14], data[15], + ])), + required_min_rx: bfd_micros_to_duration(u32::from_be_bytes([ + data[16], data[17], data[18], data[19], + ])), + }) +} + +fn duration_to_bfd_micros(duration: Duration) -> u32 { + duration.as_micros().min(u32::MAX as u128) as u32 +} + +fn bfd_micros_to_duration(micros: u32) -> Duration { + Duration::from_micros(u64::from(micros.max(1))) +} + async fn run_peer_loop(config: BgpConfig, peer: BgpPeerConfig, shared: Arc) { let peer_name = peer_name(&peer); let peer_label = peer_metric_label(&peer); @@ -259,6 +535,9 @@ async fn establish_peer_session( ) .increment(1); set_peer_session_up(peer, true); + if peer.bfd.enabled { + set_peer_bfd_up(peer, false); + } shared.connected_sessions.fetch_add(1, Ordering::Relaxed); record_connected_peers(&shared); let session_result = run_established_session( @@ -273,6 +552,9 @@ 
async fn establish_peer_session( shared.connected_sessions.fetch_sub(1, Ordering::Relaxed); record_connected_peers(&shared); set_peer_session_up(peer, false); + if peer.bfd.enabled { + set_peer_bfd_up(peer, false); + } session_result } @@ -355,16 +637,17 @@ async fn run_established_session( ) -> Result<()> { let local_as = u16::try_from(config.local_as) .map_err(|_| BgpError::Config(format!("local AS {} is out of range", config.local_as)))?; - let (mut reader, mut writer) = stream.into_split(); let (event_tx, mut event_rx) = mpsc::channel(16); let peer_name = peer_name(peer); + let mut bfd_runtime = maybe_start_bfd_runtime(&stream, peer, event_tx.clone())?; + let (mut reader, mut writer) = stream.into_split(); let reader_task = tokio::spawn(async move { loop { let message = read_bgp_message(&mut reader).await; let terminal = matches!(message, Err(_) | Ok(BgpMessage::Notification(_))); - if event_tx.send(message).await.is_err() { + if event_tx.send(SessionEvent::Bgp(message)).await.is_err() { break; } @@ -382,22 +665,29 @@ async fn run_established_session( let mut route_updates = shared.route_updates.subscribe(); let mut advertised = HashMap::new(); - reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; + let mut routes_ready = bfd_runtime.is_none(); + let mut bfd_bootstrap_deadline = if peer.bfd.enabled { + Some(Instant::now() + Duration::from_secs(peer.bfd.bootstrap_timeout_secs.max(1))) + } else { + None + }; + if routes_ready { + reconcile_routes(&shared, &mut writer, &mut advertised, local_as, peer).await?; + } let mut last_rx = Instant::now(); - loop { + let result = loop { tokio::select! 
{ maybe_message = event_rx.recv() => { let Some(message) = maybe_message else { - reader_task.abort(); - return Err(BgpError::ConnectionFailed(format!("peer {} closed the session", peer_name))); + break Err(BgpError::ConnectionFailed(format!("peer {} closed the session", peer_name))); }; match message { - Ok(BgpMessage::Keepalive) => { + SessionEvent::Bgp(Ok(BgpMessage::Keepalive)) => { last_rx = Instant::now(); } - Ok(BgpMessage::Update(update)) => { + SessionEvent::Bgp(Ok(BgpMessage::Update(update))) => { last_rx = Instant::now(); debug!( peer = %peer_name, @@ -405,12 +695,13 @@ async fn run_established_session( withdrawn = update.withdrawn_routes.len(), next_hop = ?update.next_hop, as_path = ?update.as_path, + med = ?update.med, + communities = ?update.communities, "Ignoring inbound UPDATE from peer" ); } - Ok(BgpMessage::Notification(notification)) => { - reader_task.abort(); - return Err(BgpError::Protocol(format!( + SessionEvent::Bgp(Ok(BgpMessage::Notification(notification))) => { + break Err(BgpError::Protocol(format!( "peer {} sent notification {}:{} ({} data bytes)", peer_name, notification.code, @@ -418,7 +709,7 @@ async fn run_established_session( notification.data.len() ))); } - Ok(BgpMessage::Unknown { msg_type, payload }) => { + SessionEvent::Bgp(Ok(BgpMessage::Unknown { msg_type, payload })) => { last_rx = Instant::now(); debug!( peer = %peer_name, @@ -427,38 +718,77 @@ async fn run_established_session( "Ignoring unsupported BGP message" ); } - Ok(BgpMessage::Open(_)) => { - reader_task.abort(); - return Err(BgpError::Protocol(format!( + SessionEvent::Bgp(Ok(BgpMessage::Open(_))) => { + break Err(BgpError::Protocol(format!( "peer {} sent OPEN after session establishment", peer_name ))); } - Err(error) => { - reader_task.abort(); - return Err(error); + SessionEvent::Bgp(Err(error)) => { + break Err(error); + } + SessionEvent::Bfd(BfdPeerState::Up) => { + if !routes_ready { + routes_ready = true; + bfd_bootstrap_deadline = None; + 
set_peer_bfd_up(peer, true); + if let Err(error) = reconcile_routes(&shared, &mut writer, &mut advertised, local_as, peer).await { + break Err(error); + } + } + } + SessionEvent::Bfd(BfdPeerState::Down) => { + set_peer_bfd_up(peer, false); + if routes_ready { + break Err(BgpError::ConnectionFailed(format!( + "peer {} BFD session went down", + peer_name + ))); + } } } } _ = keepalive.tick() => { - send_keepalive(&mut writer).await?; + if let Err(error) = send_keepalive(&mut writer).await { + break Err(error); + } } changed = route_updates.changed() => { if changed.is_err() { - reader_task.abort(); - return Err(BgpError::ConnectionFailed(format!("peer {} route update channel closed", peer_name))); + break Err(BgpError::ConnectionFailed(format!("peer {} route update channel closed", peer_name))); } route_updates.borrow_and_update(); - reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; + if routes_ready { + if let Err(error) = reconcile_routes(&shared, &mut writer, &mut advertised, local_as, peer).await { + break Err(error); + } + } } - _ = hold_monitor.tick(), if hold_deadline.is_some() => { - if last_rx.elapsed() >= hold_deadline.unwrap() { - reader_task.abort(); - return Err(BgpError::Protocol(format!("peer {} hold timer expired", peer_name))); + _ = hold_monitor.tick(), if hold_deadline.is_some() || bfd_bootstrap_deadline.is_some() => { + if let Some(hold_deadline) = hold_deadline { + if last_rx.elapsed() >= hold_deadline { + break Err(BgpError::Protocol(format!("peer {} hold timer expired", peer_name))); + } + } + if let Some(deadline) = bfd_bootstrap_deadline { + if Instant::now() >= deadline { + break Err(BgpError::ConnectionFailed(format!( + "peer {} BFD session did not become ready within {}s", + peer_name, + peer.bfd.bootstrap_timeout_secs + ))); + } } } } + }; + + reader_task.abort(); + if let Some(runtime) = bfd_runtime.take() { + let _ = runtime.shutdown.send(true); + runtime.task.abort(); } + result } async fn reconcile_routes( @@ 
-466,6 +796,7 @@ async fn reconcile_routes( writer: &mut W, advertised: &mut HashMap, local_as: u16, + peer: &BgpPeerConfig, ) -> Result<()> { let desired = shared.desired_routes.read().await.clone(); @@ -485,7 +816,7 @@ async fn reconcile_routes( continue; } - send_announce(writer, prefix, next_hop, local_as).await?; + send_announce(writer, prefix, next_hop, local_as, peer).await?; advertised.insert(prefix, next_hop); } @@ -566,6 +897,36 @@ fn validate_config(config: &BgpConfig) -> Result<()> { peer.asn ))); } + for community in &peer.export_policy.communities { + parse_standard_community(community).map_err(|error| { + BgpError::Config(format!( + "peer {} has invalid community '{}': {}", + peer_name(peer), + community, + error + )) + })?; + } + if peer.bfd.enabled { + if peer.bfd.desired_min_tx_millis == 0 || peer.bfd.required_min_rx_millis == 0 { + return Err(BgpError::Config(format!( + "peer {} must use non-zero BFD transmit and receive intervals", + peer_name(peer) + ))); + } + if peer.bfd.detect_multiplier == 0 { + return Err(BgpError::Config(format!( + "peer {} must use a non-zero BFD detect multiplier", + peer_name(peer) + ))); + } + if peer.bfd.bootstrap_timeout_secs == 0 { + return Err(BgpError::Config(format!( + "peer {} must use a non-zero BFD bootstrap timeout", + peer_name(peer) + ))); + } + } } Ok(()) @@ -601,6 +962,11 @@ fn set_peer_session_up(peer: &BgpPeerConfig, up: bool) { .set(if up { 1.0 } else { 0.0 }); } +fn set_peer_bfd_up(peer: &BgpPeerConfig, up: bool) { + metrics::gauge!(METRIC_BGP_PEER_BFD_UP, "peer" => peer_metric_label(peer)) + .set(if up { 1.0 } else { 0.0 }); +} + fn negotiated_keepalive_interval( requested_keepalive_secs: u16, negotiated_hold_time_secs: u16, @@ -651,22 +1017,44 @@ async fn send_announce( prefix: Ipv4Addr, next_hop: Ipv4Addr, local_as: u16, + peer: &BgpPeerConfig, ) -> Result<()> { let mut path_attributes = Vec::new(); - path_attributes.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_ORIGIN, 1, BGP_ORIGIN_IGP]); 
- - path_attributes.extend_from_slice(&[ + path_attributes.extend(encode_path_attribute( + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_ORIGIN, + &[BGP_ORIGIN_IGP], + )); + path_attributes.extend(encode_path_attribute( ATTR_FLAG_TRANSITIVE, ATTR_TYPE_AS_PATH, - 4, - AS_PATH_SEGMENT_SEQUENCE, - 1, - ]); - path_attributes.extend_from_slice(&local_as.to_be_bytes()); - - path_attributes.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_NEXT_HOP, 4]); - path_attributes.extend_from_slice(&next_hop.octets()); + &encode_as_path(local_as), + )); + path_attributes.extend(encode_path_attribute( + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_NEXT_HOP, + &next_hop.octets(), + )); + if let Some(med) = peer.export_policy.med { + path_attributes.extend(encode_path_attribute( + ATTR_FLAG_OPTIONAL, + ATTR_TYPE_MULTI_EXIT_DISC, + &med.to_be_bytes(), + )); + } + if !peer.export_policy.communities.is_empty() { + let mut communities = Vec::with_capacity(peer.export_policy.communities.len() * 4); + for community in &peer.export_policy.communities { + let parsed = parse_standard_community(community).map_err(BgpError::Config)?; + communities.extend_from_slice(&parsed.to_be_bytes()); + } + path_attributes.extend(encode_path_attribute( + ATTR_FLAG_OPTIONAL | ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_COMMUNITIES, + &communities, + )); + } let nlri = encode_ipv4_prefix(prefix); let mut payload = Vec::with_capacity(2 + 2 + path_attributes.len() + nlri.len()); @@ -695,6 +1083,43 @@ fn encode_ipv4_prefix(prefix: Ipv4Addr) -> Vec { bytes } +fn encode_as_path(local_as: u16) -> [u8; 4] { + let as_bytes = local_as.to_be_bytes(); + [AS_PATH_SEGMENT_SEQUENCE, 1, as_bytes[0], as_bytes[1]] +} + +fn encode_path_attribute(flags: u8, attr_type: u8, value: &[u8]) -> Vec { + let mut attribute = + Vec::with_capacity(2 + if value.len() > u8::MAX as usize { 2 } else { 1 } + value.len()); + let extended = value.len() > u8::MAX as usize; + attribute.push(if extended { + flags | ATTR_FLAG_EXTENDED_LEN + } else { + flags + }); + 
attribute.push(attr_type); + if extended { + attribute.extend_from_slice(&(value.len() as u16).to_be_bytes()); + } else { + attribute.push(value.len() as u8); + } + attribute.extend_from_slice(value); + attribute +} + +fn parse_standard_community(community: &str) -> std::result::Result { + let (high, low) = community + .split_once(':') + .ok_or_else(|| "expected ASN:VALUE".to_string())?; + let high: u16 = high + .parse() + .map_err(|_| "community ASN must be a 16-bit integer".to_string())?; + let low: u16 = low + .parse() + .map_err(|_| "community value must be a 16-bit integer".to_string())?; + Ok((u32::from(high) << 16) | u32::from(low)) +} + async fn write_bgp_message( writer: &mut W, msg_type: u8, @@ -838,13 +1263,15 @@ fn parse_update_message(body: &[u8]) -> Result { cursor += attrs_len; let announced = parse_prefix_list(&body[cursor..])?; - let (next_hop, as_path) = parse_path_attributes(attrs)?; + let (next_hop, as_path, med, communities) = parse_path_attributes(attrs)?; Ok(UpdateMessage { withdrawn_routes: withdrawn, announced_routes: announced, next_hop, as_path, + med, + communities, }) } @@ -887,10 +1314,12 @@ fn parse_prefix_list(data: &[u8]) -> Result> { Ok(prefixes) } -fn parse_path_attributes(attrs: &[u8]) -> Result<(Option, Vec)> { +fn parse_path_attributes(attrs: &[u8]) -> Result<(Option, Vec, Option, Vec)> { let mut cursor = 0usize; let mut next_hop = None; let mut as_path = Vec::new(); + let mut med = None; + let mut communities = Vec::new(); while cursor < attrs.len() { if attrs.len() < cursor + 3 { @@ -935,11 +1364,19 @@ fn parse_path_attributes(attrs: &[u8]) -> Result<(Option, Vec)> { ATTR_TYPE_AS_PATH => { as_path = parse_as_path(value)?; } + ATTR_TYPE_MULTI_EXIT_DISC => { + if value.len() == 4 { + med = Some(u32::from_be_bytes([value[0], value[1], value[2], value[3]])); + } + } + ATTR_TYPE_COMMUNITIES => { + communities = parse_communities(value)?; + } _ => {} } } - Ok((next_hop, as_path)) + Ok((next_hop, as_path, med, communities)) } fn 
parse_as_path(data: &[u8]) -> Result> { @@ -971,6 +1408,20 @@ fn parse_as_path(data: &[u8]) -> Result> { Ok(path) } +fn parse_communities(data: &[u8]) -> Result> { + if data.len() % 4 != 0 { + return Err(BgpError::Protocol( + "COMMUNITIES payload must be a multiple of 4 bytes".to_string(), + )); + } + + let mut communities = Vec::with_capacity(data.len() / 4); + for chunk in data.chunks_exact(4) { + communities.push(u32::from_be_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Ok(communities) +} + #[derive(Debug)] enum BgpMessage { Open(OpenMessage), @@ -993,6 +1444,8 @@ struct UpdateMessage { announced_routes: Vec, next_hop: Option, as_path: Vec, + med: Option, + communities: Vec, } #[derive(Debug)] @@ -1005,9 +1458,10 @@ struct NotificationMessage { #[cfg(test)] mod tests { use super::*; + use crate::config::{BfdConfig, BgpExportPolicyConfig}; - use tokio::net::TcpListener; - use tokio::sync::{mpsc, oneshot}; + use tokio::net::{TcpListener, UdpSocket}; + use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::timeout; #[tokio::test] @@ -1031,6 +1485,8 @@ mod tests { port: 179, asn: 65010, description: String::new(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig::default(), }], ..BgpConfig::default() }; @@ -1039,6 +1495,29 @@ mod tests { assert!(error.to_string().contains("currently supports eBGP only")); } + #[test] + fn test_validate_config_rejects_invalid_community() { + let config = BgpConfig { + enabled: true, + local_as: 65010, + peers: vec![BgpPeerConfig { + address: "127.0.0.1".to_string(), + port: 179, + asn: 65020, + description: String::new(), + export_policy: BgpExportPolicyConfig { + med: None, + communities: vec!["not-a-community".to_string()], + }, + bfd: BfdConfig::default(), + }], + ..BgpConfig::default() + }; + + let error = validate_config(&config).unwrap_err(); + assert!(error.to_string().contains("invalid community")); + } + #[tokio::test] async fn test_native_speaker_announces_and_withdraws_routes() { let 
listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -1085,6 +1564,8 @@ mod tests { Some("192.0.2.10".parse::().unwrap()) ); assert_eq!(announcement.as_path, vec![65010]); + assert_eq!(announcement.med, None); + assert!(announcement.communities.is_empty()); let _ = announced_tx.send(()); let withdrawal = match read_bgp_message(&mut socket).await.unwrap() { @@ -1107,6 +1588,8 @@ mod tests { port, asn: 65020, description: "test-peer".to_string(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig::default(), }], ..BgpConfig::default() }) @@ -1130,6 +1613,68 @@ mod tests { peer_task.await.unwrap(); } + #[tokio::test] + async fn test_native_speaker_applies_peer_export_policy() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let peer_task = tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + complete_test_peer_handshake( + &mut socket, + 65010, + "192.0.2.10".parse().unwrap(), + 65020, + "192.0.2.20".parse().unwrap(), + ) + .await; + + let announcement = expect_update(&mut socket).await; + assert_eq!(announcement.med, Some(42)); + assert_eq!( + announcement.communities, + vec![parse_standard_community("65010:100").unwrap()] + ); + }); + + let client = create_bgp_client(BgpConfig { + enabled: true, + local_as: 65010, + router_id: "192.0.2.10".to_string(), + peers: vec![BgpPeerConfig { + address: "127.0.0.1".to_string(), + port, + asn: 65020, + description: "policy-peer".to_string(), + export_policy: BgpExportPolicyConfig { + med: Some(42), + communities: vec!["65010:100".to_string()], + }, + bfd: BfdConfig::default(), + }], + ..BgpConfig::default() + }) + .await + .unwrap(); + + for _ in 0..20 { + if client.is_connected().await { + break; + } + sleep(Duration::from_millis(50)).await; + } + + client + .announce_route( + "203.0.113.10".parse().unwrap(), + "192.0.2.10".parse().unwrap(), + ) + .await + .unwrap(); + + 
peer_task.await.unwrap(); + } + #[tokio::test] async fn test_native_speaker_resyncs_routes_across_multiple_peers() { let listener_a = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -1221,12 +1766,16 @@ mod tests { port: port_a, asn: 65020, description: "peer-a".to_string(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig::default(), }, BgpPeerConfig { address: "127.0.0.1".to_string(), port: port_b, asn: 65030, description: "peer-b".to_string(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig::default(), }, ], ..BgpConfig::default() @@ -1312,6 +1861,172 @@ mod tests { } } + async fn connected_udp_pair() -> (UdpSocket, UdpSocket) { + let left = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let right = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + left.connect(right.local_addr().unwrap()).await.unwrap(); + right.connect(left.local_addr().unwrap()).await.unwrap(); + (left, right) + } + + #[tokio::test] + async fn test_bfd_runtime_reports_up_and_down() { + let (speaker_socket, peer_socket) = connected_udp_pair().await; + let (event_tx, mut event_rx) = mpsc::channel(8); + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let peer = BgpPeerConfig { + address: "127.0.0.1".to_string(), + port: 179, + asn: 65020, + description: "bfd-test".to_string(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig { + enabled: true, + desired_min_tx_millis: 20, + required_min_rx_millis: 20, + detect_multiplier: 2, + bootstrap_timeout_secs: 1, + }, + }; + + let runtime = tokio::spawn(run_bfd_session( + speaker_socket, + peer.clone(), + 7, + event_tx, + shutdown_rx, + )); + + let mut buf = [0u8; 512]; + let len = timeout(Duration::from_secs(1), peer_socket.recv(&mut buf)) + .await + .unwrap() + .unwrap(); + let initial = parse_bfd_packet(&buf[..len]).unwrap(); + assert_eq!(initial.your_discriminator, 0); + assert_eq!(initial.my_discriminator, 7); + + let response = encode_bfd_packet( + BFD_STATE_INIT, + 
peer.bfd.detect_multiplier, + 42, + initial.my_discriminator, + Duration::from_millis(peer.bfd.desired_min_tx_millis), + Duration::from_millis(peer.bfd.required_min_rx_millis), + ); + for _ in 0..3 { + peer_socket.send(&response).await.unwrap(); + sleep(Duration::from_millis(20)).await; + } + + match timeout(Duration::from_secs(1), event_rx.recv()).await.unwrap() { + Some(SessionEvent::Bfd(BfdPeerState::Up)) => {} + other => panic!("expected BFD Up event, got {:?}", other), + } + + match timeout(Duration::from_secs(1), event_rx.recv()).await.unwrap() { + Some(SessionEvent::Bfd(BfdPeerState::Down)) => {} + other => panic!("expected BFD Down event, got {:?}", other), + } + + let _ = shutdown_tx.send(true); + runtime.await.unwrap(); + } + + #[tokio::test] + async fn test_native_speaker_waits_for_bfd_before_advertising() { + let listener = TcpListener::bind("127.0.0.2:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let bfd_socket = UdpSocket::bind("127.0.0.2:3784").await.unwrap(); + let (ready_tx, ready_rx) = oneshot::channel(); + + let peer_task = tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + complete_test_peer_handshake( + &mut socket, + 65010, + "192.0.2.10".parse().unwrap(), + 65020, + "192.0.2.20".parse().unwrap(), + ) + .await; + ready_tx.send(()).unwrap(); + + let mut bfd_buf = [0u8; 512]; + let (len, addr) = timeout(Duration::from_secs(1), bfd_socket.recv_from(&mut bfd_buf)) + .await + .unwrap() + .unwrap(); + let initial = parse_bfd_packet(&bfd_buf[..len]).unwrap(); + assert_eq!(initial.your_discriminator, 0); + + assert!(timeout(Duration::from_millis(300), read_bgp_message(&mut socket)) + .await + .is_err()); + + let response = encode_bfd_packet( + BFD_STATE_INIT, + 2, + 42, + initial.my_discriminator, + Duration::from_millis(20), + Duration::from_millis(20), + ); + for _ in 0..3 { + bfd_socket.send_to(&response, addr).await.unwrap(); + sleep(Duration::from_millis(20)).await; + } + + let 
announcement = expect_update(&mut socket).await; + assert_eq!( + announcement.announced_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + }); + + let client = create_bgp_client(BgpConfig { + enabled: true, + local_as: 65010, + router_id: "192.0.2.10".to_string(), + connect_retry_secs: 1, + peers: vec![BgpPeerConfig { + address: "127.0.0.2".to_string(), + port, + asn: 65020, + description: "bfd-peer".to_string(), + export_policy: BgpExportPolicyConfig::default(), + bfd: BfdConfig { + enabled: true, + desired_min_tx_millis: 20, + required_min_rx_millis: 20, + detect_multiplier: 2, + bootstrap_timeout_secs: 1, + }, + }], + ..BgpConfig::default() + }) + .await + .unwrap(); + + ready_rx.await.unwrap(); + for _ in 0..20 { + if client.is_connected().await { + break; + } + sleep(Duration::from_millis(50)).await; + } + + client + .announce_route( + "203.0.113.10".parse().unwrap(), + "192.0.2.10".parse().unwrap(), + ) + .await + .unwrap(); + + peer_task.await.unwrap(); + } + #[test] fn test_parse_update_message_extracts_routes() { let mut attrs = Vec::new(); @@ -1345,5 +2060,50 @@ mod tests { update.next_hop, Some("192.0.2.10".parse::().unwrap()) ); + assert_eq!(update.med, None); + assert!(update.communities.is_empty()); + } + + #[test] + fn test_parse_update_message_extracts_med_and_communities() { + let mut attrs = Vec::new(); + attrs.extend(encode_path_attribute( + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_ORIGIN, + &[BGP_ORIGIN_IGP], + )); + attrs.extend(encode_path_attribute( + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_AS_PATH, + &encode_as_path(65010), + )); + attrs.extend(encode_path_attribute( + ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_NEXT_HOP, + &[192, 0, 2, 10], + )); + attrs.extend(encode_path_attribute( + ATTR_FLAG_OPTIONAL, + ATTR_TYPE_MULTI_EXIT_DISC, + &42u32.to_be_bytes(), + )); + attrs.extend(encode_path_attribute( + ATTR_FLAG_OPTIONAL | ATTR_FLAG_TRANSITIVE, + ATTR_TYPE_COMMUNITIES, + &parse_standard_community("65010:100").unwrap().to_be_bytes(), + )); + + let mut 
payload = Vec::new(); + payload.extend_from_slice(&0u16.to_be_bytes()); + payload.extend_from_slice(&(attrs.len() as u16).to_be_bytes()); + payload.extend_from_slice(&attrs); + payload.extend_from_slice(&encode_ipv4_prefix("203.0.113.7".parse().unwrap())); + + let update = parse_update_message(&payload).unwrap(); + assert_eq!(update.med, Some(42)); + assert_eq!( + update.communities, + vec![parse_standard_community("65010:100").unwrap()] + ); } } diff --git a/fiberlb/crates/fiberlb-server/src/config.rs b/fiberlb/crates/fiberlb-server/src/config.rs index d658256..b825d5c 100644 --- a/fiberlb/crates/fiberlb-server/src/config.rs +++ b/fiberlb/crates/fiberlb-server/src/config.rs @@ -135,16 +135,34 @@ pub struct VipAdvertisementConfig { /// Interval between BGP advertisement reconciliation sweeps. #[serde(default = "default_vip_check_interval_secs")] pub interval_secs: u64, + + /// Presence of this file puts the node into control-plane drain mode. + #[serde(default = "default_vip_drain_file")] + pub drain_file: String, + + /// Time to keep a locally owned VIP after withdrawing it for drain. + #[serde(default = "default_vip_drain_hold_time_secs")] + pub drain_hold_time_secs: u64, } fn default_vip_check_interval_secs() -> u64 { 3 } +fn default_vip_drain_file() -> String { + "/var/lib/fiberlb/drain".to_string() +} + +fn default_vip_drain_hold_time_secs() -> u64 { + 5 +} + impl Default for VipAdvertisementConfig { fn default() -> Self { Self { interval_secs: default_vip_check_interval_secs(), + drain_file: default_vip_drain_file(), + drain_hold_time_secs: default_vip_drain_hold_time_secs(), } } } @@ -190,12 +208,84 @@ pub struct BgpPeerConfig { /// Optional operator-visible description. #[serde(default)] pub description: String, + + /// Optional export policy applied to announcements sent to this peer. + #[serde(default)] + pub export_policy: BgpExportPolicyConfig, + + /// Optional single-hop BFD session parameters for this peer. 
+    #[serde(default)]
+    pub bfd: BfdConfig,
 }
 
 fn default_bgp_peer_port() -> u16 {
     179
 }
 
+/// Peer-scoped BGP export policy.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct BgpExportPolicyConfig {
+    /// Optional MED attached to announced VIP routes.
+    #[serde(default)]
+    pub med: Option<u32>,
+
+    /// Optional standard communities attached to announced VIP routes.
+    #[serde(default)]
+    pub communities: Vec<String>,
+}
+
+/// Single-hop BFD configuration for a BGP peer.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct BfdConfig {
+    /// Whether BFD should gate route advertisement for this peer.
+    #[serde(default)]
+    pub enabled: bool,
+
+    /// Desired transmit interval in milliseconds.
+    #[serde(default = "default_bfd_desired_min_tx_millis")]
+    pub desired_min_tx_millis: u64,
+
+    /// Required receive interval in milliseconds.
+    #[serde(default = "default_bfd_required_min_rx_millis")]
+    pub required_min_rx_millis: u64,
+
+    /// Detection multiplier.
+    #[serde(default = "default_bfd_detect_multiplier")]
+    pub detect_multiplier: u8,
+
+    /// Maximum time to wait for the session to reach Up after BGP establishment.
+    #[serde(default = "default_bfd_bootstrap_timeout_secs")]
+    pub bootstrap_timeout_secs: u64,
+}
+
+fn default_bfd_desired_min_tx_millis() -> u64 {
+    300
+}
+
+fn default_bfd_required_min_rx_millis() -> u64 {
+    300
+}
+
+fn default_bfd_detect_multiplier() -> u8 {
+    3
+}
+
+fn default_bfd_bootstrap_timeout_secs() -> u64 {
+    10
+}
+
+impl Default for BfdConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            desired_min_tx_millis: default_bfd_desired_min_tx_millis(),
+            required_min_rx_millis: default_bfd_required_min_rx_millis(),
+            detect_multiplier: default_bfd_detect_multiplier(),
+            bootstrap_timeout_secs: default_bfd_bootstrap_timeout_secs(),
+        }
+    }
+}
+
 /// Native BGP speaker configuration.
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BgpConfig { diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs index e2a6140..294e8e7 100644 --- a/fiberlb/crates/fiberlb-server/src/main.rs +++ b/fiberlb/crates/fiberlb-server/src/main.rs @@ -148,6 +148,10 @@ async fn main() -> Result<(), Box> { "fiberlb_bgp_peer_session_up", "Per-peer BGP session state (1=established, 0=down)" ); + metrics::describe_gauge!( + "fiberlb_bgp_peer_bfd_up", + "Per-peer BFD session state for FiberLB native BGP peers (1=up, 0=down)" + ); metrics::describe_counter!( "fiberlb_bgp_session_established_total", "Total number of BGP peer sessions established" @@ -156,6 +160,10 @@ async fn main() -> Result<(), Box> { "fiberlb_bgp_session_ends_total", "Total number of BGP peer session terminations by peer and result" ); + metrics::describe_gauge!( + "fiberlb_vip_drain_active", + "Whether FiberLB node drain mode is active (1=drain, 0=normal)" + ); if let Some(endpoint) = &config.chainfire_endpoint { tracing::info!(" Cluster coordination: ChainFire @ {}", endpoint); @@ -280,7 +288,14 @@ async fn main() -> Result<(), Box> { } else { None }; - let manager = Arc::new(VipManager::new(bgp, metadata.clone(), next_hop, vip_owner)); + let manager = Arc::new(VipManager::new( + bgp, + metadata.clone(), + next_hop, + vip_owner, + config.vip_advertisement.drain_file.clone(), + Duration::from_secs(config.vip_advertisement.drain_hold_time_secs), + )); let _vip_task = manager.clone().spawn(Duration::from_secs( config.vip_advertisement.interval_secs.max(1), )); diff --git a/fiberlb/crates/fiberlb-server/src/services/loadbalancer.rs b/fiberlb/crates/fiberlb-server/src/services/loadbalancer.rs index 393ccb7..cbfcad3 100644 --- a/fiberlb/crates/fiberlb-server/src/services/loadbalancer.rs +++ b/fiberlb/crates/fiberlb-server/src/services/loadbalancer.rs @@ -1,5 +1,6 @@ //! 
LoadBalancer service implementation
 
+use std::net::IpAddr;
 use std::sync::Arc;
 
 use base64::Engine as _;
@@ -31,6 +32,44 @@ impl LoadBalancerServiceImpl {
     }
 }
 
+fn normalize_requested_vip(vip_address: &str) -> Result<Option<String>, Status> {
+    let trimmed = vip_address.trim();
+    if trimmed.is_empty() {
+        return Ok(None);
+    }
+
+    let vip: IpAddr = trimmed
+        .parse()
+        .map_err(|_| Status::invalid_argument("vip_address must be a valid IP address"))?;
+
+    if vip.is_unspecified() || vip.is_multicast() {
+        return Err(Status::invalid_argument(
+            "vip_address must be a usable unicast address",
+        ));
+    }
+
+    Ok(Some(vip.to_string()))
+}
+
+async fn ensure_vip_available(metadata: &LbMetadataStore, vip: &str) -> Result<(), Status> {
+    let lbs = metadata
+        .list_all_lbs()
+        .await
+        .map_err(|e| Status::internal(format!("metadata error: {}", e)))?;
+
+    if lbs
+        .iter()
+        .any(|lb| lb.vip_address.as_deref() == Some(vip))
+    {
+        return Err(Status::already_exists(format!(
+            "vip_address {} is already in use",
+            vip
+        )));
+    }
+
+    Ok(())
+}
+
 const ACTION_LB_CREATE: &str = "network:loadbalancers:create";
 const ACTION_LB_READ: &str = "network:loadbalancers:read";
 const ACTION_LB_LIST: &str = "network:loadbalancers:list";
@@ -98,11 +137,16 @@ impl LoadBalancerService for LoadBalancerServiceImpl {
             lb.description = Some(req.description);
         }
 
-        // Allocate VIP from pool
-        let vip = self.metadata
-            .allocate_vip()
-            .await
-            .map_err(|e| Status::resource_exhausted(format!("failed to allocate VIP: {}", e)))?;
+        let requested_vip = normalize_requested_vip(&req.vip_address)?;
+        let vip = if let Some(vip) = requested_vip {
+            ensure_vip_available(&self.metadata, &vip).await?;
+            vip
+        } else {
+            self.metadata
+                .allocate_vip()
+                .await
+                .map_err(|e| Status::resource_exhausted(format!("failed to allocate VIP: {}", e)))?
+        };
         lb.vip_address = Some(vip);
 
         // Save load balancer
diff --git a/fiberlb/crates/fiberlb-server/src/vip_manager.rs b/fiberlb/crates/fiberlb-server/src/vip_manager.rs
index 241fa32..ca16034 100644
--- a/fiberlb/crates/fiberlb-server/src/vip_manager.rs
+++ b/fiberlb/crates/fiberlb-server/src/vip_manager.rs
@@ -6,7 +6,7 @@
 use std::collections::{HashMap, HashSet};
 use std::net::IpAddr;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use tokio::sync::{watch, RwLock};
 use tokio::time::sleep;
@@ -17,6 +17,8 @@ use crate::metadata::LbMetadataStore;
 use crate::vip_owner::VipAddressOwner;
 use fiberlb_types::LoadBalancerId;
 
+const METRIC_VIP_DRAIN_ACTIVE: &str = "fiberlb_vip_drain_active";
+
 /// Current local control-plane state for a VIP.
 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
 struct VipState {
@@ -24,6 +26,8 @@ struct VipState {
     owned: bool,
     /// The VIP is advertised to BGP peers.
     advertised: bool,
+    /// When the node entered drain while this VIP was active.
+    drain_started_at: Option<Instant>,
 }
 
 impl VipState {
@@ -48,6 +52,10 @@ pub struct VipManager {
     vip_owner: Option<Arc<dyn VipAddressOwner>>,
     /// Router's own IP address (used as BGP next hop)
     next_hop: IpAddr,
+    /// Presence of this file activates drain mode.
+    drain_file: String,
+    /// How long to keep a locally-owned VIP after withdrawing it for drain.
+    drain_hold_time: Duration,
     /// Shutdown signal for the background reconciliation task.
    shutdown: watch::Sender<bool>,
 }
 
@@ -59,6 +67,8 @@ impl VipManager {
         metadata: Arc<LbMetadataStore>,
         next_hop: IpAddr,
         vip_owner: Option<Arc<dyn VipAddressOwner>>,
+        drain_file: String,
+        drain_hold_time: Duration,
     ) -> Self {
         let (shutdown, _shutdown_rx) = watch::channel(false);
         Self {
@@ -67,6 +77,8 @@ impl VipManager {
             vip_state: Arc::new(RwLock::new(HashMap::new())),
             vip_owner,
             next_hop,
+            drain_file,
+            drain_hold_time,
             shutdown,
         }
     }
@@ -141,12 +153,24 @@ impl VipManager {
             }
         }
 
+        let drain_active = self.is_drain_active().await;
+        metrics::gauge!(METRIC_VIP_DRAIN_ACTIVE).set(if drain_active { 1.0 } else { 0.0 });
+
         // Update BGP advertisements
-        self.reconcile_advertisements(&active_vips).await?;
+        self.reconcile_advertisements(&active_vips, drain_active).await?;
 
         Ok(())
     }
 
+    async fn is_drain_active(&self) -> bool {
+        let path = self.drain_file.trim();
+        if path.is_empty() {
+            return false;
+        }
+
+        tokio::fs::metadata(path).await.is_ok()
+    }
+
     /// Check if a load balancer has any healthy backends
     async fn has_healthy_backends(
         &self,
@@ -178,13 +202,58 @@
     async fn reconcile_advertisements(
         &self,
         active_vips: &HashSet<IpAddr>,
+        drain_active: bool,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         let mut state = self.vip_state.write().await;
+        let now = Instant::now();
 
         for vip in active_vips {
             let mut vip_state = state.get(vip).copied().unwrap_or_default();
             let mut changed = false;
 
+            if drain_active {
+                if vip_state.advertised {
+                    info!("Withdrawing VIP {} for node drain", vip);
+                    if let Err(error) = self.bgp.withdraw_route(*vip).await {
+                        error!("Failed to withdraw VIP {} for drain: {}", vip, error);
+                    } else {
+                        vip_state.advertised = false;
+                        vip_state.drain_started_at = Some(now);
+                        changed = true;
+                    }
+                } else if vip_state.owned && vip_state.drain_started_at.is_none() {
+                    vip_state.drain_started_at = Some(now);
+                    changed = true;
+                }
+
+                if vip_state.owned {
+                    let drain_started_at = vip_state.drain_started_at.unwrap_or(now);
+                    if now.duration_since(drain_started_at) >= self.drain_hold_time {
+                        if let Some(vip_owner) =
&self.vip_owner { + info!("Releasing local VIP {} after drain hold", vip); + if let Err(error) = vip_owner.ensure_absent(*vip).await { + error!("Failed to release local VIP {} after drain: {}", vip, error); + } else { + vip_state.owned = false; + changed = true; + } + } else { + vip_state.owned = false; + changed = true; + } + } + } + + if vip_state.is_idle() { + state.remove(vip); + } else if changed { + state.insert(*vip, vip_state); + } + continue; + } + + vip_state.drain_started_at = None; + if !vip_state.owned { if let Some(vip_owner) = &self.vip_owner { info!("Claiming local VIP {} on this node", vip); @@ -220,6 +289,7 @@ impl VipManager { for vip in managed_vips { if !active_vips.contains(&vip) { let mut vip_state = state.get(&vip).copied().unwrap_or_default(); + vip_state.drain_started_at = None; if vip_state.owned { if let Some(vip_owner) = &self.vip_owner { @@ -266,6 +336,7 @@ impl VipManager { } vip_state.owned = true; } + vip_state.drain_started_at = None; if !vip_state.advertised { info!("Manually advertising VIP {}", vip); @@ -290,6 +361,7 @@ impl VipManager { } vip_state.owned = false; } + vip_state.drain_started_at = None; if vip_state.advertised { info!("Manually withdrawing VIP {}", vip); @@ -318,6 +390,7 @@ impl VipManager { for vip in managed_vips { let mut vip_state = state.get(&vip).copied().unwrap_or_default(); + vip_state.drain_started_at = None; if vip_state.owned { info!("Releasing local VIP {} for shutdown", vip); @@ -372,6 +445,7 @@ mod tests { use crate::bgp_client::{BgpClient, Result}; use crate::vip_owner::VipOwnershipError; use std::sync::Mutex; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; /// Mock BGP client for testing struct MockBgpClient { @@ -459,6 +533,8 @@ mod tests { metadata, next_hop, Some(mock_owner.clone()), + String::new(), + Duration::from_secs(0), ); let vip: IpAddr = "10.0.1.100".parse().unwrap(); @@ -484,4 +560,61 @@ mod tests { ] ); } + + #[tokio::test] + async fn 
test_vip_drain_withdraws_before_releasing_vip() {
+        let events = Arc::new(Mutex::new(Vec::new()));
+        let mock_bgp = Arc::new(MockBgpClient::new(events.clone()));
+        let mock_owner = Arc::new(MockVipOwner::new(events.clone()));
+        let metadata = Arc::new(LbMetadataStore::new_in_memory());
+        let next_hop = "10.0.0.1".parse().unwrap();
+        let drain_path = std::env::temp_dir().join(format!(
+            "fiberlb-drain-{}",
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_nanos()
+        ));
+
+        let manager = VipManager::new(
+            mock_bgp,
+            metadata,
+            next_hop,
+            Some(mock_owner.clone()),
+            drain_path.display().to_string(),
+            Duration::from_millis(200),
+        );
+
+        let vip: IpAddr = "10.0.1.100".parse().unwrap();
+        manager.advertise_vip(vip).await.unwrap();
+
+        tokio::fs::write(&drain_path, b"1").await.unwrap();
+        let mut active_vips = HashSet::new();
+        active_vips.insert(vip);
+
+        manager
+            .reconcile_advertisements(&active_vips, true)
+            .await
+            .unwrap();
+        assert!(mock_owner.released.lock().unwrap().is_empty());
+        assert_eq!(manager.get_advertised_vips().await, Vec::<IpAddr>::new());
+
+        sleep(Duration::from_millis(250)).await;
+        manager
+            .reconcile_advertisements(&active_vips, true)
+            .await
+            .unwrap();
+        assert!(mock_owner.released.lock().unwrap().contains(&vip));
+
+        tokio::fs::remove_file(&drain_path).await.unwrap();
+        assert_eq!(
+            events.lock().unwrap().clone(),
+            vec![
+                format!("own:{vip}"),
+                format!("announce:{vip}"),
+                format!("withdraw:{vip}"),
+                format!("unown:{vip}"),
+            ]
+        );
+    }
 }
diff --git a/flake.nix b/flake.nix
index 223e9af..34aad00 100644
--- a/flake.nix
+++ b/flake.nix
@@ -936,6 +936,24 @@
         }
       );
 
+      fiberlb-native-bgp-interop-vm-smoke = pkgs.testers.runNixOSTest (
+        import ./nix/tests/fiberlb-native-bgp-interop-vm-smoke.nix {
+          inherit pkgs;
+          photoncloudPackages = self.packages.${system};
+          photoncloudModule = self.nixosModules.default;
+          nixNosModule = nix-nos.nixosModules.default;
+        }
+      );
+
+      fiberlb-native-bgp-ecmp-drain-vm-smoke =
pkgs.testers.runNixOSTest ( + import ./nix/tests/fiberlb-native-bgp-ecmp-drain-vm-smoke.nix { + inherit pkgs; + photoncloudPackages = self.packages.${system}; + photoncloudModule = self.nixosModules.default; + nixNosModule = nix-nos.nixosModules.default; + } + ); + deployer-bootstrap-e2e = pkgs.runCommand "deployer-bootstrap-e2e" { nativeBuildInputs = with pkgs; [ bash diff --git a/nix/modules/fiberlb.nix b/nix/modules/fiberlb.nix index 3fdac82..d82e287 100644 --- a/nix/modules/fiberlb.nix +++ b/nix/modules/fiberlb.nix @@ -28,6 +28,47 @@ let default = ""; description = "Optional description used for logs and operators."; }; + + med = lib.mkOption { + type = lib.types.nullOr lib.types.ints.unsigned; + default = null; + description = "Optional MED to attach to VIP announcements sent to this peer."; + }; + + communities = lib.mkOption { + type = lib.types.listOf lib.types.str; + default = [ ]; + description = "Optional standard BGP communities to attach to VIP announcements sent to this peer."; + example = [ "65001:100" "65001:200" ]; + }; + + bfd = { + enable = lib.mkEnableOption "single-hop BFD for this BGP peer"; + + desiredMinTxMillis = lib.mkOption { + type = lib.types.ints.positive; + default = 300; + description = "Desired BFD transmit interval in milliseconds."; + }; + + requiredMinRxMillis = lib.mkOption { + type = lib.types.ints.positive; + default = 300; + description = "Required BFD receive interval in milliseconds."; + }; + + detectMultiplier = lib.mkOption { + type = lib.types.ints.positive; + default = 3; + description = "BFD detection multiplier."; + }; + + bootstrapTimeoutSecs = lib.mkOption { + type = lib.types.ints.positive; + default = 10; + description = "How long FiberLB waits for the BFD session to reach Up after BGP establishment."; + }; + }; }; }; @@ -46,6 +87,8 @@ let }; vip_advertisement = { interval_secs = cfg.vipCheckIntervalSecs; + drain_file = cfg.vipDrain.filePath; + drain_hold_time_secs = cfg.vipDrain.holdTimeSecs; }; vip_ownership = { 
enabled = cfg.vipOwnership.enable; @@ -66,6 +109,18 @@ let peers = map (peer: { inherit (peer) address port asn description; + export_policy = { + inherit (peer) communities; + } // lib.optionalAttrs (peer.med != null) { + med = peer.med; + }; + bfd = { + enabled = peer.bfd.enable; + desired_min_tx_millis = peer.bfd.desiredMinTxMillis; + required_min_rx_millis = peer.bfd.requiredMinRxMillis; + detect_multiplier = peer.bfd.detectMultiplier; + bootstrap_timeout_secs = peer.bfd.bootstrapTimeoutSecs; + }; }) cfg.bgp.peers; } @@ -169,6 +224,20 @@ in }; }; + vipDrain = { + filePath = lib.mkOption { + type = lib.types.str; + default = "/var/lib/fiberlb/drain"; + description = "Presence of this file puts FiberLB into node drain mode."; + }; + + holdTimeSecs = lib.mkOption { + type = lib.types.ints.unsigned; + default = 5; + description = "How long FiberLB keeps a locally owned VIP after withdrawing it for drain."; + }; + }; + bgp = { enable = lib.mkEnableOption "FiberLB native BGP VIP advertisement"; diff --git a/nix/tests/fiberlb-native-bgp-ecmp-drain-vm-smoke.nix b/nix/tests/fiberlb-native-bgp-ecmp-drain-vm-smoke.nix new file mode 100644 index 0000000..6bb5b87 --- /dev/null +++ b/nix/tests/fiberlb-native-bgp-ecmp-drain-vm-smoke.nix @@ -0,0 +1,745 @@ +{ + pkgs, + photoncloudPackages, + photoncloudModule, + nixNosModule, +}: + +let + edgeZebraConfig = pkgs.writeText "fiberlb-ecmp-edge-zebra.conf" '' + hostname edge-zebra + log stdout debugging + ''; + edgeBgpdConfig = pkgs.writeText "fiberlb-ecmp-edge-bgpd.conf" '' + hostname edge-frr + log stdout debugging + + router bgp 65020 + bgp router-id 192.168.100.1 + no bgp ebgp-requires-policy + bgp bestpath as-path multipath-relax + neighbor 192.168.100.2 remote-as 65010 + neighbor 192.168.100.2 description fiberlb-a + neighbor 192.168.100.3 remote-as 65010 + neighbor 192.168.100.3 description fiberlb-b + ! 
+ address-family ipv4 unicast + maximum-paths 8 + neighbor 192.168.100.2 activate + neighbor 192.168.100.3 activate + exit-address-family + ! + ''; + iamProtoDir = ../../iam/proto; + iamProto = "iam.proto"; + fiberlbProtoDir = ../../fiberlb/crates/fiberlb-api/proto; + fiberlbProto = "fiberlb.proto"; + backendScriptA = pkgs.writeText "fiberlb-ecmp-backend-a.py" '' + from http.server import BaseHTTPRequestHandler, HTTPServer + + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + body = b"fiberlb ecmp backend a\n" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + + HTTPServer(("127.0.0.1", 18081), Handler).serve_forever() + ''; + backendScriptB = pkgs.writeText "fiberlb-ecmp-backend-b.py" '' + from http.server import BaseHTTPRequestHandler, HTTPServer + + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + body = b"fiberlb ecmp backend b\n" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + + HTTPServer(("127.0.0.1", 18081), Handler).serve_forever() + ''; +in +{ + name = "fiberlb-native-bgp-ecmp-drain-vm-smoke"; + + nodes = { + edge = + { ... 
}: + { + networking.hostName = "edge"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.1"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + frr + iproute2 + jq + ]; + + users.groups.frr = { }; + users.groups.frrvty = { }; + users.users.frr = { + isSystemUser = true; + group = "frr"; + extraGroups = [ "frrvty" ]; + }; + users.users.root.extraGroups = [ "frrvty" ]; + + systemd.services.frr-zebra = { + description = "FRR zebra for FiberLB ECMP smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + RuntimeDirectory = "frr"; + RuntimeDirectoryMode = "0755"; + ExecStartPre = "${pkgs.runtimeShell} -lc '${pkgs.coreutils}/bin/install -d -o root -g root /run/frr /var/run/frr && ${pkgs.coreutils}/bin/rm -f /run/frr/zebra.pid /var/run/frr/zebra.pid'"; + ExecStart = "${pkgs.frr}/libexec/frr/zebra -f ${edgeZebraConfig} -A 127.0.0.1 -P 2601 -i /run/frr/zebra.pid -z /run/frr/zserv.api -u root -g root --log stdout"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + systemd.services.frr-bgpd = { + description = "FRR bgpd for FiberLB ECMP smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" "frr-zebra.service" ]; + requires = [ "frr-zebra.service" ]; + serviceConfig = { + Type = "simple"; + RuntimeDirectory = "frr"; + RuntimeDirectoryMode = "0755"; + ExecStartPre = "${pkgs.runtimeShell} -lc '${pkgs.coreutils}/bin/install -d -o root -g root /run/frr /var/run/frr && ${pkgs.coreutils}/bin/rm -f /run/frr/bgpd.pid /var/run/frr/bgpd.pid && for _ in $(seq 1 30); do [ -S /run/frr/zserv.api ] && exit 0; sleep 1; done; echo zserv socket did not appear >&2; exit 1'"; + ExecStart = "${pkgs.frr}/libexec/frr/bgpd -f ${edgeBgpdConfig} -A 127.0.0.1 -P 2605 -p 179 -i /run/frr/bgpd.pid -z /run/frr/zserv.api -S --log stdout"; + Restart = 
"on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + lb_a = + { ... }: + { + imports = [ + nixNosModule + photoncloudModule + ]; + + networking.hostName = "lb-a"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.2"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + grpcurl + jq + python3 + ]; + + services.iam = { + enable = true; + package = photoncloudPackages.iam-server; + port = 50080; + httpPort = 8083; + storeBackend = "memory"; + }; + + systemd.services.iam.environment = { + IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + }; + + services.fiberlb = { + enable = true; + package = photoncloudPackages.fiberlb-server; + port = 50085; + iamAddr = "192.168.100.2:50080"; + metadataBackend = "sqlite"; + databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db"; + singleNode = true; + healthCheckIntervalSecs = 1; + healthCheckTimeoutSecs = 1; + vipCheckIntervalSecs = 1; + vipDrain.holdTimeSecs = 3; + vipOwnership = { + enable = true; + interface = "lo"; + }; + bgp = { + enable = true; + localAs = 65010; + routerId = "192.168.100.2"; + nextHop = "192.168.100.2"; + holdTimeSecs = 30; + keepaliveSecs = 10; + peers = [ + { + address = "192.168.100.1"; + port = 179; + asn = 65020; + description = "edge"; + } + ]; + }; + }; + + systemd.services.mock-backend = { + description = "FiberLB ECMP backend A"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.python3}/bin/python ${backendScriptA}"; + Restart = "always"; + RestartSec = "1s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + lb_b = + { ... 
}: + { + imports = [ + nixNosModule + photoncloudModule + ]; + + networking.hostName = "lb-b"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.3"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + grpcurl + jq + python3 + ]; + + services.iam = { + enable = true; + package = photoncloudPackages.iam-server; + port = 50080; + httpPort = 8083; + storeBackend = "memory"; + }; + + systemd.services.iam.environment = { + IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + }; + + services.fiberlb = { + enable = true; + package = photoncloudPackages.fiberlb-server; + port = 50085; + iamAddr = "192.168.100.3:50080"; + metadataBackend = "sqlite"; + databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db"; + singleNode = true; + healthCheckIntervalSecs = 1; + healthCheckTimeoutSecs = 1; + vipCheckIntervalSecs = 1; + vipDrain.holdTimeSecs = 3; + vipOwnership = { + enable = true; + interface = "lo"; + }; + bgp = { + enable = true; + localAs = 65010; + routerId = "192.168.100.3"; + nextHop = "192.168.100.3"; + holdTimeSecs = 30; + keepaliveSecs = 10; + peers = [ + { + address = "192.168.100.1"; + port = 179; + asn = 65020; + description = "edge"; + } + ]; + }; + }; + + systemd.services.mock-backend = { + description = "FiberLB ECMP backend B"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.python3}/bin/python ${backendScriptB}"; + Restart = "always"; + RestartSec = "1s"; + }; + }; + + system.stateVersion = "24.11"; + }; + }; + + testScript = '' + import json + import re + import shlex + import time + + IAM_PROTO_DIR = "${iamProtoDir}" + IAM_PROTO = "${iamProto}" + FIBERLB_PROTO_DIR = "${fiberlbProtoDir}" + FIBERLB_PROTO = "${fiberlbProto}" + METRIC_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+([-+0-9.eE]+)$") + VIP = "203.0.113.77" + 
VIP_PREFIX = f"{VIP}/32" + LISTENER_URL = f"http://{VIP}:18080/" + + def grpcurl_json(machine, endpoint, import_path, proto, service, payload, headers=None): + header_args = "" + for header in headers or []: + header_args += f" -H {shlex.quote(header)}" + command = ( + f"grpcurl -plaintext{header_args} " + f"-import-path {shlex.quote(import_path)} " + f"-proto {shlex.quote(proto)} " + f"-d {shlex.quote(json.dumps(payload))} " + f"{shlex.quote(endpoint)} {shlex.quote(service)}" + ) + status, output = machine.execute(f"timeout 15 sh -lc {shlex.quote(command + ' 2>&1')}") + if status != 0: + raise AssertionError( + "grpcurl failed" + f" service={service}" + f" status={status}" + f" payload={json.dumps(payload, sort_keys=True)}" + f" output={output}" + ) + return json.loads(output) + + def issue_project_admin_token(machine, org_id, project_id): + principal_id = f"fiberlb-ecmp-{machine.name}-{int(time.time())}" + deadline = time.time() + 120 + + def retry(action): + last_error = None + while time.time() < deadline: + try: + return action() + except Exception as exc: + last_error = exc + time.sleep(2) + raise AssertionError(f"IAM bootstrap timed out: {last_error}") + + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreatePrincipal", + { + "id": principal_id, + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "name": principal_id, + "orgId": org_id, + "projectId": project_id, + }, + )) + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreateBinding", + { + "principal": { + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "id": principal_id, + }, + "role": "roles/ProjectAdmin", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + }, + )) + token_response = retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamToken/IssueToken", + { + "principalId": principal_id, + "principalKind": 
"PRINCIPAL_KIND_SERVICE_ACCOUNT", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + "ttlSeconds": 3600, + }, + )) + return token_response["token"] + + def create_load_balancer(machine, token, name_suffix): + response = grpcurl_json( + machine, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.LoadBalancerService/CreateLoadBalancer", + { + "name": f"bgp-ecmp-{name_suffix}", + "orgId": "bgp-ecmp-org", + "projectId": "bgp-ecmp-project", + "description": f"native bgp ecmp {name_suffix}", + "vipAddress": VIP, + }, + headers=[f"authorization: Bearer {token}"], + ) + lb_id = response["loadbalancer"]["id"] + pool_id = grpcurl_json( + machine, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.PoolService/CreatePool", + { + "name": f"bgp-ecmp-pool-{name_suffix}", + "loadbalancerId": lb_id, + "algorithm": "POOL_ALGORITHM_ROUND_ROBIN", + "protocol": "POOL_PROTOCOL_TCP", + }, + headers=[f"authorization: Bearer {token}"], + )["pool"]["id"] + backend_id = grpcurl_json( + machine, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.BackendService/CreateBackend", + { + "name": f"bgp-ecmp-backend-{name_suffix}", + "poolId": pool_id, + "address": "127.0.0.1", + "port": 18081, + "weight": 1, + }, + headers=[f"authorization: Bearer {token}"], + )["backend"]["id"] + grpcurl_json( + machine, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.HealthCheckService/CreateHealthCheck", + { + "name": f"bgp-ecmp-health-{name_suffix}", + "poolId": pool_id, + "type": "HEALTH_CHECK_TYPE_HTTP", + "intervalSeconds": 1, + "timeoutSeconds": 1, + "healthyThreshold": 1, + "unhealthyThreshold": 1, + "httpConfig": { + "method": "GET", + "path": "/", + "expectedCodes": [200], + }, + }, + headers=[f"authorization: Bearer {token}"], + ) + grpcurl_json( + machine, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.ListenerService/CreateListener", + { + "name": 
f"bgp-ecmp-listener-{name_suffix}", + "loadbalancerId": lb_id, + "protocol": "LISTENER_PROTOCOL_TCP", + "port": 18080, + "defaultPoolId": pool_id, + }, + headers=[f"authorization: Bearer {token}"], + ) + return backend_id + + def wait_for_backend_status(machine, status, backend_id, token): + machine.wait_until_succeeds( + "grpcurl -plaintext " + f"-H {shlex.quote('authorization: Bearer ' + token)} " + f"-import-path {shlex.quote(FIBERLB_PROTO_DIR)} " + f"-proto {shlex.quote(FIBERLB_PROTO)} " + f"-d {shlex.quote(json.dumps({'id': backend_id}))} " + "127.0.0.1:50085 fiberlb.v1.BackendService/GetBackend " + f"| jq -e {shlex.quote(f'.backend.status == \"{status}\"')}" + ) + + def machine_diagnostics(machine, unit): + metrics = machine.succeed("curl -fsS http://127.0.0.1:9098/metrics || true") + service_status = machine.succeed( + f"systemctl status {shlex.quote(unit)} --no-pager || true" + ) + journal = machine.succeed( + f"journalctl -u {shlex.quote(unit)} -n 200 --no-pager || true" + ) + return ( + f"metrics:\n{metrics}\n" + f"systemctl status:\n{service_status}\n" + f"journal:\n{journal}" + ) + + def edge_bgp_diagnostics(): + bgpd_status = edge.succeed("systemctl status frr-bgpd.service --no-pager || true") + bgpd_journal = edge.succeed("journalctl -u frr-bgpd.service -n 200 --no-pager || true") + bgp_summary = edge.succeed("vtysh -c 'show ip bgp summary' || true") + bgp_route = edge.succeed(f"vtysh -c 'show ip bgp {VIP_PREFIX}' || true") + zebra_route = edge.succeed(f"vtysh -c 'show ip route {VIP_PREFIX}' || true") + kernel_route = edge.succeed(f"ip route show {VIP_PREFIX} || true") + return ( + "edge frr-bgpd status:\n" + f"{bgpd_status}\n" + "edge frr-bgpd journal:\n" + f"{bgpd_journal}\n" + "edge BGP summary:\n" + f"{bgp_summary}\n" + f"edge BGP route {VIP_PREFIX}:\n" + f"{bgp_route}\n" + f"edge zebra route {VIP_PREFIX}:\n" + f"{zebra_route}\n" + f"edge kernel route {VIP_PREFIX}:\n" + f"{kernel_route}\n" + ) + + def wait_for_unit_or_dump(machine, unit): + 
deadline = time.time() + 120 + while time.time() < deadline: + status, output = machine.execute(f"systemctl is-active {shlex.quote(unit)}") + state = output.strip() + if status == 0 and state == "active": + return + if state == "failed": + raise AssertionError( + f"unit {unit} failed to start\n{machine_diagnostics(machine, unit)}" + ) + time.sleep(1) + + raise AssertionError( + f"unit {unit} did not become active before timeout\n{machine_diagnostics(machine, unit)}" + ) + + def wait_for_command_or_dump(machine, command, unit=None, timeout=120): + deadline = time.time() + timeout + last_output = "" + while time.time() < deadline: + status, output = machine.execute(f"sh -lc {shlex.quote(command + ' 2>&1')}") + last_output = output + if status == 0: + return + time.sleep(1) + + diagnostics = f"last command output:\n{last_output}\n" + if unit is not None: + diagnostics += machine_diagnostics(machine, unit) + diagnostics += f"socket state:\n{machine.succeed('ss -ltnp || true')}\n" + raise AssertionError( + f"command did not succeed before timeout: {command}\n{diagnostics}" + ) + + def parse_labels(label_blob): + if not label_blob: + return {} + labels = {} + for part in label_blob.split(","): + key, value = part.split("=", 1) + labels[key] = value.strip().strip('"') + return labels + + def wait_for_metric(machine, metric_name, expected_value, labels=None): + expected_labels = labels or {} + deadline = time.time() + 60 + last_exposition = "" + + while time.time() < deadline: + exposition = machine.succeed("curl -fsS http://127.0.0.1:9098/metrics") + last_exposition = exposition + for line in exposition.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + match = METRIC_RE.match(line) + if not match: + continue + name, label_blob, value = match.groups() + if name != metric_name: + continue + if parse_labels(label_blob) != expected_labels: + continue + if abs(float(value) - float(expected_value)) < 0.0001: + return + time.sleep(1) + + 
raise AssertionError( + f"metric {metric_name} with labels={expected_labels} did not reach {expected_value}\n" + f"last metrics scrape:\n{last_exposition}\n" + f"{machine_diagnostics(machine, 'fiberlb.service')}\n" + f"{edge_bgp_diagnostics()}" + ) + + def wait_for_local_vip(machine, present): + pattern = f"inet {VIP}/32" + if present: + machine.wait_until_succeeds( + f"ip -4 addr show dev lo | grep -F {shlex.quote(pattern)}" + ) + else: + deadline = time.time() + 60 + while time.time() < deadline: + output = machine.succeed("ip -4 addr show dev lo || true") + if pattern not in output: + return + time.sleep(1) + raise AssertionError(f"VIP {VIP} still present on loopback") + + def wait_for_edge_route(next_hops): + deadline = time.time() + 60 + last_output = "" + while time.time() < deadline: + output = edge.succeed(f"ip route show {shlex.quote(VIP_PREFIX)} || true") + last_output = output + if all(next_hop in output for next_hop in next_hops): + return + time.sleep(1) + raise AssertionError( + f"edge route for {VIP_PREFIX} did not contain nexthops {next_hops}\n" + f"last kernel route output:\n{last_output}\n" + f"{edge_bgp_diagnostics()}" + ) + + def wait_for_edge_route_absent(needle): + deadline = time.time() + 60 + last_output = "" + while time.time() < deadline: + output = edge.succeed(f"ip route show {shlex.quote(VIP_PREFIX)} || true") + last_output = output + if needle not in output: + return + time.sleep(1) + raise AssertionError( + f"edge route for {VIP_PREFIX} still contained {needle}\n" + f"last kernel route output:\n{last_output}\n" + f"{edge_bgp_diagnostics()}" + ) + + def wait_for_http_any(): + edge.wait_until_succeeds( + f"curl -fsS --max-time 5 {shlex.quote(LISTENER_URL)} | grep -E 'fiberlb ecmp backend (a|b)'" + ) + + start_all() + serial_stdout_off() + + wait_for_unit_or_dump(edge, "frr-zebra.service") + wait_for_command_or_dump(edge, "test -S /run/frr/zserv.api", "frr-zebra.service") + wait_for_unit_or_dump(edge, "frr-bgpd.service") + 
wait_for_command_or_dump( + edge, + "ss -ltnH '( sport = :179 )' | grep -q LISTEN", + "frr-bgpd.service", + ) + + for machine in [lb_a, lb_b]: + wait_for_unit_or_dump(machine, "iam.service") + wait_for_command_or_dump(machine, "ss -ltnH '( sport = :50080 )' | grep -q LISTEN", "iam.service") + wait_for_unit_or_dump(machine, "mock-backend.service") + wait_for_unit_or_dump(machine, "fiberlb.service") + wait_for_command_or_dump(machine, "ss -ltnH '( sport = :50085 )' | grep -q LISTEN", "fiberlb.service") + wait_for_command_or_dump(machine, "ss -ltnH '( sport = :9098 )' | grep -q LISTEN", "fiberlb.service") + + wait_for_command_or_dump( + edge, + "vtysh -c 'show ip bgp neighbor 192.168.100.2' | grep -F 'BGP state = Established'", + "frr-bgpd.service", + ) + wait_for_command_or_dump( + edge, + "vtysh -c 'show ip bgp neighbor 192.168.100.3' | grep -F 'BGP state = Established'", + "frr-bgpd.service", + ) + + token_a = issue_project_admin_token(lb_a, "bgp-ecmp-org", "bgp-ecmp-project") + token_b = issue_project_admin_token(lb_b, "bgp-ecmp-org", "bgp-ecmp-project") + backend_a = create_load_balancer(lb_a, token_a, "a") + backend_b = create_load_balancer(lb_b, token_b, "b") + + wait_for_backend_status(lb_a, "BACKEND_STATUS_ONLINE", backend_a, token_a) + wait_for_backend_status(lb_b, "BACKEND_STATUS_ONLINE", backend_b, token_b) + wait_for_metric(lb_a, "fiberlb_bgp_connected_peers", 1) + wait_for_metric(lb_b, "fiberlb_bgp_connected_peers", 1) + wait_for_local_vip(lb_a, True) + wait_for_local_vip(lb_b, True) + + wait_for_edge_route(["via 192.168.100.2", "via 192.168.100.3"]) + wait_for_http_any() + + lb_a.succeed("touch /var/lib/fiberlb/drain") + wait_for_metric(lb_a, "fiberlb_vip_drain_active", 1) + wait_for_edge_route(["via 192.168.100.3"]) + wait_for_edge_route_absent("via 192.168.100.2") + wait_for_local_vip(lb_a, True) + edge.wait_until_succeeds( + f"curl -fsS --max-time 5 {shlex.quote(LISTENER_URL)} | grep -F 'fiberlb ecmp backend b'" + ) + + time.sleep(4) + 
wait_for_local_vip(lb_a, False) + + lb_a.succeed("rm -f /var/lib/fiberlb/drain") + wait_for_metric(lb_a, "fiberlb_vip_drain_active", 0) + wait_for_local_vip(lb_a, True) + wait_for_edge_route(["via 192.168.100.2", "via 192.168.100.3"]) + wait_for_http_any() + ''; +} diff --git a/nix/tests/fiberlb-native-bgp-interop-vm-smoke.nix b/nix/tests/fiberlb-native-bgp-interop-vm-smoke.nix new file mode 100644 index 0000000..45957a4 --- /dev/null +++ b/nix/tests/fiberlb-native-bgp-interop-vm-smoke.nix @@ -0,0 +1,737 @@ +{ + pkgs, + photoncloudPackages, + photoncloudModule, + nixNosModule, +}: + +let + frrZebraConfig = pkgs.writeText "fiberlb-interop-frr-zebra.conf" '' + hostname interop-zebra + log stdout debugging + ''; + frrBgpdConfig = pkgs.writeText "fiberlb-interop-frr-bgpd.conf" '' + hostname interop-frr + log stdout debugging + + router bgp 65020 + bgp router-id 192.168.100.1 + no bgp ebgp-requires-policy + neighbor 192.168.100.2 remote-as 65010 + neighbor 192.168.100.2 description fiberlb-frr + ! + address-family ipv4 unicast + neighbor 192.168.100.2 activate + exit-address-family + ! 
+ ''; + birdConfig = pkgs.writeText "fiberlb-interop-bird.conf" '' + router id 192.168.100.3; + + protocol device {} + + protocol kernel { + ipv4 { + import none; + export none; + }; + } + + protocol bgp fiberlb_peer { + local 192.168.100.3 as 65030; + neighbor 192.168.100.2 as 65010; + + ipv4 { + import all; + export none; + }; + } + ''; + + gobgpdConfig = pkgs.writeText "fiberlb-interop-gobgpd.json" (builtins.toJSON { + global = { + config = { + as = 65040; + router-id = "192.168.100.4"; + }; + }; + + neighbors = [ + { + config = { + neighbor-address = "192.168.100.2"; + peer-as = 65010; + description = "fiberlb-gobgp"; + }; + } + ]; + }); + + iamProtoDir = ../../iam/proto; + iamProto = "iam.proto"; + fiberlbProtoDir = ../../fiberlb/crates/fiberlb-api/proto; + fiberlbProto = "fiberlb.proto"; + backendScript = pkgs.writeText "fiberlb-interop-backend.py" '' + from http.server import BaseHTTPRequestHandler, HTTPServer + + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + body = b"fiberlb interop backend\n" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + + HTTPServer(("127.0.0.1", 18081), Handler).serve_forever() + ''; +in +{ + name = "fiberlb-native-bgp-interop-vm-smoke"; + + nodes = { + frr = + { ... 
}: + { + networking.hostName = "frr"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.1"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + frr + jq + iproute2 + ]; + + users.groups.frr = { }; + users.groups.frrvty = { }; + users.users.frr = { + isSystemUser = true; + group = "frr"; + extraGroups = [ "frrvty" ]; + }; + users.users.root.extraGroups = [ "frrvty" ]; + + systemd.services.frr-zebra = { + description = "FRR zebra for FiberLB interop smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + RuntimeDirectory = "frr"; + RuntimeDirectoryMode = "0755"; + ExecStartPre = "${pkgs.runtimeShell} -lc '${pkgs.coreutils}/bin/install -d -o root -g root /run/frr /var/run/frr && ${pkgs.coreutils}/bin/rm -f /run/frr/zebra.pid /var/run/frr/zebra.pid'"; + ExecStart = "${pkgs.frr}/libexec/frr/zebra -f ${frrZebraConfig} -A 127.0.0.1 -P 2601 -i /run/frr/zebra.pid -z /run/frr/zserv.api -u root -g root --log stdout"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + systemd.services.frr-bgpd = { + description = "FRR bgpd for FiberLB interop smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" "frr-zebra.service" ]; + requires = [ "frr-zebra.service" ]; + serviceConfig = { + Type = "simple"; + RuntimeDirectory = "frr"; + RuntimeDirectoryMode = "0755"; + ExecStartPre = "${pkgs.runtimeShell} -lc '${pkgs.coreutils}/bin/install -d -o root -g root /run/frr /var/run/frr && ${pkgs.coreutils}/bin/rm -f /run/frr/bgpd.pid /var/run/frr/bgpd.pid && for _ in $(seq 1 30); do [ -S /run/frr/zserv.api ] && exit 0; sleep 1; done; echo zserv socket did not appear >&2; exit 1'"; + ExecStart = "${pkgs.frr}/libexec/frr/bgpd -f ${frrBgpdConfig} -A 127.0.0.1 -P 2605 -p 179 -i /run/frr/bgpd.pid -z /run/frr/zserv.api -S --log stdout"; + Restart = 
"on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + bird = + { ... }: + { + networking.hostName = "bird"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.3"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + bird2 + jq + ]; + + systemd.services.bird-peer = { + description = "BIRD peer for FiberLB interop smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.bird2}/bin/bird -f -c ${birdConfig} -s /run/bird.ctl"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + gobgp = + { ... }: + { + networking.hostName = "gobgp"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.4"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + gobgp + gobgpd + jq + ]; + + systemd.services.gobgpd-peer = { + description = "GoBGP peer for FiberLB interop smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${gobgpdConfig} --api-hosts 127.0.0.1:50051 -p"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + lb = + { ... 
}: + { + imports = [ + nixNosModule + photoncloudModule + ]; + + networking.hostName = "lb"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.2"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + grpcurl + jq + python3 + ]; + + services.iam = { + enable = true; + package = photoncloudPackages.iam-server; + port = 50080; + httpPort = 8083; + storeBackend = "memory"; + }; + + systemd.services.iam.environment = { + IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + }; + + services.fiberlb = { + enable = true; + package = photoncloudPackages.fiberlb-server; + port = 50085; + iamAddr = "192.168.100.2:50080"; + metadataBackend = "sqlite"; + databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db"; + singleNode = true; + healthCheckIntervalSecs = 1; + healthCheckTimeoutSecs = 1; + vipCheckIntervalSecs = 1; + vipOwnership = { + enable = true; + interface = "lo"; + }; + bgp = { + enable = true; + localAs = 65010; + routerId = "192.168.100.2"; + nextHop = "192.168.100.2"; + holdTimeSecs = 9; + keepaliveSecs = 3; + peers = [ + { + address = "192.168.100.1"; + port = 179; + asn = 65020; + description = "frr-peer"; + med = 10; + communities = [ "65010:101" ]; + } + { + address = "192.168.100.3"; + port = 179; + asn = 65030; + description = "bird-peer"; + med = 20; + communities = [ "65010:202" ]; + } + { + address = "192.168.100.4"; + port = 179; + asn = 65040; + description = "gobgp-peer"; + med = 30; + communities = [ "65010:303" ]; + } + ]; + }; + }; + + systemd.services.mock-backend = { + description = "FiberLB interop backend"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.python3}/bin/python ${backendScript}"; + Restart = "always"; + RestartSec = "1s"; + }; + }; + + system.stateVersion = "24.11"; + }; + }; + + testScript = '' + import json + 
import re + import shlex + import time + + IAM_PROTO_DIR = "${iamProtoDir}" + IAM_PROTO = "${iamProto}" + FIBERLB_PROTO_DIR = "${fiberlbProtoDir}" + FIBERLB_PROTO = "${fiberlbProto}" + METRIC_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+([-+0-9.eE]+)$") + + def grpcurl_json(machine, endpoint, import_path, proto, service, payload, headers=None): + header_args = "" + for header in headers or []: + header_args += f" -H {shlex.quote(header)}" + command = ( + f"grpcurl -plaintext{header_args} " + f"-import-path {shlex.quote(import_path)} " + f"-proto {shlex.quote(proto)} " + f"-d {shlex.quote(json.dumps(payload))} " + f"{shlex.quote(endpoint)} {shlex.quote(service)}" + ) + status, output = machine.execute(f"timeout 15 sh -lc {shlex.quote(command + ' 2>&1')}") + if status != 0: + raise AssertionError( + "grpcurl failed" + f" service={service}" + f" status={status}" + f" payload={json.dumps(payload, sort_keys=True)}" + f" output={output}" + ) + return json.loads(output) + + def issue_project_admin_token(machine, org_id, project_id): + principal_id = f"fiberlb-interop-{int(time.time())}" + deadline = time.time() + 120 + + def retry(action): + last_error = None + while time.time() < deadline: + try: + return action() + except Exception as exc: + last_error = exc + time.sleep(2) + raise AssertionError(f"IAM bootstrap timed out: {last_error}") + + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreatePrincipal", + { + "id": principal_id, + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "name": principal_id, + "orgId": org_id, + "projectId": project_id, + }, + )) + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreateBinding", + { + "principal": { + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "id": principal_id, + }, + "role": "roles/ProjectAdmin", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + }, + )) + 
token_response = retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamToken/IssueToken", + { + "principalId": principal_id, + "principalKind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + "ttlSeconds": 3600, + }, + )) + return token_response["token"] + + def wait_for_backend_status(status, backend_id, token): + lb.wait_until_succeeds( + "grpcurl -plaintext " + f"-H {shlex.quote('authorization: Bearer ' + token)} " + f"-import-path {shlex.quote(FIBERLB_PROTO_DIR)} " + f"-proto {shlex.quote(FIBERLB_PROTO)} " + f"-d {shlex.quote(json.dumps({'id': backend_id}))} " + "127.0.0.1:50085 fiberlb.v1.BackendService/GetBackend " + f"| jq -e {shlex.quote(f'.backend.status == \"{status}\"')}" + ) + + def parse_labels(label_blob): + if not label_blob: + return {} + labels = {} + for part in label_blob.split(","): + key, value = part.split("=", 1) + labels[key] = value.strip().strip('"') + return labels + + def fiberlb_diagnostics(): + metrics = lb.succeed("curl -fsS http://127.0.0.1:9098/metrics || true") + journal = lb.succeed("journalctl -u fiberlb.service -n 200 --no-pager || true") + return ( + "fiberlb metrics:\n" + f"{metrics}\n" + "fiberlb journal:\n" + f"{journal}" + ) + + def wait_for_metric(metric_name, expected_value, labels=None): + expected_labels = labels or {} + deadline = time.time() + 60 + last_exposition = "" + + while time.time() < deadline: + exposition = lb.succeed("curl -fsS http://127.0.0.1:9098/metrics") + last_exposition = exposition + for line in exposition.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + match = METRIC_RE.match(line) + if not match: + continue + name, label_blob, value = match.groups() + if name != metric_name: + continue + if parse_labels(label_blob) != expected_labels: + continue + if abs(float(value) - float(expected_value)) < 0.0001: + return + time.sleep(1) + + raise 
AssertionError( + f"metric {metric_name} with labels={expected_labels} did not reach {expected_value}\n" + f"last metrics scrape:\n{last_exposition}\n" + f"{fiberlb_diagnostics()}" + ) + + def wait_for_local_vip(vip): + lb.wait_until_succeeds(f"ip -4 addr show dev lo | grep -F {shlex.quote('inet ' + vip + '/32')}") + + def wait_for_gobgp_route(prefix, present): + command = "gobgp -u 127.0.0.1 -p 50051 global rib || true" + if present: + gobgp.wait_until_succeeds(f"{command} | grep -F {shlex.quote(prefix)}") + else: + deadline = time.time() + 60 + while time.time() < deadline: + output = gobgp.succeed(command) + if prefix not in output: + return + time.sleep(1) + raise AssertionError(f"route {prefix} still present in GoBGP RIB") + + def wait_for_bird_route(prefix): + bird.wait_until_succeeds( + f"birdc -s /run/bird.ctl show route for {shlex.quote(prefix)} all | grep -F {shlex.quote(prefix)}" + ) + + def wait_for_frr_route(prefix): + frr.wait_until_succeeds( + f"vtysh -c {shlex.quote('show ip bgp ' + prefix)} | grep -F {shlex.quote(prefix)}" + ) + + def wait_for_unit_or_dump(machine, unit): + deadline = time.time() + 120 + while time.time() < deadline: + status, output = machine.execute(f"systemctl is-active {shlex.quote(unit)}") + state = output.strip() + if status == 0 and state == "active": + return + if state == "failed": + service_status = machine.succeed( + f"systemctl status {shlex.quote(unit)} --no-pager || true" + ) + journal = machine.succeed( + f"journalctl -u {shlex.quote(unit)} -n 200 --no-pager || true" + ) + raise AssertionError( + f"unit {unit} failed to start\n" + f"systemctl status:\n{service_status}\n" + f"journal:\n{journal}" + ) + time.sleep(1) + + service_status = machine.succeed( + f"systemctl status {shlex.quote(unit)} --no-pager || true" + ) + journal = machine.succeed( + f"journalctl -u {shlex.quote(unit)} -n 200 --no-pager || true" + ) + raise AssertionError( + f"unit {unit} did not become active before timeout\n" + f"systemctl 
status:\n{service_status}\n" + f"journal:\n{journal}" + ) + + def wait_for_command_or_dump(machine, command, unit=None, timeout=120): + deadline = time.time() + timeout + last_output = "" + while time.time() < deadline: + status, output = machine.execute(f"sh -lc {shlex.quote(command + ' 2>&1')}") + last_output = output + if status == 0: + return + time.sleep(1) + + diagnostics = f"last command output:\n{last_output}\n" + if unit is not None: + diagnostics += ( + f"systemctl status:\n{machine.succeed(f'systemctl status {shlex.quote(unit)} --no-pager || true')}\n" + f"journal:\n{machine.succeed(f'journalctl -u {shlex.quote(unit)} -n 200 --no-pager || true')}\n" + ) + diagnostics += f"socket state:\n{machine.succeed('ss -ltnp || true')}\n" + raise AssertionError( + f"command did not succeed before timeout: {command}\n{diagnostics}" + ) + + start_all() + serial_stdout_off() + + wait_for_unit_or_dump(frr, "frr-zebra.service") + wait_for_command_or_dump(frr, "test -S /run/frr/zserv.api", "frr-zebra.service") + wait_for_unit_or_dump(frr, "frr-bgpd.service") + wait_for_command_or_dump( + frr, + "ss -ltnH '( sport = :179 )' | grep -q LISTEN", + "frr-bgpd.service", + ) + wait_for_unit_or_dump(bird, "bird-peer.service") + wait_for_unit_or_dump(gobgp, "gobgpd-peer.service") + wait_for_command_or_dump( + gobgp, + "ss -ltnH '( sport = :179 )' | grep -q LISTEN", + "gobgpd-peer.service", + ) + wait_for_unit_or_dump(lb, "iam.service") + wait_for_command_or_dump(lb, "ss -ltnH '( sport = :50080 )' | grep -q LISTEN", "iam.service") + wait_for_unit_or_dump(lb, "mock-backend.service") + wait_for_unit_or_dump(lb, "fiberlb.service") + wait_for_command_or_dump(lb, "ss -ltnH '( sport = :50085 )' | grep -q LISTEN", "fiberlb.service") + wait_for_command_or_dump(lb, "ss -ltnH '( sport = :9098 )' | grep -q LISTEN", "fiberlb.service") + + frr.wait_until_succeeds("vtysh -c 'show ip bgp neighbor 192.168.100.2' | grep -F 'BGP state = Established'") + bird.wait_until_succeeds("birdc -s 
/run/bird.ctl show protocols all fiberlb_peer | grep -F Established") + gobgp.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + wait_for_metric("fiberlb_bgp_configured_peers", 3) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"}) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"}) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.4:179"}) + wait_for_metric("fiberlb_bgp_connected_peers", 3) + + token = issue_project_admin_token(lb, "bgp-interop-org", "bgp-interop-project") + + lb_response = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.LoadBalancerService/CreateLoadBalancer", + { + "name": "bgp-interop-lb", + "orgId": "bgp-interop-org", + "projectId": "bgp-interop-project", + "description": "native bgp interop smoke", + "vipAddress": "203.0.113.77", + }, + headers=[f"authorization: Bearer {token}"], + ) + loadbalancer = lb_response["loadbalancer"] + lb_id = loadbalancer["id"] + vip = loadbalancer["vipAddress"] + vip_prefix = f"{vip}/32" + + pool_id = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.PoolService/CreatePool", + { + "name": "bgp-interop-pool", + "loadbalancerId": lb_id, + "algorithm": "POOL_ALGORITHM_ROUND_ROBIN", + "protocol": "POOL_PROTOCOL_TCP", + }, + headers=[f"authorization: Bearer {token}"], + )["pool"]["id"] + + backend_id = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.BackendService/CreateBackend", + { + "name": "bgp-interop-backend", + "poolId": pool_id, + "address": "127.0.0.1", + "port": 18081, + "weight": 1, + }, + headers=[f"authorization: Bearer {token}"], + )["backend"]["id"] + + grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.HealthCheckService/CreateHealthCheck", + { + "name": "bgp-interop-health", + "poolId": pool_id, + "type": 
"HEALTH_CHECK_TYPE_HTTP", + "intervalSeconds": 1, + "timeoutSeconds": 1, + "healthyThreshold": 1, + "unhealthyThreshold": 1, + "httpConfig": { + "method": "GET", + "path": "/", + "expectedCodes": [200], + }, + }, + headers=[f"authorization: Bearer {token}"], + ) + + grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.ListenerService/CreateListener", + { + "name": "bgp-interop-listener", + "loadbalancerId": lb_id, + "protocol": "LISTENER_PROTOCOL_TCP", + "port": 18080, + "defaultPoolId": pool_id, + }, + headers=[f"authorization: Bearer {token}"], + ) + + wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token) + wait_for_local_vip(vip) + wait_for_metric("fiberlb_bgp_desired_routes", 1) + wait_for_frr_route(vip_prefix) + wait_for_bird_route(vip_prefix) + wait_for_gobgp_route(vip_prefix, True) + + frr.wait_until_succeeds( + "vtysh -c 'show ip bgp 203.0.113.77/32' | grep -F 'metric 10'" + ) + frr.wait_until_succeeds( + "vtysh -c 'show ip bgp 203.0.113.77/32' | grep -F 'Community: 65010:101'" + ) + bird.wait_until_succeeds( + "birdc -s /run/bird.ctl show route for 203.0.113.77/32 all | grep -F 'BGP.med: 20'" + ) + bird.wait_until_succeeds( + "birdc -s /run/bird.ctl show route for 203.0.113.77/32 all | grep -F 'BGP.community: (65010,202)'" + ) + + gobgp.succeed("systemctl stop gobgpd-peer.service") + wait_for_metric("fiberlb_bgp_connected_peers", 2) + wait_for_metric("fiberlb_bgp_peer_session_up", 0, {"peer": "192.168.100.4:179"}) + wait_for_frr_route(vip_prefix) + wait_for_bird_route(vip_prefix) + + gobgp.succeed("systemctl start gobgpd-peer.service") + wait_for_unit_or_dump(gobgp, "gobgpd-peer.service") + gobgp.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + wait_for_metric("fiberlb_bgp_connected_peers", 3) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.4:179"}) + wait_for_gobgp_route(vip_prefix, True) + ''; +}