diff --git a/fiberlb/crates/fiberlb-server/src/bgp_client.rs b/fiberlb/crates/fiberlb-server/src/bgp_client.rs index a8cf511..65dda4f 100644 --- a/fiberlb/crates/fiberlb-server/src/bgp_client.rs +++ b/fiberlb/crates/fiberlb-server/src/bgp_client.rs @@ -32,6 +32,12 @@ const ATTR_TYPE_ORIGIN: u8 = 1; const ATTR_TYPE_AS_PATH: u8 = 2; const ATTR_TYPE_NEXT_HOP: u8 = 3; const AS_PATH_SEGMENT_SEQUENCE: u8 = 2; +const METRIC_BGP_CONFIGURED_PEERS: &str = "fiberlb_bgp_configured_peers"; +const METRIC_BGP_CONNECTED_PEERS: &str = "fiberlb_bgp_connected_peers"; +const METRIC_BGP_DESIRED_ROUTES: &str = "fiberlb_bgp_desired_routes"; +const METRIC_BGP_PEER_SESSION_UP: &str = "fiberlb_bgp_peer_session_up"; +const METRIC_BGP_SESSION_ESTABLISHED_TOTAL: &str = "fiberlb_bgp_session_established_total"; +const METRIC_BGP_SESSION_ENDS_TOTAL: &str = "fiberlb_bgp_session_ends_total"; /// Result type for BGP operations. pub type Result = std::result::Result; @@ -110,6 +116,13 @@ impl NativeBgpSpeaker { connected_sessions: AtomicUsize::new(0), }); + record_configured_peers(config.peers.len()); + record_connected_peers(&shared); + record_desired_routes(0); + for peer in &config.peers { + set_peer_session_up(peer, false); + } + for peer in config.peers.clone() { tokio::spawn(run_peer_loop(config.clone(), peer, shared.clone())); } @@ -120,9 +133,11 @@ impl NativeBgpSpeaker { async fn update_route(&self, prefix: Ipv4Addr, next_hop: Ipv4Addr) { let mut desired = self.shared.desired_routes.write().await; let changed = desired.insert(prefix, next_hop) != Some(next_hop); + let route_count = desired.len(); drop(desired); if changed { + record_desired_routes(route_count); publish_route_change(&self.shared); } } @@ -130,9 +145,11 @@ impl NativeBgpSpeaker { async fn remove_route(&self, prefix: Ipv4Addr) { let mut desired = self.shared.desired_routes.write().await; let changed = desired.remove(&prefix).is_some(); + let route_count = desired.len(); drop(desired); if changed { + record_desired_routes(route_count); publish_route_change(&self.shared); } } @@ -172,12 +189,29 @@ pub async fn create_bgp_client(config: BgpConfig) -> Result> async fn run_peer_loop(config: BgpConfig, peer: BgpPeerConfig, shared: Arc) { let peer_name = peer_name(&peer); + let peer_label = peer_metric_label(&peer); let connect_retry = Duration::from_secs(config.connect_retry_secs.max(1)); loop { match establish_peer_session(&config, &peer, shared.clone()).await { - Ok(()) => warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting"), - Err(error) => warn!(peer = %peer_name, error = %error, "BGP peer session failed"), + Ok(()) => { + metrics::counter!( + METRIC_BGP_SESSION_ENDS_TOTAL, + "peer" => peer_label.clone(), + "result" => "clean", + ) + .increment(1); + warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting"); + } + Err(error) => { + metrics::counter!( + METRIC_BGP_SESSION_ENDS_TOTAL, + "peer" => peer_label.clone(), + "result" => "error", + ) + .increment(1); + warn!(peer = %peer_name, error = %error, "BGP peer session failed"); + } } sleep(connect_retry).await; @@ -219,7 +253,14 @@ async fn establish_peer_session( "FiberLB BGP session established", ); + metrics::counter!( + METRIC_BGP_SESSION_ESTABLISHED_TOTAL, + "peer" => peer_metric_label(peer), + ) + .increment(1); + set_peer_session_up(peer, true); shared.connected_sessions.fetch_add(1, Ordering::Relaxed); + record_connected_peers(&shared); let session_result = run_established_session( stream, config, @@ -230,6 +271,8 @@ async fn establish_peer_session( ) .await; shared.connected_sessions.fetch_sub(1, Ordering::Relaxed); + record_connected_peers(&shared); + set_peer_session_up(peer, false); session_result } @@ -331,9 +374,6 @@ async fn run_established_session( } }); - let mut advertised = HashMap::new(); - reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; - let mut keepalive = tokio::time::interval(keepalive_interval); keepalive.tick().await; @@ -341,6 +381,8 @@ async fn run_established_session( hold_monitor.tick().await; let mut route_updates = shared.route_updates.subscribe(); + let mut advertised = HashMap::new(); + reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?; let mut last_rx = Instant::now(); loop { @@ -537,6 +579,28 @@ fn peer_name(peer: &BgpPeerConfig) -> String { } } +fn peer_metric_label(peer: &BgpPeerConfig) -> String { + format!("{}:{}", peer.address, peer.port) +} + +fn record_configured_peers(count: usize) { + metrics::gauge!(METRIC_BGP_CONFIGURED_PEERS).set(count as f64); +} + +fn record_connected_peers(shared: &BgpSharedState) { + metrics::gauge!(METRIC_BGP_CONNECTED_PEERS) + .set(shared.connected_sessions.load(Ordering::Relaxed) as f64); +} + +fn record_desired_routes(count: usize) { + metrics::gauge!(METRIC_BGP_DESIRED_ROUTES).set(count as f64); +} + +fn set_peer_session_up(peer: &BgpPeerConfig, up: bool) { + metrics::gauge!(METRIC_BGP_PEER_SESSION_UP, "peer" => peer_metric_label(peer)) + .set(if up { 1.0 } else { 0.0 }); +} + fn negotiated_keepalive_interval( requested_keepalive_secs: u16, negotiated_hold_time_secs: u16, @@ -943,7 +1007,8 @@ mod tests { use super::*; use tokio::net::TcpListener; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; + use tokio::time::timeout; #[tokio::test] async fn test_disabled_bgp_client_is_noop() { @@ -1065,6 +1130,188 @@ mod tests { peer_task.await.unwrap(); } + #[tokio::test] + async fn test_native_speaker_resyncs_routes_across_multiple_peers() { + let listener_a = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener_b = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port_a = listener_a.local_addr().unwrap().port(); + let port_b = listener_b.local_addr().unwrap().port(); + + let (peer_a_events_tx, mut peer_a_events_rx) = mpsc::channel(4); + let (peer_b_events_tx, mut peer_b_events_rx) = mpsc::channel(4); + + let peer_a_task = tokio::spawn(async move { + let (mut first_socket, _) = listener_a.accept().await.unwrap(); + complete_test_peer_handshake( + &mut first_socket, + 65010, + "192.0.2.10".parse().unwrap(), + 65020, + "192.0.2.20".parse().unwrap(), + ) + .await; + + let first_announcement = expect_update(&mut first_socket).await; + assert_eq!( + first_announcement.announced_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + peer_a_events_tx.send("first-announcement").await.unwrap(); + drop(first_socket); + + let (mut second_socket, _) = listener_a.accept().await.unwrap(); + complete_test_peer_handshake( + &mut second_socket, + 65010, + "192.0.2.10".parse().unwrap(), + 65020, + "192.0.2.20".parse().unwrap(), + ) + .await; + + let second_announcement = expect_update(&mut second_socket).await; + assert_eq!( + second_announcement.announced_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + peer_a_events_tx.send("second-announcement").await.unwrap(); + + let withdrawal = expect_update(&mut second_socket).await; + assert_eq!( + withdrawal.withdrawn_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + peer_a_events_tx.send("withdrawal").await.unwrap(); + }); + + let peer_b_task = tokio::spawn(async move { + let (mut socket, _) = listener_b.accept().await.unwrap(); + complete_test_peer_handshake( + &mut socket, + 65010, + "192.0.2.10".parse().unwrap(), + 65030, + "192.0.2.30".parse().unwrap(), + ) + .await; + + let announcement = expect_update(&mut socket).await; + assert_eq!( + announcement.announced_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + peer_b_events_tx.send("announcement").await.unwrap(); + + let withdrawal = expect_update(&mut socket).await; + assert_eq!( + withdrawal.withdrawn_routes, + vec!["203.0.113.10".parse::().unwrap()] + ); + peer_b_events_tx.send("withdrawal").await.unwrap(); + }); + + let client = create_bgp_client(BgpConfig { + enabled: true, + local_as: 65010, + router_id: "192.0.2.10".to_string(), + connect_retry_secs: 1, + peers: vec![ + BgpPeerConfig { + address: "127.0.0.1".to_string(), + port: port_a, + asn: 65020, + description: "peer-a".to_string(), + }, + BgpPeerConfig { + address: "127.0.0.1".to_string(), + port: port_b, + asn: 65030, + description: "peer-b".to_string(), + }, + ], + ..BgpConfig::default() + }) + .await + .unwrap(); + + let vip: IpAddr = "203.0.113.10".parse().unwrap(); + let next_hop: IpAddr = "192.0.2.10".parse().unwrap(); + client.announce_route(vip, next_hop).await.unwrap(); + + assert_eq!( + timeout(Duration::from_secs(5), peer_a_events_rx.recv()) + .await + .unwrap() + .unwrap(), + "first-announcement" + ); + assert_eq!( + timeout(Duration::from_secs(5), peer_b_events_rx.recv()) + .await + .unwrap() + .unwrap(), + "announcement" + ); + assert_eq!( + timeout(Duration::from_secs(5), peer_a_events_rx.recv()) + .await + .unwrap() + .unwrap(), + "second-announcement" + ); + + client.withdraw_route(vip).await.unwrap(); + + assert_eq!( + timeout(Duration::from_secs(5), peer_b_events_rx.recv()) + .await + .unwrap() + .unwrap(), + "withdrawal" + ); + assert_eq!( + timeout(Duration::from_secs(5), peer_a_events_rx.recv()) + .await + .unwrap() + .unwrap(), + "withdrawal" + ); + + peer_a_task.await.unwrap(); + peer_b_task.await.unwrap(); + } + + async fn complete_test_peer_handshake( + socket: &mut TcpStream, + expected_local_as: u16, + expected_router_id: Ipv4Addr, + peer_as: u16, + peer_router_id: Ipv4Addr, + ) { + let open = match read_bgp_message(socket).await.unwrap() { + BgpMessage::Open(open) => open, + other => panic!("expected OPEN, got {:?}", other), + }; + assert_eq!(open.asn, expected_local_as); + assert_eq!(open.router_id, expected_router_id); + + send_open(socket, peer_as, 90, peer_router_id).await.unwrap(); + + match read_bgp_message(socket).await.unwrap() { + BgpMessage::Keepalive => {} + other => panic!("expected KEEPALIVE, got {:?}", other), + } + + send_keepalive(socket).await.unwrap(); + } + + async fn expect_update(socket: &mut TcpStream) -> UpdateMessage { + match read_bgp_message(socket).await.unwrap() { + BgpMessage::Update(update) => update, + other => panic!("expected UPDATE, got {:?}", other), + } + } + #[test] fn test_parse_update_message_extracts_routes() { let mut attrs = Vec::new(); diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs index 48b7de1..e2a6140 100644 --- a/fiberlb/crates/fiberlb-server/src/main.rs +++ b/fiberlb/crates/fiberlb-server/src/main.rs @@ -132,6 +132,31 @@ async fn main() -> Result<(), Box> { metrics_addr ); + metrics::describe_gauge!( + "fiberlb_bgp_configured_peers", + "Number of BGP peers configured for the native FiberLB speaker" + ); + metrics::describe_gauge!( + "fiberlb_bgp_connected_peers", + "Number of BGP peer sessions currently established" + ); + metrics::describe_gauge!( + "fiberlb_bgp_desired_routes", + "Number of VIP routes FiberLB currently wants to advertise" + ); + metrics::describe_gauge!( + "fiberlb_bgp_peer_session_up", + "Per-peer BGP session state (1=established, 0=down)" + ); + metrics::describe_counter!( + "fiberlb_bgp_session_established_total", + "Total number of BGP peer sessions established" + ); + metrics::describe_counter!( + "fiberlb_bgp_session_ends_total", + "Total number of BGP peer session terminations by peer and result" + ); + if let Some(endpoint) = &config.chainfire_endpoint { tracing::info!(" Cluster coordination: ChainFire @ {}", endpoint); let endpoint = endpoint.clone(); diff --git a/fiberlb/crates/fiberlb-server/src/vip_manager.rs b/fiberlb/crates/fiberlb-server/src/vip_manager.rs index e95c24a..241fa32 100644 --- a/fiberlb/crates/fiberlb-server/src/vip_manager.rs +++ b/fiberlb/crates/fiberlb-server/src/vip_manager.rs @@ -8,7 +8,7 @@ use std::net::IpAddr; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use tokio::time::sleep; use tracing::{debug, error, info, warn}; @@ -48,6 +48,8 @@ pub struct VipManager { vip_owner: Option>, /// Router's own IP address (used as BGP next hop) next_hop: IpAddr, + /// Shutdown signal for the background reconciliation task. + shutdown: watch::Sender, } impl VipManager { @@ -58,12 +60,14 @@ impl VipManager { next_hop: IpAddr, vip_owner: Option>, ) -> Self { + let (shutdown, _shutdown_rx) = watch::channel(false); Self { bgp, metadata, vip_state: Arc::new(RwLock::new(HashMap::new())), vip_owner, next_hop, + shutdown, } } @@ -80,13 +84,27 @@ impl VipManager { "VIP manager started (check interval: {}s)", check_interval.as_secs() ); + let mut shutdown_rx = self.shutdown.subscribe(); loop { + if *shutdown_rx.borrow() { + info!("VIP manager reconciliation loop stopping"); + break; + } + if let Err(e) = self.check_and_update_vips().await { error!("VIP manager check failed: {}", e); } - sleep(check_interval).await; + tokio::select! { + _ = sleep(check_interval) => {} + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow_and_update() { + info!("VIP manager reconciliation loop stopping"); + break; + } + } + } } }) } @@ -293,6 +311,7 @@ impl VipManager { /// Should be called during server shutdown to ensure clean BGP state pub async fn shutdown(&self) -> Result<(), Box> { info!("VIP manager shutting down, withdrawing all VIPs..."); + let _ = self.shutdown.send(true); let mut state = self.vip_state.write().await; let managed_vips: Vec = state.keys().copied().collect(); diff --git a/flake.nix b/flake.nix index d12d6c4..223e9af 100644 --- a/flake.nix +++ b/flake.nix @@ -927,6 +927,15 @@ } ); + fiberlb-native-bgp-multipath-vm-smoke = pkgs.testers.runNixOSTest ( + import ./nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix { + inherit pkgs; + photoncloudPackages = self.packages.${system}; + photoncloudModule = self.nixosModules.default; + nixNosModule = nix-nos.nixosModules.default; + } + ); + deployer-bootstrap-e2e = pkgs.runCommand "deployer-bootstrap-e2e" { nativeBuildInputs = with pkgs; [ bash diff --git a/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix b/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix new file mode 100644 index 0000000..c638330 --- /dev/null +++ b/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix @@ -0,0 +1,591 @@ +{ + pkgs, + photoncloudPackages, + photoncloudModule, + nixNosModule, +}: + +let + mkGobgpdConfig = + routerId: + neighborAddress: + peerAs: + localAs: + pkgs.writeText "fiberlb-native-bgp-peer-${routerId}.json" (builtins.toJSON { + global = { + config = { + as = localAs; + router-id = routerId; + }; + }; + + neighbors = [ + { + config = { + neighbor-address = neighborAddress; + peer-as = peerAs; + description = "fiberlb-under-test"; + }; + } + ]; + }); + + routerAConfig = mkGobgpdConfig "192.168.100.1" "192.168.100.2" 65010 65020; + routerBConfig = mkGobgpdConfig "192.168.100.3" "192.168.100.2" 65010 65020; + + iamProtoDir = ../../iam/proto; + iamProto = "iam.proto"; + fiberlbProtoDir = ../../fiberlb/crates/fiberlb-api/proto; + fiberlbProto = "fiberlb.proto"; + backendScript = pkgs.writeText "fiberlb-multipath-backend.py" '' + from http.server import BaseHTTPRequestHandler, HTTPServer + + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + body = b"fiberlb multipath backend\n" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + return + + + HTTPServer(("127.0.0.1", 18081), Handler).serve_forever() + ''; +in +{ + name = "fiberlb-native-bgp-multipath-vm-smoke"; + + nodes = { + router_a = + { ... }: + { + networking.hostName = "router-a"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.1"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + gobgp + gobgpd + jq + ]; + + systemd.services.gobgpd-peer = { + description = "GoBGP router-a test peer for FiberLB native BGP multi-peer smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${routerAConfig} --api-hosts 127.0.0.1:50051 -p"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + router_b = + { ... }: + { + networking.hostName = "router-b"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.3"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + gobgp + gobgpd + jq + ]; + + systemd.services.gobgpd-peer = { + description = "GoBGP router-b test peer for FiberLB native BGP multi-peer smoke"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${routerBConfig} --api-hosts 127.0.0.1:50051 -p"; + Restart = "on-failure"; + RestartSec = "2s"; + }; + }; + + system.stateVersion = "24.11"; + }; + + lb = + { ... }: + { + imports = [ + nixNosModule + photoncloudModule + ]; + + networking.hostName = "lb"; + networking.useDHCP = false; + networking.firewall.enable = false; + virtualisation.vlans = [ 1 ]; + networking.interfaces.eth1.ipv4.addresses = [ + { + address = "192.168.100.2"; + prefixLength = 24; + } + ]; + + environment.systemPackages = with pkgs; [ + curl + grpcurl + jq + python3 + ]; + + services.iam = { + enable = true; + package = photoncloudPackages.iam-server; + port = 50080; + httpPort = 8083; + storeBackend = "memory"; + }; + + systemd.services.iam.environment = { + IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + }; + + services.fiberlb = { + enable = true; + package = photoncloudPackages.fiberlb-server; + port = 50085; + iamAddr = "192.168.100.2:50080"; + metadataBackend = "sqlite"; + databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db"; + singleNode = true; + healthCheckIntervalSecs = 1; + healthCheckTimeoutSecs = 1; + vipCheckIntervalSecs = 1; + vipOwnership = { + enable = true; + interface = "lo"; + }; + bgp = { + enable = true; + localAs = 65010; + routerId = "192.168.100.2"; + nextHop = "192.168.100.2"; + holdTimeSecs = 9; + keepaliveSecs = 3; + peers = [ + { + address = "192.168.100.1"; + port = 179; + asn = 65020; + description = "router-a"; + } + { + address = "192.168.100.3"; + port = 179; + asn = 65020; + description = "router-b"; + } + ]; + }; + }; + + systemd.services.mock-backend = { + description = "FiberLB health-check backend"; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + serviceConfig = { + Type = "simple"; + ExecStart = "${pkgs.python3}/bin/python ${backendScript}"; + Restart = "always"; + RestartSec = "1s"; + }; + }; + + system.stateVersion = "24.11"; + }; + }; + + testScript = '' + import json + import re + import shlex + import time + + IAM_PROTO_DIR = "${iamProtoDir}" + IAM_PROTO = "${iamProto}" + FIBERLB_PROTO_DIR = "${fiberlbProtoDir}" + FIBERLB_PROTO = "${fiberlbProto}" + METRIC_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+([-+0-9.eE]+)$") + + def grpcurl_json(machine, endpoint, import_path, proto, service, payload, headers=None): + header_args = "" + for header in headers or []: + header_args += f" -H {shlex.quote(header)}" + command = ( + f"grpcurl -plaintext{header_args} " + f"-import-path {shlex.quote(import_path)} " + f"-proto {shlex.quote(proto)} " + f"-d {shlex.quote(json.dumps(payload))} " + f"{shlex.quote(endpoint)} {shlex.quote(service)}" + ) + status, output = machine.execute(f"timeout 15 sh -lc {shlex.quote(command + ' 2>&1')}") + if status != 0: + raise AssertionError( + "grpcurl failed" + f" service={service}" + f" status={status}" + f" payload={json.dumps(payload, sort_keys=True)}" + f" output={output}" + ) + return json.loads(output) + + def issue_project_admin_token(machine, org_id, project_id): + principal_id = f"fiberlb-multipath-{int(time.time())}" + deadline = time.time() + 120 + + def retry(action): + last_error = None + while time.time() < deadline: + try: + return action() + except Exception as exc: + last_error = exc + time.sleep(2) + raise AssertionError(f"IAM bootstrap timed out: {last_error}") + + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreatePrincipal", + { + "id": principal_id, + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "name": principal_id, + "orgId": org_id, + "projectId": project_id, + }, + )) + retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamAdmin/CreateBinding", + { + "principal": { + "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "id": principal_id, + }, + "role": "roles/ProjectAdmin", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + }, + )) + token_response = retry(lambda: grpcurl_json( + machine, + "127.0.0.1:50080", + IAM_PROTO_DIR, + IAM_PROTO, + "iam.v1.IamToken/IssueToken", + { + "principalId": principal_id, + "principalKind": "PRINCIPAL_KIND_SERVICE_ACCOUNT", + "scope": { + "project": { + "id": project_id, + "orgId": org_id, + } + }, + "ttlSeconds": 3600, + }, + )) + return token_response["token"] + + def wait_for_backend_status(status, backend_id, token): + lb.wait_until_succeeds( + "grpcurl -plaintext " + f"-H {shlex.quote('authorization: Bearer ' + token)} " + f"-import-path {shlex.quote(FIBERLB_PROTO_DIR)} " + f"-proto {shlex.quote(FIBERLB_PROTO)} " + f"-d {shlex.quote(json.dumps({'id': backend_id}))} " + "127.0.0.1:50085 fiberlb.v1.BackendService/GetBackend " + f"| jq -e {shlex.quote(f'.backend.status == \"{status}\"')}" + ) + + def wait_for_route(machine, prefix, present): + command = "gobgp -u 127.0.0.1 -p 50051 global rib || true" + if present: + machine.wait_until_succeeds(f"{command} | grep -F {shlex.quote(prefix)}") + else: + deadline = time.time() + 60 + while time.time() < deadline: + output = machine.succeed(command) + if prefix not in output: + return + time.sleep(1) + raise AssertionError(f"route {prefix} still present in GoBGP RIB") + + def wait_for_local_vip(vip, present): + pattern = f"inet {vip}/32" + if present: + lb.wait_until_succeeds( + f"ip -4 addr show dev lo | grep -F {shlex.quote(pattern)}" + ) + else: + deadline = time.time() + 60 + while time.time() < deadline: + output = lb.succeed("ip -4 addr show dev lo || true") + if pattern not in output: + return + time.sleep(1) + raise AssertionError(f"VIP {vip} still present on loopback") + + def wait_for_http_success(machine, url): + machine.wait_until_succeeds( + f"curl -fsS --max-time 5 {shlex.quote(url)} | grep -F 'fiberlb multipath backend'" + ) + + def parse_labels(label_blob): + if not label_blob: + return {} + labels = {} + for part in label_blob.split(","): + key, value = part.split("=", 1) + labels[key] = value.strip().strip('"') + return labels + + def wait_for_metric(metric_name, expected_value, labels=None): + expected_labels = labels or {} + deadline = time.time() + 60 + + while time.time() < deadline: + exposition = lb.succeed("curl -fsS http://127.0.0.1:9098/metrics") + for line in exposition.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + match = METRIC_RE.match(line) + if not match: + continue + name, label_blob, value = match.groups() + if name != metric_name: + continue + if parse_labels(label_blob) != expected_labels: + continue + if abs(float(value) - float(expected_value)) < 0.0001: + return + time.sleep(1) + + raise AssertionError( + f"metric {metric_name} with labels={expected_labels} did not reach {expected_value}" + ) + + start_all() + serial_stdout_off() + + router_a.wait_for_unit("gobgpd-peer.service") + router_a.wait_until_succeeds("ss -ltnH '( sport = :179 )' | grep -q LISTEN") + router_b.wait_for_unit("gobgpd-peer.service") + router_b.wait_until_succeeds("ss -ltnH '( sport = :179 )' | grep -q LISTEN") + lb.wait_for_unit("iam.service") + lb.wait_until_succeeds("ss -ltnH '( sport = :50080 )' | grep -q LISTEN") + lb.wait_for_unit("mock-backend.service") + lb.wait_for_unit("fiberlb.service") + lb.wait_until_succeeds("ss -ltnH '( sport = :50085 )' | grep -q LISTEN") + lb.wait_until_succeeds("ss -ltnH '( sport = :9098 )' | grep -q LISTEN") + + router_a.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + router_b.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + wait_for_metric("fiberlb_bgp_configured_peers", 2) + wait_for_metric("fiberlb_bgp_connected_peers", 2) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"}) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"}) + + token = issue_project_admin_token(lb, "bgp-multipath-org", "bgp-multipath-project") + + lb_response = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.LoadBalancerService/CreateLoadBalancer", + { + "name": "bgp-multipath-lb", + "orgId": "bgp-multipath-org", + "projectId": "bgp-multipath-project", + "description": "native bgp multipath smoke", + }, + headers=[f"authorization: Bearer {token}"], + ) + loadbalancer = lb_response["loadbalancer"] + lb_id = loadbalancer["id"] + vip = loadbalancer["vipAddress"] + vip_prefix = f"{vip}/32" + listener_url = f"http://{vip}:18080/" + + pool_id = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.PoolService/CreatePool", + { + "name": "bgp-multipath-pool", + "loadbalancerId": lb_id, + "algorithm": "POOL_ALGORITHM_ROUND_ROBIN", + "protocol": "POOL_PROTOCOL_TCP", + }, + headers=[f"authorization: Bearer {token}"], + )["pool"]["id"] + + backend_id = grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.BackendService/CreateBackend", + { + "name": "bgp-multipath-backend", + "poolId": pool_id, + "address": "127.0.0.1", + "port": 18081, + "weight": 1, + }, + headers=[f"authorization: Bearer {token}"], + )["backend"]["id"] + + grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.HealthCheckService/CreateHealthCheck", + { + "name": "bgp-multipath-health", + "poolId": pool_id, + "type": "HEALTH_CHECK_TYPE_HTTP", + "intervalSeconds": 1, + "timeoutSeconds": 1, + "healthyThreshold": 1, + "unhealthyThreshold": 1, + "httpConfig": { + "method": "GET", + "path": "/", + "expectedCodes": [200], + }, + }, + headers=[f"authorization: Bearer {token}"], + ) + + grpcurl_json( + lb, + "127.0.0.1:50085", + FIBERLB_PROTO_DIR, + FIBERLB_PROTO, + "fiberlb.v1.ListenerService/CreateListener", + { + "name": "bgp-multipath-listener", + "loadbalancerId": lb_id, + "protocol": "LISTENER_PROTOCOL_TCP", + "port": 18080, + "defaultPoolId": pool_id, + }, + headers=[f"authorization: Bearer {token}"], + ) + + wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token) + wait_for_local_vip(vip, True) + wait_for_metric("fiberlb_bgp_desired_routes", 1) + wait_for_route(router_a, vip_prefix, True) + wait_for_route(router_b, vip_prefix, True) + router_a.succeed(f"ip route replace {shlex.quote(vip_prefix)} via 192.168.100.2 dev eth1") + router_b.succeed(f"ip route replace {shlex.quote(vip_prefix)} via 192.168.100.2 dev eth1") + wait_for_http_success(router_a, listener_url) + wait_for_http_success(router_b, listener_url) + + router_a.succeed("systemctl stop gobgpd-peer.service") + wait_for_metric("fiberlb_bgp_connected_peers", 1) + wait_for_metric("fiberlb_bgp_peer_session_up", 0, {"peer": "192.168.100.1:179"}) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"}) + wait_for_metric("fiberlb_bgp_desired_routes", 1) + wait_for_route(router_b, vip_prefix, True) + wait_for_http_success(router_b, listener_url) + + router_a.succeed("systemctl start gobgpd-peer.service") + router_a.wait_for_unit("gobgpd-peer.service") + router_a.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + wait_for_metric("fiberlb_bgp_connected_peers", 2) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"}) + wait_for_route(router_a, vip_prefix, True) + wait_for_http_success(router_a, listener_url) + + router_b.succeed("systemctl stop gobgpd-peer.service") + wait_for_metric("fiberlb_bgp_connected_peers", 1) + wait_for_metric("fiberlb_bgp_peer_session_up", 0, {"peer": "192.168.100.3:179"}) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"}) + wait_for_metric("fiberlb_bgp_desired_routes", 1) + wait_for_route(router_a, vip_prefix, True) + wait_for_http_success(router_a, listener_url) + + router_b.succeed("systemctl start gobgpd-peer.service") + router_b.wait_for_unit("gobgpd-peer.service") + router_b.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2") + wait_for_metric("fiberlb_bgp_connected_peers", 2) + wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"}) + wait_for_route(router_b, vip_prefix, True) + wait_for_http_success(router_b, listener_url) + + lb.succeed("systemctl stop mock-backend.service") + wait_for_backend_status("BACKEND_STATUS_OFFLINE", backend_id, token) + wait_for_metric("fiberlb_bgp_desired_routes", 0) + wait_for_route(router_a, vip_prefix, False) + wait_for_route(router_b, vip_prefix, False) + wait_for_local_vip(vip, False) + router_a.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}") + router_b.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}") + + lb.succeed("systemctl start mock-backend.service") + lb.wait_for_unit("mock-backend.service") + wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token) + wait_for_metric("fiberlb_bgp_desired_routes", 1) + wait_for_local_vip(vip, True) + wait_for_route(router_a, vip_prefix, True) + wait_for_route(router_b, vip_prefix, True) + wait_for_http_success(router_a, listener_url) + wait_for_http_success(router_b, listener_url) + + lb.succeed("systemctl stop fiberlb.service") + wait_for_local_vip(vip, False) + wait_for_route(router_a, vip_prefix, False) + wait_for_route(router_b, vip_prefix, False) + router_a.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}") + router_b.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}") + ''; +}