From 2b7c3166d2dce4ce44ba4c059f70fe922dc49c75 Mon Sep 17 00:00:00 2001
From: centra
Date: Tue, 31 Mar 2026 23:15:04 +0900
Subject: [PATCH] Implement K8sHost deployment reconciliation
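
Deployments are now persisted as first-class tenant-scoped objects and
reconciled by a background controller. Every 5 seconds the controller
plans each Deployment: pods whose template-hash annotation is stale or
that exceed spec.replicas are deleted, missing replicas are created
with the smallest unused ordinal suffix, and the replica counters in
the Deployment status are refreshed. The placeholder DeploymentService
in main.rs is replaced with a real implementation registered behind the
same auth interceptor as the Pod, Service, and Node services; most of
the remaining churn is rustfmt reflow.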
---
k8shost/Cargo.lock | 1 +
k8shost/crates/k8shost-server/Cargo.toml | 1 +
k8shost/crates/k8shost-server/src/cni.rs | 19 +-
k8shost/crates/k8shost-server/src/config.rs | 3 +-
.../src/deployment_controller.rs | 414 +++++++++++
.../k8shost-server/src/fiberlb_controller.rs | 162 ++--
.../k8shost-server/src/flashdns_controller.rs | 30 +-
.../crates/k8shost-server/src/ipam_client.rs | 32 +-
k8shost/crates/k8shost-server/src/lib.rs | 6 +-
k8shost/crates/k8shost-server/src/main.rs | 148 ++--
k8shost/crates/k8shost-server/src/rest.rs | 262 +++++--
.../crates/k8shost-server/src/scheduler.rs | 102 ++-
.../k8shost-server/src/services/deployment.rs | 696 ++++++++++++++++++
.../crates/k8shost-server/src/services/mod.rs | 3 +-
.../k8shost-server/src/services/node.rs | 32 +-
.../crates/k8shost-server/src/services/pod.rs | 145 +++-
.../k8shost-server/src/services/service.rs | 67 +-
.../k8shost-server/src/services/tests.rs | 24 +-
k8shost/crates/k8shost-server/src/storage.rs | 312 ++++++--
nix/test-cluster/run-cluster.sh | 94 ++-
20 files changed, 2119 insertions(+), 434 deletions(-)
create mode 100644 k8shost/crates/k8shost-server/src/deployment_controller.rs
create mode 100644 k8shost/crates/k8shost-server/src/services/deployment.rs
diff --git a/k8shost/Cargo.lock b/k8shost/Cargo.lock
index 47d8064..ac2d6ef 100644
--- a/k8shost/Cargo.lock
+++ b/k8shost/Cargo.lock
@@ -1985,6 +1985,7 @@ dependencies = [
"prost 0.13.5",
"serde",
"serde_json",
+ "sha2",
"tempfile",
"tokio",
"tokio-stream",
diff --git a/k8shost/crates/k8shost-server/Cargo.toml b/k8shost/crates/k8shost-server/Cargo.toml
index a6cb508..cce801c 100644
--- a/k8shost/crates/k8shost-server/Cargo.toml
+++ b/k8shost/crates/k8shost-server/Cargo.toml
@@ -39,6 +39,7 @@ chrono = { workspace = true }
clap = { workspace = true }
config = { workspace = true }
toml = { workspace = true }
+sha2 = "0.10"
# REST API dependencies
axum = "0.8"
diff --git a/k8shost/crates/k8shost-server/src/cni.rs b/k8shost/crates/k8shost-server/src/cni.rs
index ebc6d76..4e327bf 100644
--- a/k8shost/crates/k8shost-server/src/cni.rs
+++ b/k8shost/crates/k8shost-server/src/cni.rs
@@ -8,8 +8,8 @@
use anyhow::{Context, Result};
use serde_json::json;
-use std::process::Command;
use std::io::Write;
+use std::process::Command;
/// CNI configuration for pod network setup
#[derive(Debug, Clone)]
@@ -85,7 +85,9 @@ pub async fn invoke_cni_add(
}
// Wait for result
- let output = child.wait_with_output().context("Failed to wait for CNI plugin")?;
+ let output = child
+ .wait_with_output()
+ .context("Failed to wait for CNI plugin")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
@@ -93,8 +95,8 @@ pub async fn invoke_cni_add(
}
// Parse result
- let result: CniResult = serde_json::from_slice(&output.stdout)
- .context("Failed to parse CNI result")?;
+ let result: CniResult =
+ serde_json::from_slice(&output.stdout).context("Failed to parse CNI result")?;
Ok(result)
}
@@ -146,11 +148,16 @@ pub async fn invoke_cni_del(
}
// Wait for result
- let output = child.wait_with_output().context("Failed to wait for CNI plugin")?;
+ let output = child
+ .wait_with_output()
+ .context("Failed to wait for CNI plugin")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!("CNI DEL error (may be expected if already deleted): {}", stderr);
+ tracing::warn!(
+ "CNI DEL error (may be expected if already deleted): {}",
+ stderr
+ );
}
Ok(())
diff --git a/k8shost/crates/k8shost-server/src/config.rs b/k8shost/crates/k8shost-server/src/config.rs
index a64272f..d2cbdd6 100644
--- a/k8shost/crates/k8shost-server/src/config.rs
+++ b/k8shost/crates/k8shost-server/src/config.rs
@@ -104,8 +104,7 @@ impl Default for ChainFireConfig {
}
}
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[derive(Default)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct Config {
pub server: ServerConfig,
pub flaredb: FlareDbConfig,
diff --git a/k8shost/crates/k8shost-server/src/deployment_controller.rs b/k8shost/crates/k8shost-server/src/deployment_controller.rs
new file mode 100644
index 0000000..7f3b20d
--- /dev/null
+++ b/k8shost/crates/k8shost-server/src/deployment_controller.rs
@@ -0,0 +1,414 @@
+//! Background reconciler for Deployment resources.
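+//!
+//! Each pass builds a `DeploymentPlan` per Deployment: pods whose
+//! template-hash annotation no longer matches the current template are
+//! replaced, excess replicas beyond `spec.replicas` are deleted, and
+//! missing replicas are created as `<name>-<ordinal>` pods using the
+//! smallest unused ordinal.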
+
+use crate::services::deployment::{
+ deployment_template_hash, DeploymentServiceImpl, DEPLOYMENT_NAME_ANNOTATION,
+ DEPLOYMENT_UID_ANNOTATION, TEMPLATE_HASH_ANNOTATION,
+};
+use crate::storage::Storage;
+use chrono::Utc;
+use k8shost_types::{Deployment, DeploymentStatus, Pod, PodStatus};
+use std::collections::BTreeSet;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::time::sleep;
+use tonic::Status;
+use tracing::{debug, info, warn};
+use uuid::Uuid;
+
+pub struct DeploymentController {
+ storage: Arc<Storage>,
+ interval: Duration,
+}
+
+#[derive(Debug)]
+struct DeploymentPlan {
+ creates: Vec<Pod>,
+ deletes: Vec<String>,
+ status: DeploymentStatus,
+}
+
+impl DeploymentController {
+ pub fn new(storage: Arc<Storage>) -> Self {
+ Self {
+ storage,
+ interval: Duration::from_secs(5),
+ }
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ info!(
+ "Deployment controller started ({}s interval)",
+ self.interval.as_secs()
+ );
+
+ loop {
+ if let Err(error) = self.reconcile_all().await {
+ warn!(error = %error, "deployment reconciliation failed");
+ }
+ sleep(self.interval).await;
+ }
+ }
+
+ async fn reconcile_all(&self) -> anyhow::Result<()> {
+ let mut deployments = self.storage.list_all_deployments().await?;
+ deployments.sort_by(|lhs, rhs| {
+ lhs.metadata
+ .org_id
+ .cmp(&rhs.metadata.org_id)
+ .then_with(|| lhs.metadata.project_id.cmp(&rhs.metadata.project_id))
+ .then_with(|| lhs.metadata.namespace.cmp(&rhs.metadata.namespace))
+ .then_with(|| lhs.metadata.name.cmp(&rhs.metadata.name))
+ });
+
+ for deployment in deployments {
+ if let Err(error) = self.reconcile_deployment(deployment).await {
+ warn!(error = %error, "failed to reconcile deployment");
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn reconcile_deployment(&self, deployment: Deployment) -> anyhow::Result<()> {
+ let Some(org_id) = deployment.metadata.org_id.as_deref() else {
+ warn!(deployment = %deployment.metadata.name, "deployment missing org_id");
+ return Ok(());
+ };
+ let Some(project_id) = deployment.metadata.project_id.as_deref() else {
+ warn!(deployment = %deployment.metadata.name, "deployment missing project_id");
+ return Ok(());
+ };
+ let Some(namespace) = deployment.metadata.namespace.as_deref() else {
+ warn!(deployment = %deployment.metadata.name, "deployment missing namespace");
+ return Ok(());
+ };
+
+ let pods = self
+ .storage
+ .list_pods(org_id, project_id, Some(namespace), None)
+ .await?;
+ let plan = plan_deployment_reconciliation(&deployment, &pods)?;
+
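+ // Apply deletes before creates so stale or excess pods are gone
+ // before their replacements are persisted.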
+ for pod_name in &plan.deletes {
+ let deleted = self
+ .storage
+ .delete_pod(org_id, project_id, namespace, pod_name)
+ .await?;
+ if deleted {
+ debug!(
+ deployment = %deployment.metadata.name,
+ pod = %pod_name,
+ "deleted deployment-managed pod"
+ );
+ }
+ }
+
+ for pod in &plan.creates {
+ self.storage.put_pod(pod).await?;
+ debug!(
+ deployment = %deployment.metadata.name,
+ pod = %pod.metadata.name,
+ "created deployment-managed pod"
+ );
+ }
+
+ let status_changed = !deployment_status_eq(deployment.status.as_ref(), Some(&plan.status));
+ if !plan.creates.is_empty() || !plan.deletes.is_empty() || status_changed {
+ let mut updated = deployment.clone();
+ updated.status = Some(plan.status);
+ updated.metadata.resource_version = Some(next_resource_version(
+ deployment.metadata.resource_version.as_deref(),
+ ));
+ self.storage.put_deployment(&updated).await?;
+ }
+
+ Ok(())
+ }
+}
+
+fn plan_deployment_reconciliation(
+ deployment: &Deployment,
+ pods: &[Pod],
+) -> Result<DeploymentPlan, Status> {
+ DeploymentServiceImpl::validate_spec(deployment)?;
+ let template_hash = deployment_template_hash(deployment)?;
+ let desired_replicas = deployment.spec.replicas.unwrap_or(1).max(0) as usize;
+ let mut owned = pods
+ .iter()
+ .filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod))
+ .cloned()
+ .collect::<Vec<_>>();
+ owned.sort_by(|lhs, rhs| lhs.metadata.name.cmp(&rhs.metadata.name));
+
+ let mut current = Vec::new();
+ let mut stale = Vec::new();
+ for pod in owned {
+ let pod_hash = pod
+ .metadata
+ .annotations
+ .get(TEMPLATE_HASH_ANNOTATION)
+ .map(String::as_str);
+ if pod_hash == Some(template_hash.as_str()) {
+ current.push(pod);
+ } else {
+ stale.push(pod);
+ }
+ }
+
+ let mut deletes = stale
+ .iter()
+ .map(|pod| pod.metadata.name.clone())
+ .collect::<Vec<_>>();
+ let mut survivors = current;
+ if survivors.len() > desired_replicas {
+ let excess = survivors.split_off(desired_replicas);
+ deletes.extend(excess.into_iter().map(|pod| pod.metadata.name));
+ }
+
+ let used_ordinals = pods
+ .iter()
+ .filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod))
+ .filter_map(|pod| parse_pod_ordinal(&deployment.metadata.name, &pod.metadata.name))
+ .collect::<BTreeSet<_>>();
+
+ let ready_replicas = survivors.iter().filter(|pod| pod_is_ready(pod)).count() as i32;
+ let available_replicas = ready_replicas;
+
+ let mut creates = Vec::new();
+ let mut used_ordinals = used_ordinals;
+ while survivors.len() + creates.len() < desired_replicas {
+ let ordinal = next_available_ordinal(&used_ordinals);
+ used_ordinals.insert(ordinal);
+ creates.push(build_pod(deployment, &template_hash, ordinal)?);
+ }
+
+ Ok(DeploymentPlan {
+ creates,
+ deletes,
+ status: DeploymentStatus {
+ replicas: Some(desired_replicas as i32),
+ ready_replicas: Some(ready_replicas),
+ available_replicas: Some(available_replicas),
+ },
+ })
+}
+
+fn build_pod(deployment: &Deployment, template_hash: &str, ordinal: usize) -> Result<Pod, Status> {
+ let Some(namespace) = deployment.metadata.namespace.clone() else {
+ return Err(Status::invalid_argument("deployment namespace is required"));
+ };
+ let Some(org_id) = deployment.metadata.org_id.clone() else {
+ return Err(Status::invalid_argument("deployment org_id is required"));
+ };
+ let Some(project_id) = deployment.metadata.project_id.clone() else {
+ return Err(Status::invalid_argument(
+ "deployment project_id is required",
+ ));
+ };
+
+ let mut metadata = deployment.spec.template.metadata.clone();
+ metadata.name = format!("{}-{}", deployment.metadata.name, ordinal);
+ metadata.namespace = Some(namespace);
+ metadata.org_id = Some(org_id);
+ metadata.project_id = Some(project_id);
+ metadata.uid = Some(Uuid::new_v4().to_string());
+ metadata.resource_version = Some("1".to_string());
+ metadata.creation_timestamp = Some(Utc::now());
+ metadata.annotations.insert(
+ DEPLOYMENT_NAME_ANNOTATION.to_string(),
+ deployment.metadata.name.clone(),
+ );
+ if let Some(uid) = deployment.metadata.uid.as_ref() {
+ metadata
+ .annotations
+ .insert(DEPLOYMENT_UID_ANNOTATION.to_string(), uid.clone());
+ }
+ metadata.annotations.insert(
+ TEMPLATE_HASH_ANNOTATION.to_string(),
+ template_hash.to_string(),
+ );
+
+ let mut spec = deployment.spec.template.spec.clone();
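+ // Clear any node assignment inherited from the template so the
+ // scheduler places each replica itself.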
+ spec.node_name = None;
+
+ Ok(Pod {
+ metadata,
+ spec,
+ status: Some(PodStatus {
+ phase: Some("Pending".to_string()),
+ pod_ip: None,
+ host_ip: None,
+ conditions: Vec::new(),
+ }),
+ })
+}
+
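+/// Treats a pod whose phase is "Running" as ready; there is no
+/// separate readiness-probe signal yet.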
+fn pod_is_ready(pod: &Pod) -> bool {
+ pod.status
+ .as_ref()
+ .and_then(|status| status.phase.as_deref())
+ == Some("Running")
+}
+
+fn parse_pod_ordinal(deployment_name: &str, pod_name: &str) -> Option<usize> {
+ let suffix = pod_name.strip_prefix(&format!("{}-", deployment_name))?;
+ suffix.parse::<usize>().ok()
+}
+
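+/// Returns the smallest ordinal not in `used`, starting from 1, so
+/// replacement pods reuse freed name suffixes.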
+fn next_available_ordinal(used: &BTreeSet<usize>) -> usize {
+ let mut ordinal = 1usize;
+ while used.contains(&ordinal) {
+ ordinal += 1;
+ }
+ ordinal
+}
+
+fn deployment_status_eq(lhs: Option<&DeploymentStatus>, rhs: Option<&DeploymentStatus>) -> bool {
+ match (lhs, rhs) {
+ (None, None) => true,
+ (Some(lhs), Some(rhs)) => {
+ lhs.replicas == rhs.replicas
+ && lhs.ready_replicas == rhs.ready_replicas
+ && lhs.available_replicas == rhs.available_replicas
+ }
+ _ => false,
+ }
+}
+
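+/// Increments a numeric resource version; missing or non-numeric
+/// versions restart the counter at 1.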
+fn next_resource_version(current: Option<&str>) -> String {
+ let current = current
+ .and_then(|version| version.parse::<u64>().ok())
+ .unwrap_or(0);
+ (current + 1).to_string()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ fn test_deployment() -> Deployment {
+ Deployment {
+ metadata: k8shost_types::ObjectMeta {
+ name: "web".to_string(),
+ namespace: Some("default".to_string()),
+ uid: Some("deployment-uid".to_string()),
+ resource_version: Some("1".to_string()),
+ creation_timestamp: None,
+ labels: HashMap::new(),
+ annotations: HashMap::new(),
+ org_id: Some("test-org".to_string()),
+ project_id: Some("test-project".to_string()),
+ },
+ spec: k8shost_types::DeploymentSpec {
+ replicas: Some(2),
+ selector: k8shost_types::LabelSelector {
+ match_labels: HashMap::from([("app".to_string(), "web".to_string())]),
+ },
+ template: k8shost_types::PodTemplateSpec {
+ metadata: k8shost_types::ObjectMeta {
+ name: "".to_string(),
+ namespace: Some("default".to_string()),
+ uid: None,
+ resource_version: None,
+ creation_timestamp: None,
+ labels: HashMap::from([("app".to_string(), "web".to_string())]),
+ annotations: HashMap::new(),
+ org_id: None,
+ project_id: None,
+ },
+ spec: k8shost_types::PodSpec {
+ containers: vec![k8shost_types::Container {
+ name: "web".to_string(),
+ image: "nginx:latest".to_string(),
+ command: Vec::new(),
+ args: Vec::new(),
+ ports: Vec::new(),
+ env: Vec::new(),
+ resources: None,
+ }],
+ restart_policy: Some("Always".to_string()),
+ node_name: None,
+ },
+ },
+ },
+ status: None,
+ }
+ }
+
+ fn owned_pod(name: &str, hash: &str, phase: &str) -> Pod {
+ Pod {
+ metadata: k8shost_types::ObjectMeta {
+ name: name.to_string(),
+ namespace: Some("default".to_string()),
+ uid: None,
+ resource_version: None,
+ creation_timestamp: None,
+ labels: HashMap::from([("app".to_string(), "web".to_string())]),
+ annotations: HashMap::from([
+ (DEPLOYMENT_NAME_ANNOTATION.to_string(), "web".to_string()),
+ (
+ DEPLOYMENT_UID_ANNOTATION.to_string(),
+ "deployment-uid".to_string(),
+ ),
+ (TEMPLATE_HASH_ANNOTATION.to_string(), hash.to_string()),
+ ]),
+ org_id: Some("test-org".to_string()),
+ project_id: Some("test-project".to_string()),
+ },
+ spec: k8shost_types::PodSpec {
+ containers: Vec::new(),
+ restart_policy: None,
+ node_name: None,
+ },
+ status: Some(PodStatus {
+ phase: Some(phase.to_string()),
+ pod_ip: None,
+ host_ip: None,
+ conditions: Vec::new(),
+ }),
+ }
+ }
+
+ #[test]
+ fn plan_creates_missing_replicas() {
+ let deployment = test_deployment();
+ let plan = plan_deployment_reconciliation(&deployment, &[]).unwrap();
+
+ assert_eq!(plan.creates.len(), 2);
+ assert!(plan.deletes.is_empty());
+ assert_eq!(plan.status.replicas, Some(2));
+ }
+
+ #[test]
+ fn plan_scales_down_extra_pods() {
+ let mut deployment = test_deployment();
+ deployment.spec.replicas = Some(1);
+ let hash = deployment_template_hash(&deployment).unwrap();
+ let pods = vec![
+ owned_pod("web-1", &hash, "Running"),
+ owned_pod("web-2", &hash, "Running"),
+ ];
+
+ let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap();
+
+ assert!(plan.creates.is_empty());
+ assert_eq!(plan.deletes, vec!["web-2".to_string()]);
+ assert_eq!(plan.status.ready_replicas, Some(1));
+ }
+
+ #[test]
+ fn plan_replaces_stale_template_pods() {
+ let deployment = test_deployment();
+ let pods = vec![
+ owned_pod("web-1", "stale", "Running"),
+ owned_pod("web-2", "stale", "Running"),
+ ];
+
+ let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap();
+
+ assert_eq!(plan.creates.len(), 2);
+ assert_eq!(plan.deletes.len(), 2);
+ assert_eq!(plan.status.ready_replicas, Some(0));
+ }
+}
diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
index 4de33c3..f639a7c 100644
--- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
+++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs
@@ -11,8 +11,8 @@ use fiberlb_api::listener_service_client::ListenerServiceClient;
use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient;
use fiberlb_api::pool_service_client::PoolServiceClient;
use fiberlb_api::{
- CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest,
- CreatePoolRequest, DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol,
+ CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest,
+ DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol,
};
use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus};
use std::sync::Arc;
@@ -65,7 +65,10 @@ impl FiberLbController {
let tenants = vec![("default-org".to_string(), "default-project".to_string())];
for (org_id, project_id) in tenants {
- if let Err(e) = self.reconcile_tenant_loadbalancers(&org_id, &project_id).await {
+ if let Err(e) = self
+ .reconcile_tenant_loadbalancers(&org_id, &project_id)
+ .await
+ {
warn!(
"Failed to reconcile LoadBalancers for tenant {}/{}: {}",
org_id, project_id, e
@@ -79,10 +82,7 @@ impl FiberLbController {
/// Reconcile LoadBalancer services for a specific tenant
async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> {
// Get all services for this tenant
- let services = self
- .storage
- .list_services(org_id, project_id, None)
- .await?;
+ let services = self.storage.list_services(org_id, project_id, None).await?;
// Filter for LoadBalancer services that need provisioning
let lb_services: Vec<_> = services
@@ -93,12 +93,19 @@ impl FiberLbController {
// 2. status is None OR status.load_balancer is None (not yet provisioned)
svc.spec.r#type.as_deref() == Some("LoadBalancer")
&& (svc.status.is_none()
- || svc.status.as_ref().and_then(|s| s.load_balancer.as_ref()).is_none())
+ || svc
+ .status
+ .as_ref()
+ .and_then(|s| s.load_balancer.as_ref())
+ .is_none())
})
.collect();
if lb_services.is_empty() {
- debug!("No LoadBalancer services to provision for tenant {}/{}", org_id, project_id);
+ debug!(
+ "No LoadBalancer services to provision for tenant {}/{}",
+ org_id, project_id
+ );
return Ok(());
}
@@ -122,7 +129,10 @@ impl FiberLbController {
match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await {
Ok(client) => client,
Err(e) => {
- warn!("Failed to connect to FiberLB at {}: {}", self.fiberlb_addr, e);
+ warn!(
+ "Failed to connect to FiberLB at {}: {}",
+ self.fiberlb_addr, e
+ );
return Ok(());
}
};
@@ -135,21 +145,23 @@ impl FiberLbController {
}
};
- let mut listener_client = match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!("Failed to connect to FiberLB ListenerService: {}", e);
- return Ok(());
- }
- };
+ let mut listener_client =
+ match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await {
+ Ok(client) => client,
+ Err(e) => {
+ warn!("Failed to connect to FiberLB ListenerService: {}", e);
+ return Ok(());
+ }
+ };
- let mut backend_client = match BackendServiceClient::connect(self.fiberlb_addr.clone()).await {
- Ok(client) => client,
- Err(e) => {
- warn!("Failed to connect to FiberLB BackendService: {}", e);
- return Ok(());
- }
- };
+ let mut backend_client =
+ match BackendServiceClient::connect(self.fiberlb_addr.clone()).await {
+ Ok(client) => client,
+ Err(e) => {
+ warn!("Failed to connect to FiberLB BackendService: {}", e);
+ return Ok(());
+ }
+ };
// Provision each LoadBalancer service
for mut service in lb_services {
@@ -160,7 +172,10 @@ impl FiberLbController {
.unwrap_or_else(|| "default".to_string());
let name = service.metadata.name.clone();
- info!("Provisioning LoadBalancer for service {}/{}", namespace, name);
+ info!(
+ "Provisioning LoadBalancer for service {}/{}",
+ namespace, name
+ );
// Create LoadBalancer in FiberLB
let lb_name = format!("{}.{}", name, namespace);
@@ -210,19 +225,25 @@ impl FiberLbController {
// Create Pool for this LoadBalancer
let pool_name = format!("{}-pool", lb_name);
let pool_id = match pool_client
- .create_pool(authorized_request(CreatePoolRequest {
- name: pool_name.clone(),
- loadbalancer_id: lb_id.clone(),
- algorithm: PoolAlgorithm::RoundRobin as i32,
- protocol: PoolProtocol::Tcp as i32,
- session_persistence: None,
- }, &auth_token))
+ .create_pool(authorized_request(
+ CreatePoolRequest {
+ name: pool_name.clone(),
+ loadbalancer_id: lb_id.clone(),
+ algorithm: PoolAlgorithm::RoundRobin as i32,
+ protocol: PoolProtocol::Tcp as i32,
+ session_persistence: None,
+ },
+ &auth_token,
+ ))
.await
{
Ok(response) => {
let pool = response.into_inner().pool;
if let Some(pool) = pool {
- info!("Created Pool {} for service {}/{}", pool.id, namespace, name);
+ info!(
+ "Created Pool {} for service {}/{}",
+ pool.id, namespace, name
+ );
pool.id
} else {
warn!("Failed to create Pool for service {}/{}", namespace, name);
@@ -230,7 +251,10 @@ impl FiberLbController {
}
}
Err(e) => {
- warn!("Failed to create Pool for service {}/{}: {}", namespace, name, e);
+ warn!(
+ "Failed to create Pool for service {}/{}: {}",
+ namespace, name, e
+ );
continue;
}
};
@@ -241,19 +265,25 @@ impl FiberLbController {
let listener_name = format!(
"{}-listener-{}",
lb_name,
- svc_port.name.as_deref().unwrap_or(&svc_port.port.to_string())
+ svc_port
+ .name
+ .as_deref()
+ .unwrap_or(&svc_port.port.to_string())
);
match listener_client
- .create_listener(authorized_request(CreateListenerRequest {
- name: listener_name.clone(),
- loadbalancer_id: lb_id.clone(),
- protocol: ListenerProtocol::Tcp as i32,
- port: svc_port.port as u32,
- default_pool_id: pool_id.clone(),
- tls_config: None,
- connection_limit: 0, // No limit
- }, &auth_token))
+ .create_listener(authorized_request(
+ CreateListenerRequest {
+ name: listener_name.clone(),
+ loadbalancer_id: lb_id.clone(),
+ protocol: ListenerProtocol::Tcp as i32,
+ port: svc_port.port as u32,
+ default_pool_id: pool_id.clone(),
+ tls_config: None,
+ connection_limit: 0, // No limit
+ },
+ &auth_token,
+ ))
.await
{
Ok(response) => {
@@ -327,21 +357,20 @@ impl FiberLbController {
// Use target_port if specified, otherwise use port
let backend_port = svc_port.target_port.unwrap_or(svc_port.port);
- let backend_name = format!(
- "{}-backend-{}-{}",
- lb_name,
- pod.metadata.name,
- backend_port
- );
+ let backend_name =
+ format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port);
match backend_client
- .create_backend(authorized_request(CreateBackendRequest {
- name: backend_name.clone(),
- pool_id: pool_id.clone(),
- address: pod_ip.clone(),
- port: backend_port as u32,
- weight: 1,
- }, &auth_token))
+ .create_backend(authorized_request(
+ CreateBackendRequest {
+ name: backend_name.clone(),
+ pool_id: pool_id.clone(),
+ address: pod_ip.clone(),
+ port: backend_port as u32,
+ weight: 1,
+ },
+ &auth_token,
+ ))
.await
{
Ok(response) => {
@@ -397,10 +426,10 @@ impl FiberLbController {
.metadata
.annotations
.insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb_id.clone());
- service
- .metadata
- .annotations
- .insert("fiberlb.plasmacloud.io/pool-id".to_string(), pool_id.clone());
+ service.metadata.annotations.insert(
+ "fiberlb.plasmacloud.io/pool-id".to_string(),
+ pool_id.clone(),
+ );
// Merge with the latest stored version so the DNS controller does not lose its annotations.
if let Ok(Some(mut current)) = self
@@ -447,9 +476,14 @@ impl FiberLbController {
/// This should be called when a Service with type=LoadBalancer is deleted.
/// For MVP, this is not automatically triggered - would need a deletion watch.
#[allow(dead_code)]
- async fn cleanup_loadbalancer(&self, org_id: &str, project_id: &str, lb_id: &str) -> Result<()> {
- let mut fiberlb_client = LoadBalancerServiceClient::connect(self.fiberlb_addr.clone())
- .await?;
+ async fn cleanup_loadbalancer(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ lb_id: &str,
+ ) -> Result<()> {
+ let mut fiberlb_client =
+ LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await?;
let auth_token = issue_controller_token(
&self.iam_server_addr,
CONTROLLER_PRINCIPAL_ID,
diff --git a/k8shost/crates/k8shost-server/src/flashdns_controller.rs b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
index 67a3d7c..05e173e 100644
--- a/k8shost/crates/k8shost-server/src/flashdns_controller.rs
+++ b/k8shost/crates/k8shost-server/src/flashdns_controller.rs
@@ -90,7 +90,10 @@ impl FlashDnsController {
.await?;
// Ensure cluster.local zone exists for this tenant
- let zone_id = match self.ensure_zone_exists(org_id, project_id, &auth_token).await {
+ let zone_id = match self
+ .ensure_zone_exists(org_id, project_id, &auth_token)
+ .await
+ {
Ok(id) => id,
Err(e) => {
warn!(
@@ -102,10 +105,7 @@ impl FlashDnsController {
};
// Get all services for this tenant
- let services = self
- .storage
- .list_services(org_id, project_id, None)
- .await?;
+ let services = self.storage.list_services(org_id, project_id, None).await?;
// Filter for services that need DNS records
let services_needing_dns: Vec<_> = services
@@ -123,7 +123,10 @@ impl FlashDnsController {
.collect();
if services_needing_dns.is_empty() {
- debug!("No services need DNS records for tenant {}/{}", org_id, project_id);
+ debug!(
+ "No services need DNS records for tenant {}/{}",
+ org_id, project_id
+ );
return Ok(());
}
@@ -139,7 +142,10 @@ impl FlashDnsController {
{
Ok(client) => client,
Err(e) => {
- warn!("Failed to connect to FlashDNS at {}: {}", self.flashdns_addr, e);
+ warn!(
+ "Failed to connect to FlashDNS at {}: {}",
+ self.flashdns_addr, e
+ );
return Ok(());
}
};
@@ -375,13 +381,13 @@ impl FlashDnsController {
.into_inner()
.zones
.into_iter()
- .find(|z| {
- z.name.trim_end_matches('.')
- == zone_name.trim_end_matches('.')
- })
+ .find(|z| z.name.trim_end_matches('.') == zone_name.trim_end_matches('.'))
.map(|z| z.id)),
Err(list_error) => {
- debug!("Zone list fallback failed for {}: {}", zone_name, list_error);
+ debug!(
+ "Zone list fallback failed for {}: {}",
+ zone_name, list_error
+ );
Ok(None)
}
}
diff --git a/k8shost/crates/k8shost-server/src/ipam_client.rs b/k8shost/crates/k8shost-server/src/ipam_client.rs
index 7902ea9..c0204de 100644
--- a/k8shost/crates/k8shost-server/src/ipam_client.rs
+++ b/k8shost/crates/k8shost-server/src/ipam_client.rs
@@ -119,14 +119,17 @@ impl IpamClient {
let mut client = self.connect().await?;
self.ensure_default_cluster_ip_pool(&mut client, org_id, project_id, authorization)
.await?;
- let request = Self::with_auth(AllocateServiceIpRequest {
- org_id: org_id.to_string(),
- project_id: project_id.to_string(),
- pool_id: String::new(), // Use default pool
- pool_type: ProtoServiceIpPoolType::ClusterIp as i32,
- service_uid: service_uid.to_string(),
- requested_ip: String::new(), // Auto-allocate
- }, authorization)?;
+ let request = Self::with_auth(
+ AllocateServiceIpRequest {
+ org_id: org_id.to_string(),
+ project_id: project_id.to_string(),
+ pool_id: String::new(), // Use default pool
+ pool_type: ProtoServiceIpPoolType::ClusterIp as i32,
+ service_uid: service_uid.to_string(),
+ requested_ip: String::new(), // Auto-allocate
+ },
+ authorization,
+ )?;
let response = client
.allocate_service_ip(request)
@@ -151,11 +154,14 @@ impl IpamClient {
authorization: Option<&str>,
) -> Result<()> {
let mut client = self.connect().await?;
- let request = Self::with_auth(ReleaseServiceIpRequest {
- org_id: org_id.to_string(),
- project_id: project_id.to_string(),
- ip_address: ip_address.to_string(),
- }, authorization)?;
+ let request = Self::with_auth(
+ ReleaseServiceIpRequest {
+ org_id: org_id.to_string(),
+ project_id: project_id.to_string(),
+ ip_address: ip_address.to_string(),
+ },
+ authorization,
+ )?;
client
.release_service_ip(request)
diff --git a/k8shost/crates/k8shost-server/src/lib.rs b/k8shost/crates/k8shost-server/src/lib.rs
index 9f81c82..aec1de8 100644
--- a/k8shost/crates/k8shost-server/src/lib.rs
+++ b/k8shost/crates/k8shost-server/src/lib.rs
@@ -3,14 +3,16 @@
//! Exports modules for testing and reuse
pub mod auth;
+pub mod deployment_controller;
pub mod ipam_client;
pub mod services {
+ pub mod deployment;
+ pub mod node;
pub mod pod;
pub mod service;
- pub mod node;
}
-pub mod storage;
pub mod config;
pub mod rest;
+pub mod storage;
pub use ipam_client::IpamClient;
diff --git a/k8shost/crates/k8shost-server/src/main.rs b/k8shost/crates/k8shost-server/src/main.rs
index 8b91226..a101aa5 100644
--- a/k8shost/crates/k8shost-server/src/main.rs
+++ b/k8shost/crates/k8shost-server/src/main.rs
@@ -1,6 +1,7 @@
mod auth;
mod cni;
mod config;
+mod deployment_controller;
mod fiberlb_controller;
mod flashdns_controller;
mod ipam_client;
@@ -15,19 +16,19 @@ use chainfire_client::Client as ChainFireClient;
use clap::Parser;
use config::Config;
use ipam_client::IpamClient;
-use metrics_exporter_prometheus::PrometheusBuilder;
use k8shost_proto::{
- deployment_service_server::{DeploymentService, DeploymentServiceServer},
- node_service_server::NodeServiceServer,
- pod_service_server::PodServiceServer,
- service_service_server::ServiceServiceServer,
- *,
+ deployment_service_server::DeploymentServiceServer, node_service_server::NodeServiceServer,
+ pod_service_server::PodServiceServer, service_service_server::ServiceServiceServer,
+};
+use metrics_exporter_prometheus::PrometheusBuilder;
+use services::{
+ deployment::DeploymentServiceImpl, node::NodeServiceImpl, pod::PodServiceImpl,
+ service::ServiceServiceImpl,
};
-use services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl};
-use std::{path::PathBuf, sync::Arc};
use std::time::{SystemTime, UNIX_EPOCH};
+use std::{path::PathBuf, sync::Arc};
use storage::Storage;
-use tonic::{transport::Server, Request, Response, Status};
+use tonic::{transport::Server, Request, Status};
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
@@ -116,19 +117,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
},
flaredb: config::FlareDbConfig {
pd_addr: args.flaredb_pd_addr.or(loaded_config.flaredb.pd_addr),
- direct_addr: args.flaredb_direct_addr.or(loaded_config.flaredb.direct_addr),
+ direct_addr: args
+ .flaredb_direct_addr
+ .or(loaded_config.flaredb.direct_addr),
},
chainfire: config::ChainFireConfig {
endpoint: args.chainfire_endpoint.or(loaded_config.chainfire.endpoint),
},
iam: config::IamConfig {
- server_addr: args.iam_server_addr.unwrap_or(loaded_config.iam.server_addr),
+ server_addr: args
+ .iam_server_addr
+ .unwrap_or(loaded_config.iam.server_addr),
},
fiberlb: config::FiberLbConfig {
- server_addr: args.fiberlb_server_addr.unwrap_or(loaded_config.fiberlb.server_addr),
+ server_addr: args
+ .fiberlb_server_addr
+ .unwrap_or(loaded_config.fiberlb.server_addr),
},
flashdns: config::FlashDnsConfig {
- server_addr: args.flashdns_server_addr.unwrap_or(loaded_config.flashdns.server_addr),
+ server_addr: args
+ .flashdns_server_addr
+ .unwrap_or(loaded_config.flashdns.server_addr),
},
prismnet: config::PrismNetConfig {
server_addr: args
@@ -198,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
} else {
return Err(
- anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into()
+ anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into(),
);
}
}
@@ -211,7 +220,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Arc::new(s)
}
Err(e) => {
- warn!("Failed to connect to FlareDB: {}. Server will not start.", e);
+ warn!(
+ "Failed to connect to FlareDB: {}. Server will not start.",
+ e
+ );
return Err(anyhow::anyhow!("Failed to connect to FlareDB: {}", e).into());
}
}
@@ -228,7 +240,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Arc::new(s)
}
Err(e) => {
- warn!("Failed to connect to IAM server: {}. Server will not start.", e);
+ warn!(
+ "Failed to connect to IAM server: {}. Server will not start.",
+ e
+ );
return Err(anyhow::anyhow!("Failed to connect to IAM server: {}", e).into());
}
};
@@ -262,7 +277,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
auth_service.clone(),
));
let node_service = Arc::new(NodeServiceImpl::new(storage.clone(), auth_service.clone()));
- let deployment_service = DeploymentServiceImpl; // Still unimplemented
+ let deployment_service = DeploymentServiceImpl::new(storage.clone(), auth_service.clone());
// Start scheduler in background with CreditService integration
let scheduler = Arc::new(scheduler::Scheduler::new_with_credit_service(storage.clone()).await);
@@ -271,6 +286,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
info!("Scheduler started - tenant-aware with quota enforcement");
+ let deployment_controller = Arc::new(deployment_controller::DeploymentController::new(
+ storage.clone(),
+ ));
+ tokio::spawn(async move {
+ deployment_controller.run().await;
+ });
+ info!("Deployment controller started - reconciling Deployment resources");
+
// Start FiberLB controller in background
let fiberlb_controller = Arc::new(fiberlb_controller::FiberLbController::new(
storage.clone(),
@@ -280,7 +303,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(async move {
fiberlb_controller.run().await;
});
- info!("FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens");
+ info!(
+ "FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens"
+ );
// Start FlashDNS controller in background
let flashdns_controller = Arc::new(flashdns_controller::FlashDnsController::new(
@@ -297,25 +322,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build gRPC server with authentication layer
let grpc_server = Server::builder()
- .add_service(
- tonic::codegen::InterceptedService::new(
- PodServiceServer::new(pod_service.as_ref().clone()),
- make_interceptor(auth_service.clone()),
- ),
- )
- .add_service(
- tonic::codegen::InterceptedService::new(
- ServiceServiceServer::new(service_service.as_ref().clone()),
- make_interceptor(auth_service.clone()),
- ),
- )
- .add_service(
- tonic::codegen::InterceptedService::new(
- NodeServiceServer::new(node_service.as_ref().clone()),
- make_interceptor(auth_service.clone()),
- ),
- )
- .add_service(DeploymentServiceServer::new(deployment_service))
+ .add_service(tonic::codegen::InterceptedService::new(
+ PodServiceServer::new(pod_service.as_ref().clone()),
+ make_interceptor(auth_service.clone()),
+ ))
+ .add_service(tonic::codegen::InterceptedService::new(
+ ServiceServiceServer::new(service_service.as_ref().clone()),
+ make_interceptor(auth_service.clone()),
+ ))
+ .add_service(tonic::codegen::InterceptedService::new(
+ NodeServiceServer::new(node_service.as_ref().clone()),
+ make_interceptor(auth_service.clone()),
+ ))
+ .add_service(tonic::codegen::InterceptedService::new(
+ DeploymentServiceServer::new(deployment_service),
+ make_interceptor(auth_service.clone()),
+ ))
.serve(config.server.addr);
// HTTP REST API server
@@ -350,59 +372,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
-// Deployment Service Implementation (placeholder - not part of MVP)
-#[derive(Debug, Default)]
-struct DeploymentServiceImpl;
-
-#[tonic::async_trait]
-impl DeploymentService for DeploymentServiceImpl {
- async fn create_deployment(
- &self,
- _request: Request<CreateDeploymentRequest>,
- ) -> Result<Response<CreateDeploymentResponse>, Status> {
- Err(Status::unimplemented("create_deployment not yet implemented"))
- }
-
- async fn get_deployment(
- &self,
- _request: Request<GetDeploymentRequest>,
- ) -> Result<Response<GetDeploymentResponse>, Status> {
- Err(Status::unimplemented("get_deployment not yet implemented"))
- }
-
- async fn list_deployments(
- &self,
- _request: Request<ListDeploymentsRequest>,
- ) -> Result<Response<ListDeploymentsResponse>, Status> {
- Err(Status::unimplemented("list_deployments not yet implemented"))
- }
-
- async fn update_deployment(
- &self,
- _request: Request<UpdateDeploymentRequest>,
- ) -> Result<Response<UpdateDeploymentResponse>, Status> {
- Err(Status::unimplemented("update_deployment not yet implemented"))
- }
-
- async fn delete_deployment(
- &self,
- _request: Request<DeleteDeploymentRequest>,
- ) -> Result<Response<DeleteDeploymentResponse>, Status> {
- Err(Status::unimplemented("delete_deployment not yet implemented"))
- }
-}
-
fn init_logging(level: &str) {
tracing_subscriber::fmt()
- .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)))
+ .with_env_filter(
+ EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)),
+ )
.init();
}
-async fn register_chainfire_membership(
- endpoint: &str,
- service: &str,
- addr: String,
-) -> Result<()> {
+async fn register_chainfire_membership(endpoint: &str, service: &str, addr: String) -> Result<()> {
let node_id =
std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id()));
let ts = SystemTime::now()
diff --git a/k8shost/crates/k8shost-server/src/rest.rs b/k8shost/crates/k8shost-server/src/rest.rs
index 895f063..df997f7 100644
--- a/k8shost/crates/k8shost-server/src/rest.rs
+++ b/k8shost/crates/k8shost-server/src/rest.rs
@@ -11,26 +11,24 @@
use axum::{
extract::{Path, Query, State},
- http::StatusCode,
http::HeaderMap,
- routing::{delete, get, post},
+ http::StatusCode,
+ routing::{delete, get},
Json, Router,
};
use iam_service_auth::{resolve_tenant_ids_from_context, AuthService, TenantContext};
use k8shost_proto::{
- pod_service_server::PodService,
- service_service_server::ServiceService,
- node_service_server::NodeService,
- CreatePodRequest, DeletePodRequest, ListPodsRequest,
- CreateServiceRequest, DeleteServiceRequest, ListServicesRequest,
- ListNodesRequest, Pod as ProtoPod, Service as ProtoService, Node as ProtoNode,
- ObjectMeta, PodSpec, Container, ServiceSpec, ServicePort,
+ node_service_server::NodeService, pod_service_server::PodService,
+ service_service_server::ServiceService, Container, CreatePodRequest, CreateServiceRequest,
+ DeletePodRequest, DeleteServiceRequest, ListNodesRequest, ListPodsRequest, ListServicesRequest,
+ Node as ProtoNode, ObjectMeta, Pod as ProtoPod, PodSpec, Service as ProtoService, ServicePort,
+ ServiceSpec,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tonic::{Code, Request};
-use crate::services::{pod::PodServiceImpl, service::ServiceServiceImpl, node::NodeServiceImpl};
+use crate::services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl};
/// REST API state
#[derive(Clone)]
@@ -125,15 +123,28 @@ pub struct PodResponse {
impl From<ProtoPod> for PodResponse {
fn from(pod: ProtoPod) -> Self {
- let phase = pod.status.as_ref()
+ let phase = pod
+ .status
+ .as_ref()
.and_then(|s| s.phase.clone())
.unwrap_or_else(|| "Unknown".to_string());
let ip = pod.status.as_ref().and_then(|s| s.pod_ip.clone());
- let name = pod.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
- let namespace = pod.metadata.as_ref()
+ let name = pod
+ .metadata
+ .as_ref()
+ .map(|m| m.name.clone())
+ .unwrap_or_default();
+ let namespace = pod
+ .metadata
+ .as_ref()
.and_then(|m| m.namespace.clone())
.unwrap_or_else(|| "default".to_string());
- Self { name, namespace, phase, ip }
+ Self {
+ name,
+ namespace,
+ phase,
+ ip,
+ }
}
}
@@ -156,24 +167,49 @@ pub struct ServicePortResponse {
impl From<ProtoService> for ServiceResponse {
fn from(svc: ProtoService) -> Self {
- let ports = svc.spec.as_ref().map(|s| {
- s.ports.iter().map(|p| ServicePortResponse {
- port: p.port,
- target_port: p.target_port.unwrap_or(p.port),
- protocol: p.protocol.clone().unwrap_or_else(|| "TCP".to_string()),
- }).collect()
- }).unwrap_or_default();
+ let ports = svc
+ .spec
+ .as_ref()
+ .map(|s| {
+ s.ports
+ .iter()
+ .map(|p| ServicePortResponse {
+ port: p.port,
+ target_port: p.target_port.unwrap_or(p.port),
+ protocol: p.protocol.clone().unwrap_or_else(|| "TCP".to_string()),
+ })
+ .collect()
+ })
+ .unwrap_or_default();
- let name = svc.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
- let namespace = svc.metadata.as_ref()
+ let name = svc
+ .metadata
+ .as_ref()
+ .map(|m| m.name.clone())
+ .unwrap_or_default();
+ let namespace = svc
+ .metadata
+ .as_ref()
.and_then(|m| m.namespace.clone())
.unwrap_or_else(|| "default".to_string());
- let service_type = svc.spec.as_ref()
+ let service_type = svc
+ .spec
+ .as_ref()
.and_then(|s| s.r#type.clone())
.unwrap_or_else(|| "ClusterIP".to_string());
- let cluster_ip = svc.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()).cloned();
+ let cluster_ip = svc
+ .spec
+ .as_ref()
+ .and_then(|s| s.cluster_ip.as_ref())
+ .cloned();
- Self { name, namespace, service_type, cluster_ip, ports }
+ Self {
+ name,
+ namespace,
+ service_type,
+ cluster_ip,
+ ports,
+ }
}
}
@@ -188,13 +224,34 @@ pub struct NodeResponse {
impl From<ProtoNode> for NodeResponse {
fn from(node: ProtoNode) -> Self {
- let ready = node.status.as_ref()
- .map(|s| s.conditions.iter().any(|c| c.r#type == "Ready" && c.status == "True"))
+ let ready = node
+ .status
+ .as_ref()
+ .map(|s| {
+ s.conditions
+ .iter()
+ .any(|c| c.r#type == "Ready" && c.status == "True")
+ })
.unwrap_or(false);
- let name = node.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
- let cpu_capacity = node.status.as_ref().and_then(|s| s.capacity.get("cpu").cloned());
- let memory_capacity = node.status.as_ref().and_then(|s| s.capacity.get("memory").cloned());
- Self { name, ready, cpu_capacity, memory_capacity }
+ let name = node
+ .metadata
+ .as_ref()
+ .map(|m| m.name.clone())
+ .unwrap_or_default();
+ let cpu_capacity = node
+ .status
+ .as_ref()
+ .and_then(|s| s.capacity.get("cpu").cloned());
+ let memory_capacity = node
+ .status
+ .as_ref()
+ .and_then(|s| s.capacity.get("memory").cloned());
+ Self {
+ name,
+ ready,
+ cpu_capacity,
+ memory_capacity,
+ }
}
}
@@ -222,7 +279,10 @@ pub fn build_router(state: RestApiState) -> Router {
.route("/api/v1/pods", get(list_pods).post(create_pod))
.route("/api/v1/pods/{namespace}/{name}", delete(delete_pod))
.route("/api/v1/services", get(list_services).post(create_service))
- .route("/api/v1/services/{namespace}/{name}", delete(delete_service))
+ .route(
+ "/api/v1/services/{namespace}/{name}",
+ delete(delete_service),
+ )
.route("/api/v1/nodes", get(list_nodes))
.route("/health", get(health_check))
.with_state(state)
@@ -232,7 +292,9 @@ pub fn build_router(state: RestApiState) -> Router {
async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) {
(
StatusCode::OK,
- Json(SuccessResponse::new(serde_json::json!({ "status": "healthy" }))),
+ Json(SuccessResponse::new(
+ serde_json::json!({ "status": "healthy" }),
+ )),
)
}
@@ -249,11 +311,20 @@ async fn list_pods(
});
req.extensions_mut().insert(tenant);
- let response = state.pod_service.list_pods(req)
- .await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
+ let response = state.pod_service.list_pods(req).await.map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "LIST_FAILED",
+ &e.message(),
+ )
+ })?;
- let pods: Vec<PodResponse> = response.into_inner().items.into_iter().map(PodResponse::from).collect();
+ let pods: Vec<PodResponse> = response
+ .into_inner()
+ .items
+ .into_iter()
+ .map(PodResponse::from)
+ .collect();
Ok(Json(SuccessResponse::new(PodsResponse { pods })))
}
@@ -297,12 +368,21 @@ async fn create_pod(
});
grpc_req.extensions_mut().insert(tenant);
- let response = state.pod_service.create_pod(grpc_req)
- .await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?;
+ let response = state.pod_service.create_pod(grpc_req).await.map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "CREATE_FAILED",
+ &e.message(),
+ )
+ })?;
- let pod = response.into_inner().pod
- .ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No pod returned"))?;
+ let pod = response.into_inner().pod.ok_or_else(|| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "CREATE_FAILED",
+ "No pod returned",
+ )
+ })?;
Ok((
StatusCode::CREATED,
@@ -315,7 +395,8 @@ async fn delete_pod(
State(state): State<RestApiState>,
Path((namespace, name)): Path<(String, String)>,
headers: HeaderMap,
-) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
+) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
+{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let mut req = Request::new(DeletePodRequest {
name: name.clone(),
@@ -323,13 +404,19 @@ async fn delete_pod(
});
req.extensions_mut().insert(tenant);
- state.pod_service.delete_pod(req)
- .await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?;
+ state.pod_service.delete_pod(req).await.map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "DELETE_FAILED",
+ &e.message(),
+ )
+ })?;
Ok((
StatusCode::OK,
- Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))),
+ Json(SuccessResponse::new(
+ serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }),
+ )),
))
}
@@ -345,11 +432,24 @@ async fn list_services(
});
req.extensions_mut().insert(tenant);
- let response = state.service_service.list_services(req)
+ let response = state
+ .service_service
+ .list_services(req)
.await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
+ .map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "LIST_FAILED",
+ &e.message(),
+ )
+ })?;
- let services: Vec<ServiceResponse> = response.into_inner().items.into_iter().map(ServiceResponse::from).collect();
+ let services: Vec<ServiceResponse> = response
+ .into_inner()
+ .items
+ .into_iter()
+ .map(ServiceResponse::from)
+ .collect();
Ok(Json(SuccessResponse::new(ServicesResponse { services })))
}
@@ -359,7 +459,8 @@ async fn create_service(
State(state): State<RestApiState>,
headers: HeaderMap,
Json(req): Json,
-) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)> {
+) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)>
+{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let namespace = req.namespace.unwrap_or_else(|| "default".to_string());
let service_type = req.service_type.unwrap_or_else(|| "ClusterIP".to_string());
@@ -393,12 +494,25 @@ async fn create_service(
});
grpc_req.extensions_mut().insert(tenant);
- let response = state.service_service.create_service(grpc_req)
+ let response = state
+ .service_service
+ .create_service(grpc_req)
.await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?;
+ .map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "CREATE_FAILED",
+ &e.message(),
+ )
+ })?;
- let service = response.into_inner().service
- .ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No service returned"))?;
+ let service = response.into_inner().service.ok_or_else(|| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "CREATE_FAILED",
+ "No service returned",
+ )
+ })?;
Ok((
StatusCode::CREATED,
@@ -411,7 +525,8 @@ async fn delete_service(
State(state): State<RestApiState>,
Path((namespace, name)): Path<(String, String)>,
headers: HeaderMap,
-) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
+) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
+{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let mut req = Request::new(DeleteServiceRequest {
name: name.clone(),
@@ -419,13 +534,23 @@ async fn delete_service(
});
req.extensions_mut().insert(tenant);
- state.service_service.delete_service(req)
+ state
+ .service_service
+ .delete_service(req)
.await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?;
+ .map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "DELETE_FAILED",
+ &e.message(),
+ )
+ })?;
Ok((
StatusCode::OK,
- Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))),
+ Json(SuccessResponse::new(
+ serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }),
+ )),
))
}
@@ -438,11 +563,20 @@ async fn list_nodes(
let mut req = Request::new(ListNodesRequest {});
req.extensions_mut().insert(tenant);
- let response = state.node_service.list_nodes(req)
- .await
- .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
+ let response = state.node_service.list_nodes(req).await.map_err(|e| {
+ error_response(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "LIST_FAILED",
+ &e.message(),
+ )
+ })?;
- let nodes: Vec<NodeResponse> = response.into_inner().items.into_iter().map(NodeResponse::from).collect();
+ let nodes: Vec<NodeResponse> = response
+ .into_inner()
+ .items
+ .into_iter()
+ .map(NodeResponse::from)
+ .collect();
Ok(Json(SuccessResponse::new(NodesResponse { nodes })))
}
diff --git a/k8shost/crates/k8shost-server/src/scheduler.rs b/k8shost/crates/k8shost-server/src/scheduler.rs
index ca411c3..b4c6594 100644
--- a/k8shost/crates/k8shost-server/src/scheduler.rs
+++ b/k8shost/crates/k8shost-server/src/scheduler.rs
@@ -38,7 +38,10 @@ impl Scheduler {
let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
Ok(client) => {
- info!("Scheduler: CreditService quota enforcement enabled: {}", endpoint);
+ info!(
+ "Scheduler: CreditService quota enforcement enabled: {}",
+ endpoint
+ );
Some(Arc::new(RwLock::new(client)))
}
Err(e) => {
@@ -64,7 +67,10 @@ impl Scheduler {
/// Start the scheduler loop
pub async fn run(self: Arc) {
- info!("Scheduler started (spread algorithm, {}s interval)", self.interval.as_secs());
+ info!(
+ "Scheduler started (spread algorithm, {}s interval)",
+ self.interval.as_secs()
+ );
loop {
if let Err(e) = self.schedule_pending_pods().await {
@@ -103,14 +109,10 @@ impl Scheduler {
async fn get_active_tenants(&self) -> anyhow::Result<HashSet<(String, String)>> {
// Query all pods to find unique (org_id, project_id) combinations
// This is a pragmatic approach that doesn't require IAM changes
- let all_pods = self
- .storage
- .list_all_pods()
- .await
- .unwrap_or_else(|e| {
- warn!("Failed to query all pods for tenant discovery: {}", e);
- vec![]
- });
+ let all_pods = self.storage.list_all_pods().await.unwrap_or_else(|e| {
+ warn!("Failed to query all pods for tenant discovery: {}", e);
+ vec![]
+ });
let mut tenants: HashSet<(String, String)> = HashSet::new();
@@ -133,7 +135,10 @@ impl Scheduler {
/// Schedule pending pods for a specific tenant
async fn schedule_tenant_pods(&self, org_id: &str, project_id: &str) -> anyhow::Result<()> {
// Get all pods in all namespaces for this tenant
- let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?;
+ let all_pods = self
+ .storage
+ .list_pods(org_id, project_id, None, None)
+ .await?;
// Filter to pending pods that need scheduling
let pending_pods: Vec<Pod> = all_pods
@@ -157,15 +162,23 @@ impl Scheduler {
return Ok(());
}
- info!("Scheduling {} pending pod(s) for tenant {}/{}",
- pending_pods.len(), org_id, project_id);
+ info!(
+ "Scheduling {} pending pod(s) for tenant {}/{}",
+ pending_pods.len(),
+ org_id,
+ project_id
+ );
// Get all nodes for this tenant
let nodes = self.storage.list_nodes(org_id, project_id).await?;
if nodes.is_empty() {
- warn!("No nodes available for tenant {}/{}. {} pod(s) remain pending.",
- org_id, project_id, pending_pods.len());
+ warn!(
+ "No nodes available for tenant {}/{}. {} pod(s) remain pending.",
+ org_id,
+ project_id,
+ pending_pods.len()
+ );
return Ok(());
}
@@ -176,15 +189,21 @@ impl Scheduler {
.collect();
if ready_nodes.is_empty() {
- warn!("No ready nodes available for tenant {}/{}. {} pod(s) remain pending.",
- org_id, project_id, pending_pods.len());
+ warn!(
+ "No ready nodes available for tenant {}/{}. {} pod(s) remain pending.",
+ org_id,
+ project_id,
+ pending_pods.len()
+ );
return Ok(());
}
info!("Found {} ready node(s) for scheduling", ready_nodes.len());
// Get current pod count per node for spread algorithm
- let pod_counts = self.count_pods_per_node(org_id, project_id, &ready_nodes).await?;
+ let pod_counts = self
+ .count_pods_per_node(org_id, project_id, &ready_nodes)
+ .await?;
// Schedule each pending pod
for pod in pending_pods {
@@ -233,9 +252,10 @@ impl Scheduler {
node.status
.as_ref()
.map(|status| {
- status.conditions.iter().any(|cond| {
- cond.r#type == "Ready" && cond.status == "True"
- })
+ status
+ .conditions
+ .iter()
+ .any(|cond| cond.r#type == "Ready" && cond.status == "True")
})
.unwrap_or(false)
}
@@ -247,13 +267,14 @@ impl Scheduler {
project_id: &str,
nodes: &[Node],
) -> anyhow::Result<HashMap<String, usize>> {
- let mut counts: HashMap<String, usize> = nodes
- .iter()
- .map(|n| (n.metadata.name.clone(), 0))
- .collect();
+ let mut counts: HashMap<String, usize> =
+ nodes.iter().map(|n| (n.metadata.name.clone(), 0)).collect();
// Get all assigned pods
- let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?;
+ let all_pods = self
+ .storage
+ .list_pods(org_id, project_id, None, None)
+ .await?;
// Count pods per node
for pod in all_pods {
@@ -324,12 +345,7 @@ impl Scheduler {
// Check if tenant has sufficient quota
use creditservice_client::ResourceType;
match client
- .check_quota(
- project_id,
- ResourceType::K8sNode,
- 1,
- estimated_cost as i64,
- )
+ .check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost as i64)
.await
{
Ok(response) if !response.allowed => {
@@ -389,10 +405,7 @@ impl Scheduler {
/// Parse memory string to GB (e.g., "512Mi" -> 0.5, "2Gi" -> 2.0)
fn parse_memory_to_gb(memory: &str) -> Option<f64> {
if memory.ends_with("Gi") {
- memory
- .trim_end_matches("Gi")
- .parse::<f64>()
- .ok()
+ memory.trim_end_matches("Gi").parse::<f64>().ok()
} else if memory.ends_with("Mi") {
memory
.trim_end_matches("Mi")
@@ -407,7 +420,10 @@ impl Scheduler {
.map(|ki| ki / (1024.0 * 1024.0))
} else {
// Assume bytes
- memory.parse::<f64>().ok().map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0))
+ memory
+ .parse::<f64>()
+ .ok()
+ .map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0))
}
}
}
@@ -419,7 +435,11 @@ mod tests {
#[tokio::test]
async fn test_is_node_ready() {
- let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage"));
+ let storage = Arc::new(
+ Storage::new("memory://test".to_string())
+ .await
+ .expect("Failed to create storage"),
+ );
let scheduler = Scheduler::new(storage);
// Node with Ready=True condition
@@ -468,7 +488,11 @@ mod tests {
#[tokio::test]
async fn test_select_node_spread() {
- let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage"));
+ let storage = Arc::new(
+ Storage::new("memory://test".to_string())
+ .await
+ .expect("Failed to create storage"),
+ );
let scheduler = Scheduler::new(storage);
let node1 = Node {
diff --git a/k8shost/crates/k8shost-server/src/services/deployment.rs b/k8shost/crates/k8shost-server/src/services/deployment.rs
new file mode 100644
index 0000000..667f8df
--- /dev/null
+++ b/k8shost/crates/k8shost-server/src/services/deployment.rs
@@ -0,0 +1,696 @@
+//! Deployment service implementation.
+//!
+//! Provides CRUD for Deployment resources and treats them as first-class
+//! tenant-scoped objects persisted in FlareDB.
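+//!
+//! Managed pods are tied back to their Deployment through annotations
+//! (deployment name, deployment UID, and a SHA-256 template hash)
+//! rather than owner references; the reconciler in
+//! `deployment_controller` matches on these to decide which pods it
+//! owns and which are stale.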
+
+use crate::auth::{
+ get_tenant_context, resolve_tenant_ids_from_context, resource_for_tenant, AuthService,
+};
+use crate::storage::Storage;
+use chrono::Utc;
+use k8shost_proto::{
+ deployment_service_server::DeploymentService, CreateDeploymentRequest,
+ CreateDeploymentResponse, DeleteDeploymentRequest, DeleteDeploymentResponse,
+ GetDeploymentRequest, GetDeploymentResponse, ListDeploymentsRequest, ListDeploymentsResponse,
+ UpdateDeploymentRequest, UpdateDeploymentResponse,
+};
+use sha2::{Digest, Sha256};
+use std::sync::Arc;
+use tonic::{Request, Response, Status};
+use uuid::Uuid;
+
+const ACTION_DEPLOYMENT_CREATE: &str = "k8s:deployments:create";
+const ACTION_DEPLOYMENT_READ: &str = "k8s:deployments:read";
+const ACTION_DEPLOYMENT_LIST: &str = "k8s:deployments:list";
+const ACTION_DEPLOYMENT_UPDATE: &str = "k8s:deployments:update";
+const ACTION_DEPLOYMENT_DELETE: &str = "k8s:deployments:delete";
+
+pub(crate) const DEPLOYMENT_NAME_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-name";
+pub(crate) const DEPLOYMENT_UID_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-uid";
+pub(crate) const TEMPLATE_HASH_ANNOTATION: &str = "k8shost.photoncloud.io/template-hash";
+
+#[derive(Clone)]
+pub struct DeploymentServiceImpl {
+ storage: Arc<Storage>,
+ auth: Arc<AuthService>,
+}
+
+impl DeploymentServiceImpl {
+ pub fn new(storage: Arc<Storage>, auth: Arc<AuthService>) -> Self {
+ Self { storage, auth }
+ }
+
+ pub fn to_proto_deployment(
+ deployment: &k8shost_types::Deployment,
+ ) -> k8shost_proto::Deployment {
+ k8shost_proto::Deployment {
+ metadata: Some(to_proto_meta(&deployment.metadata)),
+ spec: Some(k8shost_proto::DeploymentSpec {
+ replicas: deployment.spec.replicas,
+ selector: Some(k8shost_proto::LabelSelector {
+ match_labels: deployment.spec.selector.match_labels.clone(),
+ }),
+ template: Some(k8shost_proto::PodTemplateSpec {
+ metadata: Some(to_proto_meta(&deployment.spec.template.metadata)),
+ spec: Some(to_proto_pod_spec(&deployment.spec.template.spec)),
+ }),
+ }),
+ status: deployment
+ .status
+ .as_ref()
+ .map(|status| k8shost_proto::DeploymentStatus {
+ replicas: status.replicas,
+ ready_replicas: status.ready_replicas,
+ available_replicas: status.available_replicas,
+ }),
+ }
+ }
+
+ pub fn from_proto_deployment(
+ deployment: &k8shost_proto::Deployment,
+ ) -> Result<k8shost_types::Deployment, Status> {
+ let metadata = deployment
+ .metadata
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("metadata is required"))?;
+ let spec = deployment
+ .spec
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("spec is required"))?;
+ let selector = spec
+ .selector
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("selector is required"))?;
+ let template = spec
+ .template
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("template is required"))?;
+ let template_meta = template
+ .metadata
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("template.metadata is required"))?;
+ let template_spec = template
+ .spec
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("template.spec is required"))?;
+
+ Ok(k8shost_types::Deployment {
+ metadata: from_proto_meta(metadata)?,
+ spec: k8shost_types::DeploymentSpec {
+ replicas: spec.replicas,
+ selector: k8shost_types::LabelSelector {
+ match_labels: selector.match_labels.clone(),
+ },
+ template: k8shost_types::PodTemplateSpec {
+ metadata: from_proto_meta(template_meta)?,
+ spec: from_proto_pod_spec(template_spec),
+ },
+ },
+ status: deployment
+ .status
+ .as_ref()
+ .map(|status| k8shost_types::DeploymentStatus {
+ replicas: status.replicas,
+ ready_replicas: status.ready_replicas,
+ available_replicas: status.available_replicas,
+ }),
+ })
+ }
+
+ pub(crate) fn validate_spec(deployment: &k8shost_types::Deployment) -> Result<(), Status> {
+ if deployment.metadata.namespace.is_none() {
+ return Err(Status::invalid_argument("namespace is required"));
+ }
+ if deployment.spec.replicas.unwrap_or(1) < 0 {
+ return Err(Status::invalid_argument("replicas must be >= 0"));
+ }
+ if deployment.spec.selector.match_labels.is_empty() {
+ return Err(Status::invalid_argument(
+ "selector.match_labels must not be empty",
+ ));
+ }
+ if deployment.spec.template.spec.containers.is_empty() {
+ return Err(Status::invalid_argument(
+ "template.spec.containers must not be empty",
+ ));
+ }
+ let template_labels = &deployment.spec.template.metadata.labels;
+ let selector_matches_template = deployment
+ .spec
+ .selector
+ .match_labels
+ .iter()
+ .all(|(key, value)| template_labels.get(key) == Some(value));
+ if !selector_matches_template {
+ return Err(Status::invalid_argument(
+ "selector.match_labels must be present in template.metadata.labels",
+ ));
+ }
+ Ok(())
+ }
+
+ pub(crate) fn pod_is_owned_by_deployment(
+ deployment: &k8shost_types::Deployment,
+ pod: &k8shost_types::Pod,
+ ) -> bool {
+ let annotations = &pod.metadata.annotations;
+ if let Some(uid) = deployment.metadata.uid.as_deref() {
+ if annotations
+ .get(DEPLOYMENT_UID_ANNOTATION)
+ .map(String::as_str)
+ == Some(uid)
+ {
+ return true;
+ }
+ }
+ annotations
+ .get(DEPLOYMENT_NAME_ANNOTATION)
+ .map(String::as_str)
+ == Some(deployment.metadata.name.as_str())
+ }
+}
+
+pub(crate) fn deployment_template_hash(
+ deployment: &k8shost_types::Deployment,
+) -> Result<String, Status> {
+ let payload = serde_json::to_vec(&deployment.spec.template)
+ .map_err(|e| Status::internal(format!("failed to hash deployment template: {}", e)))?;
+ let mut hasher = Sha256::new();
+ hasher.update(payload);
+ Ok(format!("{:x}", hasher.finalize()))
+}
+
+fn default_status() -> k8shost_types::DeploymentStatus {
+ k8shost_types::DeploymentStatus {
+ replicas: Some(0),
+ ready_replicas: Some(0),
+ available_replicas: Some(0),
+ }
+}
+
+fn next_resource_version(current: Option<&str>) -> String {
+ let current = current
+ .and_then(|version| version.parse::<u64>().ok())
+ .unwrap_or(0);
+ (current + 1).to_string()
+}
+
+fn to_proto_meta(meta: &k8shost_types::ObjectMeta) -> k8shost_proto::ObjectMeta {
+ k8shost_proto::ObjectMeta {
+ name: meta.name.clone(),
+ namespace: meta.namespace.clone(),
+ uid: meta.uid.clone(),
+ resource_version: meta.resource_version.clone(),
+ creation_timestamp: meta.creation_timestamp.map(|ts| ts.to_rfc3339()),
+ labels: meta.labels.clone(),
+ annotations: meta.annotations.clone(),
+ org_id: meta.org_id.clone(),
+ project_id: meta.project_id.clone(),
+ }
+}
+
+fn from_proto_meta(meta: &k8shost_proto::ObjectMeta) -> Result<k8shost_types::ObjectMeta, Status> {
+ Ok(k8shost_types::ObjectMeta {
+ name: meta.name.clone(),
+ namespace: meta.namespace.clone(),
+ uid: meta.uid.clone(),
+ resource_version: meta.resource_version.clone(),
+ creation_timestamp: meta
+ .creation_timestamp
+ .as_ref()
+ .and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok())
+ .map(|ts| ts.with_timezone(&Utc)),
+ labels: meta.labels.clone(),
+ annotations: meta.annotations.clone(),
+ org_id: meta.org_id.clone(),
+ project_id: meta.project_id.clone(),
+ })
+}
+
+fn to_proto_pod_spec(spec: &k8shost_types::PodSpec) -> k8shost_proto::PodSpec {
+ k8shost_proto::PodSpec {
+ containers: spec
+ .containers
+ .iter()
+ .map(|container| k8shost_proto::Container {
+ name: container.name.clone(),
+ image: container.image.clone(),
+ command: container.command.clone(),
+ args: container.args.clone(),
+ ports: container
+ .ports
+ .iter()
+ .map(|port| k8shost_proto::ContainerPort {
+ name: port.name.clone(),
+ container_port: port.container_port,
+ protocol: port.protocol.clone(),
+ })
+ .collect(),
+ env: container
+ .env
+ .iter()
+ .map(|env| k8shost_proto::EnvVar {
+ name: env.name.clone(),
+ value: env.value.clone(),
+ })
+ .collect(),
+ })
+ .collect(),
+ restart_policy: spec.restart_policy.clone(),
+ node_name: spec.node_name.clone(),
+ }
+}
+
+fn from_proto_pod_spec(spec: &k8shost_proto::PodSpec) -> k8shost_types::PodSpec {
+ k8shost_types::PodSpec {
+ containers: spec
+ .containers
+ .iter()
+ .map(|container| k8shost_types::Container {
+ name: container.name.clone(),
+ image: container.image.clone(),
+ command: container.command.clone(),
+ args: container.args.clone(),
+ ports: container
+ .ports
+ .iter()
+ .map(|port| k8shost_types::ContainerPort {
+ name: port.name.clone(),
+ container_port: port.container_port,
+ protocol: port.protocol.clone(),
+ })
+ .collect(),
+ env: container
+ .env
+ .iter()
+ .map(|env| k8shost_types::EnvVar {
+ name: env.name.clone(),
+ value: env.value.clone(),
+ })
+ .collect(),
+ resources: None,
+ })
+ .collect(),
+ restart_policy: spec.restart_policy.clone(),
+ node_name: spec.node_name.clone(),
+ }
+}
+
+#[tonic::async_trait]
+impl DeploymentService for DeploymentServiceImpl {
+ async fn create_deployment(
+ &self,
+ request: Request<CreateDeploymentRequest>,
+ ) -> Result<Response<CreateDeploymentResponse>, Status> {
+ let tenant = get_tenant_context(&request)?;
+ let req = request.into_inner();
+ let proto_deployment = req
+ .deployment
+ .ok_or_else(|| Status::invalid_argument("deployment is required"))?;
+ let mut deployment = Self::from_proto_deployment(&proto_deployment)?;
+
+ let (org_id, project_id) = resolve_tenant_ids_from_context(
+ &tenant,
+ deployment.metadata.org_id.as_deref().unwrap_or(""),
+ deployment.metadata.project_id.as_deref().unwrap_or(""),
+ )?;
+ deployment.metadata.org_id = Some(org_id.clone());
+ deployment.metadata.project_id = Some(project_id.clone());
+ Self::validate_spec(&deployment)?;
+
+ let deployment_key = format!(
+ "{}/{}",
+ deployment
+ .metadata
+ .namespace
+ .as_deref()
+ .unwrap_or("default"),
+ deployment.metadata.name
+ );
+ self.auth
+ .authorize(
+ &tenant,
+ ACTION_DEPLOYMENT_CREATE,
+ &resource_for_tenant("deployment", deployment_key, &org_id, &project_id),
+ )
+ .await?;
+
+ let namespace = deployment
+ .metadata
+ .namespace
+ .as_deref()
+ .ok_or_else(|| Status::invalid_argument("namespace is required"))?;
+ if self
+ .storage
+ .get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name)
+ .await?
+ .is_some()
+ {
+ return Err(Status::already_exists(format!(
+ "Deployment {} already exists",
+ deployment.metadata.name
+ )));
+ }
+
+ if deployment.metadata.uid.is_none() {
+ deployment.metadata.uid = Some(Uuid::new_v4().to_string());
+ }
+ if deployment.metadata.creation_timestamp.is_none() {
+ deployment.metadata.creation_timestamp = Some(Utc::now());
+ }
+ deployment.metadata.resource_version = Some("1".to_string());
+ deployment.status = Some(default_status());
+
+ self.storage.put_deployment(&deployment).await?;
+
+ Ok(Response::new(CreateDeploymentResponse {
+ deployment: Some(Self::to_proto_deployment(&deployment)),
+ }))
+ }
+
+ async fn get_deployment(
+ &self,
+ request: Request<GetDeploymentRequest>,
+ ) -> Result<Response<GetDeploymentResponse>, Status> {
+ let tenant = get_tenant_context(&request)?;
+ let req = request.into_inner();
+ let deployment_key = format!("{}/{}", req.namespace, req.name);
+ self.auth
+ .authorize(
+ &tenant,
+ ACTION_DEPLOYMENT_READ,
+ &resource_for_tenant(
+ "deployment",
+ deployment_key,
+ &tenant.org_id,
+ &tenant.project_id,
+ ),
+ )
+ .await?;
+
+ let deployment = self
+ .storage
+ .get_deployment(
+ &tenant.org_id,
+ &tenant.project_id,
+ &req.namespace,
+ &req.name,
+ )
+ .await?;
+
+ match deployment {
+ Some(deployment) => Ok(Response::new(GetDeploymentResponse {
+ deployment: Some(Self::to_proto_deployment(&deployment)),
+ })),
+ None => Err(Status::not_found(format!(
+ "Deployment {} not found",
+ req.name
+ ))),
+ }
+ }
+
+ async fn list_deployments(
+ &self,
+ request: Request<ListDeploymentsRequest>,
+ ) -> Result<Response<ListDeploymentsResponse>, Status> {
+ let tenant = get_tenant_context(&request)?;
+ self.auth
+ .authorize(
+ &tenant,
+ ACTION_DEPLOYMENT_LIST,
+ &resource_for_tenant("deployment", "*", &tenant.org_id, &tenant.project_id),
+ )
+ .await?;
+ let req = request.into_inner();
+
+ let items = self
+ .storage
+ .list_deployments(&tenant.org_id, &tenant.project_id, req.namespace.as_deref())
+ .await?
+ .iter()
+ .map(Self::to_proto_deployment)
+ .collect();
+
+ Ok(Response::new(ListDeploymentsResponse { items }))
+ }
+
+ async fn update_deployment(
+ &self,
+ request: Request<UpdateDeploymentRequest>,
+ ) -> Result<Response<UpdateDeploymentResponse>, Status> {
+ let tenant = get_tenant_context(&request)?;
+ let req = request.into_inner();
+ let proto_deployment = req
+ .deployment
+ .ok_or_else(|| Status::invalid_argument("deployment is required"))?;
+ let mut deployment = Self::from_proto_deployment(&proto_deployment)?;
+
+ let (org_id, project_id) = resolve_tenant_ids_from_context(
+ &tenant,
+ deployment.metadata.org_id.as_deref().unwrap_or(""),
+ deployment.metadata.project_id.as_deref().unwrap_or(""),
+ )?;
+ deployment.metadata.org_id = Some(org_id.clone());
+ deployment.metadata.project_id = Some(project_id.clone());
+ Self::validate_spec(&deployment)?;
+
+ let namespace = deployment
+ .metadata
+ .namespace
+ .as_deref()
+ .ok_or_else(|| Status::invalid_argument("namespace is required"))?;
+ let deployment_key = format!("{}/{}", namespace, deployment.metadata.name);
+ self.auth
+ .authorize(
+ &tenant,
+ ACTION_DEPLOYMENT_UPDATE,
+ &resource_for_tenant("deployment", deployment_key, &org_id, &project_id),
+ )
+ .await?;
+
+ let current = self
+ .storage
+ .get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name)
+ .await?
+ .ok_or_else(|| {
+ Status::not_found(format!("Deployment {} not found", deployment.metadata.name))
+ })?;
+
+ deployment.metadata.uid = current.metadata.uid.clone();
+ deployment.metadata.creation_timestamp = current.metadata.creation_timestamp;
+ deployment.metadata.resource_version = Some(next_resource_version(
+ current.metadata.resource_version.as_deref(),
+ ));
+ deployment.status = current.status.clone();
+
+ self.storage.put_deployment(&deployment).await?;
+
+ Ok(Response::new(UpdateDeploymentResponse {
+ deployment: Some(Self::to_proto_deployment(&deployment)),
+ }))
+ }
+
+ async fn delete_deployment(
+ &self,
+ request: Request<DeleteDeploymentRequest>,
+ ) -> Result<Response<DeleteDeploymentResponse>, Status> {
+ let tenant = get_tenant_context(&request)?;
+ let req = request.into_inner();
+ let deployment_key = format!("{}/{}", req.namespace, req.name);
+ self.auth
+ .authorize(
+ &tenant,
+ ACTION_DEPLOYMENT_DELETE,
+ &resource_for_tenant(
+ "deployment",
+ deployment_key,
+ &tenant.org_id,
+ &tenant.project_id,
+ ),
+ )
+ .await?;
+
+ let existing = self
+ .storage
+ .get_deployment(
+ &tenant.org_id,
+ &tenant.project_id,
+ &req.namespace,
+ &req.name,
+ )
+ .await?;
+ let Some(deployment) = existing else {
+ return Ok(Response::new(DeleteDeploymentResponse { success: false }));
+ };
+
+ let success = self
+ .storage
+ .delete_deployment(
+ &tenant.org_id,
+ &tenant.project_id,
+ &req.namespace,
+ &req.name,
+ )
+ .await?;
+
+ let pods = self
+ .storage
+ .list_pods(
+ &tenant.org_id,
+ &tenant.project_id,
+ Some(&req.namespace),
+ None,
+ )
+ .await?;
+ for pod in pods
+ .into_iter()
+ .filter(|pod| Self::pod_is_owned_by_deployment(&deployment, pod))
+ {
+ let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
+ let _ = self
+ .storage
+ .delete_pod(
+ &tenant.org_id,
+ &tenant.project_id,
+ namespace,
+ &pod.metadata.name,
+ )
+ .await?;
+ }
+
+ Ok(Response::new(DeleteDeploymentResponse { success }))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ fn test_deployment() -> k8shost_types::Deployment {
+ k8shost_types::Deployment {
+ metadata: k8shost_types::ObjectMeta {
+ name: "web".to_string(),
+ namespace: Some("default".to_string()),
+ uid: Some("deploy-uid".to_string()),
+ resource_version: Some("4".to_string()),
+ creation_timestamp: Some(Utc::now()),
+ labels: HashMap::new(),
+ annotations: HashMap::new(),
+ org_id: Some("test-org".to_string()),
+ project_id: Some("test-project".to_string()),
+ },
+ spec: k8shost_types::DeploymentSpec {
+ replicas: Some(2),
+ selector: k8shost_types::LabelSelector {
+ match_labels: HashMap::from([("app".to_string(), "web".to_string())]),
+ },
+ template: k8shost_types::PodTemplateSpec {
+ metadata: k8shost_types::ObjectMeta {
+ name: "".to_string(),
+ namespace: Some("default".to_string()),
+ uid: None,
+ resource_version: None,
+ creation_timestamp: None,
+ labels: HashMap::from([("app".to_string(), "web".to_string())]),
+ annotations: HashMap::new(),
+ org_id: None,
+ project_id: None,
+ },
+ spec: k8shost_types::PodSpec {
+ containers: vec![k8shost_types::Container {
+ name: "web".to_string(),
+ image: "nginx:latest".to_string(),
+ command: Vec::new(),
+ args: Vec::new(),
+ ports: Vec::new(),
+ env: Vec::new(),
+ resources: None,
+ }],
+ restart_policy: Some("Always".to_string()),
+ node_name: None,
+ },
+ },
+ },
+ status: Some(default_status()),
+ }
+ }
+
+ #[test]
+ fn deployment_round_trip_proto_conversion() {
+ let deployment = test_deployment();
+ let proto = DeploymentServiceImpl::to_proto_deployment(&deployment);
+ let round_trip = DeploymentServiceImpl::from_proto_deployment(&proto).unwrap();
+
+ assert_eq!(round_trip.metadata.name, deployment.metadata.name);
+ assert_eq!(round_trip.spec.replicas, Some(2));
+ assert_eq!(
+ round_trip
+ .spec
+ .selector
+ .match_labels
+ .get("app")
+ .map(String::as_str),
+ Some("web")
+ );
+ }
+
+ #[test]
+ fn validate_spec_requires_selector_labels_on_template() {
+ let mut deployment = test_deployment();
+ deployment.spec.template.metadata.labels.clear();
+
+ let error = DeploymentServiceImpl::validate_spec(&deployment).unwrap_err();
+ assert_eq!(error.code(), tonic::Code::InvalidArgument);
+ }
+
+ #[test]
+ fn template_hash_changes_when_template_changes() {
+ let deployment = test_deployment();
+ let mut changed = test_deployment();
+ changed.spec.template.spec.containers[0].image = "caddy:latest".to_string();
+
+ let original_hash = deployment_template_hash(&deployment).unwrap();
+ let changed_hash = deployment_template_hash(&changed).unwrap();
+
+ assert_ne!(original_hash, changed_hash);
+ }
+
+ #[test]
+ fn owned_pod_detection_matches_uid_annotation() {
+ let deployment = test_deployment();
+ let mut pod = k8shost_types::Pod {
+ metadata: k8shost_types::ObjectMeta {
+ name: "web-1".to_string(),
+ namespace: Some("default".to_string()),
+ uid: None,
+ resource_version: None,
+ creation_timestamp: None,
+ labels: HashMap::new(),
+ annotations: HashMap::from([(
+ DEPLOYMENT_UID_ANNOTATION.to_string(),
+ "deploy-uid".to_string(),
+ )]),
+ org_id: Some("test-org".to_string()),
+ project_id: Some("test-project".to_string()),
+ },
+ spec: k8shost_types::PodSpec {
+ containers: Vec::new(),
+ restart_policy: None,
+ node_name: None,
+ },
+ status: None,
+ };
+
+ assert!(DeploymentServiceImpl::pod_is_owned_by_deployment(
+ &deployment,
+ &pod
+ ));
+
+ pod.metadata.annotations.clear();
+ assert!(!DeploymentServiceImpl::pod_is_owned_by_deployment(
+ &deployment,
+ &pod
+ ));
+ }
+}
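
The annotation constants and deployment_template_hash above are the contract between this service and the new deployment_controller.rs added earlier in the patch. A minimal sketch of how a reconciler can consume them, assuming an already-fetched deployment and pods list; the loop body is illustrative, not the controller's actual code:

    // Pods carrying a stale template hash are candidates for replacement.
    let desired_hash = deployment_template_hash(&deployment)?;
    let owned = pods
        .iter()
        .filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(&deployment, pod));
    for pod in owned {
        let recorded = pod.metadata.annotations.get(TEMPLATE_HASH_ANNOTATION);
        if recorded.map(String::as_str) != Some(desired_hash.as_str()) {
            // Stale pod: the controller would delete it and recreate it
            // from deployment.spec.template.
        }
    }
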
diff --git a/k8shost/crates/k8shost-server/src/services/mod.rs b/k8shost/crates/k8shost-server/src/services/mod.rs
index 5cf86e9..2ea4918 100644
--- a/k8shost/crates/k8shost-server/src/services/mod.rs
+++ b/k8shost/crates/k8shost-server/src/services/mod.rs
@@ -1,6 +1,7 @@
+pub mod deployment;
+pub mod node;
pub mod pod;
pub mod service;
-pub mod node;
#[cfg(test)]
mod tests;
diff --git a/k8shost/crates/k8shost-server/src/services/node.rs b/k8shost/crates/k8shost-server/src/services/node.rs
index c93b67e..637ad7f 100644
--- a/k8shost/crates/k8shost-server/src/services/node.rs
+++ b/k8shost/crates/k8shost-server/src/services/node.rs
@@ -143,9 +143,7 @@ impl NodeServiceImpl {
}
/// Convert proto NodeStatus to k8shost_types::NodeStatus
- fn from_proto_node_status(
- proto: &k8shost_proto::NodeStatus,
- ) -> k8shost_types::NodeStatus {
+ fn from_proto_node_status(proto: &k8shost_proto::NodeStatus) -> k8shost_types::NodeStatus {
k8shost_types::NodeStatus {
addresses: proto
.addresses
@@ -247,7 +245,11 @@ impl NodeService for NodeServiceImpl {
// Get existing node
let mut node = self
.storage
- .get_node(&tenant_context.org_id, &tenant_context.project_id, &req.node_name)
+ .get_node(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ &req.node_name,
+ )
.await?
.ok_or_else(|| Status::not_found(format!("Node {} not found", req.node_name)))?;
@@ -257,9 +259,10 @@ impl NodeService for NodeServiceImpl {
}
// Update last heartbeat timestamp in annotations
- node.metadata
- .annotations
- .insert("k8shost.io/last-heartbeat".to_string(), Utc::now().to_rfc3339());
+ node.metadata.annotations.insert(
+ "k8shost.io/last-heartbeat".to_string(),
+ Utc::now().to_rfc3339(),
+ );
// Increment resource version
let current_version = node
@@ -286,15 +289,22 @@ impl NodeService for NodeServiceImpl {
.authorize(
&tenant_context,
ACTION_NODE_LIST,
- &resource_for_tenant("node", "*", &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "node",
+ "*",
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let _req = request.into_inner();
- let nodes = self.storage.list_nodes(&tenant_context.org_id, &tenant_context.project_id).await?;
+ let nodes = self
+ .storage
+ .list_nodes(&tenant_context.org_id, &tenant_context.project_id)
+ .await?;
- let items: Vec<k8shost_proto::Node> =
- nodes.iter().map(Self::to_proto_node).collect();
+ let items: Vec<k8shost_proto::Node> = nodes.iter().map(Self::to_proto_node).collect();
Ok(Response::new(ListNodesResponse { items }))
}
diff --git a/k8shost/crates/k8shost-server/src/services/pod.rs b/k8shost/crates/k8shost-server/src/services/pod.rs
index 8821047..ca518b8 100644
--- a/k8shost/crates/k8shost-server/src/services/pod.rs
+++ b/k8shost/crates/k8shost-server/src/services/pod.rs
@@ -48,18 +48,19 @@ impl PodServiceImpl {
pub async fn new_with_credit_service(storage: Arc<Storage>, auth: Arc<AuthService>) -> Self {
// Initialize CreditService client if endpoint is configured
let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
- Ok(endpoint) => {
- match CreditServiceClient::connect(&endpoint).await {
- Ok(client) => {
- tracing::info!("CreditService admission control enabled: {}", endpoint);
- Some(Arc::new(RwLock::new(client)))
- }
- Err(e) => {
- tracing::warn!("Failed to connect to CreditService (admission control disabled): {}", e);
- None
- }
+ Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
+ Ok(client) => {
+ tracing::info!("CreditService admission control enabled: {}", endpoint);
+ Some(Arc::new(RwLock::new(client)))
}
- }
+ Err(e) => {
+ tracing::warn!(
+ "Failed to connect to CreditService (admission control disabled): {}",
+ e
+ );
+ None
+ }
+ },
Err(_) => {
tracing::info!("CREDITSERVICE_ENDPOINT not set, admission control disabled");
None
@@ -118,7 +119,10 @@ impl PodServiceImpl {
stripped.parse::<i64>().ok()
} else {
// Full CPU cores
- cpu_str.parse::<f64>().ok().map(|cores| (cores * 1000.0) as i64)
+ cpu_str
+ .parse::<f64>()
+ .ok()
+ .map(|cores| (cores * 1000.0) as i64)
}
}
@@ -134,7 +138,10 @@ impl PodServiceImpl {
stripped.parse::<i64>().ok().map(|gb| gb * 1000)
} else {
// Try parsing as bytes
- mem_str.parse::<i64>().ok().map(|bytes| bytes / (1024 * 1024))
+ mem_str
+ .parse::<i64>()
+ .ok()
+ .map(|bytes| bytes / (1024 * 1024))
}
}
@@ -307,7 +314,9 @@ impl PodService for PodServiceImpl {
) -> Result, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
- let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?;
+ let proto_pod = req
+ .pod
+ .ok_or_else(|| Status::invalid_argument("pod is required"))?;
// Convert proto to internal type
let mut pod = Self::from_proto_pod(&proto_pod)?;
@@ -327,7 +336,9 @@ impl PodService for PodServiceImpl {
)
.await?;
if pod.metadata.namespace.is_none() {
- return Err(Status::invalid_argument("namespace is required in metadata"));
+ return Err(Status::invalid_argument(
+ "namespace is required in metadata",
+ ));
}
// Assign UID if not present
@@ -362,11 +373,15 @@ impl PodService for PodServiceImpl {
// Calculate estimated cost based on Pod resource requests
let estimated_cost = Self::calculate_pod_cost(&pod);
- let project_id = pod.metadata.project_id.as_ref()
- .ok_or_else(|| Status::invalid_argument("project_id required for admission control"))?;
+ let project_id = pod.metadata.project_id.as_ref().ok_or_else(|| {
+ Status::invalid_argument("project_id required for admission control")
+ })?;
// Phase 0: Check quota
- match client.check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost).await {
+ match client
+ .check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost)
+ .await
+ {
Ok(resp) if !resp.allowed => {
let reason = if resp.reason.is_empty() {
"Insufficient quota or balance".to_string()
@@ -378,7 +393,10 @@ impl PodService for PodServiceImpl {
reason = %reason,
"Pod creation denied by CreditService"
);
- return Err(Status::resource_exhausted(format!("Admission denied: {}", reason)));
+ return Err(Status::resource_exhausted(format!(
+ "Admission denied: {}",
+ reason
+ )));
}
Err(e) => {
tracing::warn!("CreditService check_quota failed (allowing request): {}", e);
@@ -388,13 +406,16 @@ impl PodService for PodServiceImpl {
}
// Phase 1: Reserve credits
- match client.reserve_credits(
- project_id,
- estimated_cost,
- format!("Pod {} creation", pod.metadata.name),
- "PodInstance",
- 300, // 5 minute TTL
- ).await {
+ match client
+ .reserve_credits(
+ project_id,
+ estimated_cost,
+ format!("Pod {} creation", pod.metadata.name),
+ "PodInstance",
+ 300, // 5 minute TTL
+ )
+ .await
+ {
Ok(reservation) => {
tracing::info!(
reservation_id = %reservation.id,
@@ -405,7 +426,10 @@ impl PodService for PodServiceImpl {
}
Err(e) => {
tracing::warn!("CreditService reserve_credits failed: {}", e);
- return Err(Status::resource_exhausted(format!("Failed to reserve credits: {}", e)));
+ return Err(Status::resource_exhausted(format!(
+ "Failed to reserve credits: {}",
+ e
+ )));
}
}
} else {
@@ -417,9 +441,14 @@ impl PodService for PodServiceImpl {
Ok(_) => {}
Err(e) => {
// Rollback: Release reservation on failure
- if let (Some(ref credit_svc), Some(ref res_id)) = (&self.credit_service, &reservation_id) {
+ if let (Some(ref credit_svc), Some(ref res_id)) =
+ (&self.credit_service, &reservation_id)
+ {
let mut client = credit_svc.write().await;
- if let Err(release_err) = client.release_reservation(res_id, format!("Pod storage failed: {}", e)).await {
+ if let Err(release_err) = client
+ .release_reservation(res_id, format!("Pod storage failed: {}", e))
+ .await
+ {
tracing::warn!("Failed to release reservation {}: {}", res_id, release_err);
} else {
tracing::info!(reservation_id = %res_id, "Released reservation after Pod storage failure");
@@ -435,7 +464,10 @@ impl PodService for PodServiceImpl {
let actual_cost = Self::calculate_pod_cost(&pod);
let pod_uid = pod.metadata.uid.as_ref().unwrap_or(&pod.metadata.name);
- if let Err(e) = client.commit_reservation(res_id, actual_cost, pod_uid).await {
+ if let Err(e) = client
+ .commit_reservation(res_id, actual_cost, pod_uid)
+ .await
+ {
tracing::warn!("Failed to commit reservation {}: {}", res_id, e);
// Pod is already created, so we don't fail here - billing will reconcile
} else {
@@ -471,13 +503,23 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_READ,
- &resource_for_tenant("pod", pod_key, &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "pod",
+ pod_key,
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let pod = self
.storage
- .get_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
+ .get_pod(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ &req.namespace,
+ &req.name,
+ )
.await?;
if let Some(pod) = pod {
@@ -500,7 +542,12 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_LIST,
- &resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "pod",
+ "*",
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let req = request.into_inner();
@@ -514,7 +561,12 @@ impl PodService for PodServiceImpl {
let pods = self
.storage
- .list_pods(&tenant_context.org_id, &tenant_context.project_id, namespace, label_selector)
+ .list_pods(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ namespace,
+ label_selector,
+ )
.await?;
let items: Vec<k8shost_proto::Pod> = pods.iter().map(Self::to_proto_pod).collect();
@@ -528,7 +580,9 @@ impl PodService for PodServiceImpl {
) -> Result, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
- let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?;
+ let proto_pod = req
+ .pod
+ .ok_or_else(|| Status::invalid_argument("pod is required"))?;
let mut pod = Self::from_proto_pod(&proto_pod)?;
@@ -585,13 +639,23 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_DELETE,
- &resource_for_tenant("pod", pod_key, &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "pod",
+ pod_key,
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let existed = self
.storage
- .delete_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
+ .delete_pod(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ &req.namespace,
+ &req.name,
+ )
.await?;
Ok(Response::new(DeletePodResponse { success: existed }))
@@ -608,7 +672,12 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_LIST,
- &resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "pod",
+ "*",
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let _req = request.into_inner();
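
The reformatted admission-control path in create_pod is easiest to read as a three-phase protocol. A condensed, hedged outline (happy path only; `client`, `estimated_cost`, and the deny/proceed behavior on CreditService errors are as in the hunks above, where a commit failure is deliberately non-fatal):

    // Phase 0: quota check. An explicit denial aborts with
    // resource_exhausted; a transport error merely logs and proceeds.
    let _quota = client
        .check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost)
        .await;

    // Phase 1: reserve credits (5-minute TTL) before persisting the pod.
    let reservation = client
        .reserve_credits(
            project_id,
            estimated_cost,
            format!("Pod {} creation", pod.metadata.name),
            "PodInstance",
            300,
        )
        .await?;

    // Phase 2: commit once the pod is stored, or release the reservation
    // on storage failure so no credits leak.
    match storage.put_pod(&pod).await {
        Ok(()) => {
            client
                .commit_reservation(&reservation.id, actual_cost, pod_uid)
                .await?;
        }
        Err(e) => {
            client
                .release_reservation(&reservation.id, format!("Pod storage failed: {}", e))
                .await?;
        }
    }
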
diff --git a/k8shost/crates/k8shost-server/src/services/service.rs b/k8shost/crates/k8shost-server/src/services/service.rs
index 5f14fcf..77beec8 100644
--- a/k8shost/crates/k8shost-server/src/services/service.rs
+++ b/k8shost/crates/k8shost-server/src/services/service.rs
@@ -33,7 +33,11 @@ pub struct ServiceServiceImpl {
}
impl ServiceServiceImpl {
- pub fn new(storage: Arc<Storage>, ipam_client: Arc<IpamClient>, auth: Arc<AuthService>) -> Self {
+ pub fn new(
+ storage: Arc<Storage>,
+ ipam_client: Arc<IpamClient>,
+ auth: Arc<AuthService>,
+ ) -> Self {
Self {
storage,
ipam_client,
@@ -73,8 +77,10 @@ impl ServiceServiceImpl {
});
let status = svc.status.as_ref().map(|s| k8shost_proto::ServiceStatus {
- load_balancer: s.load_balancer.as_ref().map(|lb| {
- k8shost_proto::LoadBalancerStatus {
+ load_balancer: s
+ .load_balancer
+ .as_ref()
+ .map(|lb| k8shost_proto::LoadBalancerStatus {
ingress: lb
.ingress
.iter()
@@ -83,8 +89,7 @@ impl ServiceServiceImpl {
hostname: ing.hostname.clone(),
})
.collect(),
- }
- }),
+ }),
});
k8shost_proto::Service {
@@ -141,8 +146,10 @@ impl ServiceServiceImpl {
};
let status = proto.status.as_ref().map(|s| k8shost_types::ServiceStatus {
- load_balancer: s.load_balancer.as_ref().map(|lb| {
- k8shost_types::LoadBalancerStatus {
+ load_balancer: s
+ .load_balancer
+ .as_ref()
+ .map(|lb| k8shost_types::LoadBalancerStatus {
ingress: lb
.ingress
.iter()
@@ -151,8 +158,7 @@ impl ServiceServiceImpl {
hostname: ing.hostname.clone(),
})
.collect(),
- }
- }),
+ }),
});
Ok(k8shost_types::Service {
@@ -194,7 +200,9 @@ impl ServiceService for ServiceServiceImpl {
service.metadata.org_id = Some(org_id.clone());
service.metadata.project_id = Some(project_id.clone());
if service.metadata.namespace.is_none() {
- return Err(Status::invalid_argument("namespace is required in metadata"));
+ return Err(Status::invalid_argument(
+ "namespace is required in metadata",
+ ));
}
self.auth
.authorize(
@@ -221,11 +229,7 @@ impl ServiceService for ServiceServiceImpl {
// Allocate cluster IP if not present and service type is ClusterIP
if service.spec.cluster_ip.is_none() {
- let svc_type = service
- .spec
- .r#type
- .as_deref()
- .unwrap_or("ClusterIP");
+ let svc_type = service.spec.r#type.as_deref().unwrap_or("ClusterIP");
if svc_type == "ClusterIP" || svc_type == "LoadBalancer" {
// Get org_id, project_id, and uid for IPAM
let org_id = service.metadata.org_id.as_ref().unwrap();
@@ -235,12 +239,7 @@ impl ServiceService for ServiceServiceImpl {
// Allocate IP from IPAM
let cluster_ip = self
.ipam_client
- .allocate_cluster_ip(
- org_id,
- project_id,
- service_uid,
- authorization.as_deref(),
- )
+ .allocate_cluster_ip(org_id, project_id, service_uid, authorization.as_deref())
.await
.map_err(|e| {
Status::internal(format!("Failed to allocate Cluster IP: {}", e))
@@ -281,7 +280,12 @@ impl ServiceService for ServiceServiceImpl {
let service = self
.storage
- .get_service(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
+ .get_service(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ &req.namespace,
+ &req.name,
+ )
.await?;
if let Some(service) = service {
@@ -304,7 +308,12 @@ impl ServiceService for ServiceServiceImpl {
.authorize(
&tenant_context,
ACTION_SERVICE_LIST,
- &resource_for_tenant("service", "*", &tenant_context.org_id, &tenant_context.project_id),
+ &resource_for_tenant(
+ "service",
+ "*",
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ ),
)
.await?;
let req = request.into_inner();
@@ -313,13 +322,15 @@ impl ServiceService for ServiceServiceImpl {
let services = self
.storage
- .list_services(&tenant_context.org_id, &tenant_context.project_id, namespace)
+ .list_services(
+ &tenant_context.org_id,
+ &tenant_context.project_id,
+ namespace,
+ )
.await?;
- let items: Vec<k8shost_proto::Service> = services
- .iter()
- .map(Self::to_proto_service)
- .collect();
+ let items: Vec<k8shost_proto::Service> =
+ services.iter().map(Self::to_proto_service).collect();
Ok(Response::new(ListServicesResponse { items }))
}
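
The ClusterIP allocation block reflowed above is the one point where create_service calls into IPAM. Reduced to essentials (a hedged sketch; the final assignment into `service.spec.cluster_ip` is assumed from surrounding, unshown context):

    // ClusterIP and LoadBalancer services get an address from IPAM;
    // other service types are stored without one.
    if service.spec.cluster_ip.is_none() {
        let svc_type = service.spec.r#type.as_deref().unwrap_or("ClusterIP");
        if svc_type == "ClusterIP" || svc_type == "LoadBalancer" {
            let ip = self
                .ipam_client
                .allocate_cluster_ip(org_id, project_id, service_uid, authorization.as_deref())
                .await
                .map_err(|e| Status::internal(format!("Failed to allocate Cluster IP: {}", e)))?;
            service.spec.cluster_ip = Some(ip); // assumed assignment
        }
    }
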
diff --git a/k8shost/crates/k8shost-server/src/services/tests.rs b/k8shost/crates/k8shost-server/src/services/tests.rs
index cb60a9e..0eda9de 100644
--- a/k8shost/crates/k8shost-server/src/services/tests.rs
+++ b/k8shost/crates/k8shost-server/src/services/tests.rs
@@ -207,7 +207,9 @@ mod tests {
async fn test_pod_crud_operations() {
// This test requires a running FlareDB instance
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
- let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
+ let storage = Storage::new(pd_addr)
+ .await
+ .expect("Failed to connect to FlareDB");
let pod_service = PodServiceImpl::new(Arc::new(storage), test_auth_service().await);
// Create a pod
@@ -226,10 +228,7 @@ mod tests {
let get_resp = pod_service.get_pod(get_req).await;
assert!(get_resp.is_ok());
let retrieved_pod = get_resp.unwrap().into_inner().pod.unwrap();
- assert_eq!(
- retrieved_pod.metadata.as_ref().unwrap().name,
- "test-pod-1"
- );
+ assert_eq!(retrieved_pod.metadata.as_ref().unwrap().name, "test-pod-1");
// List pods
let list_req = with_test_tenant(Request::new(ListPodsRequest {
@@ -255,7 +254,9 @@ mod tests {
#[ignore] // Requires running FlareDB and PrismNET instances
async fn test_service_crud_operations() {
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
- let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
+ let storage = Storage::new(pd_addr)
+ .await
+ .expect("Failed to connect to FlareDB");
let prismnet_addr =
std::env::var("PRISMNET_ADDR").unwrap_or("http://127.0.0.1:9090".to_string());
let ipam_client = crate::ipam_client::IpamClient::new(prismnet_addr);
@@ -273,12 +274,7 @@ mod tests {
let create_resp = service_service.create_service(create_req).await;
assert!(create_resp.is_ok());
let created_service = create_resp.unwrap().into_inner().service.unwrap();
- assert!(created_service
- .spec
- .as_ref()
- .unwrap()
- .cluster_ip
- .is_some());
+ assert!(created_service.spec.as_ref().unwrap().cluster_ip.is_some());
// Get the service
let get_req = with_test_tenant(Request::new(GetServiceRequest {
@@ -308,7 +304,9 @@ mod tests {
#[ignore] // Requires running FlareDB instance
async fn test_node_operations() {
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
- let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
+ let storage = Storage::new(pd_addr)
+ .await
+ .expect("Failed to connect to FlareDB");
let node_service = NodeServiceImpl::new(Arc::new(storage), test_auth_service().await);
// Register a node
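
These ignored tests all use the PD-routed constructor; for orientation, the three ways Storage can be built after this patch. A sketch assuming an async context with a boxed error return; the direct-server address is a placeholder, not a documented default:

    // PD-routed connection in the "k8shost" namespace, as in the tests above:
    let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
    let storage = Storage::new(pd_addr).await?;

    // Direct connection to a single FlareDB server, bypassing PD:
    let direct = Storage::new_direct("127.0.0.1:2480".to_string()).await?; // placeholder addr

    // In-memory backend, as used by the scheduler unit tests:
    let mem = Storage::new("memory://test".to_string()).await?;
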
diff --git a/k8shost/crates/k8shost-server/src/storage.rs b/k8shost/crates/k8shost-server/src/storage.rs
index 9d40d29..59659c6 100644
--- a/k8shost/crates/k8shost-server/src/storage.rs
+++ b/k8shost/crates/k8shost-server/src/storage.rs
@@ -4,7 +4,7 @@
//! with multi-tenant support using FlareDB as the backend.
use flaredb_client::RdbClient;
-use k8shost_types::{Node, Pod, Service};
+use k8shost_types::{Deployment, Node, Pod, Service};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -18,12 +18,8 @@ pub struct Storage {
impl Storage {
/// Create a new storage instance with FlareDB backend
pub async fn new(pd_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
- let client = RdbClient::connect_with_pd_namespace(
- pd_addr.clone(),
- pd_addr,
- "k8shost",
- )
- .await?;
+ let client =
+ RdbClient::connect_with_pd_namespace(pd_addr.clone(), pd_addr, "k8shost").await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
@@ -31,9 +27,7 @@ impl Storage {
}
/// Create a storage instance that connects directly to a single FlareDB server (no PD)
- pub async fn new_direct(
- server_addr: String,
- ) -> Result<Self, Box<dyn std::error::Error>> {
+ pub async fn new_direct(server_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
let client = RdbClient::connect_direct(server_addr, "k8shost").await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
@@ -68,11 +62,20 @@ impl Storage {
/// Create or update a pod
pub async fn put_pod(&self, pod: &Pod) -> Result<(), Status> {
- let org_id = pod.metadata.org_id.as_ref()
+ let org_id = pod
+ .metadata
+ .org_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
- let project_id = pod.metadata.project_id.as_ref()
+ let project_id = pod
+ .metadata
+ .project_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
- let namespace = pod.metadata.namespace.as_ref()
+ let namespace = pod
+ .metadata
+ .namespace
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let key = Self::pod_key(org_id, project_id, namespace, &pod.metadata.name);
@@ -80,7 +83,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize pod: {}", e)))?;
let mut client = self.client.lock().await;
- client.raw_put(key, value)
+ client
+ .raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@@ -98,7 +102,8 @@ impl Storage {
let key = Self::pod_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
- let result = client.raw_get(key)
+ let result = client
+ .raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@@ -139,13 +144,14 @@ impl Storage {
// Paginate through all results
loop {
let mut client = self.client.lock().await;
- let (_keys, values, next) = client.raw_scan(
- start_key.clone(),
- end_key.clone(),
- 1000, // Batch size
- )
- .await
- .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
+ let (_keys, values, next) = client
+ .raw_scan(
+ start_key.clone(),
+ end_key.clone(),
+ 1000, // Batch size
+ )
+ .await
+ .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
// Deserialize pods
for value in values {
@@ -153,7 +159,11 @@ impl Storage {
// Apply label selector filter if provided
if let Some(selector) = label_selector {
let matches = selector.iter().all(|(k, v)| {
- pod.metadata.labels.get(k).map(|pv| pv == v).unwrap_or(false)
+ pod.metadata
+ .labels
+ .get(k)
+ .map(|pv| pv == v)
+ .unwrap_or(false)
});
if matches {
pods.push(pod);
@@ -221,7 +231,8 @@ impl Storage {
let key = Self::pod_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
- let existed = client.raw_delete(key)
+ let existed = client
+ .raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
@@ -234,7 +245,11 @@ impl Storage {
/// Build key for service storage
fn service_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec<u8> {
- format!("k8s/{}/{}/services/{}/{}", org_id, project_id, namespace, name).into_bytes()
+ format!(
+ "k8s/{}/{}/services/{}/{}",
+ org_id, project_id, namespace, name
+ )
+ .into_bytes()
}
/// Build prefix for service listing
@@ -248,11 +263,20 @@ impl Storage {
/// Create or update a service
pub async fn put_service(&self, service: &Service) -> Result<(), Status> {
- let org_id = service.metadata.org_id.as_ref()
+ let org_id = service
+ .metadata
+ .org_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
- let project_id = service.metadata.project_id.as_ref()
+ let project_id = service
+ .metadata
+ .project_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
- let namespace = service.metadata.namespace.as_ref()
+ let namespace = service
+ .metadata
+ .namespace
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let key = Self::service_key(org_id, project_id, namespace, &service.metadata.name);
@@ -260,7 +284,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize service: {}", e)))?;
let mut client = self.client.lock().await;
- client.raw_put(key, value)
+ client
+ .raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@@ -278,7 +303,8 @@ impl Storage {
let key = Self::service_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
- let result = client.raw_get(key)
+ let result = client
+ .raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@@ -316,13 +342,10 @@ impl Storage {
loop {
let mut client = self.client.lock().await;
- let (_keys, values, next) = client.raw_scan(
- start_key.clone(),
- end_key.clone(),
- 1000,
- )
- .await
- .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
+ let (_keys, values, next) = client
+ .raw_scan(start_key.clone(), end_key.clone(), 1000)
+ .await
+ .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
for value in values {
if let Ok(service) = serde_json::from_slice::<Service>(&value) {
@@ -351,7 +374,8 @@ impl Storage {
let key = Self::service_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
- let existed = client.raw_delete(key)
+ let existed = client
+ .raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
@@ -374,9 +398,15 @@ impl Storage {
/// Create or update a node
pub async fn put_node(&self, node: &Node) -> Result<(), Status> {
- let org_id = node.metadata.org_id.as_ref()
+ let org_id = node
+ .metadata
+ .org_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
- let project_id = node.metadata.project_id.as_ref()
+ let project_id = node
+ .metadata
+ .project_id
+ .as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
let key = Self::node_key(org_id, project_id, &node.metadata.name);
@@ -384,7 +414,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize node: {}", e)))?;
let mut client = self.client.lock().await;
- client.raw_put(key, value)
+ client
+ .raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@@ -401,7 +432,8 @@ impl Storage {
let key = Self::node_key(org_id, project_id, name);
let mut client = self.client.lock().await;
- let result = client.raw_get(key)
+ let result = client
+ .raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@@ -415,11 +447,7 @@ impl Storage {
}
/// List all nodes
- pub async fn list_nodes(
- &self,
- org_id: &str,
- project_id: &str,
- ) -> Result<Vec<Node>, Status> {
+ pub async fn list_nodes(&self, org_id: &str, project_id: &str) -> Result<Vec<Node>, Status> {
let prefix = Self::node_prefix(org_id, project_id);
let mut end_key = prefix.clone();
@@ -438,13 +466,10 @@ impl Storage {
loop {
let mut client = self.client.lock().await;
- let (_keys, values, next) = client.raw_scan(
- start_key.clone(),
- end_key.clone(),
- 1000,
- )
- .await
- .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
+ let (_keys, values, next) = client
+ .raw_scan(start_key.clone(), end_key.clone(), 1000)
+ .await
+ .map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
for value in values {
if let Ok(node) = serde_json::from_slice::<Node>(&value) {
@@ -472,7 +497,184 @@ impl Storage {
let key = Self::node_key(org_id, project_id, name);
let mut client = self.client.lock().await;
- let existed = client.raw_delete(key)
+ let existed = client
+ .raw_delete(key)
+ .await
+ .map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
+
+ Ok(existed)
+ }
+
+ // ============================================================================
+ // Deployment Operations
+ // ============================================================================
+
+ /// Build key for deployment storage
+ fn deployment_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec<u8> {
+ format!(
+ "k8s/{}/{}/deployments/{}/{}",
+ org_id, project_id, namespace, name
+ )
+ .into_bytes()
+ }
+
+ /// Build prefix for deployment listing
+ fn deployment_prefix(org_id: &str, project_id: &str, namespace: Option<&str>) -> Vec<u8> {
+ if let Some(ns) = namespace {
+ format!("k8s/{}/{}/deployments/{}/", org_id, project_id, ns).into_bytes()
+ } else {
+ format!("k8s/{}/{}/deployments/", org_id, project_id).into_bytes()
+ }
+ }
+
+ /// Create or update a deployment
+ pub async fn put_deployment(&self, deployment: &Deployment) -> Result<(), Status> {
+ let org_id = deployment
+ .metadata
+ .org_id
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("org_id is required"))?;
+ let project_id = deployment
+ .metadata
+ .project_id
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("project_id is required"))?;
+ let namespace = deployment
+ .metadata
+ .namespace
+ .as_ref()
+ .ok_or_else(|| Status::invalid_argument("namespace is required"))?;
+
+ let key = Self::deployment_key(org_id, project_id, namespace, &deployment.metadata.name);
+ let value = serde_json::to_vec(deployment)
+ .map_err(|e| Status::internal(format!("Failed to serialize deployment: {}", e)))?;
+
+ let mut client = self.client.lock().await;
+ client
+ .raw_put(key, value)
+ .await
+ .map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
+
+ Ok(())
+ }
+
+ /// Get a deployment by name
+ pub async fn get_deployment(
+ &self,
+ org_id: &str,
+ project_id: &str,
+ namespace: &str,
+ name: &str,
+ ) -> Result