From 67d4523adfb3b40aebf4c043a152f8b1acf6103b Mon Sep 17 00:00:00 2001
From: centra
Date: Mon, 30 Mar 2026 17:41:24 +0900
Subject: [PATCH] Strengthen FiberLB multi-peer BGP verification
---
.../crates/fiberlb-server/src/bgp_client.rs | 259 +++++++-
fiberlb/crates/fiberlb-server/src/main.rs | 25 +
.../crates/fiberlb-server/src/vip_manager.rs | 23 +-
flake.nix | 9 +
.../fiberlb-native-bgp-multipath-vm-smoke.nix | 591 ++++++++++++++++++
5 files changed, 899 insertions(+), 8 deletions(-)
create mode 100644 nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix
diff --git a/fiberlb/crates/fiberlb-server/src/bgp_client.rs b/fiberlb/crates/fiberlb-server/src/bgp_client.rs
index a8cf511..65dda4f 100644
--- a/fiberlb/crates/fiberlb-server/src/bgp_client.rs
+++ b/fiberlb/crates/fiberlb-server/src/bgp_client.rs
@@ -32,6 +32,12 @@ const ATTR_TYPE_ORIGIN: u8 = 1;
const ATTR_TYPE_AS_PATH: u8 = 2;
const ATTR_TYPE_NEXT_HOP: u8 = 3;
const AS_PATH_SEGMENT_SEQUENCE: u8 = 2;
+const METRIC_BGP_CONFIGURED_PEERS: &str = "fiberlb_bgp_configured_peers";
+const METRIC_BGP_CONNECTED_PEERS: &str = "fiberlb_bgp_connected_peers";
+const METRIC_BGP_DESIRED_ROUTES: &str = "fiberlb_bgp_desired_routes";
+const METRIC_BGP_PEER_SESSION_UP: &str = "fiberlb_bgp_peer_session_up";
+const METRIC_BGP_SESSION_ESTABLISHED_TOTAL: &str = "fiberlb_bgp_session_established_total";
+const METRIC_BGP_SESSION_ENDS_TOTAL: &str = "fiberlb_bgp_session_ends_total";
/// Result type for BGP operations.
pub type Result = std::result::Result;
@@ -110,6 +116,13 @@ impl NativeBgpSpeaker {
connected_sessions: AtomicUsize::new(0),
});
+ record_configured_peers(config.peers.len());
+ record_connected_peers(&shared);
+ record_desired_routes(0);
+ for peer in &config.peers {
+ set_peer_session_up(peer, false);
+ }
+
for peer in config.peers.clone() {
tokio::spawn(run_peer_loop(config.clone(), peer, shared.clone()));
}
@@ -120,9 +133,11 @@ impl NativeBgpSpeaker {
async fn update_route(&self, prefix: Ipv4Addr, next_hop: Ipv4Addr) {
let mut desired = self.shared.desired_routes.write().await;
let changed = desired.insert(prefix, next_hop) != Some(next_hop);
+ let route_count = desired.len();
drop(desired);
if changed {
+ record_desired_routes(route_count);
publish_route_change(&self.shared);
}
}
@@ -130,9 +145,11 @@ impl NativeBgpSpeaker {
async fn remove_route(&self, prefix: Ipv4Addr) {
let mut desired = self.shared.desired_routes.write().await;
let changed = desired.remove(&prefix).is_some();
+ let route_count = desired.len();
drop(desired);
if changed {
+ record_desired_routes(route_count);
publish_route_change(&self.shared);
}
}
@@ -172,12 +189,29 @@ pub async fn create_bgp_client(config: BgpConfig) -> Result>
async fn run_peer_loop(config: BgpConfig, peer: BgpPeerConfig, shared: Arc<BgpSharedState>) {
let peer_name = peer_name(&peer);
+ let peer_label = peer_metric_label(&peer);
let connect_retry = Duration::from_secs(config.connect_retry_secs.max(1));
loop {
match establish_peer_session(&config, &peer, shared.clone()).await {
- Ok(()) => warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting"),
- Err(error) => warn!(peer = %peer_name, error = %error, "BGP peer session failed"),
+ Ok(()) => {
+ metrics::counter!(
+ METRIC_BGP_SESSION_ENDS_TOTAL,
+ "peer" => peer_label.clone(),
+ "result" => "clean",
+ )
+ .increment(1);
+ warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting");
+ }
+ Err(error) => {
+ metrics::counter!(
+ METRIC_BGP_SESSION_ENDS_TOTAL,
+ "peer" => peer_label.clone(),
+ "result" => "error",
+ )
+ .increment(1);
+ warn!(peer = %peer_name, error = %error, "BGP peer session failed");
+ }
}
sleep(connect_retry).await;
@@ -219,7 +253,14 @@ async fn establish_peer_session(
"FiberLB BGP session established",
);
+ metrics::counter!(
+ METRIC_BGP_SESSION_ESTABLISHED_TOTAL,
+ "peer" => peer_metric_label(peer),
+ )
+ .increment(1);
+ set_peer_session_up(peer, true);
shared.connected_sessions.fetch_add(1, Ordering::Relaxed);
+ record_connected_peers(&shared);
let session_result = run_established_session(
stream,
config,
@@ -230,6 +271,8 @@ async fn establish_peer_session(
)
.await;
shared.connected_sessions.fetch_sub(1, Ordering::Relaxed);
+ record_connected_peers(&shared);
+ set_peer_session_up(peer, false);
session_result
}
@@ -331,9 +374,6 @@ async fn run_established_session(
}
});
- let mut advertised = HashMap::new();
- reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?;
-
let mut keepalive = tokio::time::interval(keepalive_interval);
keepalive.tick().await;
@@ -341,6 +381,8 @@ async fn run_established_session(
hold_monitor.tick().await;
let mut route_updates = shared.route_updates.subscribe();
+ let mut advertised = HashMap::new();
+ reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?;
let mut last_rx = Instant::now();
loop {
@@ -537,6 +579,28 @@ fn peer_name(peer: &BgpPeerConfig) -> String {
}
}
+fn peer_metric_label(peer: &BgpPeerConfig) -> String {
+ format!("{}:{}", peer.address, peer.port)
+}
+
+fn record_configured_peers(count: usize) {
+ metrics::gauge!(METRIC_BGP_CONFIGURED_PEERS).set(count as f64);
+}
+
+fn record_connected_peers(shared: &BgpSharedState) {
+ metrics::gauge!(METRIC_BGP_CONNECTED_PEERS)
+ .set(shared.connected_sessions.load(Ordering::Relaxed) as f64);
+}
+
+fn record_desired_routes(count: usize) {
+ metrics::gauge!(METRIC_BGP_DESIRED_ROUTES).set(count as f64);
+}
+
+fn set_peer_session_up(peer: &BgpPeerConfig, up: bool) {
+ metrics::gauge!(METRIC_BGP_PEER_SESSION_UP, "peer" => peer_metric_label(peer))
+ .set(if up { 1.0 } else { 0.0 });
+}
+
fn negotiated_keepalive_interval(
requested_keepalive_secs: u16,
negotiated_hold_time_secs: u16,
@@ -943,7 +1007,8 @@ mod tests {
use super::*;
use tokio::net::TcpListener;
- use tokio::sync::oneshot;
+ use tokio::sync::{mpsc, oneshot};
+ use tokio::time::timeout;
#[tokio::test]
async fn test_disabled_bgp_client_is_noop() {
@@ -1065,6 +1130,188 @@ mod tests {
peer_task.await.unwrap();
}
+ #[tokio::test]
+ async fn test_native_speaker_resyncs_routes_across_multiple_peers() {
+ let listener_a = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let listener_b = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let port_a = listener_a.local_addr().unwrap().port();
+ let port_b = listener_b.local_addr().unwrap().port();
+
+ let (peer_a_events_tx, mut peer_a_events_rx) = mpsc::channel(4);
+ let (peer_b_events_tx, mut peer_b_events_rx) = mpsc::channel(4);
+
+ let peer_a_task = tokio::spawn(async move {
+ let (mut first_socket, _) = listener_a.accept().await.unwrap();
+ complete_test_peer_handshake(
+ &mut first_socket,
+ 65010,
+ "192.0.2.10".parse().unwrap(),
+ 65020,
+ "192.0.2.20".parse().unwrap(),
+ )
+ .await;
+
+ let first_announcement = expect_update(&mut first_socket).await;
+ assert_eq!(
+ first_announcement.announced_routes,
+ vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
+ );
+ peer_a_events_tx.send("first-announcement").await.unwrap();
+ drop(first_socket);
+
+ let (mut second_socket, _) = listener_a.accept().await.unwrap();
+ complete_test_peer_handshake(
+ &mut second_socket,
+ 65010,
+ "192.0.2.10".parse().unwrap(),
+ 65020,
+ "192.0.2.20".parse().unwrap(),
+ )
+ .await;
+
+ let second_announcement = expect_update(&mut second_socket).await;
+ assert_eq!(
+ second_announcement.announced_routes,
+ vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
+ );
+ peer_a_events_tx.send("second-announcement").await.unwrap();
+
+ let withdrawal = expect_update(&mut second_socket).await;
+ assert_eq!(
+ withdrawal.withdrawn_routes,
+ vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
+ );
+ peer_a_events_tx.send("withdrawal").await.unwrap();
+ });
+
+ let peer_b_task = tokio::spawn(async move {
+ let (mut socket, _) = listener_b.accept().await.unwrap();
+ complete_test_peer_handshake(
+ &mut socket,
+ 65010,
+ "192.0.2.10".parse().unwrap(),
+ 65030,
+ "192.0.2.30".parse().unwrap(),
+ )
+ .await;
+
+ let announcement = expect_update(&mut socket).await;
+ assert_eq!(
+ announcement.announced_routes,
+ vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
+ );
+ peer_b_events_tx.send("announcement").await.unwrap();
+
+ let withdrawal = expect_update(&mut socket).await;
+ assert_eq!(
+ withdrawal.withdrawn_routes,
+ vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
+ );
+ peer_b_events_tx.send("withdrawal").await.unwrap();
+ });
+
+ let client = create_bgp_client(BgpConfig {
+ enabled: true,
+ local_as: 65010,
+ router_id: "192.0.2.10".to_string(),
+ connect_retry_secs: 1,
+ peers: vec![
+ BgpPeerConfig {
+ address: "127.0.0.1".to_string(),
+ port: port_a,
+ asn: 65020,
+ description: "peer-a".to_string(),
+ },
+ BgpPeerConfig {
+ address: "127.0.0.1".to_string(),
+ port: port_b,
+ asn: 65030,
+ description: "peer-b".to_string(),
+ },
+ ],
+ ..BgpConfig::default()
+ })
+ .await
+ .unwrap();
+
+ let vip: IpAddr = "203.0.113.10".parse().unwrap();
+ let next_hop: IpAddr = "192.0.2.10".parse().unwrap();
+ client.announce_route(vip, next_hop).await.unwrap();
+
+ assert_eq!(
+ timeout(Duration::from_secs(5), peer_a_events_rx.recv())
+ .await
+ .unwrap()
+ .unwrap(),
+ "first-announcement"
+ );
+ assert_eq!(
+ timeout(Duration::from_secs(5), peer_b_events_rx.recv())
+ .await
+ .unwrap()
+ .unwrap(),
+ "announcement"
+ );
+ assert_eq!(
+ timeout(Duration::from_secs(5), peer_a_events_rx.recv())
+ .await
+ .unwrap()
+ .unwrap(),
+ "second-announcement"
+ );
+
+ client.withdraw_route(vip).await.unwrap();
+
+ assert_eq!(
+ timeout(Duration::from_secs(5), peer_b_events_rx.recv())
+ .await
+ .unwrap()
+ .unwrap(),
+ "withdrawal"
+ );
+ assert_eq!(
+ timeout(Duration::from_secs(5), peer_a_events_rx.recv())
+ .await
+ .unwrap()
+ .unwrap(),
+ "withdrawal"
+ );
+
+ peer_a_task.await.unwrap();
+ peer_b_task.await.unwrap();
+ }
+
+ async fn complete_test_peer_handshake(
+ socket: &mut TcpStream,
+ expected_local_as: u16,
+ expected_router_id: Ipv4Addr,
+ peer_as: u16,
+ peer_router_id: Ipv4Addr,
+ ) {
+ let open = match read_bgp_message(socket).await.unwrap() {
+ BgpMessage::Open(open) => open,
+ other => panic!("expected OPEN, got {:?}", other),
+ };
+ assert_eq!(open.asn, expected_local_as);
+ assert_eq!(open.router_id, expected_router_id);
+
+ send_open(socket, peer_as, 90, peer_router_id).await.unwrap();
+
+ match read_bgp_message(socket).await.unwrap() {
+ BgpMessage::Keepalive => {}
+ other => panic!("expected KEEPALIVE, got {:?}", other),
+ }
+
+ send_keepalive(socket).await.unwrap();
+ }
+
+ async fn expect_update(socket: &mut TcpStream) -> UpdateMessage {
+ match read_bgp_message(socket).await.unwrap() {
+ BgpMessage::Update(update) => update,
+ other => panic!("expected UPDATE, got {:?}", other),
+ }
+ }
+
#[test]
fn test_parse_update_message_extracts_routes() {
let mut attrs = Vec::new();
diff --git a/fiberlb/crates/fiberlb-server/src/main.rs b/fiberlb/crates/fiberlb-server/src/main.rs
index 48b7de1..e2a6140 100644
--- a/fiberlb/crates/fiberlb-server/src/main.rs
+++ b/fiberlb/crates/fiberlb-server/src/main.rs
@@ -132,6 +132,31 @@ async fn main() -> Result<(), Box> {
metrics_addr
);
+ metrics::describe_gauge!(
+ "fiberlb_bgp_configured_peers",
+ "Number of BGP peers configured for the native FiberLB speaker"
+ );
+ metrics::describe_gauge!(
+ "fiberlb_bgp_connected_peers",
+ "Number of BGP peer sessions currently established"
+ );
+ metrics::describe_gauge!(
+ "fiberlb_bgp_desired_routes",
+ "Number of VIP routes FiberLB currently wants to advertise"
+ );
+ metrics::describe_gauge!(
+ "fiberlb_bgp_peer_session_up",
+ "Per-peer BGP session state (1=established, 0=down)"
+ );
+ metrics::describe_counter!(
+ "fiberlb_bgp_session_established_total",
+ "Total number of BGP peer sessions established"
+ );
+ metrics::describe_counter!(
+ "fiberlb_bgp_session_ends_total",
+ "Total number of BGP peer session terminations by peer and result"
+ );
+
if let Some(endpoint) = &config.chainfire_endpoint {
tracing::info!(" Cluster coordination: ChainFire @ {}", endpoint);
let endpoint = endpoint.clone();
diff --git a/fiberlb/crates/fiberlb-server/src/vip_manager.rs b/fiberlb/crates/fiberlb-server/src/vip_manager.rs
index e95c24a..241fa32 100644
--- a/fiberlb/crates/fiberlb-server/src/vip_manager.rs
+++ b/fiberlb/crates/fiberlb-server/src/vip_manager.rs
@@ -8,7 +8,7 @@ use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
-use tokio::sync::RwLock;
+use tokio::sync::{watch, RwLock};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
@@ -48,6 +48,8 @@ pub struct VipManager {
vip_owner: Option>,
/// Router's own IP address (used as BGP next hop)
next_hop: IpAddr,
+ /// Shutdown signal for the background reconciliation task.
+ shutdown: watch::Sender<bool>,
}
impl VipManager {
@@ -58,12 +60,14 @@ impl VipManager {
next_hop: IpAddr,
vip_owner: Option>,
) -> Self {
+ let (shutdown, _shutdown_rx) = watch::channel(false);
Self {
bgp,
metadata,
vip_state: Arc::new(RwLock::new(HashMap::new())),
vip_owner,
next_hop,
+ shutdown,
}
}
@@ -80,13 +84,27 @@ impl VipManager {
"VIP manager started (check interval: {}s)",
check_interval.as_secs()
);
+ let mut shutdown_rx = self.shutdown.subscribe();
loop {
+ if *shutdown_rx.borrow() {
+ info!("VIP manager reconciliation loop stopping");
+ break;
+ }
+
if let Err(e) = self.check_and_update_vips().await {
error!("VIP manager check failed: {}", e);
}
- sleep(check_interval).await;
+ tokio::select! {
+ _ = sleep(check_interval) => {}
+ changed = shutdown_rx.changed() => {
+ if changed.is_err() || *shutdown_rx.borrow_and_update() {
+ info!("VIP manager reconciliation loop stopping");
+ break;
+ }
+ }
+ }
}
})
}
@@ -293,6 +311,7 @@ impl VipManager {
/// Should be called during server shutdown to ensure clean BGP state
pub async fn shutdown(&self) -> Result<(), Box> {
info!("VIP manager shutting down, withdrawing all VIPs...");
+ let _ = self.shutdown.send(true);
let mut state = self.vip_state.write().await;
let managed_vips: Vec<IpAddr> = state.keys().copied().collect();
diff --git a/flake.nix b/flake.nix
index d12d6c4..223e9af 100644
--- a/flake.nix
+++ b/flake.nix
@@ -927,6 +927,15 @@
}
);
+ fiberlb-native-bgp-multipath-vm-smoke = pkgs.testers.runNixOSTest (
+ import ./nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix {
+ inherit pkgs;
+ photoncloudPackages = self.packages.${system};
+ photoncloudModule = self.nixosModules.default;
+ nixNosModule = nix-nos.nixosModules.default;
+ }
+ );
+
deployer-bootstrap-e2e = pkgs.runCommand "deployer-bootstrap-e2e" {
nativeBuildInputs = with pkgs; [
bash
diff --git a/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix b/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix
new file mode 100644
index 0000000..c638330
--- /dev/null
+++ b/nix/tests/fiberlb-native-bgp-multipath-vm-smoke.nix
@@ -0,0 +1,591 @@
+{
+ pkgs,
+ photoncloudPackages,
+ photoncloudModule,
+ nixNosModule,
+}:
+
+let
+ mkGobgpdConfig =
+ routerId:
+ neighborAddress:
+ peerAs:
+ localAs:
+ pkgs.writeText "fiberlb-native-bgp-peer-${routerId}.json" (builtins.toJSON {
+ global = {
+ config = {
+ as = localAs;
+ router-id = routerId;
+ };
+ };
+
+ neighbors = [
+ {
+ config = {
+ neighbor-address = neighborAddress;
+ peer-as = peerAs;
+ description = "fiberlb-under-test";
+ };
+ }
+ ];
+ });
+
+ routerAConfig = mkGobgpdConfig "192.168.100.1" "192.168.100.2" 65010 65020;
+ routerBConfig = mkGobgpdConfig "192.168.100.3" "192.168.100.2" 65010 65020;
+
+ iamProtoDir = ../../iam/proto;
+ iamProto = "iam.proto";
+ fiberlbProtoDir = ../../fiberlb/crates/fiberlb-api/proto;
+ fiberlbProto = "fiberlb.proto";
+ backendScript = pkgs.writeText "fiberlb-multipath-backend.py" ''
+ from http.server import BaseHTTPRequestHandler, HTTPServer
+
+
+ class Handler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ body = b"fiberlb multipath backend\n"
+ self.send_response(200)
+ self.send_header("Content-Type", "text/plain; charset=utf-8")
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def log_message(self, format, *args):
+ return
+
+
+ HTTPServer(("127.0.0.1", 18081), Handler).serve_forever()
+ '';
+in
+{
+ name = "fiberlb-native-bgp-multipath-vm-smoke";
+
+ nodes = {
+ router_a =
+ { ... }:
+ {
+ networking.hostName = "router-a";
+ networking.useDHCP = false;
+ networking.firewall.enable = false;
+ virtualisation.vlans = [ 1 ];
+ networking.interfaces.eth1.ipv4.addresses = [
+ {
+ address = "192.168.100.1";
+ prefixLength = 24;
+ }
+ ];
+
+ environment.systemPackages = with pkgs; [
+ curl
+ gobgp
+ gobgpd
+ jq
+ ];
+
+ systemd.services.gobgpd-peer = {
+ description = "GoBGP router-a test peer for FiberLB native BGP multi-peer smoke";
+ wantedBy = [ "multi-user.target" ];
+ after = [ "network.target" ];
+ serviceConfig = {
+ Type = "simple";
+ ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${routerAConfig} --api-hosts 127.0.0.1:50051 -p";
+ Restart = "on-failure";
+ RestartSec = "2s";
+ };
+ };
+
+ system.stateVersion = "24.11";
+ };
+
+ router_b =
+ { ... }:
+ {
+ networking.hostName = "router-b";
+ networking.useDHCP = false;
+ networking.firewall.enable = false;
+ virtualisation.vlans = [ 1 ];
+ networking.interfaces.eth1.ipv4.addresses = [
+ {
+ address = "192.168.100.3";
+ prefixLength = 24;
+ }
+ ];
+
+ environment.systemPackages = with pkgs; [
+ curl
+ gobgp
+ gobgpd
+ jq
+ ];
+
+ systemd.services.gobgpd-peer = {
+ description = "GoBGP router-b test peer for FiberLB native BGP multi-peer smoke";
+ wantedBy = [ "multi-user.target" ];
+ after = [ "network.target" ];
+ serviceConfig = {
+ Type = "simple";
+ ExecStart = "${pkgs.gobgpd}/bin/gobgpd -t json -f ${routerBConfig} --api-hosts 127.0.0.1:50051 -p";
+ Restart = "on-failure";
+ RestartSec = "2s";
+ };
+ };
+
+ system.stateVersion = "24.11";
+ };
+
+ lb =
+ { ... }:
+ {
+ imports = [
+ nixNosModule
+ photoncloudModule
+ ];
+
+ networking.hostName = "lb";
+ networking.useDHCP = false;
+ networking.firewall.enable = false;
+ virtualisation.vlans = [ 1 ];
+ networking.interfaces.eth1.ipv4.addresses = [
+ {
+ address = "192.168.100.2";
+ prefixLength = 24;
+ }
+ ];
+
+ environment.systemPackages = with pkgs; [
+ curl
+ grpcurl
+ jq
+ python3
+ ];
+
+ services.iam = {
+ enable = true;
+ package = photoncloudPackages.iam-server;
+ port = 50080;
+ httpPort = 8083;
+ storeBackend = "memory";
+ };
+
+ systemd.services.iam.environment = {
+ IAM_ALLOW_RANDOM_SIGNING_KEY = "1";
+ };
+
+ services.fiberlb = {
+ enable = true;
+ package = photoncloudPackages.fiberlb-server;
+ port = 50085;
+ iamAddr = "192.168.100.2:50080";
+ metadataBackend = "sqlite";
+ databaseUrl = "sqlite:/var/lib/fiberlb/metadata.db";
+ singleNode = true;
+ healthCheckIntervalSecs = 1;
+ healthCheckTimeoutSecs = 1;
+ vipCheckIntervalSecs = 1;
+ vipOwnership = {
+ enable = true;
+ interface = "lo";
+ };
+ bgp = {
+ enable = true;
+ localAs = 65010;
+ routerId = "192.168.100.2";
+ nextHop = "192.168.100.2";
+ holdTimeSecs = 9;
+ keepaliveSecs = 3;
+ peers = [
+ {
+ address = "192.168.100.1";
+ port = 179;
+ asn = 65020;
+ description = "router-a";
+ }
+ {
+ address = "192.168.100.3";
+ port = 179;
+ asn = 65020;
+ description = "router-b";
+ }
+ ];
+ };
+ };
+
+ systemd.services.mock-backend = {
+ description = "FiberLB health-check backend";
+ wantedBy = [ "multi-user.target" ];
+ after = [ "network.target" ];
+ serviceConfig = {
+ Type = "simple";
+ ExecStart = "${pkgs.python3}/bin/python ${backendScript}";
+ Restart = "always";
+ RestartSec = "1s";
+ };
+ };
+
+ system.stateVersion = "24.11";
+ };
+ };
+
+ testScript = ''
+ import json
+ import re
+ import shlex
+ import time
+
+ IAM_PROTO_DIR = "${iamProtoDir}"
+ IAM_PROTO = "${iamProto}"
+ FIBERLB_PROTO_DIR = "${fiberlbProtoDir}"
+ FIBERLB_PROTO = "${fiberlbProto}"
+ METRIC_RE = re.compile(r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+([-+0-9.eE]+)$")
+
+ def grpcurl_json(machine, endpoint, import_path, proto, service, payload, headers=None):
+ header_args = ""
+ for header in headers or []:
+ header_args += f" -H {shlex.quote(header)}"
+ command = (
+ f"grpcurl -plaintext{header_args} "
+ f"-import-path {shlex.quote(import_path)} "
+ f"-proto {shlex.quote(proto)} "
+ f"-d {shlex.quote(json.dumps(payload))} "
+ f"{shlex.quote(endpoint)} {shlex.quote(service)}"
+ )
+ status, output = machine.execute(f"timeout 15 sh -lc {shlex.quote(command + ' 2>&1')}")
+ if status != 0:
+ raise AssertionError(
+ "grpcurl failed"
+ f" service={service}"
+ f" status={status}"
+ f" payload={json.dumps(payload, sort_keys=True)}"
+ f" output={output}"
+ )
+ return json.loads(output)
+
+ def issue_project_admin_token(machine, org_id, project_id):
+ principal_id = f"fiberlb-multipath-{int(time.time())}"
+ deadline = time.time() + 120
+
+ def retry(action):
+ last_error = None
+ while time.time() < deadline:
+ try:
+ return action()
+ except Exception as exc:
+ last_error = exc
+ time.sleep(2)
+ raise AssertionError(f"IAM bootstrap timed out: {last_error}")
+
+ retry(lambda: grpcurl_json(
+ machine,
+ "127.0.0.1:50080",
+ IAM_PROTO_DIR,
+ IAM_PROTO,
+ "iam.v1.IamAdmin/CreatePrincipal",
+ {
+ "id": principal_id,
+ "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+ "name": principal_id,
+ "orgId": org_id,
+ "projectId": project_id,
+ },
+ ))
+ retry(lambda: grpcurl_json(
+ machine,
+ "127.0.0.1:50080",
+ IAM_PROTO_DIR,
+ IAM_PROTO,
+ "iam.v1.IamAdmin/CreateBinding",
+ {
+ "principal": {
+ "kind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+ "id": principal_id,
+ },
+ "role": "roles/ProjectAdmin",
+ "scope": {
+ "project": {
+ "id": project_id,
+ "orgId": org_id,
+ }
+ },
+ },
+ ))
+ token_response = retry(lambda: grpcurl_json(
+ machine,
+ "127.0.0.1:50080",
+ IAM_PROTO_DIR,
+ IAM_PROTO,
+ "iam.v1.IamToken/IssueToken",
+ {
+ "principalId": principal_id,
+ "principalKind": "PRINCIPAL_KIND_SERVICE_ACCOUNT",
+ "scope": {
+ "project": {
+ "id": project_id,
+ "orgId": org_id,
+ }
+ },
+ "ttlSeconds": 3600,
+ },
+ ))
+ return token_response["token"]
+
+ def wait_for_backend_status(status, backend_id, token):
+ lb.wait_until_succeeds(
+ "grpcurl -plaintext "
+ f"-H {shlex.quote('authorization: Bearer ' + token)} "
+ f"-import-path {shlex.quote(FIBERLB_PROTO_DIR)} "
+ f"-proto {shlex.quote(FIBERLB_PROTO)} "
+ f"-d {shlex.quote(json.dumps({'id': backend_id}))} "
+ "127.0.0.1:50085 fiberlb.v1.BackendService/GetBackend "
+ f"| jq -e {shlex.quote(f'.backend.status == \"{status}\"')}"
+ )
+
+ def wait_for_route(machine, prefix, present):
+ command = "gobgp -u 127.0.0.1 -p 50051 global rib || true"
+ if present:
+ machine.wait_until_succeeds(f"{command} | grep -F {shlex.quote(prefix)}")
+ else:
+ deadline = time.time() + 60
+ while time.time() < deadline:
+ output = machine.succeed(command)
+ if prefix not in output:
+ return
+ time.sleep(1)
+ raise AssertionError(f"route {prefix} still present in GoBGP RIB")
+
+ def wait_for_local_vip(vip, present):
+ pattern = f"inet {vip}/32"
+ if present:
+ lb.wait_until_succeeds(
+ f"ip -4 addr show dev lo | grep -F {shlex.quote(pattern)}"
+ )
+ else:
+ deadline = time.time() + 60
+ while time.time() < deadline:
+ output = lb.succeed("ip -4 addr show dev lo || true")
+ if pattern not in output:
+ return
+ time.sleep(1)
+ raise AssertionError(f"VIP {vip} still present on loopback")
+
+ def wait_for_http_success(machine, url):
+ machine.wait_until_succeeds(
+ f"curl -fsS --max-time 5 {shlex.quote(url)} | grep -F 'fiberlb multipath backend'"
+ )
+
+ def parse_labels(label_blob):
+ if not label_blob:
+ return {}
+ labels = {}
+ for part in label_blob.split(","):
+ key, value = part.split("=", 1)
+ labels[key] = value.strip().strip('"')
+ return labels
+
+ def wait_for_metric(metric_name, expected_value, labels=None):
+ expected_labels = labels or {}
+ deadline = time.time() + 60
+
+ while time.time() < deadline:
+ exposition = lb.succeed("curl -fsS http://127.0.0.1:9098/metrics")
+ for line in exposition.splitlines():
+ line = line.strip()
+ if not line or line.startswith("#"):
+ continue
+ match = METRIC_RE.match(line)
+ if not match:
+ continue
+ name, label_blob, value = match.groups()
+ if name != metric_name:
+ continue
+ if parse_labels(label_blob) != expected_labels:
+ continue
+ if abs(float(value) - float(expected_value)) < 0.0001:
+ return
+ time.sleep(1)
+
+ raise AssertionError(
+ f"metric {metric_name} with labels={expected_labels} did not reach {expected_value}"
+ )
+
+ start_all()
+ serial_stdout_off()
+
+ router_a.wait_for_unit("gobgpd-peer.service")
+ router_a.wait_until_succeeds("ss -ltnH '( sport = :179 )' | grep -q LISTEN")
+ router_b.wait_for_unit("gobgpd-peer.service")
+ router_b.wait_until_succeeds("ss -ltnH '( sport = :179 )' | grep -q LISTEN")
+ lb.wait_for_unit("iam.service")
+ lb.wait_until_succeeds("ss -ltnH '( sport = :50080 )' | grep -q LISTEN")
+ lb.wait_for_unit("mock-backend.service")
+ lb.wait_for_unit("fiberlb.service")
+ lb.wait_until_succeeds("ss -ltnH '( sport = :50085 )' | grep -q LISTEN")
+ lb.wait_until_succeeds("ss -ltnH '( sport = :9098 )' | grep -q LISTEN")
+
+ router_a.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2")
+ router_b.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2")
+ wait_for_metric("fiberlb_bgp_configured_peers", 2)
+ wait_for_metric("fiberlb_bgp_connected_peers", 2)
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"})
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"})
+
+ token = issue_project_admin_token(lb, "bgp-multipath-org", "bgp-multipath-project")
+
+ lb_response = grpcurl_json(
+ lb,
+ "127.0.0.1:50085",
+ FIBERLB_PROTO_DIR,
+ FIBERLB_PROTO,
+ "fiberlb.v1.LoadBalancerService/CreateLoadBalancer",
+ {
+ "name": "bgp-multipath-lb",
+ "orgId": "bgp-multipath-org",
+ "projectId": "bgp-multipath-project",
+ "description": "native bgp multipath smoke",
+ },
+ headers=[f"authorization: Bearer {token}"],
+ )
+ loadbalancer = lb_response["loadbalancer"]
+ lb_id = loadbalancer["id"]
+ vip = loadbalancer["vipAddress"]
+ vip_prefix = f"{vip}/32"
+ listener_url = f"http://{vip}:18080/"
+
+ pool_id = grpcurl_json(
+ lb,
+ "127.0.0.1:50085",
+ FIBERLB_PROTO_DIR,
+ FIBERLB_PROTO,
+ "fiberlb.v1.PoolService/CreatePool",
+ {
+ "name": "bgp-multipath-pool",
+ "loadbalancerId": lb_id,
+ "algorithm": "POOL_ALGORITHM_ROUND_ROBIN",
+ "protocol": "POOL_PROTOCOL_TCP",
+ },
+ headers=[f"authorization: Bearer {token}"],
+ )["pool"]["id"]
+
+ backend_id = grpcurl_json(
+ lb,
+ "127.0.0.1:50085",
+ FIBERLB_PROTO_DIR,
+ FIBERLB_PROTO,
+ "fiberlb.v1.BackendService/CreateBackend",
+ {
+ "name": "bgp-multipath-backend",
+ "poolId": pool_id,
+ "address": "127.0.0.1",
+ "port": 18081,
+ "weight": 1,
+ },
+ headers=[f"authorization: Bearer {token}"],
+ )["backend"]["id"]
+
+ grpcurl_json(
+ lb,
+ "127.0.0.1:50085",
+ FIBERLB_PROTO_DIR,
+ FIBERLB_PROTO,
+ "fiberlb.v1.HealthCheckService/CreateHealthCheck",
+ {
+ "name": "bgp-multipath-health",
+ "poolId": pool_id,
+ "type": "HEALTH_CHECK_TYPE_HTTP",
+ "intervalSeconds": 1,
+ "timeoutSeconds": 1,
+ "healthyThreshold": 1,
+ "unhealthyThreshold": 1,
+ "httpConfig": {
+ "method": "GET",
+ "path": "/",
+ "expectedCodes": [200],
+ },
+ },
+ headers=[f"authorization: Bearer {token}"],
+ )
+
+ grpcurl_json(
+ lb,
+ "127.0.0.1:50085",
+ FIBERLB_PROTO_DIR,
+ FIBERLB_PROTO,
+ "fiberlb.v1.ListenerService/CreateListener",
+ {
+ "name": "bgp-multipath-listener",
+ "loadbalancerId": lb_id,
+ "protocol": "LISTENER_PROTOCOL_TCP",
+ "port": 18080,
+ "defaultPoolId": pool_id,
+ },
+ headers=[f"authorization: Bearer {token}"],
+ )
+
+ wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token)
+ wait_for_local_vip(vip, True)
+ wait_for_metric("fiberlb_bgp_desired_routes", 1)
+ wait_for_route(router_a, vip_prefix, True)
+ wait_for_route(router_b, vip_prefix, True)
+ router_a.succeed(f"ip route replace {shlex.quote(vip_prefix)} via 192.168.100.2 dev eth1")
+ router_b.succeed(f"ip route replace {shlex.quote(vip_prefix)} via 192.168.100.2 dev eth1")
+ wait_for_http_success(router_a, listener_url)
+ wait_for_http_success(router_b, listener_url)
+
+ router_a.succeed("systemctl stop gobgpd-peer.service")
+ wait_for_metric("fiberlb_bgp_connected_peers", 1)
+ wait_for_metric("fiberlb_bgp_peer_session_up", 0, {"peer": "192.168.100.1:179"})
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"})
+ wait_for_metric("fiberlb_bgp_desired_routes", 1)
+ wait_for_route(router_b, vip_prefix, True)
+ wait_for_http_success(router_b, listener_url)
+
+ router_a.succeed("systemctl start gobgpd-peer.service")
+ router_a.wait_for_unit("gobgpd-peer.service")
+ router_a.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2")
+ wait_for_metric("fiberlb_bgp_connected_peers", 2)
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"})
+ wait_for_route(router_a, vip_prefix, True)
+ wait_for_http_success(router_a, listener_url)
+
+ router_b.succeed("systemctl stop gobgpd-peer.service")
+ wait_for_metric("fiberlb_bgp_connected_peers", 1)
+ wait_for_metric("fiberlb_bgp_peer_session_up", 0, {"peer": "192.168.100.3:179"})
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.1:179"})
+ wait_for_metric("fiberlb_bgp_desired_routes", 1)
+ wait_for_route(router_a, vip_prefix, True)
+ wait_for_http_success(router_a, listener_url)
+
+ router_b.succeed("systemctl start gobgpd-peer.service")
+ router_b.wait_for_unit("gobgpd-peer.service")
+ router_b.wait_until_succeeds("gobgp -u 127.0.0.1 -p 50051 neighbor | grep -F 192.168.100.2")
+ wait_for_metric("fiberlb_bgp_connected_peers", 2)
+ wait_for_metric("fiberlb_bgp_peer_session_up", 1, {"peer": "192.168.100.3:179"})
+ wait_for_route(router_b, vip_prefix, True)
+ wait_for_http_success(router_b, listener_url)
+
+ lb.succeed("systemctl stop mock-backend.service")
+ wait_for_backend_status("BACKEND_STATUS_OFFLINE", backend_id, token)
+ wait_for_metric("fiberlb_bgp_desired_routes", 0)
+ wait_for_route(router_a, vip_prefix, False)
+ wait_for_route(router_b, vip_prefix, False)
+ wait_for_local_vip(vip, False)
+ router_a.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}")
+ router_b.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}")
+
+ lb.succeed("systemctl start mock-backend.service")
+ lb.wait_for_unit("mock-backend.service")
+ wait_for_backend_status("BACKEND_STATUS_ONLINE", backend_id, token)
+ wait_for_metric("fiberlb_bgp_desired_routes", 1)
+ wait_for_local_vip(vip, True)
+ wait_for_route(router_a, vip_prefix, True)
+ wait_for_route(router_b, vip_prefix, True)
+ wait_for_http_success(router_a, listener_url)
+ wait_for_http_success(router_b, listener_url)
+
+ lb.succeed("systemctl stop fiberlb.service")
+ wait_for_local_vip(vip, False)
+ wait_for_route(router_a, vip_prefix, False)
+ wait_for_route(router_b, vip_prefix, False)
+ router_a.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}")
+ router_b.fail(f"curl -fsS --max-time 3 {shlex.quote(listener_url)}")
+ '';
+}