diff --git a/k8shost/Cargo.lock b/k8shost/Cargo.lock index 47d8064..ac2d6ef 100644 --- a/k8shost/Cargo.lock +++ b/k8shost/Cargo.lock @@ -1985,6 +1985,7 @@ dependencies = [ "prost 0.13.5", "serde", "serde_json", + "sha2", "tempfile", "tokio", "tokio-stream", diff --git a/k8shost/crates/k8shost-server/Cargo.toml b/k8shost/crates/k8shost-server/Cargo.toml index a6cb508..cce801c 100644 --- a/k8shost/crates/k8shost-server/Cargo.toml +++ b/k8shost/crates/k8shost-server/Cargo.toml @@ -39,6 +39,7 @@ chrono = { workspace = true } clap = { workspace = true } config = { workspace = true } toml = { workspace = true } +sha2 = "0.10" # REST API dependencies axum = "0.8" diff --git a/k8shost/crates/k8shost-server/src/cni.rs b/k8shost/crates/k8shost-server/src/cni.rs index ebc6d76..4e327bf 100644 --- a/k8shost/crates/k8shost-server/src/cni.rs +++ b/k8shost/crates/k8shost-server/src/cni.rs @@ -8,8 +8,8 @@ use anyhow::{Context, Result}; use serde_json::json; -use std::process::Command; use std::io::Write; +use std::process::Command; /// CNI configuration for pod network setup #[derive(Debug, Clone)] @@ -85,7 +85,9 @@ pub async fn invoke_cni_add( } // Wait for result - let output = child.wait_with_output().context("Failed to wait for CNI plugin")?; + let output = child + .wait_with_output() + .context("Failed to wait for CNI plugin")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); @@ -93,8 +95,8 @@ pub async fn invoke_cni_add( } // Parse result - let result: CniResult = serde_json::from_slice(&output.stdout) - .context("Failed to parse CNI result")?; + let result: CniResult = + serde_json::from_slice(&output.stdout).context("Failed to parse CNI result")?; Ok(result) } @@ -146,11 +148,16 @@ pub async fn invoke_cni_del( } // Wait for result - let output = child.wait_with_output().context("Failed to wait for CNI plugin")?; + let output = child + .wait_with_output() + .context("Failed to wait for CNI plugin")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!("CNI DEL error (may be expected if already deleted): {}", stderr); + tracing::warn!( + "CNI DEL error (may be expected if already deleted): {}", + stderr + ); } Ok(()) diff --git a/k8shost/crates/k8shost-server/src/config.rs b/k8shost/crates/k8shost-server/src/config.rs index a64272f..d2cbdd6 100644 --- a/k8shost/crates/k8shost-server/src/config.rs +++ b/k8shost/crates/k8shost-server/src/config.rs @@ -104,8 +104,7 @@ impl Default for ChainFireConfig { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[derive(Default)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] pub struct Config { pub server: ServerConfig, pub flaredb: FlareDbConfig, diff --git a/k8shost/crates/k8shost-server/src/deployment_controller.rs b/k8shost/crates/k8shost-server/src/deployment_controller.rs new file mode 100644 index 0000000..7f3b20d --- /dev/null +++ b/k8shost/crates/k8shost-server/src/deployment_controller.rs @@ -0,0 +1,414 @@ +//! Background reconciler for Deployment resources. 
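+//!
+//! Rough shape of the loop implemented below (an illustrative sketch, not
+//! additional API surface; see `run` and `reconcile_deployment` for the
+//! real logic):
+//!
+//! ```text
+//! every 5s, for each Deployment in FlareDB:
+//!     pods = list_pods(org, project, namespace)
+//!     plan = plan_deployment_reconciliation(deployment, pods)
+//!     delete plan.deletes, create plan.creates, persist updated status
+//! ```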
+ +use crate::services::deployment::{ + deployment_template_hash, DeploymentServiceImpl, DEPLOYMENT_NAME_ANNOTATION, + DEPLOYMENT_UID_ANNOTATION, TEMPLATE_HASH_ANNOTATION, +}; +use crate::storage::Storage; +use chrono::Utc; +use k8shost_types::{Deployment, DeploymentStatus, Pod, PodStatus}; +use std::collections::BTreeSet; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tonic::Status; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +pub struct DeploymentController { + storage: Arc<Storage>, + interval: Duration, +} + +#[derive(Debug)] +struct DeploymentPlan { + creates: Vec<Pod>, + deletes: Vec<String>, + status: DeploymentStatus, +} + +impl DeploymentController { + pub fn new(storage: Arc<Storage>) -> Self { + Self { + storage, + interval: Duration::from_secs(5), + } + } + + pub async fn run(self: Arc<Self>) { + info!( + "Deployment controller started ({}s interval)", + self.interval.as_secs() + ); + + loop { + if let Err(error) = self.reconcile_all().await { + warn!(error = %error, "deployment reconciliation failed"); + } + sleep(self.interval).await; + } + } + + async fn reconcile_all(&self) -> anyhow::Result<()> { + let mut deployments = self.storage.list_all_deployments().await?; + deployments.sort_by(|lhs, rhs| { + lhs.metadata + .org_id + .cmp(&rhs.metadata.org_id) + .then_with(|| lhs.metadata.project_id.cmp(&rhs.metadata.project_id)) + .then_with(|| lhs.metadata.namespace.cmp(&rhs.metadata.namespace)) + .then_with(|| lhs.metadata.name.cmp(&rhs.metadata.name)) + }); + + for deployment in deployments { + if let Err(error) = self.reconcile_deployment(deployment).await { + warn!(error = %error, "failed to reconcile deployment"); + } + } + + Ok(()) + } + + async fn reconcile_deployment(&self, deployment: Deployment) -> anyhow::Result<()> { + let Some(org_id) = deployment.metadata.org_id.as_deref() else { + warn!(deployment = %deployment.metadata.name, "deployment missing org_id"); + return Ok(()); + }; + let Some(project_id) = deployment.metadata.project_id.as_deref() else { + warn!(deployment = %deployment.metadata.name, "deployment missing project_id"); + return Ok(()); + }; + let Some(namespace) = deployment.metadata.namespace.as_deref() else { + warn!(deployment = %deployment.metadata.name, "deployment missing namespace"); + return Ok(()); + }; + + let pods = self + .storage + .list_pods(org_id, project_id, Some(namespace), None) + .await?; + let plan = plan_deployment_reconciliation(&deployment, &pods)?; + + for pod_name in &plan.deletes { + let deleted = self + .storage + .delete_pod(org_id, project_id, namespace, pod_name) + .await?; + if deleted { + debug!( + deployment = %deployment.metadata.name, + pod = %pod_name, + "deleted deployment-managed pod" + ); + } + } + + for pod in &plan.creates { + self.storage.put_pod(pod).await?; + debug!( + deployment = %deployment.metadata.name, + pod = %pod.metadata.name, + "created deployment-managed pod" + ); + } + + let status_changed = !deployment_status_eq(deployment.status.as_ref(), Some(&plan.status)); + if !plan.creates.is_empty() || !plan.deletes.is_empty() || status_changed { + let mut updated = deployment.clone(); + updated.status = Some(plan.status); + updated.metadata.resource_version = Some(next_resource_version( + deployment.metadata.resource_version.as_deref(), + )); + self.storage.put_deployment(&updated).await?; + } + + Ok(()) + } +} + +fn plan_deployment_reconciliation( + deployment: &Deployment, + pods: &[Pod], +) -> Result<DeploymentPlan, Status> { + DeploymentServiceImpl::validate_spec(deployment)?; + let template_hash = 
deployment_template_hash(deployment)?; + let desired_replicas = deployment.spec.replicas.unwrap_or(1).max(0) as usize; + let mut owned = pods + .iter() + .filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod)) + .cloned() + .collect::<Vec<_>>(); + owned.sort_by(|lhs, rhs| lhs.metadata.name.cmp(&rhs.metadata.name)); + + let mut current = Vec::new(); + let mut stale = Vec::new(); + for pod in owned { + let pod_hash = pod + .metadata + .annotations + .get(TEMPLATE_HASH_ANNOTATION) + .map(String::as_str); + if pod_hash == Some(template_hash.as_str()) { + current.push(pod); + } else { + stale.push(pod); + } + } + + let mut deletes = stale + .iter() + .map(|pod| pod.metadata.name.clone()) + .collect::<Vec<_>>(); + let mut survivors = current; + if survivors.len() > desired_replicas { + let excess = survivors.split_off(desired_replicas); + deletes.extend(excess.into_iter().map(|pod| pod.metadata.name)); + } + + let used_ordinals = pods + .iter() + .filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod)) + .filter_map(|pod| parse_pod_ordinal(&deployment.metadata.name, &pod.metadata.name)) + .collect::<BTreeSet<_>>(); + + let ready_replicas = survivors.iter().filter(|pod| pod_is_ready(pod)).count() as i32; + let available_replicas = ready_replicas; + + let mut creates = Vec::new(); + let mut used_ordinals = used_ordinals; + while survivors.len() + creates.len() < desired_replicas { + let ordinal = next_available_ordinal(&used_ordinals); + used_ordinals.insert(ordinal); + creates.push(build_pod(deployment, &template_hash, ordinal)?); + } + + Ok(DeploymentPlan { + creates, + deletes, + status: DeploymentStatus { + replicas: Some(desired_replicas as i32), + ready_replicas: Some(ready_replicas), + available_replicas: Some(available_replicas), + }, + }) +} + +fn build_pod(deployment: &Deployment, template_hash: &str, ordinal: usize) -> Result<Pod, Status> { + let Some(namespace) = deployment.metadata.namespace.clone() else { + return Err(Status::invalid_argument("deployment namespace is required")); + }; + let Some(org_id) = deployment.metadata.org_id.clone() else { + return Err(Status::invalid_argument("deployment org_id is required")); + }; + let Some(project_id) = deployment.metadata.project_id.clone() else { + return Err(Status::invalid_argument( + "deployment project_id is required", + )); + }; + + let mut metadata = deployment.spec.template.metadata.clone(); + metadata.name = format!("{}-{}", deployment.metadata.name, ordinal); + metadata.namespace = Some(namespace); + metadata.org_id = Some(org_id); + metadata.project_id = Some(project_id); + metadata.uid = Some(Uuid::new_v4().to_string()); + metadata.resource_version = Some("1".to_string()); + metadata.creation_timestamp = Some(Utc::now()); + metadata.annotations.insert( + DEPLOYMENT_NAME_ANNOTATION.to_string(), + deployment.metadata.name.clone(), + ); + if let Some(uid) = deployment.metadata.uid.as_ref() { + metadata + .annotations + .insert(DEPLOYMENT_UID_ANNOTATION.to_string(), uid.clone()); + } + metadata.annotations.insert( + TEMPLATE_HASH_ANNOTATION.to_string(), + template_hash.to_string(), + ); + + let mut spec = deployment.spec.template.spec.clone(); + spec.node_name = None; + + Ok(Pod { + metadata, + spec, + status: Some(PodStatus { + phase: Some("Pending".to_string()), + pod_ip: None, + host_ip: None, + conditions: Vec::new(), + }), + }) +} + +fn pod_is_ready(pod: &Pod) -> bool { + pod.status + .as_ref() + .and_then(|status| status.phase.as_deref()) + == Some("Running") +} + +fn parse_pod_ordinal(deployment_name: &str, 
pod_name: &str) -> Option<usize> { + let suffix = pod_name.strip_prefix(&format!("{}-", deployment_name))?; + suffix.parse::<usize>().ok() +} + +fn next_available_ordinal(used: &BTreeSet<usize>) -> usize { + let mut ordinal = 1usize; + while used.contains(&ordinal) { + ordinal += 1; + } + ordinal +} + +fn deployment_status_eq(lhs: Option<&DeploymentStatus>, rhs: Option<&DeploymentStatus>) -> bool { + match (lhs, rhs) { + (None, None) => true, + (Some(lhs), Some(rhs)) => { + lhs.replicas == rhs.replicas + && lhs.ready_replicas == rhs.ready_replicas + && lhs.available_replicas == rhs.available_replicas + } + _ => false, + } +} + +fn next_resource_version(current: Option<&str>) -> String { + let current = current + .and_then(|version| version.parse::<u64>().ok()) + .unwrap_or(0); + (current + 1).to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn test_deployment() -> Deployment { + Deployment { + metadata: k8shost_types::ObjectMeta { + name: "web".to_string(), + namespace: Some("default".to_string()), + uid: Some("deployment-uid".to_string()), + resource_version: Some("1".to_string()), + creation_timestamp: None, + labels: HashMap::new(), + annotations: HashMap::new(), + org_id: Some("test-org".to_string()), + project_id: Some("test-project".to_string()), + }, + spec: k8shost_types::DeploymentSpec { + replicas: Some(2), + selector: k8shost_types::LabelSelector { + match_labels: HashMap::from([("app".to_string(), "web".to_string())]), + }, + template: k8shost_types::PodTemplateSpec { + metadata: k8shost_types::ObjectMeta { + name: "".to_string(), + namespace: Some("default".to_string()), + uid: None, + resource_version: None, + creation_timestamp: None, + labels: HashMap::from([("app".to_string(), "web".to_string())]), + annotations: HashMap::new(), + org_id: None, + project_id: None, + }, + spec: k8shost_types::PodSpec { + containers: vec![k8shost_types::Container { + name: "web".to_string(), + image: "nginx:latest".to_string(), + command: Vec::new(), + args: Vec::new(), + ports: Vec::new(), + env: Vec::new(), + resources: None, + }], + restart_policy: Some("Always".to_string()), + node_name: None, + }, + }, + }, + status: None, + } + } + + fn owned_pod(name: &str, hash: &str, phase: &str) -> Pod { + Pod { + metadata: k8shost_types::ObjectMeta { + name: name.to_string(), + namespace: Some("default".to_string()), + uid: None, + resource_version: None, + creation_timestamp: None, + labels: HashMap::from([("app".to_string(), "web".to_string())]), + annotations: HashMap::from([ + (DEPLOYMENT_NAME_ANNOTATION.to_string(), "web".to_string()), + ( + DEPLOYMENT_UID_ANNOTATION.to_string(), + "deployment-uid".to_string(), + ), + (TEMPLATE_HASH_ANNOTATION.to_string(), hash.to_string()), + ]), + org_id: Some("test-org".to_string()), + project_id: Some("test-project".to_string()), + }, + spec: k8shost_types::PodSpec { + containers: Vec::new(), + restart_policy: None, + node_name: None, + }, + status: Some(PodStatus { + phase: Some(phase.to_string()), + pod_ip: None, + host_ip: None, + conditions: Vec::new(), + }), + } + } + + #[test] + fn plan_creates_missing_replicas() { + let deployment = test_deployment(); + let plan = plan_deployment_reconciliation(&deployment, &[]).unwrap(); + + assert_eq!(plan.creates.len(), 2); + assert!(plan.deletes.is_empty()); + assert_eq!(plan.status.replicas, Some(2)); + } + + #[test] + fn plan_scales_down_extra_pods() { + let mut deployment = test_deployment(); + deployment.spec.replicas = Some(1); + let hash = 
deployment_template_hash(&deployment).unwrap(); + let pods = vec![ + owned_pod("web-1", &hash, "Running"), + owned_pod("web-2", &hash, "Running"), + ]; + + let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap(); + + assert!(plan.creates.is_empty()); + assert_eq!(plan.deletes, vec!["web-2".to_string()]); + assert_eq!(plan.status.ready_replicas, Some(1)); + } + + #[test] + fn plan_replaces_stale_template_pods() { + let deployment = test_deployment(); + let pods = vec![ + owned_pod("web-1", "stale", "Running"), + owned_pod("web-2", "stale", "Running"), + ]; + + let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap(); + + assert_eq!(plan.creates.len(), 2); + assert_eq!(plan.deletes.len(), 2); + assert_eq!(plan.status.ready_replicas, Some(0)); + } +} diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs index 4de33c3..f639a7c 100644 --- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs +++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs @@ -11,8 +11,8 @@ use fiberlb_api::listener_service_client::ListenerServiceClient; use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient; use fiberlb_api::pool_service_client::PoolServiceClient; use fiberlb_api::{ - CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, - CreatePoolRequest, DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol, + CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest, + DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol, }; use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus}; use std::sync::Arc; @@ -65,7 +65,10 @@ impl FiberLbController { let tenants = vec![("default-org".to_string(), "default-project".to_string())]; for (org_id, project_id) in tenants { - if let Err(e) = self.reconcile_tenant_loadbalancers(&org_id, &project_id).await { + if let Err(e) = self + .reconcile_tenant_loadbalancers(&org_id, &project_id) + .await + { warn!( "Failed to reconcile LoadBalancers for tenant {}/{}: {}", org_id, project_id, e @@ -79,10 +82,7 @@ impl FiberLbController { /// Reconcile LoadBalancer services for a specific tenant async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> { // Get all services for this tenant - let services = self - .storage - .list_services(org_id, project_id, None) - .await?; + let services = self.storage.list_services(org_id, project_id, None).await?; // Filter for LoadBalancer services that need provisioning let lb_services: Vec<_> = services @@ -93,12 +93,19 @@ impl FiberLbController { // 2. 
status is None OR status.load_balancer is None (not yet provisioned) svc.spec.r#type.as_deref() == Some("LoadBalancer") && (svc.status.is_none() - || svc.status.as_ref().and_then(|s| s.load_balancer.as_ref()).is_none()) + || svc + .status + .as_ref() + .and_then(|s| s.load_balancer.as_ref()) + .is_none()) }) .collect(); if lb_services.is_empty() { - debug!("No LoadBalancer services to provision for tenant {}/{}", org_id, project_id); + debug!( + "No LoadBalancer services to provision for tenant {}/{}", + org_id, project_id + ); return Ok(()); } @@ -122,7 +129,10 @@ impl FiberLbController { match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await { Ok(client) => client, Err(e) => { - warn!("Failed to connect to FiberLB at {}: {}", self.fiberlb_addr, e); + warn!( + "Failed to connect to FiberLB at {}: {}", + self.fiberlb_addr, e + ); return Ok(()); } }; @@ -135,21 +145,23 @@ impl FiberLbController { } }; - let mut listener_client = match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!("Failed to connect to FiberLB ListenerService: {}", e); - return Ok(()); - } - }; + let mut listener_client = + match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await { + Ok(client) => client, + Err(e) => { + warn!("Failed to connect to FiberLB ListenerService: {}", e); + return Ok(()); + } + }; - let mut backend_client = match BackendServiceClient::connect(self.fiberlb_addr.clone()).await { - Ok(client) => client, - Err(e) => { - warn!("Failed to connect to FiberLB BackendService: {}", e); - return Ok(()); - } - }; + let mut backend_client = + match BackendServiceClient::connect(self.fiberlb_addr.clone()).await { + Ok(client) => client, + Err(e) => { + warn!("Failed to connect to FiberLB BackendService: {}", e); + return Ok(()); + } + }; // Provision each LoadBalancer service for mut service in lb_services { @@ -160,7 +172,10 @@ impl FiberLbController { .unwrap_or_else(|| "default".to_string()); let name = service.metadata.name.clone(); - info!("Provisioning LoadBalancer for service {}/{}", namespace, name); + info!( + "Provisioning LoadBalancer for service {}/{}", + namespace, name + ); // Create LoadBalancer in FiberLB let lb_name = format!("{}.{}", name, namespace); @@ -210,19 +225,25 @@ impl FiberLbController { // Create Pool for this LoadBalancer let pool_name = format!("{}-pool", lb_name); let pool_id = match pool_client - .create_pool(authorized_request(CreatePoolRequest { - name: pool_name.clone(), - loadbalancer_id: lb_id.clone(), - algorithm: PoolAlgorithm::RoundRobin as i32, - protocol: PoolProtocol::Tcp as i32, - session_persistence: None, - }, &auth_token)) + .create_pool(authorized_request( + CreatePoolRequest { + name: pool_name.clone(), + loadbalancer_id: lb_id.clone(), + algorithm: PoolAlgorithm::RoundRobin as i32, + protocol: PoolProtocol::Tcp as i32, + session_persistence: None, + }, + &auth_token, + )) .await { Ok(response) => { let pool = response.into_inner().pool; if let Some(pool) = pool { - info!("Created Pool {} for service {}/{}", pool.id, namespace, name); + info!( + "Created Pool {} for service {}/{}", + pool.id, namespace, name + ); pool.id } else { warn!("Failed to create Pool for service {}/{}", namespace, name); @@ -230,7 +251,10 @@ impl FiberLbController { } } Err(e) => { - warn!("Failed to create Pool for service {}/{}: {}", namespace, name, e); + warn!( + "Failed to create Pool for service {}/{}: {}", + namespace, name, e + ); continue; } }; @@ -241,19 +265,25 @@ impl 
FiberLbController { let listener_name = format!( "{}-listener-{}", lb_name, - svc_port.name.as_deref().unwrap_or(&svc_port.port.to_string()) + svc_port + .name + .as_deref() + .unwrap_or(&svc_port.port.to_string()) ); match listener_client - .create_listener(authorized_request(CreateListenerRequest { - name: listener_name.clone(), - loadbalancer_id: lb_id.clone(), - protocol: ListenerProtocol::Tcp as i32, - port: svc_port.port as u32, - default_pool_id: pool_id.clone(), - tls_config: None, - connection_limit: 0, // No limit - }, &auth_token)) + .create_listener(authorized_request( + CreateListenerRequest { + name: listener_name.clone(), + loadbalancer_id: lb_id.clone(), + protocol: ListenerProtocol::Tcp as i32, + port: svc_port.port as u32, + default_pool_id: pool_id.clone(), + tls_config: None, + connection_limit: 0, // No limit + }, + &auth_token, + )) .await { Ok(response) => { @@ -327,21 +357,20 @@ impl FiberLbController { // Use target_port if specified, otherwise use port let backend_port = svc_port.target_port.unwrap_or(svc_port.port); - let backend_name = format!( - "{}-backend-{}-{}", - lb_name, - pod.metadata.name, - backend_port - ); + let backend_name = + format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port); match backend_client - .create_backend(authorized_request(CreateBackendRequest { - name: backend_name.clone(), - pool_id: pool_id.clone(), - address: pod_ip.clone(), - port: backend_port as u32, - weight: 1, - }, &auth_token)) + .create_backend(authorized_request( + CreateBackendRequest { + name: backend_name.clone(), + pool_id: pool_id.clone(), + address: pod_ip.clone(), + port: backend_port as u32, + weight: 1, + }, + &auth_token, + )) .await { Ok(response) => { @@ -397,10 +426,10 @@ impl FiberLbController { .metadata .annotations .insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb_id.clone()); - service - .metadata - .annotations - .insert("fiberlb.plasmacloud.io/pool-id".to_string(), pool_id.clone()); + service.metadata.annotations.insert( + "fiberlb.plasmacloud.io/pool-id".to_string(), + pool_id.clone(), + ); // Merge with the latest stored version so the DNS controller does not lose its annotations. if let Ok(Some(mut current)) = self @@ -447,9 +476,14 @@ impl FiberLbController { /// This should be called when a Service with type=LoadBalancer is deleted. /// For MVP, this is not automatically triggered - would need a deletion watch. 
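+    ///
+    /// Hypothetical wiring (not part of this change): a Service deletion
+    /// watch could read the `fiberlb.plasmacloud.io/lb-id` annotation stamped
+    /// on the Service above and invoke:
+    ///
+    /// ```text
+    /// self.cleanup_loadbalancer(org_id, project_id, &lb_id).await?;
+    /// ```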
#[allow(dead_code)] - async fn cleanup_loadbalancer(&self, org_id: &str, project_id: &str, lb_id: &str) -> Result<()> { - let mut fiberlb_client = LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()) - .await?; + async fn cleanup_loadbalancer( + &self, + org_id: &str, + project_id: &str, + lb_id: &str, + ) -> Result<()> { + let mut fiberlb_client = + LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await?; let auth_token = issue_controller_token( &self.iam_server_addr, CONTROLLER_PRINCIPAL_ID, diff --git a/k8shost/crates/k8shost-server/src/flashdns_controller.rs b/k8shost/crates/k8shost-server/src/flashdns_controller.rs index 67a3d7c..05e173e 100644 --- a/k8shost/crates/k8shost-server/src/flashdns_controller.rs +++ b/k8shost/crates/k8shost-server/src/flashdns_controller.rs @@ -90,7 +90,10 @@ impl FlashDnsController { .await?; // Ensure cluster.local zone exists for this tenant - let zone_id = match self.ensure_zone_exists(org_id, project_id, &auth_token).await { + let zone_id = match self + .ensure_zone_exists(org_id, project_id, &auth_token) + .await + { Ok(id) => id, Err(e) => { warn!( @@ -102,10 +105,7 @@ impl FlashDnsController { }; // Get all services for this tenant - let services = self - .storage - .list_services(org_id, project_id, None) - .await?; + let services = self.storage.list_services(org_id, project_id, None).await?; // Filter for services that need DNS records let services_needing_dns: Vec<_> = services @@ -123,7 +123,10 @@ impl FlashDnsController { .collect(); if services_needing_dns.is_empty() { - debug!("No services need DNS records for tenant {}/{}", org_id, project_id); + debug!( + "No services need DNS records for tenant {}/{}", + org_id, project_id + ); return Ok(()); } @@ -139,7 +142,10 @@ impl FlashDnsController { { Ok(client) => client, Err(e) => { - warn!("Failed to connect to FlashDNS at {}: {}", self.flashdns_addr, e); + warn!( + "Failed to connect to FlashDNS at {}: {}", + self.flashdns_addr, e + ); return Ok(()); } }; @@ -375,13 +381,13 @@ impl FlashDnsController { .into_inner() .zones .into_iter() - .find(|z| { - z.name.trim_end_matches('.') - == zone_name.trim_end_matches('.') - }) + .find(|z| z.name.trim_end_matches('.') == zone_name.trim_end_matches('.')) .map(|z| z.id)), Err(list_error) => { - debug!("Zone list fallback failed for {}: {}", zone_name, list_error); + debug!( + "Zone list fallback failed for {}: {}", + zone_name, list_error + ); Ok(None) } } diff --git a/k8shost/crates/k8shost-server/src/ipam_client.rs b/k8shost/crates/k8shost-server/src/ipam_client.rs index 7902ea9..c0204de 100644 --- a/k8shost/crates/k8shost-server/src/ipam_client.rs +++ b/k8shost/crates/k8shost-server/src/ipam_client.rs @@ -119,14 +119,17 @@ impl IpamClient { let mut client = self.connect().await?; self.ensure_default_cluster_ip_pool(&mut client, org_id, project_id, authorization) .await?; - let request = Self::with_auth(AllocateServiceIpRequest { - org_id: org_id.to_string(), - project_id: project_id.to_string(), - pool_id: String::new(), // Use default pool - pool_type: ProtoServiceIpPoolType::ClusterIp as i32, - service_uid: service_uid.to_string(), - requested_ip: String::new(), // Auto-allocate - }, authorization)?; + let request = Self::with_auth( + AllocateServiceIpRequest { + org_id: org_id.to_string(), + project_id: project_id.to_string(), + pool_id: String::new(), // Use default pool + pool_type: ProtoServiceIpPoolType::ClusterIp as i32, + service_uid: service_uid.to_string(), + requested_ip: String::new(), // Auto-allocate + }, + 
authorization, + )?; let response = client .allocate_service_ip(request) .await @@ -151,11 +154,14 @@ impl IpamClient { authorization: Option<&str>, ) -> Result<()> { let mut client = self.connect().await?; - let request = Self::with_auth(ReleaseServiceIpRequest { - org_id: org_id.to_string(), - project_id: project_id.to_string(), - ip_address: ip_address.to_string(), - }, authorization)?; + let request = Self::with_auth( + ReleaseServiceIpRequest { + org_id: org_id.to_string(), + project_id: project_id.to_string(), + ip_address: ip_address.to_string(), + }, + authorization, + )?; client .release_service_ip(request) diff --git a/k8shost/crates/k8shost-server/src/lib.rs b/k8shost/crates/k8shost-server/src/lib.rs index 9f81c82..aec1de8 100644 --- a/k8shost/crates/k8shost-server/src/lib.rs +++ b/k8shost/crates/k8shost-server/src/lib.rs @@ -3,14 +3,16 @@ //! Exports modules for testing and reuse pub mod auth; +pub mod deployment_controller; pub mod ipam_client; pub mod services { + pub mod deployment; + pub mod node; pub mod pod; pub mod service; - pub mod node; } -pub mod storage; pub mod config; pub mod rest; +pub mod storage; pub use ipam_client::IpamClient; diff --git a/k8shost/crates/k8shost-server/src/main.rs b/k8shost/crates/k8shost-server/src/main.rs index 8b91226..a101aa5 100644 --- a/k8shost/crates/k8shost-server/src/main.rs +++ b/k8shost/crates/k8shost-server/src/main.rs @@ -1,6 +1,7 @@ mod auth; mod cni; mod config; +mod deployment_controller; mod fiberlb_controller; mod flashdns_controller; mod ipam_client; @@ -15,19 +16,19 @@ use chainfire_client::Client as ChainFireClient; use clap::Parser; use config::Config; use ipam_client::IpamClient; -use metrics_exporter_prometheus::PrometheusBuilder; use k8shost_proto::{ - deployment_service_server::{DeploymentService, DeploymentServiceServer}, - node_service_server::NodeServiceServer, - pod_service_server::PodServiceServer, - service_service_server::ServiceServiceServer, - *, + deployment_service_server::DeploymentServiceServer, node_service_server::NodeServiceServer, + pod_service_server::PodServiceServer, service_service_server::ServiceServiceServer, +}; +use metrics_exporter_prometheus::PrometheusBuilder; +use services::{ + deployment::DeploymentServiceImpl, node::NodeServiceImpl, pod::PodServiceImpl, + service::ServiceServiceImpl, }; -use services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl}; -use std::{path::PathBuf, sync::Arc}; use std::time::{SystemTime, UNIX_EPOCH}; +use std::{path::PathBuf, sync::Arc}; use storage::Storage; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{transport::Server, Request, Status}; use tracing::{info, warn}; use tracing_subscriber::EnvFilter; @@ -116,19 +117,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }, flaredb: config::FlareDbConfig { pd_addr: args.flaredb_pd_addr.or(loaded_config.flaredb.pd_addr), - direct_addr: args.flaredb_direct_addr.or(loaded_config.flaredb.direct_addr), + direct_addr: args + .flaredb_direct_addr + .or(loaded_config.flaredb.direct_addr), }, chainfire: config::ChainFireConfig { endpoint: args.chainfire_endpoint.or(loaded_config.chainfire.endpoint), }, iam: config::IamConfig { - server_addr: args.iam_server_addr.unwrap_or(loaded_config.iam.server_addr), + server_addr: args + .iam_server_addr + .unwrap_or(loaded_config.iam.server_addr), }, fiberlb: config::FiberLbConfig { - server_addr: args.fiberlb_server_addr.unwrap_or(loaded_config.fiberlb.server_addr), + server_addr: args + .fiberlb_server_addr 
.unwrap_or(loaded_config.fiberlb.server_addr), }, flashdns: config::FlashDnsConfig { - server_addr: args.flashdns_server_addr.unwrap_or(loaded_config.flashdns.server_addr), + server_addr: args + .flashdns_server_addr + .unwrap_or(loaded_config.flashdns.server_addr), }, prismnet: config::PrismNetConfig { server_addr: args @@ -198,7 +207,7 @@ async fn main() -> Result<(), Box> { } } else { return Err( - anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into() + anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into(), ); } } @@ -211,7 +220,10 @@ async fn main() -> Result<(), Box> { Arc::new(s) } Err(e) => { - warn!("Failed to connect to FlareDB: {}. Server will not start.", e); + warn!( + "Failed to connect to FlareDB: {}. Server will not start.", + e + ); return Err(anyhow::anyhow!("Failed to connect to FlareDB: {}", e).into()); } } @@ -228,7 +240,10 @@ async fn main() -> Result<(), Box> { Arc::new(s) } Err(e) => { - warn!("Failed to connect to IAM server: {}. Server will not start.", e); + warn!( + "Failed to connect to IAM server: {}. Server will not start.", + e + ); return Err(anyhow::anyhow!("Failed to connect to IAM server: {}", e).into()); } }; @@ -262,7 +277,7 @@ async fn main() -> Result<(), Box> { auth_service.clone(), )); let node_service = Arc::new(NodeServiceImpl::new(storage.clone(), auth_service.clone())); - let deployment_service = DeploymentServiceImpl; // Still unimplemented + let deployment_service = DeploymentServiceImpl::new(storage.clone(), auth_service.clone()); // Start scheduler in background with CreditService integration let scheduler = Arc::new(scheduler::Scheduler::new_with_credit_service(storage.clone()).await); @@ -271,6 +286,14 @@ async fn main() -> Result<(), Box> { }); info!("Scheduler started - tenant-aware with quota enforcement"); + let deployment_controller = Arc::new(deployment_controller::DeploymentController::new( + storage.clone(), + )); + tokio::spawn(async move { + deployment_controller.run().await; + }); + info!("Deployment controller started - reconciling Deployment resources"); + // Start FiberLB controller in background let fiberlb_controller = Arc::new(fiberlb_controller::FiberLbController::new( storage.clone(), @@ -280,7 +303,9 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { fiberlb_controller.run().await; }); - info!("FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens"); + info!( + "FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens" + ); // Start FlashDNS controller in background let flashdns_controller = Arc::new(flashdns_controller::FlashDnsController::new( @@ -297,25 +322,22 @@ async fn main() -> Result<(), Box> { // Build gRPC server with authentication layer let grpc_server = Server::builder() - .add_service( - tonic::codegen::InterceptedService::new( - PodServiceServer::new(pod_service.as_ref().clone()), - make_interceptor(auth_service.clone()), - ), - ) - .add_service( - tonic::codegen::InterceptedService::new( - ServiceServiceServer::new(service_service.as_ref().clone()), - make_interceptor(auth_service.clone()), - ), - ) - .add_service( - tonic::codegen::InterceptedService::new( - NodeServiceServer::new(node_service.as_ref().clone()), - make_interceptor(auth_service.clone()), - ), - ) - .add_service(DeploymentServiceServer::new(deployment_service)) + .add_service(tonic::codegen::InterceptedService::new( + PodServiceServer::new(pod_service.as_ref().clone()), + make_interceptor(auth_service.clone()), + )) + 
.add_service(tonic::codegen::InterceptedService::new( ServiceServiceServer::new(service_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) + .add_service(tonic::codegen::InterceptedService::new( NodeServiceServer::new(node_service.as_ref().clone()), make_interceptor(auth_service.clone()), )) + .add_service(tonic::codegen::InterceptedService::new( DeploymentServiceServer::new(deployment_service), make_interceptor(auth_service.clone()), )) .serve(config.server.addr); // HTTP REST API server @@ -350,59 +372,15 @@ Ok(()) } -// Deployment Service Implementation (placeholder - not part of MVP) -#[derive(Debug, Default)] -struct DeploymentServiceImpl; - -#[tonic::async_trait] -impl DeploymentService for DeploymentServiceImpl { - async fn create_deployment( - &self, - _request: Request<CreateDeploymentRequest>, - ) -> Result<Response<CreateDeploymentResponse>, Status> { - Err(Status::unimplemented("create_deployment not yet implemented")) - } - - async fn get_deployment( - &self, - _request: Request<GetDeploymentRequest>, - ) -> Result<Response<GetDeploymentResponse>, Status> { - Err(Status::unimplemented("get_deployment not yet implemented")) - } - - async fn list_deployments( - &self, - _request: Request<ListDeploymentsRequest>, - ) -> Result<Response<ListDeploymentsResponse>, Status> { - Err(Status::unimplemented("list_deployments not yet implemented")) - } - - async fn update_deployment( - &self, - _request: Request<UpdateDeploymentRequest>, - ) -> Result<Response<UpdateDeploymentResponse>, Status> { - Err(Status::unimplemented("update_deployment not yet implemented")) - } - - async fn delete_deployment( - &self, - _request: Request<DeleteDeploymentRequest>, - ) -> Result<Response<DeleteDeploymentResponse>, Status> { - Err(Status::unimplemented("delete_deployment not yet implemented")) - } -} - fn init_logging(level: &str) { tracing_subscriber::fmt() - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level))) + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)), + ) .init(); } -async fn register_chainfire_membership( - endpoint: &str, - service: &str, - addr: String, -) -> Result<()> { +async fn register_chainfire_membership(endpoint: &str, service: &str, addr: String) -> Result<()> { let node_id = std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id())); let ts = SystemTime::now() diff --git a/k8shost/crates/k8shost-server/src/rest.rs b/k8shost/crates/k8shost-server/src/rest.rs index 895f063..df997f7 100644 --- a/k8shost/crates/k8shost-server/src/rest.rs +++ b/k8shost/crates/k8shost-server/src/rest.rs @@ -11,26 +11,24 @@ use axum::{ extract::{Path, Query, State}, - http::StatusCode, http::HeaderMap, - routing::{delete, get, post}, + http::StatusCode, + routing::{delete, get}, Json, Router, }; use iam_service_auth::{resolve_tenant_ids_from_context, AuthService, TenantContext}; use k8shost_proto::{ - pod_service_server::PodService, - service_service_server::ServiceService, - node_service_server::NodeService, - CreatePodRequest, DeletePodRequest, ListPodsRequest, - CreateServiceRequest, DeleteServiceRequest, ListServicesRequest, - ListNodesRequest, Pod as ProtoPod, Service as ProtoService, Node as ProtoNode, - ObjectMeta, PodSpec, Container, ServiceSpec, ServicePort, + node_service_server::NodeService, pod_service_server::PodService, + service_service_server::ServiceService, Container, CreatePodRequest, CreateServiceRequest, + DeletePodRequest, DeleteServiceRequest, ListNodesRequest, ListPodsRequest, ListServicesRequest, + Node as ProtoNode, ObjectMeta, Pod as ProtoPod, PodSpec, Service as ProtoService, ServicePort, + ServiceSpec, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; use 
tonic::{Code, Request}; -use crate::services::{pod::PodServiceImpl, service::ServiceServiceImpl, node::NodeServiceImpl}; +use crate::services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl}; /// REST API state #[derive(Clone)] @@ -125,15 +123,28 @@ pub struct PodResponse { impl From<ProtoPod> for PodResponse { fn from(pod: ProtoPod) -> Self { - let phase = pod.status.as_ref() + let phase = pod + .status + .as_ref() .and_then(|s| s.phase.clone()) .unwrap_or_else(|| "Unknown".to_string()); let ip = pod.status.as_ref().and_then(|s| s.pod_ip.clone()); - let name = pod.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default(); - let namespace = pod.metadata.as_ref() + let name = pod + .metadata + .as_ref() + .map(|m| m.name.clone()) + .unwrap_or_default(); + let namespace = pod + .metadata + .as_ref() .and_then(|m| m.namespace.clone()) .unwrap_or_else(|| "default".to_string()); - Self { name, namespace, phase, ip } + Self { + name, + namespace, + phase, + ip, + } } } @@ -156,24 +167,49 @@ pub struct ServicePortResponse { impl From<ProtoService> for ServiceResponse { fn from(svc: ProtoService) -> Self { - let ports = svc.spec.as_ref().map(|s| { - s.ports.iter().map(|p| ServicePortResponse { - port: p.port, - target_port: p.target_port.unwrap_or(p.port), - protocol: p.protocol.clone().unwrap_or_else(|| "TCP".to_string()), - }).collect() - }).unwrap_or_default(); + let ports = svc + .spec + .as_ref() + .map(|s| { + s.ports + .iter() + .map(|p| ServicePortResponse { + port: p.port, + target_port: p.target_port.unwrap_or(p.port), + protocol: p.protocol.clone().unwrap_or_else(|| "TCP".to_string()), + }) + .collect() + }) + .unwrap_or_default(); - let name = svc.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default(); - let namespace = svc.metadata.as_ref() + let name = svc + .metadata + .as_ref() + .map(|m| m.name.clone()) + .unwrap_or_default(); + let namespace = svc + .metadata + .as_ref() .and_then(|m| m.namespace.clone()) .unwrap_or_else(|| "default".to_string()); - let service_type = svc.spec.as_ref() + let service_type = svc + .spec + .as_ref() .and_then(|s| s.r#type.clone()) .unwrap_or_else(|| "ClusterIP".to_string()); - let cluster_ip = svc.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()).cloned(); + let cluster_ip = svc + .spec + .as_ref() + .and_then(|s| s.cluster_ip.as_ref()) + .cloned(); - Self { name, namespace, service_type, cluster_ip, ports } + Self { + name, + namespace, + service_type, + cluster_ip, + ports, + } } } @@ -188,13 +224,34 @@ pub struct NodeResponse { impl From<ProtoNode> for NodeResponse { fn from(node: ProtoNode) -> Self { - let ready = node.status.as_ref() - .map(|s| s.conditions.iter().any(|c| c.r#type == "Ready" && c.status == "True")) + let ready = node + .status + .as_ref() + .map(|s| { + s.conditions + .iter() + .any(|c| c.r#type == "Ready" && c.status == "True") + }) .unwrap_or(false); - let name = node.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default(); - let cpu_capacity = node.status.as_ref().and_then(|s| s.capacity.get("cpu").cloned()); - let memory_capacity = node.status.as_ref().and_then(|s| s.capacity.get("memory").cloned()); - Self { name, ready, cpu_capacity, memory_capacity } + let name = node + .metadata + .as_ref() + .map(|m| m.name.clone()) + .unwrap_or_default(); + let cpu_capacity = node + .status + .as_ref() + .and_then(|s| s.capacity.get("cpu").cloned()); + let memory_capacity = node + .status + .as_ref() + .and_then(|s| s.capacity.get("memory").cloned()); + Self { + name, + ready, + cpu_capacity, + memory_capacity, + } } } @@ 
-222,7 +279,10 @@ pub fn build_router(state: RestApiState) -> Router { .route("/api/v1/pods", get(list_pods).post(create_pod)) .route("/api/v1/pods/{namespace}/{name}", delete(delete_pod)) .route("/api/v1/services", get(list_services).post(create_service)) - .route("/api/v1/services/{namespace}/{name}", delete(delete_service)) + .route( + "/api/v1/services/{namespace}/{name}", + delete(delete_service), + ) .route("/api/v1/nodes", get(list_nodes)) .route("/health", get(health_check)) .with_state(state) @@ -232,7 +292,9 @@ pub fn build_router(state: RestApiState) -> Router { async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) { ( StatusCode::OK, - Json(SuccessResponse::new(serde_json::json!({ "status": "healthy" }))), + Json(SuccessResponse::new( + serde_json::json!({ "status": "healthy" }), + )), ) } @@ -249,11 +311,20 @@ async fn list_pods( }); req.extensions_mut().insert(tenant); - let response = state.pod_service.list_pods(req) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?; + let response = state.pod_service.list_pods(req).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "LIST_FAILED", + &e.message(), + ) + })?; - let pods: Vec<PodResponse> = response.into_inner().items.into_iter().map(PodResponse::from).collect(); + let pods: Vec<PodResponse> = response + .into_inner() + .items + .into_iter() + .map(PodResponse::from) + .collect(); Ok(Json(SuccessResponse::new(PodsResponse { pods }))) } @@ -297,12 +368,21 @@ async fn create_pod( }); grpc_req.extensions_mut().insert(tenant); - let response = state.pod_service.create_pod(grpc_req) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?; + let response = state.pod_service.create_pod(grpc_req).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "CREATE_FAILED", + &e.message(), + ) + })?; - let pod = response.into_inner().pod - .ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No pod returned"))?; + let pod = response.into_inner().pod.ok_or_else(|| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "CREATE_FAILED", + "No pod returned", + ) + })?; Ok(( StatusCode::CREATED, @@ -315,7 +395,8 @@ async fn delete_pod( State(state): State<RestApiState>, Path((namespace, name)): Path<(String, String)>, headers: HeaderMap, -) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> { +) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> +{ let tenant = resolve_rest_tenant(&state, &headers).await?; let mut req = Request::new(DeletePodRequest { name: name.clone(), }); req.extensions_mut().insert(tenant); - state.pod_service.delete_pod(req) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?; + state.pod_service.delete_pod(req).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "DELETE_FAILED", + &e.message(), + ) + })?; Ok(( StatusCode::OK, - Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))), + Json(SuccessResponse::new( + serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }), + )), )) } @@ -345,11 +432,24 @@ async fn list_services( }); req.extensions_mut().insert(tenant); - let response = state.service_service.list_services(req) + let response = state + .service_service + .list_services(req) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?; + 
.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "LIST_FAILED", + &e.message(), + ) + })?; - let services: Vec<ServiceResponse> = response.into_inner().items.into_iter().map(ServiceResponse::from).collect(); + let services: Vec<ServiceResponse> = response + .into_inner() + .items + .into_iter() + .map(ServiceResponse::from) + .collect(); Ok(Json(SuccessResponse::new(ServicesResponse { services }))) } @@ -359,7 +459,8 @@ async fn create_service( State(state): State<RestApiState>, headers: HeaderMap, Json(req): Json<CreateServiceRequest>, -) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)> { +) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)> +{ let tenant = resolve_rest_tenant(&state, &headers).await?; let namespace = req.namespace.unwrap_or_else(|| "default".to_string()); let service_type = req.service_type.unwrap_or_else(|| "ClusterIP".to_string()); @@ -393,12 +494,25 @@ async fn create_service( }); grpc_req.extensions_mut().insert(tenant); - let response = state.service_service.create_service(grpc_req) + let response = state + .service_service + .create_service(grpc_req) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "CREATE_FAILED", + &e.message(), + ) + })?; - let service = response.into_inner().service - .ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No service returned"))?; + let service = response.into_inner().service.ok_or_else(|| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "CREATE_FAILED", + "No service returned", + ) + })?; Ok(( StatusCode::CREATED, @@ -411,7 +525,8 @@ async fn delete_service( State(state): State<RestApiState>, Path((namespace, name)): Path<(String, String)>, headers: HeaderMap, -) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> { +) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> +{ let tenant = resolve_rest_tenant(&state, &headers).await?; let mut req = Request::new(DeleteServiceRequest { name: name.clone(), }); req.extensions_mut().insert(tenant); - state.service_service.delete_service(req) + state + .service_service + .delete_service(req) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "DELETE_FAILED", + &e.message(), + ) + })?; Ok(( StatusCode::OK, - Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))), + Json(SuccessResponse::new( + serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }), + )), )) } @@ -438,11 +563,20 @@ async fn list_nodes( let mut req = Request::new(ListNodesRequest {}); req.extensions_mut().insert(tenant); - let response = state.node_service.list_nodes(req) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?; + let response = state.node_service.list_nodes(req).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "LIST_FAILED", + &e.message(), + ) + })?; - let nodes: Vec<NodeResponse> = response.into_inner().items.into_iter().map(NodeResponse::from).collect(); + let nodes: Vec<NodeResponse> = response + .into_inner() + .items + .into_iter() + .map(NodeResponse::from) + .collect(); Ok(Json(SuccessResponse::new(NodesResponse { nodes }))) } diff --git a/k8shost/crates/k8shost-server/src/scheduler.rs b/k8shost/crates/k8shost-server/src/scheduler.rs index ca411c3..b4c6594 100644 --- 
a/k8shost/crates/k8shost-server/src/scheduler.rs +++ b/k8shost/crates/k8shost-server/src/scheduler.rs @@ -38,7 +38,10 @@ impl Scheduler { let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") { Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await { Ok(client) => { - info!("Scheduler: CreditService quota enforcement enabled: {}", endpoint); + info!( + "Scheduler: CreditService quota enforcement enabled: {}", + endpoint + ); Some(Arc::new(RwLock::new(client))) } Err(e) => { @@ -64,7 +67,10 @@ impl Scheduler { /// Start the scheduler loop pub async fn run(self: Arc<Self>) { - info!("Scheduler started (spread algorithm, {}s interval)", self.interval.as_secs()); + info!( + "Scheduler started (spread algorithm, {}s interval)", + self.interval.as_secs() + ); loop { if let Err(e) = self.schedule_pending_pods().await { @@ -103,14 +109,10 @@ impl Scheduler { async fn get_active_tenants(&self) -> anyhow::Result<HashSet<(String, String)>> { // Query all pods to find unique (org_id, project_id) combinations // This is a pragmatic approach that doesn't require IAM changes - let all_pods = self - .storage - .list_all_pods() - .await - .unwrap_or_else(|e| { - warn!("Failed to query all pods for tenant discovery: {}", e); - vec![] - }); + let all_pods = self.storage.list_all_pods().await.unwrap_or_else(|e| { + warn!("Failed to query all pods for tenant discovery: {}", e); + vec![] + }); let mut tenants: HashSet<(String, String)> = HashSet::new(); @@ -133,7 +135,10 @@ impl Scheduler { /// Schedule pending pods for a specific tenant async fn schedule_tenant_pods(&self, org_id: &str, project_id: &str) -> anyhow::Result<()> { // Get all pods in all namespaces for this tenant - let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?; + let all_pods = self + .storage + .list_pods(org_id, project_id, None, None) + .await?; // Filter to pending pods that need scheduling let pending_pods: Vec<Pod> = all_pods @@ -157,15 +162,23 @@ impl Scheduler { return Ok(()); } - info!("Scheduling {} pending pod(s) for tenant {}/{}", - pending_pods.len(), org_id, project_id); + info!( + "Scheduling {} pending pod(s) for tenant {}/{}", + pending_pods.len(), + org_id, + project_id + ); // Get all nodes for this tenant let nodes = self.storage.list_nodes(org_id, project_id).await?; if nodes.is_empty() { - warn!("No nodes available for tenant {}/{}. {} pod(s) remain pending.", - org_id, project_id, pending_pods.len()); + warn!( + "No nodes available for tenant {}/{}. {} pod(s) remain pending.", + org_id, + project_id, + pending_pods.len() + ); return Ok(()); } @@ -176,15 +189,21 @@ impl Scheduler { .collect(); if ready_nodes.is_empty() { - warn!("No ready nodes available for tenant {}/{}. 
{} pod(s) remain pending.", + org_id, + project_id, + pending_pods.len() + ); return Ok(()); } info!("Found {} ready node(s) for scheduling", ready_nodes.len()); // Get current pod count per node for spread algorithm - let pod_counts = self.count_pods_per_node(org_id, project_id, &ready_nodes).await?; + let pod_counts = self + .count_pods_per_node(org_id, project_id, &ready_nodes) + .await?; // Schedule each pending pod for pod in pending_pods { @@ -233,9 +252,10 @@ impl Scheduler { node.status .as_ref() .map(|status| { - status.conditions.iter().any(|cond| { - cond.r#type == "Ready" && cond.status == "True" - }) + status + .conditions + .iter() + .any(|cond| cond.r#type == "Ready" && cond.status == "True") }) .unwrap_or(false) } @@ -247,13 +267,14 @@ impl Scheduler { project_id: &str, nodes: &[Node], ) -> anyhow::Result> { - let mut counts: HashMap = nodes - .iter() - .map(|n| (n.metadata.name.clone(), 0)) - .collect(); + let mut counts: HashMap = + nodes.iter().map(|n| (n.metadata.name.clone(), 0)).collect(); // Get all assigned pods - let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?; + let all_pods = self + .storage + .list_pods(org_id, project_id, None, None) + .await?; // Count pods per node for pod in all_pods { @@ -324,12 +345,7 @@ impl Scheduler { // Check if tenant has sufficient quota use creditservice_client::ResourceType; match client - .check_quota( - project_id, - ResourceType::K8sNode, - 1, - estimated_cost as i64, - ) + .check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost as i64) .await { Ok(response) if !response.allowed => { @@ -389,10 +405,7 @@ impl Scheduler { /// Parse memory string to GB (e.g., "512Mi" -> 0.5, "2Gi" -> 2.0) fn parse_memory_to_gb(memory: &str) -> Option { if memory.ends_with("Gi") { - memory - .trim_end_matches("Gi") - .parse::() - .ok() + memory.trim_end_matches("Gi").parse::().ok() } else if memory.ends_with("Mi") { memory .trim_end_matches("Mi") @@ -407,7 +420,10 @@ impl Scheduler { .map(|ki| ki / (1024.0 * 1024.0)) } else { // Assume bytes - memory.parse::().ok().map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0)) + memory + .parse::() + .ok() + .map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0)) } } } @@ -419,7 +435,11 @@ mod tests { #[tokio::test] async fn test_is_node_ready() { - let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage")); + let storage = Arc::new( + Storage::new("memory://test".to_string()) + .await + .expect("Failed to create storage"), + ); let scheduler = Scheduler::new(storage); // Node with Ready=True condition @@ -468,7 +488,11 @@ mod tests { #[tokio::test] async fn test_select_node_spread() { - let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage")); + let storage = Arc::new( + Storage::new("memory://test".to_string()) + .await + .expect("Failed to create storage"), + ); let scheduler = Scheduler::new(storage); let node1 = Node { diff --git a/k8shost/crates/k8shost-server/src/services/deployment.rs b/k8shost/crates/k8shost-server/src/services/deployment.rs new file mode 100644 index 0000000..667f8df --- /dev/null +++ b/k8shost/crates/k8shost-server/src/services/deployment.rs @@ -0,0 +1,696 @@ +//! Deployment service implementation. +//! +//! Provides CRUD for Deployment resources and treats them as first-class +//! tenant-scoped objects persisted in FlareDB. 
+ +use crate::auth::{ + get_tenant_context, resolve_tenant_ids_from_context, resource_for_tenant, AuthService, +}; +use crate::storage::Storage; +use chrono::Utc; +use k8shost_proto::{ + deployment_service_server::DeploymentService, CreateDeploymentRequest, + CreateDeploymentResponse, DeleteDeploymentRequest, DeleteDeploymentResponse, + GetDeploymentRequest, GetDeploymentResponse, ListDeploymentsRequest, ListDeploymentsResponse, + UpdateDeploymentRequest, UpdateDeploymentResponse, +}; +use sha2::{Digest, Sha256}; +use std::sync::Arc; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +const ACTION_DEPLOYMENT_CREATE: &str = "k8s:deployments:create"; +const ACTION_DEPLOYMENT_READ: &str = "k8s:deployments:read"; +const ACTION_DEPLOYMENT_LIST: &str = "k8s:deployments:list"; +const ACTION_DEPLOYMENT_UPDATE: &str = "k8s:deployments:update"; +const ACTION_DEPLOYMENT_DELETE: &str = "k8s:deployments:delete"; + +pub(crate) const DEPLOYMENT_NAME_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-name"; +pub(crate) const DEPLOYMENT_UID_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-uid"; +pub(crate) const TEMPLATE_HASH_ANNOTATION: &str = "k8shost.photoncloud.io/template-hash"; + +#[derive(Clone)] +pub struct DeploymentServiceImpl { + storage: Arc<Storage>, + auth: Arc<AuthService>, +} + +impl DeploymentServiceImpl { + pub fn new(storage: Arc<Storage>, auth: Arc<AuthService>) -> Self { + Self { storage, auth } + } + + pub fn to_proto_deployment( + deployment: &k8shost_types::Deployment, + ) -> k8shost_proto::Deployment { + k8shost_proto::Deployment { + metadata: Some(to_proto_meta(&deployment.metadata)), + spec: Some(k8shost_proto::DeploymentSpec { + replicas: deployment.spec.replicas, + selector: Some(k8shost_proto::LabelSelector { + match_labels: deployment.spec.selector.match_labels.clone(), + }), + template: Some(k8shost_proto::PodTemplateSpec { + metadata: Some(to_proto_meta(&deployment.spec.template.metadata)), + spec: Some(to_proto_pod_spec(&deployment.spec.template.spec)), + }), + }), + status: deployment + .status + .as_ref() + .map(|status| k8shost_proto::DeploymentStatus { + replicas: status.replicas, + ready_replicas: status.ready_replicas, + available_replicas: status.available_replicas, + }), + } + } + + pub fn from_proto_deployment( + deployment: &k8shost_proto::Deployment, + ) -> Result<k8shost_types::Deployment, Status> { + let metadata = deployment + .metadata + .as_ref() + .ok_or_else(|| Status::invalid_argument("metadata is required"))?; + let spec = deployment + .spec + .as_ref() + .ok_or_else(|| Status::invalid_argument("spec is required"))?; + let selector = spec + .selector + .as_ref() + .ok_or_else(|| Status::invalid_argument("selector is required"))?; + let template = spec + .template + .as_ref() + .ok_or_else(|| Status::invalid_argument("template is required"))?; + let template_meta = template + .metadata + .as_ref() + .ok_or_else(|| Status::invalid_argument("template.metadata is required"))?; + let template_spec = template + .spec + .as_ref() + .ok_or_else(|| Status::invalid_argument("template.spec is required"))?; + + Ok(k8shost_types::Deployment { + metadata: from_proto_meta(metadata)?, + spec: k8shost_types::DeploymentSpec { + replicas: spec.replicas, + selector: k8shost_types::LabelSelector { + match_labels: selector.match_labels.clone(), + }, + template: k8shost_types::PodTemplateSpec { + metadata: from_proto_meta(template_meta)?, + spec: from_proto_pod_spec(template_spec), + }, + }, + status: deployment + .status + .as_ref() + .map(|status| k8shost_types::DeploymentStatus { + replicas: status.replicas, + ready_replicas: 
status.ready_replicas, + available_replicas: status.available_replicas, + }), + }) + } + + pub(crate) fn validate_spec(deployment: &k8shost_types::Deployment) -> Result<(), Status> { + if deployment.metadata.namespace.is_none() { + return Err(Status::invalid_argument("namespace is required")); + } + if deployment.spec.replicas.unwrap_or(1) < 0 { + return Err(Status::invalid_argument("replicas must be >= 0")); + } + if deployment.spec.selector.match_labels.is_empty() { + return Err(Status::invalid_argument( + "selector.match_labels must not be empty", + )); + } + if deployment.spec.template.spec.containers.is_empty() { + return Err(Status::invalid_argument( + "template.spec.containers must not be empty", + )); + } + let template_labels = &deployment.spec.template.metadata.labels; + let selector_matches_template = deployment + .spec + .selector + .match_labels + .iter() + .all(|(key, value)| template_labels.get(key) == Some(value)); + if !selector_matches_template { + return Err(Status::invalid_argument( + "selector.match_labels must be present in template.metadata.labels", + )); + } + Ok(()) + } + + pub(crate) fn pod_is_owned_by_deployment( + deployment: &k8shost_types::Deployment, + pod: &k8shost_types::Pod, + ) -> bool { + let annotations = &pod.metadata.annotations; + if let Some(uid) = deployment.metadata.uid.as_deref() { + if annotations + .get(DEPLOYMENT_UID_ANNOTATION) + .map(String::as_str) + == Some(uid) + { + return true; + } + } + annotations + .get(DEPLOYMENT_NAME_ANNOTATION) + .map(String::as_str) + == Some(deployment.metadata.name.as_str()) + } +} + +pub(crate) fn deployment_template_hash( + deployment: &k8shost_types::Deployment, +) -> Result { + let payload = serde_json::to_vec(&deployment.spec.template) + .map_err(|e| Status::internal(format!("failed to hash deployment template: {}", e)))?; + let mut hasher = Sha256::new(); + hasher.update(payload); + Ok(format!("{:x}", hasher.finalize())) +} + +fn default_status() -> k8shost_types::DeploymentStatus { + k8shost_types::DeploymentStatus { + replicas: Some(0), + ready_replicas: Some(0), + available_replicas: Some(0), + } +} + +fn next_resource_version(current: Option<&str>) -> String { + let current = current + .and_then(|version| version.parse::().ok()) + .unwrap_or(0); + (current + 1).to_string() +} + +fn to_proto_meta(meta: &k8shost_types::ObjectMeta) -> k8shost_proto::ObjectMeta { + k8shost_proto::ObjectMeta { + name: meta.name.clone(), + namespace: meta.namespace.clone(), + uid: meta.uid.clone(), + resource_version: meta.resource_version.clone(), + creation_timestamp: meta.creation_timestamp.map(|ts| ts.to_rfc3339()), + labels: meta.labels.clone(), + annotations: meta.annotations.clone(), + org_id: meta.org_id.clone(), + project_id: meta.project_id.clone(), + } +} + +fn from_proto_meta(meta: &k8shost_proto::ObjectMeta) -> Result { + Ok(k8shost_types::ObjectMeta { + name: meta.name.clone(), + namespace: meta.namespace.clone(), + uid: meta.uid.clone(), + resource_version: meta.resource_version.clone(), + creation_timestamp: meta + .creation_timestamp + .as_ref() + .and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok()) + .map(|ts| ts.with_timezone(&Utc)), + labels: meta.labels.clone(), + annotations: meta.annotations.clone(), + org_id: meta.org_id.clone(), + project_id: meta.project_id.clone(), + }) +} + +fn to_proto_pod_spec(spec: &k8shost_types::PodSpec) -> k8shost_proto::PodSpec { + k8shost_proto::PodSpec { + containers: spec + .containers + .iter() + .map(|container| k8shost_proto::Container { + name: 
container.name.clone(), + image: container.image.clone(), + command: container.command.clone(), + args: container.args.clone(), + ports: container + .ports + .iter() + .map(|port| k8shost_proto::ContainerPort { + name: port.name.clone(), + container_port: port.container_port, + protocol: port.protocol.clone(), + }) + .collect(), + env: container + .env + .iter() + .map(|env| k8shost_proto::EnvVar { + name: env.name.clone(), + value: env.value.clone(), + }) + .collect(), + }) + .collect(), + restart_policy: spec.restart_policy.clone(), + node_name: spec.node_name.clone(), + } +} + +fn from_proto_pod_spec(spec: &k8shost_proto::PodSpec) -> k8shost_types::PodSpec { + k8shost_types::PodSpec { + containers: spec + .containers + .iter() + .map(|container| k8shost_types::Container { + name: container.name.clone(), + image: container.image.clone(), + command: container.command.clone(), + args: container.args.clone(), + ports: container + .ports + .iter() + .map(|port| k8shost_types::ContainerPort { + name: port.name.clone(), + container_port: port.container_port, + protocol: port.protocol.clone(), + }) + .collect(), + env: container + .env + .iter() + .map(|env| k8shost_types::EnvVar { + name: env.name.clone(), + value: env.value.clone(), + }) + .collect(), + resources: None, + }) + .collect(), + restart_policy: spec.restart_policy.clone(), + node_name: spec.node_name.clone(), + } +} + +#[tonic::async_trait] +impl DeploymentService for DeploymentServiceImpl { + async fn create_deployment( + &self, + request: Request, + ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; + let req = request.into_inner(); + let proto_deployment = req + .deployment + .ok_or_else(|| Status::invalid_argument("deployment is required"))?; + let mut deployment = Self::from_proto_deployment(&proto_deployment)?; + + let (org_id, project_id) = resolve_tenant_ids_from_context( + &tenant, + deployment.metadata.org_id.as_deref().unwrap_or(""), + deployment.metadata.project_id.as_deref().unwrap_or(""), + )?; + deployment.metadata.org_id = Some(org_id.clone()); + deployment.metadata.project_id = Some(project_id.clone()); + Self::validate_spec(&deployment)?; + + let deployment_key = format!( + "{}/{}", + deployment + .metadata + .namespace + .as_deref() + .unwrap_or("default"), + deployment.metadata.name + ); + self.auth + .authorize( + &tenant, + ACTION_DEPLOYMENT_CREATE, + &resource_for_tenant("deployment", deployment_key, &org_id, &project_id), + ) + .await?; + + let namespace = deployment + .metadata + .namespace + .as_deref() + .ok_or_else(|| Status::invalid_argument("namespace is required"))?; + if self + .storage + .get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name) + .await? 
+ .is_some() + { + return Err(Status::already_exists(format!( + "Deployment {} already exists", + deployment.metadata.name + ))); + } + + if deployment.metadata.uid.is_none() { + deployment.metadata.uid = Some(Uuid::new_v4().to_string()); + } + if deployment.metadata.creation_timestamp.is_none() { + deployment.metadata.creation_timestamp = Some(Utc::now()); + } + deployment.metadata.resource_version = Some("1".to_string()); + deployment.status = Some(default_status()); + + self.storage.put_deployment(&deployment).await?; + + Ok(Response::new(CreateDeploymentResponse { + deployment: Some(Self::to_proto_deployment(&deployment)), + })) + } + + async fn get_deployment( + &self, + request: Request, + ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; + let req = request.into_inner(); + let deployment_key = format!("{}/{}", req.namespace, req.name); + self.auth + .authorize( + &tenant, + ACTION_DEPLOYMENT_READ, + &resource_for_tenant( + "deployment", + deployment_key, + &tenant.org_id, + &tenant.project_id, + ), + ) + .await?; + + let deployment = self + .storage + .get_deployment( + &tenant.org_id, + &tenant.project_id, + &req.namespace, + &req.name, + ) + .await?; + + match deployment { + Some(deployment) => Ok(Response::new(GetDeploymentResponse { + deployment: Some(Self::to_proto_deployment(&deployment)), + })), + None => Err(Status::not_found(format!( + "Deployment {} not found", + req.name + ))), + } + } + + async fn list_deployments( + &self, + request: Request, + ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; + self.auth + .authorize( + &tenant, + ACTION_DEPLOYMENT_LIST, + &resource_for_tenant("deployment", "*", &tenant.org_id, &tenant.project_id), + ) + .await?; + let req = request.into_inner(); + + let items = self + .storage + .list_deployments(&tenant.org_id, &tenant.project_id, req.namespace.as_deref()) + .await? + .iter() + .map(Self::to_proto_deployment) + .collect(); + + Ok(Response::new(ListDeploymentsResponse { items })) + } + + async fn update_deployment( + &self, + request: Request, + ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; + let req = request.into_inner(); + let proto_deployment = req + .deployment + .ok_or_else(|| Status::invalid_argument("deployment is required"))?; + let mut deployment = Self::from_proto_deployment(&proto_deployment)?; + + let (org_id, project_id) = resolve_tenant_ids_from_context( + &tenant, + deployment.metadata.org_id.as_deref().unwrap_or(""), + deployment.metadata.project_id.as_deref().unwrap_or(""), + )?; + deployment.metadata.org_id = Some(org_id.clone()); + deployment.metadata.project_id = Some(project_id.clone()); + Self::validate_spec(&deployment)?; + + let namespace = deployment + .metadata + .namespace + .as_deref() + .ok_or_else(|| Status::invalid_argument("namespace is required"))?; + let deployment_key = format!("{}/{}", namespace, deployment.metadata.name); + self.auth + .authorize( + &tenant, + ACTION_DEPLOYMENT_UPDATE, + &resource_for_tenant("deployment", deployment_key, &org_id, &project_id), + ) + .await?; + + let current = self + .storage + .get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name) + .await? 
+ .ok_or_else(|| { + Status::not_found(format!("Deployment {} not found", deployment.metadata.name)) + })?; + + deployment.metadata.uid = current.metadata.uid.clone(); + deployment.metadata.creation_timestamp = current.metadata.creation_timestamp; + deployment.metadata.resource_version = Some(next_resource_version( + current.metadata.resource_version.as_deref(), + )); + deployment.status = current.status.clone(); + + self.storage.put_deployment(&deployment).await?; + + Ok(Response::new(UpdateDeploymentResponse { + deployment: Some(Self::to_proto_deployment(&deployment)), + })) + } + + async fn delete_deployment( + &self, + request: Request, + ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; + let req = request.into_inner(); + let deployment_key = format!("{}/{}", req.namespace, req.name); + self.auth + .authorize( + &tenant, + ACTION_DEPLOYMENT_DELETE, + &resource_for_tenant( + "deployment", + deployment_key, + &tenant.org_id, + &tenant.project_id, + ), + ) + .await?; + + let existing = self + .storage + .get_deployment( + &tenant.org_id, + &tenant.project_id, + &req.namespace, + &req.name, + ) + .await?; + let Some(deployment) = existing else { + return Ok(Response::new(DeleteDeploymentResponse { success: false })); + }; + + let success = self + .storage + .delete_deployment( + &tenant.org_id, + &tenant.project_id, + &req.namespace, + &req.name, + ) + .await?; + + let pods = self + .storage + .list_pods( + &tenant.org_id, + &tenant.project_id, + Some(&req.namespace), + None, + ) + .await?; + for pod in pods + .into_iter() + .filter(|pod| Self::pod_is_owned_by_deployment(&deployment, pod)) + { + let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + let _ = self + .storage + .delete_pod( + &tenant.org_id, + &tenant.project_id, + namespace, + &pod.metadata.name, + ) + .await?; + } + + Ok(Response::new(DeleteDeploymentResponse { success })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn test_deployment() -> k8shost_types::Deployment { + k8shost_types::Deployment { + metadata: k8shost_types::ObjectMeta { + name: "web".to_string(), + namespace: Some("default".to_string()), + uid: Some("deploy-uid".to_string()), + resource_version: Some("4".to_string()), + creation_timestamp: Some(Utc::now()), + labels: HashMap::new(), + annotations: HashMap::new(), + org_id: Some("test-org".to_string()), + project_id: Some("test-project".to_string()), + }, + spec: k8shost_types::DeploymentSpec { + replicas: Some(2), + selector: k8shost_types::LabelSelector { + match_labels: HashMap::from([("app".to_string(), "web".to_string())]), + }, + template: k8shost_types::PodTemplateSpec { + metadata: k8shost_types::ObjectMeta { + name: "".to_string(), + namespace: Some("default".to_string()), + uid: None, + resource_version: None, + creation_timestamp: None, + labels: HashMap::from([("app".to_string(), "web".to_string())]), + annotations: HashMap::new(), + org_id: None, + project_id: None, + }, + spec: k8shost_types::PodSpec { + containers: vec![k8shost_types::Container { + name: "web".to_string(), + image: "nginx:latest".to_string(), + command: Vec::new(), + args: Vec::new(), + ports: Vec::new(), + env: Vec::new(), + resources: None, + }], + restart_policy: Some("Always".to_string()), + node_name: None, + }, + }, + }, + status: Some(default_status()), + } + } + + #[test] + fn deployment_round_trip_proto_conversion() { + let deployment = test_deployment(); + let proto = DeploymentServiceImpl::to_proto_deployment(&deployment); + let 
round_trip = DeploymentServiceImpl::from_proto_deployment(&proto).unwrap(); + + assert_eq!(round_trip.metadata.name, deployment.metadata.name); + assert_eq!(round_trip.spec.replicas, Some(2)); + assert_eq!( + round_trip + .spec + .selector + .match_labels + .get("app") + .map(String::as_str), + Some("web") + ); + } + + #[test] + fn validate_spec_requires_selector_labels_on_template() { + let mut deployment = test_deployment(); + deployment.spec.template.metadata.labels.clear(); + + let error = DeploymentServiceImpl::validate_spec(&deployment).unwrap_err(); + assert_eq!(error.code(), tonic::Code::InvalidArgument); + } + + #[test] + fn template_hash_changes_when_template_changes() { + let deployment = test_deployment(); + let mut changed = test_deployment(); + changed.spec.template.spec.containers[0].image = "caddy:latest".to_string(); + + let original_hash = deployment_template_hash(&deployment).unwrap(); + let changed_hash = deployment_template_hash(&changed).unwrap(); + + assert_ne!(original_hash, changed_hash); + } + + #[test] + fn owned_pod_detection_matches_uid_annotation() { + let deployment = test_deployment(); + let mut pod = k8shost_types::Pod { + metadata: k8shost_types::ObjectMeta { + name: "web-1".to_string(), + namespace: Some("default".to_string()), + uid: None, + resource_version: None, + creation_timestamp: None, + labels: HashMap::new(), + annotations: HashMap::from([( + DEPLOYMENT_UID_ANNOTATION.to_string(), + "deploy-uid".to_string(), + )]), + org_id: Some("test-org".to_string()), + project_id: Some("test-project".to_string()), + }, + spec: k8shost_types::PodSpec { + containers: Vec::new(), + restart_policy: None, + node_name: None, + }, + status: None, + }; + + assert!(DeploymentServiceImpl::pod_is_owned_by_deployment( + &deployment, + &pod + )); + + pod.metadata.annotations.clear(); + assert!(!DeploymentServiceImpl::pod_is_owned_by_deployment( + &deployment, + &pod + )); + } +} diff --git a/k8shost/crates/k8shost-server/src/services/mod.rs b/k8shost/crates/k8shost-server/src/services/mod.rs index 5cf86e9..2ea4918 100644 --- a/k8shost/crates/k8shost-server/src/services/mod.rs +++ b/k8shost/crates/k8shost-server/src/services/mod.rs @@ -1,6 +1,7 @@ +pub mod deployment; +pub mod node; pub mod pod; pub mod service; -pub mod node; #[cfg(test)] mod tests; diff --git a/k8shost/crates/k8shost-server/src/services/node.rs b/k8shost/crates/k8shost-server/src/services/node.rs index c93b67e..637ad7f 100644 --- a/k8shost/crates/k8shost-server/src/services/node.rs +++ b/k8shost/crates/k8shost-server/src/services/node.rs @@ -143,9 +143,7 @@ impl NodeServiceImpl { } /// Convert proto NodeStatus to k8shost_types::NodeStatus - fn from_proto_node_status( - proto: &k8shost_proto::NodeStatus, - ) -> k8shost_types::NodeStatus { + fn from_proto_node_status(proto: &k8shost_proto::NodeStatus) -> k8shost_types::NodeStatus { k8shost_types::NodeStatus { addresses: proto .addresses @@ -247,7 +245,11 @@ impl NodeService for NodeServiceImpl { // Get existing node let mut node = self .storage - .get_node(&tenant_context.org_id, &tenant_context.project_id, &req.node_name) + .get_node( + &tenant_context.org_id, + &tenant_context.project_id, + &req.node_name, + ) .await? 
.ok_or_else(|| Status::not_found(format!("Node {} not found", req.node_name)))?; @@ -257,9 +259,10 @@ impl NodeService for NodeServiceImpl { } // Update last heartbeat timestamp in annotations - node.metadata - .annotations - .insert("k8shost.io/last-heartbeat".to_string(), Utc::now().to_rfc3339()); + node.metadata.annotations.insert( + "k8shost.io/last-heartbeat".to_string(), + Utc::now().to_rfc3339(), + ); // Increment resource version let current_version = node @@ -286,15 +289,22 @@ impl NodeService for NodeServiceImpl { .authorize( &tenant_context, ACTION_NODE_LIST, - &resource_for_tenant("node", "*", &tenant_context.org_id, &tenant_context.project_id), + &resource_for_tenant( + "node", + "*", + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let _req = request.into_inner(); - let nodes = self.storage.list_nodes(&tenant_context.org_id, &tenant_context.project_id).await?; + let nodes = self + .storage + .list_nodes(&tenant_context.org_id, &tenant_context.project_id) + .await?; - let items: Vec = - nodes.iter().map(Self::to_proto_node).collect(); + let items: Vec = nodes.iter().map(Self::to_proto_node).collect(); Ok(Response::new(ListNodesResponse { items })) } diff --git a/k8shost/crates/k8shost-server/src/services/pod.rs b/k8shost/crates/k8shost-server/src/services/pod.rs index 8821047..ca518b8 100644 --- a/k8shost/crates/k8shost-server/src/services/pod.rs +++ b/k8shost/crates/k8shost-server/src/services/pod.rs @@ -48,18 +48,19 @@ impl PodServiceImpl { pub async fn new_with_credit_service(storage: Arc, auth: Arc) -> Self { // Initialize CreditService client if endpoint is configured let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") { - Ok(endpoint) => { - match CreditServiceClient::connect(&endpoint).await { - Ok(client) => { - tracing::info!("CreditService admission control enabled: {}", endpoint); - Some(Arc::new(RwLock::new(client))) - } - Err(e) => { - tracing::warn!("Failed to connect to CreditService (admission control disabled): {}", e); - None - } + Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await { + Ok(client) => { + tracing::info!("CreditService admission control enabled: {}", endpoint); + Some(Arc::new(RwLock::new(client))) } - } + Err(e) => { + tracing::warn!( + "Failed to connect to CreditService (admission control disabled): {}", + e + ); + None + } + }, Err(_) => { tracing::info!("CREDITSERVICE_ENDPOINT not set, admission control disabled"); None @@ -118,7 +119,10 @@ impl PodServiceImpl { stripped.parse::().ok() } else { // Full CPU cores - cpu_str.parse::().ok().map(|cores| (cores * 1000.0) as i64) + cpu_str + .parse::() + .ok() + .map(|cores| (cores * 1000.0) as i64) } } @@ -134,7 +138,10 @@ impl PodServiceImpl { stripped.parse::().ok().map(|gb| gb * 1000) } else { // Try parsing as bytes - mem_str.parse::().ok().map(|bytes| bytes / (1024 * 1024)) + mem_str + .parse::() + .ok() + .map(|bytes| bytes / (1024 * 1024)) } } @@ -307,7 +314,9 @@ impl PodService for PodServiceImpl { ) -> Result, Status> { let tenant = get_tenant_context(&request)?; let req = request.into_inner(); - let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?; + let proto_pod = req + .pod + .ok_or_else(|| Status::invalid_argument("pod is required"))?; // Convert proto to internal type let mut pod = Self::from_proto_pod(&proto_pod)?; @@ -327,7 +336,9 @@ impl PodService for PodServiceImpl { ) .await?; if pod.metadata.namespace.is_none() { - return Err(Status::invalid_argument("namespace is required in metadata")); + 
return Err(Status::invalid_argument( + "namespace is required in metadata", + )); } // Assign UID if not present @@ -362,11 +373,15 @@ impl PodService for PodServiceImpl { // Calculate estimated cost based on Pod resource requests let estimated_cost = Self::calculate_pod_cost(&pod); - let project_id = pod.metadata.project_id.as_ref() - .ok_or_else(|| Status::invalid_argument("project_id required for admission control"))?; + let project_id = pod.metadata.project_id.as_ref().ok_or_else(|| { + Status::invalid_argument("project_id required for admission control") + })?; // Phase 0: Check quota - match client.check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost).await { + match client + .check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost) + .await + { Ok(resp) if !resp.allowed => { let reason = if resp.reason.is_empty() { "Insufficient quota or balance".to_string() @@ -378,7 +393,10 @@ impl PodService for PodServiceImpl { reason = %reason, "Pod creation denied by CreditService" ); - return Err(Status::resource_exhausted(format!("Admission denied: {}", reason))); + return Err(Status::resource_exhausted(format!( + "Admission denied: {}", + reason + ))); } Err(e) => { tracing::warn!("CreditService check_quota failed (allowing request): {}", e); @@ -388,13 +406,16 @@ impl PodService for PodServiceImpl { } // Phase 1: Reserve credits - match client.reserve_credits( - project_id, - estimated_cost, - format!("Pod {} creation", pod.metadata.name), - "PodInstance", - 300, // 5 minute TTL - ).await { + match client + .reserve_credits( + project_id, + estimated_cost, + format!("Pod {} creation", pod.metadata.name), + "PodInstance", + 300, // 5 minute TTL + ) + .await + { Ok(reservation) => { tracing::info!( reservation_id = %reservation.id, @@ -405,7 +426,10 @@ impl PodService for PodServiceImpl { } Err(e) => { tracing::warn!("CreditService reserve_credits failed: {}", e); - return Err(Status::resource_exhausted(format!("Failed to reserve credits: {}", e))); + return Err(Status::resource_exhausted(format!( + "Failed to reserve credits: {}", + e + ))); } } } else { @@ -417,9 +441,14 @@ impl PodService for PodServiceImpl { Ok(_) => {} Err(e) => { // Rollback: Release reservation on failure - if let (Some(ref credit_svc), Some(ref res_id)) = (&self.credit_service, &reservation_id) { + if let (Some(ref credit_svc), Some(ref res_id)) = + (&self.credit_service, &reservation_id) + { let mut client = credit_svc.write().await; - if let Err(release_err) = client.release_reservation(res_id, format!("Pod storage failed: {}", e)).await { + if let Err(release_err) = client + .release_reservation(res_id, format!("Pod storage failed: {}", e)) + .await + { tracing::warn!("Failed to release reservation {}: {}", res_id, release_err); } else { tracing::info!(reservation_id = %res_id, "Released reservation after Pod storage failure"); @@ -435,7 +464,10 @@ impl PodService for PodServiceImpl { let actual_cost = Self::calculate_pod_cost(&pod); let pod_uid = pod.metadata.uid.as_ref().unwrap_or(&pod.metadata.name); - if let Err(e) = client.commit_reservation(res_id, actual_cost, pod_uid).await { + if let Err(e) = client + .commit_reservation(res_id, actual_cost, pod_uid) + .await + { tracing::warn!("Failed to commit reservation {}: {}", res_id, e); // Pod is already created, so we don't fail here - billing will reconcile } else { @@ -471,13 +503,23 @@ impl PodService for PodServiceImpl { .authorize( &tenant_context, ACTION_POD_READ, - &resource_for_tenant("pod", pod_key, &tenant_context.org_id, 
&tenant_context.project_id), + &resource_for_tenant( + "pod", + pod_key, + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let pod = self .storage - .get_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name) + .get_pod( + &tenant_context.org_id, + &tenant_context.project_id, + &req.namespace, + &req.name, + ) .await?; if let Some(pod) = pod { @@ -500,7 +542,12 @@ impl PodService for PodServiceImpl { .authorize( &tenant_context, ACTION_POD_LIST, - &resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id), + &resource_for_tenant( + "pod", + "*", + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let req = request.into_inner(); @@ -514,7 +561,12 @@ impl PodService for PodServiceImpl { let pods = self .storage - .list_pods(&tenant_context.org_id, &tenant_context.project_id, namespace, label_selector) + .list_pods( + &tenant_context.org_id, + &tenant_context.project_id, + namespace, + label_selector, + ) .await?; let items: Vec = pods.iter().map(Self::to_proto_pod).collect(); @@ -528,7 +580,9 @@ impl PodService for PodServiceImpl { ) -> Result, Status> { let tenant = get_tenant_context(&request)?; let req = request.into_inner(); - let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?; + let proto_pod = req + .pod + .ok_or_else(|| Status::invalid_argument("pod is required"))?; let mut pod = Self::from_proto_pod(&proto_pod)?; @@ -585,13 +639,23 @@ impl PodService for PodServiceImpl { .authorize( &tenant_context, ACTION_POD_DELETE, - &resource_for_tenant("pod", pod_key, &tenant_context.org_id, &tenant_context.project_id), + &resource_for_tenant( + "pod", + pod_key, + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let existed = self .storage - .delete_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name) + .delete_pod( + &tenant_context.org_id, + &tenant_context.project_id, + &req.namespace, + &req.name, + ) .await?; Ok(Response::new(DeletePodResponse { success: existed })) @@ -608,7 +672,12 @@ impl PodService for PodServiceImpl { .authorize( &tenant_context, ACTION_POD_LIST, - &resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id), + &resource_for_tenant( + "pod", + "*", + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let _req = request.into_inner(); diff --git a/k8shost/crates/k8shost-server/src/services/service.rs b/k8shost/crates/k8shost-server/src/services/service.rs index 5f14fcf..77beec8 100644 --- a/k8shost/crates/k8shost-server/src/services/service.rs +++ b/k8shost/crates/k8shost-server/src/services/service.rs @@ -33,7 +33,11 @@ pub struct ServiceServiceImpl { } impl ServiceServiceImpl { - pub fn new(storage: Arc, ipam_client: Arc, auth: Arc) -> Self { + pub fn new( + storage: Arc, + ipam_client: Arc, + auth: Arc, + ) -> Self { Self { storage, ipam_client, @@ -73,8 +77,10 @@ impl ServiceServiceImpl { }); let status = svc.status.as_ref().map(|s| k8shost_proto::ServiceStatus { - load_balancer: s.load_balancer.as_ref().map(|lb| { - k8shost_proto::LoadBalancerStatus { + load_balancer: s + .load_balancer + .as_ref() + .map(|lb| k8shost_proto::LoadBalancerStatus { ingress: lb .ingress .iter() @@ -83,8 +89,7 @@ impl ServiceServiceImpl { hostname: ing.hostname.clone(), }) .collect(), - } - }), + }), }); k8shost_proto::Service { @@ -141,8 +146,10 @@ impl ServiceServiceImpl { }; let status = proto.status.as_ref().map(|s| k8shost_types::ServiceStatus { - load_balancer: 
s.load_balancer.as_ref().map(|lb| { - k8shost_types::LoadBalancerStatus { + load_balancer: s + .load_balancer + .as_ref() + .map(|lb| k8shost_types::LoadBalancerStatus { ingress: lb .ingress .iter() @@ -151,8 +158,7 @@ impl ServiceServiceImpl { hostname: ing.hostname.clone(), }) .collect(), - } - }), + }), }); Ok(k8shost_types::Service { @@ -194,7 +200,9 @@ impl ServiceService for ServiceServiceImpl { service.metadata.org_id = Some(org_id.clone()); service.metadata.project_id = Some(project_id.clone()); if service.metadata.namespace.is_none() { - return Err(Status::invalid_argument("namespace is required in metadata")); + return Err(Status::invalid_argument( + "namespace is required in metadata", + )); } self.auth .authorize( @@ -221,11 +229,7 @@ impl ServiceService for ServiceServiceImpl { // Allocate cluster IP if not present and service type is ClusterIP if service.spec.cluster_ip.is_none() { - let svc_type = service - .spec - .r#type - .as_deref() - .unwrap_or("ClusterIP"); + let svc_type = service.spec.r#type.as_deref().unwrap_or("ClusterIP"); if svc_type == "ClusterIP" || svc_type == "LoadBalancer" { // Get org_id, project_id, and uid for IPAM let org_id = service.metadata.org_id.as_ref().unwrap(); @@ -235,12 +239,7 @@ impl ServiceService for ServiceServiceImpl { // Allocate IP from IPAM let cluster_ip = self .ipam_client - .allocate_cluster_ip( - org_id, - project_id, - service_uid, - authorization.as_deref(), - ) + .allocate_cluster_ip(org_id, project_id, service_uid, authorization.as_deref()) .await .map_err(|e| { Status::internal(format!("Failed to allocate Cluster IP: {}", e)) @@ -281,7 +280,12 @@ impl ServiceService for ServiceServiceImpl { let service = self .storage - .get_service(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name) + .get_service( + &tenant_context.org_id, + &tenant_context.project_id, + &req.namespace, + &req.name, + ) .await?; if let Some(service) = service { @@ -304,7 +308,12 @@ impl ServiceService for ServiceServiceImpl { .authorize( &tenant_context, ACTION_SERVICE_LIST, - &resource_for_tenant("service", "*", &tenant_context.org_id, &tenant_context.project_id), + &resource_for_tenant( + "service", + "*", + &tenant_context.org_id, + &tenant_context.project_id, + ), ) .await?; let req = request.into_inner(); @@ -313,13 +322,15 @@ impl ServiceService for ServiceServiceImpl { let services = self .storage - .list_services(&tenant_context.org_id, &tenant_context.project_id, namespace) + .list_services( + &tenant_context.org_id, + &tenant_context.project_id, + namespace, + ) .await?; - let items: Vec = services - .iter() - .map(Self::to_proto_service) - .collect(); + let items: Vec = + services.iter().map(Self::to_proto_service).collect(); Ok(Response::new(ListServicesResponse { items })) } diff --git a/k8shost/crates/k8shost-server/src/services/tests.rs b/k8shost/crates/k8shost-server/src/services/tests.rs index cb60a9e..0eda9de 100644 --- a/k8shost/crates/k8shost-server/src/services/tests.rs +++ b/k8shost/crates/k8shost-server/src/services/tests.rs @@ -207,7 +207,9 @@ mod tests { async fn test_pod_crud_operations() { // This test requires a running FlareDB instance let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string()); - let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB"); + let storage = Storage::new(pd_addr) + .await + .expect("Failed to connect to FlareDB"); let pod_service = PodServiceImpl::new(Arc::new(storage), test_auth_service().await); // Create a pod @@ -226,10 
+228,7 @@ mod tests { let get_resp = pod_service.get_pod(get_req).await; assert!(get_resp.is_ok()); let retrieved_pod = get_resp.unwrap().into_inner().pod.unwrap(); - assert_eq!( - retrieved_pod.metadata.as_ref().unwrap().name, - "test-pod-1" - ); + assert_eq!(retrieved_pod.metadata.as_ref().unwrap().name, "test-pod-1"); // List pods let list_req = with_test_tenant(Request::new(ListPodsRequest { @@ -255,7 +254,9 @@ mod tests { #[ignore] // Requires running FlareDB and PrismNET instances async fn test_service_crud_operations() { let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string()); - let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB"); + let storage = Storage::new(pd_addr) + .await + .expect("Failed to connect to FlareDB"); let prismnet_addr = std::env::var("PRISMNET_ADDR").unwrap_or("http://127.0.0.1:9090".to_string()); let ipam_client = crate::ipam_client::IpamClient::new(prismnet_addr); @@ -273,12 +274,7 @@ mod tests { let create_resp = service_service.create_service(create_req).await; assert!(create_resp.is_ok()); let created_service = create_resp.unwrap().into_inner().service.unwrap(); - assert!(created_service - .spec - .as_ref() - .unwrap() - .cluster_ip - .is_some()); + assert!(created_service.spec.as_ref().unwrap().cluster_ip.is_some()); // Get the service let get_req = with_test_tenant(Request::new(GetServiceRequest { @@ -308,7 +304,9 @@ mod tests { #[ignore] // Requires running FlareDB instance async fn test_node_operations() { let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string()); - let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB"); + let storage = Storage::new(pd_addr) + .await + .expect("Failed to connect to FlareDB"); let node_service = NodeServiceImpl::new(Arc::new(storage), test_auth_service().await); // Register a node diff --git a/k8shost/crates/k8shost-server/src/storage.rs b/k8shost/crates/k8shost-server/src/storage.rs index 9d40d29..59659c6 100644 --- a/k8shost/crates/k8shost-server/src/storage.rs +++ b/k8shost/crates/k8shost-server/src/storage.rs @@ -4,7 +4,7 @@ //! with multi-tenant support using FlareDB as the backend. 
use flaredb_client::RdbClient; -use k8shost_types::{Node, Pod, Service}; +use k8shost_types::{Deployment, Node, Pod, Service}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; @@ -18,12 +18,8 @@ pub struct Storage { impl Storage { /// Create a new storage instance with FlareDB backend pub async fn new(pd_addr: String) -> Result> { - let client = RdbClient::connect_with_pd_namespace( - pd_addr.clone(), - pd_addr, - "k8shost", - ) - .await?; + let client = + RdbClient::connect_with_pd_namespace(pd_addr.clone(), pd_addr, "k8shost").await?; Ok(Self { client: Arc::new(Mutex::new(client)), @@ -31,9 +27,7 @@ impl Storage { } /// Create a storage instance that connects directly to a single FlareDB server (no PD) - pub async fn new_direct( - server_addr: String, - ) -> Result> { + pub async fn new_direct(server_addr: String) -> Result> { let client = RdbClient::connect_direct(server_addr, "k8shost").await?; Ok(Self { client: Arc::new(Mutex::new(client)), @@ -68,11 +62,20 @@ impl Storage { /// Create or update a pod pub async fn put_pod(&self, pod: &Pod) -> Result<(), Status> { - let org_id = pod.metadata.org_id.as_ref() + let org_id = pod + .metadata + .org_id + .as_ref() .ok_or_else(|| Status::invalid_argument("org_id is required"))?; - let project_id = pod.metadata.project_id.as_ref() + let project_id = pod + .metadata + .project_id + .as_ref() .ok_or_else(|| Status::invalid_argument("project_id is required"))?; - let namespace = pod.metadata.namespace.as_ref() + let namespace = pod + .metadata + .namespace + .as_ref() .ok_or_else(|| Status::invalid_argument("namespace is required"))?; let key = Self::pod_key(org_id, project_id, namespace, &pod.metadata.name); @@ -80,7 +83,8 @@ impl Storage { .map_err(|e| Status::internal(format!("Failed to serialize pod: {}", e)))?; let mut client = self.client.lock().await; - client.raw_put(key, value) + client + .raw_put(key, value) .await .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?; @@ -98,7 +102,8 @@ impl Storage { let key = Self::pod_key(org_id, project_id, namespace, name); let mut client = self.client.lock().await; - let result = client.raw_get(key) + let result = client + .raw_get(key) .await .map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?; @@ -139,13 +144,14 @@ impl Storage { // Paginate through all results loop { let mut client = self.client.lock().await; - let (_keys, values, next) = client.raw_scan( - start_key.clone(), - end_key.clone(), - 1000, // Batch size - ) - .await - .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + let (_keys, values, next) = client + .raw_scan( + start_key.clone(), + end_key.clone(), + 1000, // Batch size + ) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; // Deserialize pods for value in values { @@ -153,7 +159,11 @@ impl Storage { // Apply label selector filter if provided if let Some(selector) = label_selector { let matches = selector.iter().all(|(k, v)| { - pod.metadata.labels.get(k).map(|pv| pv == v).unwrap_or(false) + pod.metadata + .labels + .get(k) + .map(|pv| pv == v) + .unwrap_or(false) }); if matches { pods.push(pod); @@ -221,7 +231,8 @@ impl Storage { let key = Self::pod_key(org_id, project_id, namespace, name); let mut client = self.client.lock().await; - let existed = client.raw_delete(key) + let existed = client + .raw_delete(key) .await .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?; @@ -234,7 +245,11 @@ impl Storage { /// Build key for service storage 
fn service_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec { - format!("k8s/{}/{}/services/{}/{}", org_id, project_id, namespace, name).into_bytes() + format!( + "k8s/{}/{}/services/{}/{}", + org_id, project_id, namespace, name + ) + .into_bytes() } /// Build prefix for service listing @@ -248,11 +263,20 @@ impl Storage { /// Create or update a service pub async fn put_service(&self, service: &Service) -> Result<(), Status> { - let org_id = service.metadata.org_id.as_ref() + let org_id = service + .metadata + .org_id + .as_ref() .ok_or_else(|| Status::invalid_argument("org_id is required"))?; - let project_id = service.metadata.project_id.as_ref() + let project_id = service + .metadata + .project_id + .as_ref() .ok_or_else(|| Status::invalid_argument("project_id is required"))?; - let namespace = service.metadata.namespace.as_ref() + let namespace = service + .metadata + .namespace + .as_ref() .ok_or_else(|| Status::invalid_argument("namespace is required"))?; let key = Self::service_key(org_id, project_id, namespace, &service.metadata.name); @@ -260,7 +284,8 @@ impl Storage { .map_err(|e| Status::internal(format!("Failed to serialize service: {}", e)))?; let mut client = self.client.lock().await; - client.raw_put(key, value) + client + .raw_put(key, value) .await .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?; @@ -278,7 +303,8 @@ impl Storage { let key = Self::service_key(org_id, project_id, namespace, name); let mut client = self.client.lock().await; - let result = client.raw_get(key) + let result = client + .raw_get(key) .await .map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?; @@ -316,13 +342,10 @@ impl Storage { loop { let mut client = self.client.lock().await; - let (_keys, values, next) = client.raw_scan( - start_key.clone(), - end_key.clone(), - 1000, - ) - .await - .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + let (_keys, values, next) = client + .raw_scan(start_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; for value in values { if let Ok(service) = serde_json::from_slice::(&value) { @@ -351,7 +374,8 @@ impl Storage { let key = Self::service_key(org_id, project_id, namespace, name); let mut client = self.client.lock().await; - let existed = client.raw_delete(key) + let existed = client + .raw_delete(key) .await .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?; @@ -374,9 +398,15 @@ impl Storage { /// Create or update a node pub async fn put_node(&self, node: &Node) -> Result<(), Status> { - let org_id = node.metadata.org_id.as_ref() + let org_id = node + .metadata + .org_id + .as_ref() .ok_or_else(|| Status::invalid_argument("org_id is required"))?; - let project_id = node.metadata.project_id.as_ref() + let project_id = node + .metadata + .project_id + .as_ref() .ok_or_else(|| Status::invalid_argument("project_id is required"))?; let key = Self::node_key(org_id, project_id, &node.metadata.name); @@ -384,7 +414,8 @@ impl Storage { .map_err(|e| Status::internal(format!("Failed to serialize node: {}", e)))?; let mut client = self.client.lock().await; - client.raw_put(key, value) + client + .raw_put(key, value) .await .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?; @@ -401,7 +432,8 @@ impl Storage { let key = Self::node_key(org_id, project_id, name); let mut client = self.client.lock().await; - let result = client.raw_get(key) + let result = client + .raw_get(key) .await 
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?; @@ -415,11 +447,7 @@ impl Storage { } /// List all nodes - pub async fn list_nodes( - &self, - org_id: &str, - project_id: &str, - ) -> Result, Status> { + pub async fn list_nodes(&self, org_id: &str, project_id: &str) -> Result, Status> { let prefix = Self::node_prefix(org_id, project_id); let mut end_key = prefix.clone(); @@ -438,13 +466,10 @@ impl Storage { loop { let mut client = self.client.lock().await; - let (_keys, values, next) = client.raw_scan( - start_key.clone(), - end_key.clone(), - 1000, - ) - .await - .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + let (_keys, values, next) = client + .raw_scan(start_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; for value in values { if let Ok(node) = serde_json::from_slice::(&value) { @@ -472,7 +497,184 @@ impl Storage { let key = Self::node_key(org_id, project_id, name); let mut client = self.client.lock().await; - let existed = client.raw_delete(key) + let existed = client + .raw_delete(key) + .await + .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?; + + Ok(existed) + } + + // ============================================================================ + // Deployment Operations + // ============================================================================ + + /// Build key for deployment storage + fn deployment_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec { + format!( + "k8s/{}/{}/deployments/{}/{}", + org_id, project_id, namespace, name + ) + .into_bytes() + } + + /// Build prefix for deployment listing + fn deployment_prefix(org_id: &str, project_id: &str, namespace: Option<&str>) -> Vec { + if let Some(ns) = namespace { + format!("k8s/{}/{}/deployments/{}/", org_id, project_id, ns).into_bytes() + } else { + format!("k8s/{}/{}/deployments/", org_id, project_id).into_bytes() + } + } + + /// Create or update a deployment + pub async fn put_deployment(&self, deployment: &Deployment) -> Result<(), Status> { + let org_id = deployment + .metadata + .org_id + .as_ref() + .ok_or_else(|| Status::invalid_argument("org_id is required"))?; + let project_id = deployment + .metadata + .project_id + .as_ref() + .ok_or_else(|| Status::invalid_argument("project_id is required"))?; + let namespace = deployment + .metadata + .namespace + .as_ref() + .ok_or_else(|| Status::invalid_argument("namespace is required"))?; + + let key = Self::deployment_key(org_id, project_id, namespace, &deployment.metadata.name); + let value = serde_json::to_vec(deployment) + .map_err(|e| Status::internal(format!("Failed to serialize deployment: {}", e)))?; + + let mut client = self.client.lock().await; + client + .raw_put(key, value) + .await + .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?; + + Ok(()) + } + + /// Get a deployment by name + pub async fn get_deployment( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + ) -> Result, Status> { + let key = Self::deployment_key(org_id, project_id, namespace, name); + + let mut client = self.client.lock().await; + let result = client + .raw_get(key) + .await + .map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?; + + if let Some(bytes) = result { + let deployment: Deployment = serde_json::from_slice(&bytes).map_err(|e| { + Status::internal(format!("Failed to deserialize deployment: {}", e)) + })?; + Ok(Some(deployment)) + } else { + 
Ok(None) + } + } + + /// List deployments in a namespace + pub async fn list_deployments( + &self, + org_id: &str, + project_id: &str, + namespace: Option<&str>, + ) -> Result, Status> { + let prefix = Self::deployment_prefix(org_id, project_id, namespace); + + let mut end_key = prefix.clone(); + if let Some(last) = end_key.last_mut() { + if *last == 0xff { + end_key.push(0x00); + } else { + *last += 1; + } + } else { + end_key.push(0xff); + } + + let mut deployments = Vec::new(); + let mut start_key = prefix; + + loop { + let mut client = self.client.lock().await; + let (_keys, values, next) = client + .raw_scan(start_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + + for value in values { + if let Ok(deployment) = serde_json::from_slice::(&value) { + deployments.push(deployment); + } + } + + if let Some(next_key) = next { + start_key = next_key; + } else { + break; + } + } + + Ok(deployments) + } + + /// List deployments across all tenants + pub async fn list_all_deployments(&self) -> Result, Status> { + let prefix = b"k8s/".to_vec(); + let mut end_key = prefix.clone(); + end_key.push(0xff); + + let mut deployments = Vec::new(); + let mut start_key = prefix; + + loop { + let mut client = self.client.lock().await; + let (_keys, values, next) = client + .raw_scan(start_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?; + + for value in values { + if let Ok(deployment) = serde_json::from_slice::(&value) { + deployments.push(deployment); + } + } + + if let Some(next_key) = next { + start_key = next_key; + } else { + break; + } + } + + Ok(deployments) + } + + /// Delete a deployment + pub async fn delete_deployment( + &self, + org_id: &str, + project_id: &str, + namespace: &str, + name: &str, + ) -> Result { + let key = Self::deployment_key(org_id, project_id, namespace, name); + + let mut client = self.client.lock().await; + let existed = client + .raw_delete(key) .await .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?; diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index f4d84a6..22b1832 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -3474,9 +3474,10 @@ validate_k8shost_flow() { local org_id="default-org" local project_id="default-project" local principal_id="k8shost-smoke-$(date +%s)" - local token node_name pod_name service_name service_port + local token node_name deployment_name pod_name service_name service_port token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")" node_name="smoke-node-$(date +%s)" + deployment_name="smoke-deploy-$(date +%s)" pod_name="smoke-pod-$(date +%s)" service_name="smoke-svc-$(date +%s)" service_port=$((18180 + (RANDOM % 100))) @@ -3502,6 +3503,97 @@ validate_k8shost_flow() { 127.0.0.1:15087 k8shost.NodeService/ListNodes \ | jq -e --arg name "${node_name}" '.items | any(.metadata.name == $name)' >/dev/null + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg name "${deployment_name}" --arg org "${org_id}" --arg project "${project_id}" '{deployment:{metadata:{name:$name, namespace:"default", orgId:$org, projectId:$project}, spec:{replicas:2, selector:{matchLabels:{app:"k8shost-deployment-smoke", deployment:$name}}, template:{metadata:{name:"", namespace:"default", 
labels:{app:"k8shost-deployment-smoke", deployment:$name}}, spec:{containers:[{name:"backend", image:"smoke", ports:[{containerPort:8082, protocol:"TCP"}]}]}}}}}')" \ + 127.0.0.1:15087 k8shost.DeploymentService/CreateDeployment >/dev/null + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn '{namespace:"default"}')" \ + 127.0.0.1:15087 k8shost.DeploymentService/ListDeployments \ + | jq -e --arg name "${deployment_name}" '.items | any(.metadata.name == $name)' >/dev/null + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + local deployment_pods_json + deployment_pods_json="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \ + 127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)" + if [[ -n "${deployment_pods_json}" ]] && printf '%s' "${deployment_pods_json}" | jq -e --arg node "${node_name}" ' + (.items | length) == 2 and + all(.items[]; .spec.nodeName == $node)' >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost Deployment ${deployment_name} to create and schedule pods" + fi + sleep 2 + done + + local deployment_json + deployment_json="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg ns "default" --arg name "${deployment_name}" '{namespace:$ns, name:$name}')" \ + 127.0.0.1:15087 k8shost.DeploymentService/GetDeployment)" + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(printf '%s' "${deployment_json}" | jq '.deployment.spec.replicas = 1 | {deployment:.deployment}')" \ + 127.0.0.1:15087 k8shost.DeploymentService/UpdateDeployment >/dev/null + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + local scaled_pods_json + scaled_pods_json="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \ + 127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)" + if [[ -n "${scaled_pods_json}" ]] && printf '%s' "${scaled_pods_json}" | jq -e '.items | length == 1' >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost Deployment ${deployment_name} to scale down" + fi + sleep 2 + done + + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg ns "default" --arg name "${deployment_name}" '{namespace:$ns, name:$name}')" \ + 127.0.0.1:15087 k8shost.DeploymentService/DeleteDeployment >/dev/null + + deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) + while true; do + local deleted_pods_json + deleted_pods_json="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${K8SHOST_PROTO_DIR}" \ + -proto "${K8SHOST_PROTO}" \ + -d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \ + 127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)" + if [[ -n "${deleted_pods_json}" ]] && printf '%s' 
"${deleted_pods_json}" | jq -e '.items | length == 0' >/dev/null 2>&1; then + break + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for K8sHost Deployment ${deployment_name} to delete managed pods" + fi + sleep 2 + done + grpcurl -plaintext \ -H "authorization: Bearer ${token}" \ -import-path "${K8SHOST_PROTO_DIR}" \