photoncloud-monorepo/fiberlb/crates/fiberlb-server/src/bgp_client.rs
centra 67d4523adf
Some checks failed
Nix CI / filter (push) Failing after 1s
Nix CI / gate () (push) Has been skipped
Nix CI / gate (shared crates) (push) Has been skipped
Nix CI / build () (push) Has been skipped
Nix CI / ci-status (push) Failing after 1s
Strengthen FiberLB multi-peer BGP verification
2026-03-30 17:41:24 +09:00

1349 lines
42 KiB
Rust

//! Native BGP speaker for FiberLB VIP advertisement.
//!
//! This module owns the minimal eBGP control plane needed for FiberLB:
//! outbound TCP sessions to static peers, IPv4 unicast OPEN/KEEPALIVE/UPDATE,
//! and `/32` VIP advertise/withdraw driven by the VIP manager.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, watch, RwLock};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use crate::config::{BgpConfig, BgpPeerConfig};
/// 16-byte all-ones marker that opens every BGP message header.
const BGP_MARKER: [u8; 16] = [0xff; 16];
/// BGP version sent in our OPEN and required in the peer's OPEN.
const BGP_VERSION: u8 = 4;
// Message type codes carried in the last header byte.
const BGP_TYPE_OPEN: u8 = 1;
const BGP_TYPE_UPDATE: u8 = 2;
const BGP_TYPE_NOTIFICATION: u8 = 3;
const BGP_TYPE_KEEPALIVE: u8 = 4;
/// Upper bound (header included) on messages this speaker writes or accepts.
const BGP_MAX_MESSAGE_SIZE: usize = 4096;
/// ORIGIN attribute value attached to every announced VIP (IGP).
const BGP_ORIGIN_IGP: u8 = 0;
/// Path-attribute flag bit: transitive. It is the only flag set on attributes this speaker sends.
const ATTR_FLAG_TRANSITIVE: u8 = 0x40;
/// Path-attribute flag bit: the attribute length field is two octets instead of one.
const ATTR_FLAG_EXTENDED_LEN: u8 = 0x10;
// Path-attribute type codes this speaker emits or inspects.
const ATTR_TYPE_ORIGIN: u8 = 1;
const ATTR_TYPE_AS_PATH: u8 = 2;
const ATTR_TYPE_NEXT_HOP: u8 = 3;
/// AS_PATH segment type for an ordered AS_SEQUENCE.
const AS_PATH_SEGMENT_SEQUENCE: u8 = 2;
// Metric names emitted through the `metrics` crate facade.
const METRIC_BGP_CONFIGURED_PEERS: &str = "fiberlb_bgp_configured_peers";
const METRIC_BGP_CONNECTED_PEERS: &str = "fiberlb_bgp_connected_peers";
const METRIC_BGP_DESIRED_ROUTES: &str = "fiberlb_bgp_desired_routes";
const METRIC_BGP_PEER_SESSION_UP: &str = "fiberlb_bgp_peer_session_up";
const METRIC_BGP_SESSION_ESTABLISHED_TOTAL: &str = "fiberlb_bgp_session_established_total";
const METRIC_BGP_SESSION_ENDS_TOTAL: &str = "fiberlb_bgp_session_ends_total";
/// Result type for BGP operations.
pub type Result<T> = std::result::Result<T, BgpError>;
/// BGP client errors.
///
/// Each variant renders as a short category prefix followed by the detail string.
#[derive(Debug, thiserror::Error)]
pub enum BgpError {
/// Socket-level failure surfaced through `?` (read, write, flush, `set_nodelay`).
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Configuration value the speaker cannot use (e.g. an AS number that does not fit in 16 bits).
#[error("invalid configuration: {0}")]
Config(String),
/// A configured address (router ID or next hop) could not be parsed.
#[error("invalid IP address: {0}")]
InvalidAddress(String),
/// TCP connect to a peer failed, or an established session was lost.
#[error("connection failed: {0}")]
ConnectionFailed(String),
/// The peer sent something that violates, or is not handled by, this speaker's BGP framing.
#[error("protocol error: {0}")]
Protocol(String),
/// Request outside this speaker's scope, such as an IPv6 VIP prefix or next hop.
#[error("unsupported operation: {0}")]
Unsupported(String),
}
/// BGP client trait for VIP advertisement.
///
/// Implementations must be `Send + Sync` so a single client can be shared
/// across tasks. For `NativeBgpSpeaker`, a successful call only records the
/// desired route set; peer sessions apply it asynchronously, so success does
/// not mean any peer has received the route yet.
#[tonic::async_trait]
pub trait BgpClient: Send + Sync {
/// Advertise a VIP route to BGP peers.
async fn announce_route(&self, prefix: IpAddr, next_hop: IpAddr) -> Result<()>;
/// Withdraw a VIP route from BGP peers.
async fn withdraw_route(&self, prefix: IpAddr) -> Result<()>;
/// Check if at least one peer session is established.
async fn is_connected(&self) -> bool;
}
/// A disabled client that keeps FiberLB runtime behavior uniform.
///
/// Every announce/withdraw request succeeds without doing anything, and the
/// client never reports an established session, so callers need no special
/// handling when BGP is turned off.
struct NullBgpClient;
#[tonic::async_trait]
impl BgpClient for NullBgpClient {
    /// Accepts and discards the announcement.
    async fn announce_route(&self, _vip: IpAddr, _gateway: IpAddr) -> Result<()> {
        Ok(())
    }

    /// Accepts and discards the withdrawal.
    async fn withdraw_route(&self, _vip: IpAddr) -> Result<()> {
        Ok(())
    }

    /// Always `false`: a disabled speaker has no sessions.
    async fn is_connected(&self) -> bool {
        false
    }
}
/// State shared between the `NativeBgpSpeaker` handle and every peer session task.
#[derive(Debug)]
struct BgpSharedState {
// Routes every peer should see advertised: VIP prefix -> next hop.
desired_routes: RwLock<HashMap<Ipv4Addr, Ipv4Addr>>,
// Incremented on each change to `desired_routes`; the new value is what gets sent on `route_updates`.
route_version: AtomicU64,
// Wakes each session task (via its subscribed receiver) so it reconciles after a change.
route_updates: watch::Sender<u64>,
// Number of peer sessions currently established; backs `is_connected` and the connected-peers gauge.
connected_sessions: AtomicUsize,
}
/// Native outbound-only BGP speaker.
///
/// The speaker only initiates TCP connections to the configured peers. The
/// handle itself just edits the shared desired-route set; the per-peer tasks
/// spawned by `new` keep sessions up and advertise those routes.
pub struct NativeBgpSpeaker {
shared: Arc<BgpSharedState>,
}
impl NativeBgpSpeaker {
    /// Create a new native BGP speaker and spawn one reconnecting session task per peer.
    ///
    /// The configuration is validated first. Then the configured/connected/
    /// desired-route gauges are initialised, every peer's `session_up` gauge is
    /// set to 0, and `run_peer_loop` is spawned for each peer on the current
    /// Tokio runtime.
    ///
    /// # Errors
    /// Returns the error from `validate_config` when the configuration is unusable.
    pub async fn new(config: BgpConfig) -> Result<Self> {
        validate_config(&config)?;
        let (route_updates, _rx) = watch::channel(0u64);
        let shared = Arc::new(BgpSharedState {
            desired_routes: RwLock::new(HashMap::new()),
            route_version: AtomicU64::new(0),
            route_updates,
            connected_sessions: AtomicUsize::new(0),
        });
        record_configured_peers(config.peers.len());
        record_connected_peers(&shared);
        record_desired_routes(0);
        for peer in &config.peers {
            set_peer_session_up(peer, false);
        }
        for peer in &config.peers {
            tokio::spawn(run_peer_loop(
                config.clone(),
                peer.clone(),
                Arc::clone(&shared),
            ));
        }
        Ok(Self { shared })
    }

    /// Set (or replace) the desired next hop for `prefix` and wake the sessions if it changed.
    async fn update_route(&self, prefix: Ipv4Addr, next_hop: Ipv4Addr) {
        let mut desired = self.shared.desired_routes.write().await;
        if desired.insert(prefix, next_hop) != Some(next_hop) {
            // Record and publish while the write lock is still held. Concurrent
            // callers are serialized by the lock, so a caller that finishes late
            // cannot overwrite the gauge with a stale route count.
            record_desired_routes(desired.len());
            publish_route_change(&self.shared);
        }
    }

    /// Drop `prefix` from the desired routes and wake the sessions if it was present.
    async fn remove_route(&self, prefix: Ipv4Addr) {
        let mut desired = self.shared.desired_routes.write().await;
        if desired.remove(&prefix).is_some() {
            // Same reasoning as `update_route`: keep the gauge update under the lock.
            record_desired_routes(desired.len());
            publish_route_change(&self.shared);
        }
    }
}
#[tonic::async_trait]
impl BgpClient for NativeBgpSpeaker {
    /// Record `prefix -> next_hop` as a desired route; peer sessions advertise it asynchronously.
    ///
    /// Both addresses must be IPv4. Otherwise `BgpError::Unsupported` is
    /// returned and nothing is recorded.
    async fn announce_route(&self, prefix: IpAddr, next_hop: IpAddr) -> Result<()> {
        let vip = to_ipv4(prefix, "VIP prefix")?;
        let gateway = to_ipv4(next_hop, "BGP next hop")?;
        self.update_route(vip, gateway).await;
        Ok(())
    }

    /// Remove `prefix` from the desired routes; peer sessions withdraw it asynchronously.
    async fn withdraw_route(&self, prefix: IpAddr) -> Result<()> {
        let vip = to_ipv4(prefix, "VIP prefix")?;
        self.remove_route(vip).await;
        Ok(())
    }

    /// `true` while at least one peer session is established.
    async fn is_connected(&self) -> bool {
        let established = self.shared.connected_sessions.load(Ordering::Relaxed);
        established != 0
    }
}
/// Create a BGP client from configuration.
///
/// When `config.enabled` is false, this returns a no-op client that never
/// reports a session. Otherwise it validates the configuration and starts the
/// native speaker.
///
/// # Errors
/// Propagates configuration errors from `NativeBgpSpeaker::new`.
pub async fn create_bgp_client(config: BgpConfig) -> Result<Arc<dyn BgpClient>> {
    let client: Arc<dyn BgpClient> = if config.enabled {
        Arc::new(NativeBgpSpeaker::new(config).await?)
    } else {
        info!("FiberLB native BGP speaker disabled");
        Arc::new(NullBgpClient)
    };
    Ok(client)
}
async fn run_peer_loop(config: BgpConfig, peer: BgpPeerConfig, shared: Arc<BgpSharedState>) {
let peer_name = peer_name(&peer);
let peer_label = peer_metric_label(&peer);
let connect_retry = Duration::from_secs(config.connect_retry_secs.max(1));
loop {
match establish_peer_session(&config, &peer, shared.clone()).await {
Ok(()) => {
metrics::counter!(
METRIC_BGP_SESSION_ENDS_TOTAL,
"peer" => peer_label.clone(),
"result" => "clean",
)
.increment(1);
warn!(peer = %peer_name, "BGP peer session ended cleanly; reconnecting");
}
Err(error) => {
metrics::counter!(
METRIC_BGP_SESSION_ENDS_TOTAL,
"peer" => peer_label.clone(),
"result" => "error",
)
.increment(1);
warn!(peer = %peer_name, error = %error, "BGP peer session failed");
}
}
sleep(connect_retry).await;
}
}
async fn establish_peer_session(
config: &BgpConfig,
peer: &BgpPeerConfig,
shared: Arc<BgpSharedState>,
) -> Result<()> {
let endpoint = format!("{}:{}", peer.address, peer.port);
info!(
peer = %peer_name(peer),
local_as = config.local_as,
router_id = %config.router_id,
"Connecting FiberLB BGP session",
);
let mut stream = TcpStream::connect(&endpoint)
.await
.map_err(|error| BgpError::ConnectionFailed(format!("{} ({})", endpoint, error)))?;
stream.set_nodelay(true)?;
let handshake = perform_handshake(&mut stream, config, peer).await?;
let keepalive_interval =
negotiated_keepalive_interval(config.keepalive_secs, handshake.hold_time);
let hold_deadline = if handshake.hold_time == 0 {
None
} else {
Some(Duration::from_secs(handshake.hold_time as u64))
};
info!(
peer = %peer_name(peer),
hold_time_secs = handshake.hold_time,
keepalive_secs = keepalive_interval.as_secs(),
peer_router_id = %handshake.router_id,
"FiberLB BGP session established",
);
metrics::counter!(
METRIC_BGP_SESSION_ESTABLISHED_TOTAL,
"peer" => peer_metric_label(peer),
)
.increment(1);
set_peer_session_up(peer, true);
shared.connected_sessions.fetch_add(1, Ordering::Relaxed);
record_connected_peers(&shared);
let session_result = run_established_session(
stream,
config,
peer,
shared.clone(),
keepalive_interval,
hold_deadline,
)
.await;
shared.connected_sessions.fetch_sub(1, Ordering::Relaxed);
record_connected_peers(&shared);
set_peer_session_up(peer, false);
session_result
}
/// Exchange OPEN and KEEPALIVE messages with a freshly connected peer.
///
/// Sequence:
/// 1. Send our OPEN (16-bit local AS, configured hold time, router ID).
/// 2. Require the first reply to be the peer's OPEN carrying `peer.asn`.
/// 3. Answer it with a KEEPALIVE.
/// 4. Wait for the peer's KEEPALIVE. UPDATE and unknown message types that
///    arrive first are logged at debug level and skipped.
///
/// Returns the peer's OPEN so the caller can use its hold time and router ID.
///
/// # Errors
/// * `Config` if the local or peer AS does not fit in 16 bits.
/// * `InvalidAddress` if `router_id` cannot be parsed.
/// * `Protocol` if the first reply is not an OPEN, the AS does not match, or
///   the peer sends a NOTIFICATION or a second OPEN before its KEEPALIVE.
/// * Any I/O or decoding error from the stream.
async fn perform_handshake(
    stream: &mut TcpStream,
    config: &BgpConfig,
    peer: &BgpPeerConfig,
) -> Result<OpenMessage> {
    let Ok(local_as) = u16::try_from(config.local_as) else {
        return Err(BgpError::Config(format!(
            "local AS {} is out of range",
            config.local_as
        )));
    };
    let router_id = match config.router_id_addr() {
        Ok(router_id) => router_id,
        Err(error) => {
            return Err(BgpError::InvalidAddress(format!(
                "invalid router_id '{}': {}",
                config.router_id, error
            )));
        }
    };
    send_open(stream, local_as, config.hold_time_secs, router_id).await?;

    let remote_open = match read_bgp_message(stream).await? {
        BgpMessage::Open(open) => open,
        unexpected => {
            return Err(BgpError::Protocol(format!(
                "expected OPEN from {}, received {:?}",
                peer_name(peer),
                unexpected
            )));
        }
    };
    let Ok(expected_as) = u16::try_from(peer.asn) else {
        return Err(BgpError::Config(format!(
            "peer AS {} is out of range",
            peer.asn
        )));
    };
    if remote_open.asn != expected_as {
        return Err(BgpError::Protocol(format!(
            "peer {} reported AS {}, expected {}",
            peer_name(peer),
            remote_open.asn,
            expected_as
        )));
    }
    send_keepalive(stream).await?;

    // The peer confirms our OPEN with a KEEPALIVE; tolerate harmless traffic until then.
    loop {
        match read_bgp_message(stream).await? {
            BgpMessage::Keepalive => return Ok(remote_open),
            BgpMessage::Update(_) => {
                debug!(peer = %peer_name(peer), "Ignoring UPDATE during handshake");
            }
            BgpMessage::Unknown { msg_type, .. } => {
                debug!(peer = %peer_name(peer), msg_type, "Ignoring unknown BGP message during handshake");
            }
            BgpMessage::Notification(notification) => {
                return Err(BgpError::Protocol(format!(
                    "peer {} rejected OPEN with notification {}:{}",
                    peer_name(peer),
                    notification.code,
                    notification.subcode
                )));
            }
            BgpMessage::Open(_) => {
                return Err(BgpError::Protocol(format!(
                    "peer {} sent unexpected OPEN after OPEN exchange",
                    peer_name(peer)
                )));
            }
        }
    }
}
/// Drive an established session until it ends.
///
/// A spawned reader task decodes inbound messages and forwards them over a
/// bounded channel. This task multiplexes those messages with:
/// * the keepalive timer;
/// * the shared route-change watch channel;
/// * a once-per-second hold-timer check, when a hold deadline is set.
///
/// All desired routes are reconciled as soon as the session starts and again
/// after every route change. Inbound UPDATEs are only logged.
///
/// # Errors
/// Only ever returns an error. Causes: the peer closed the session, sent a
/// NOTIFICATION or OPEN, or let the hold timer expire; an inbound message
/// failed to decode; or a KEEPALIVE/UPDATE write failed.
async fn run_established_session(
    stream: TcpStream,
    config: &BgpConfig,
    peer: &BgpPeerConfig,
    shared: Arc<BgpSharedState>,
    keepalive_interval: Duration,
    hold_deadline: Option<Duration>,
) -> Result<()> {
    /// Aborts the wrapped task when dropped.
    ///
    /// Holding the reader task through this guard ties its lifetime to this
    /// function. Every exit stops the task and releases the socket read half
    /// it owns, including `?` propagation of a failed keepalive or UPDATE write.
    struct AbortOnDrop(tokio::task::JoinHandle<()>);

    impl Drop for AbortOnDrop {
        fn drop(&mut self) {
            self.0.abort();
        }
    }

    let local_as = u16::try_from(config.local_as)
        .map_err(|_| BgpError::Config(format!("local AS {} is out of range", config.local_as)))?;
    let (mut reader, mut writer) = stream.into_split();
    let (event_tx, mut event_rx) = mpsc::channel(16);
    let peer_name = peer_name(peer);
    // Decoding runs on its own task: `read_exact`-based reads are not
    // cancellation-safe, so they must not be raced directly in `select!` below.
    // The task stops after forwarding a terminal event (error or NOTIFICATION)
    // or once the receiving side is gone.
    let _reader_task = AbortOnDrop(tokio::spawn(async move {
        loop {
            let message = read_bgp_message(&mut reader).await;
            let terminal = matches!(message, Err(_) | Ok(BgpMessage::Notification(_)));
            if event_tx.send(message).await.is_err() {
                break;
            }
            if terminal {
                break;
            }
        }
    }));
    // An interval's first tick completes immediately; consume it so the first
    // KEEPALIVE (and first hold check) happens one period from now.
    let mut keepalive = tokio::time::interval(keepalive_interval);
    keepalive.tick().await;
    let mut hold_monitor = tokio::time::interval(Duration::from_secs(1));
    hold_monitor.tick().await;
    // Subscribe before the initial reconcile so changes made during it still trigger another pass.
    let mut route_updates = shared.route_updates.subscribe();
    let mut advertised = HashMap::new();
    reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?;
    let mut last_rx = Instant::now();
    loop {
        tokio::select! {
            maybe_message = event_rx.recv() => {
                let Some(message) = maybe_message else {
                    return Err(BgpError::ConnectionFailed(format!("peer {} closed the session", peer_name)));
                };
                match message {
                    Ok(BgpMessage::Keepalive) => {
                        last_rx = Instant::now();
                    }
                    Ok(BgpMessage::Update(update)) => {
                        last_rx = Instant::now();
                        debug!(
                            peer = %peer_name,
                            announced = update.announced_routes.len(),
                            withdrawn = update.withdrawn_routes.len(),
                            next_hop = ?update.next_hop,
                            as_path = ?update.as_path,
                            "Ignoring inbound UPDATE from peer"
                        );
                    }
                    Ok(BgpMessage::Notification(notification)) => {
                        return Err(BgpError::Protocol(format!(
                            "peer {} sent notification {}:{} ({} data bytes)",
                            peer_name,
                            notification.code,
                            notification.subcode,
                            notification.data.len()
                        )));
                    }
                    Ok(BgpMessage::Unknown { msg_type, payload }) => {
                        last_rx = Instant::now();
                        debug!(
                            peer = %peer_name,
                            msg_type,
                            payload_len = payload.len(),
                            "Ignoring unsupported BGP message"
                        );
                    }
                    Ok(BgpMessage::Open(_)) => {
                        return Err(BgpError::Protocol(format!(
                            "peer {} sent OPEN after session establishment",
                            peer_name
                        )));
                    }
                    Err(error) => {
                        return Err(error);
                    }
                }
            }
            _ = keepalive.tick() => {
                send_keepalive(&mut writer).await?;
            }
            changed = route_updates.changed() => {
                if changed.is_err() {
                    return Err(BgpError::ConnectionFailed(format!("peer {} route update channel closed", peer_name)));
                }
                route_updates.borrow_and_update();
                reconcile_routes(&shared, &mut writer, &mut advertised, local_as).await?;
            }
            _ = hold_monitor.tick(), if hold_deadline.is_some() => {
                if hold_deadline.is_some_and(|deadline| last_rx.elapsed() >= deadline) {
                    return Err(BgpError::Protocol(format!("peer {} hold timer expired", peer_name)));
                }
            }
        }
    }
}
/// Bring this session's advertised routes in line with the shared desired set.
///
/// Works on a snapshot of `desired_routes`. It first withdraws every advertised
/// prefix that is no longer desired, then announces each desired prefix that
/// is new or whose next hop changed. `advertised` is updated only after the
/// corresponding write succeeds.
async fn reconcile_routes<W: AsyncWrite + Unpin>(
    shared: &BgpSharedState,
    writer: &mut W,
    advertised: &mut HashMap<Ipv4Addr, Ipv4Addr>,
    local_as: u16,
) -> Result<()> {
    let snapshot = {
        let guard = shared.desired_routes.read().await;
        guard.clone()
    };

    let mut withdrawals = Vec::new();
    for prefix in advertised.keys() {
        if !snapshot.contains_key(prefix) {
            withdrawals.push(*prefix);
        }
    }
    for prefix in withdrawals {
        send_withdraw(writer, prefix).await?;
        advertised.remove(&prefix);
    }

    for (prefix, next_hop) in snapshot {
        let up_to_date =
            matches!(advertised.get(&prefix), Some(current) if *current == next_hop);
        if !up_to_date {
            send_announce(writer, prefix, next_hop, local_as).await?;
            advertised.insert(prefix, next_hop);
        }
    }
    Ok(())
}
/// Bump the route version and notify every session subscribed to route changes.
///
/// A failed send (no subscribed sessions) is ignored on purpose. Sessions read
/// `desired_routes` directly when they start, so nothing is lost.
fn publish_route_change(shared: &BgpSharedState) {
    let previous = shared.route_version.fetch_add(1, Ordering::Relaxed);
    shared.route_updates.send(previous + 1).ok();
}
fn validate_config(config: &BgpConfig) -> Result<()> {
if !config.enabled {
return Ok(());
}
if config.peers.is_empty() {
return Err(BgpError::Config(
"BGP is enabled but no peers are configured".to_string(),
));
}
if config.local_as == 0 || config.local_as > u16::MAX as u32 {
return Err(BgpError::Config(format!(
"FiberLB native BGP currently supports 16-bit ASNs only (got {})",
config.local_as
)));
}
config.router_id_addr().map_err(|error| {
BgpError::InvalidAddress(format!(
"invalid router_id '{}': {}",
config.router_id, error
))
})?;
let next_hop = config.next_hop_addr().map_err(|error| {
let rendered = config.next_hop.as_deref().unwrap_or(&config.router_id);
BgpError::InvalidAddress(format!("invalid BGP next hop '{}': {}", rendered, error))
})?;
to_ipv4(next_hop, "BGP next hop")?;
if config.hold_time_secs != 0 && config.hold_time_secs < 3 {
return Err(BgpError::Config(
"hold_time_secs must be 0 or at least 3 seconds".to_string(),
));
}
if config.keepalive_secs == 0 {
return Err(BgpError::Config(
"keepalive_secs must be at least 1 second".to_string(),
));
}
for peer in &config.peers {
if peer.address.trim().is_empty() {
return Err(BgpError::Config(
"BGP peer address must not be empty".to_string(),
));
}
if peer.port == 0 {
return Err(BgpError::Config(format!(
"BGP peer {} has invalid TCP port 0",
peer_name(peer)
)));
}
if peer.asn == 0 || peer.asn > u16::MAX as u32 {
return Err(BgpError::Config(format!(
"FiberLB native BGP currently supports 16-bit peer ASNs only (peer {} has AS {})",
peer_name(peer),
peer.asn
)));
}
if peer.asn == config.local_as {
return Err(BgpError::Config(format!(
"FiberLB native BGP currently supports eBGP only; peer {} uses the same AS {}",
peer_name(peer),
peer.asn
)));
}
}
Ok(())
}
/// Human-readable peer identifier for logs and error messages.
///
/// Returns `address:port`. When the configured description is not blank, it
/// returns `description (address:port)` with the description as written.
fn peer_name(peer: &BgpPeerConfig) -> String {
    let endpoint = format!("{}:{}", peer.address, peer.port);
    match peer.description.trim() {
        "" => endpoint,
        _ => format!("{} ({})", peer.description, endpoint),
    }
}
fn peer_metric_label(peer: &BgpPeerConfig) -> String {
format!("{}:{}", peer.address, peer.port)
}
/// Publish the number of configured peers on the `fiberlb_bgp_configured_peers` gauge.
fn record_configured_peers(count: usize) {
    let gauge = metrics::gauge!(METRIC_BGP_CONFIGURED_PEERS);
    gauge.set(count as f64);
}
/// Mirror the current `connected_sessions` count onto the `fiberlb_bgp_connected_peers` gauge.
fn record_connected_peers(shared: &BgpSharedState) {
    let connected = shared.connected_sessions.load(Ordering::Relaxed);
    metrics::gauge!(METRIC_BGP_CONNECTED_PEERS).set(connected as f64);
}
/// Publish the number of desired VIP routes on the `fiberlb_bgp_desired_routes` gauge.
fn record_desired_routes(count: usize) {
    let routes = count as f64;
    metrics::gauge!(METRIC_BGP_DESIRED_ROUTES).set(routes);
}
/// Set the per-peer `fiberlb_bgp_peer_session_up` gauge to 1 (`up`) or 0.
fn set_peer_session_up(peer: &BgpPeerConfig, up: bool) {
    let value = f64::from(u8::from(up));
    metrics::gauge!(METRIC_BGP_PEER_SESSION_UP, "peer" => peer_metric_label(peer)).set(value);
}
/// Pick the KEEPALIVE period for a session.
///
/// The requested interval is floored at one second. With a non-zero hold time
/// it is also capped at a third of that hold time (itself at least one
/// second), so several keepalives fit in every hold period. A hold time of 0
/// leaves the floored request unchanged.
fn negotiated_keepalive_interval(
    requested_keepalive_secs: u16,
    negotiated_hold_time_secs: u16,
) -> Duration {
    let requested = requested_keepalive_secs.max(1);
    let secs = match negotiated_hold_time_secs {
        0 => requested,
        hold => requested.min((hold / 3).max(1)),
    };
    Duration::from_secs(u64::from(secs))
}
/// Narrow `addr` to IPv4, the only family the native speaker handles.
///
/// # Errors
/// `BgpError::Unsupported` naming `label` when `addr` is IPv6.
fn to_ipv4(addr: IpAddr, label: &str) -> Result<Ipv4Addr> {
    if let IpAddr::V4(v4) = addr {
        return Ok(v4);
    }
    Err(BgpError::Unsupported(format!(
        "{} must be IPv4 for native BGP speaker",
        label
    )))
}
/// Write an OPEN message with no optional parameters (and therefore no capabilities).
///
/// Body layout: version, my AS (2 octets), hold time (2 octets), BGP
/// identifier (4 octets), optional-parameters length (0).
async fn send_open<W: AsyncWrite + Unpin>(
    writer: &mut W,
    local_as: u16,
    hold_time_secs: u16,
    router_id: Ipv4Addr,
) -> Result<()> {
    let [as_hi, as_lo] = local_as.to_be_bytes();
    let [hold_hi, hold_lo] = hold_time_secs.to_be_bytes();
    let [id_a, id_b, id_c, id_d] = router_id.octets();
    let payload = [
        BGP_VERSION,
        as_hi,
        as_lo,
        hold_hi,
        hold_lo,
        id_a,
        id_b,
        id_c,
        id_d,
        0,
    ];
    write_bgp_message(writer, BGP_TYPE_OPEN, &payload).await
}
/// Write a KEEPALIVE, which is a bare header with an empty body.
async fn send_keepalive<W: AsyncWrite + Unpin>(writer: &mut W) -> Result<()> {
    let empty_body: &[u8] = &[];
    write_bgp_message(writer, BGP_TYPE_KEEPALIVE, empty_body).await
}
/// Write an UPDATE announcing `prefix/32` via `next_hop`.
///
/// The UPDATE carries no withdrawn routes and three path attributes: ORIGIN
/// (IGP), AS_PATH (one AS_SEQUENCE containing only `local_as`) and NEXT_HOP.
async fn send_announce<W: AsyncWrite + Unpin>(
    writer: &mut W,
    prefix: Ipv4Addr,
    next_hop: Ipv4Addr,
    local_as: u16,
) -> Result<()> {
    let [as_hi, as_lo] = local_as.to_be_bytes();
    let [hop_a, hop_b, hop_c, hop_d] = next_hop.octets();
    let path_attributes = [
        // ORIGIN: IGP.
        ATTR_FLAG_TRANSITIVE,
        ATTR_TYPE_ORIGIN,
        1,
        BGP_ORIGIN_IGP,
        // AS_PATH: a single AS_SEQUENCE segment holding only the local AS.
        ATTR_FLAG_TRANSITIVE,
        ATTR_TYPE_AS_PATH,
        4,
        AS_PATH_SEGMENT_SEQUENCE,
        1,
        as_hi,
        as_lo,
        // NEXT_HOP: the caller-supplied IPv4 next hop.
        ATTR_FLAG_TRANSITIVE,
        ATTR_TYPE_NEXT_HOP,
        4,
        hop_a,
        hop_b,
        hop_c,
        hop_d,
    ];
    let nlri = encode_ipv4_prefix(prefix);
    let mut payload = Vec::with_capacity(4 + path_attributes.len() + nlri.len());
    // Withdrawn-routes length: none.
    payload.extend_from_slice(&[0, 0]);
    payload.extend_from_slice(&(path_attributes.len() as u16).to_be_bytes());
    payload.extend_from_slice(&path_attributes);
    payload.extend(nlri);
    write_bgp_message(writer, BGP_TYPE_UPDATE, &payload).await
}
/// Write an UPDATE withdrawing `prefix/32`; it carries no path attributes and no NLRI.
async fn send_withdraw<W: AsyncWrite + Unpin>(writer: &mut W, prefix: Ipv4Addr) -> Result<()> {
    let withdrawn_routes = encode_ipv4_prefix(prefix);
    let withdrawn_len = withdrawn_routes.len() as u16;
    let mut payload = withdrawn_len.to_be_bytes().to_vec();
    payload.extend(withdrawn_routes);
    // Total path attribute length: 0.
    payload.extend([0u8, 0]);
    write_bgp_message(writer, BGP_TYPE_UPDATE, &payload).await
}
/// Encode `prefix` as a /32 NLRI entry: a length octet (32) followed by the four address octets.
fn encode_ipv4_prefix(prefix: Ipv4Addr) -> Vec<u8> {
    let [a, b, c, d] = prefix.octets();
    vec![32, a, b, c, d]
}
/// Frame `payload` as a BGP message of type `msg_type`, write it, and flush.
///
/// The header is the 16-byte marker, a 2-octet total length (header
/// included) and the type octet. Header and body are assembled into one
/// buffer and written with a single `write_all`. Sessions set TCP_NODELAY, so
/// separate writes would each be sent as their own segment.
///
/// # Errors
/// `Protocol` if the framed message would exceed `BGP_MAX_MESSAGE_SIZE`;
/// `Io` if the write or flush fails.
async fn write_bgp_message<W: AsyncWrite + Unpin>(
    writer: &mut W,
    msg_type: u8,
    payload: &[u8],
) -> Result<()> {
    let length = 19 + payload.len();
    if length > BGP_MAX_MESSAGE_SIZE {
        return Err(BgpError::Protocol(format!(
            "BGP message too large: {} bytes",
            length
        )));
    }
    let mut message = Vec::with_capacity(length);
    message.extend_from_slice(&BGP_MARKER);
    // Cannot truncate: `length` was just checked against BGP_MAX_MESSAGE_SIZE (4096).
    message.extend_from_slice(&(length as u16).to_be_bytes());
    message.push(msg_type);
    message.extend_from_slice(payload);
    writer.write_all(&message).await?;
    writer.flush().await?;
    Ok(())
}
/// Read exactly one BGP message from `reader` and decode it.
///
/// Validates the marker and the total length (19..=`BGP_MAX_MESSAGE_SIZE`),
/// reads the body, and dispatches on the type octet. Unrecognised types are
/// returned as `BgpMessage::Unknown` with their raw body.
///
/// # Errors
/// `Io` on read failure (including EOF); `Protocol` for a bad marker, an
/// out-of-range length, a KEEPALIVE with a body, or a body that fails to decode.
async fn read_bgp_message<R: AsyncRead + Unpin>(reader: &mut R) -> Result<BgpMessage> {
    let mut header = [0u8; 19];
    reader.read_exact(&mut header).await?;
    let [marker @ .., len_hi, len_lo, msg_type] = header;
    if marker != BGP_MARKER {
        return Err(BgpError::Protocol("invalid BGP marker".to_string()));
    }
    let length = usize::from(u16::from_be_bytes([len_hi, len_lo]));
    if length < 19 || length > BGP_MAX_MESSAGE_SIZE {
        return Err(BgpError::Protocol(format!(
            "invalid BGP message length {}",
            length
        )));
    }
    let mut body = vec![0u8; length - 19];
    if !body.is_empty() {
        reader.read_exact(&mut body).await?;
    }
    let message = match msg_type {
        BGP_TYPE_OPEN => BgpMessage::Open(parse_open_message(&body)?),
        BGP_TYPE_UPDATE => BgpMessage::Update(parse_update_message(&body)?),
        BGP_TYPE_NOTIFICATION => BgpMessage::Notification(parse_notification_message(&body)?),
        BGP_TYPE_KEEPALIVE if body.is_empty() => BgpMessage::Keepalive,
        BGP_TYPE_KEEPALIVE => {
            return Err(BgpError::Protocol(
                "KEEPALIVE message must not carry a payload".to_string(),
            ));
        }
        _ => BgpMessage::Unknown {
            msg_type,
            payload: body,
        },
    };
    Ok(message)
}
/// Decode an OPEN message body.
///
/// Layout: version (1), my AS (2), hold time (2), BGP identifier (4),
/// optional-parameters length (1), optional parameters. Optional parameters
/// are length-checked but not interpreted.
///
/// # Errors
/// `Protocol` if the body is shorter than 10 bytes, the version is not
/// `BGP_VERSION`, or the optional-parameters length does not match the body.
/// Also `Protocol` if the hold time is 1 or 2 seconds: RFC 4271 §4.2 requires
/// 0 or at least 3, and §6.2 says 1 and 2 MUST be rejected.
fn parse_open_message(body: &[u8]) -> Result<OpenMessage> {
    if body.len() < 10 {
        return Err(BgpError::Protocol(format!(
            "OPEN message too short: {} bytes",
            body.len()
        )));
    }
    if body[0] != BGP_VERSION {
        return Err(BgpError::Protocol(format!(
            "unsupported BGP version {}",
            body[0]
        )));
    }
    let optional_len = body[9] as usize;
    if body.len() != 10 + optional_len {
        return Err(BgpError::Protocol(format!(
            "OPEN optional parameter length mismatch: body={}, optional={}",
            body.len(),
            optional_len
        )));
    }
    let hold_time = u16::from_be_bytes([body[3], body[4]]);
    if matches!(hold_time, 1 | 2) {
        return Err(BgpError::Protocol(format!(
            "unacceptable BGP hold time {} seconds (must be 0 or at least 3)",
            hold_time
        )));
    }
    Ok(OpenMessage {
        asn: u16::from_be_bytes([body[1], body[2]]),
        hold_time,
        router_id: Ipv4Addr::new(body[5], body[6], body[7], body[8]),
    })
}
/// Decode a NOTIFICATION body: error code, error subcode, then free-form data bytes.
///
/// # Errors
/// `Protocol` if the body is shorter than the two mandatory octets.
fn parse_notification_message(body: &[u8]) -> Result<NotificationMessage> {
    match body {
        [code, subcode, data @ ..] => Ok(NotificationMessage {
            code: *code,
            subcode: *subcode,
            data: data.to_vec(),
        }),
        _ => Err(BgpError::Protocol(format!(
            "NOTIFICATION message too short: {} bytes",
            body.len()
        ))),
    }
}
/// Decode an UPDATE body into withdrawn routes, announced routes, NEXT_HOP and AS_PATH.
///
/// Layout: withdrawn-routes length (2), withdrawn routes, total path
/// attribute length (2), path attributes, then NLRI up to the end of the body.
/// Sections are decoded in order: withdrawn routes, then NLRI, then attributes.
///
/// # Errors
/// `Protocol` if a section length runs past the body, or if a prefix list or
/// attribute section fails to decode.
fn parse_update_message(body: &[u8]) -> Result<UpdateMessage> {
    if body.len() < 4 {
        return Err(BgpError::Protocol(format!(
            "UPDATE message too short: {} bytes",
            body.len()
        )));
    }
    let withdrawn_len = usize::from(u16::from_be_bytes([body[0], body[1]]));
    let after_withdrawn_len = &body[2..];
    // The withdrawn section must leave room for the 2-octet attribute length that follows it.
    if after_withdrawn_len.len() < withdrawn_len + 2 {
        return Err(BgpError::Protocol(
            "UPDATE withdrawn-routes section exceeds message length".to_string(),
        ));
    }
    let (withdrawn_bytes, after_withdrawn) = after_withdrawn_len.split_at(withdrawn_len);
    let withdrawn_routes = parse_prefix_list(withdrawn_bytes)?;

    let attrs_len = usize::from(u16::from_be_bytes([after_withdrawn[0], after_withdrawn[1]]));
    let attr_section = &after_withdrawn[2..];
    if attr_section.len() < attrs_len {
        return Err(BgpError::Protocol(
            "UPDATE path attribute section exceeds message length".to_string(),
        ));
    }
    let (attrs, nlri) = attr_section.split_at(attrs_len);
    let announced_routes = parse_prefix_list(nlri)?;
    let (next_hop, as_path) = parse_path_attributes(attrs)?;
    Ok(UpdateMessage {
        withdrawn_routes,
        announced_routes,
        next_hop,
        as_path,
    })
}
/// Decode a packed list of IPv4 prefixes (withdrawn routes or announced NLRI).
///
/// Each entry is a one-octet prefix length followed by the minimum number of
/// octets that hold that many bits. Every length from /0 to /32 is accepted,
/// and host bits beyond the prefix length are cleared.
///
/// Only the network address is returned; the prefix length is not kept, so
/// `10.0.0.0/8` and `10.0.0.0/32` decode to the same address. That is enough
/// for this speaker, which only logs inbound UPDATEs. Rejecting shorter
/// prefixes would instead fail decoding and tear down the session whenever a
/// peer advertises an ordinary route such as a default route.
///
/// # Errors
/// `Protocol` if a prefix length exceeds 32 or an entry runs past the end of `data`.
fn parse_prefix_list(data: &[u8]) -> Result<Vec<Ipv4Addr>> {
    let mut prefixes = Vec::new();
    let mut cursor = 0usize;
    while cursor < data.len() {
        let prefix_len = usize::from(data[cursor]);
        cursor += 1;
        if prefix_len > 32 {
            return Err(BgpError::Protocol(format!(
                "unsupported IPv4 prefix length {}",
                prefix_len
            )));
        }
        let octet_len = prefix_len.div_ceil(8);
        if data.len() < cursor + octet_len {
            return Err(BgpError::Protocol(
                "prefix length exceeds UPDATE payload".to_string(),
            ));
        }
        let mut octets = [0u8; 4];
        octets[..octet_len].copy_from_slice(&data[cursor..cursor + octet_len]);
        cursor += octet_len;
        // Clear trailing host bits. A shift by 32 would overflow a u32, so a /0
        // falls back to an all-zero mask via `checked_shl`.
        let mask = u32::MAX.checked_shl(32 - prefix_len as u32).unwrap_or(0);
        prefixes.push(Ipv4Addr::from(u32::from_be_bytes(octets) & mask));
    }
    Ok(prefixes)
}
fn parse_path_attributes(attrs: &[u8]) -> Result<(Option<Ipv4Addr>, Vec<u16>)> {
let mut cursor = 0usize;
let mut next_hop = None;
let mut as_path = Vec::new();
while cursor < attrs.len() {
if attrs.len() < cursor + 3 {
return Err(BgpError::Protocol(
"path attribute header truncated".to_string(),
));
}
let flags = attrs[cursor];
let attr_type = attrs[cursor + 1];
cursor += 2;
let (attr_len, len_width) = if flags & ATTR_FLAG_EXTENDED_LEN != 0 {
if attrs.len() < cursor + 2 {
return Err(BgpError::Protocol(
"extended path attribute length truncated".to_string(),
));
}
(
u16::from_be_bytes([attrs[cursor], attrs[cursor + 1]]) as usize,
2usize,
)
} else {
(attrs[cursor] as usize, 1usize)
};
cursor += len_width;
if attrs.len() < cursor + attr_len {
return Err(BgpError::Protocol(
"path attribute value truncated".to_string(),
));
}
let value = &attrs[cursor..cursor + attr_len];
cursor += attr_len;
match attr_type {
ATTR_TYPE_NEXT_HOP => {
if value.len() == 4 {
next_hop = Some(Ipv4Addr::new(value[0], value[1], value[2], value[3]));
}
}
ATTR_TYPE_AS_PATH => {
as_path = parse_as_path(value)?;
}
_ => {}
}
}
Ok((next_hop, as_path))
}
/// Decode an AS_PATH attribute value into a flat list of 2-octet AS numbers.
///
/// Each segment is `type (1) | count (1) | count x AS (2 octets each)`.
/// Segment types are not distinguished; the ASNs of every segment are
/// appended in wire order.
///
/// # Errors
/// `Protocol` if a segment header or its AS list runs past the end of `data`.
fn parse_as_path(data: &[u8]) -> Result<Vec<u16>> {
    let mut remaining = data;
    let mut path = Vec::new();
    while !remaining.is_empty() {
        let [_segment_type, as_count, tail @ ..] = remaining else {
            return Err(BgpError::Protocol("AS_PATH segment truncated".to_string()));
        };
        let segment_bytes = usize::from(*as_count) * 2;
        if tail.len() < segment_bytes {
            return Err(BgpError::Protocol("AS_PATH payload truncated".to_string()));
        }
        let (segment, rest) = tail.split_at(segment_bytes);
        path.extend(
            segment
                .chunks_exact(2)
                .map(|asn| u16::from_be_bytes([asn[0], asn[1]])),
        );
        remaining = rest;
    }
    Ok(path)
}
/// A decoded BGP message, as produced by `read_bgp_message`.
#[derive(Debug)]
enum BgpMessage {
Open(OpenMessage),
Update(UpdateMessage),
Notification(NotificationMessage),
// KEEPALIVE carries no body.
Keepalive,
// Any message type this speaker does not decode, with its raw body.
Unknown { msg_type: u8, payload: Vec<u8> },
}
/// Fields of a peer's OPEN that the session logic uses (optional parameters are not kept).
#[derive(Debug)]
struct OpenMessage {
// 16-bit "My Autonomous System" field.
asn: u16,
// Hold time proposed by the sender, in seconds.
hold_time: u16,
// BGP identifier of the sender.
router_id: Ipv4Addr,
}
/// Decoded contents of an UPDATE; prefixes are network addresses without their lengths.
#[derive(Debug)]
struct UpdateMessage {
withdrawn_routes: Vec<Ipv4Addr>,
announced_routes: Vec<Ipv4Addr>,
// NEXT_HOP attribute, if present with a 4-octet value.
next_hop: Option<Ipv4Addr>,
// 2-octet ASNs from every AS_PATH segment, in wire order.
as_path: Vec<u16>,
}
/// Decoded NOTIFICATION: error code, subcode and any trailing diagnostic data.
#[derive(Debug)]
struct NotificationMessage {
code: u8,
subcode: u8,
data: Vec<u8>,
}
#[cfg(test)]
mod tests {
// Unit tests plus loopback integration tests in which the test itself plays
// the remote BGP peer on a local TcpListener.
use super::*;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
// A disabled configuration yields a client whose calls succeed but which never reports a session.
#[tokio::test]
async fn test_disabled_bgp_client_is_noop() {
let client = create_bgp_client(BgpConfig::default()).await.unwrap();
let vip: IpAddr = "203.0.113.10".parse().unwrap();
let next_hop: IpAddr = "192.0.2.10".parse().unwrap();
assert!(client.announce_route(vip, next_hop).await.is_ok());
assert!(client.withdraw_route(vip).await.is_ok());
assert!(!client.is_connected().await);
}
// A peer in the local AS (iBGP) is rejected by validation.
#[test]
fn test_validate_config_rejects_same_as_peer() {
let config = BgpConfig {
enabled: true,
local_as: 65010,
peers: vec![BgpPeerConfig {
address: "127.0.0.1".to_string(),
port: 179,
asn: 65010,
description: String::new(),
}],
..BgpConfig::default()
};
let error = validate_config(&config).unwrap_err();
assert!(error.to_string().contains("currently supports eBGP only"));
}
// Single peer: after the handshake the peer must see exactly one announce
// UPDATE and then one withdraw UPDATE for the VIP.
#[tokio::test]
async fn test_native_speaker_announces_and_withdraws_routes() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
// Tells the test body the announcement was verified, so it can withdraw.
let (announced_tx, announced_rx) = oneshot::channel();
let peer_task = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let open = match read_bgp_message(&mut socket).await.unwrap() {
BgpMessage::Open(open) => open,
other => panic!("expected OPEN, got {:?}", other),
};
assert_eq!(open.asn, 65010);
assert_eq!(open.router_id, "192.0.2.10".parse::<Ipv4Addr>().unwrap());
send_open(
&mut socket,
65020,
90,
"192.0.2.20".parse::<Ipv4Addr>().unwrap(),
)
.await
.unwrap();
match read_bgp_message(&mut socket).await.unwrap() {
BgpMessage::Keepalive => {}
other => panic!("expected KEEPALIVE, got {:?}", other),
}
send_keepalive(&mut socket).await.unwrap();
let announcement = match read_bgp_message(&mut socket).await.unwrap() {
BgpMessage::Update(update) => update,
other => panic!("expected UPDATE announce, got {:?}", other),
};
assert_eq!(announcement.withdrawn_routes, Vec::<Ipv4Addr>::new());
assert_eq!(
announcement.announced_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
assert_eq!(
announcement.next_hop,
Some("192.0.2.10".parse::<Ipv4Addr>().unwrap())
);
assert_eq!(announcement.as_path, vec![65010]);
let _ = announced_tx.send(());
let withdrawal = match read_bgp_message(&mut socket).await.unwrap() {
BgpMessage::Update(update) => update,
other => panic!("expected UPDATE withdraw, got {:?}", other),
};
assert_eq!(
withdrawal.withdrawn_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
assert_eq!(withdrawal.announced_routes, Vec::<Ipv4Addr>::new());
});
let client = create_bgp_client(BgpConfig {
enabled: true,
local_as: 65010,
router_id: "192.0.2.10".to_string(),
peers: vec![BgpPeerConfig {
address: "127.0.0.1".to_string(),
port,
asn: 65020,
description: "test-peer".to_string(),
}],
..BgpConfig::default()
})
.await
.unwrap();
// Poll for up to ~1 second until the speaker reports the session established.
for _ in 0..20 {
if client.is_connected().await {
break;
}
sleep(Duration::from_millis(50)).await;
}
assert!(client.is_connected().await);
let vip: IpAddr = "203.0.113.10".parse().unwrap();
let next_hop: IpAddr = "192.0.2.10".parse().unwrap();
client.announce_route(vip, next_hop).await.unwrap();
announced_rx.await.unwrap();
client.withdraw_route(vip).await.unwrap();
peer_task.await.unwrap();
}
// Two peers. Peer A drops its first connection right after the announcement
// and must receive the route again once the speaker reconnects
// (connect_retry_secs = 1). Peer B stays connected throughout. Both must then
// see the withdrawal. Each peer reports progress over its own channel.
#[tokio::test]
async fn test_native_speaker_resyncs_routes_across_multiple_peers() {
let listener_a = TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener_b = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port_a = listener_a.local_addr().unwrap().port();
let port_b = listener_b.local_addr().unwrap().port();
let (peer_a_events_tx, mut peer_a_events_rx) = mpsc::channel(4);
let (peer_b_events_tx, mut peer_b_events_rx) = mpsc::channel(4);
let peer_a_task = tokio::spawn(async move {
let (mut first_socket, _) = listener_a.accept().await.unwrap();
complete_test_peer_handshake(
&mut first_socket,
65010,
"192.0.2.10".parse().unwrap(),
65020,
"192.0.2.20".parse().unwrap(),
)
.await;
let first_announcement = expect_update(&mut first_socket).await;
assert_eq!(
first_announcement.announced_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
peer_a_events_tx.send("first-announcement").await.unwrap();
// Closing the socket ends the speaker's session; it must reconnect and resync.
drop(first_socket);
let (mut second_socket, _) = listener_a.accept().await.unwrap();
complete_test_peer_handshake(
&mut second_socket,
65010,
"192.0.2.10".parse().unwrap(),
65020,
"192.0.2.20".parse().unwrap(),
)
.await;
let second_announcement = expect_update(&mut second_socket).await;
assert_eq!(
second_announcement.announced_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
peer_a_events_tx.send("second-announcement").await.unwrap();
let withdrawal = expect_update(&mut second_socket).await;
assert_eq!(
withdrawal.withdrawn_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
peer_a_events_tx.send("withdrawal").await.unwrap();
});
let peer_b_task = tokio::spawn(async move {
let (mut socket, _) = listener_b.accept().await.unwrap();
complete_test_peer_handshake(
&mut socket,
65010,
"192.0.2.10".parse().unwrap(),
65030,
"192.0.2.30".parse().unwrap(),
)
.await;
let announcement = expect_update(&mut socket).await;
assert_eq!(
announcement.announced_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
peer_b_events_tx.send("announcement").await.unwrap();
let withdrawal = expect_update(&mut socket).await;
assert_eq!(
withdrawal.withdrawn_routes,
vec!["203.0.113.10".parse::<Ipv4Addr>().unwrap()]
);
peer_b_events_tx.send("withdrawal").await.unwrap();
});
let client = create_bgp_client(BgpConfig {
enabled: true,
local_as: 65010,
router_id: "192.0.2.10".to_string(),
connect_retry_secs: 1,
peers: vec![
BgpPeerConfig {
address: "127.0.0.1".to_string(),
port: port_a,
asn: 65020,
description: "peer-a".to_string(),
},
BgpPeerConfig {
address: "127.0.0.1".to_string(),
port: port_b,
asn: 65030,
description: "peer-b".to_string(),
},
],
..BgpConfig::default()
})
.await
.unwrap();
let vip: IpAddr = "203.0.113.10".parse().unwrap();
let next_hop: IpAddr = "192.0.2.10".parse().unwrap();
client.announce_route(vip, next_hop).await.unwrap();
assert_eq!(
timeout(Duration::from_secs(5), peer_a_events_rx.recv())
.await
.unwrap()
.unwrap(),
"first-announcement"
);
assert_eq!(
timeout(Duration::from_secs(5), peer_b_events_rx.recv())
.await
.unwrap()
.unwrap(),
"announcement"
);
assert_eq!(
timeout(Duration::from_secs(5), peer_a_events_rx.recv())
.await
.unwrap()
.unwrap(),
"second-announcement"
);
client.withdraw_route(vip).await.unwrap();
assert_eq!(
timeout(Duration::from_secs(5), peer_b_events_rx.recv())
.await
.unwrap()
.unwrap(),
"withdrawal"
);
assert_eq!(
timeout(Duration::from_secs(5), peer_a_events_rx.recv())
.await
.unwrap()
.unwrap(),
"withdrawal"
);
peer_a_task.await.unwrap();
peer_b_task.await.unwrap();
}
// Test-peer side of the handshake: read and check the speaker's OPEN, answer
// with our own OPEN (hold time 90), expect the speaker's KEEPALIVE, and send one back.
async fn complete_test_peer_handshake(
socket: &mut TcpStream,
expected_local_as: u16,
expected_router_id: Ipv4Addr,
peer_as: u16,
peer_router_id: Ipv4Addr,
) {
let open = match read_bgp_message(socket).await.unwrap() {
BgpMessage::Open(open) => open,
other => panic!("expected OPEN, got {:?}", other),
};
assert_eq!(open.asn, expected_local_as);
assert_eq!(open.router_id, expected_router_id);
send_open(socket, peer_as, 90, peer_router_id).await.unwrap();
match read_bgp_message(socket).await.unwrap() {
BgpMessage::Keepalive => {}
other => panic!("expected KEEPALIVE, got {:?}", other),
}
send_keepalive(socket).await.unwrap();
}
// Read the next message and panic unless it is an UPDATE.
async fn expect_update(socket: &mut TcpStream) -> UpdateMessage {
match read_bgp_message(socket).await.unwrap() {
BgpMessage::Update(update) => update,
other => panic!("expected UPDATE, got {:?}", other),
}
}
// Decodes a hand-built UPDATE carrying ORIGIN, AS_PATH, NEXT_HOP and one /32 NLRI.
#[test]
fn test_parse_update_message_extracts_routes() {
let mut attrs = Vec::new();
attrs.extend_from_slice(&[
ATTR_FLAG_TRANSITIVE,
ATTR_TYPE_ORIGIN,
1,
BGP_ORIGIN_IGP,
ATTR_FLAG_TRANSITIVE,
ATTR_TYPE_AS_PATH,
4,
AS_PATH_SEGMENT_SEQUENCE,
1,
]);
attrs.extend_from_slice(&65010u16.to_be_bytes());
attrs.extend_from_slice(&[ATTR_FLAG_TRANSITIVE, ATTR_TYPE_NEXT_HOP, 4, 192, 0, 2, 10]);
let mut payload = Vec::new();
payload.extend_from_slice(&0u16.to_be_bytes());
payload.extend_from_slice(&(attrs.len() as u16).to_be_bytes());
payload.extend_from_slice(&attrs);
payload.extend_from_slice(&encode_ipv4_prefix("203.0.113.7".parse().unwrap()));
let update = parse_update_message(&payload).unwrap();
assert_eq!(
update.announced_routes,
vec!["203.0.113.7".parse::<Ipv4Addr>().unwrap()]
);
assert_eq!(update.as_path, vec![65010]);
assert_eq!(
update.next_hop,
Some("192.0.2.10".parse::<Ipv4Addr>().unwrap())
);
}
}