Implement K8sHost deployment reconciliation

centra 2026-03-31 23:15:04 +09:00
parent ec55fdea05
commit 2b7c3166d2
Signed by: centra
GPG key ID: 0C09689D20B25ACA
20 changed files with 2119 additions and 434 deletions

k8shost/Cargo.lock (generated): 1 line changed
View file

@ -1985,6 +1985,7 @@ dependencies = [
"prost 0.13.5",
"serde",
"serde_json",
"sha2",
"tempfile",
"tokio",
"tokio-stream",

View file

@ -39,6 +39,7 @@ chrono = { workspace = true }
clap = { workspace = true }
config = { workspace = true }
toml = { workspace = true }
sha2 = "0.10"
# REST API dependencies
axum = "0.8"
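
The new sha2 dependency backs the pod-template hashing added later in this commit (deployment_template_hash); a minimal sketch of that same pattern, with an illustrative payload standing in for the serialized template:

use sha2::{Digest, Sha256};

// Hash a payload to a lowercase hex string, the shape stored in the
// template-hash annotation.
fn hex_sha256(payload: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(payload);
    format!("{:x}", hasher.finalize())
}

fn main() {
    // Illustrative input; the real code hashes the serialized pod template.
    println!("{}", hex_sha256(b"{\"metadata\":{},\"spec\":{}}"));
}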

View file

@ -8,8 +8,8 @@
use anyhow::{Context, Result};
use serde_json::json;
use std::process::Command;
use std::io::Write;
use std::process::Command;
/// CNI configuration for pod network setup
#[derive(Debug, Clone)]
@ -85,7 +85,9 @@ pub async fn invoke_cni_add(
}
// Wait for result
let output = child.wait_with_output().context("Failed to wait for CNI plugin")?;
let output = child
.wait_with_output()
.context("Failed to wait for CNI plugin")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
@ -93,8 +95,8 @@ pub async fn invoke_cni_add(
}
// Parse result
let result: CniResult = serde_json::from_slice(&output.stdout)
.context("Failed to parse CNI result")?;
let result: CniResult =
serde_json::from_slice(&output.stdout).context("Failed to parse CNI result")?;
Ok(result)
}
@ -146,11 +148,16 @@ pub async fn invoke_cni_del(
}
// Wait for result
let output = child.wait_with_output().context("Failed to wait for CNI plugin")?;
let output = child
.wait_with_output()
.context("Failed to wait for CNI plugin")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::warn!("CNI DEL error (may be expected if already deleted): {}", stderr);
tracing::warn!(
"CNI DEL error (may be expected if already deleted): {}",
stderr
);
}
Ok(())
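
For context on the calls being reformatted above, a minimal sketch of the invocation pattern: the CNI plugin runs as a child process, the network config is written to its stdin as JSON, and stdout carries the plugin result. CNI_COMMAND is the verb defined by the CNI spec; the exact environment setup in this crate is not shown in this hunk, so treat it as an assumption.

use std::io::Write;
use std::process::{Command, Stdio};

fn invoke_plugin(plugin_path: &str, config_json: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut child = Command::new(plugin_path)
        .env("CNI_COMMAND", "ADD") // verb per the CNI spec; DEL for teardown
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    child
        .stdin
        .as_mut()
        .expect("stdin was piped")
        .write_all(config_json)?;
    // wait_with_output closes the child's stdin before waiting, so the plugin
    // sees EOF and can finish.
    let output = child.wait_with_output()?;
    Ok(output.stdout)
}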

View file

@ -104,8 +104,7 @@ impl Default for ChainFireConfig {
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Default)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct Config {
pub server: ServerConfig,
pub flaredb: FlareDbConfig,

View file

@ -0,0 +1,414 @@
//! Background reconciler for Deployment resources.
use crate::services::deployment::{
deployment_template_hash, DeploymentServiceImpl, DEPLOYMENT_NAME_ANNOTATION,
DEPLOYMENT_UID_ANNOTATION, TEMPLATE_HASH_ANNOTATION,
};
use crate::storage::Storage;
use chrono::Utc;
use k8shost_types::{Deployment, DeploymentStatus, Pod, PodStatus};
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tonic::Status;
use tracing::{debug, info, warn};
use uuid::Uuid;
pub struct DeploymentController {
storage: Arc<Storage>,
interval: Duration,
}
#[derive(Debug)]
struct DeploymentPlan {
creates: Vec<Pod>,
deletes: Vec<String>,
status: DeploymentStatus,
}
impl DeploymentController {
pub fn new(storage: Arc<Storage>) -> Self {
Self {
storage,
interval: Duration::from_secs(5),
}
}
pub async fn run(self: Arc<Self>) {
info!(
"Deployment controller started ({}s interval)",
self.interval.as_secs()
);
loop {
if let Err(error) = self.reconcile_all().await {
warn!(error = %error, "deployment reconciliation failed");
}
sleep(self.interval).await;
}
}
async fn reconcile_all(&self) -> anyhow::Result<()> {
let mut deployments = self.storage.list_all_deployments().await?;
deployments.sort_by(|lhs, rhs| {
lhs.metadata
.org_id
.cmp(&rhs.metadata.org_id)
.then_with(|| lhs.metadata.project_id.cmp(&rhs.metadata.project_id))
.then_with(|| lhs.metadata.namespace.cmp(&rhs.metadata.namespace))
.then_with(|| lhs.metadata.name.cmp(&rhs.metadata.name))
});
for deployment in deployments {
if let Err(error) = self.reconcile_deployment(deployment).await {
warn!(error = %error, "failed to reconcile deployment");
}
}
Ok(())
}
async fn reconcile_deployment(&self, deployment: Deployment) -> anyhow::Result<()> {
let Some(org_id) = deployment.metadata.org_id.as_deref() else {
warn!(deployment = %deployment.metadata.name, "deployment missing org_id");
return Ok(());
};
let Some(project_id) = deployment.metadata.project_id.as_deref() else {
warn!(deployment = %deployment.metadata.name, "deployment missing project_id");
return Ok(());
};
let Some(namespace) = deployment.metadata.namespace.as_deref() else {
warn!(deployment = %deployment.metadata.name, "deployment missing namespace");
return Ok(());
};
let pods = self
.storage
.list_pods(org_id, project_id, Some(namespace), None)
.await?;
let plan = plan_deployment_reconciliation(&deployment, &pods)?;
for pod_name in &plan.deletes {
let deleted = self
.storage
.delete_pod(org_id, project_id, namespace, pod_name)
.await?;
if deleted {
debug!(
deployment = %deployment.metadata.name,
pod = %pod_name,
"deleted deployment-managed pod"
);
}
}
for pod in &plan.creates {
self.storage.put_pod(pod).await?;
debug!(
deployment = %deployment.metadata.name,
pod = %pod.metadata.name,
"created deployment-managed pod"
);
}
let status_changed = !deployment_status_eq(deployment.status.as_ref(), Some(&plan.status));
if !plan.creates.is_empty() || !plan.deletes.is_empty() || status_changed {
let mut updated = deployment.clone();
updated.status = Some(plan.status);
updated.metadata.resource_version = Some(next_resource_version(
deployment.metadata.resource_version.as_deref(),
));
self.storage.put_deployment(&updated).await?;
}
Ok(())
}
}
fn plan_deployment_reconciliation(
deployment: &Deployment,
pods: &[Pod],
) -> Result<DeploymentPlan, Status> {
DeploymentServiceImpl::validate_spec(deployment)?;
let template_hash = deployment_template_hash(deployment)?;
let desired_replicas = deployment.spec.replicas.unwrap_or(1).max(0) as usize;
let mut owned = pods
.iter()
.filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod))
.cloned()
.collect::<Vec<_>>();
owned.sort_by(|lhs, rhs| lhs.metadata.name.cmp(&rhs.metadata.name));
let mut current = Vec::new();
let mut stale = Vec::new();
for pod in owned {
let pod_hash = pod
.metadata
.annotations
.get(TEMPLATE_HASH_ANNOTATION)
.map(String::as_str);
if pod_hash == Some(template_hash.as_str()) {
current.push(pod);
} else {
stale.push(pod);
}
}
let mut deletes = stale
.iter()
.map(|pod| pod.metadata.name.clone())
.collect::<Vec<_>>();
let mut survivors = current;
if survivors.len() > desired_replicas {
let excess = survivors.split_off(desired_replicas);
deletes.extend(excess.into_iter().map(|pod| pod.metadata.name));
}
let used_ordinals = pods
.iter()
.filter(|pod| DeploymentServiceImpl::pod_is_owned_by_deployment(deployment, pod))
.filter_map(|pod| parse_pod_ordinal(&deployment.metadata.name, &pod.metadata.name))
.collect::<BTreeSet<_>>();
let ready_replicas = survivors.iter().filter(|pod| pod_is_ready(pod)).count() as i32;
let available_replicas = ready_replicas;
let mut creates = Vec::new();
let mut used_ordinals = used_ordinals;
while survivors.len() + creates.len() < desired_replicas {
let ordinal = next_available_ordinal(&used_ordinals);
used_ordinals.insert(ordinal);
creates.push(build_pod(deployment, &template_hash, ordinal)?);
}
Ok(DeploymentPlan {
creates,
deletes,
status: DeploymentStatus {
replicas: Some(desired_replicas as i32),
ready_replicas: Some(ready_replicas),
available_replicas: Some(available_replicas),
},
})
}
fn build_pod(deployment: &Deployment, template_hash: &str, ordinal: usize) -> Result<Pod, Status> {
let Some(namespace) = deployment.metadata.namespace.clone() else {
return Err(Status::invalid_argument("deployment namespace is required"));
};
let Some(org_id) = deployment.metadata.org_id.clone() else {
return Err(Status::invalid_argument("deployment org_id is required"));
};
let Some(project_id) = deployment.metadata.project_id.clone() else {
return Err(Status::invalid_argument(
"deployment project_id is required",
));
};
let mut metadata = deployment.spec.template.metadata.clone();
metadata.name = format!("{}-{}", deployment.metadata.name, ordinal);
metadata.namespace = Some(namespace);
metadata.org_id = Some(org_id);
metadata.project_id = Some(project_id);
metadata.uid = Some(Uuid::new_v4().to_string());
metadata.resource_version = Some("1".to_string());
metadata.creation_timestamp = Some(Utc::now());
metadata.annotations.insert(
DEPLOYMENT_NAME_ANNOTATION.to_string(),
deployment.metadata.name.clone(),
);
if let Some(uid) = deployment.metadata.uid.as_ref() {
metadata
.annotations
.insert(DEPLOYMENT_UID_ANNOTATION.to_string(), uid.clone());
}
metadata.annotations.insert(
TEMPLATE_HASH_ANNOTATION.to_string(),
template_hash.to_string(),
);
let mut spec = deployment.spec.template.spec.clone();
spec.node_name = None;
Ok(Pod {
metadata,
spec,
status: Some(PodStatus {
phase: Some("Pending".to_string()),
pod_ip: None,
host_ip: None,
conditions: Vec::new(),
}),
})
}
fn pod_is_ready(pod: &Pod) -> bool {
pod.status
.as_ref()
.and_then(|status| status.phase.as_deref())
== Some("Running")
}
fn parse_pod_ordinal(deployment_name: &str, pod_name: &str) -> Option<usize> {
let suffix = pod_name.strip_prefix(&format!("{}-", deployment_name))?;
suffix.parse::<usize>().ok()
}
fn next_available_ordinal(used: &BTreeSet<usize>) -> usize {
let mut ordinal = 1usize;
while used.contains(&ordinal) {
ordinal += 1;
}
ordinal
}
fn deployment_status_eq(lhs: Option<&DeploymentStatus>, rhs: Option<&DeploymentStatus>) -> bool {
match (lhs, rhs) {
(None, None) => true,
(Some(lhs), Some(rhs)) => {
lhs.replicas == rhs.replicas
&& lhs.ready_replicas == rhs.ready_replicas
&& lhs.available_replicas == rhs.available_replicas
}
_ => false,
}
}
fn next_resource_version(current: Option<&str>) -> String {
let current = current
.and_then(|version| version.parse::<u64>().ok())
.unwrap_or(0);
(current + 1).to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
fn test_deployment() -> Deployment {
Deployment {
metadata: k8shost_types::ObjectMeta {
name: "web".to_string(),
namespace: Some("default".to_string()),
uid: Some("deployment-uid".to_string()),
resource_version: Some("1".to_string()),
creation_timestamp: None,
labels: HashMap::new(),
annotations: HashMap::new(),
org_id: Some("test-org".to_string()),
project_id: Some("test-project".to_string()),
},
spec: k8shost_types::DeploymentSpec {
replicas: Some(2),
selector: k8shost_types::LabelSelector {
match_labels: HashMap::from([("app".to_string(), "web".to_string())]),
},
template: k8shost_types::PodTemplateSpec {
metadata: k8shost_types::ObjectMeta {
name: "".to_string(),
namespace: Some("default".to_string()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::from([("app".to_string(), "web".to_string())]),
annotations: HashMap::new(),
org_id: None,
project_id: None,
},
spec: k8shost_types::PodSpec {
containers: vec![k8shost_types::Container {
name: "web".to_string(),
image: "nginx:latest".to_string(),
command: Vec::new(),
args: Vec::new(),
ports: Vec::new(),
env: Vec::new(),
resources: None,
}],
restart_policy: Some("Always".to_string()),
node_name: None,
},
},
},
status: None,
}
}
fn owned_pod(name: &str, hash: &str, phase: &str) -> Pod {
Pod {
metadata: k8shost_types::ObjectMeta {
name: name.to_string(),
namespace: Some("default".to_string()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::from([("app".to_string(), "web".to_string())]),
annotations: HashMap::from([
(DEPLOYMENT_NAME_ANNOTATION.to_string(), "web".to_string()),
(
DEPLOYMENT_UID_ANNOTATION.to_string(),
"deployment-uid".to_string(),
),
(TEMPLATE_HASH_ANNOTATION.to_string(), hash.to_string()),
]),
org_id: Some("test-org".to_string()),
project_id: Some("test-project".to_string()),
},
spec: k8shost_types::PodSpec {
containers: Vec::new(),
restart_policy: None,
node_name: None,
},
status: Some(PodStatus {
phase: Some(phase.to_string()),
pod_ip: None,
host_ip: None,
conditions: Vec::new(),
}),
}
}
#[test]
fn plan_creates_missing_replicas() {
let deployment = test_deployment();
let plan = plan_deployment_reconciliation(&deployment, &[]).unwrap();
assert_eq!(plan.creates.len(), 2);
assert!(plan.deletes.is_empty());
assert_eq!(plan.status.replicas, Some(2));
}
#[test]
fn plan_scales_down_extra_pods() {
let mut deployment = test_deployment();
deployment.spec.replicas = Some(1);
let hash = deployment_template_hash(&deployment).unwrap();
let pods = vec![
owned_pod("web-1", &hash, "Running"),
owned_pod("web-2", &hash, "Running"),
];
let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap();
assert!(plan.creates.is_empty());
assert_eq!(plan.deletes, vec!["web-2".to_string()]);
assert_eq!(plan.status.ready_replicas, Some(1));
}
#[test]
fn plan_replaces_stale_template_pods() {
let deployment = test_deployment();
let pods = vec![
owned_pod("web-1", "stale", "Running"),
owned_pod("web-2", "stale", "Running"),
];
let plan = plan_deployment_reconciliation(&deployment, &pods).unwrap();
assert_eq!(plan.creates.len(), 2);
assert_eq!(plan.deletes.len(), 2);
assert_eq!(plan.status.ready_replicas, Some(0));
}
}
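
Replica naming above is ordinal-based: pods are named <deployment>-<ordinal>, and the lowest free ordinal is reused first, so a replaced pod gets its old name back before a new one is minted. A self-contained sketch (the helper body mirrors next_available_ordinal above; the main driver is illustrative only):

use std::collections::BTreeSet;

fn next_available_ordinal(used: &BTreeSet<usize>) -> usize {
    let mut ordinal = 1usize;
    while used.contains(&ordinal) {
        ordinal += 1;
    }
    ordinal
}

fn main() {
    // web-1, web-3 and web-4 exist; the next pod is named web-2, not web-5.
    let used: BTreeSet<usize> = [1, 3, 4].into_iter().collect();
    assert_eq!(next_available_ordinal(&used), 2);
}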

View file

@ -11,8 +11,8 @@ use fiberlb_api::listener_service_client::ListenerServiceClient;
use fiberlb_api::load_balancer_service_client::LoadBalancerServiceClient;
use fiberlb_api::pool_service_client::PoolServiceClient;
use fiberlb_api::{
CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest,
CreatePoolRequest, DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol,
CreateBackendRequest, CreateListenerRequest, CreateLoadBalancerRequest, CreatePoolRequest,
DeleteLoadBalancerRequest, ListenerProtocol, PoolAlgorithm, PoolProtocol,
};
use k8shost_types::{LoadBalancerIngress, LoadBalancerStatus, ServiceStatus};
use std::sync::Arc;
@ -65,7 +65,10 @@ impl FiberLbController {
let tenants = vec![("default-org".to_string(), "default-project".to_string())];
for (org_id, project_id) in tenants {
if let Err(e) = self.reconcile_tenant_loadbalancers(&org_id, &project_id).await {
if let Err(e) = self
.reconcile_tenant_loadbalancers(&org_id, &project_id)
.await
{
warn!(
"Failed to reconcile LoadBalancers for tenant {}/{}: {}",
org_id, project_id, e
@ -79,10 +82,7 @@ impl FiberLbController {
/// Reconcile LoadBalancer services for a specific tenant
async fn reconcile_tenant_loadbalancers(&self, org_id: &str, project_id: &str) -> Result<()> {
// Get all services for this tenant
let services = self
.storage
.list_services(org_id, project_id, None)
.await?;
let services = self.storage.list_services(org_id, project_id, None).await?;
// Filter for LoadBalancer services that need provisioning
let lb_services: Vec<_> = services
@ -93,12 +93,19 @@ impl FiberLbController {
// 2. status is None OR status.load_balancer is None (not yet provisioned)
svc.spec.r#type.as_deref() == Some("LoadBalancer")
&& (svc.status.is_none()
|| svc.status.as_ref().and_then(|s| s.load_balancer.as_ref()).is_none())
|| svc
.status
.as_ref()
.and_then(|s| s.load_balancer.as_ref())
.is_none())
})
.collect();
if lb_services.is_empty() {
debug!("No LoadBalancer services to provision for tenant {}/{}", org_id, project_id);
debug!(
"No LoadBalancer services to provision for tenant {}/{}",
org_id, project_id
);
return Ok(());
}
@ -122,7 +129,10 @@ impl FiberLbController {
match LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await {
Ok(client) => client,
Err(e) => {
warn!("Failed to connect to FiberLB at {}: {}", self.fiberlb_addr, e);
warn!(
"Failed to connect to FiberLB at {}: {}",
self.fiberlb_addr, e
);
return Ok(());
}
};
@ -135,7 +145,8 @@ impl FiberLbController {
}
};
let mut listener_client = match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await {
let mut listener_client =
match ListenerServiceClient::connect(self.fiberlb_addr.clone()).await {
Ok(client) => client,
Err(e) => {
warn!("Failed to connect to FiberLB ListenerService: {}", e);
@ -143,7 +154,8 @@ impl FiberLbController {
}
};
let mut backend_client = match BackendServiceClient::connect(self.fiberlb_addr.clone()).await {
let mut backend_client =
match BackendServiceClient::connect(self.fiberlb_addr.clone()).await {
Ok(client) => client,
Err(e) => {
warn!("Failed to connect to FiberLB BackendService: {}", e);
@ -160,7 +172,10 @@ impl FiberLbController {
.unwrap_or_else(|| "default".to_string());
let name = service.metadata.name.clone();
info!("Provisioning LoadBalancer for service {}/{}", namespace, name);
info!(
"Provisioning LoadBalancer for service {}/{}",
namespace, name
);
// Create LoadBalancer in FiberLB
let lb_name = format!("{}.{}", name, namespace);
@ -210,19 +225,25 @@ impl FiberLbController {
// Create Pool for this LoadBalancer
let pool_name = format!("{}-pool", lb_name);
let pool_id = match pool_client
.create_pool(authorized_request(CreatePoolRequest {
.create_pool(authorized_request(
CreatePoolRequest {
name: pool_name.clone(),
loadbalancer_id: lb_id.clone(),
algorithm: PoolAlgorithm::RoundRobin as i32,
protocol: PoolProtocol::Tcp as i32,
session_persistence: None,
}, &auth_token))
},
&auth_token,
))
.await
{
Ok(response) => {
let pool = response.into_inner().pool;
if let Some(pool) = pool {
info!("Created Pool {} for service {}/{}", pool.id, namespace, name);
info!(
"Created Pool {} for service {}/{}",
pool.id, namespace, name
);
pool.id
} else {
warn!("Failed to create Pool for service {}/{}", namespace, name);
@ -230,7 +251,10 @@ impl FiberLbController {
}
}
Err(e) => {
warn!("Failed to create Pool for service {}/{}: {}", namespace, name, e);
warn!(
"Failed to create Pool for service {}/{}: {}",
namespace, name, e
);
continue;
}
};
@ -241,11 +265,15 @@ impl FiberLbController {
let listener_name = format!(
"{}-listener-{}",
lb_name,
svc_port.name.as_deref().unwrap_or(&svc_port.port.to_string())
svc_port
.name
.as_deref()
.unwrap_or(&svc_port.port.to_string())
);
match listener_client
.create_listener(authorized_request(CreateListenerRequest {
.create_listener(authorized_request(
CreateListenerRequest {
name: listener_name.clone(),
loadbalancer_id: lb_id.clone(),
protocol: ListenerProtocol::Tcp as i32,
@ -253,7 +281,9 @@ impl FiberLbController {
default_pool_id: pool_id.clone(),
tls_config: None,
connection_limit: 0, // No limit
}, &auth_token))
},
&auth_token,
))
.await
{
Ok(response) => {
@ -327,21 +357,20 @@ impl FiberLbController {
// Use target_port if specified, otherwise use port
let backend_port = svc_port.target_port.unwrap_or(svc_port.port);
let backend_name = format!(
"{}-backend-{}-{}",
lb_name,
pod.metadata.name,
backend_port
);
let backend_name =
format!("{}-backend-{}-{}", lb_name, pod.metadata.name, backend_port);
match backend_client
.create_backend(authorized_request(CreateBackendRequest {
.create_backend(authorized_request(
CreateBackendRequest {
name: backend_name.clone(),
pool_id: pool_id.clone(),
address: pod_ip.clone(),
port: backend_port as u32,
weight: 1,
}, &auth_token))
},
&auth_token,
))
.await
{
Ok(response) => {
@ -397,10 +426,10 @@ impl FiberLbController {
.metadata
.annotations
.insert("fiberlb.plasmacloud.io/lb-id".to_string(), lb_id.clone());
service
.metadata
.annotations
.insert("fiberlb.plasmacloud.io/pool-id".to_string(), pool_id.clone());
service.metadata.annotations.insert(
"fiberlb.plasmacloud.io/pool-id".to_string(),
pool_id.clone(),
);
// Merge with the latest stored version so the DNS controller does not lose its annotations.
if let Ok(Some(mut current)) = self
@ -447,9 +476,14 @@ impl FiberLbController {
/// This should be called when a Service with type=LoadBalancer is deleted.
/// For MVP, this is not automatically triggered - would need a deletion watch.
#[allow(dead_code)]
async fn cleanup_loadbalancer(&self, org_id: &str, project_id: &str, lb_id: &str) -> Result<()> {
let mut fiberlb_client = LoadBalancerServiceClient::connect(self.fiberlb_addr.clone())
.await?;
async fn cleanup_loadbalancer(
&self,
org_id: &str,
project_id: &str,
lb_id: &str,
) -> Result<()> {
let mut fiberlb_client =
LoadBalancerServiceClient::connect(self.fiberlb_addr.clone()).await?;
let auth_token = issue_controller_token(
&self.iam_server_addr,
CONTROLLER_PRINCIPAL_ID,
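
One detail worth calling out from the hunks above: each FiberLB backend is named per pod and per port, and it targets target_port when set, falling back to the service port. A small illustrative sketch of just that rule (the function and names are stand-ins, not part of this commit):

fn backend_for(lb_name: &str, pod_name: &str, port: i32, target_port: Option<i32>) -> (String, i32) {
    // Use target_port if specified, otherwise use port (same rule as above).
    let backend_port = target_port.unwrap_or(port);
    (
        format!("{}-backend-{}-{}", lb_name, pod_name, backend_port),
        backend_port,
    )
}

fn main() {
    let (name, port) = backend_for("web.default", "web-1", 80, Some(8080));
    assert_eq!(name, "web.default-backend-web-1-8080");
    assert_eq!(port, 8080);
}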

View file

@ -90,7 +90,10 @@ impl FlashDnsController {
.await?;
// Ensure cluster.local zone exists for this tenant
let zone_id = match self.ensure_zone_exists(org_id, project_id, &auth_token).await {
let zone_id = match self
.ensure_zone_exists(org_id, project_id, &auth_token)
.await
{
Ok(id) => id,
Err(e) => {
warn!(
@ -102,10 +105,7 @@ impl FlashDnsController {
};
// Get all services for this tenant
let services = self
.storage
.list_services(org_id, project_id, None)
.await?;
let services = self.storage.list_services(org_id, project_id, None).await?;
// Filter for services that need DNS records
let services_needing_dns: Vec<_> = services
@ -123,7 +123,10 @@ impl FlashDnsController {
.collect();
if services_needing_dns.is_empty() {
debug!("No services need DNS records for tenant {}/{}", org_id, project_id);
debug!(
"No services need DNS records for tenant {}/{}",
org_id, project_id
);
return Ok(());
}
@ -139,7 +142,10 @@ impl FlashDnsController {
{
Ok(client) => client,
Err(e) => {
warn!("Failed to connect to FlashDNS at {}: {}", self.flashdns_addr, e);
warn!(
"Failed to connect to FlashDNS at {}: {}",
self.flashdns_addr, e
);
return Ok(());
}
};
@ -375,13 +381,13 @@ impl FlashDnsController {
.into_inner()
.zones
.into_iter()
.find(|z| {
z.name.trim_end_matches('.')
== zone_name.trim_end_matches('.')
})
.find(|z| z.name.trim_end_matches('.') == zone_name.trim_end_matches('.'))
.map(|z| z.id)),
Err(list_error) => {
debug!("Zone list fallback failed for {}: {}", zone_name, list_error);
debug!(
"Zone list fallback failed for {}: {}",
zone_name, list_error
);
Ok(None)
}
}
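
The zone-list fallback above matches zones by name with trailing dots stripped, so "cluster.local" and "cluster.local." compare equal. A one-function sketch of that normalization (illustrative, not part of this commit):

fn zones_match(candidate: &str, wanted: &str) -> bool {
    candidate.trim_end_matches('.') == wanted.trim_end_matches('.')
}

fn main() {
    assert!(zones_match("cluster.local.", "cluster.local"));
    assert!(!zones_match("cluster.local", "example.org."));
}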

View file

@ -119,14 +119,17 @@ impl IpamClient {
let mut client = self.connect().await?;
self.ensure_default_cluster_ip_pool(&mut client, org_id, project_id, authorization)
.await?;
let request = Self::with_auth(AllocateServiceIpRequest {
let request = Self::with_auth(
AllocateServiceIpRequest {
org_id: org_id.to_string(),
project_id: project_id.to_string(),
pool_id: String::new(), // Use default pool
pool_type: ProtoServiceIpPoolType::ClusterIp as i32,
service_uid: service_uid.to_string(),
requested_ip: String::new(), // Auto-allocate
}, authorization)?;
},
authorization,
)?;
let response = client
.allocate_service_ip(request)
@ -151,11 +154,14 @@ impl IpamClient {
authorization: Option<&str>,
) -> Result<()> {
let mut client = self.connect().await?;
let request = Self::with_auth(ReleaseServiceIpRequest {
let request = Self::with_auth(
ReleaseServiceIpRequest {
org_id: org_id.to_string(),
project_id: project_id.to_string(),
ip_address: ip_address.to_string(),
}, authorization)?;
},
authorization,
)?;
client
.release_service_ip(request)

View file

@ -3,14 +3,16 @@
//! Exports modules for testing and reuse
pub mod auth;
pub mod deployment_controller;
pub mod ipam_client;
pub mod services {
pub mod deployment;
pub mod node;
pub mod pod;
pub mod service;
pub mod node;
}
pub mod storage;
pub mod config;
pub mod rest;
pub mod storage;
pub use ipam_client::IpamClient;

View file

@ -1,6 +1,7 @@
mod auth;
mod cni;
mod config;
mod deployment_controller;
mod fiberlb_controller;
mod flashdns_controller;
mod ipam_client;
@ -15,19 +16,19 @@ use chainfire_client::Client as ChainFireClient;
use clap::Parser;
use config::Config;
use ipam_client::IpamClient;
use metrics_exporter_prometheus::PrometheusBuilder;
use k8shost_proto::{
deployment_service_server::{DeploymentService, DeploymentServiceServer},
node_service_server::NodeServiceServer,
pod_service_server::PodServiceServer,
service_service_server::ServiceServiceServer,
*,
deployment_service_server::DeploymentServiceServer, node_service_server::NodeServiceServer,
pod_service_server::PodServiceServer, service_service_server::ServiceServiceServer,
};
use metrics_exporter_prometheus::PrometheusBuilder;
use services::{
deployment::DeploymentServiceImpl, node::NodeServiceImpl, pod::PodServiceImpl,
service::ServiceServiceImpl,
};
use services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl};
use std::{path::PathBuf, sync::Arc};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{path::PathBuf, sync::Arc};
use storage::Storage;
use tonic::{transport::Server, Request, Response, Status};
use tonic::{transport::Server, Request, Status};
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
@ -116,19 +117,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
},
flaredb: config::FlareDbConfig {
pd_addr: args.flaredb_pd_addr.or(loaded_config.flaredb.pd_addr),
direct_addr: args.flaredb_direct_addr.or(loaded_config.flaredb.direct_addr),
direct_addr: args
.flaredb_direct_addr
.or(loaded_config.flaredb.direct_addr),
},
chainfire: config::ChainFireConfig {
endpoint: args.chainfire_endpoint.or(loaded_config.chainfire.endpoint),
},
iam: config::IamConfig {
server_addr: args.iam_server_addr.unwrap_or(loaded_config.iam.server_addr),
server_addr: args
.iam_server_addr
.unwrap_or(loaded_config.iam.server_addr),
},
fiberlb: config::FiberLbConfig {
server_addr: args.fiberlb_server_addr.unwrap_or(loaded_config.fiberlb.server_addr),
server_addr: args
.fiberlb_server_addr
.unwrap_or(loaded_config.fiberlb.server_addr),
},
flashdns: config::FlashDnsConfig {
server_addr: args.flashdns_server_addr.unwrap_or(loaded_config.flashdns.server_addr),
server_addr: args
.flashdns_server_addr
.unwrap_or(loaded_config.flashdns.server_addr),
},
prismnet: config::PrismNetConfig {
server_addr: args
@ -198,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
} else {
return Err(
anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into()
anyhow::anyhow!("Failed to connect to FlareDB (direct): {}", e).into(),
);
}
}
@ -211,7 +220,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Arc::new(s)
}
Err(e) => {
warn!("Failed to connect to FlareDB: {}. Server will not start.", e);
warn!(
"Failed to connect to FlareDB: {}. Server will not start.",
e
);
return Err(anyhow::anyhow!("Failed to connect to FlareDB: {}", e).into());
}
}
@ -228,7 +240,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Arc::new(s)
}
Err(e) => {
warn!("Failed to connect to IAM server: {}. Server will not start.", e);
warn!(
"Failed to connect to IAM server: {}. Server will not start.",
e
);
return Err(anyhow::anyhow!("Failed to connect to IAM server: {}", e).into());
}
};
@ -262,7 +277,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
auth_service.clone(),
));
let node_service = Arc::new(NodeServiceImpl::new(storage.clone(), auth_service.clone()));
let deployment_service = DeploymentServiceImpl; // Still unimplemented
let deployment_service = DeploymentServiceImpl::new(storage.clone(), auth_service.clone());
// Start scheduler in background with CreditService integration
let scheduler = Arc::new(scheduler::Scheduler::new_with_credit_service(storage.clone()).await);
@ -271,6 +286,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
info!("Scheduler started - tenant-aware with quota enforcement");
let deployment_controller = Arc::new(deployment_controller::DeploymentController::new(
storage.clone(),
));
tokio::spawn(async move {
deployment_controller.run().await;
});
info!("Deployment controller started - reconciling Deployment resources");
// Start FiberLB controller in background
let fiberlb_controller = Arc::new(fiberlb_controller::FiberLbController::new(
storage.clone(),
@ -280,7 +303,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(async move {
fiberlb_controller.run().await;
});
info!("FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens");
info!(
"FiberLB controller started - monitoring LoadBalancer services with per-tenant IAM tokens"
);
// Start FlashDNS controller in background
let flashdns_controller = Arc::new(flashdns_controller::FlashDnsController::new(
@ -297,25 +322,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build gRPC server with authentication layer
let grpc_server = Server::builder()
.add_service(
tonic::codegen::InterceptedService::new(
.add_service(tonic::codegen::InterceptedService::new(
PodServiceServer::new(pod_service.as_ref().clone()),
make_interceptor(auth_service.clone()),
),
)
.add_service(
tonic::codegen::InterceptedService::new(
))
.add_service(tonic::codegen::InterceptedService::new(
ServiceServiceServer::new(service_service.as_ref().clone()),
make_interceptor(auth_service.clone()),
),
)
.add_service(
tonic::codegen::InterceptedService::new(
))
.add_service(tonic::codegen::InterceptedService::new(
NodeServiceServer::new(node_service.as_ref().clone()),
make_interceptor(auth_service.clone()),
),
)
.add_service(DeploymentServiceServer::new(deployment_service))
))
.add_service(tonic::codegen::InterceptedService::new(
DeploymentServiceServer::new(deployment_service),
make_interceptor(auth_service.clone()),
))
.serve(config.server.addr);
// HTTP REST API server
@ -350,59 +372,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
// Deployment Service Implementation (placeholder - not part of MVP)
#[derive(Debug, Default)]
struct DeploymentServiceImpl;
#[tonic::async_trait]
impl DeploymentService for DeploymentServiceImpl {
async fn create_deployment(
&self,
_request: Request<CreateDeploymentRequest>,
) -> Result<Response<CreateDeploymentResponse>, Status> {
Err(Status::unimplemented("create_deployment not yet implemented"))
}
async fn get_deployment(
&self,
_request: Request<GetDeploymentRequest>,
) -> Result<Response<GetDeploymentResponse>, Status> {
Err(Status::unimplemented("get_deployment not yet implemented"))
}
async fn list_deployments(
&self,
_request: Request<ListDeploymentsRequest>,
) -> Result<Response<ListDeploymentsResponse>, Status> {
Err(Status::unimplemented("list_deployments not yet implemented"))
}
async fn update_deployment(
&self,
_request: Request<UpdateDeploymentRequest>,
) -> Result<Response<UpdateDeploymentResponse>, Status> {
Err(Status::unimplemented("update_deployment not yet implemented"))
}
async fn delete_deployment(
&self,
_request: Request<DeleteDeploymentRequest>,
) -> Result<Response<DeleteDeploymentResponse>, Status> {
Err(Status::unimplemented("delete_deployment not yet implemented"))
}
}
fn init_logging(level: &str) {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)))
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)),
)
.init();
}
async fn register_chainfire_membership(
endpoint: &str,
service: &str,
addr: String,
) -> Result<()> {
async fn register_chainfire_membership(endpoint: &str, service: &str, addr: String) -> Result<()> {
let node_id =
std::env::var("HOSTNAME").unwrap_or_else(|_| format!("{}-{}", service, std::process::id()));
let ts = SystemTime::now()

View file

@ -11,26 +11,24 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
http::HeaderMap,
routing::{delete, get, post},
http::StatusCode,
routing::{delete, get},
Json, Router,
};
use iam_service_auth::{resolve_tenant_ids_from_context, AuthService, TenantContext};
use k8shost_proto::{
pod_service_server::PodService,
service_service_server::ServiceService,
node_service_server::NodeService,
CreatePodRequest, DeletePodRequest, ListPodsRequest,
CreateServiceRequest, DeleteServiceRequest, ListServicesRequest,
ListNodesRequest, Pod as ProtoPod, Service as ProtoService, Node as ProtoNode,
ObjectMeta, PodSpec, Container, ServiceSpec, ServicePort,
node_service_server::NodeService, pod_service_server::PodService,
service_service_server::ServiceService, Container, CreatePodRequest, CreateServiceRequest,
DeletePodRequest, DeleteServiceRequest, ListNodesRequest, ListPodsRequest, ListServicesRequest,
Node as ProtoNode, ObjectMeta, Pod as ProtoPod, PodSpec, Service as ProtoService, ServicePort,
ServiceSpec,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tonic::{Code, Request};
use crate::services::{pod::PodServiceImpl, service::ServiceServiceImpl, node::NodeServiceImpl};
use crate::services::{node::NodeServiceImpl, pod::PodServiceImpl, service::ServiceServiceImpl};
/// REST API state
#[derive(Clone)]
@ -125,15 +123,28 @@ pub struct PodResponse {
impl From<ProtoPod> for PodResponse {
fn from(pod: ProtoPod) -> Self {
let phase = pod.status.as_ref()
let phase = pod
.status
.as_ref()
.and_then(|s| s.phase.clone())
.unwrap_or_else(|| "Unknown".to_string());
let ip = pod.status.as_ref().and_then(|s| s.pod_ip.clone());
let name = pod.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
let namespace = pod.metadata.as_ref()
let name = pod
.metadata
.as_ref()
.map(|m| m.name.clone())
.unwrap_or_default();
let namespace = pod
.metadata
.as_ref()
.and_then(|m| m.namespace.clone())
.unwrap_or_else(|| "default".to_string());
Self { name, namespace, phase, ip }
Self {
name,
namespace,
phase,
ip,
}
}
}
@ -156,24 +167,49 @@ pub struct ServicePortResponse {
impl From<ProtoService> for ServiceResponse {
fn from(svc: ProtoService) -> Self {
let ports = svc.spec.as_ref().map(|s| {
s.ports.iter().map(|p| ServicePortResponse {
let ports = svc
.spec
.as_ref()
.map(|s| {
s.ports
.iter()
.map(|p| ServicePortResponse {
port: p.port,
target_port: p.target_port.unwrap_or(p.port),
protocol: p.protocol.clone().unwrap_or_else(|| "TCP".to_string()),
}).collect()
}).unwrap_or_default();
})
.collect()
})
.unwrap_or_default();
let name = svc.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
let namespace = svc.metadata.as_ref()
let name = svc
.metadata
.as_ref()
.map(|m| m.name.clone())
.unwrap_or_default();
let namespace = svc
.metadata
.as_ref()
.and_then(|m| m.namespace.clone())
.unwrap_or_else(|| "default".to_string());
let service_type = svc.spec.as_ref()
let service_type = svc
.spec
.as_ref()
.and_then(|s| s.r#type.clone())
.unwrap_or_else(|| "ClusterIP".to_string());
let cluster_ip = svc.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()).cloned();
let cluster_ip = svc
.spec
.as_ref()
.and_then(|s| s.cluster_ip.as_ref())
.cloned();
Self { name, namespace, service_type, cluster_ip, ports }
Self {
name,
namespace,
service_type,
cluster_ip,
ports,
}
}
}
@ -188,13 +224,34 @@ pub struct NodeResponse {
impl From<ProtoNode> for NodeResponse {
fn from(node: ProtoNode) -> Self {
let ready = node.status.as_ref()
.map(|s| s.conditions.iter().any(|c| c.r#type == "Ready" && c.status == "True"))
let ready = node
.status
.as_ref()
.map(|s| {
s.conditions
.iter()
.any(|c| c.r#type == "Ready" && c.status == "True")
})
.unwrap_or(false);
let name = node.metadata.as_ref().map(|m| m.name.clone()).unwrap_or_default();
let cpu_capacity = node.status.as_ref().and_then(|s| s.capacity.get("cpu").cloned());
let memory_capacity = node.status.as_ref().and_then(|s| s.capacity.get("memory").cloned());
Self { name, ready, cpu_capacity, memory_capacity }
let name = node
.metadata
.as_ref()
.map(|m| m.name.clone())
.unwrap_or_default();
let cpu_capacity = node
.status
.as_ref()
.and_then(|s| s.capacity.get("cpu").cloned());
let memory_capacity = node
.status
.as_ref()
.and_then(|s| s.capacity.get("memory").cloned());
Self {
name,
ready,
cpu_capacity,
memory_capacity,
}
}
}
@ -222,7 +279,10 @@ pub fn build_router(state: RestApiState) -> Router {
.route("/api/v1/pods", get(list_pods).post(create_pod))
.route("/api/v1/pods/{namespace}/{name}", delete(delete_pod))
.route("/api/v1/services", get(list_services).post(create_service))
.route("/api/v1/services/{namespace}/{name}", delete(delete_service))
.route(
"/api/v1/services/{namespace}/{name}",
delete(delete_service),
)
.route("/api/v1/nodes", get(list_nodes))
.route("/health", get(health_check))
.with_state(state)
@ -232,7 +292,9 @@ pub fn build_router(state: RestApiState) -> Router {
async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) {
(
StatusCode::OK,
Json(SuccessResponse::new(serde_json::json!({ "status": "healthy" }))),
Json(SuccessResponse::new(
serde_json::json!({ "status": "healthy" }),
)),
)
}
@ -249,11 +311,20 @@ async fn list_pods(
});
req.extensions_mut().insert(tenant);
let response = state.pod_service.list_pods(req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
let response = state.pod_service.list_pods(req).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"LIST_FAILED",
&e.message(),
)
})?;
let pods: Vec<PodResponse> = response.into_inner().items.into_iter().map(PodResponse::from).collect();
let pods: Vec<PodResponse> = response
.into_inner()
.items
.into_iter()
.map(PodResponse::from)
.collect();
Ok(Json(SuccessResponse::new(PodsResponse { pods })))
}
@ -297,12 +368,21 @@ async fn create_pod(
});
grpc_req.extensions_mut().insert(tenant);
let response = state.pod_service.create_pod(grpc_req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?;
let response = state.pod_service.create_pod(grpc_req).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"CREATE_FAILED",
&e.message(),
)
})?;
let pod = response.into_inner().pod
.ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No pod returned"))?;
let pod = response.into_inner().pod.ok_or_else(|| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"CREATE_FAILED",
"No pod returned",
)
})?;
Ok((
StatusCode::CREATED,
@ -315,7 +395,8 @@ async fn delete_pod(
State(state): State<RestApiState>,
Path((namespace, name)): Path<(String, String)>,
headers: HeaderMap,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let mut req = Request::new(DeletePodRequest {
name: name.clone(),
@ -323,13 +404,19 @@ async fn delete_pod(
});
req.extensions_mut().insert(tenant);
state.pod_service.delete_pod(req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?;
state.pod_service.delete_pod(req).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"DELETE_FAILED",
&e.message(),
)
})?;
Ok((
StatusCode::OK,
Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))),
Json(SuccessResponse::new(
serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }),
)),
))
}
@ -345,11 +432,24 @@ async fn list_services(
});
req.extensions_mut().insert(tenant);
let response = state.service_service.list_services(req)
let response = state
.service_service
.list_services(req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"LIST_FAILED",
&e.message(),
)
})?;
let services: Vec<ServiceResponse> = response.into_inner().items.into_iter().map(ServiceResponse::from).collect();
let services: Vec<ServiceResponse> = response
.into_inner()
.items
.into_iter()
.map(ServiceResponse::from)
.collect();
Ok(Json(SuccessResponse::new(ServicesResponse { services })))
}
@ -359,7 +459,8 @@ async fn create_service(
State(state): State<RestApiState>,
headers: HeaderMap,
Json(req): Json<CreateServiceRequestRest>,
) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)> {
) -> Result<(StatusCode, Json<SuccessResponse<ServiceResponse>>), (StatusCode, Json<ErrorResponse>)>
{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let namespace = req.namespace.unwrap_or_else(|| "default".to_string());
let service_type = req.service_type.unwrap_or_else(|| "ClusterIP".to_string());
@ -393,12 +494,25 @@ async fn create_service(
});
grpc_req.extensions_mut().insert(tenant);
let response = state.service_service.create_service(grpc_req)
let response = state
.service_service
.create_service(grpc_req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", &e.message()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"CREATE_FAILED",
&e.message(),
)
})?;
let service = response.into_inner().service
.ok_or_else(|| error_response(StatusCode::INTERNAL_SERVER_ERROR, "CREATE_FAILED", "No service returned"))?;
let service = response.into_inner().service.ok_or_else(|| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"CREATE_FAILED",
"No service returned",
)
})?;
Ok((
StatusCode::CREATED,
@ -411,7 +525,8 @@ async fn delete_service(
State(state): State<RestApiState>,
Path((namespace, name)): Path<(String, String)>,
headers: HeaderMap,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
let tenant = resolve_rest_tenant(&state, &headers).await?;
let mut req = Request::new(DeleteServiceRequest {
name: name.clone(),
@ -419,13 +534,23 @@ async fn delete_service(
});
req.extensions_mut().insert(tenant);
state.service_service.delete_service(req)
state
.service_service
.delete_service(req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "DELETE_FAILED", &e.message()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"DELETE_FAILED",
&e.message(),
)
})?;
Ok((
StatusCode::OK,
Json(SuccessResponse::new(serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }))),
Json(SuccessResponse::new(
serde_json::json!({ "name": name, "namespace": namespace, "deleted": true }),
)),
))
}
@ -438,11 +563,20 @@ async fn list_nodes(
let mut req = Request::new(ListNodesRequest {});
req.extensions_mut().insert(tenant);
let response = state.node_service.list_nodes(req)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "LIST_FAILED", &e.message()))?;
let response = state.node_service.list_nodes(req).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"LIST_FAILED",
&e.message(),
)
})?;
let nodes: Vec<NodeResponse> = response.into_inner().items.into_iter().map(NodeResponse::from).collect();
let nodes: Vec<NodeResponse> = response
.into_inner()
.items
.into_iter()
.map(NodeResponse::from)
.collect();
Ok(Json(SuccessResponse::new(NodesResponse { nodes })))
}
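
The REST routes above use axum 0.8's brace syntax for path parameters ({namespace}/{name}) rather than the older :param form. A minimal sketch of one such route with its Path extractor, assuming only the axum 0.8 dependency declared in this commit (handler and router names are illustrative):

use axum::{extract::Path, routing::delete, Router};

async fn delete_service_stub(Path((namespace, name)): Path<(String, String)>) -> String {
    // A real handler would resolve the tenant and call the gRPC service impl.
    format!("would delete {}/{}", namespace, name)
}

fn demo_router() -> Router {
    Router::new().route("/api/v1/services/{namespace}/{name}", delete(delete_service_stub))
}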

View file

@ -38,7 +38,10 @@ impl Scheduler {
let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
Ok(client) => {
info!("Scheduler: CreditService quota enforcement enabled: {}", endpoint);
info!(
"Scheduler: CreditService quota enforcement enabled: {}",
endpoint
);
Some(Arc::new(RwLock::new(client)))
}
Err(e) => {
@ -64,7 +67,10 @@ impl Scheduler {
/// Start the scheduler loop
pub async fn run(self: Arc<Self>) {
info!("Scheduler started (spread algorithm, {}s interval)", self.interval.as_secs());
info!(
"Scheduler started (spread algorithm, {}s interval)",
self.interval.as_secs()
);
loop {
if let Err(e) = self.schedule_pending_pods().await {
@ -103,11 +109,7 @@ impl Scheduler {
async fn get_active_tenants(&self) -> anyhow::Result<Vec<(String, String)>> {
// Query all pods to find unique (org_id, project_id) combinations
// This is a pragmatic approach that doesn't require IAM changes
let all_pods = self
.storage
.list_all_pods()
.await
.unwrap_or_else(|e| {
let all_pods = self.storage.list_all_pods().await.unwrap_or_else(|e| {
warn!("Failed to query all pods for tenant discovery: {}", e);
vec![]
});
@ -133,7 +135,10 @@ impl Scheduler {
/// Schedule pending pods for a specific tenant
async fn schedule_tenant_pods(&self, org_id: &str, project_id: &str) -> anyhow::Result<()> {
// Get all pods in all namespaces for this tenant
let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?;
let all_pods = self
.storage
.list_pods(org_id, project_id, None, None)
.await?;
// Filter to pending pods that need scheduling
let pending_pods: Vec<Pod> = all_pods
@ -157,15 +162,23 @@ impl Scheduler {
return Ok(());
}
info!("Scheduling {} pending pod(s) for tenant {}/{}",
pending_pods.len(), org_id, project_id);
info!(
"Scheduling {} pending pod(s) for tenant {}/{}",
pending_pods.len(),
org_id,
project_id
);
// Get all nodes for this tenant
let nodes = self.storage.list_nodes(org_id, project_id).await?;
if nodes.is_empty() {
warn!("No nodes available for tenant {}/{}. {} pod(s) remain pending.",
org_id, project_id, pending_pods.len());
warn!(
"No nodes available for tenant {}/{}. {} pod(s) remain pending.",
org_id,
project_id,
pending_pods.len()
);
return Ok(());
}
@ -176,15 +189,21 @@ impl Scheduler {
.collect();
if ready_nodes.is_empty() {
warn!("No ready nodes available for tenant {}/{}. {} pod(s) remain pending.",
org_id, project_id, pending_pods.len());
warn!(
"No ready nodes available for tenant {}/{}. {} pod(s) remain pending.",
org_id,
project_id,
pending_pods.len()
);
return Ok(());
}
info!("Found {} ready node(s) for scheduling", ready_nodes.len());
// Get current pod count per node for spread algorithm
let pod_counts = self.count_pods_per_node(org_id, project_id, &ready_nodes).await?;
let pod_counts = self
.count_pods_per_node(org_id, project_id, &ready_nodes)
.await?;
// Schedule each pending pod
for pod in pending_pods {
@ -233,9 +252,10 @@ impl Scheduler {
node.status
.as_ref()
.map(|status| {
status.conditions.iter().any(|cond| {
cond.r#type == "Ready" && cond.status == "True"
})
status
.conditions
.iter()
.any(|cond| cond.r#type == "Ready" && cond.status == "True")
})
.unwrap_or(false)
}
@ -247,13 +267,14 @@ impl Scheduler {
project_id: &str,
nodes: &[Node],
) -> anyhow::Result<HashMap<String, usize>> {
let mut counts: HashMap<String, usize> = nodes
.iter()
.map(|n| (n.metadata.name.clone(), 0))
.collect();
let mut counts: HashMap<String, usize> =
nodes.iter().map(|n| (n.metadata.name.clone(), 0)).collect();
// Get all assigned pods
let all_pods = self.storage.list_pods(org_id, project_id, None, None).await?;
let all_pods = self
.storage
.list_pods(org_id, project_id, None, None)
.await?;
// Count pods per node
for pod in all_pods {
@ -324,12 +345,7 @@ impl Scheduler {
// Check if tenant has sufficient quota
use creditservice_client::ResourceType;
match client
.check_quota(
project_id,
ResourceType::K8sNode,
1,
estimated_cost as i64,
)
.check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost as i64)
.await
{
Ok(response) if !response.allowed => {
@ -389,10 +405,7 @@ impl Scheduler {
/// Parse memory string to GB (e.g., "512Mi" -> 0.5, "2Gi" -> 2.0)
fn parse_memory_to_gb(memory: &str) -> Option<f64> {
if memory.ends_with("Gi") {
memory
.trim_end_matches("Gi")
.parse::<f64>()
.ok()
memory.trim_end_matches("Gi").parse::<f64>().ok()
} else if memory.ends_with("Mi") {
memory
.trim_end_matches("Mi")
@ -407,7 +420,10 @@ impl Scheduler {
.map(|ki| ki / (1024.0 * 1024.0))
} else {
// Assume bytes
memory.parse::<f64>().ok().map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0))
memory
.parse::<f64>()
.ok()
.map(|bytes| bytes / (1024.0 * 1024.0 * 1024.0))
}
}
}
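
A self-contained sketch of the unit handling in parse_memory_to_gb above: Gi passes through, Mi divides by 1024, Ki by 1024^2, and a bare number is treated as bytes (same rules, restated with strip_suffix for illustration):

fn parse_memory_to_gb(memory: &str) -> Option<f64> {
    if let Some(gi) = memory.strip_suffix("Gi") {
        gi.parse::<f64>().ok()
    } else if let Some(mi) = memory.strip_suffix("Mi") {
        mi.parse::<f64>().ok().map(|v| v / 1024.0)
    } else if let Some(ki) = memory.strip_suffix("Ki") {
        ki.parse::<f64>().ok().map(|v| v / (1024.0 * 1024.0))
    } else {
        memory.parse::<f64>().ok().map(|b| b / (1024.0 * 1024.0 * 1024.0))
    }
}

fn main() {
    assert_eq!(parse_memory_to_gb("512Mi"), Some(0.5));
    assert_eq!(parse_memory_to_gb("2Gi"), Some(2.0));
    assert_eq!(parse_memory_to_gb("1048576Ki"), Some(1.0));
}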
@ -419,7 +435,11 @@ mod tests {
#[tokio::test]
async fn test_is_node_ready() {
let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage"));
let storage = Arc::new(
Storage::new("memory://test".to_string())
.await
.expect("Failed to create storage"),
);
let scheduler = Scheduler::new(storage);
// Node with Ready=True condition
@ -468,7 +488,11 @@ mod tests {
#[tokio::test]
async fn test_select_node_spread() {
let storage = Arc::new(Storage::new("memory://test".to_string()).await.expect("Failed to create storage"));
let storage = Arc::new(
Storage::new("memory://test".to_string())
.await
.expect("Failed to create storage"),
);
let scheduler = Scheduler::new(storage);
let node1 = Node {

View file

@ -0,0 +1,696 @@
//! Deployment service implementation.
//!
//! Provides CRUD for Deployment resources and treats them as first-class
//! tenant-scoped objects persisted in FlareDB.
use crate::auth::{
get_tenant_context, resolve_tenant_ids_from_context, resource_for_tenant, AuthService,
};
use crate::storage::Storage;
use chrono::Utc;
use k8shost_proto::{
deployment_service_server::DeploymentService, CreateDeploymentRequest,
CreateDeploymentResponse, DeleteDeploymentRequest, DeleteDeploymentResponse,
GetDeploymentRequest, GetDeploymentResponse, ListDeploymentsRequest, ListDeploymentsResponse,
UpdateDeploymentRequest, UpdateDeploymentResponse,
};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tonic::{Request, Response, Status};
use uuid::Uuid;
const ACTION_DEPLOYMENT_CREATE: &str = "k8s:deployments:create";
const ACTION_DEPLOYMENT_READ: &str = "k8s:deployments:read";
const ACTION_DEPLOYMENT_LIST: &str = "k8s:deployments:list";
const ACTION_DEPLOYMENT_UPDATE: &str = "k8s:deployments:update";
const ACTION_DEPLOYMENT_DELETE: &str = "k8s:deployments:delete";
pub(crate) const DEPLOYMENT_NAME_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-name";
pub(crate) const DEPLOYMENT_UID_ANNOTATION: &str = "k8shost.photoncloud.io/deployment-uid";
pub(crate) const TEMPLATE_HASH_ANNOTATION: &str = "k8shost.photoncloud.io/template-hash";
#[derive(Clone)]
pub struct DeploymentServiceImpl {
storage: Arc<Storage>,
auth: Arc<AuthService>,
}
impl DeploymentServiceImpl {
pub fn new(storage: Arc<Storage>, auth: Arc<AuthService>) -> Self {
Self { storage, auth }
}
pub fn to_proto_deployment(
deployment: &k8shost_types::Deployment,
) -> k8shost_proto::Deployment {
k8shost_proto::Deployment {
metadata: Some(to_proto_meta(&deployment.metadata)),
spec: Some(k8shost_proto::DeploymentSpec {
replicas: deployment.spec.replicas,
selector: Some(k8shost_proto::LabelSelector {
match_labels: deployment.spec.selector.match_labels.clone(),
}),
template: Some(k8shost_proto::PodTemplateSpec {
metadata: Some(to_proto_meta(&deployment.spec.template.metadata)),
spec: Some(to_proto_pod_spec(&deployment.spec.template.spec)),
}),
}),
status: deployment
.status
.as_ref()
.map(|status| k8shost_proto::DeploymentStatus {
replicas: status.replicas,
ready_replicas: status.ready_replicas,
available_replicas: status.available_replicas,
}),
}
}
pub fn from_proto_deployment(
deployment: &k8shost_proto::Deployment,
) -> Result<k8shost_types::Deployment, Status> {
let metadata = deployment
.metadata
.as_ref()
.ok_or_else(|| Status::invalid_argument("metadata is required"))?;
let spec = deployment
.spec
.as_ref()
.ok_or_else(|| Status::invalid_argument("spec is required"))?;
let selector = spec
.selector
.as_ref()
.ok_or_else(|| Status::invalid_argument("selector is required"))?;
let template = spec
.template
.as_ref()
.ok_or_else(|| Status::invalid_argument("template is required"))?;
let template_meta = template
.metadata
.as_ref()
.ok_or_else(|| Status::invalid_argument("template.metadata is required"))?;
let template_spec = template
.spec
.as_ref()
.ok_or_else(|| Status::invalid_argument("template.spec is required"))?;
Ok(k8shost_types::Deployment {
metadata: from_proto_meta(metadata)?,
spec: k8shost_types::DeploymentSpec {
replicas: spec.replicas,
selector: k8shost_types::LabelSelector {
match_labels: selector.match_labels.clone(),
},
template: k8shost_types::PodTemplateSpec {
metadata: from_proto_meta(template_meta)?,
spec: from_proto_pod_spec(template_spec),
},
},
status: deployment
.status
.as_ref()
.map(|status| k8shost_types::DeploymentStatus {
replicas: status.replicas,
ready_replicas: status.ready_replicas,
available_replicas: status.available_replicas,
}),
})
}
pub(crate) fn validate_spec(deployment: &k8shost_types::Deployment) -> Result<(), Status> {
if deployment.metadata.namespace.is_none() {
return Err(Status::invalid_argument("namespace is required"));
}
if deployment.spec.replicas.unwrap_or(1) < 0 {
return Err(Status::invalid_argument("replicas must be >= 0"));
}
if deployment.spec.selector.match_labels.is_empty() {
return Err(Status::invalid_argument(
"selector.match_labels must not be empty",
));
}
if deployment.spec.template.spec.containers.is_empty() {
return Err(Status::invalid_argument(
"template.spec.containers must not be empty",
));
}
let template_labels = &deployment.spec.template.metadata.labels;
let selector_matches_template = deployment
.spec
.selector
.match_labels
.iter()
.all(|(key, value)| template_labels.get(key) == Some(value));
if !selector_matches_template {
return Err(Status::invalid_argument(
"selector.match_labels must be present in template.metadata.labels",
));
}
Ok(())
}
pub(crate) fn pod_is_owned_by_deployment(
deployment: &k8shost_types::Deployment,
pod: &k8shost_types::Pod,
) -> bool {
let annotations = &pod.metadata.annotations;
if let Some(uid) = deployment.metadata.uid.as_deref() {
if annotations
.get(DEPLOYMENT_UID_ANNOTATION)
.map(String::as_str)
== Some(uid)
{
return true;
}
}
annotations
.get(DEPLOYMENT_NAME_ANNOTATION)
.map(String::as_str)
== Some(deployment.metadata.name.as_str())
}
}
pub(crate) fn deployment_template_hash(
deployment: &k8shost_types::Deployment,
) -> Result<String, Status> {
let payload = serde_json::to_vec(&deployment.spec.template)
.map_err(|e| Status::internal(format!("failed to hash deployment template: {}", e)))?;
let mut hasher = Sha256::new();
hasher.update(payload);
Ok(format!("{:x}", hasher.finalize()))
}
fn default_status() -> k8shost_types::DeploymentStatus {
k8shost_types::DeploymentStatus {
replicas: Some(0),
ready_replicas: Some(0),
available_replicas: Some(0),
}
}
fn next_resource_version(current: Option<&str>) -> String {
let current = current
.and_then(|version| version.parse::<u64>().ok())
.unwrap_or(0);
(current + 1).to_string()
}
fn to_proto_meta(meta: &k8shost_types::ObjectMeta) -> k8shost_proto::ObjectMeta {
k8shost_proto::ObjectMeta {
name: meta.name.clone(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
resource_version: meta.resource_version.clone(),
creation_timestamp: meta.creation_timestamp.map(|ts| ts.to_rfc3339()),
labels: meta.labels.clone(),
annotations: meta.annotations.clone(),
org_id: meta.org_id.clone(),
project_id: meta.project_id.clone(),
}
}
fn from_proto_meta(meta: &k8shost_proto::ObjectMeta) -> Result<k8shost_types::ObjectMeta, Status> {
Ok(k8shost_types::ObjectMeta {
name: meta.name.clone(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
resource_version: meta.resource_version.clone(),
creation_timestamp: meta
.creation_timestamp
.as_ref()
.and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok())
.map(|ts| ts.with_timezone(&Utc)),
labels: meta.labels.clone(),
annotations: meta.annotations.clone(),
org_id: meta.org_id.clone(),
project_id: meta.project_id.clone(),
})
}
fn to_proto_pod_spec(spec: &k8shost_types::PodSpec) -> k8shost_proto::PodSpec {
k8shost_proto::PodSpec {
containers: spec
.containers
.iter()
.map(|container| k8shost_proto::Container {
name: container.name.clone(),
image: container.image.clone(),
command: container.command.clone(),
args: container.args.clone(),
ports: container
.ports
.iter()
.map(|port| k8shost_proto::ContainerPort {
name: port.name.clone(),
container_port: port.container_port,
protocol: port.protocol.clone(),
})
.collect(),
env: container
.env
.iter()
.map(|env| k8shost_proto::EnvVar {
name: env.name.clone(),
value: env.value.clone(),
})
.collect(),
})
.collect(),
restart_policy: spec.restart_policy.clone(),
node_name: spec.node_name.clone(),
}
}
fn from_proto_pod_spec(spec: &k8shost_proto::PodSpec) -> k8shost_types::PodSpec {
k8shost_types::PodSpec {
containers: spec
.containers
.iter()
.map(|container| k8shost_types::Container {
name: container.name.clone(),
image: container.image.clone(),
command: container.command.clone(),
args: container.args.clone(),
ports: container
.ports
.iter()
.map(|port| k8shost_types::ContainerPort {
name: port.name.clone(),
container_port: port.container_port,
protocol: port.protocol.clone(),
})
.collect(),
env: container
.env
.iter()
.map(|env| k8shost_types::EnvVar {
name: env.name.clone(),
value: env.value.clone(),
})
.collect(),
resources: None,
})
.collect(),
restart_policy: spec.restart_policy.clone(),
node_name: spec.node_name.clone(),
}
}
#[tonic::async_trait]
impl DeploymentService for DeploymentServiceImpl {
async fn create_deployment(
&self,
request: Request<CreateDeploymentRequest>,
) -> Result<Response<CreateDeploymentResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let proto_deployment = req
.deployment
.ok_or_else(|| Status::invalid_argument("deployment is required"))?;
let mut deployment = Self::from_proto_deployment(&proto_deployment)?;
let (org_id, project_id) = resolve_tenant_ids_from_context(
&tenant,
deployment.metadata.org_id.as_deref().unwrap_or(""),
deployment.metadata.project_id.as_deref().unwrap_or(""),
)?;
deployment.metadata.org_id = Some(org_id.clone());
deployment.metadata.project_id = Some(project_id.clone());
Self::validate_spec(&deployment)?;
let deployment_key = format!(
"{}/{}",
deployment
.metadata
.namespace
.as_deref()
.unwrap_or("default"),
deployment.metadata.name
);
self.auth
.authorize(
&tenant,
ACTION_DEPLOYMENT_CREATE,
&resource_for_tenant("deployment", deployment_key, &org_id, &project_id),
)
.await?;
let namespace = deployment
.metadata
.namespace
.as_deref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
if self
.storage
.get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name)
.await?
.is_some()
{
return Err(Status::already_exists(format!(
"Deployment {} already exists",
deployment.metadata.name
)));
}
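// Fill in server-assigned defaults: UID, creation timestamp, initial resource version, and empty status.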
if deployment.metadata.uid.is_none() {
deployment.metadata.uid = Some(Uuid::new_v4().to_string());
}
if deployment.metadata.creation_timestamp.is_none() {
deployment.metadata.creation_timestamp = Some(Utc::now());
}
deployment.metadata.resource_version = Some("1".to_string());
deployment.status = Some(default_status());
self.storage.put_deployment(&deployment).await?;
Ok(Response::new(CreateDeploymentResponse {
deployment: Some(Self::to_proto_deployment(&deployment)),
}))
}
async fn get_deployment(
&self,
request: Request<GetDeploymentRequest>,
) -> Result<Response<GetDeploymentResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let deployment_key = format!("{}/{}", req.namespace, req.name);
self.auth
.authorize(
&tenant,
ACTION_DEPLOYMENT_READ,
&resource_for_tenant(
"deployment",
deployment_key,
&tenant.org_id,
&tenant.project_id,
),
)
.await?;
let deployment = self
.storage
.get_deployment(
&tenant.org_id,
&tenant.project_id,
&req.namespace,
&req.name,
)
.await?;
match deployment {
Some(deployment) => Ok(Response::new(GetDeploymentResponse {
deployment: Some(Self::to_proto_deployment(&deployment)),
})),
None => Err(Status::not_found(format!(
"Deployment {} not found",
req.name
))),
}
}
async fn list_deployments(
&self,
request: Request<ListDeploymentsRequest>,
) -> Result<Response<ListDeploymentsResponse>, Status> {
let tenant = get_tenant_context(&request)?;
self.auth
.authorize(
&tenant,
ACTION_DEPLOYMENT_LIST,
&resource_for_tenant("deployment", "*", &tenant.org_id, &tenant.project_id),
)
.await?;
let req = request.into_inner();
let items = self
.storage
.list_deployments(&tenant.org_id, &tenant.project_id, req.namespace.as_deref())
.await?
.iter()
.map(Self::to_proto_deployment)
.collect();
Ok(Response::new(ListDeploymentsResponse { items }))
}
async fn update_deployment(
&self,
request: Request<UpdateDeploymentRequest>,
) -> Result<Response<UpdateDeploymentResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let proto_deployment = req
.deployment
.ok_or_else(|| Status::invalid_argument("deployment is required"))?;
let mut deployment = Self::from_proto_deployment(&proto_deployment)?;
let (org_id, project_id) = resolve_tenant_ids_from_context(
&tenant,
deployment.metadata.org_id.as_deref().unwrap_or(""),
deployment.metadata.project_id.as_deref().unwrap_or(""),
)?;
deployment.metadata.org_id = Some(org_id.clone());
deployment.metadata.project_id = Some(project_id.clone());
Self::validate_spec(&deployment)?;
let namespace = deployment
.metadata
.namespace
.as_deref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let deployment_key = format!("{}/{}", namespace, deployment.metadata.name);
self.auth
.authorize(
&tenant,
ACTION_DEPLOYMENT_UPDATE,
&resource_for_tenant("deployment", deployment_key, &org_id, &project_id),
)
.await?;
let current = self
.storage
.get_deployment(&org_id, &project_id, namespace, &deployment.metadata.name)
.await?
.ok_or_else(|| {
Status::not_found(format!("Deployment {} not found", deployment.metadata.name))
})?;
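// Preserve server-managed fields from the stored object and bump the resource version; status stays controller-owned.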
deployment.metadata.uid = current.metadata.uid.clone();
deployment.metadata.creation_timestamp = current.metadata.creation_timestamp;
deployment.metadata.resource_version = Some(next_resource_version(
current.metadata.resource_version.as_deref(),
));
deployment.status = current.status.clone();
self.storage.put_deployment(&deployment).await?;
Ok(Response::new(UpdateDeploymentResponse {
deployment: Some(Self::to_proto_deployment(&deployment)),
}))
}
async fn delete_deployment(
&self,
request: Request<DeleteDeploymentRequest>,
) -> Result<Response<DeleteDeploymentResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let deployment_key = format!("{}/{}", req.namespace, req.name);
self.auth
.authorize(
&tenant,
ACTION_DEPLOYMENT_DELETE,
&resource_for_tenant(
"deployment",
deployment_key,
&tenant.org_id,
&tenant.project_id,
),
)
.await?;
let existing = self
.storage
.get_deployment(
&tenant.org_id,
&tenant.project_id,
&req.namespace,
&req.name,
)
.await?;
let Some(deployment) = existing else {
return Ok(Response::new(DeleteDeploymentResponse { success: false }));
};
let success = self
.storage
.delete_deployment(
&tenant.org_id,
&tenant.project_id,
&req.namespace,
&req.name,
)
.await?;
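// Best-effort cleanup: delete every pod in the namespace annotated as owned by this Deployment.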
let pods = self
.storage
.list_pods(
&tenant.org_id,
&tenant.project_id,
Some(&req.namespace),
None,
)
.await?;
for pod in pods
.into_iter()
.filter(|pod| Self::pod_is_owned_by_deployment(&deployment, pod))
{
let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
let _ = self
.storage
.delete_pod(
&tenant.org_id,
&tenant.project_id,
namespace,
&pod.metadata.name,
)
.await?;
}
Ok(Response::new(DeleteDeploymentResponse { success }))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
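/// Two-replica Deployment fixture shared by the tests below.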
fn test_deployment() -> k8shost_types::Deployment {
k8shost_types::Deployment {
metadata: k8shost_types::ObjectMeta {
name: "web".to_string(),
namespace: Some("default".to_string()),
uid: Some("deploy-uid".to_string()),
resource_version: Some("4".to_string()),
creation_timestamp: Some(Utc::now()),
labels: HashMap::new(),
annotations: HashMap::new(),
org_id: Some("test-org".to_string()),
project_id: Some("test-project".to_string()),
},
spec: k8shost_types::DeploymentSpec {
replicas: Some(2),
selector: k8shost_types::LabelSelector {
match_labels: HashMap::from([("app".to_string(), "web".to_string())]),
},
template: k8shost_types::PodTemplateSpec {
metadata: k8shost_types::ObjectMeta {
name: "".to_string(),
namespace: Some("default".to_string()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::from([("app".to_string(), "web".to_string())]),
annotations: HashMap::new(),
org_id: None,
project_id: None,
},
spec: k8shost_types::PodSpec {
containers: vec![k8shost_types::Container {
name: "web".to_string(),
image: "nginx:latest".to_string(),
command: Vec::new(),
args: Vec::new(),
ports: Vec::new(),
env: Vec::new(),
resources: None,
}],
restart_policy: Some("Always".to_string()),
node_name: None,
},
},
},
status: Some(default_status()),
}
}
#[test]
fn deployment_round_trip_proto_conversion() {
let deployment = test_deployment();
let proto = DeploymentServiceImpl::to_proto_deployment(&deployment);
let round_trip = DeploymentServiceImpl::from_proto_deployment(&proto).unwrap();
assert_eq!(round_trip.metadata.name, deployment.metadata.name);
assert_eq!(round_trip.spec.replicas, Some(2));
assert_eq!(
round_trip
.spec
.selector
.match_labels
.get("app")
.map(String::as_str),
Some("web")
);
}
#[test]
fn validate_spec_requires_selector_labels_on_template() {
let mut deployment = test_deployment();
deployment.spec.template.metadata.labels.clear();
let error = DeploymentServiceImpl::validate_spec(&deployment).unwrap_err();
assert_eq!(error.code(), tonic::Code::InvalidArgument);
}
#[test]
fn template_hash_changes_when_template_changes() {
let deployment = test_deployment();
let mut changed = test_deployment();
changed.spec.template.spec.containers[0].image = "caddy:latest".to_string();
let original_hash = deployment_template_hash(&deployment).unwrap();
let changed_hash = deployment_template_hash(&changed).unwrap();
assert_ne!(original_hash, changed_hash);
}
#[test]
fn owned_pod_detection_matches_uid_annotation() {
let deployment = test_deployment();
let mut pod = k8shost_types::Pod {
metadata: k8shost_types::ObjectMeta {
name: "web-1".to_string(),
namespace: Some("default".to_string()),
uid: None,
resource_version: None,
creation_timestamp: None,
labels: HashMap::new(),
annotations: HashMap::from([(
DEPLOYMENT_UID_ANNOTATION.to_string(),
"deploy-uid".to_string(),
)]),
org_id: Some("test-org".to_string()),
project_id: Some("test-project".to_string()),
},
spec: k8shost_types::PodSpec {
containers: Vec::new(),
restart_policy: None,
node_name: None,
},
status: None,
};
assert!(DeploymentServiceImpl::pod_is_owned_by_deployment(
&deployment,
&pod
));
pod.metadata.annotations.clear();
assert!(!DeploymentServiceImpl::pod_is_owned_by_deployment(
&deployment,
&pod
));
}
}

View file

@ -1,6 +1,7 @@
pub mod deployment;
pub mod node;
pub mod pod;
pub mod service;
pub mod node;
#[cfg(test)]
mod tests;

View file

@ -143,9 +143,7 @@ impl NodeServiceImpl {
}
/// Convert proto NodeStatus to k8shost_types::NodeStatus
fn from_proto_node_status(
proto: &k8shost_proto::NodeStatus,
) -> k8shost_types::NodeStatus {
fn from_proto_node_status(proto: &k8shost_proto::NodeStatus) -> k8shost_types::NodeStatus {
k8shost_types::NodeStatus {
addresses: proto
.addresses
@ -247,7 +245,11 @@ impl NodeService for NodeServiceImpl {
// Get existing node
let mut node = self
.storage
.get_node(&tenant_context.org_id, &tenant_context.project_id, &req.node_name)
.get_node(
&tenant_context.org_id,
&tenant_context.project_id,
&req.node_name,
)
.await?
.ok_or_else(|| Status::not_found(format!("Node {} not found", req.node_name)))?;
@ -257,9 +259,10 @@ impl NodeService for NodeServiceImpl {
}
// Update last heartbeat timestamp in annotations
node.metadata
.annotations
.insert("k8shost.io/last-heartbeat".to_string(), Utc::now().to_rfc3339());
node.metadata.annotations.insert(
"k8shost.io/last-heartbeat".to_string(),
Utc::now().to_rfc3339(),
);
// Increment resource version
let current_version = node
@ -286,15 +289,22 @@ impl NodeService for NodeServiceImpl {
.authorize(
&tenant_context,
ACTION_NODE_LIST,
&resource_for_tenant("node", "*", &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"node",
"*",
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let _req = request.into_inner();
let nodes = self.storage.list_nodes(&tenant_context.org_id, &tenant_context.project_id).await?;
let nodes = self
.storage
.list_nodes(&tenant_context.org_id, &tenant_context.project_id)
.await?;
let items: Vec<k8shost_proto::Node> =
nodes.iter().map(Self::to_proto_node).collect();
let items: Vec<k8shost_proto::Node> = nodes.iter().map(Self::to_proto_node).collect();
Ok(Response::new(ListNodesResponse { items }))
}

View file

@ -48,18 +48,19 @@ impl PodServiceImpl {
pub async fn new_with_credit_service(storage: Arc<Storage>, auth: Arc<AuthService>) -> Self {
// Initialize CreditService client if endpoint is configured
let credit_service = match std::env::var("CREDITSERVICE_ENDPOINT") {
Ok(endpoint) => {
match CreditServiceClient::connect(&endpoint).await {
Ok(endpoint) => match CreditServiceClient::connect(&endpoint).await {
Ok(client) => {
tracing::info!("CreditService admission control enabled: {}", endpoint);
Some(Arc::new(RwLock::new(client)))
}
Err(e) => {
tracing::warn!("Failed to connect to CreditService (admission control disabled): {}", e);
tracing::warn!(
"Failed to connect to CreditService (admission control disabled): {}",
e
);
None
}
}
}
},
Err(_) => {
tracing::info!("CREDITSERVICE_ENDPOINT not set, admission control disabled");
None
@ -118,7 +119,10 @@ impl PodServiceImpl {
stripped.parse::<i64>().ok()
} else {
// Full CPU cores
cpu_str.parse::<f64>().ok().map(|cores| (cores * 1000.0) as i64)
cpu_str
.parse::<f64>()
.ok()
.map(|cores| (cores * 1000.0) as i64)
}
}
@ -134,7 +138,10 @@ impl PodServiceImpl {
stripped.parse::<i64>().ok().map(|gb| gb * 1000)
} else {
// Try parsing as bytes
mem_str.parse::<i64>().ok().map(|bytes| bytes / (1024 * 1024))
mem_str
.parse::<i64>()
.ok()
.map(|bytes| bytes / (1024 * 1024))
}
}
@ -307,7 +314,9 @@ impl PodService for PodServiceImpl {
) -> Result<Response<CreatePodResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?;
let proto_pod = req
.pod
.ok_or_else(|| Status::invalid_argument("pod is required"))?;
// Convert proto to internal type
let mut pod = Self::from_proto_pod(&proto_pod)?;
@ -327,7 +336,9 @@ impl PodService for PodServiceImpl {
)
.await?;
if pod.metadata.namespace.is_none() {
return Err(Status::invalid_argument("namespace is required in metadata"));
return Err(Status::invalid_argument(
"namespace is required in metadata",
));
}
// Assign UID if not present
@ -362,11 +373,15 @@ impl PodService for PodServiceImpl {
// Calculate estimated cost based on Pod resource requests
let estimated_cost = Self::calculate_pod_cost(&pod);
let project_id = pod.metadata.project_id.as_ref()
.ok_or_else(|| Status::invalid_argument("project_id required for admission control"))?;
let project_id = pod.metadata.project_id.as_ref().ok_or_else(|| {
Status::invalid_argument("project_id required for admission control")
})?;
// Phase 0: Check quota
match client.check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost).await {
match client
.check_quota(project_id, ResourceType::K8sNode, 1, estimated_cost)
.await
{
Ok(resp) if !resp.allowed => {
let reason = if resp.reason.is_empty() {
"Insufficient quota or balance".to_string()
@ -378,7 +393,10 @@ impl PodService for PodServiceImpl {
reason = %reason,
"Pod creation denied by CreditService"
);
return Err(Status::resource_exhausted(format!("Admission denied: {}", reason)));
return Err(Status::resource_exhausted(format!(
"Admission denied: {}",
reason
)));
}
Err(e) => {
tracing::warn!("CreditService check_quota failed (allowing request): {}", e);
@ -388,13 +406,16 @@ impl PodService for PodServiceImpl {
}
// Phase 1: Reserve credits
match client.reserve_credits(
match client
.reserve_credits(
project_id,
estimated_cost,
format!("Pod {} creation", pod.metadata.name),
"PodInstance",
300, // 5 minute TTL
).await {
)
.await
{
Ok(reservation) => {
tracing::info!(
reservation_id = %reservation.id,
@ -405,7 +426,10 @@ impl PodService for PodServiceImpl {
}
Err(e) => {
tracing::warn!("CreditService reserve_credits failed: {}", e);
return Err(Status::resource_exhausted(format!("Failed to reserve credits: {}", e)));
return Err(Status::resource_exhausted(format!(
"Failed to reserve credits: {}",
e
)));
}
}
} else {
@ -417,9 +441,14 @@ impl PodService for PodServiceImpl {
Ok(_) => {}
Err(e) => {
// Rollback: Release reservation on failure
if let (Some(ref credit_svc), Some(ref res_id)) = (&self.credit_service, &reservation_id) {
if let (Some(ref credit_svc), Some(ref res_id)) =
(&self.credit_service, &reservation_id)
{
let mut client = credit_svc.write().await;
if let Err(release_err) = client.release_reservation(res_id, format!("Pod storage failed: {}", e)).await {
if let Err(release_err) = client
.release_reservation(res_id, format!("Pod storage failed: {}", e))
.await
{
tracing::warn!("Failed to release reservation {}: {}", res_id, release_err);
} else {
tracing::info!(reservation_id = %res_id, "Released reservation after Pod storage failure");
@ -435,7 +464,10 @@ impl PodService for PodServiceImpl {
let actual_cost = Self::calculate_pod_cost(&pod);
let pod_uid = pod.metadata.uid.as_ref().unwrap_or(&pod.metadata.name);
if let Err(e) = client.commit_reservation(res_id, actual_cost, pod_uid).await {
if let Err(e) = client
.commit_reservation(res_id, actual_cost, pod_uid)
.await
{
tracing::warn!("Failed to commit reservation {}: {}", res_id, e);
// Pod is already created, so we don't fail here - billing will reconcile
} else {
@ -471,13 +503,23 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_READ,
&resource_for_tenant("pod", pod_key, &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"pod",
pod_key,
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let pod = self
.storage
.get_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
.get_pod(
&tenant_context.org_id,
&tenant_context.project_id,
&req.namespace,
&req.name,
)
.await?;
if let Some(pod) = pod {
@ -500,7 +542,12 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_LIST,
&resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"pod",
"*",
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let req = request.into_inner();
@ -514,7 +561,12 @@ impl PodService for PodServiceImpl {
let pods = self
.storage
.list_pods(&tenant_context.org_id, &tenant_context.project_id, namespace, label_selector)
.list_pods(
&tenant_context.org_id,
&tenant_context.project_id,
namespace,
label_selector,
)
.await?;
let items: Vec<k8shost_proto::Pod> = pods.iter().map(Self::to_proto_pod).collect();
@ -528,7 +580,9 @@ impl PodService for PodServiceImpl {
) -> Result<Response<UpdatePodResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
let proto_pod = req.pod.ok_or_else(|| Status::invalid_argument("pod is required"))?;
let proto_pod = req
.pod
.ok_or_else(|| Status::invalid_argument("pod is required"))?;
let mut pod = Self::from_proto_pod(&proto_pod)?;
@ -585,13 +639,23 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_DELETE,
&resource_for_tenant("pod", pod_key, &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"pod",
pod_key,
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let existed = self
.storage
.delete_pod(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
.delete_pod(
&tenant_context.org_id,
&tenant_context.project_id,
&req.namespace,
&req.name,
)
.await?;
Ok(Response::new(DeletePodResponse { success: existed }))
@ -608,7 +672,12 @@ impl PodService for PodServiceImpl {
.authorize(
&tenant_context,
ACTION_POD_LIST,
&resource_for_tenant("pod", "*", &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"pod",
"*",
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let _req = request.into_inner();

View file

@ -33,7 +33,11 @@ pub struct ServiceServiceImpl {
}
impl ServiceServiceImpl {
pub fn new(storage: Arc<Storage>, ipam_client: Arc<IpamClient>, auth: Arc<AuthService>) -> Self {
pub fn new(
storage: Arc<Storage>,
ipam_client: Arc<IpamClient>,
auth: Arc<AuthService>,
) -> Self {
Self {
storage,
ipam_client,
@ -73,8 +77,10 @@ impl ServiceServiceImpl {
});
let status = svc.status.as_ref().map(|s| k8shost_proto::ServiceStatus {
load_balancer: s.load_balancer.as_ref().map(|lb| {
k8shost_proto::LoadBalancerStatus {
load_balancer: s
.load_balancer
.as_ref()
.map(|lb| k8shost_proto::LoadBalancerStatus {
ingress: lb
.ingress
.iter()
@ -83,7 +89,6 @@ impl ServiceServiceImpl {
hostname: ing.hostname.clone(),
})
.collect(),
}
}),
});
@ -141,8 +146,10 @@ impl ServiceServiceImpl {
};
let status = proto.status.as_ref().map(|s| k8shost_types::ServiceStatus {
load_balancer: s.load_balancer.as_ref().map(|lb| {
k8shost_types::LoadBalancerStatus {
load_balancer: s
.load_balancer
.as_ref()
.map(|lb| k8shost_types::LoadBalancerStatus {
ingress: lb
.ingress
.iter()
@ -151,7 +158,6 @@ impl ServiceServiceImpl {
hostname: ing.hostname.clone(),
})
.collect(),
}
}),
});
@ -194,7 +200,9 @@ impl ServiceService for ServiceServiceImpl {
service.metadata.org_id = Some(org_id.clone());
service.metadata.project_id = Some(project_id.clone());
if service.metadata.namespace.is_none() {
return Err(Status::invalid_argument("namespace is required in metadata"));
return Err(Status::invalid_argument(
"namespace is required in metadata",
));
}
self.auth
.authorize(
@ -221,11 +229,7 @@ impl ServiceService for ServiceServiceImpl {
// Allocate cluster IP if not present and service type is ClusterIP
if service.spec.cluster_ip.is_none() {
let svc_type = service
.spec
.r#type
.as_deref()
.unwrap_or("ClusterIP");
let svc_type = service.spec.r#type.as_deref().unwrap_or("ClusterIP");
if svc_type == "ClusterIP" || svc_type == "LoadBalancer" {
// Get org_id, project_id, and uid for IPAM
let org_id = service.metadata.org_id.as_ref().unwrap();
@ -235,12 +239,7 @@ impl ServiceService for ServiceServiceImpl {
// Allocate IP from IPAM
let cluster_ip = self
.ipam_client
.allocate_cluster_ip(
org_id,
project_id,
service_uid,
authorization.as_deref(),
)
.allocate_cluster_ip(org_id, project_id, service_uid, authorization.as_deref())
.await
.map_err(|e| {
Status::internal(format!("Failed to allocate Cluster IP: {}", e))
@ -281,7 +280,12 @@ impl ServiceService for ServiceServiceImpl {
let service = self
.storage
.get_service(&tenant_context.org_id, &tenant_context.project_id, &req.namespace, &req.name)
.get_service(
&tenant_context.org_id,
&tenant_context.project_id,
&req.namespace,
&req.name,
)
.await?;
if let Some(service) = service {
@ -304,7 +308,12 @@ impl ServiceService for ServiceServiceImpl {
.authorize(
&tenant_context,
ACTION_SERVICE_LIST,
&resource_for_tenant("service", "*", &tenant_context.org_id, &tenant_context.project_id),
&resource_for_tenant(
"service",
"*",
&tenant_context.org_id,
&tenant_context.project_id,
),
)
.await?;
let req = request.into_inner();
@ -313,13 +322,15 @@ impl ServiceService for ServiceServiceImpl {
let services = self
.storage
.list_services(&tenant_context.org_id, &tenant_context.project_id, namespace)
.list_services(
&tenant_context.org_id,
&tenant_context.project_id,
namespace,
)
.await?;
let items: Vec<k8shost_proto::Service> = services
.iter()
.map(Self::to_proto_service)
.collect();
let items: Vec<k8shost_proto::Service> =
services.iter().map(Self::to_proto_service).collect();
Ok(Response::new(ListServicesResponse { items }))
}

View file

@ -207,7 +207,9 @@ mod tests {
async fn test_pod_crud_operations() {
// This test requires a running FlareDB instance
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
let storage = Storage::new(pd_addr)
.await
.expect("Failed to connect to FlareDB");
let pod_service = PodServiceImpl::new(Arc::new(storage), test_auth_service().await);
// Create a pod
@ -226,10 +228,7 @@ mod tests {
let get_resp = pod_service.get_pod(get_req).await;
assert!(get_resp.is_ok());
let retrieved_pod = get_resp.unwrap().into_inner().pod.unwrap();
assert_eq!(
retrieved_pod.metadata.as_ref().unwrap().name,
"test-pod-1"
);
assert_eq!(retrieved_pod.metadata.as_ref().unwrap().name, "test-pod-1");
// List pods
let list_req = with_test_tenant(Request::new(ListPodsRequest {
@ -255,7 +254,9 @@ mod tests {
#[ignore] // Requires running FlareDB and PrismNET instances
async fn test_service_crud_operations() {
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
let storage = Storage::new(pd_addr)
.await
.expect("Failed to connect to FlareDB");
let prismnet_addr =
std::env::var("PRISMNET_ADDR").unwrap_or("http://127.0.0.1:9090".to_string());
let ipam_client = crate::ipam_client::IpamClient::new(prismnet_addr);
@ -273,12 +274,7 @@ mod tests {
let create_resp = service_service.create_service(create_req).await;
assert!(create_resp.is_ok());
let created_service = create_resp.unwrap().into_inner().service.unwrap();
assert!(created_service
.spec
.as_ref()
.unwrap()
.cluster_ip
.is_some());
assert!(created_service.spec.as_ref().unwrap().cluster_ip.is_some());
// Get the service
let get_req = with_test_tenant(Request::new(GetServiceRequest {
@ -308,7 +304,9 @@ mod tests {
#[ignore] // Requires running FlareDB instance
async fn test_node_operations() {
let pd_addr = std::env::var("FLAREDB_PD_ADDR").unwrap_or("127.0.0.1:2479".to_string());
let storage = Storage::new(pd_addr).await.expect("Failed to connect to FlareDB");
let storage = Storage::new(pd_addr)
.await
.expect("Failed to connect to FlareDB");
let node_service = NodeServiceImpl::new(Arc::new(storage), test_auth_service().await);
// Register a node

View file

@ -4,7 +4,7 @@
//! with multi-tenant support using FlareDB as the backend.
use flaredb_client::RdbClient;
use k8shost_types::{Node, Pod, Service};
use k8shost_types::{Deployment, Node, Pod, Service};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
@ -18,12 +18,8 @@ pub struct Storage {
impl Storage {
/// Create a new storage instance with FlareDB backend
pub async fn new(pd_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
let client = RdbClient::connect_with_pd_namespace(
pd_addr.clone(),
pd_addr,
"k8shost",
)
.await?;
let client =
RdbClient::connect_with_pd_namespace(pd_addr.clone(), pd_addr, "k8shost").await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
@ -31,9 +27,7 @@ impl Storage {
}
/// Create a storage instance that connects directly to a single FlareDB server (no PD)
pub async fn new_direct(
server_addr: String,
) -> Result<Self, Box<dyn std::error::Error>> {
pub async fn new_direct(server_addr: String) -> Result<Self, Box<dyn std::error::Error>> {
let client = RdbClient::connect_direct(server_addr, "k8shost").await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
@ -68,11 +62,20 @@ impl Storage {
/// Create or update a pod
pub async fn put_pod(&self, pod: &Pod) -> Result<(), Status> {
let org_id = pod.metadata.org_id.as_ref()
let org_id = pod
.metadata
.org_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
let project_id = pod.metadata.project_id.as_ref()
let project_id = pod
.metadata
.project_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
let namespace = pod.metadata.namespace.as_ref()
let namespace = pod
.metadata
.namespace
.as_ref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let key = Self::pod_key(org_id, project_id, namespace, &pod.metadata.name);
@ -80,7 +83,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize pod: {}", e)))?;
let mut client = self.client.lock().await;
client.raw_put(key, value)
client
.raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@ -98,7 +102,8 @@ impl Storage {
let key = Self::pod_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let result = client.raw_get(key)
let result = client
.raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@ -139,7 +144,8 @@ impl Storage {
// Paginate through all results
loop {
let mut client = self.client.lock().await;
let (_keys, values, next) = client.raw_scan(
let (_keys, values, next) = client
.raw_scan(
start_key.clone(),
end_key.clone(),
1000, // Batch size
@ -153,7 +159,11 @@ impl Storage {
// Apply label selector filter if provided
if let Some(selector) = label_selector {
let matches = selector.iter().all(|(k, v)| {
pod.metadata.labels.get(k).map(|pv| pv == v).unwrap_or(false)
pod.metadata
.labels
.get(k)
.map(|pv| pv == v)
.unwrap_or(false)
});
if matches {
pods.push(pod);
@ -221,7 +231,8 @@ impl Storage {
let key = Self::pod_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let existed = client.raw_delete(key)
let existed = client
.raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
@ -234,7 +245,11 @@ impl Storage {
/// Build key for service storage
fn service_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec<u8> {
format!("k8s/{}/{}/services/{}/{}", org_id, project_id, namespace, name).into_bytes()
format!(
"k8s/{}/{}/services/{}/{}",
org_id, project_id, namespace, name
)
.into_bytes()
}
/// Build prefix for service listing
@ -248,11 +263,20 @@ impl Storage {
/// Create or update a service
pub async fn put_service(&self, service: &Service) -> Result<(), Status> {
let org_id = service.metadata.org_id.as_ref()
let org_id = service
.metadata
.org_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
let project_id = service.metadata.project_id.as_ref()
let project_id = service
.metadata
.project_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
let namespace = service.metadata.namespace.as_ref()
let namespace = service
.metadata
.namespace
.as_ref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let key = Self::service_key(org_id, project_id, namespace, &service.metadata.name);
@ -260,7 +284,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize service: {}", e)))?;
let mut client = self.client.lock().await;
client.raw_put(key, value)
client
.raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@ -278,7 +303,8 @@ impl Storage {
let key = Self::service_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let result = client.raw_get(key)
let result = client
.raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@ -316,11 +342,8 @@ impl Storage {
loop {
let mut client = self.client.lock().await;
let (_keys, values, next) = client.raw_scan(
start_key.clone(),
end_key.clone(),
1000,
)
let (_keys, values, next) = client
.raw_scan(start_key.clone(), end_key.clone(), 1000)
.await
.map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
@ -351,7 +374,8 @@ impl Storage {
let key = Self::service_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let existed = client.raw_delete(key)
let existed = client
.raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
@ -374,9 +398,15 @@ impl Storage {
/// Create or update a node
pub async fn put_node(&self, node: &Node) -> Result<(), Status> {
let org_id = node.metadata.org_id.as_ref()
let org_id = node
.metadata
.org_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
let project_id = node.metadata.project_id.as_ref()
let project_id = node
.metadata
.project_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
let key = Self::node_key(org_id, project_id, &node.metadata.name);
@ -384,7 +414,8 @@ impl Storage {
.map_err(|e| Status::internal(format!("Failed to serialize node: {}", e)))?;
let mut client = self.client.lock().await;
client.raw_put(key, value)
client
.raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
@ -401,7 +432,8 @@ impl Storage {
let key = Self::node_key(org_id, project_id, name);
let mut client = self.client.lock().await;
let result = client.raw_get(key)
let result = client
.raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
@ -415,11 +447,7 @@ impl Storage {
}
/// List all nodes
pub async fn list_nodes(
&self,
org_id: &str,
project_id: &str,
) -> Result<Vec<Node>, Status> {
pub async fn list_nodes(&self, org_id: &str, project_id: &str) -> Result<Vec<Node>, Status> {
let prefix = Self::node_prefix(org_id, project_id);
let mut end_key = prefix.clone();
@ -438,11 +466,8 @@ impl Storage {
loop {
let mut client = self.client.lock().await;
let (_keys, values, next) = client.raw_scan(
start_key.clone(),
end_key.clone(),
1000,
)
let (_keys, values, next) = client
.raw_scan(start_key.clone(), end_key.clone(), 1000)
.await
.map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
@ -472,7 +497,184 @@ impl Storage {
let key = Self::node_key(org_id, project_id, name);
let mut client = self.client.lock().await;
let existed = client.raw_delete(key)
let existed = client
.raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;
Ok(existed)
}
// ============================================================================
// Deployment Operations
// ============================================================================
/// Build key for deployment storage
fn deployment_key(org_id: &str, project_id: &str, namespace: &str, name: &str) -> Vec<u8> {
format!(
"k8s/{}/{}/deployments/{}/{}",
org_id, project_id, namespace, name
)
.into_bytes()
}
/// Build prefix for deployment listing
fn deployment_prefix(org_id: &str, project_id: &str, namespace: Option<&str>) -> Vec<u8> {
if let Some(ns) = namespace {
format!("k8s/{}/{}/deployments/{}/", org_id, project_id, ns).into_bytes()
} else {
format!("k8s/{}/{}/deployments/", org_id, project_id).into_bytes()
}
}
/// Create or update a deployment
pub async fn put_deployment(&self, deployment: &Deployment) -> Result<(), Status> {
let org_id = deployment
.metadata
.org_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("org_id is required"))?;
let project_id = deployment
.metadata
.project_id
.as_ref()
.ok_or_else(|| Status::invalid_argument("project_id is required"))?;
let namespace = deployment
.metadata
.namespace
.as_ref()
.ok_or_else(|| Status::invalid_argument("namespace is required"))?;
let key = Self::deployment_key(org_id, project_id, namespace, &deployment.metadata.name);
let value = serde_json::to_vec(deployment)
.map_err(|e| Status::internal(format!("Failed to serialize deployment: {}", e)))?;
let mut client = self.client.lock().await;
client
.raw_put(key, value)
.await
.map_err(|e| Status::internal(format!("FlareDB put failed: {}", e)))?;
Ok(())
}
/// Get a deployment by name
pub async fn get_deployment(
&self,
org_id: &str,
project_id: &str,
namespace: &str,
name: &str,
) -> Result<Option<Deployment>, Status> {
let key = Self::deployment_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let result = client
.raw_get(key)
.await
.map_err(|e| Status::internal(format!("FlareDB get failed: {}", e)))?;
if let Some(bytes) = result {
let deployment: Deployment = serde_json::from_slice(&bytes).map_err(|e| {
Status::internal(format!("Failed to deserialize deployment: {}", e))
})?;
Ok(Some(deployment))
} else {
Ok(None)
}
}
/// List deployments in a namespace
pub async fn list_deployments(
&self,
org_id: &str,
project_id: &str,
namespace: Option<&str>,
) -> Result<Vec<Deployment>, Status> {
let prefix = Self::deployment_prefix(org_id, project_id, namespace);
let mut end_key = prefix.clone();
if let Some(last) = end_key.last_mut() {
if *last == 0xff {
end_key.push(0x00);
} else {
*last += 1;
}
} else {
end_key.push(0xff);
}
let mut deployments = Vec::new();
let mut start_key = prefix;
loop {
let mut client = self.client.lock().await;
let (_keys, values, next) = client
.raw_scan(start_key.clone(), end_key.clone(), 1000)
.await
.map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
for value in values {
if let Ok(deployment) = serde_json::from_slice::<Deployment>(&value) {
deployments.push(deployment);
}
}
if let Some(next_key) = next {
start_key = next_key;
} else {
break;
}
}
Ok(deployments)
}
/// List deployments across all tenants
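/// Scans the full `k8s/` keyspace and keeps only values that deserialize as a Deployment.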
pub async fn list_all_deployments(&self) -> Result<Vec<Deployment>, Status> {
let prefix = b"k8s/".to_vec();
let mut end_key = prefix.clone();
end_key.push(0xff);
let mut deployments = Vec::new();
let mut start_key = prefix;
loop {
let mut client = self.client.lock().await;
let (_keys, values, next) = client
.raw_scan(start_key.clone(), end_key.clone(), 1000)
.await
.map_err(|e| Status::internal(format!("FlareDB scan failed: {}", e)))?;
for value in values {
if let Ok(deployment) = serde_json::from_slice::<Deployment>(&value) {
deployments.push(deployment);
}
}
if let Some(next_key) = next {
start_key = next_key;
} else {
break;
}
}
Ok(deployments)
}
/// Delete a deployment
pub async fn delete_deployment(
&self,
org_id: &str,
project_id: &str,
namespace: &str,
name: &str,
) -> Result<bool, Status> {
let key = Self::deployment_key(org_id, project_id, namespace, name);
let mut client = self.client.lock().await;
let existed = client
.raw_delete(key)
.await
.map_err(|e| Status::internal(format!("FlareDB delete failed: {}", e)))?;

View file

@ -3474,9 +3474,10 @@ validate_k8shost_flow() {
local org_id="default-org"
local project_id="default-project"
local principal_id="k8shost-smoke-$(date +%s)"
local token node_name pod_name service_name service_port
local token node_name deployment_name pod_name service_name service_port
token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")"
node_name="smoke-node-$(date +%s)"
deployment_name="smoke-deploy-$(date +%s)"
pod_name="smoke-pod-$(date +%s)"
service_name="smoke-svc-$(date +%s)"
service_port=$((18180 + (RANDOM % 100)))
@ -3502,6 +3503,97 @@ validate_k8shost_flow() {
127.0.0.1:15087 k8shost.NodeService/ListNodes \
| jq -e --arg name "${node_name}" '.items | any(.metadata.name == $name)' >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg name "${deployment_name}" --arg org "${org_id}" --arg project "${project_id}" '{deployment:{metadata:{name:$name, namespace:"default", orgId:$org, projectId:$project}, spec:{replicas:2, selector:{matchLabels:{app:"k8shost-deployment-smoke", deployment:$name}}, template:{metadata:{name:"", namespace:"default", labels:{app:"k8shost-deployment-smoke", deployment:$name}}, spec:{containers:[{name:"backend", image:"smoke", ports:[{containerPort:8082, protocol:"TCP"}]}]}}}}}')" \
127.0.0.1:15087 k8shost.DeploymentService/CreateDeployment >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn '{namespace:"default"}')" \
127.0.0.1:15087 k8shost.DeploymentService/ListDeployments \
| jq -e --arg name "${deployment_name}" '.items | any(.metadata.name == $name)' >/dev/null
deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
while true; do
local deployment_pods_json
deployment_pods_json="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \
127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)"
if [[ -n "${deployment_pods_json}" ]] && printf '%s' "${deployment_pods_json}" | jq -e --arg node "${node_name}" '
(.items | length) == 2 and
all(.items[]; .spec.nodeName == $node)' >/dev/null 2>&1; then
break
fi
if (( SECONDS >= deadline )); then
die "timed out waiting for K8sHost Deployment ${deployment_name} to create and schedule pods"
fi
sleep 2
done
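# Scale the Deployment down to a single replica and wait for the extra pod to be pruned.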
local deployment_json
deployment_json="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg ns "default" --arg name "${deployment_name}" '{namespace:$ns, name:$name}')" \
127.0.0.1:15087 k8shost.DeploymentService/GetDeployment)"
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(printf '%s' "${deployment_json}" | jq '.deployment.spec.replicas = 1 | {deployment:.deployment}')" \
127.0.0.1:15087 k8shost.DeploymentService/UpdateDeployment >/dev/null
deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
while true; do
local scaled_pods_json
scaled_pods_json="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \
127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)"
if [[ -n "${scaled_pods_json}" ]] && printf '%s' "${scaled_pods_json}" | jq -e '.items | length == 1' >/dev/null 2>&1; then
break
fi
if (( SECONDS >= deadline )); then
die "timed out waiting for K8sHost Deployment ${deployment_name} to scale down"
fi
sleep 2
done
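# Delete the Deployment, then wait for its managed pods to be garbage-collected.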
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg ns "default" --arg name "${deployment_name}" '{namespace:$ns, name:$name}')" \
127.0.0.1:15087 k8shost.DeploymentService/DeleteDeployment >/dev/null
deadline=$((SECONDS + HTTP_WAIT_TIMEOUT))
while true; do
local deleted_pods_json
deleted_pods_json="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \
-proto "${K8SHOST_PROTO}" \
-d "$(jq -cn --arg ns "default" --arg deploy "${deployment_name}" '{namespace:$ns, labelSelector:{deployment:$deploy}}')" \
127.0.0.1:15087 k8shost.PodService/ListPods 2>/dev/null || true)"
if [[ -n "${deleted_pods_json}" ]] && printf '%s' "${deleted_pods_json}" | jq -e '.items | length == 0' >/dev/null 2>&1; then
break
fi
if (( SECONDS >= deadline )); then
die "timed out waiting for K8sHost Deployment ${deployment_name} to delete managed pods"
fi
sleep 2
done
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${K8SHOST_PROTO_DIR}" \