diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs index f639a7c..cb8e78a 100644 --- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs +++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs @@ -1,10 +1,11 @@ //! FiberLB Controller - Manages LoadBalancer service VIP allocation //! -//! This controller watches for Services with type=LoadBalancer and provisions -//! external VIPs by creating LoadBalancer resources in FiberLB. +//! This controller continuously reconciles tenant-scoped LoadBalancer Services +//! into FiberLB resources and removes stale publications when Services disappear +//! or stop being externally published. use crate::auth::{authorized_request, issue_controller_token}; -use crate::storage::Storage; +use crate::storage::{Storage, TenantRef}; use anyhow::Result; use fiberlb_api::backend_service_client::BackendServiceClient; use fiberlb_api::listener_service_client::ListenerServiceClient; @@ -12,15 +13,21 @@ use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient; use fiberlb_api::pool_service_client::PoolServiceClient; use fiberlb_api::{ CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest, - DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol, + DeleteBackendRequest, DeleteListenerRequest, DeleteLoadBalancerRequest, ListBackendsRequest, + ListListenersRequest, ListLoadBalancersRequest, ListPoolsRequest, ListenerProtocol, + LoadBalancer, PoolAlgorithm, PoolProtocol, }; -use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus}; +use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, Pod, Service, ServiceStatus}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; +use tonic::{transport::Channel, Code}; use tracing::{debug, info, warn}; const CONTROLLER_PRINCIPAL_ID: &str = "k8shost-controller"; +const LB_ID_ANNOTATION: &str = "fiberlb.plasmacloud.io/lb-id"; +const POOL_ID_ANNOTATION: &str = "fiberlb.plasmacloud.io/pool-id"; /// FiberLB controller for managing LoadBalancer service VIPs pub struct FiberLbController { @@ -28,20 +35,20 @@ pub struct FiberLbController { fiberlb_addr: String, iam_server_addr: String, interval: Duration, + known_tenants: Arc>>, } impl FiberLbController { - /// Create a new FiberLB controller pub fn new(storage: Arc, fiberlb_addr: String, iam_server_addr: String) -> Self { Self { storage, fiberlb_addr, iam_server_addr, - interval: Duration::from_secs(10), // Check every 10 seconds + interval: Duration::from_secs(10), + known_tenants: Arc::new(tokio::sync::RwLock::new(BTreeSet::new())), } } - /// Start the controller loop pub async fn run(self: Arc) { info!( "FiberLB controller started (FiberLB at {}, {}s interval)", @@ -50,420 +57,28 @@ impl FiberLbController { ); loop { - if let Err(e) = self.reconcile_loadbalancers().await { - warn!("FiberLB controller cycle failed: {}", e); + if let Err(error) = self.reconcile_loadbalancers().await { + warn!(error = %error, "FiberLB controller cycle failed"); } sleep(self.interval).await; } } - /// Reconcile LoadBalancer services across all tenants async fn reconcile_loadbalancers(&self) -> Result<()> { - // For MVP, iterate through known tenants - // In production, would get active tenants from IAM or FlareDB - let tenants = vec![("default-org".to_string(), "default-project".to_string())]; - - for (org_id, project_id) in tenants { - if let Err(e) = self - 
.reconcile_tenant_loadbalancers(&org_id, &project_id) - .await - { - warn!( - "Failed to reconcile LoadBalancers for tenant {}/{}: {}", - org_id, project_id, e - ); - } - } - - Ok(()) - } - - /// Reconcile LoadBalancer services for a specific tenant - async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> { - // Get all services for this tenant - let services = self.storage.list_services(org_id, project_id, None).await?; - - // Filter for LoadBalancer services that need provisioning - let lb_services: Vec<_> = services - .into_iter() - .filter(|svc| { - // Service is a LoadBalancer if: - // 1. type is "LoadBalancer" - // 2. status is None OR status.load_balancer is None (not yet provisioned) - svc.spec.r#type.as_deref() == Some("LoadBalancer") - && (svc.status.is_none() - || svc - .status - .as_ref() - .and_then(|s| s.load_balancer.as_ref()) - .is_none()) - }) - .collect(); - - if lb_services.is_empty() { - debug!( - "No LoadBalancer services to provision for tenant {}/{}", - org_id, project_id - ); + let tenants = self.reconciliation_tenants().await?; + if tenants.is_empty() { + debug!("No active tenants need FiberLB reconciliation"); return Ok(()); } - info!( - "Found {} LoadBalancer service(s) to provision for tenant {}/{}", - lb_services.len(), - org_id, - project_id - ); - - let auth_token = issue_controller_token( - &self.iam_server_addr, - CONTROLLER_PRINCIPAL_ID, - org_id, - project_id, - ) - .await?; - - // Connect to FiberLB services - let mut lb_client = - match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!( - "Failed to connect to FiberLB at {}: {}", - self.fiberlb_addr, e - ); - return Ok(()); - } - }; - - let mut pool_client = match PoolServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!("Failed to connect to FiberLB PoolService: {}", e); - return Ok(()); - } - }; - - let mut listener_client = - match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!("Failed to connect to FiberLB ListenerService: {}", e); - return Ok(()); - } - }; - - let mut backend_client = - match BackendServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!("Failed to connect to FiberLB BackendService: {}", e); - return Ok(()); - } - }; - - // Provision each LoadBalancer service - for mut service in lb_services { - let namespace = service - .metadata - .namespace - .clone() - .unwrap_or_else(|| "default".to_string()); - let name = service.metadata.name.clone(); - - info!( - "Provisioning LoadBalancer for service {}/{}", - namespace, name - ); - - // Create LoadBalancer in FiberLB - let lb_name = format!("{}.{}", name, namespace); - let mut allocated_vip: Option = None; - let create_req = CreateLoadBalancerRequest { - name: lb_name.clone(), - org_id: org_id.to_string(), - project_id: project_id.to_string(), - description: format!("k8s service {}/{}", namespace, name), - vip_address: String::new(), - }; - - let lb_id = match lb_client - .create_load_balancer(authorized_request(create_req, &auth_token)) - .await - { - Ok(response) => { - let lb = response.into_inner().loadbalancer; - if let Some(lb) = lb { - let vip = if lb.vip_address.is_empty() { - warn!("FiberLB returned LoadBalancer without VIP"); - "0.0.0.0".to_string() - } else { - lb.vip_address.clone() - }; - - info!( - "FiberLB allocated VIP {} for service {}/{}", - vip, namespace, 
name - ); - allocated_vip = Some(vip); - lb.id - } else { - warn!("FiberLB returned empty LoadBalancer response"); - continue; - } - } - Err(e) => { - warn!( - "Failed to create LoadBalancer in FiberLB for service {}/{}: {}", - namespace, name, e - ); - continue; - } - }; - - // Create Pool for this LoadBalancer - let pool_name = format!("{}-pool", lb_name); - let pool_id = match pool_client - .create_pool(authorized_request( - CreatePoolRequest { - name: pool_name.clone(), - loadbalancer_id: lb_id.clone(), - algorithm: PoolAlgorithm::RoundRobin as i32, - protocol: PoolProtocol::Tcp as i32, - session_persistence: None, - }, - &auth_token, - )) - .await - { - Ok(response) => { - let pool = response.into_inner().pool; - if let Some(pool) = pool { - info!( - "Created Pool {} for service {}/{}", - pool.id, namespace, name - ); - pool.id - } else { - warn!("Failed to create Pool for service {}/{}", namespace, name); - continue; - } - } - Err(e) => { - warn!( - "Failed to create Pool for service {}/{}: {}", - namespace, name, e - ); - continue; - } - }; - - // Create Listeners for each Service port - let mut listeners_ready = true; - for svc_port in &service.spec.ports { - let listener_name = format!( - "{}-listener-{}", - lb_name, - svc_port - .name - .as_deref() - .unwrap_or(&svc_port.port.to_string()) - ); - - match listener_client - .create_listener(authorized_request( - CreateListenerRequest { - name: listener_name.clone(), - loadbalancer_id: lb_id.clone(), - protocol: ListenerProtocol::Tcp as i32, - port: svc_port.port as u32, - default_pool_id: pool_id.clone(), - tls_config: None, - connection_limit: 0, // No limit - }, - &auth_token, - )) - .await - { - Ok(response) => { - let listener = response.into_inner().listener; - if let Some(listener) = listener { - info!( - "Created Listener {} on port {} for service {}/{}", - listener.id, svc_port.port, namespace, name - ); - } - } - Err(e) => { - listeners_ready = false; - warn!( - "Failed to create Listener on port {} for service {}/{}: {}", - svc_port.port, namespace, name, e - ); - } - } - } - - // Query Pods matching Service selector and create Backends - let pods = match self - .storage - .list_pods( - org_id, - project_id, - Some(&namespace), - if service.spec.selector.is_empty() { - None - } else { - Some(&service.spec.selector) - }, - ) - .await - { - Ok(pods) => pods, - Err(e) => { - warn!( - "Failed to list Pods for service {}/{}: {}", - namespace, name, e - ); - vec![] - } - }; - - info!( - "Found {} Pod(s) matching selector for service {}/{}", - pods.len(), - namespace, - name - ); - - // Create Backend for each Pod - let mut backend_count = 0usize; - for pod in &pods { - // Get Pod IP - let pod_ip = match pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) { - Some(ip) => ip, - None => { - debug!( - "Pod {} has no IP yet, skipping backend creation", - pod.metadata.name - ); - continue; - } - }; - - // For each Service port, create a Backend - for svc_port in &service.spec.ports { - // Use target_port if specified, otherwise use port - let backend_port = svc_port.target_port.unwrap_or(svc_port.port); - - let backend_name = - format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port); - - match backend_client - .create_backend(authorized_request( - CreateBackendRequest { - name: backend_name.clone(), - pool_id: pool_id.clone(), - address: pod_ip.clone(), - port: backend_port as u32, - weight: 1, - }, - &auth_token, - )) - .await - { - Ok(response) => { - let backend = response.into_inner().backend; - if let Some(backend) 
= backend { - backend_count += 1; - info!( - "Created Backend {} for Pod {} ({}:{}) in service {}/{}", - backend.id, - pod.metadata.name, - pod_ip, - backend_port, - namespace, - name - ); - } - } - Err(e) => { - warn!( - "Failed to create Backend for Pod {} in service {}/{}: {}", - pod.metadata.name, namespace, name, e - ); - } - } - } - } - - if !listeners_ready { + for tenant in tenants { + if let Err(error) = self.reconcile_tenant_loadbalancers(&tenant).await { warn!( - "Skipping Service update for {}/{} because one or more FiberLB listeners failed", - namespace, name - ); - continue; - } - - if backend_count == 0 { - warn!( - "Skipping Service update for {}/{} because no FiberLB backends were created", - namespace, name - ); - continue; - } - - service.status = Some(ServiceStatus { - load_balancer: Some(LoadBalancerStatus { - ingress: vec![LoadBalancerIngress { - ip: allocated_vip, - hostname: None, - }], - }), - }); - service - .metadata - .annotations - .insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb_id.clone()); - service.metadata.annotations.insert( - "fiberlb.plasmacloud.io/pool-id".to_string(), - pool_id.clone(), - ); - - // Merge with the latest stored version so the DNS controller does not lose its annotations. - if let Ok(Some(mut current)) = self - .storage - .get_service(org_id, project_id, &namespace, &name) - .await - { - current.status = service.status.clone().or(current.status); - current - .metadata - .annotations - .extend(service.metadata.annotations.clone()); - service = current; - } - - let current_version = service - .metadata - .resource_version - .as_ref() - .and_then(|v| v.parse::().ok()) - .unwrap_or(0); - service.metadata.resource_version = Some((current_version + 1).to_string()); - - if let Err(e) = self.storage.put_service(&service).await { - warn!( - "Failed to update service {}/{} with FiberLB resources: {}", - namespace, name, e - ); - } else { - info!( - "Successfully provisioned LoadBalancer for service {}/{} with {} backend(s)", - namespace, - name, - pods.len() + org_id = %tenant.org_id, + project_id = %tenant.project_id, + error = %error, + "failed to reconcile tenant load balancers" ); } } @@ -471,36 +86,761 @@ impl FiberLbController { Ok(()) } - /// Cleanup LoadBalancer when Service is deleted - /// - /// This should be called when a Service with type=LoadBalancer is deleted. - /// For MVP, this is not automatically triggered - would need a deletion watch. 
- #[allow(dead_code)] - async fn cleanup_loadbalancer( - &self, - org_id: &str, - project_id: &str, - lb_id: &str, - ) -> Result<()> { - let mut fiberlb_client = - LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await?; + async fn reconcile_tenant_loadbalancers(&self, tenant: &TenantRef) -> Result<()> { + let services = self + .storage + .list_services(&tenant.org_id, &tenant.project_id, None) + .await?; + + for service in services.iter().filter(|service| { + !service_is_load_balancer(service) && has_load_balancer_state(service) + }) { + self.clear_service_load_balancer_state( + &tenant.org_id, + &tenant.project_id, + service_namespace(service), + &service.metadata.name, + ) + .await?; + } + + let desired_services = services + .into_iter() + .filter(service_is_load_balancer) + .collect::>(); + if !desired_services.is_empty() { + self.known_tenants + .write() + .await + .insert(tenant_cache_key(tenant)); + } + let desired_names = desired_services + .iter() + .map(service_lb_name) + .collect::>(); + let auth_token = issue_controller_token( &self.iam_server_addr, CONTROLLER_PRINCIPAL_ID, - org_id, - project_id, + &tenant.org_id, + &tenant.project_id, ) .await?; - let delete_req = DeleteLoadBalancerRequest { - id: lb_id.to_string(), - }; + let mut lb_client = + match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await { + Ok(client) => client, + Err(error) => { + warn!( + address = %self.fiberlb_addr, + error = %error, + "failed to connect to FiberLB load balancer service" + ); + return Ok(()); + } + }; + let mut pool_client = PoolServiceClient::connect(self.fiberlb_addr.clone()).await?; + let mut listener_client = ListenerServiceClient::connect(self.fiberlb_addr.clone()).await?; + let mut backend_client = BackendServiceClient::connect(self.fiberlb_addr.clone()).await?; - fiberlb_client - .delete_load_balancer(authorized_request(delete_req, &auth_token)) + let current_load_balancers = self + .list_load_balancers(&mut lb_client, tenant, &auth_token) .await?; + for load_balancer in current_load_balancers + .iter() + .filter(|load_balancer| is_managed_load_balancer(load_balancer)) + { + if !desired_names.contains(&load_balancer.name) { + delete_load_balancer(&mut lb_client, &auth_token, &load_balancer.id).await?; + } + } + + for service in desired_services { + self.reconcile_service_load_balancer( + tenant, + &mut lb_client, + &mut pool_client, + &mut listener_client, + &mut backend_client, + &auth_token, + ¤t_load_balancers, + &service, + ) + .await?; + } + + if desired_names.is_empty() { + self.known_tenants + .write() + .await + .remove(&tenant_cache_key(tenant)); + } - info!("Deleted LoadBalancer {} from FiberLB", lb_id); Ok(()) } + + async fn reconciliation_tenants(&self) -> Result> { + let mut tenants = self + .storage + .list_active_tenants() + .await? 
+ .into_iter() + .collect::>(); + + for cache_key in self.known_tenants.read().await.iter() { + if let Some(tenant) = parse_tenant_cache_key(cache_key) { + tenants.insert(tenant); + } + } + + Ok(tenants.into_iter().collect()) + } + + async fn reconcile_service_load_balancer( + &self, + tenant: &TenantRef, + lb_client: &mut LoadBalancerServiceClient, + pool_client: &mut PoolServiceClient, + listener_client: &mut ListenerServiceClient, + backend_client: &mut BackendServiceClient, + auth_token: &str, + current_load_balancers: &[LoadBalancer], + service: &Service, + ) -> Result<()> { + let namespace = service_namespace(service).to_string(); + let lb_name = service_lb_name(service); + let description = service_lb_description(service); + let annotated_lb_id = service.metadata.annotations.get(LB_ID_ANNOTATION); + let existing_load_balancer = current_load_balancers.iter().find(|load_balancer| { + annotated_lb_id.map(String::as_str) == Some(load_balancer.id.as_str()) + || load_balancer.name == lb_name + }); + + let load_balancer = ensure_load_balancer( + lb_client, + auth_token, + tenant, + &lb_name, + &description, + existing_load_balancer, + ) + .await?; + let pool = ensure_pool( + pool_client, + auth_token, + &load_balancer.id, + &format!("{lb_name}-pool"), + ) + .await?; + + reconcile_listeners( + listener_client, + auth_token, + &load_balancer.id, + &pool.id, + service, + &lb_name, + ) + .await?; + + let pods = self + .storage + .list_pods( + &tenant.org_id, + &tenant.project_id, + Some(&namespace), + if service.spec.selector.is_empty() { + None + } else { + Some(&service.spec.selector) + }, + ) + .await?; + + reconcile_backends( + backend_client, + auth_token, + &pool.id, + service, + &lb_name, + &pods, + ) + .await?; + + self.reconcile_service_load_balancer_state( + &tenant.org_id, + &tenant.project_id, + &namespace, + &service.metadata.name, + &load_balancer.id, + &pool.id, + non_empty_value(&load_balancer.vip_address), + ) + .await?; + + Ok(()) + } + + async fn reconcile_service_load_balancer_state( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + lb_id: &str, + pool_id: &str, + vip: Option<&str>, + ) -> Result<()> { + let Some(mut service) = self + .storage + .get_service(org_id, project_id, namespace, name) + .await? + else { + return Ok(()); + }; + + let mut changed = false; + changed |= set_annotation(&mut service, LB_ID_ANNOTATION, lb_id); + changed |= set_annotation(&mut service, POOL_ID_ANNOTATION, pool_id); + changed |= set_load_balancer_status(&mut service, vip); + + if !changed { + return Ok(()); + } + + bump_resource_version(&mut service); + self.storage.put_service(&service).await?; + Ok(()) + } + + async fn clear_service_load_balancer_state( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + ) -> Result<()> { + let Some(mut service) = self + .storage + .get_service(org_id, project_id, namespace, name) + .await? 
+ else { + return Ok(()); + }; + + let mut changed = false; + changed |= service + .metadata + .annotations + .remove(LB_ID_ANNOTATION) + .is_some(); + changed |= service + .metadata + .annotations + .remove(POOL_ID_ANNOTATION) + .is_some(); + + let had_load_balancer = service + .status + .as_ref() + .and_then(|status| status.load_balancer.as_ref()) + .is_some(); + if had_load_balancer { + if let Some(status) = service.status.as_mut() { + status.load_balancer = None; + } + if matches!(service.status.as_ref(), Some(status) if status.load_balancer.is_none()) { + service.status = None; + } + changed = true; + } + + if !changed { + return Ok(()); + } + + bump_resource_version(&mut service); + self.storage.put_service(&service).await?; + Ok(()) + } + + async fn list_load_balancers( + &self, + client: &mut LoadBalancerServiceClient, + tenant: &TenantRef, + auth_token: &str, + ) -> Result> { + let mut load_balancers = Vec::new(); + let mut page_token = String::new(); + + loop { + let response = client + .list_load_balancers(authorized_request( + ListLoadBalancersRequest { + org_id: tenant.org_id.clone(), + project_id: tenant.project_id.clone(), + page_size: 256, + page_token: page_token.clone(), + }, + auth_token, + )) + .await? + .into_inner(); + + load_balancers.extend(response.loadbalancers); + if response.next_page_token.is_empty() { + break; + } + page_token = response.next_page_token; + } + + Ok(load_balancers) + } +} + +#[derive(Clone)] +struct DesiredBackend { + name: String, + address: String, + port: u32, +} + +async fn ensure_load_balancer( + client: &mut LoadBalancerServiceClient, + auth_token: &str, + tenant: &TenantRef, + name: &str, + description: &str, + existing: Option<&LoadBalancer>, +) -> Result { + if let Some(load_balancer) = existing { + return Ok(load_balancer.clone()); + } + + Ok(client + .create_load_balancer(authorized_request( + CreateLoadBalancerRequest { + name: name.to_string(), + org_id: tenant.org_id.clone(), + project_id: tenant.project_id.clone(), + description: description.to_string(), + vip_address: String::new(), + }, + auth_token, + )) + .await? + .into_inner() + .loadbalancer + .ok_or_else(|| anyhow::anyhow!("FiberLB returned empty CreateLoadBalancer response"))?) +} + +async fn ensure_pool( + client: &mut PoolServiceClient, + auth_token: &str, + load_balancer_id: &str, + name: &str, +) -> Result { + let mut page_token = String::new(); + + loop { + let response = client + .list_pools(authorized_request( + ListPoolsRequest { + loadbalancer_id: load_balancer_id.to_string(), + page_size: 256, + page_token: page_token.clone(), + }, + auth_token, + )) + .await? + .into_inner(); + + if let Some(pool) = response.pools.into_iter().find(|pool| pool.name == name) { + return Ok(pool); + } + + if response.next_page_token.is_empty() { + break; + } + page_token = response.next_page_token; + } + + Ok(client + .create_pool(authorized_request( + CreatePoolRequest { + name: name.to_string(), + loadbalancer_id: load_balancer_id.to_string(), + algorithm: PoolAlgorithm::RoundRobin as i32, + protocol: PoolProtocol::Tcp as i32, + session_persistence: None, + }, + auth_token, + )) + .await? + .into_inner() + .pool + .ok_or_else(|| anyhow::anyhow!("FiberLB returned empty CreatePool response"))?) 
+} + +async fn reconcile_listeners( + client: &mut ListenerServiceClient, + auth_token: &str, + load_balancer_id: &str, + pool_id: &str, + service: &Service, + lb_name: &str, +) -> Result<()> { + let desired = service + .spec + .ports + .iter() + .map(|port| { + ( + format!( + "{}-listener-{}", + lb_name, + port.name.as_deref().unwrap_or(&port.port.to_string()) + ), + port.port as u32, + ) + }) + .collect::>(); + let mut satisfied = HashSet::new(); + + for listener in list_listeners(client, auth_token, load_balancer_id).await? { + match desired.get(&listener.name) { + Some(port) + if listener.port == *port + && listener.protocol == ListenerProtocol::Tcp as i32 + && listener.default_pool_id == pool_id + && !satisfied.contains(&listener.name) => + { + satisfied.insert(listener.name.clone()); + } + _ => delete_listener(client, auth_token, &listener.id).await?, + } + } + + for (name, port) in desired { + if satisfied.contains(&name) { + continue; + } + + client + .create_listener(authorized_request( + CreateListenerRequest { + name, + loadbalancer_id: load_balancer_id.to_string(), + protocol: ListenerProtocol::Tcp as i32, + port, + default_pool_id: pool_id.to_string(), + tls_config: None, + connection_limit: 0, + }, + auth_token, + )) + .await?; + } + + Ok(()) +} + +async fn reconcile_backends( + client: &mut BackendServiceClient, + auth_token: &str, + pool_id: &str, + service: &Service, + lb_name: &str, + pods: &[Pod], +) -> Result<()> { + let desired = desired_backends(service, lb_name, pods) + .into_iter() + .map(|backend| (backend.name.clone(), backend)) + .collect::>(); + let mut satisfied = HashSet::new(); + + for backend in list_backends(client, auth_token, pool_id).await? { + match desired.get(&backend.name) { + Some(target) + if backend.address == target.address + && backend.port == target.port + && !satisfied.contains(&backend.name) => + { + satisfied.insert(backend.name.clone()); + } + _ => delete_backend(client, auth_token, &backend.id).await?, + } + } + + for backend in desired.into_values() { + if satisfied.contains(&backend.name) { + continue; + } + + client + .create_backend(authorized_request( + CreateBackendRequest { + name: backend.name, + pool_id: pool_id.to_string(), + address: backend.address, + port: backend.port, + weight: 1, + }, + auth_token, + )) + .await?; + } + + Ok(()) +} + +async fn list_listeners( + client: &mut ListenerServiceClient, + auth_token: &str, + load_balancer_id: &str, +) -> Result> { + let mut listeners = Vec::new(); + let mut page_token = String::new(); + + loop { + let response = client + .list_listeners(authorized_request( + ListListenersRequest { + loadbalancer_id: load_balancer_id.to_string(), + page_size: 256, + page_token: page_token.clone(), + }, + auth_token, + )) + .await? + .into_inner(); + + listeners.extend(response.listeners); + if response.next_page_token.is_empty() { + break; + } + page_token = response.next_page_token; + } + + Ok(listeners) +} + +async fn list_backends( + client: &mut BackendServiceClient, + auth_token: &str, + pool_id: &str, +) -> Result> { + let mut backends = Vec::new(); + let mut page_token = String::new(); + + loop { + let response = client + .list_backends(authorized_request( + ListBackendsRequest { + pool_id: pool_id.to_string(), + page_size: 256, + page_token: page_token.clone(), + }, + auth_token, + )) + .await? 
+ .into_inner(); + + backends.extend(response.backends); + if response.next_page_token.is_empty() { + break; + } + page_token = response.next_page_token; + } + + Ok(backends) +} + +fn desired_backends(service: &Service, lb_name: &str, pods: &[Pod]) -> Vec { + let mut desired = Vec::new(); + + for pod in pods { + let Some(pod_ip) = pod + .status + .as_ref() + .and_then(|status| status.pod_ip.as_ref()) + else { + continue; + }; + + for service_port in &service.spec.ports { + let backend_port = service_port.target_port.unwrap_or(service_port.port); + desired.push(DesiredBackend { + name: format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port), + address: pod_ip.clone(), + port: backend_port as u32, + }); + } + } + + desired +} + +fn service_is_load_balancer(service: &Service) -> bool { + service.spec.r#type.as_deref() == Some("LoadBalancer") +} + +fn tenant_cache_key(tenant: &TenantRef) -> String { + format!("{}/{}", tenant.org_id, tenant.project_id) +} + +fn parse_tenant_cache_key(value: &str) -> Option { + let (org_id, project_id) = value.split_once('/')?; + Some(TenantRef { + org_id: org_id.to_string(), + project_id: project_id.to_string(), + }) +} + +fn has_load_balancer_state(service: &Service) -> bool { + service.metadata.annotations.contains_key(LB_ID_ANNOTATION) + || service + .metadata + .annotations + .contains_key(POOL_ID_ANNOTATION) + || service + .status + .as_ref() + .and_then(|status| status.load_balancer.as_ref()) + .is_some() +} + +fn service_namespace(service: &Service) -> &str { + service.metadata.namespace.as_deref().unwrap_or("default") +} + +fn service_lb_name(service: &Service) -> String { + format!("{}.{}", service.metadata.name, service_namespace(service)) +} + +fn service_lb_description(service: &Service) -> String { + format!( + "k8s service {}/{}", + service_namespace(service), + service.metadata.name + ) +} + +fn is_managed_load_balancer(load_balancer: &LoadBalancer) -> bool { + load_balancer.description.starts_with("k8s service ") +} + +fn non_empty_value(value: &str) -> Option<&str> { + if value.is_empty() { + None + } else { + Some(value) + } +} + +fn set_annotation(service: &mut Service, key: &str, value: &str) -> bool { + match service.metadata.annotations.get(key) { + Some(current) if current == value => false, + _ => { + service + .metadata + .annotations + .insert(key.to_string(), value.to_string()); + true + } + } +} + +fn set_load_balancer_status(service: &mut Service, vip: Option<&str>) -> bool { + let desired = vip.map(|vip| ServiceStatus { + load_balancer: Some(LoadBalancerStatus { + ingress: vec![LoadBalancerIngress { + ip: Some(vip.to_string()), + hostname: None, + }], + }), + }); + + let current_vip = service + .status + .as_ref() + .and_then(|status| status.load_balancer.as_ref()) + .and_then(|load_balancer| load_balancer.ingress.first()) + .and_then(|ingress| ingress.ip.as_deref()); + if current_vip == vip { + return false; + } + + service.status = desired; + true +} + +fn bump_resource_version(service: &mut Service) { + let current = service + .metadata + .resource_version + .as_deref() + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + service.metadata.resource_version = Some((current + 1).to_string()); +} + +async fn delete_load_balancer( + client: &mut LoadBalancerServiceClient, + auth_token: &str, + lb_id: &str, +) -> Result<()> { + match client + .delete_load_balancer(authorized_request( + DeleteLoadBalancerRequest { + id: lb_id.to_string(), + }, + auth_token, + )) + .await + { + Ok(_) => Ok(()), + Err(status) if 
status.code() == Code::NotFound => Ok(()), + Err(status) => Err(status.into()), + } +} + +async fn delete_listener( + client: &mut ListenerServiceClient, + auth_token: &str, + listener_id: &str, +) -> Result<()> { + match client + .delete_listener(authorized_request( + DeleteListenerRequest { + id: listener_id.to_string(), + }, + auth_token, + )) + .await + { + Ok(_) => Ok(()), + Err(status) if status.code() == Code::NotFound => Ok(()), + Err(status) => Err(status.into()), + } +} + +async fn delete_backend( + client: &mut BackendServiceClient, + auth_token: &str, + backend_id: &str, +) -> Result<()> { + match client + .delete_backend(authorized_request( + DeleteBackendRequest { + id: backend_id.to_string(), + }, + auth_token, + )) + .await + { + Ok(_) => Ok(()), + Err(status) if status.code() == Code::NotFound => Ok(()), + Err(status) => Err(status.into()), + } } diff --git a/k8shost/crates/k8shost-server/src/flashdns_controller.rs b/k8shost/crates/k8shost-server/src/flashdns_controller.rs index 05e173e..64d29bf 100644 --- a/k8shost/crates/k8shost-server/src/flashdns_controller.rs +++ b/k8shost/crates/k8shost-server/src/flashdns_controller.rs @@ -1,27 +1,32 @@ //! FlashDNS Controller - Manages cluster.local DNS records for Services //! -//! This controller watches for Services and automatically creates DNS records -//! in the format: {service}.{namespace}.svc.cluster.local → ClusterIP +//! This controller continuously reconciles tenant-scoped Service resources into +//! authoritative `*.svc.cluster.local` A records and removes stale records when +//! Services disappear or lose their ClusterIP. use crate::auth::{authorized_request, issue_controller_token}; -use crate::storage::Storage; +use crate::storage::{Storage, TenantRef}; use anyhow::Result; use flashdns_api::proto::record_service_client::RecordServiceClient; use flashdns_api::proto::zone_service_client::ZoneServiceClient; use flashdns_api::proto::{ get_zone_request, record_data, ARecord, CreateRecordRequest, CreateZoneRequest, - DeleteRecordRequest, GetZoneRequest, ListZonesRequest, RecordData, + DeleteRecordRequest, GetZoneRequest, ListRecordsRequest, ListZonesRequest, RecordData, + RecordInfo, }; -use std::collections::HashMap; +use k8shost_types::Service; +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; -use tonic::Code; +use tonic::{transport::Channel, Code}; use tracing::{debug, info, warn}; const CLUSTER_DOMAIN: &str = "cluster.local"; -const DNS_RECORD_TTL: u32 = 60; // 60 seconds for dynamic cluster services +const DNS_RECORD_TTL: u32 = 60; const CONTROLLER_PRINCIPAL_ID: &str = "k8shost-controller"; +const RECORD_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/record-id"; +const ZONE_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/zone-id"; /// FlashDNS controller for managing cluster.local DNS records pub struct FlashDnsController { @@ -29,23 +34,20 @@ pub struct FlashDnsController { flashdns_addr: String, iam_server_addr: String, interval: Duration, - /// Cache of zone_id per tenant (org_id/project_id -> zone_id) zone_cache: Arc>>, } impl FlashDnsController { - /// Create a new FlashDNS controller pub fn new(storage: Arc, flashdns_addr: String, iam_server_addr: String) -> Self { Self { storage, flashdns_addr, iam_server_addr, - interval: Duration::from_secs(10), // Check every 10 seconds + interval: Duration::from_secs(10), zone_cache: Arc::new(tokio::sync::RwLock::new(HashMap::new())), } } - /// Start the controller loop pub async fn run(self: Arc) { info!( 
"FlashDNS controller started (FlashDNS at {}, {}s interval)", @@ -54,268 +56,328 @@ impl FlashDnsController { ); loop { - if let Err(e) = self.reconcile_dns_records().await { - warn!("FlashDNS controller cycle failed: {}", e); + if let Err(error) = self.reconcile_dns_records().await { + warn!(error = %error, "FlashDNS controller cycle failed"); } sleep(self.interval).await; } } - /// Reconcile DNS records across all tenants async fn reconcile_dns_records(&self) -> Result<()> { - // For MVP, iterate through known tenants - let tenants = vec![("default-org".to_string(), "default-project".to_string())]; - - for (org_id, project_id) in tenants { - if let Err(e) = self.reconcile_tenant_dns(&org_id, &project_id).await { - warn!( - "Failed to reconcile DNS for tenant {}/{}: {}", - org_id, project_id, e - ); - } - } - - Ok(()) - } - - /// Reconcile DNS records for a specific tenant - async fn reconcile_tenant_dns(&self, org_id: &str, project_id: &str) -> Result<()> { - let auth_token = issue_controller_token( - &self.iam_server_addr, - CONTROLLER_PRINCIPAL_ID, - org_id, - project_id, - ) - .await?; - - // Ensure cluster.local zone exists for this tenant - let zone_id = match self - .ensure_zone_exists(org_id, project_id, &auth_token) - .await - { - Ok(id) => id, - Err(e) => { - warn!( - "Failed to ensure zone exists for tenant {}/{}: {}", - org_id, project_id, e - ); - return Ok(()); - } - }; - - // Get all services for this tenant - let services = self.storage.list_services(org_id, project_id, None).await?; - - // Filter for services that need DNS records - let services_needing_dns: Vec<_> = services - .into_iter() - .filter(|svc| { - // Service needs DNS if: - // 1. Has cluster_ip allocated - // 2. Does NOT have flashdns.plasmacloud.io/record-id annotation (not yet provisioned) - svc.spec.cluster_ip.is_some() - && !svc - .metadata - .annotations - .contains_key("flashdns.plasmacloud.io/record-id") - }) - .collect(); - - if services_needing_dns.is_empty() { - debug!( - "No services need DNS records for tenant {}/{}", - org_id, project_id - ); + let tenants = self.reconciliation_tenants().await?; + if tenants.is_empty() { + debug!("No active tenants need FlashDNS reconciliation"); return Ok(()); } - info!( - "Found {} service(s) needing DNS records for tenant {}/{}", - services_needing_dns.len(), - org_id, - project_id - ); - - // Connect to FlashDNS - let mut record_client = match RecordServiceClient::connect(self.flashdns_addr.clone()).await - { - Ok(client) => client, - Err(e) => { + for tenant in tenants { + if let Err(error) = self.reconcile_tenant_dns(&tenant).await { warn!( - "Failed to connect to FlashDNS at {}: {}", - self.flashdns_addr, e + org_id = %tenant.org_id, + project_id = %tenant.project_id, + error = %error, + "failed to reconcile tenant DNS" ); - return Ok(()); - } - }; - - // Create DNS records for each service - for mut service in services_needing_dns { - let namespace = service - .metadata - .namespace - .clone() - .unwrap_or_else(|| "default".to_string()); - let name = service.metadata.name.clone(); - let cluster_ip = service.spec.cluster_ip.as_ref().unwrap(); - - // Construct DNS name: {service}.{namespace}.svc - // Full FQDN will be: {service}.{namespace}.svc.cluster.local - let record_name = format!("{}.{}.svc", name, namespace); - - info!( - "Creating DNS record {} → {} for service {}/{}", - record_name, cluster_ip, namespace, name - ); - - // Create A record - let create_req = CreateRecordRequest { - zone_id: zone_id.clone(), - name: record_name.clone(), - record_type: 
"A".to_string(), - ttl: DNS_RECORD_TTL, - data: Some(RecordData { - data: Some(record_data::Data::A(ARecord { - address: cluster_ip.clone(), - })), - }), - }; - - match record_client - .create_record(authorized_request(create_req, &auth_token)) - .await - { - Ok(response) => { - let record = response.into_inner().record; - if let Some(record) = record { - info!( - "Created DNS record {} → {} (record_id: {})", - record_name, cluster_ip, record.id - ); - - // Store record_id in service annotations - service - .metadata - .annotations - .insert("flashdns.plasmacloud.io/record-id".to_string(), record.id); - service.metadata.annotations.insert( - "flashdns.plasmacloud.io/zone-id".to_string(), - zone_id.clone(), - ); - - // Merge with the latest stored version so the FiberLB controller does not - // lose its status/annotations when both controllers reconcile together. - if let Ok(Some(mut current)) = self - .storage - .get_service(org_id, project_id, &namespace, &name) - .await - { - current.status = current.status.or(service.status.clone()); - current - .metadata - .annotations - .extend(service.metadata.annotations.clone()); - service = current; - } - - let current_version = service - .metadata - .resource_version - .as_ref() - .and_then(|v| v.parse::().ok()) - .unwrap_or(0); - service.metadata.resource_version = Some((current_version + 1).to_string()); - - // Save updated service - if let Err(e) = self.storage.put_service(&service).await { - warn!( - "Failed to update service {}/{} with DNS record ID: {}", - namespace, name, e - ); - } - } - } - Err(e) => { - warn!( - "Failed to create DNS record {} for service {}/{}: {}", - record_name, namespace, name, e - ); - } } } Ok(()) } - /// Ensure cluster.local zone exists for tenant, return zone_id - async fn ensure_zone_exists( - &self, - org_id: &str, - project_id: &str, - auth_token: &str, - ) -> Result { - let cache_key = format!("{}/{}", org_id, project_id); + async fn reconcile_tenant_dns(&self, tenant: &TenantRef) -> Result<()> { + let auth_token = issue_controller_token( + &self.iam_server_addr, + CONTROLLER_PRINCIPAL_ID, + &tenant.org_id, + &tenant.project_id, + ) + .await?; + let services = self + .storage + .list_services(&tenant.org_id, &tenant.project_id, None) + .await?; - // Check cache first + for service in services + .iter() + .filter(|service| !service_requires_dns(service) && has_dns_state(service)) { - let cache = self.zone_cache.read().await; - if let Some(zone_id) = cache.get(&cache_key) { - return Ok(zone_id.clone()); + self.clear_service_dns_state( + &tenant.org_id, + &tenant.project_id, + service_namespace(service), + &service.metadata.name, + ) + .await?; + } + + let desired_services = services + .into_iter() + .filter(service_requires_dns) + .collect::>(); + let has_desired_services = !desired_services.is_empty(); + let zone_id = match self + .zone_id_for_tenant(tenant, &auth_token, !desired_services.is_empty()) + .await? + { + Some(zone_id) => zone_id, + None => return Ok(()), + }; + + let mut record_client = match RecordServiceClient::connect(self.flashdns_addr.clone()).await + { + Ok(client) => client, + Err(error) => { + warn!( + address = %self.flashdns_addr, + error = %error, + "failed to connect to FlashDNS" + ); + return Ok(()); + } + }; + + let mut existing_by_name = self + .list_zone_records(&mut record_client, &zone_id, &auth_token) + .await? 
+ .into_iter() + .filter(|record| is_managed_record(record)) + .fold( + HashMap::>::new(), + |mut acc, record| { + acc.entry(record.name.clone()).or_default().push(record); + acc + }, + ); + + for service in desired_services { + let namespace = service_namespace(&service).to_string(); + let record_name = service_record_name(&service); + let cluster_ip = service + .spec + .cluster_ip + .as_deref() + .expect("service_requires_dns guarantees cluster_ip"); + let existing = existing_by_name.remove(&record_name).unwrap_or_default(); + let record = self + .ensure_service_record( + &mut record_client, + &auth_token, + &zone_id, + &record_name, + cluster_ip, + existing, + ) + .await?; + + self.reconcile_service_dns_state( + &tenant.org_id, + &tenant.project_id, + &namespace, + &service.metadata.name, + &zone_id, + &record.id, + ) + .await?; + } + + for stale_records in existing_by_name.into_values() { + for record in stale_records { + delete_record(&mut record_client, &auth_token, &record.id).await?; } } - // Connect to FlashDNS - let mut zone_client = ZoneServiceClient::connect(self.flashdns_addr.clone()).await?; + if !has_desired_services { + self.zone_cache + .write() + .await + .remove(&tenant_cache_key(tenant)); + } + Ok(()) + } + + async fn reconciliation_tenants(&self) -> Result> { + let mut tenants = self + .storage + .list_active_tenants() + .await? + .into_iter() + .collect::>(); + + for cache_key in self.zone_cache.read().await.keys() { + if let Some(tenant) = parse_tenant_cache_key(cache_key) { + tenants.insert(tenant); + } + } + + Ok(tenants.into_iter().collect()) + } + + async fn ensure_service_record( + &self, + client: &mut RecordServiceClient, + auth_token: &str, + zone_id: &str, + record_name: &str, + cluster_ip: &str, + existing: Vec, + ) -> Result { + let mut matching = None; + + for record in existing { + let is_match = record.record_type == "A" && record_a_value(&record) == Some(cluster_ip); + if is_match && matching.is_none() { + matching = Some(record); + continue; + } + + delete_record(client, auth_token, &record.id).await?; + } + + if let Some(record) = matching { + return Ok(record); + } + + Ok(client + .create_record(authorized_request( + CreateRecordRequest { + zone_id: zone_id.to_string(), + name: record_name.to_string(), + record_type: "A".to_string(), + ttl: DNS_RECORD_TTL, + data: Some(RecordData { + data: Some(record_data::Data::A(ARecord { + address: cluster_ip.to_string(), + })), + }), + }, + auth_token, + )) + .await? + .into_inner() + .record + .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty CreateRecord response"))?) + } + + async fn reconcile_service_dns_state( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + zone_id: &str, + record_id: &str, + ) -> Result<()> { + let Some(mut service) = self + .storage + .get_service(org_id, project_id, namespace, name) + .await? + else { + return Ok(()); + }; + + let mut changed = false; + changed |= set_annotation(&mut service, RECORD_ID_ANNOTATION, record_id); + changed |= set_annotation(&mut service, ZONE_ID_ANNOTATION, zone_id); + + if !changed { + return Ok(()); + } + + bump_resource_version(&mut service); + self.storage.put_service(&service).await?; + Ok(()) + } + + async fn clear_service_dns_state( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + ) -> Result<()> { + let Some(mut service) = self + .storage + .get_service(org_id, project_id, namespace, name) + .await? 
+ else { + return Ok(()); + }; + + let mut changed = false; + changed |= service + .metadata + .annotations + .remove(RECORD_ID_ANNOTATION) + .is_some(); + changed |= service + .metadata + .annotations + .remove(ZONE_ID_ANNOTATION) + .is_some(); + + if !changed { + return Ok(()); + } + + bump_resource_version(&mut service); + self.storage.put_service(&service).await?; + Ok(()) + } + + async fn zone_id_for_tenant( + &self, + tenant: &TenantRef, + auth_token: &str, + create_if_missing: bool, + ) -> Result> { + let cache_key = tenant_cache_key(tenant); + if let Some(zone_id) = self.zone_cache.read().await.get(&cache_key).cloned() { + return Ok(Some(zone_id)); + } + + let mut zone_client = ZoneServiceClient::connect(self.flashdns_addr.clone()).await?; if let Some(zone_id) = self .lookup_zone_id(&mut zone_client, CLUSTER_DOMAIN, auth_token) .await? { - info!( - "Found existing zone {} for tenant {}/{} (zone_id: {})", - CLUSTER_DOMAIN, org_id, project_id, zone_id - ); - - let mut cache = self.zone_cache.write().await; - cache.insert(cache_key.clone(), zone_id.clone()); - - return Ok(zone_id); + self.zone_cache + .write() + .await + .insert(cache_key, zone_id.clone()); + return Ok(Some(zone_id)); } - // Create zone - info!( - "Creating zone {} for tenant {}/{}", - CLUSTER_DOMAIN, org_id, project_id - ); - - let create_req = CreateZoneRequest { - name: CLUSTER_DOMAIN.to_string(), - org_id: org_id.to_string(), - project_id: project_id.to_string(), - primary_ns: "ns1.plasmacloud.io".to_string(), - admin_email: "admin@plasmacloud.io".to_string(), - }; + if !create_if_missing { + return Ok(None); + } let response = match zone_client - .create_zone(authorized_request(create_req, auth_token)) + .create_zone(authorized_request( + CreateZoneRequest { + name: CLUSTER_DOMAIN.to_string(), + org_id: tenant.org_id.clone(), + project_id: tenant.project_id.clone(), + primary_ns: "ns1.plasmacloud.io".to_string(), + admin_email: "admin@plasmacloud.io".to_string(), + }, + auth_token, + )) .await { Ok(response) => response, Err(status) if status.code() == Code::AlreadyExists => { - debug!( - "Zone {} already exists for tenant {}/{}; retrying lookup", - CLUSTER_DOMAIN, org_id, project_id - ); - for _ in 0..5 { if let Some(zone_id) = self .lookup_zone_id(&mut zone_client, CLUSTER_DOMAIN, auth_token) .await? { - let mut cache = self.zone_cache.write().await; - cache.insert(cache_key.clone(), zone_id.clone()); - return Ok(zone_id); + self.zone_cache + .write() + .await + .insert(cache_key.clone(), zone_id.clone()); + return Ok(Some(zone_id)); } sleep(Duration::from_millis(200)).await; } @@ -323,32 +385,28 @@ impl FlashDnsController { return Err(anyhow::anyhow!( "zone {} already exists for tenant {}/{} but could not be listed", CLUSTER_DOMAIN, - org_id, - project_id + tenant.org_id, + tenant.project_id )); } Err(status) => return Err(status.into()), }; - let zone = response + + let zone_id = response .into_inner() .zone - .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty zone"))?; - - info!( - "Created zone {} for tenant {}/{} (zone_id: {})", - CLUSTER_DOMAIN, org_id, project_id, zone.id - ); - - // Cache zone_id - let mut cache = self.zone_cache.write().await; - cache.insert(cache_key, zone.id.clone()); - - Ok(zone.id) + .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty zone"))? 
+ .id; + self.zone_cache + .write() + .await + .insert(cache_key, zone_id.clone()); + Ok(Some(zone_id)) } async fn lookup_zone_id( &self, - zone_client: &mut ZoneServiceClient, + zone_client: &mut ZoneServiceClient, zone_name: &str, auth_token: &str, ) -> Result> { @@ -360,11 +418,10 @@ impl FlashDnsController { .get_zone(authorized_request(get_req, auth_token)) .await { - Ok(response) => Ok(response.into_inner().zone.map(|z| z.id)), - Err(e) if e.code() == Code::NotFound => Ok(None), - Err(e) => { - debug!("Exact zone lookup failed for {}: {}", zone_name, e); - + Ok(response) => Ok(response.into_inner().zone.map(|zone| zone.id)), + Err(error) if error.code() == Code::NotFound => Ok(None), + Err(error) => { + debug!("exact zone lookup failed for {}: {}", zone_name, error); let list_req = ListZonesRequest { org_id: String::new(), project_id: String::new(), @@ -381,11 +438,13 @@ impl FlashDnsController { .into_inner() .zones .into_iter() - .find(|z| z.name.trim_end_matches('.') == zone_name.trim_end_matches('.')) - .map(|z| z.id)), + .find(|zone| { + zone.name.trim_end_matches('.') == zone_name.trim_end_matches('.') + }) + .map(|zone| zone.id)), Err(list_error) => { debug!( - "Zone list fallback failed for {}: {}", + "zone list fallback failed for {}: {}", zone_name, list_error ); Ok(None) @@ -395,33 +454,134 @@ impl FlashDnsController { } } - /// Cleanup DNS record when Service is deleted (not automatically triggered in MVP) - #[allow(dead_code)] - async fn cleanup_dns_record( + async fn list_zone_records( &self, - org_id: &str, - project_id: &str, - record_id: &str, - _zone_id: &str, - ) -> Result<()> { - let mut record_client = RecordServiceClient::connect(self.flashdns_addr.clone()).await?; - let auth_token = issue_controller_token( - &self.iam_server_addr, - CONTROLLER_PRINCIPAL_ID, - org_id, - project_id, - ) - .await?; + client: &mut RecordServiceClient, + zone_id: &str, + auth_token: &str, + ) -> Result> { + let mut records = Vec::new(); + let mut page_token = String::new(); - let delete_req = DeleteRecordRequest { - id: record_id.to_string(), - }; + loop { + let response = client + .list_records(authorized_request( + ListRecordsRequest { + zone_id: zone_id.to_string(), + name_filter: String::new(), + type_filter: String::new(), + page_size: 256, + page_token: page_token.clone(), + }, + auth_token, + )) + .await? 
+ .into_inner(); - record_client - .delete_record(authorized_request(delete_req, &auth_token)) - .await?; + records.extend(response.records); + if response.next_page_token.is_empty() { + break; + } + page_token = response.next_page_token; + } - info!("Deleted DNS record {} from FlashDNS", record_id); - Ok(()) + Ok(records) + } +} + +fn tenant_cache_key(tenant: &TenantRef) -> String { + format!("{}/{}", tenant.org_id, tenant.project_id) +} + +fn parse_tenant_cache_key(value: &str) -> Option { + let (org_id, project_id) = value.split_once('/')?; + Some(TenantRef { + org_id: org_id.to_string(), + project_id: project_id.to_string(), + }) +} + +fn service_requires_dns(service: &Service) -> bool { + service.spec.cluster_ip.is_some() +} + +fn has_dns_state(service: &Service) -> bool { + service + .metadata + .annotations + .contains_key(RECORD_ID_ANNOTATION) + || service + .metadata + .annotations + .contains_key(ZONE_ID_ANNOTATION) +} + +fn service_namespace(service: &Service) -> &str { + service.metadata.namespace.as_deref().unwrap_or("default") +} + +fn service_record_name(service: &Service) -> String { + format!( + "{}.{}.svc", + service.metadata.name, + service_namespace(service) + ) +} + +fn is_managed_record(record: &RecordInfo) -> bool { + record.name.ends_with(".svc") +} + +fn record_a_value(record: &RecordInfo) -> Option<&str> { + record + .data + .as_ref() + .and_then(|data| data.data.as_ref()) + .and_then(|data| match data { + record_data::Data::A(record) => Some(record.address.as_str()), + _ => None, + }) +} + +fn set_annotation(service: &mut Service, key: &str, value: &str) -> bool { + match service.metadata.annotations.get(key) { + Some(current) if current == value => false, + _ => { + service + .metadata + .annotations + .insert(key.to_string(), value.to_string()); + true + } + } +} + +fn bump_resource_version(service: &mut Service) { + let current = service + .metadata + .resource_version + .as_deref() + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + service.metadata.resource_version = Some((current + 1).to_string()); +} + +async fn delete_record( + client: &mut RecordServiceClient, + auth_token: &str, + record_id: &str, +) -> Result<()> { + match client + .delete_record(authorized_request( + DeleteRecordRequest { + id: record_id.to_string(), + }, + auth_token, + )) + .await + { + Ok(_) => Ok(()), + Err(status) if status.code() == Code::NotFound => Ok(()), + Err(status) => Err(status.into()), } } diff --git a/k8shost/crates/k8shost-server/src/scheduler.rs b/k8shost/crates/k8shost-server/src/scheduler.rs index b4c6594..7976990 100644 --- a/k8shost/crates/k8shost-server/src/scheduler.rs +++ b/k8shost/crates/k8shost-server/src/scheduler.rs @@ -3,10 +3,10 @@ //! Assigns pending pods to available nodes based on resource availability and scheduling policies. //! Implements tenant-aware scheduling with quota enforcement via CreditService. 
-use crate::storage::Storage; +use crate::storage::{Storage, TenantRef}; use creditservice_client::Client as CreditServiceClient; use k8shost_types::{Node, Pod}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; @@ -83,7 +83,6 @@ impl Scheduler { /// Schedule all pending pods across all tenants async fn schedule_pending_pods(&self) -> anyhow::Result<()> { - // Get list of active tenants from storage (query pods for unique org_id/project_id) let tenants = self.get_active_tenants().await?; if tenants.is_empty() { @@ -93,11 +92,14 @@ impl Scheduler { info!("Scheduling for {} active tenant(s)", tenants.len()); - for (org_id, project_id) in tenants { - if let Err(e) = self.schedule_tenant_pods(&org_id, &project_id).await { + for tenant in tenants { + if let Err(e) = self + .schedule_tenant_pods(&tenant.org_id, &tenant.project_id) + .await + { warn!( "Failed to schedule pods for tenant {}/{}: {}", - org_id, project_id, e + tenant.org_id, tenant.project_id, e ); } } @@ -105,31 +107,9 @@ impl Scheduler { Ok(()) } - /// Get list of active tenants from storage (unique org_id/project_id pairs) - async fn get_active_tenants(&self) -> anyhow::Result> { - // Query all pods to find unique (org_id, project_id) combinations - // This is a pragmatic approach that doesn't require IAM changes - let all_pods = self.storage.list_all_pods().await.unwrap_or_else(|e| { - warn!("Failed to query all pods for tenant discovery: {}", e); - vec![] - }); - - let mut tenants: HashSet<(String, String)> = HashSet::new(); - - for pod in all_pods { - if let (Some(org_id), Some(project_id)) = - (pod.metadata.org_id.clone(), pod.metadata.project_id.clone()) - { - tenants.insert((org_id, project_id)); - } - } - - // Fall back to default tenant if no pods found - if tenants.is_empty() { - tenants.insert(("default-org".to_string(), "default-project".to_string())); - } - - Ok(tenants.into_iter().collect()) + /// Get list of active tenants from storage-discovered resource metadata. 
+ async fn get_active_tenants(&self) -> anyhow::Result> { + self.storage.list_active_tenants().await.map_err(Into::into) } /// Schedule pending pods for a specific tenant @@ -431,7 +411,7 @@ impl Scheduler { #[cfg(test)] mod tests { use super::*; - use k8shost_types::{NodeCondition, NodeStatus, ObjectMeta, PodSpec, PodStatus}; + use k8shost_types::{NodeCondition, NodeStatus, ObjectMeta}; #[tokio::test] async fn test_is_node_ready() { diff --git a/k8shost/crates/k8shost-server/src/storage.rs b/k8shost/crates/k8shost-server/src/storage.rs index 59659c6..e0443a0 100644 --- a/k8shost/crates/k8shost-server/src/storage.rs +++ b/k8shost/crates/k8shost-server/src/storage.rs @@ -5,6 +5,7 @@ use flaredb_client::RdbClient; use k8shost_types::{Deployment, Node, Pod, Service}; +use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; @@ -15,6 +16,25 @@ pub struct Storage { client: Arc>, } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct TenantRef { + pub org_id: String, + pub project_id: String, +} + +#[derive(Debug, Deserialize)] +struct ResourceMetadataEnvelope { + metadata: ResourceMetadataRef, +} + +#[derive(Debug, Deserialize)] +struct ResourceMetadataRef { + #[serde(default)] + org_id: Option, + #[serde(default)] + project_id: Option, +} + impl Storage { /// Create a new storage instance with FlareDB backend pub async fn new(pd_addr: String) -> Result> { @@ -220,6 +240,34 @@ impl Storage { Ok(pods) } + /// List active tenants discovered from all persisted k8shost resources. + pub async fn list_active_tenants(&self) -> Result, Status> { + let prefix = b"k8s/".to_vec(); + let mut end_key = prefix.clone(); + end_key.push(0xff); + + let mut tenants = std::collections::BTreeSet::new(); + let mut start_key = prefix; + + loop { + let mut client = self.client.lock().await; + let (_keys, values, next) = client + .raw_scan(start_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + + tenants.extend(collect_active_tenants(values)); + + if let Some(next_key) = next { + start_key = next_key; + } else { + break; + } + } + + Ok(tenants.into_iter().collect()) + } + /// Delete a pod pub async fn delete_pod( &self, @@ -681,3 +729,62 @@ impl Storage { Ok(existed) } } + +fn collect_active_tenants( + values: impl IntoIterator>, +) -> std::collections::BTreeSet { + let mut tenants = std::collections::BTreeSet::new(); + + for value in values { + let Ok(resource) = serde_json::from_slice::(&value) else { + continue; + }; + + let (Some(org_id), Some(project_id)) = + (resource.metadata.org_id, resource.metadata.project_id) + else { + continue; + }; + + tenants.insert(TenantRef { org_id, project_id }); + } + + tenants +} + +#[cfg(test)] +mod tests { + use super::{collect_active_tenants, TenantRef}; + + #[test] + fn collect_active_tenants_discovers_unique_tenants_from_mixed_resources() { + let tenants = collect_active_tenants(vec![ + br#"{"metadata":{"name":"pod-a","org_id":"org-a","project_id":"project-a"}}"#.to_vec(), + br#"{"metadata":{"name":"svc-a","org_id":"org-a","project_id":"project-a"}}"#.to_vec(), + br#"{"metadata":{"name":"node-b","org_id":"org-b","project_id":"project-b"}}"#.to_vec(), + br#"{"metadata":{"name":"deploy-c","org_id":"org-c","project_id":"project-c"}}"# + .to_vec(), + br#"{"metadata":{"name":"invalid-missing-project","org_id":"org-z"}}"#.to_vec(), + br#"not-json"#.to_vec(), + ]); + + let tenants = tenants.into_iter().collect::>(); + assert_eq!( + tenants, + vec![ + 
TenantRef { + org_id: "org-a".to_string(), + project_id: "project-a".to_string(), + }, + TenantRef { + org_id: "org-b".to_string(), + project_id: "project-b".to_string(), + }, + TenantRef { + org_id: "org-c".to_string(), + project_id: "project-c".to_string(), + }, + ] + ); + } +} diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index d76865f..3d87ee6 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -3581,8 +3581,8 @@ validate_k8shost_flow() { k8s_http_tunnel="$(start_ssh_tunnel node01 18087 8085)" trap 'stop_ssh_tunnel node01 "${k8s_http_tunnel}"; stop_ssh_tunnel node01 "${k8s_tunnel}"; stop_ssh_tunnel node01 "${lb_tunnel}"; stop_ssh_tunnel node01 "${dns_tunnel}"; stop_ssh_tunnel node01 "${prism_tunnel}"; stop_ssh_tunnel node01 "${iam_tunnel}"' RETURN - local org_id="default-org" - local project_id="default-project" + local org_id="k8shost-org" + local project_id="k8shost-project" local principal_id="k8shost-smoke-$(date +%s)" local token node_name deployment_name pod_name service_name service_port token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")" @@ -3801,23 +3801,58 @@ validate_k8shost_flow() { -proto "${K8SHOST_PROTO}" \ -d "$(jq -cn --arg ns "default" --arg name "${pod_name}" '{namespace:$ns, name:$name}')" \ 127.0.0.1:15087 k8shost.PodService/DeletePod >/dev/null - grpcurl -plaintext \ - -H "authorization: Bearer ${token}" \ - -import-path "${FLASHDNS_PROTO_DIR}" \ - -proto "${FLASHDNS_PROTO}" \ - -d "$(jq -cn --arg id "${record_id}" '{id:$id}')" \ - 127.0.0.1:15084 flashdns.v1.RecordService/DeleteRecord >/dev/null - grpcurl -plaintext \ - -H "authorization: Bearer ${token}" \ - -import-path "${FIBERLB_PROTO_DIR}" \ - -proto "${FIBERLB_PROTO}" \ - -d "$(jq -cn --arg id "${lb_id}" '{id:$id}')" \ - 127.0.0.1:15085 fiberlb.v1.LoadBalancerService/DeleteLoadBalancer >/dev/null + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + if ! ssh_node node01 "dig @127.0.0.1 -p 5353 +short ${service_name}.default.svc.cluster.local A | grep -q ." >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost FlashDNS cleanup for ${service_name}" + fi + sleep 2 + done + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + local lb_list_json + lb_list_json="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg org "${org_id}" --arg project "${project_id}" '{orgId:$org, projectId:$project, pageSize:100}')" \ + 127.0.0.1:15085 fiberlb.v1.LoadBalancerService/ListLoadBalancers 2>/dev/null || true)" + if [[ -n "${lb_list_json}" ]] && ! printf '%s' "${lb_list_json}" | jq -e --arg id "${lb_id}" '.loadbalancers | any(.id == $id)' >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost FiberLB cleanup for ${service_name}" + fi + sleep 2 + done + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + if ! 
grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FLASHDNS_PROTO_DIR}" \ + -proto "${FLASHDNS_PROTO}" \ + -d "$(jq -cn --arg id "${record_id}" '{id:$id}')" \ + 127.0.0.1:15084 flashdns.v1.RecordService/GetRecord >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost FlashDNS record deletion for ${service_name}" + fi + sleep 2 + done trap - RETURN + stop_ssh_tunnel node01 "${k8s_http_tunnel}" stop_ssh_tunnel node01 "${k8s_tunnel}" stop_ssh_tunnel node01 "${lb_tunnel}" stop_ssh_tunnel node01 "${dns_tunnel}" + stop_ssh_tunnel node01 "${prism_tunnel}" stop_ssh_tunnel node01 "${iam_tunnel}" }
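A possible follow-up, not part of this patch: both controllers round-trip tenants through the "org/project" cache-key format via tenant_cache_key/parse_tenant_cache_key, and a small unit test in either controller module could pin that behavior down while documenting the implicit assumption that org IDs never contain '/'. A minimal sketch, assuming the helper signatures shown in the diff above (module name and test values are hypothetical):

#[cfg(test)]
mod tenant_key_tests {
    use super::*;

    #[test]
    fn tenant_cache_key_round_trips() {
        let tenant = TenantRef {
            org_id: "org-a".to_string(),
            project_id: "project-a".to_string(),
        };

        // tenant_cache_key joins org and project with '/'.
        let key = tenant_cache_key(&tenant);
        assert_eq!(key, "org-a/project-a");

        // parse_tenant_cache_key inverts it for keys produced above.
        assert_eq!(parse_tenant_cache_key(&key), Some(tenant));

        // split_once('/') splits at the FIRST slash, so an org_id that
        // contains '/' would be mis-parsed; the cache-key format assumes
        // slash-free org IDs.
        assert_eq!(
            parse_tenant_cache_key("org/with/slash"),
            Some(TenantRef {
                org_id: "org".to_string(),
                project_id: "with/slash".to_string(),
            })
        );

        // Keys without a separator are rejected.
        assert_eq!(parse_tenant_cache_key("no-slash"), None);
    }
}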