From 74d1e197cb66413bb86cc4bbadd7dddeefa87e95 Mon Sep 17 00:00:00 2001
From: centra
Date: Wed, 1 Apr 2026 14:15:13 +0900
Subject: [PATCH] Persist k8shost controller tenant registries

---
 .../k8shost-server/src/fiberlb_controller.rs  |  34 ++---
 .../k8shost-server/src/flashdns_controller.rs |  22 ++--
 k8shost/crates/k8shost-server/src/storage.rs  | 120 +++++++++++++++++-
 3 files changed, 137 insertions(+), 39 deletions(-)

diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
index cb8e78a..6199fca 100644
--- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
+++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
@@ -35,7 +35,6 @@ pub struct FiberLbController {
     fiberlb_addr: String,
     iam_server_addr: String,
     interval: Duration,
-    known_tenants: Arc<tokio::sync::RwLock<BTreeSet<String>>>,
 }
 
 impl FiberLbController {
@@ -45,7 +44,6 @@ impl FiberLbController {
             fiberlb_addr,
             iam_server_addr,
             interval: Duration::from_secs(10),
-            known_tenants: Arc::new(tokio::sync::RwLock::new(BTreeSet::new())),
         }
     }
 
@@ -109,10 +107,9 @@ impl FiberLbController {
             .filter(service_is_load_balancer)
             .collect::<Vec<_>>();
         if !desired_services.is_empty() {
-            self.known_tenants
-                .write()
-                .await
-                .insert(tenant_cache_key(tenant));
+            self.storage
+                .remember_controller_tenant("fiberlb", tenant)
+                .await?;
         }
         let desired_names = desired_services
             .iter()
@@ -170,10 +167,9 @@ impl FiberLbController {
         }
 
         if desired_names.is_empty() {
-            self.known_tenants
-                .write()
-                .await
-                .remove(&tenant_cache_key(tenant));
+            self.storage
+                .forget_controller_tenant("fiberlb", tenant)
+                .await?;
         }
         Ok(())
     }
@@ -187,10 +183,8 @@ impl FiberLbController {
             .into_iter()
             .collect::<BTreeSet<_>>();
 
-        for cache_key in self.known_tenants.read().await.iter() {
-            if let Some(tenant) = parse_tenant_cache_key(cache_key) {
-                tenants.insert(tenant);
-            }
+        for tenant in self.storage.list_controller_tenants("fiberlb").await? {
+            tenants.insert(tenant);
         }
         Ok(tenants.into_iter().collect())
     }
@@ -685,18 +679,6 @@ fn service_is_load_balancer(service: &Service) -> bool {
     service.spec.r#type.as_deref() == Some("LoadBalancer")
 }
 
-fn tenant_cache_key(tenant: &TenantRef) -> String {
-    format!("{}/{}", tenant.org_id, tenant.project_id)
-}
-
-fn parse_tenant_cache_key(value: &str) -> Option<TenantRef> {
-    let (org_id, project_id) = value.split_once('/')?;
-    Some(TenantRef {
-        org_id: org_id.to_string(),
-        project_id: project_id.to_string(),
-    })
-}
-
 fn has_load_balancer_state(service: &Service) -> bool {
     service.metadata.annotations.contains_key(LB_ID_ANNOTATION)
         || service
diff --git a/k8shost/crates/k8shost-server/src/flashdns_controller.rs b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
index 64d29bf..9f77f56 100644
--- a/k8shost/crates/k8shost-server/src/flashdns_controller.rs
+++ b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
@@ -116,6 +116,11 @@ impl FlashDnsController {
             .filter(service_requires_dns)
             .collect::<Vec<_>>();
         let has_desired_services = !desired_services.is_empty();
+        if has_desired_services {
+            self.storage
+                .remember_controller_tenant("flashdns", tenant)
+                .await?;
+        }
         let zone_id = match self
             .zone_id_for_tenant(tenant, &auth_token, !desired_services.is_empty())
             .await?
@@ -192,6 +197,9 @@ impl FlashDnsController {
                 .write()
                 .await
                 .remove(&tenant_cache_key(tenant));
+            self.storage
+                .forget_controller_tenant("flashdns", tenant)
+                .await?;
         }
         Ok(())
     }
@@ -205,10 +213,8 @@ impl FlashDnsController {
             .into_iter()
             .collect::<BTreeSet<_>>();
 
-        for cache_key in self.zone_cache.read().await.keys() {
-            if let Some(tenant) = parse_tenant_cache_key(cache_key) {
-                tenants.insert(tenant);
-            }
+        for tenant in self.storage.list_controller_tenants("flashdns").await? {
+            tenants.insert(tenant);
         }
         Ok(tenants.into_iter().collect())
     }
@@ -493,14 +499,6 @@ fn tenant_cache_key(tenant: &TenantRef) -> String {
     format!("{}/{}", tenant.org_id, tenant.project_id)
 }
 
-fn parse_tenant_cache_key(value: &str) -> Option<TenantRef> {
-    let (org_id, project_id) = value.split_once('/')?;
-    Some(TenantRef {
-        org_id: org_id.to_string(),
-        project_id: project_id.to_string(),
-    })
-}
-
 fn service_requires_dns(service: &Service) -> bool {
     service.spec.cluster_ip.is_some()
 }
diff --git a/k8shost/crates/k8shost-server/src/storage.rs b/k8shost/crates/k8shost-server/src/storage.rs
index e0443a0..dc7750e 100644
--- a/k8shost/crates/k8shost-server/src/storage.rs
+++ b/k8shost/crates/k8shost-server/src/storage.rs
@@ -35,6 +35,8 @@ struct ResourceMetadataRef {
     project_id: Option<String>,
 }
 
+const CONTROLLER_TENANTS_PREFIX: &str = "controllers";
+
 impl Storage {
     /// Create a new storage instance with FlareDB backend
     pub async fn new(pd_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
@@ -268,6 +270,69 @@ impl Storage {
         Ok(tenants.into_iter().collect())
     }
 
+    /// Persist a controller-specific tenant registry entry so cleanup survives restarts.
+    pub async fn remember_controller_tenant(
+        &self,
+        controller: &str,
+        tenant: &TenantRef,
+    ) -> Result<(), Status> {
+        let key = controller_tenant_key(controller, &tenant.org_id, &tenant.project_id);
+        let mut client = self.client.lock().await;
+        client
+            .raw_put(key, Vec::new())
+            .await
+            .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
+        Ok(())
+    }
+
+    /// Remove a controller-specific tenant registry entry after authoritative cleanup.
+    pub async fn forget_controller_tenant(
+        &self,
+        controller: &str,
+        tenant: &TenantRef,
+    ) -> Result<(), Status> {
+        let key = controller_tenant_key(controller, &tenant.org_id, &tenant.project_id);
+        let mut client = self.client.lock().await;
+        client
+            .raw_delete(key)
+            .await
+            .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
+        Ok(())
+    }
+
+    /// List controller-specific tenants that still require reconciliation or cleanup.
+    pub async fn list_controller_tenants(
+        &self,
+        controller: &str,
+    ) -> Result<Vec<TenantRef>, Status> {
+        let prefix = controller_tenant_prefix(controller);
+        let end_key = range_end_key(&prefix);
+        let mut tenants = std::collections::BTreeSet::new();
+        let mut start_key = prefix;
+
+        loop {
+            let mut client = self.client.lock().await;
+            let (keys, _values, next) = client
+                .raw_scan(start_key.clone(), end_key.clone(), 1000)
+                .await
+                .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
+
+            for key in keys {
+                if let Some(tenant) = parse_controller_tenant_key(controller, &key) {
+                    tenants.insert(tenant);
+                }
+            }
+
+            if let Some(next_key) = next {
+                start_key = next_key;
+            } else {
+                break;
+            }
+        }
+
+        Ok(tenants.into_iter().collect())
+    }
+
     /// Delete a pod
     pub async fn delete_pod(
         &self,
@@ -752,9 +817,47 @@ fn collect_active_tenants(
     tenants
 }
 
+fn controller_tenant_key(controller: &str, org_id: &str, project_id: &str) -> Vec<u8> {
+    format!("{CONTROLLER_TENANTS_PREFIX}/{controller}/tenants/{org_id}/{project_id}").into_bytes()
+}
+
+fn controller_tenant_prefix(controller: &str) -> Vec<u8> {
+    format!("{CONTROLLER_TENANTS_PREFIX}/{controller}/tenants/").into_bytes()
+}
+
+fn parse_controller_tenant_key(controller: &str, key: &[u8]) -> Option<TenantRef> {
+    let key = std::str::from_utf8(key).ok()?;
+    let prefix = format!("{CONTROLLER_TENANTS_PREFIX}/{controller}/tenants/");
+    let suffix = key.strip_prefix(&prefix)?;
+    let (org_id, project_id) = suffix.split_once('/')?;
+    if org_id.is_empty() || project_id.is_empty() || project_id.contains('/') {
+        return None;
+    }
+    Some(TenantRef {
+        org_id: org_id.to_string(),
+        project_id: project_id.to_string(),
+    })
+}
+
+fn range_end_key(prefix: &[u8]) -> Vec<u8> {
+    let mut end_key = prefix.to_vec();
+    if let Some(last) = end_key.last_mut() {
+        if *last == 0xff {
+            end_key.push(0x00);
+        } else {
+            *last += 1;
+        }
+    } else {
+        end_key.push(0xff);
+    }
+    end_key
+}
+
 #[cfg(test)]
 mod tests {
-    use super::{collect_active_tenants, TenantRef};
+    use super::{
+        collect_active_tenants, controller_tenant_key, parse_controller_tenant_key, TenantRef,
+    };
 
     #[test]
     fn collect_active_tenants_discovers_unique_tenants_from_mixed_resources() {
@@ -787,4 +890,19 @@ mod tests {
             ]
         );
     }
+
+    #[test]
+    fn parse_controller_tenant_key_round_trips_controller_registry_entries() {
+        let key = controller_tenant_key("flashdns", "org-a", "project-a");
+        let tenant = parse_controller_tenant_key("flashdns", &key)
+            .expect("controller tenant key should parse");
+        assert_eq!(
+            tenant,
+            TenantRef {
+                org_id: "org-a".to_string(),
+                project_id: "project-a".to_string(),
+            }
+        );
+        assert!(parse_controller_tenant_key("fiberlb", &key).is_none());
+    }
 }
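
Note: the following is a minimal usage sketch, not part of the patch. It shows the pattern
the diff applies in fiberlb_controller.rs and flashdns_controller.rs, using the Storage,
TenantRef, and Status types that appear above. The function names reconcile_tenant and
tenants_to_reconcile and the has_services flag are hypothetical, introduced only for this
illustration; the real controllers decide when to remember or forget a tenant from their
own service lists.

    // Illustrative sketch of driving the persisted controller-tenant registry.
    async fn reconcile_tenant(
        storage: &Storage,
        tenant: &TenantRef,
        has_services: bool,
    ) -> Result<(), Status> {
        if has_services {
            // Record the tenant durably so cleanup still runs after a controller restart.
            storage.remember_controller_tenant("fiberlb", tenant).await?;
        } else {
            // Cleanup for this tenant is complete; drop its registry entry.
            storage.forget_controller_tenant("fiberlb", tenant).await?;
        }
        Ok(())
    }

    // Tenants to revisit on the next reconcile pass, recovered from FlareDB rather than
    // from the in-memory sets the patch removes, which were lost on restart.
    async fn tenants_to_reconcile(storage: &Storage) -> Result<Vec<TenantRef>, Status> {
        storage.list_controller_tenants("fiberlb").await
    }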