Make k8shost publications authoritative across tenants

centra 2026-04-01 13:50:21 +09:00
parent c17e5a6130
commit a1aa3f8245
Signed by: centra
GPG key ID: 0C09689D20B25ACA
5 changed files with 1380 additions and 758 deletions

File diff suppressed because it is too large.

View file

@@ -1,27 +1,32 @@
//! FlashDNS Controller - Manages cluster.local DNS records for Services
//!
//! This controller continuously reconciles tenant-scoped Service resources into
//! authoritative `*.svc.cluster.local` A records and removes stale records when
//! Services disappear or lose their ClusterIP.

use crate::auth::{authorized_request, issue_controller_token};
use crate::storage::{Storage, TenantRef};
use anyhow::Result;
use flashdns_api::proto::record_service_client::RecordServiceClient;
use flashdns_api::proto::zone_service_client::ZoneServiceClient;
use flashdns_api::proto::{
    get_zone_request, record_data, ARecord, CreateRecordRequest, CreateZoneRequest,
    DeleteRecordRequest, GetZoneRequest, ListRecordsRequest, ListZonesRequest, RecordData,
    RecordInfo,
};
use k8shost_types::Service;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tonic::{transport::Channel, Code};
use tracing::{debug, info, warn};

const CLUSTER_DOMAIN: &str = "cluster.local";
const DNS_RECORD_TTL: u32 = 60;
const CONTROLLER_PRINCIPAL_ID: &str = "k8shost-controller";
const RECORD_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/record-id";
const ZONE_ID_ANNOTATION: &str = "flashdns.plasmacloud.io/zone-id";

/// FlashDNS controller for managing cluster.local DNS records
pub struct FlashDnsController {
@@ -29,23 +34,20 @@ pub struct FlashDnsController {
    flashdns_addr: String,
    iam_server_addr: String,
    interval: Duration,
    zone_cache: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
}

impl FlashDnsController {
    pub fn new(storage: Arc<Storage>, flashdns_addr: String, iam_server_addr: String) -> Self {
        Self {
            storage,
            flashdns_addr,
            iam_server_addr,
            interval: Duration::from_secs(10),
            zone_cache: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
        }
    }

    pub async fn run(self: Arc<Self>) {
        info!(
            "FlashDNS controller started (FlashDNS at {}, {}s interval)",
@@ -54,24 +56,28 @@ impl FlashDnsController {
        );

        loop {
            if let Err(error) = self.reconcile_dns_records().await {
                warn!(error = %error, "FlashDNS controller cycle failed");
            }

            sleep(self.interval).await;
        }
    }

    async fn reconcile_dns_records(&self) -> Result<()> {
        let tenants = self.reconciliation_tenants().await?;
        if tenants.is_empty() {
            debug!("No active tenants need FlashDNS reconciliation");
            return Ok(());
        }

        for tenant in tenants {
            if let Err(error) = self.reconcile_tenant_dns(&tenant).await {
                warn!(
                    org_id = %tenant.org_id,
                    project_id = %tenant.project_id,
                    error = %error,
                    "failed to reconcile tenant DNS"
                );
            }
        }
@@ -79,243 +85,299 @@ impl FlashDnsController {
        Ok(())
    }

    async fn reconcile_tenant_dns(&self, tenant: &TenantRef) -> Result<()> {
        let auth_token = issue_controller_token(
            &self.iam_server_addr,
            CONTROLLER_PRINCIPAL_ID,
            &tenant.org_id,
            &tenant.project_id,
        )
        .await?;

        let services = self
            .storage
            .list_services(&tenant.org_id, &tenant.project_id, None)
            .await?;

        for service in services
            .iter()
            .filter(|service| !service_requires_dns(service) && has_dns_state(service))
        {
            self.clear_service_dns_state(
                &tenant.org_id,
                &tenant.project_id,
                service_namespace(service),
                &service.metadata.name,
            )
            .await?;
        }

        let desired_services = services
            .into_iter()
            .filter(service_requires_dns)
            .collect::<Vec<_>>();
        let has_desired_services = !desired_services.is_empty();

        let zone_id = match self
            .zone_id_for_tenant(tenant, &auth_token, !desired_services.is_empty())
            .await?
        {
            Some(zone_id) => zone_id,
            None => return Ok(()),
        };

        let mut record_client = match RecordServiceClient::connect(self.flashdns_addr.clone()).await
        {
            Ok(client) => client,
            Err(error) => {
                warn!(
                    address = %self.flashdns_addr,
                    error = %error,
                    "failed to connect to FlashDNS"
                );
                return Ok(());
            }
        };

        let mut existing_by_name = self
            .list_zone_records(&mut record_client, &zone_id, &auth_token)
            .await?
            .into_iter()
            .filter(|record| is_managed_record(record))
            .fold(
                HashMap::<String, Vec<RecordInfo>>::new(),
                |mut acc, record| {
                    acc.entry(record.name.clone()).or_default().push(record);
                    acc
                },
            );

        for service in desired_services {
            let namespace = service_namespace(&service).to_string();
            let record_name = service_record_name(&service);
            let cluster_ip = service
                .spec
                .cluster_ip
                .as_deref()
                .expect("service_requires_dns guarantees cluster_ip");

            let existing = existing_by_name.remove(&record_name).unwrap_or_default();
            let record = self
                .ensure_service_record(
                    &mut record_client,
                    &auth_token,
                    &zone_id,
                    &record_name,
                    cluster_ip,
                    existing,
                )
                .await?;

            self.reconcile_service_dns_state(
                &tenant.org_id,
                &tenant.project_id,
                &namespace,
                &service.metadata.name,
                &zone_id,
                &record.id,
            )
            .await?;
        }

        for stale_records in existing_by_name.into_values() {
            for record in stale_records {
                delete_record(&mut record_client, &auth_token, &record.id).await?;
            }
        }

        if !has_desired_services {
            self.zone_cache
                .write()
                .await
                .remove(&tenant_cache_key(tenant));
        }

        Ok(())
    }

    async fn reconciliation_tenants(&self) -> Result<Vec<TenantRef>> {
        let mut tenants = self
            .storage
            .list_active_tenants()
            .await?
            .into_iter()
            .collect::<BTreeSet<_>>();

        for cache_key in self.zone_cache.read().await.keys() {
            if let Some(tenant) = parse_tenant_cache_key(cache_key) {
                tenants.insert(tenant);
            }
        }

        Ok(tenants.into_iter().collect())
    }

    async fn ensure_service_record(
        &self,
        client: &mut RecordServiceClient<Channel>,
        auth_token: &str,
        zone_id: &str,
        record_name: &str,
        cluster_ip: &str,
        existing: Vec<RecordInfo>,
    ) -> Result<RecordInfo> {
        let mut matching = None;

        for record in existing {
            let is_match = record.record_type == "A" && record_a_value(&record) == Some(cluster_ip);
            if is_match && matching.is_none() {
                matching = Some(record);
                continue;
            }
            delete_record(client, auth_token, &record.id).await?;
        }

        if let Some(record) = matching {
            return Ok(record);
        }

        Ok(client
            .create_record(authorized_request(
                CreateRecordRequest {
                    zone_id: zone_id.to_string(),
                    name: record_name.to_string(),
                    record_type: "A".to_string(),
                    ttl: DNS_RECORD_TTL,
                    data: Some(RecordData {
                        data: Some(record_data::Data::A(ARecord {
                            address: cluster_ip.to_string(),
                        })),
                    }),
                },
                auth_token,
            ))
            .await?
            .into_inner()
            .record
            .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty CreateRecord response"))?)
    }

    async fn reconcile_service_dns_state(
        &self,
        org_id: &str,
        project_id: &str,
        namespace: &str,
        name: &str,
        zone_id: &str,
        record_id: &str,
    ) -> Result<()> {
        let Some(mut service) = self
            .storage
            .get_service(org_id, project_id, namespace, name)
            .await?
        else {
            return Ok(());
        };

        let mut changed = false;
        changed |= set_annotation(&mut service, RECORD_ID_ANNOTATION, record_id);
        changed |= set_annotation(&mut service, ZONE_ID_ANNOTATION, zone_id);
        if !changed {
            return Ok(());
        }

        bump_resource_version(&mut service);
        self.storage.put_service(&service).await?;
        Ok(())
    }

    async fn clear_service_dns_state(
        &self,
        org_id: &str,
        project_id: &str,
        namespace: &str,
        name: &str,
    ) -> Result<()> {
        let Some(mut service) = self
            .storage
            .get_service(org_id, project_id, namespace, name)
            .await?
        else {
            return Ok(());
        };

        let mut changed = false;
        changed |= service
            .metadata
            .annotations
            .remove(RECORD_ID_ANNOTATION)
            .is_some();
        changed |= service
            .metadata
            .annotations
            .remove(ZONE_ID_ANNOTATION)
            .is_some();
        if !changed {
            return Ok(());
        }

        bump_resource_version(&mut service);
        self.storage.put_service(&service).await?;
        Ok(())
    }

    async fn zone_id_for_tenant(
        &self,
        tenant: &TenantRef,
        auth_token: &str,
        create_if_missing: bool,
    ) -> Result<Option<String>> {
        let cache_key = tenant_cache_key(tenant);
        if let Some(zone_id) = self.zone_cache.read().await.get(&cache_key).cloned() {
            return Ok(Some(zone_id));
        }

        let mut zone_client = ZoneServiceClient::connect(self.flashdns_addr.clone()).await?;

        if let Some(zone_id) = self
            .lookup_zone_id(&mut zone_client, CLUSTER_DOMAIN, auth_token)
            .await?
        {
            self.zone_cache
                .write()
                .await
                .insert(cache_key, zone_id.clone());
            return Ok(Some(zone_id));
        }

        if !create_if_missing {
            return Ok(None);
        }

        let response = match zone_client
            .create_zone(authorized_request(
                CreateZoneRequest {
                    name: CLUSTER_DOMAIN.to_string(),
                    org_id: tenant.org_id.clone(),
                    project_id: tenant.project_id.clone(),
                    primary_ns: "ns1.plasmacloud.io".to_string(),
                    admin_email: "admin@plasmacloud.io".to_string(),
                },
                auth_token,
            ))
            .await
        {
            Ok(response) => response,
            Err(status) if status.code() == Code::AlreadyExists => {
                for _ in 0..5 {
                    if let Some(zone_id) = self
                        .lookup_zone_id(&mut zone_client, CLUSTER_DOMAIN, auth_token)
                        .await?
                    {
                        self.zone_cache
                            .write()
                            .await
                            .insert(cache_key.clone(), zone_id.clone());
                        return Ok(Some(zone_id));
                    }
                    sleep(Duration::from_millis(200)).await;
                }
@@ -323,32 +385,28 @@ impl FlashDnsController {
                return Err(anyhow::anyhow!(
                    "zone {} already exists for tenant {}/{} but could not be listed",
                    CLUSTER_DOMAIN,
                    tenant.org_id,
                    tenant.project_id
                ));
            }
            Err(status) => return Err(status.into()),
        };

        let zone_id = response
            .into_inner()
            .zone
            .ok_or_else(|| anyhow::anyhow!("FlashDNS returned empty zone"))?
            .id;

        self.zone_cache
            .write()
            .await
            .insert(cache_key, zone_id.clone());
        Ok(Some(zone_id))
    }

    async fn lookup_zone_id(
        &self,
        zone_client: &mut ZoneServiceClient<Channel>,
        zone_name: &str,
        auth_token: &str,
    ) -> Result<Option<String>> {
@@ -360,11 +418,10 @@ impl FlashDnsController {
            .get_zone(authorized_request(get_req, auth_token))
            .await
        {
            Ok(response) => Ok(response.into_inner().zone.map(|zone| zone.id)),
            Err(error) if error.code() == Code::NotFound => Ok(None),
            Err(error) => {
                debug!("exact zone lookup failed for {}: {}", zone_name, error);
                let list_req = ListZonesRequest {
                    org_id: String::new(),
                    project_id: String::new(),
@@ -381,11 +438,13 @@ impl FlashDnsController {
                        .into_inner()
                        .zones
                        .into_iter()
                        .find(|zone| {
                            zone.name.trim_end_matches('.') == zone_name.trim_end_matches('.')
                        })
                        .map(|zone| zone.id)),
                    Err(list_error) => {
                        debug!(
                            "zone list fallback failed for {}: {}",
                            zone_name, list_error
                        );
                        Ok(None)
@@ -395,33 +454,134 @@ impl FlashDnsController {
        }
    }

    async fn list_zone_records(
        &self,
        client: &mut RecordServiceClient<Channel>,
        zone_id: &str,
        auth_token: &str,
    ) -> Result<Vec<RecordInfo>> {
        let mut records = Vec::new();
        let mut page_token = String::new();

        loop {
            let response = client
                .list_records(authorized_request(
                    ListRecordsRequest {
                        zone_id: zone_id.to_string(),
                        name_filter: String::new(),
                        type_filter: String::new(),
                        page_size: 256,
                        page_token: page_token.clone(),
                    },
                    auth_token,
                ))
                .await?
                .into_inner();

            records.extend(response.records);
            if response.next_page_token.is_empty() {
                break;
            }
            page_token = response.next_page_token;
        }

        Ok(records)
    }
}

fn tenant_cache_key(tenant: &TenantRef) -> String {
    format!("{}/{}", tenant.org_id, tenant.project_id)
}

fn parse_tenant_cache_key(value: &str) -> Option<TenantRef> {
    let (org_id, project_id) = value.split_once('/')?;
    Some(TenantRef {
        org_id: org_id.to_string(),
        project_id: project_id.to_string(),
    })
}

fn service_requires_dns(service: &Service) -> bool {
    service.spec.cluster_ip.is_some()
}

fn has_dns_state(service: &Service) -> bool {
    service
        .metadata
        .annotations
        .contains_key(RECORD_ID_ANNOTATION)
        || service
            .metadata
            .annotations
            .contains_key(ZONE_ID_ANNOTATION)
}

fn service_namespace(service: &Service) -> &str {
    service.metadata.namespace.as_deref().unwrap_or("default")
}

fn service_record_name(service: &Service) -> String {
    format!(
        "{}.{}.svc",
        service.metadata.name,
        service_namespace(service)
    )
}

fn is_managed_record(record: &RecordInfo) -> bool {
    record.name.ends_with(".svc")
}

fn record_a_value(record: &RecordInfo) -> Option<&str> {
    record
        .data
        .as_ref()
        .and_then(|data| data.data.as_ref())
        .and_then(|data| match data {
            record_data::Data::A(record) => Some(record.address.as_str()),
            _ => None,
        })
}

fn set_annotation(service: &mut Service, key: &str, value: &str) -> bool {
    match service.metadata.annotations.get(key) {
        Some(current) if current == value => false,
        _ => {
            service
                .metadata
                .annotations
                .insert(key.to_string(), value.to_string());
            true
        }
    }
}

fn bump_resource_version(service: &mut Service) {
    let current = service
        .metadata
        .resource_version
        .as_deref()
        .and_then(|value| value.parse::<u64>().ok())
        .unwrap_or(0);
    service.metadata.resource_version = Some((current + 1).to_string());
}

async fn delete_record(
    client: &mut RecordServiceClient<Channel>,
    auth_token: &str,
    record_id: &str,
) -> Result<()> {
    match client
        .delete_record(authorized_request(
            DeleteRecordRequest {
                id: record_id.to_string(),
            },
            auth_token,
        ))
        .await
    {
        Ok(_) => Ok(()),
        Err(status) if status.code() == Code::NotFound => Ok(()),
        Err(status) => Err(status.into()),
    }
}
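
A quick way to sanity-check the new cache bookkeeping is to round-trip a TenantRef through the tenant_cache_key / parse_tenant_cache_key helpers defined above. The sketch below is illustrative only (it is not part of this commit) and assumes it sits in the controller module so the helpers and crate::storage::TenantRef are in scope; note that the org/project encoding relies on neither ID containing a '/', because parse_tenant_cache_key splits at the first separator.

#[cfg(test)]
mod cache_key_round_trip {
    // Illustrative sketch, not part of the commit: round-trips a TenantRef
    // through the cache-key helpers used by zone_id_for_tenant.
    use super::{parse_tenant_cache_key, tenant_cache_key};
    use crate::storage::TenantRef;

    #[test]
    fn cache_key_round_trips() {
        let tenant = TenantRef {
            org_id: "org-a".to_string(),
            project_id: "project-a".to_string(),
        };
        let key = tenant_cache_key(&tenant);
        assert_eq!(key, "org-a/project-a");
        // split_once('/') stops at the first '/', so IDs containing a slash
        // would not survive this round trip.
        assert_eq!(parse_tenant_cache_key(&key), Some(tenant));
    }
}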

View file

@@ -3,10 +3,10 @@
//! Assigns pending pods to available nodes based on resource availability and scheduling policies.
//! Implements tenant-aware scheduling with quota enforcement via CreditService.

use crate::storage::{Storage, TenantRef};
use creditservice_client::Client as CreditServiceClient;
use k8shost_types::{Node, Pod};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
@@ -83,7 +83,6 @@ impl Scheduler {
    /// Schedule all pending pods across all tenants
    async fn schedule_pending_pods(&self) -> anyhow::Result<()> {
        let tenants = self.get_active_tenants().await?;

        if tenants.is_empty() {
@@ -93,11 +92,14 @@ impl Scheduler {
        info!("Scheduling for {} active tenant(s)", tenants.len());

        for tenant in tenants {
            if let Err(e) = self
                .schedule_tenant_pods(&tenant.org_id, &tenant.project_id)
                .await
            {
                warn!(
                    "Failed to schedule pods for tenant {}/{}: {}",
                    tenant.org_id, tenant.project_id, e
                );
            }
        }
@@ -105,31 +107,9 @@ impl Scheduler {
        Ok(())
    }

    /// Get list of active tenants from storage-discovered resource metadata.
    async fn get_active_tenants(&self) -> anyhow::Result<Vec<TenantRef>> {
        self.storage.list_active_tenants().await.map_err(Into::into)
    }

    /// Schedule pending pods for a specific tenant
@@ -431,7 +411,7 @@ impl Scheduler {
#[cfg(test)]
mod tests {
    use super::*;
    use k8shost_types::{NodeCondition, NodeStatus, ObjectMeta};

    #[tokio::test]
    async fn test_is_node_ready() {

View file

@@ -5,6 +5,7 @@
use flaredb_client::RdbClient;
use k8shost_types::{Deployment, Node, Pod, Service};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -15,6 +16,25 @@ pub struct Storage {
    client: Arc<Mutex<RdbClient>>,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TenantRef {
    pub org_id: String,
    pub project_id: String,
}

#[derive(Debug, Deserialize)]
struct ResourceMetadataEnvelope {
    metadata: ResourceMetadataRef,
}

#[derive(Debug, Deserialize)]
struct ResourceMetadataRef {
    #[serde(default)]
    org_id: Option<String>,
    #[serde(default)]
    project_id: Option<String>,
}

impl Storage {
    /// Create a new storage instance with FlareDB backend
    pub async fn new(pd_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
@@ -220,6 +240,34 @@
        Ok(pods)
    }

    /// List active tenants discovered from all persisted k8shost resources.
    pub async fn list_active_tenants(&self) -> Result<Vec<TenantRef>, Status> {
        let prefix = b"k8s/".to_vec();
        let mut end_key = prefix.clone();
        end_key.push(0xff);

        let mut tenants = std::collections::BTreeSet::new();
        let mut start_key = prefix;

        loop {
            let mut client = self.client.lock().await;
            let (_keys, values, next) = client
                .raw_scan(start_key.clone(), end_key.clone(), 1000)
                .await
                .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;

            tenants.extend(collect_active_tenants(values));

            if let Some(next_key) = next {
                start_key = next_key;
            } else {
                break;
            }
        }

        Ok(tenants.into_iter().collect())
    }

    /// Delete a pod
    pub async fn delete_pod(
        &self,
@@ -681,3 +729,62 @@
        Ok(existed)
    }
}

fn collect_active_tenants(
    values: impl IntoIterator<Item = Vec<u8>>,
) -> std::collections::BTreeSet<TenantRef> {
    let mut tenants = std::collections::BTreeSet::new();

    for value in values {
        let Ok(resource) = serde_json::from_slice::<ResourceMetadataEnvelope>(&value) else {
            continue;
        };
        let (Some(org_id), Some(project_id)) =
            (resource.metadata.org_id, resource.metadata.project_id)
        else {
            continue;
        };
        tenants.insert(TenantRef { org_id, project_id });
    }

    tenants
}

#[cfg(test)]
mod tests {
    use super::{collect_active_tenants, TenantRef};

    #[test]
    fn collect_active_tenants_discovers_unique_tenants_from_mixed_resources() {
        let tenants = collect_active_tenants(vec![
            br#"{"metadata":{"name":"pod-a","org_id":"org-a","project_id":"project-a"}}"#.to_vec(),
            br#"{"metadata":{"name":"svc-a","org_id":"org-a","project_id":"project-a"}}"#.to_vec(),
            br#"{"metadata":{"name":"node-b","org_id":"org-b","project_id":"project-b"}}"#.to_vec(),
            br#"{"metadata":{"name":"deploy-c","org_id":"org-c","project_id":"project-c"}}"#
                .to_vec(),
            br#"{"metadata":{"name":"invalid-missing-project","org_id":"org-z"}}"#.to_vec(),
            br#"not-json"#.to_vec(),
        ]);

        let tenants = tenants.into_iter().collect::<Vec<_>>();
        assert_eq!(
            tenants,
            vec![
                TenantRef {
                    org_id: "org-a".to_string(),
                    project_id: "project-a".to_string(),
                },
                TenantRef {
                    org_id: "org-b".to_string(),
                    project_id: "project-b".to_string(),
                },
                TenantRef {
                    org_id: "org-c".to_string(),
                    project_id: "project-c".to_string(),
                },
            ]
        );
    }
}
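
list_active_tenants bounds its raw_scan by taking the b"k8s/" prefix and appending a 0xff byte for the end key, so ordinary keys that begin with the prefix sort inside the scanned range. The snippet below is a self-contained illustration of that ordering assumption; the sample key is hypothetical and not a key format guaranteed by storage.rs.

// Illustrative only: byte-vector comparison in Rust is lexicographic, which is
// why the prefix .. prefix+0xff range covers keys that begin with the prefix.
fn main() {
    let prefix = b"k8s/".to_vec();
    let mut end_key = prefix.clone();
    end_key.push(0xff);

    // Hypothetical stored key, for illustration only.
    let sample_key = b"k8s/services/default-ns/web".to_vec();
    assert!(prefix < sample_key && sample_key < end_key);
    println!("scan range {:?} .. {:?}", prefix, end_key);
}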

View file

@@ -3581,8 +3581,8 @@ validate_k8shost_flow() {
    k8s_http_tunnel="$(start_ssh_tunnel node01 18087 8085)"
    trap 'stop_ssh_tunnel node01 "${k8s_http_tunnel}"; stop_ssh_tunnel node01 "${k8s_tunnel}"; stop_ssh_tunnel node01 "${lb_tunnel}"; stop_ssh_tunnel node01 "${dns_tunnel}"; stop_ssh_tunnel node01 "${prism_tunnel}"; stop_ssh_tunnel node01 "${iam_tunnel}"' RETURN

    local org_id="k8shost-org"
    local project_id="k8shost-project"
    local principal_id="k8shost-smoke-$(date +%s)"
    local token node_name deployment_name pod_name service_name service_port
    token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")"
@@ -3801,23 +3801,58 @@ validate_k8shost_flow() {
        -proto "${K8SHOST_PROTO}" \
        -d "$(jq -cn --arg ns "default" --arg name "${pod_name}" '{namespace:$ns, name:$name}')" \
        127.0.0.1:15087 k8shost.PodService/DeletePod >/dev/null

    deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
    while true; do
        if ! ssh_node node01 "dig @127.0.0.1 -p 5353 +short ${service_name}.default.svc.cluster.local A | grep -q ." >/dev/null 2>&1; then
            break
        fi
        if (( SECONDS >= deadline )); then
            die "timed out waiting for K8sHost FlashDNS cleanup for ${service_name}"
        fi
        sleep 2
    done

    deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
    while true; do
        local lb_list_json
        lb_list_json="$(grpcurl -plaintext \
            -H "authorization: Bearer ${token}" \
            -import-path "${FIBERLB_PROTO_DIR}" \
            -proto "${FIBERLB_PROTO}" \
            -d "$(jq -cn --arg org "${org_id}" --arg project "${project_id}" '{orgId:$org, projectId:$project, pageSize:100}')" \
            127.0.0.1:15085 fiberlb.v1.LoadBalancerService/ListLoadBalancers 2>/dev/null || true)"
        if [[ -n "${lb_list_json}" ]] && ! printf '%s' "${lb_list_json}" | jq -e --arg id "${lb_id}" '.loadbalancers | any(.id == $id)' >/dev/null 2>&1; then
            break
        fi
        if (( SECONDS >= deadline )); then
            die "timed out waiting for K8sHost FiberLB cleanup for ${service_name}"
        fi
        sleep 2
    done

    deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
    while true; do
        if ! grpcurl -plaintext \
            -H "authorization: Bearer ${token}" \
            -import-path "${FLASHDNS_PROTO_DIR}" \
            -proto "${FLASHDNS_PROTO}" \
            -d "$(jq -cn --arg id "${record_id}" '{id:$id}')" \
            127.0.0.1:15084 flashdns.v1.RecordService/GetRecord >/dev/null 2>&1; then
            break
        fi
        if (( SECONDS >= deadline )); then
            die "timed out waiting for K8sHost FlashDNS record deletion for ${service_name}"
        fi
        sleep 2
    done

    trap - RETURN
    stop_ssh_tunnel node01 "${k8s_http_tunnel}"
    stop_ssh_tunnel node01 "${k8s_tunnel}"
    stop_ssh_tunnel node01 "${lb_tunnel}"
    stop_ssh_tunnel node01 "${dns_tunnel}"
    stop_ssh_tunnel node01 "${prism_tunnel}"
    stop_ssh_tunnel node01 "${iam_tunnel}"
}
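
All three new waits follow the same shape: poll a cleanup condition every two seconds until a deadline derived from HTTP_WAIT_TIMEOUT, then die if it never became true. Purely as an illustration of that pattern (not part of this commit, written in Rust rather than the script's Bash, and assuming a tokio runtime is available), the same idea looks like:

use std::time::Duration;
use tokio::time::{sleep, Instant};

/// Illustrative poll-until-deadline helper mirroring the script's wait loops.
async fn wait_until<F: FnMut() -> bool>(timeout: Duration, mut check: F) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if check() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        sleep(Duration::from_secs(2)).await;
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical condition; the real script shells out to dig and grpcurl instead.
    let cleaned_up = wait_until(Duration::from_secs(60), || true).await;
    assert!(cleaned_up);
}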