//! FiberLB Controller - Manages LoadBalancer service VIP allocation //! //! This controller watches for Services with type=LoadBalancer and provisions //! external VIPs by creating LoadBalancer resources in FiberLB. use crate::storage::Storage; use anyhow::Result; use fiberlb_api::backend_service_client::BackendServiceClient; use fiberlb_api::listener_service_client::ListenerServiceClient; use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient; use fiberlb_api::pool_service_client::PoolServiceClient; use fiberlb_api::{ CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest, DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol, }; use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus}; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; use tracing::{debug, info, warn}; /// FiberLB controller for managing LoadBalancer service VIPs pub struct FiberLbController { storage: Arc, fiberlb_addr: String, interval: Duration, } impl FiberLbController { /// Create a new FiberLB controller pub fn new(storage: Arc, fiberlb_addr: String) -> Self { Self { storage, fiberlb_addr, interval: Duration::from_secs(10), // Check every 10 seconds } } /// Start the controller loop pub async fn run(self: Arc) { info!( "FiberLB controller started (FiberLB at {}, {}s interval)", self.fiberlb_addr, self.interval.as_secs() ); loop { if let Err(e) = self.reconcile_loadbalancers().await { warn!("FiberLB controller cycle failed: {}", e); } sleep(self.interval).await; } } /// Reconcile LoadBalancer services across all tenants async fn reconcile_loadbalancers(&self) -> Result<()> { // For MVP, iterate through known tenants // In production, would get active tenants from IAM or FlareDB let tenants = vec![("default-org".to_string(), "default-project".to_string())]; for (org_id, project_id) in tenants { if let Err(e) = self.reconcile_tenant_loadbalancers(&org_id, &project_id).await { warn!( "Failed to reconcile LoadBalancers for tenant {}/{}: {}", org_id, project_id, e ); } } Ok(()) } /// Reconcile LoadBalancer services for a specific tenant async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> { // Get all services for this tenant let services = self .storage .list_services(org_id, project_id, None) .await?; // Filter for LoadBalancer services that need provisioning let lb_services: Vec<_> = services .into_iter() .filter(|svc| { // Service is a LoadBalancer if: // 1. type is "LoadBalancer" // 2. status is None OR status.load_balancer is None (not yet provisioned) svc.spec.r#type.as_deref() == Some("LoadBalancer") && (svc.status.is_none() || svc.status.as_ref().and_then(|s| s.load_balancer.as_ref()).is_none()) }) .collect(); if lb_services.is_empty() { debug!("No LoadBalancer services to provision for tenant {}/{}", org_id, project_id); return Ok(()); } info!( "Found {} LoadBalancer service(s) to provision for tenant {}/{}", lb_services.len(), org_id, project_id ); // Connect to FiberLB services let mut lb_client = match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await { Ok(client) => client, Err(e) => { warn!("Failed to connect to FiberLB at {}: {}", self.fiberlb_addr, e); return Ok(()); } }; let mut pool_client = match PoolServiceClient::connect(self.fiberlb_addr.clone()).await { Ok(client) => client, Err(e) => { warn!("Failed to connect to FiberLB PoolService: {}", e); return Ok(()); } }; let mut listener_client = match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await { Ok(client) => client, Err(e) => { warn!("Failed to connect to FiberLB ListenerService: {}", e); return Ok(()); } }; let mut backend_client = match BackendServiceClient::connect(self.fiberlb_addr.clone()).await { Ok(client) => client, Err(e) => { warn!("Failed to connect to FiberLB BackendService: {}", e); return Ok(()); } }; // Provision each LoadBalancer service for mut service in lb_services { let namespace = service.metadata.namespace.as_deref().unwrap_or("default"); let name = &service.metadata.name; info!("Provisioning LoadBalancer for service {}/{}", namespace, name); // Create LoadBalancer in FiberLB let lb_name = format!("{}.{}", name, namespace); let create_req = CreateLoadBalancerRequest { name: lb_name.clone(), org_id: org_id.to_string(), project_id: project_id.to_string(), description: format!("k8s service {}/{}", namespace, name), }; let lb_id = match lb_client.create_load_balancer(create_req).await { Ok(response) => { let lb = response.into_inner().loadbalancer; if let Some(lb) = lb { let vip = if lb.vip_address.is_empty() { warn!("FiberLB returned LoadBalancer without VIP"); "0.0.0.0".to_string() } else { lb.vip_address.clone() }; info!( "FiberLB allocated VIP {} for service {}/{}", vip, namespace, name ); // Update service status with VIP service.status = Some(ServiceStatus { load_balancer: Some(LoadBalancerStatus { ingress: vec![LoadBalancerIngress { ip: Some(vip), hostname: None, }], }), }); // Store LoadBalancer ID in annotations service .metadata .annotations .insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb.id.clone()); lb.id } else { warn!("FiberLB returned empty LoadBalancer response"); continue; } } Err(e) => { warn!( "Failed to create LoadBalancer in FiberLB for service {}/{}: {}", namespace, name, e ); continue; } }; // Create Pool for this LoadBalancer let pool_name = format!("{}-pool", lb_name); let pool_id = match pool_client .create_pool(CreatePoolRequest { name: pool_name.clone(), loadbalancer_id: lb_id.clone(), algorithm: PoolAlgorithm::RoundRobin as i32, protocol: PoolProtocol::Tcp as i32, session_persistence: None, }) .await { Ok(response) => { let pool = response.into_inner().pool; if let Some(pool) = pool { info!("Created Pool {} for service {}/{}", pool.id, namespace, name); pool.id } else { warn!("Failed to create Pool for service {}/{}", namespace, name); continue; } } Err(e) => { warn!("Failed to create Pool for service {}/{}: {}", namespace, name, e); continue; } }; // Store Pool ID in annotations service .metadata .annotations .insert("fiberlb.plasmacloud.io/pool-id".to_string(), pool_id.clone()); // Create Listeners for each Service port for svc_port in &service.spec.ports { let listener_name = format!( "{}-listener-{}", lb_name, svc_port.name.as_deref().unwrap_or(&svc_port.port.to_string()) ); match listener_client .create_listener(CreateListenerRequest { name: listener_name.clone(), loadbalancer_id: lb_id.clone(), protocol: ListenerProtocol::Tcp as i32, port: svc_port.port as u32, default_pool_id: pool_id.clone(), tls_config: None, connection_limit: 0, // No limit }) .await { Ok(response) => { let listener = response.into_inner().listener; if let Some(listener) = listener { info!( "Created Listener {} on port {} for service {}/{}", listener.id, svc_port.port, namespace, name ); } } Err(e) => { warn!( "Failed to create Listener on port {} for service {}/{}: {}", svc_port.port, namespace, name, e ); } } } // Query Pods matching Service selector and create Backends let pods = match self .storage .list_pods( org_id, project_id, Some(namespace), if service.spec.selector.is_empty() { None } else { Some(&service.spec.selector) }, ) .await { Ok(pods) => pods, Err(e) => { warn!( "Failed to list Pods for service {}/{}: {}", namespace, name, e ); vec![] } }; info!( "Found {} Pod(s) matching selector for service {}/{}", pods.len(), namespace, name ); // Create Backend for each Pod for pod in &pods { // Get Pod IP let pod_ip = match pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) { Some(ip) => ip, None => { debug!( "Pod {} has no IP yet, skipping backend creation", pod.metadata.name ); continue; } }; // For each Service port, create a Backend for svc_port in &service.spec.ports { // Use target_port if specified, otherwise use port let backend_port = svc_port.target_port.unwrap_or(svc_port.port); let backend_name = format!( "{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port ); match backend_client .create_backend(CreateBackendRequest { name: backend_name.clone(), pool_id: pool_id.clone(), address: pod_ip.clone(), port: backend_port as u32, weight: 1, }) .await { Ok(response) => { let backend = response.into_inner().backend; if let Some(backend) = backend { info!( "Created Backend {} for Pod {} ({}:{}) in service {}/{}", backend.id, pod.metadata.name, pod_ip, backend_port, namespace, name ); } } Err(e) => { warn!( "Failed to create Backend for Pod {} in service {}/{}: {}", pod.metadata.name, namespace, name, e ); } } } } // Increment resource version and save updated service let current_version = service .metadata .resource_version .as_ref() .and_then(|v| v.parse::().ok()) .unwrap_or(0); service.metadata.resource_version = Some((current_version + 1).to_string()); if let Err(e) = self.storage.put_service(&service).await { warn!( "Failed to update service {}/{} with FiberLB resources: {}", namespace, name, e ); } else { info!( "Successfully provisioned LoadBalancer for service {}/{} with {} backend(s)", namespace, name, pods.len() ); } } Ok(()) } /// Cleanup LoadBalancer when Service is deleted /// /// This should be called when a Service with type=LoadBalancer is deleted. /// For MVP, this is not automatically triggered - would need a deletion watch. #[allow(dead_code)] async fn cleanup_loadbalancer(&self, lb_id: &str) -> Result<()> { let mut fiberlb_client = LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()) .await?; let delete_req = DeleteLoadBalancerRequest { id: lb_id.to_string(), }; fiberlb_client .delete_load_balancer(delete_req) .await?; info!("Deleted LoadBalancer {} from FiberLB", lb_id); Ok(()) } }