From a1aa3f8245d8b0b5f5de5af98eacde2946da6393 Mon Sep 17 00:00:00 2001
From: centra
Date: Wed, 1 Apr 2026 13:50:21 +0900
Subject: [PATCH] Make k8shost publications authoritative across tenants
---
.../k8shost-server/src/fiberlb_controller.rs | 1204 +++++++++++------
.../k8shost-server/src/flashdns_controller.rs | 720 ++++++----
.../crates/k8shost-server/src/scheduler.rs | 44 +-
k8shost/crates/k8shost-server/src/storage.rs | 107 ++
nix/test-cluster/run-cluster.sh | 63 +-
5 files changed, 1380 insertions(+), 758 deletions(-)
diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
index f639a7c..cb8e78a 100644
--- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
+++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
@@ -1,10 +1,11 @@
//! FiberLB Controller - Manages LoadBalancer service VIP allocation
//!
-//! This controller watches for Services with type=LoadBalancer and provisions
-//! external VIPs by creating LoadBalancer resources in FiberLB.
+//! This controller continuously reconciles tenant-scoped LoadBalancer Services
+//! into FiberLB resources and removes stale publications when Services disappear
+//! or stop being externally published.
use crate::auth::{authorized_request, issue_controller_token};
-use crate::storage::Storage;
+use crate::storage::{Storage, TenantRef};
use anyhow::Result;
use fiberlb_api::backend_service_client::BackendServiceClient;
use fiberlb_api::listener_service_client::ListenerServiceClient;
@@ -12,15 +13,21 @@ use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient;
use fiberlb_api::pool_service_client::PoolServiceClient;
use fiberlb_api::{
CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest,
- DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol,
+ DeleteBackendRequest, DeleteListenerRequest, DeleteLoadBalancerRequest, ListBackendsRequest,
+ ListListenersRequest, ListLoadBalancersRequest, ListPoolsRequest, ListenerProtocol,
+ LoadBalancer, PoolAlgorithm, PoolProtocol,
};
-use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus};
+use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, Pod, Service, ServiceStatus};
+use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
+use tonic::{transport::Channel, Code};
use tracing::{debug, info, warn};
const CONTROLLER_PRINCIPAL_ID: &str = "k8shost-controller";
+const LB_ID_ANNOTATION: &str = "fiberlb.plasmacloud.io/lb-id";
+const POOL_ID_ANNOTATION: &str = "fiberlb.plasmacloud.io/pool-id";
/// FiberLB controller for managing LoadBalancer service VIPs
pub struct FiberLbController {
@@ -28,20 +35,20 @@ pub struct FiberLbController {
fiberlb_addr: String,
iam_server_addr: String,
interval: Duration,
+ known_tenants: Arc<tokio::sync::RwLock<BTreeSet<String>>>,
}
impl FiberLbController {
- /// Create a new FiberLB controller
pub fn new(storage: Arc<Storage>, fiberlb_addr: String, iam_server_addr: String) -> Self {
Self {
storage,
fiberlb_addr,
iam_server_addr,
- interval: Duration::from_secs(10), // Check every 10 seconds
+ interval: Duration::from_secs(10),
+ known_tenants: Arc::new(tokio::sync::RwLock::new(BTreeSet::new())),
}
}
- /// Start the controller loop
pub async fn run(self: Arc<Self>) {
info!(
"FiberLB controller started (FiberLB at {}, {}s interval)",
@@ -50,420 +57,28 @@ impl FiberLbController {
);
loop {
- if let Err(e) = self.reconcile_loadbalancers().await {
- warn!("FiberLB controller cycle failed: {}", e);
+ if let Err(error) = self.reconcile_loadbalancers().await {
+ warn!(error = %error, "FiberLB controller cycle failed");
}
sleep(self.interval).await;
}
}
- /// Reconcile LoadBalancer services across all tenants
async fn reconcile_loadbalancers(&self) -> Result<()> {
- // For MVP, iterate through known tenants
- // In production, would get active tenants from IAM or FlareDB
- let tenants = vec![("default-org".to_string(), "default-project".to_string())];
-
- for (org_id, project_id) in tenants {
- if let Err(e) = self
- .reconcile_tenant_loadbalancers(&org_id, &project_id)
- .await
- {
- warn!(
- "Failed to reconcile LoadBalancers for tenant {}/{}: {}",
- org_id, project_id, e
- );
- }
- }
-
- Ok(())
- }
-
- /// Reconcile LoadBalancer services for a specific tenant
- async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> {
- // Get all services for this tenant
- let services = self.storage.list_services(org_id, project_id, None).await?;
-
- // Filter for LoadBalancer services that need provisioning
- let lb_services: Vec<_> = services
- .into_iter()
- .filter(|svc| {
- // Service is a LoadBalancer if:
- // 1. type is "LoadBalancer"
- // 2. status is None OR status.load_balancer is None (not yet provisioned)
- svc.spec.r#type.as_deref() == Some("LoadBalancer")
- && (svc.status.is_none()
- || svc
- .status
- .as_ref()
- .and_then(|s| s.load_balancer.as_ref())
- .is_none())
- })
- .collect();
-
- if lb_services.is_empty() {
- debug!(
- "No LoadBalancer services to provision for tenant {}/{}",
- org_id, project_id
- );
+ let tenants = self.reconciliation_tenants().await?;
+ if tenants.is_empty() {
+ debug!("No active tenants need FiberLB reconciliation");
return Ok(());
}
- info!(
- "Found {} LoadBalancer service(s) to provision for tenant {}/{}",
- lb_services.len(),
- org_id,
- project_id
- );
-
- let auth_token = issue_controller_token(
- &self.iam_server_addr,
- CONTROLLER_PRINCIPAL_ID,
- org_id,
- project_id,
- )
- .await?;
-
- // Connect to FiberLB services
- let mut lb_client =
- match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!(
- "Failed to connect to FiberLB at {}: {}",
- self.fiberlb_addr, e
- );
- return Ok(());
- }
- };
-
- let mut pool_client = match PoolServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!("Failed to connect to FiberLB PoolService: {}", e);
- return Ok(());
- }
- };
-
- let mut listener_client =
- match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!("Failed to connect to FiberLB ListenerService: {}", e);
- return Ok(());
- }
- };
-
- let mut backend_client =
- match BackendServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!("Failed to connect to FiberLB BackendService: {}", e);
- return Ok(());
- }
- };
-
- // Provision each LoadBalancer service
- for mut service in lb_services {
- let namespace = service
- .metadata
- .namespace
- .clone()
- .unwrap_or_else(|| "default".to_string());
- let name = service.metadata.name.clone();
-
- info!(
- "Provisioning LoadBalancer for service {}/{}",
- namespace, name
- );
-
- // Create LoadBalancer in FiberLB
- let lb_name = format!("{}.{}", name, namespace);
- let mut allocated_vip: Option<String> = None;
- let create_req = CreateLoadBalancerRequest {
- name: lb_name.clone(),
- org_id: org_id.to_string(),
- project_id: project_id.to_string(),
- description: format!("k8s service {}/{}", namespace, name),
- vip_address: String::new(),
- };
-
- let lb_id = match lb_client
- .create_load_balancer(authorized_request(create_req, &auth_token))
- .await
- {
- Ok(response) => {
- let lb = response.into_inner().loadbalancer;
- if let Some(lb) = lb {
- let vip = if lb.vip_address.is_empty() {
- warn!("FiberLB returned LoadBalancer without VIP");
- "0.0.0.0".to_string()
- } else {
- lb.vip_address.clone()
- };
-
- info!(
- "FiberLB allocated VIP {} for service {}/{}",
- vip, namespace, name
- );
- allocated_vip = Some(vip);
- lb.id
- } else {
- warn!("FiberLB returned empty LoadBalancer response");
- continue;
- }
- }
- Err(e) => {
- warn!(
- "Failed to create LoadBalancer in FiberLB for service {}/{}: {}",
- namespace, name, e
- );
- continue;
- }
- };
-
- // Create Pool for this LoadBalancer
- let pool_name = format!("{}-pool", lb_name);
- let pool_id = match pool_client
- .create_pool(authorized_request(
- CreatePoolRequest {
- name: pool_name.clone(),
- loadbalancer_id: lb_id.clone(),
- algorithm: PoolAlgorithm::RoundRobin as i32,
- protocol: PoolProtocol::Tcp as i32,
- session_persistence: None,
- },
- &auth_token,
- ))
- .await
- {
- Ok(response) => {
- let pool = response.into_inner().pool;
- if let Some(pool) = pool {
- info!(
- "Created Pool {} for service {}/{}",
- pool.id, namespace, name
- );
- pool.id
- } else {
- warn!("Failed to create Pool for service {}/{}", namespace, name);
- continue;
- }
- }
- Err(e) => {
- warn!(
- "Failed to create Pool for service {}/{}: {}",
- namespace, name, e
- );
- continue;
- }
- };
-
- // Create Listeners for each Service port
- let mut listeners_ready = true;
- for svc_port in &service.spec.ports {
- let listener_name = format!(
- "{}-listener-{}",
- lb_name,
- svc_port
- .name
- .as_deref()
- .unwrap_or(&svc_port.port.to_string())
- );
-
- match listener_client
- .create_listener(authorized_request(
- CreateListenerRequest {
- name: listener_name.clone(),
- loadbalancer_id: lb_id.clone(),
- protocol: ListenerProtocol::Tcp as i32,
- port: svc_port.port as u32,
- default_pool_id: pool_id.clone(),
- tls_config: None,
- connection_limit: 0, // No limit
- },
- &auth_token,
- ))
- .await
- {
- Ok(response) => {
- let listener = response.into_inner().listener;
- if let Some(listener) = listener {
- info!(
- "Created Listener {} on port {} for service {}/{}",
- listener.id, svc_port.port, namespace, name
- );
- }
- }
- Err(e) => {
- listeners_ready = false;
- warn!(
- "Failed to create Listener on port {} for service {}/{}: {}",
- svc_port.port, namespace, name, e
- );
- }
- }
- }
-
- // Query Pods matching Service selector and create Backends
- let pods = match self
- .storage
- .list_pods(
- org_id,
- project_id,
- Some(&namespace),
- if service.spec.selector.is_empty() {
- None
- } else {
- Some(&service.spec.selector)
- },
- )
- .await
- {
- Ok(pods) => pods,
- Err(e) => {
- warn!(
- "Failed to list Pods for service {}/{}: {}",
- namespace, name, e
- );
- vec![]
- }
- };
-
- info!(
- "Found {} Pod(s) matching selector for service {}/{}",
- pods.len(),
- namespace,
- name
- );
-
- // Create Backend for each Pod
- let mut backend_count = 0usize;
- for pod in &pods {
- // Get Pod IP
- let pod_ip = match pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) {
- Some(ip) => ip,
- None => {
- debug!(
- "Pod {} has no IP yet, skipping backend creation",
- pod.metadata.name
- );
- continue;
- }
- };
-
- // For each Service port, create a Backend
- for svc_port in &service.spec.ports {
- // Use target_port if specified, otherwise use port
- let backend_port = svc_port.target_port.unwrap_or(svc_port.port);
-
- let backend_name =
- format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port);
-
- match backend_client
- .create_backend(authorized_request(
- CreateBackendRequest {
- name: backend_name.clone(),
- pool_id: pool_id.clone(),
- address: pod_ip.clone(),
- port: backend_port as u32,
- weight: 1,
- },
- &auth_token,
- ))
- .await
- {
- Ok(response) => {
- let backend = response.into_inner().backend;
- if let Some(backend) = backend {
- backend_count += 1;
- info!(
- "Created Backend {} for Pod {} ({}:{}) in service {}/{}",
- backend.id,
- pod.metadata.name,
- pod_ip,
- backend_port,
- namespace,
- name
- );
- }
- }
- Err(e) => {
- warn!(
- "Failed to create Backend for Pod {} in service {}/{}: {}",
- pod.metadata.name, namespace, name, e
- );
- }
- }
- }
- }
-
- if !listeners_ready {
+ for tenant in tenants {
+ if let Err(error) = self.reconcile_tenant_loadbalancers(&tenant).await {
warn!(
- "Skipping Service update for {}/{} because one or more FiberLB listeners failed",
- namespace, name
- );
- continue;
- }
-
- if backend_count == 0 {
- warn!(
- "Skipping Service update for {}/{} because no FiberLB backends were created",
- namespace, name
- );
- continue;
- }
-
- service.status = Some(ServiceStatus {
- load_balancer: Some(LoadBalancerStatus {
- ingress: vec![LoadBalancerIngress {
- ip: allocated_vip,
- hostname: None,
- }],
- }),
- });
- service
- .metadata
- .annotations
- .insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb_id.clone());
- service.metadata.annotations.insert(
- "fiberlb.plasmacloud.io/pool-id".to_string(),
- pool_id.clone(),
- );
-
- // Merge with the latest stored version so the DNS controller does not lose its annotations.
- if let Ok(Some(mut current)) = self
- .storage
- .get_service(org_id, project_id, &namespace, &name)
- .await
- {
- current.status = service.status.clone().or(current.status);
- current
- .metadata
- .annotations
- .extend(service.metadata.annotations.clone());
- service = current;
- }
-
- let current_version = service
- .metadata
- .resource_version
- .as_ref()
- .and_then(|v| v.parse::<u64>().ok())
- .unwrap_or(0);
- service.metadata.resource_version = Some((current_version + 1).to_string());
-
- if let Err(e) = self.storage.put_service(&service).await {
- warn!(
- "Failed to update service {}/{} with FiberLB resources: {}",
- namespace, name, e
- );
- } else {
- info!(
- "Successfully provisioned LoadBalancer for service {}/{} with {} backend(s)",
- namespace,
- name,
- pods.len()
+ org_id = %tenant.org_id,
+ project_id = %tenant.project_id,
+ error = %error,
+ "failed to reconcile tenant load balancers"
);
}
}
@@ -471,36 +86,761 @@ impl FiberLbController {
Ok(())
}
- /// Cleanup LoadBalancer when Service is deleted
- ///
- /// This should be called when a Service with type=LoadBalancer is deleted.
- /// For MVP, this is not automatically triggered - would need a deletion watch.
- #[allow(dead_code)]
- async fn cleanup_loadbalancer(
- &self,
- org_id: &str,
- project_id: &str,
- lb_id: &str,
- ) -> Result<()> {
- let mut fiberlb_client =
- LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await?;
+ async fn reconcile_tenant_loadbalancers(&self, tenant: &TenantRef) -> Result<()> {
+ let services = self
+ .storage
+ .list_services(&tenant.org_id, &tenant.project_id, None)
+ .await?;
+
+ for service in services.iter().filter(|service| {
+ !service_is_load_balancer(service) && has_load_balancer_state(service)
+ }) {
+ self.clear_service_load_balancer_state(
+ &tenant.org_id,
+ &tenant.project_id,
+ service_namespace(service),
+ &service.metadata.name,
+ )
+ .await?;
+ }
+
+ let desired_services = services
+ .into_iter()
+ .filter(service_is_load_balancer)
+ .collect::<Vec<_>>();
+ if !desired_services.is_empty() {
+ self.known_tenants
+ .write()
+ .await
+ .insert(tenant_cache_key(tenant));
+ }
+ let desired_names = desired_services
+ .iter()
+ .map(service_lb_name)
+ .collect::<HashSet<_>>();
+
let auth_token = issue_controller_token(
&self.iam_server_addr,
CONTROLLER_PRINCIPAL_ID,
- org_id,
- project_id,
+ &tenant.org_id,
+ &tenant.project_id,
)
.await?;
- let delete_req = DeleteLoadBalancerRequest {
- id: lb_id.to_string(),
- };
+ let mut lb_client =
+ match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await {
+ Ok(client) => client,
+ Err(error) => {
+ warn!(
+ address = %self.fiberlb_addr,
+ error = %error,
+ "failed to connect to FiberLB load balancer service"
+ );
+ return Ok(());
+ }
+ };
+ let mut pool_client = PoolServiceClient::connect(self.fiberlb_addr.clone()).await?;
+ let mut listener_client = ListenerServiceClient::connect(self.fiberlb_addr.clone()).await?;
+ let mut backend_client = BackendServiceClient::connect(self.fiberlb_addr.clone()).await?;
- fiberlb_client
- .delete_load_balancer(authorized_request(delete_req, &auth_token))
+ let current_load_balancers = self
+ .list_load_balancers(&mut lb_client, tenant, &auth_token)
.await?;
+ for load_balancer in current_load_balancers
+ .iter()
+ .filter(|load_balancer| is_managed_load_balancer(load_balancer))
+ {
+ if !desired_names.contains(&load_balancer.name) {
+ delete_load_balancer(&mut lb_client, &auth_token, &load_balancer.id).await?;
+ }
+ }
+
+ for service in desired_services {
+ self.reconcile_service_load_balancer(
+ tenant,
+ &mut lb_client,
+ &mut pool_client,
+ &mut listener_client,
+ &mut backend_client,
+ &auth_token,
+ ¤t_load_balancers,
+ &service,
+ )
+ .await?;
+ }
+
+ if desired_names.is_empty() {
+ self.known_tenants
+ .write()
+ .await
+ .remove(&tenant_cache_key(tenant));
+ }
- info!("Deleted LoadBalancer {} from FiberLB", lb_id);
Ok(())
}
+
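+ /// Tenants to reconcile: tenants reported active by storage plus tenants
+ /// remembered from earlier cycles, so stale publications still get cleaned up.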
+ async fn reconciliation_tenants(&self) -> Result<Vec<TenantRef>> {
+ let mut tenants = self
+ .storage
+ .list_active_tenants()
+ .await?
+ .into_iter()
+ .collect::<BTreeSet<_>>();
+
+ for cache_key in self.known_tenants.read().await.iter() {
+ if let Some(tenant) = parse_tenant_cache_key(cache_key) {
+ tenants.insert(tenant);
+ }
+ }
+
+ Ok(tenants.into_iter().collect())
+ }
+
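+ /// Converge one LoadBalancer Service: ensure the FiberLB load balancer, pool,
+ /// listeners, and backends, then record the resulting IDs and VIP on the Service.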
+ async fn reconcile_service_load_balancer(
+ &self,
+ tenant: &TenantRef,
+ lb_client: &mut LoadBalancerServiceClient<Channel>,
+ pool_client: &mut PoolServiceClient<Channel>,
+ listener_client: &mut ListenerServiceClient<Channel>,
+ backend_client: &mut BackendServiceClient<Channel>,
+ auth_token: &str,
+ current_load_balancers: &[LoadBalancer],
+ service: &Service,
+ ) -> Result<()> {
+ let namespace = service_namespace(service).to_string();
+ let lb_name = service_lb_name(service);
+ let description = service_lb_description(service);
+ let annotated_lb_id = service.metadata.annotations.get(LB_ID_ANNOTATION);
+ let existing_load_balancer = current_load_balancers.iter().find(|load_balancer| {
+ annotated_lb_id.map(String::as_str) == Some(load_balancer.id.as_str())
+ || load_balancer.name == lb_name
+ });
+
+ let load_balancer = ensure_load_balancer(
+ lb_client,
+ auth_token,
+ tenant,
+ &lb_name,
+ &description,
+ existing_load_balancer,
+ )
+ .await?;
+ let pool = ensure_pool(
+ pool_client,
+ auth_token,
+ &load_balancer.id,
+ &format!("{lb_name}-pool"),
+ )
+ .await?;
+
+ reconcile_listeners(
+ listener_client,
+ auth_token,
+ &load_balancer.id,
+ &pool.id,
+ service,
+ &lb_name,
+ )
+ .await?;
+
+ let pods = self
+ .storage
+ .list_pods(
+ &tenant.org_id,
+ &tenant.project_id,
+ Some(&namespace),
+ if service.spec.selector.is_empty() {
+ None
+ } else {
+ Some(&service.spec.selector)
+ },
+ )
+ .await?;
+
+ reconcile_backends(
+ backend_client,
+ auth_token,
+ &pool.id,
+ service,
+ &lb_name,
+ &pods,
+ )
+ .await?;
+
+ self.reconcile_service_load_balancer_state(
+ &tenant.org_id,
+ &tenant.project_id,
+ &namespace,
+ &service.metadata.name,
+ &load_balancer.id,
+ &pool.id,
+ non_empty_value(&load_balancer.vip_address),
+ )
+ .await?;
+
+ Ok(())
+ }
+
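+ /// Persist the FiberLB IDs and VIP onto the stored Service, writing only when something changed.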
+ async fn reconcile_service_load_balancer_state(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ namespace: &str,
+ name: &str,
+ lb_id: &str,
+ pool_id: &str,
+ vip: Option<&str>,
+ ) -> Result<()> {
+ let Some(mut service) = self
+ .storage
+ .get_service(org_id, project_id, namespace, name)
+ .await?
+ else {
+ return Ok(());
+ };
+
+ let mut changed = false;
+ changed |= set_annotation(&mut service, LB_ID_ANNOTATION, lb_id);
+ changed |= set_annotation(&mut service, POOL_ID_ANNOTATION, pool_id);
+ changed |= set_load_balancer_status(&mut service, vip);
+
+ if !changed {
+ return Ok(());
+ }
+
+ bump_resource_version(&mut service);
+ self.storage.put_service(&service).await?;
+ Ok(())
+ }
+
+ async fn clear_service_load_balancer_state(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ namespace: &str,
+ name: &str,
+ ) -> Result<()> {
+ let Some(mut service) = self
+ .storage
+ .get_service(org_id, project_id, namespace, name)
+ .await?
+ else {
+ return Ok(());
+ };
+
+ let mut changed = false;
+ changed |= service
+ .metadata
+ .annotations
+ .remove(LB_ID_ANNOTATION)
+ .is_some();
+ changed |= service
+ .metadata
+ .annotations
+ .remove(POOL_ID_ANNOTATION)
+ .is_some();
+
+ let had_load_balancer = service
+ .status
+ .as_ref()
+ .and_then(|status| status.load_balancer.as_ref())
+ .is_some();
+ if had_load_balancer {
+ if let Some(status) = service.status.as_mut() {
+ status.load_balancer = None;
+ }
+ if matches!(service.status.as_ref(), Some(status) if status.load_balancer.is_none()) {
+ service.status = None;
+ }
+ changed = true;
+ }
+
+ if !changed {
+ return Ok(());
+ }
+
+ bump_resource_version(&mut service);
+ self.storage.put_service(&service).await?;
+ Ok(())
+ }
+
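+ /// Page through every FiberLB load balancer owned by this tenant.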
+ async fn list_load_balancers(
+ &self,
+ client: &mut LoadBalancerServiceClient<Channel>,
+ tenant: &TenantRef,
+ auth_token: &str,
+ ) -> Result<Vec<LoadBalancer>> {
+ let mut load_balancers = Vec::new();
+ let mut page_token = String::new();
+
+ loop {
+ let response = client
+ .list_load_balancers(authorized_request(
+ ListLoadBalancersRequest {
+ org_id: tenant.org_id.clone(),
+ project_id: tenant.project_id.clone(),
+ page_size: 256,
+ page_token: page_token.clone(),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner();
+
+ load_balancers.extend(response.loadbalancers);
+ if response.next_page_token.is_empty() {
+ break;
+ }
+ page_token = response.next_page_token;
+ }
+
+ Ok(load_balancers)
+ }
+}
+
+#[derive(Clone)]
+struct DesiredBackend {
+ name: String,
+ address: String,
+ port: u32,
+}
+
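+/// Reuse the matching FiberLB load balancer when one exists, otherwise create it.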
+async fn ensure_load_balancer(
+ client: &mut LoadBalancerServiceClient<Channel>,
+ auth_token: &str,
+ tenant: &TenantRef,
+ name: &str,
+ description: &str,
+ existing: Option<&LoadBalancer>,
+) -> Result<LoadBalancer> {
+ if let Some(load_balancer) = existing {
+ return Ok(load_balancer.clone());
+ }
+
+ Ok(client
+ .create_load_balancer(authorized_request(
+ CreateLoadBalancerRequest {
+ name: name.to_string(),
+ org_id: tenant.org_id.clone(),
+ project_id: tenant.project_id.clone(),
+ description: description.to_string(),
+ vip_address: String::new(),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner()
+ .loadbalancer
+ .ok_or_else(|| anyhow::anyhow!("FiberLB returned empty CreateLoadBalancer response"))?)
+}
+
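+/// Look up the pool by name on the load balancer, creating it when missing.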
+async fn ensure_pool(
+ client: &mut PoolServiceClient<Channel>,
+ auth_token: &str,
+ load_balancer_id: &str,
+ name: &str,
+) -> Result<fiberlb_api::Pool> {
+ let mut page_token = String::new();
+
+ loop {
+ let response = client
+ .list_pools(authorized_request(
+ ListPoolsRequest {
+ loadbalancer_id: load_balancer_id.to_string(),
+ page_size: 256,
+ page_token: page_token.clone(),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner();
+
+ if let Some(pool) = response.pools.into_iter().find(|pool| pool.name == name) {
+ return Ok(pool);
+ }
+
+ if response.next_page_token.is_empty() {
+ break;
+ }
+ page_token = response.next_page_token;
+ }
+
+ Ok(client
+ .create_pool(authorized_request(
+ CreatePoolRequest {
+ name: name.to_string(),
+ loadbalancer_id: load_balancer_id.to_string(),
+ algorithm: PoolAlgorithm::RoundRobin as i32,
+ protocol: PoolProtocol::Tcp as i32,
+ session_persistence: None,
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner()
+ .pool
+ .ok_or_else(|| anyhow::anyhow!("FiberLB returned empty CreatePool response"))?)
+}
+
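+/// Align FiberLB listeners with the Service ports: keep exact matches, delete the rest,
+/// and create any listener that is still missing.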
+async fn reconcile_listeners(
+ client: &mut ListenerServiceClient<Channel>,
+ auth_token: &str,
+ load_balancer_id: &str,
+ pool_id: &str,
+ service: &Service,
+ lb_name: &str,
+) -> Result<()> {
+ let desired = service
+ .spec
+ .ports
+ .iter()
+ .map(|port| {
+ (
+ format!(
+ "{}-listener-{}",
+ lb_name,
+ port.name.as_deref().unwrap_or(&port.port.to_string())
+ ),
+ port.port as u32,
+ )
+ })
+ .collect::<HashMap<_, _>>();
+ let mut satisfied = HashSet::new();
+
+ for listener in list_listeners(client, auth_token, load_balancer_id).await? {
+ match desired.get(&listener.name) {
+ Some(port)
+ if listener.port == *port
+ && listener.protocol == ListenerProtocol::Tcp as i32
+ && listener.default_pool_id == pool_id
+ && !satisfied.contains(&listener.name) =>
+ {
+ satisfied.insert(listener.name.clone());
+ }
+ _ => delete_listener(client, auth_token, &listener.id).await?,
+ }
+ }
+
+ for (name, port) in desired {
+ if satisfied.contains(&name) {
+ continue;
+ }
+
+ client
+ .create_listener(authorized_request(
+ CreateListenerRequest {
+ name,
+ loadbalancer_id: load_balancer_id.to_string(),
+ protocol: ListenerProtocol::Tcp as i32,
+ port,
+ default_pool_id: pool_id.to_string(),
+ tls_config: None,
+ connection_limit: 0,
+ },
+ auth_token,
+ ))
+ .await?;
+ }
+
+ Ok(())
+}
+
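+/// Align FiberLB backends with the Pods selected by the Service, deleting stale
+/// entries and creating missing ones.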
+async fn reconcile_backends(
+ client: &mut BackendServiceClient<Channel>,
+ auth_token: &str,
+ pool_id: &str,
+ service: &Service,
+ lb_name: &str,
+ pods: &[Pod],
+) -> Result<()> {
+ let desired = desired_backends(service, lb_name, pods)
+ .into_iter()
+ .map(|backend| (backend.name.clone(), backend))
+ .collect::<HashMap<_, _>>();
+ let mut satisfied = HashSet::new();
+
+ for backend in list_backends(client, auth_token, pool_id).await? {
+ match desired.get(&backend.name) {
+ Some(target)
+ if backend.address == target.address
+ && backend.port == target.port
+ && !satisfied.contains(&backend.name) =>
+ {
+ satisfied.insert(backend.name.clone());
+ }
+ _ => delete_backend(client, auth_token, &backend.id).await?,
+ }
+ }
+
+ for backend in desired.into_values() {
+ if satisfied.contains(&backend.name) {
+ continue;
+ }
+
+ client
+ .create_backend(authorized_request(
+ CreateBackendRequest {
+ name: backend.name,
+ pool_id: pool_id.to_string(),
+ address: backend.address,
+ port: backend.port,
+ weight: 1,
+ },
+ auth_token,
+ ))
+ .await?;
+ }
+
+ Ok(())
+}
+
+async fn list_listeners(
+ client: &mut ListenerServiceClient<Channel>,
+ auth_token: &str,
+ load_balancer_id: &str,
+) -> Result<Vec<fiberlb_api::Listener>> {
+ let mut listeners = Vec::new();
+ let mut page_token = String::new();
+
+ loop {
+ let response = client
+ .list_listeners(authorized_request(
+ ListListenersRequest {
+ loadbalancer_id: load_balancer_id.to_string(),
+ page_size: 256,
+ page_token: page_token.clone(),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner();
+
+ listeners.extend(response.listeners);
+ if response.next_page_token.is_empty() {
+ break;
+ }
+ page_token = response.next_page_token;
+ }
+
+ Ok(listeners)
+}
+
+async fn list_backends(
+ client: &mut BackendServiceClient<Channel>,
+ auth_token: &str,
+ pool_id: &str,
+) -> Result<Vec<fiberlb_api::Backend>> {
+ let mut backends = Vec::new();
+ let mut page_token = String::new();
+
+ loop {
+ let response = client
+ .list_backends(authorized_request(
+ ListBackendsRequest {
+ pool_id: pool_id.to_string(),
+ page_size: 256,
+ page_token: page_token.clone(),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner();
+
+ backends.extend(response.backends);
+ if response.next_page_token.is_empty() {
+ break;
+ }
+ page_token = response.next_page_token;
+ }
+
+ Ok(backends)
+}
+
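+/// Expand the Service ports across every Pod that already has an IP into backend specs.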
+fn desired_backends(service: &Service, lb_name: &str, pods: &[Pod]) -> Vec<DesiredBackend> {
+ let mut desired = Vec::new();
+
+ for pod in pods {
+ let Some(pod_ip) = pod
+ .status
+ .as_ref()
+ .and_then(|status| status.pod_ip.as_ref())
+ else {
+ continue;
+ };
+
+ for service_port in &service.spec.ports {
+ let backend_port = service_port.target_port.unwrap_or(service_port.port);
+ desired.push(DesiredBackend {
+ name: format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port),
+ address: pod_ip.clone(),
+ port: backend_port as u32,
+ });
+ }
+ }
+
+ desired
+}
+
+fn service_is_load_balancer(service: &Service) -> bool {
+ service.spec.r#type.as_deref() == Some("LoadBalancer")
+}
+
+fn tenant_cache_key(tenant: &TenantRef) -> String {
+ format!("{}/{}", tenant.org_id, tenant.project_id)
+}
+
+fn parse_tenant_cache_key(value: &str) -> Option<TenantRef> {
+ let (org_id, project_id) = value.split_once('/')?;
+ Some(TenantRef {
+ org_id: org_id.to_string(),
+ project_id: project_id.to_string(),
+ })
+}
+
+fn has_load_balancer_state(service: &Service) -> bool {
+ service.metadata.annotations.contains_key(LB_ID_ANNOTATION)
+ || service
+ .metadata
+ .annotations
+ .contains_key(POOL_ID_ANNOTATION)
+ || service
+ .status
+ .as_ref()
+ .and_then(|status| status.load_balancer.as_ref())
+ .is_some()
+}
+
+fn service_namespace(service: &Service) -> &str {
+ service.metadata.namespace.as_deref().unwrap_or("default")
+}
+
+fn service_lb_name(service: &Service) -> String {
+ format!("{}.{}", service.metadata.name, service_namespace(service))
+}
+
+fn service_lb_description(service: &Service) -> String {
+ format!(
+ "k8s service {}/{}",
+ service_namespace(service),
+ service.metadata.name
+ )
+}
+
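+/// Only load balancers whose description carries the "k8s service " prefix are treated as
+/// managed, so unrelated tenant load balancers are never deleted.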
+fn is_managed_load_balancer(load_balancer: &LoadBalancer) -> bool {
+ load_balancer.description.starts_with("k8s service ")
+}
+
+fn non_empty_value(value: &str) -> Option<&str> {
+ if value.is_empty() {
+ None
+ } else {
+ Some(value)
+ }
+}
+
+fn set_annotation(service: &mut Service, key: &str, value: &str) -> bool {
+ match service.metadata.annotations.get(key) {
+ Some(current) if current == value => false,
+ _ => {
+ service
+ .metadata
+ .annotations
+ .insert(key.to_string(), value.to_string());
+ true
+ }
+ }
+}
+
+fn set_load_balancer_status(service: &mut Service, vip: Option<&str>) -> bool {
+ let desired = vip.map(|vip| ServiceStatus {
+ load_balancer: Some(LoadBalancerStatus {
+ ingress: vec![LoadBalancerIngress {
+ ip: Some(vip.to_string()),
+ hostname: None,
+ }],
+ }),
+ });
+
+ let current_vip = service
+ .status
+ .as_ref()
+ .and_then(|status| status.load_balancer.as_ref())
+ .and_then(|load_balancer| load_balancer.ingress.first())
+ .and_then(|ingress| ingress.ip.as_deref());
+ if current_vip == vip {
+ return false;
+ }
+
+ service.status = desired;
+ true
+}
+
+fn bump_resource_version(service: &mut Service) {
+ let current = service
+ .metadata
+ .resource_version
+ .as_deref()
+ .and_then(|value| value.parse::<u64>().ok())
+ .unwrap_or(0);
+ service.metadata.resource_version = Some((current + 1).to_string());
+}
+
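+/// The delete helpers below treat NotFound as success so repeated reconciliation stays idempotent.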
+async fn delete_load_balancer(
+ client: &mut LoadBalancerServiceClient<Channel>,
+ auth_token: &str,
+ lb_id: &str,
+) -> Result<()> {
+ match client
+ .delete_load_balancer(authorized_request(
+ DeleteLoadBalancerRequest {
+ id: lb_id.to_string(),
+ },
+ auth_token,
+ ))
+ .await
+ {
+ Ok(_) => Ok(()),
+ Err(status) if status.code() == Code::NotFound => Ok(()),
+ Err(status) => Err(status.into()),
+ }
+}
+
+async fn delete_listener(
+ client: &mut ListenerServiceClient<Channel>,
+ auth_token: &str,
+ listener_id: &str,
+) -> Result<()> {
+ match client
+ .delete_listener(authorized_request(
+ DeleteListenerRequest {
+ id: listener_id.to_string(),
+ },
+ auth_token,
+ ))
+ .await
+ {
+ Ok(_) => Ok(()),
+ Err(status) if status.code() == Code::NotFound => Ok(()),
+ Err(status) => Err(status.into()),
+ }
+}
+
+async fn delete_backend(
+ client: &mut BackendServiceClient<Channel>,
+ auth_token: &str,
+ backend_id: &str,
+) -> Result<()> {
+ match client
+ .delete_backend(authorized_request(
+ DeleteBackendRequest {
+ id: backend_id.to_string(),
+ },
+ auth_token,
+ ))
+ .await
+ {
+ Ok(_) => Ok(()),
+ Err(status) if status.code() == Code::NotFound => Ok(()),
+ Err(status) => Err(status.into()),
+ }
}
diff --git a/k8shost/crates/k8shost-server/src/flashdns_controller.rs b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
index 05e173e..64d29bf 100644
--- a/k8shost/crates/k8shost-server/src/flashdns_controller.rs
+++ b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
@@ -1,27 +1,32 @@
//! FlashDNS Controller - Manages cluster.local DNS records for Services
//!
-//! This controller watches for Services and automatically creates DNS records
-//! in the format: {service}.{namespace}.svc.cluster.local → ClusterIP
+//! This controller continuously reconciles tenant-scoped Service resources into
+//! authoritative `*.svc.cluster.local` A records and removes stale records when
+//! Services disappear or lose their ClusterIP.
use crate::auth::{authorized_request, issue_controller_token};
-use crate::storage::Storage;
+use crate::storage::{Storage, TenantRef};
use anyhow::Result;
use flashdns_api::proto::record_service_client::RecordServiceClient;
use flashdns_api::proto::zone_service_client::ZoneServiceClient;
use flashdns_api::proto::{
get_zone_request, record_data, ARecord, CreateRecordRequest, CreateZoneRequest,
- DeleteRecordRequest, GetZoneRequest, ListZonesRequest, RecordData,
+ DeleteRecordRequest, GetZoneRequest, ListRecordsRequest, ListZonesRequest, RecordData,
+ RecordInfo,
};
-use std::collections::HashMap;
+use k8shost_types::Service;
+use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
-use tonic::Code;
+use tonic::{transport::Channel, Code};
use tracing::{debug, info, warn};
const CLUSTER_DOMAIN: &str = "cluster.local";
-const DNS_RECORD_TTL: u32 = 60; // 60 seconds for dynamic cluster services
+const DNS_RECORD_TTL: u32 = 60;
const CONTROLLER_PRINCIPAL_ID: &str = "k8shost-controller";
+const RECORD_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/record-id";
+const ZONE_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/zone-id";
/// FlashDNS controller for managing cluster.local DNS records
pub struct FlashDnsController {
@@ -29,23 +34,20 @@ pub struct FlashDnsController {
flashdns_addr: String,
iam_server_addr: String,
interval: Duration,
- /// Cache of zone_id per tenant (org_id/project_id -> zone_id)
zone_cache: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
}
impl FlashDnsController {
- /// Create a new FlashDNS controller
pub fn new(storage: Arc<Storage>, flashdns_addr: String, iam_server_addr: String) -> Self {
Self {
storage,
flashdns_addr,
iam_server_addr,
- interval: Duration::from_secs(10), // Check every 10 seconds
+ interval: Duration::from_secs(10),
zone_cache: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
}
}
- /// Start the controller loop
pub async fn run(self: Arc<Self>) {
info!(
"FlashDNS controller started (FlashDNS at {}, {}s interval)",
@@ -54,268 +56,328 @@ impl FlashDnsController {
);
loop {
- if let Err(e) = self.reconcile_dns_records().await {
- warn!("FlashDNS controller cycle failed: {}", e);
+ if let Err(error) = self.reconcile_dns_records().await {
+ warn!(error = %error, "FlashDNS controller cycle failed");
}
sleep(self.interval).await;
}
}
- /// Reconcile DNS records across all tenants
async fn reconcile_dns_records(&self) -> Result<()> {
- // For MVP, iterate through known tenants
- let tenants = vec![("default-org".to_string(), "default-project".to_string())];
-
- for (org_id, project_id) in tenants {
- if let Err(e) = self.reconcile_tenant_dns(&org_id, &project_id).await {
- warn!(
- "Failed to reconcile DNS for tenant {}/{}: {}",
- org_id, project_id, e
- );
- }
- }
-
- Ok(())
- }
-
- /// Reconcile DNS records for a specific tenant
- async fn reconcile_tenant_dns(&self, org_id: &str, project_id: &str) -> Result<()> {
- let auth_token = issue_controller_token(
- &self.iam_server_addr,
- CONTROLLER_PRINCIPAL_ID,
- org_id,
- project_id,
- )
- .await?;
-
- // Ensure cluster.local zone exists for this tenant
- let zone_id = match self
- .ensure_zone_exists(org_id, project_id, &auth_token)
- .await
- {
- Ok(id) => id,
- Err(e) => {
- warn!(
- "Failed to ensure zone exists for tenant {}/{}: {}",
- org_id, project_id, e
- );
- return Ok(());
- }
- };
-
- // Get all services for this tenant
- let services = self.storage.list_services(org_id, project_id, None).await?;
-
- // Filter for services that need DNS records
- let services_needing_dns: Vec<_> = services
- .into_iter()
- .filter(|svc| {
- // Service needs DNS if:
- // 1. Has cluster_ip allocated
- // 2. Does NOT have flashdns.plasmacloud.io/record-id annotation (not yet provisioned)
- svc.spec.cluster_ip.is_some()
- && !svc
- .metadata
- .annotations
- .contains_key("flashdns.plasmacloud.io/record-id")
- })
- .collect();
-
- if services_needing_dns.is_empty() {
- debug!(
- "No services need DNS records for tenant {}/{}",
- org_id, project_id
- );
+ let tenants = self.reconciliation_tenants().await?;
+ if tenants.is_empty() {
+ debug!("No active tenants need FlashDNS reconciliation");
return Ok(());
}
- info!(
- "Found {} service(s) needing DNS records for tenant {}/{}",
- services_needing_dns.len(),
- org_id,
- project_id
- );
-
- // Connect to FlashDNS
- let mut record_client = match RecordServiceClient::connect(self.flashdns_addr.clone()).await
- {
- Ok(client) => client,
- Err(e) => {
+ for tenant in tenants {
+ if let Err(error) = self.reconcile_tenant_dns(&tenant).await {
warn!(
- "Failed to connect to FlashDNS at {}: {}",
- self.flashdns_addr, e
+ org_id = %tenant.org_id,
+ project_id = %tenant.project_id,
+ error = %error,
+ "failed to reconcile tenant DNS"
);
- return Ok(());
- }
- };
-
- // Create DNS records for each service
- for mut service in services_needing_dns {
- let namespace = service
- .metadata
- .namespace
- .clone()
- .unwrap_or_else(|| "default".to_string());
- let name = service.metadata.name.clone();
- let cluster_ip = service.spec.cluster_ip.as_ref().unwrap();
-
- // Construct DNS name: {service}.{namespace}.svc
- // Full FQDN will be: {service}.{namespace}.svc.cluster.local
- let record_name = format!("{}.{}.svc", name, namespace);
-
- info!(
- "Creating DNS record {} → {} for service {}/{}",
- record_name, cluster_ip, namespace, name
- );
-
- // Create A record
- let create_req = CreateRecordRequest {
- zone_id: zone_id.clone(),
- name: record_name.clone(),
- record_type: "A".to_string(),
- ttl: DNS_RECORD_TTL,
- data: Some(RecordData {
- data: Some(record_data::Data::A(ARecord {
- address: cluster_ip.clone(),
- })),
- }),
- };
-
- match record_client
- .create_record(authorized_request(create_req, &auth_token))
- .await
- {
- Ok(response) => {
- let record = response.into_inner().record;
- if let Some(record) = record {
- info!(
- "Created DNS record {} → {} (record_id: {})",
- record_name, cluster_ip, record.id
- );
-
- // Store record_id in service annotations
- service
- .metadata
- .annotations
- .insert("flashdns.plasmacloud.io/record-id".to_string(), record.id);
- service.metadata.annotations.insert(
- "flashdns.plasmacloud.io/zone-id".to_string(),
- zone_id.clone(),
- );
-
- // Merge with the latest stored version so the FiberLB controller does not
- // lose its status/annotations when both controllers reconcile together.
- if let Ok(Some(mut current)) = self
- .storage
- .get_service(org_id, project_id, &namespace, &name)
- .await
- {
- current.status = current.status.or(service.status.clone());
- current
- .metadata
- .annotations
- .extend(service.metadata.annotations.clone());
- service = current;
- }
-
- let current_version = service
- .metadata
- .resource_version
- .as_ref()
- .and_then(|v| v.parse::<u64>().ok())
- .unwrap_or(0);
- service.metadata.resource_version = Some((current_version + 1).to_string());
-
- // Save updated service
- if let Err(e) = self.storage.put_service(&service).await {
- warn!(
- "Failed to update service {}/{} with DNS record ID: {}",
- namespace, name, e
- );
- }
- }
- }
- Err(e) => {
- warn!(
- "Failed to create DNS record {} for service {}/{}: {}",
- record_name, namespace, name, e
- );
- }
}
}
Ok(())
}
- /// Ensure cluster.local zone exists for tenant, return zone_id
- async fn ensure_zone_exists(
- &self,
- org_id: &str,
- project_id: &str,
- auth_token: &str,
- ) -> Result<String> {
- let cache_key = format!("{}/{}", org_id, project_id);
+ async fn reconcile_tenant_dns(&self, tenant: &TenantRef) -> Result<()> {
+ let auth_token = issue_controller_token(
+ &self.iam_server_addr,
+ CONTROLLER_PRINCIPAL_ID,
+ &tenant.org_id,
+ &tenant.project_id,
+ )
+ .await?;
+ let services = self
+ .storage
+ .list_services(&tenant.org_id, &tenant.project_id, None)
+ .await?;
- // Check cache first
+ for service in services
+ .iter()
+ .filter(|service| !service_requires_dns(service) && has_dns_state(service))
{
- let cache = self.zone_cache.read().await;
- if let Some(zone_id) = cache.get(&cache_key) {
- return Ok(zone_id.clone());
+ self.clear_service_dns_state(
+ &tenant.org_id,
+ &tenant.project_id,
+ service_namespace(service),
+ &service.metadata.name,
+ )
+ .await?;
+ }
+
+ let desired_services = services
+ .into_iter()
+ .filter(service_requires_dns)
+ .collect::<Vec<_>>();
+ let has_desired_services = !desired_services.is_empty();
+ let zone_id = match self
+ .zone_id_for_tenant(tenant, &auth_token, !desired_services.is_empty())
+ .await?
+ {
+ Some(zone_id) => zone_id,
+ None => return Ok(()),
+ };
+
+ let mut record_client = match RecordServiceClient::connect(self.flashdns_addr.clone()).await
+ {
+ Ok(client) => client,
+ Err(error) => {
+ warn!(
+ address = %self.flashdns_addr,
+ error = %error,
+ "failed to connect to FlashDNS"
+ );
+ return Ok(());
+ }
+ };
+
+ let mut existing_by_name = self
+ .list_zone_records(&mut record_client, &zone_id, &auth_token)
+ .await?
+ .into_iter()
+ .filter(|record| is_managed_record(record))
+ .fold(
+ HashMap::<String, Vec<RecordInfo>>::new(),
+ |mut acc, record| {
+ acc.entry(record.name.clone()).or_default().push(record);
+ acc
+ },
+ );
+
+ for service in desired_services {
+ let namespace = service_namespace(&service).to_string();
+ let record_name = service_record_name(&service);
+ let cluster_ip = service
+ .spec
+ .cluster_ip
+ .as_deref()
+ .expect("service_requires_dns guarantees cluster_ip");
+ let existing = existing_by_name.remove(&record_name).unwrap_or_default();
+ let record = self
+ .ensure_service_record(
+ &mut record_client,
+ &auth_token,
+ &zone_id,
+ &record_name,
+ cluster_ip,
+ existing,
+ )
+ .await?;
+
+ self.reconcile_service_dns_state(
+ &tenant.org_id,
+ &tenant.project_id,
+ &namespace,
+ &service.metadata.name,
+ &zone_id,
+ &record.id,
+ )
+ .await?;
+ }
+
+ for stale_records in existing_by_name.into_values() {
+ for record in stale_records {
+ delete_record(&mut record_client, &auth_token, &record.id).await?;
}
}
- // Connect to FlashDNS
- let mut zone_client = ZoneServiceClient::connect(self.flashdns_addr.clone()).await?;
+ if !has_desired_services {
+ self.zone_cache
+ .write()
+ .await
+ .remove(&tenant_cache_key(tenant));
+ }
+ Ok(())
+ }
+
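+ /// Tenants to reconcile: active tenants from storage plus tenants with a cached zone,
+ /// so their records are still cleaned up after the last Service disappears.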
+ async fn reconciliation_tenants(&self) -> Result<Vec<TenantRef>> {
+ let mut tenants = self
+ .storage
+ .list_active_tenants()
+ .await?
+ .into_iter()
+ .collect::<BTreeSet<_>>();
+
+ for cache_key in self.zone_cache.read().await.keys() {
+ if let Some(tenant) = parse_tenant_cache_key(cache_key) {
+ tenants.insert(tenant);
+ }
+ }
+
+ Ok(tenants.into_iter().collect())
+ }
+
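+ /// Keep exactly one A record for the Service name: reuse a matching record, delete
+ /// mismatches and duplicates, and create a new record when none matches the ClusterIP.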
+ async fn ensure_service_record(
+ &self,
+ client: &mut RecordServiceClient<Channel>,
+ auth_token: &str,
+ zone_id: &str,
+ record_name: &str,
+ cluster_ip: &str,
+ existing: Vec<RecordInfo>,
+ ) -> Result<RecordInfo> {
+ let mut matching = None;
+
+ for record in existing {
+ let is_match = record.record_type == "A" && record_a_value(&record) == Some(cluster_ip);
+ if is_match && matching.is_none() {
+ matching = Some(record);
+ continue;
+ }
+
+ delete_record(client, auth_token, &record.id).await?;
+ }
+
+ if let Some(record) = matching {
+ return Ok(record);
+ }
+
+ Ok(client
+ .create_record(authorized_request(
+ CreateRecordRequest {
+ zone_id: zone_id.to_string(),
+ name: record_name.to_string(),
+ record_type: "A".to_string(),
+ ttl: DNS_RECORD_TTL,
+ data: Some(RecordData {
+ data: Some(record_data::Data::A(ARecord {
+ address: cluster_ip.to_string(),
+ })),
+ }),
+ },
+ auth_token,
+ ))
+ .await?
+ .into_inner()
+ .record
+ .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty CreateRecord response"))?)
+ }
+
+ async fn reconcile_service_dns_state(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ namespace: &str,
+ name: &str,
+ zone_id: &str,
+ record_id: &str,
+ ) -> Result<()> {
+ let Some(mut service) = self
+ .storage
+ .get_service(org_id, project_id, namespace, name)
+ .await?
+ else {
+ return Ok(());
+ };
+
+ let mut changed = false;
+ changed |= set_annotation(&mut service, RECORD_ID_ANNOTATION, record_id);
+ changed |= set_annotation(&mut service, ZONE_ID_ANNOTATION, zone_id);
+
+ if !changed {
+ return Ok(());
+ }
+
+ bump_resource_version(&mut service);
+ self.storage.put_service(&service).await?;
+ Ok(())
+ }
+
+ async fn clear_service_dns_state(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ namespace: &str,
+ name: &str,
+ ) -> Result<()> {
+ let Some(mut service) = self
+ .storage
+ .get_service(org_id, project_id, namespace, name)
+ .await?
+ else {
+ return Ok(());
+ };
+
+ let mut changed = false;
+ changed |= service
+ .metadata
+ .annotations
+ .remove(RECORD_ID_ANNOTATION)
+ .is_some();
+ changed |= service
+ .metadata
+ .annotations
+ .remove(ZONE_ID_ANNOTATION)
+ .is_some();
+
+ if !changed {
+ return Ok(());
+ }
+
+ bump_resource_version(&mut service);
+ self.storage.put_service(&service).await?;
+ Ok(())
+ }
+
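+ /// Resolve the tenant's cluster.local zone id, creating the zone only when requested.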
+ async fn zone_id_for_tenant(
+ &self,
+ tenant: &TenantRef,
+ auth_token: &str,
+ create_if_missing: bool,
+ ) -> Result<Option<String>>