diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index ee4ca4c..0f7bf75 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -303,6 +303,10 @@ fn default_service_replicas() -> u32 { 1 } +fn default_service_schedule_mode() -> ServiceScheduleMode { + ServiceScheduleMode::Replicated +} + fn default_rollout_max_unavailable() -> u32 { 1 } @@ -433,9 +437,22 @@ impl Default for RolloutStrategySpec { } } +/// Scheduling mode for native runtime services. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ServiceScheduleMode { + /// Keep a fixed replica count and place instances across eligible nodes. + #[default] + Replicated, + /// Run one instance on every eligible node. + Daemon, +} + /// Scheduler-specific service intent used by the non-Kubernetes fleet scheduler. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ServiceScheduleSpec { + #[serde(default = "default_service_schedule_mode")] + pub mode: ServiceScheduleMode, #[serde(default = "default_service_replicas")] pub replicas: u32, #[serde(default)] @@ -457,6 +474,7 @@ pub struct ServiceScheduleSpec { impl Default for ServiceScheduleSpec { fn default() -> Self { Self { + mode: default_service_schedule_mode(), replicas: default_service_replicas(), placement: PlacementPolicy::default(), rollout: RolloutStrategySpec::default(), @@ -1100,6 +1118,7 @@ mod tests { #[test] fn test_service_schedule_defaults() { let schedule = ServiceScheduleSpec::default(); + assert_eq!(schedule.mode, ServiceScheduleMode::Replicated); assert_eq!(schedule.replicas, 1); assert_eq!(schedule.placement.max_instances_per_node, 1); } diff --git a/deployer/crates/fleet-scheduler/src/main.rs b/deployer/crates/fleet-scheduler/src/main.rs index 7fcbbf3..c7d9b74 100644 --- a/deployer/crates/fleet-scheduler/src/main.rs +++ b/deployer/crates/fleet-scheduler/src/main.rs @@ -5,7 +5,9 @@ use anyhow::{Context, Result}; use chainfire_client::Client; use chrono::Utc; use clap::Parser; -use deployer_types::{ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceSpec}; +use deployer_types::{ + ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceScheduleMode, ServiceSpec, +}; use publish::{PublicationConfig, PublicationReconciler}; use serde_json::Value; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -220,12 +222,14 @@ impl Scheduler { let existing_instances = decode_managed_instances(&existing); let desired_instances = build_desired_instances(service, &eligible_nodes, &existing_instances)?; - if desired_instances.len() < schedule.replicas as usize { + let target_instances = schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas); + if desired_instances.len() < target_instances { warn!( service = %service.name, - requested = schedule.replicas, + requested = target_instances, scheduled = desired_instances.len(), - "insufficient eligible node capacity for requested replicas" + mode = ?schedule.mode, + "insufficient eligible node capacity for requested service instances" ); } @@ -407,6 +411,26 @@ fn build_desired_instances( service: &ServiceSpec, eligible_nodes: &[&ClusterNodeRecord], existing_instances: &[ServiceInstanceSpec], +) -> Result> { + let schedule = service + .schedule + .as_ref() + .context("scheduled service missing schedule block")?; + + match schedule.mode { + ServiceScheduleMode::Replicated => { + build_replicated_desired_instances(service, eligible_nodes, existing_instances) + } + ServiceScheduleMode::Daemon => { + build_daemon_desired_instances(service, eligible_nodes, existing_instances) + } + } +} + +fn build_replicated_desired_instances( + service: &ServiceSpec, + eligible_nodes: &[&ClusterNodeRecord], + existing_instances: &[ServiceInstanceSpec], ) -> Result> { let schedule = service .schedule @@ -486,6 +510,59 @@ fn build_desired_instances( Ok(desired) } +fn build_daemon_desired_instances( + service: &ServiceSpec, + eligible_nodes: &[&ClusterNodeRecord], + existing_instances: &[ServiceInstanceSpec], +) -> Result> { + let port = resolve_instance_port(service).with_context(|| { + format!( + "service {} is missing instance_port and service ports", + service.name + ) + })?; + let eligible_by_node: HashSet<&str> = eligible_nodes + .iter() + .map(|node| node.node_id.as_str()) + .collect(); + let mut reusable = existing_instances + .iter() + .filter(|instance| eligible_by_node.contains(instance.node_id.as_str())) + .filter(|instance| instance_is_reusable(instance)) + .collect::>(); + reusable.sort_by(|lhs, rhs| { + lhs.node_id + .cmp(&rhs.node_id) + .then_with(|| instance_state_rank(lhs).cmp(&instance_state_rank(rhs))) + .then_with(|| lhs.instance_id.cmp(&rhs.instance_id)) + }); + + let mut reusable_by_node: BTreeMap> = BTreeMap::new(); + for instance in reusable { + reusable_by_node + .entry(instance.node_id.clone()) + .or_default() + .push(instance.instance_id.clone()); + } + + let mut desired = Vec::with_capacity(eligible_nodes.len()); + for node in eligible_nodes { + let instance_id = reusable_by_node + .get_mut(&node.node_id) + .and_then(|instance_ids| { + if instance_ids.is_empty() { + None + } else { + Some(instance_ids.remove(0)) + } + }) + .unwrap_or_else(|| render_instance_id(&service.name, &node.node_id, 0)); + desired.push(desired_instance(service, node, &instance_id, port)); + } + + Ok(desired) +} + fn desired_instance( service: &ServiceSpec, node: &ClusterNodeRecord, @@ -618,6 +695,13 @@ fn resolve_instance_port(service: &ServiceSpec) -> Option { .or_else(|| service.ports.as_ref().and_then(|ports| ports.grpc)) } +fn schedule_target_count(mode: ServiceScheduleMode, eligible_nodes: usize, replicas: u32) -> usize { + match mode { + ServiceScheduleMode::Replicated => replicas as usize, + ServiceScheduleMode::Daemon => eligible_nodes, + } +} + fn render_instance_id(service: &str, node_id: &str, ordinal: u32) -> String { if ordinal == 0 { format!("{service}-{node_id}") @@ -680,7 +764,7 @@ fn plan_managed_reconciliation( .schedule .as_ref() .context("scheduled service missing schedule block")?; - let desired_replicas = schedule.replicas as usize; + let desired_replicas = desired_instances.len(); let min_available = desired_replicas.saturating_sub(schedule.rollout.max_unavailable as usize); let max_total_instances = desired_replicas + schedule.rollout.max_surge as usize; let mut available_count = existing_instances @@ -878,7 +962,7 @@ mod tests { use chrono::Duration as ChronoDuration; use deployer_types::{ ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, RolloutStrategySpec, - ServicePorts, ServiceScheduleSpec, + ServicePorts, ServiceScheduleMode, ServiceScheduleSpec, }; fn active_node(node_id: &str, roles: &[&str], labels: &[(&str, &str)]) -> ClusterNodeRecord { @@ -920,6 +1004,7 @@ mod tests { mtls_required: None, mesh_mode: None, schedule: Some(ServiceScheduleSpec { + mode: ServiceScheduleMode::Replicated, replicas: 2, placement: PlacementPolicy { roles: vec!["worker".to_string()], @@ -1013,6 +1098,79 @@ mod tests { assert_eq!(desired[1].instance_id, "api-node01-2"); } + #[test] + fn test_build_desired_instances_daemon_places_on_every_eligible_node() { + let nodes = vec![ + active_node("node01", &["worker"], &[("tier", "general")]), + active_node("node02", &["worker"], &[("tier", "general")]), + ]; + let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect(); + let mut service = scheduled_service(); + let schedule = service.schedule.as_mut().unwrap(); + schedule.mode = ServiceScheduleMode::Daemon; + schedule.replicas = 1; + + let desired = build_desired_instances(&service, &refs, &[]).unwrap(); + + assert_eq!(desired.len(), 2); + assert_eq!(desired[0].instance_id, "api-node01"); + assert_eq!(desired[1].instance_id, "api-node02"); + } + + #[test] + fn test_build_desired_instances_daemon_reuses_existing_instance_per_node() { + let nodes = vec![ + active_node("node01", &["worker"], &[("tier", "general")]), + active_node("node02", &["worker"], &[("tier", "general")]), + ]; + let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect(); + let mut service = scheduled_service(); + let schedule = service.schedule.as_mut().unwrap(); + schedule.mode = ServiceScheduleMode::Daemon; + schedule.replicas = 1; + + let existing = vec![ + ServiceInstanceSpec { + instance_id: "api-node01-2".to_string(), + service: "api".to_string(), + node_id: "node01".to_string(), + ip: "10.0.0.1".to_string(), + port: 8080, + mesh_port: Some(18080), + version: None, + health_check: None, + process: None, + container: None, + managed_by: Some(MANAGED_BY.to_string()), + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ServiceInstanceSpec { + instance_id: "api-node02".to_string(), + service: "api".to_string(), + node_id: "node02".to_string(), + ip: "10.0.0.2".to_string(), + port: 8080, + mesh_port: Some(18080), + version: None, + health_check: None, + process: None, + container: None, + managed_by: Some(MANAGED_BY.to_string()), + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ]; + + let desired = build_desired_instances(&service, &refs, &existing).unwrap(); + + assert_eq!(desired.len(), 2); + assert_eq!(desired[0].instance_id, "api-node01-2"); + assert_eq!(desired[1].instance_id, "api-node02"); + } + #[test] fn test_pick_next_node_prefers_less_used_failure_domain() { let nodes = vec![ @@ -1281,6 +1439,76 @@ mod tests { assert_eq!(plan.deletes, vec!["api-node01".to_string()]); } + #[test] + fn test_plan_reconciliation_for_daemon_uses_current_eligible_node_count() { + let mut service = scheduled_service(); + let schedule = service.schedule.as_mut().unwrap(); + schedule.mode = ServiceScheduleMode::Daemon; + schedule.replicas = 99; + schedule.rollout = RolloutStrategySpec { + max_unavailable: 0, + max_surge: 0, + }; + + let existing_instances = vec![ + ServiceInstanceSpec { + instance_id: "api-node01".to_string(), + service: "api".to_string(), + node_id: "node01".to_string(), + ip: "10.0.0.1".to_string(), + port: 8080, + mesh_port: Some(18080), + version: None, + health_check: None, + process: schedule.process.clone(), + container: None, + managed_by: Some(MANAGED_BY.to_string()), + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ServiceInstanceSpec { + instance_id: "api-node02".to_string(), + service: "api".to_string(), + node_id: "node02".to_string(), + ip: "10.0.0.2".to_string(), + port: 8080, + mesh_port: Some(18080), + version: None, + health_check: None, + process: schedule.process.clone(), + container: None, + managed_by: Some(MANAGED_BY.to_string()), + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ]; + let existing = existing_instances + .iter() + .map(|instance| { + ( + instance.instance_id.clone(), + serde_json::to_value(instance).unwrap(), + ) + }) + .collect::>(); + let desired_instances = vec![existing_instances[1].clone()]; + + let plan = plan_managed_reconciliation( + &service, + &desired_instances, + &existing, + &existing_instances, + 0, + ) + .unwrap(); + + assert!(plan.upserts.is_empty()); + assert_eq!(plan.deletes, vec!["api-node01".to_string()]); + assert_eq!(plan.deferred_deletes, 0); + } + #[test] fn test_merge_preserved_fields_replaces_null_status_fields() { let desired = serde_json::json!({ diff --git a/deployer/crates/fleet-scheduler/src/publish.rs b/deployer/crates/fleet-scheduler/src/publish.rs index 82317bd..be41e0f 100644 --- a/deployer/crates/fleet-scheduler/src/publish.rs +++ b/deployer/crates/fleet-scheduler/src/publish.rs @@ -545,6 +545,7 @@ async fn ensure_load_balancer( org_id: org_id.to_string(), project_id: project_id.to_string(), description: format!("native runtime service {name}"), + vip_address: String::new(), }, auth_token, )) diff --git a/deployer/crates/plasmacloud-reconciler/src/main.rs b/deployer/crates/plasmacloud-reconciler/src/main.rs index 55ebd3d..d900380 100644 --- a/deployer/crates/plasmacloud-reconciler/src/main.rs +++ b/deployer/crates/plasmacloud-reconciler/src/main.rs @@ -423,6 +423,7 @@ async fn ensure_load_balancer( org_id: spec.org_id.clone(), project_id: spec.project_id.clone().unwrap_or_default(), description: spec.description.clone().unwrap_or_default(), + vip_address: String::new(), }) .await? .into_inner(); diff --git a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs index e39ad4e..4de33c3 100644 --- a/k8shost/crates/k8shost-server/src/fiberlb_controller.rs +++ b/k8shost/crates/k8shost-server/src/fiberlb_controller.rs @@ -170,6 +170,7 @@ impl FiberLbController { org_id: org_id.to_string(), project_id: project_id.to_string(), description: format!("k8s service {}/{}", namespace, name), + vip_address: String::new(), }; let lb_id = match lb_client diff --git a/nix-nos/lib/cluster-config-lib.nix b/nix-nos/lib/cluster-config-lib.nix index ea6bf68..4bef600 100644 --- a/nix-nos/lib/cluster-config-lib.nix +++ b/nix-nos/lib/cluster-config-lib.nix @@ -445,6 +445,12 @@ let healthCheckType = mkHealthCheckType types; in types.submodule { options = { + mode = mkOption { + type = types.enum [ "replicated" "daemon" ]; + default = "replicated"; + description = "Scheduling mode used by the native runtime service"; + }; + replicas = mkOption { type = types.ints.positive; default = 1; @@ -1253,6 +1259,7 @@ let mkServiceScheduleSpec = schedule: { + mode = schedule.mode; replicas = schedule.replicas; placement = mkPlacementPolicySpec schedule.placement; rollout = mkRolloutStrategySpec schedule.rollout; diff --git a/nix/test-cluster/common.nix b/nix/test-cluster/common.nix index f431eb5..be19a09 100644 --- a/nix/test-cluster/common.nix +++ b/nix/test-cluster/common.nix @@ -364,6 +364,41 @@ in }; }; }; + + native-daemon = { + protocol = "http"; + ports.http = 18193; + schedule = { + mode = "daemon"; + replicas = 1; + placement = { + roles = [ "worker" ]; + pools = [ "general" ]; + nodeClasses = [ "worker-linux" ]; + matchLabels = { + runtime = "native"; + }; + maxInstancesPerNode = 1; + }; + instancePort = 18193; + process = { + command = "python3"; + args = [ + "-m" + "http.server" + "\${INSTANCE_PORT}" + "--bind" + "\${INSTANCE_IP}" + ]; + }; + healthCheck = { + type = "http"; + path = "/"; + intervalSecs = 5; + timeoutSecs = 3; + }; + }; + }; }; }; diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index 34e3fb8..03c834d 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -4969,8 +4969,8 @@ validate_native_runtime_flow() { wait_for_native_dump_count \ "photoncloud/clusters/test-cluster/services/" \ - 'map(select(.name == "native-web" or .name == "native-container")) | length' \ - "2" \ + 'map(select(.name == "native-web" or .name == "native-container" or .name == "native-daemon")) | length' \ + "3" \ 180 wait_for_native_dump_count \ "photoncloud/clusters/test-cluster/nodes/" \ @@ -4997,9 +4997,21 @@ validate_native_runtime_flow() { "${native_fresh_healthy_count_expr}" \ "1" \ 360 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + 'length' \ + "2" \ + 300 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + "${native_fresh_healthy_count_expr}" \ + "2" \ + 300 wait_for_http node04 "http://10.100.0.21:18190/" 240 wait_for_http node05 "http://10.100.0.22:18190/" 240 + wait_for_http node04 "http://10.100.0.21:18193/" 240 + wait_for_http node05 "http://10.100.0.22:18193/" 240 local container_value container_node container_ip container_port container_value="$(native_first_healthy_instance "native-container")" container_node="$(printf '%s' "${container_value}" | jq -r '.node_id')" @@ -5067,6 +5079,16 @@ validate_native_runtime_flow() { "${native_fresh_healthy_count_expr}" \ "1" \ 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + 'length' \ + "1" \ + 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + "${native_fresh_healthy_count_expr}" \ + "1" \ + 240 local drained_web_value drained_web_node drained_container_value drained_container_node drained_web_value="$(wait_for_native_instance_node "native-web" "node05" 240)" drained_web_node="$(printf '%s' "${drained_web_value}" | jq -r '.node_id')" @@ -5074,8 +5096,10 @@ validate_native_runtime_flow() { drained_container_value="$(wait_for_native_instance_node "native-container" "node05" 240)" drained_container_node="$(printf '%s' "${drained_container_value}" | jq -r '.node_id')" [[ "${drained_container_node}" == "node05" ]] || die "native-container did not relocate to node05 after draining node04" + wait_for_native_instance_node "native-daemon" "node05" 240 >/dev/null wait_for_http node05 "http://10.100.0.22:18190/" 240 wait_for_http node05 "http://10.100.0.22:18192/" 240 + wait_for_http node05 "http://10.100.0.22:18193/" 240 wait_for_http node01 "http://127.0.0.1:18191/" 240 publication_value="$(native_publication_state)" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" @@ -5105,8 +5129,20 @@ validate_native_runtime_flow() { "${native_fresh_healthy_count_expr}" \ "1" \ 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + 'length' \ + "2" \ + 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + "${native_fresh_healthy_count_expr}" \ + "2" \ + 240 wait_for_native_instance_node "native-web" "node04" 240 >/dev/null wait_for_native_instance_node "native-web" "node05" 240 >/dev/null + wait_for_native_instance_node "native-daemon" "node04" 240 >/dev/null + wait_for_native_instance_node "native-daemon" "node05" 240 >/dev/null local restored_container_value restored_container_node restored_container_value="$(wait_for_native_instance_node "native-container" "node05" 240)" restored_container_node="$(printf '%s' "${restored_container_value}" | jq -r '.node_id')" @@ -5132,6 +5168,11 @@ validate_native_runtime_flow() { "${native_fresh_healthy_count_expr}" \ "1" \ 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + "${native_fresh_healthy_count_expr}" \ + "1" \ + 240 local failover_web_value failover_web_node failover_container_value failover_container_node failover_web_value="$(wait_for_native_instance_node "native-web" "node04" 240)" @@ -5140,6 +5181,7 @@ validate_native_runtime_flow() { failover_container_value="$(wait_for_native_instance_node "native-container" "node04" 240)" failover_container_node="$(printf '%s' "${failover_container_value}" | jq -r '.node_id')" [[ "${failover_container_node}" == "node04" ]] || die "native-container did not fail over to node04 after node05 stopped" + wait_for_native_instance_node "native-daemon" "node04" 240 >/dev/null publication_value="$(native_publication_state)" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" @@ -5147,6 +5189,7 @@ validate_native_runtime_flow() { wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 wait_for_http node04 "http://10.100.0.21:18190/" 240 wait_for_http node04 "http://10.100.0.21:18192/" 240 + wait_for_http node04 "http://10.100.0.21:18193/" 240 wait_for_http node01 "http://127.0.0.1:18191/" 240 log "Restarting native worker and ensuring declarative replica count is restored" @@ -5175,8 +5218,20 @@ validate_native_runtime_flow() { "${native_fresh_healthy_count_expr}" \ "1" \ 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + 'length' \ + "2" \ + 240 + wait_for_native_dump_count \ + "photoncloud/clusters/test-cluster/instances/native-daemon/" \ + "${native_fresh_healthy_count_expr}" \ + "2" \ + 240 wait_for_native_instance_node "native-web" "node04" 240 >/dev/null wait_for_native_instance_node "native-web" "node05" 240 >/dev/null + wait_for_native_instance_node "native-daemon" "node04" 240 >/dev/null + wait_for_native_instance_node "native-daemon" "node05" 240 >/dev/null local recovered_container_value recovered_container_node recovered_container_value="$(wait_for_native_instance_node "native-container" "node04" 240)" recovered_container_node="$(printf '%s' "${recovered_container_value}" | jq -r '.node_id')"