photoncloud-monorepo/deployer/crates/fleet-scheduler/src/main.rs
centra 37f5479ab8
Some checks failed
Nix CI / filter (push) Failing after 1s
Nix CI / gate () (push) Has been skipped
Nix CI / gate (shared crates) (push) Has been skipped
Nix CI / build () (push) Has been skipped
Nix CI / ci-status (push) Failing after 1s
Add daemon scheduling for native services
2026-03-30 21:31:32 +09:00

1549 lines
51 KiB
Rust

mod auth;
mod publish;
use anyhow::{Context, Result};
use chainfire_client::Client;
use chrono::Utc;
use clap::Parser;
use deployer_types::{
ClusterNodeRecord, PlacementPolicy, ServiceInstanceSpec, ServiceScheduleMode, ServiceSpec,
};
use publish::{PublicationConfig, PublicationReconciler};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
/// Ownership marker stored in the `managed_by` field of every instance record
/// this scheduler writes; `decode_managed_instances` only considers records
/// carrying this value, so foreign records are never reconciled or deleted.
const MANAGED_BY: &str = "fleet-scheduler";
#[derive(Debug, Parser)]
#[command(author, version, about = "PhotonCloud non-Kubernetes fleet scheduler")]
struct Cli {
    // NOTE: plain `//` comments on purpose — `///` doc comments on clap fields
    // become per-flag help text and would change the CLI's runtime output.

    // Chainfire KV endpoint the scheduler connects to on every pass.
    #[arg(long, default_value = "http://127.0.0.1:7000")]
    chainfire_endpoint: String,
    // Key-prefix namespace under which all cluster state is stored.
    #[arg(long, default_value = "photoncloud")]
    cluster_namespace: String,
    // Required: identifier of the cluster this scheduler manages.
    #[arg(long)]
    cluster_id: String,
    // Seconds to sleep between reconciliation passes (ignored with --once).
    #[arg(long, default_value_t = 15)]
    interval_secs: u64,
    // Heartbeats older than this many seconds are stale; 0 disables the check.
    #[arg(long, default_value_t = 300)]
    heartbeat_timeout_secs: u64,
    // Log intended writes/deletes without touching the store.
    #[arg(long, default_value_t = false)]
    dry_run: bool,
    // Optional endpoints handed to the publication reconciler.
    #[arg(long)]
    iam_endpoint: Option<String>,
    #[arg(long)]
    fiberlb_endpoint: Option<String>,
    #[arg(long)]
    flashdns_endpoint: Option<String>,
    // Address handed to the publication reconciler (presumably the externally
    // advertised address — confirm against the publish module).
    #[arg(long)]
    publish_address: Option<String>,
    // Fallback org/project identities used by the publication reconciler.
    #[arg(long, default_value = "default-org")]
    default_org_id: String,
    #[arg(long, default_value = "default-project")]
    default_project_id: String,
    // Principal id the publication reconciler acts as; defaults to the
    // scheduler's own managed-by marker.
    #[arg(long, default_value = MANAGED_BY)]
    controller_principal_id: String,
    // Run exactly one reconciliation pass, then exit.
    #[arg(long, default_value_t = false)]
    once: bool,
}
/// Long-running reconciler: reads node and service state from the chainfire
/// KV store and writes back managed service-instance records, then delegates
/// publication to the embedded `PublicationReconciler`.
struct Scheduler {
    endpoint: String,                   // chainfire endpoint, connected to each pass
    cluster_namespace: String,          // key-prefix namespace for all cluster keys
    cluster_id: String,                 // cluster whose services are scheduled
    interval: Duration,                 // delay between reconciliation passes
    heartbeat_timeout_secs: u64,        // staleness cutoff for heartbeats (0 = disabled)
    dry_run: bool,                      // log planned changes instead of applying them
    publication: PublicationReconciler, // sub-reconciler from the publish module
    once: bool,                         // run a single pass instead of looping
}
/// One instance write the plan decided to perform: the decoded spec (used for
/// key construction and logging) plus the exact JSON value to store.
#[derive(Debug)]
struct PlannedUpsert {
    instance: ServiceInstanceSpec, // desired instance being written
    desired_value: Value,          // JSON payload (with preserved status fields merged in)
}
/// Result of planning one service's reconciliation: the writes and deletes to
/// apply now, plus counts of changes deferred by the rollout budget.
#[derive(Debug, Default)]
struct ReconcilePlan {
    upserts: Vec<PlannedUpsert>, // records to create or update this pass
    deletes: Vec<String>,        // instance ids to remove this pass
    deferred_creates: usize,     // creates blocked by the surge ceiling
    deferred_updates: usize,     // updates blocked by the availability floor
    deferred_deletes: usize,     // deletes blocked by the availability floor
}
impl Scheduler {
    /// Builds a scheduler from parsed CLI options. The publication reconciler
    /// receives the same cluster id, heartbeat timeout, and dry-run flag so
    /// both halves operate on consistent settings.
    fn new(cli: Cli) -> Self {
        let cluster_namespace = cli.cluster_namespace;
        let cluster_id = cli.cluster_id;
        Self {
            endpoint: cli.chainfire_endpoint,
            cluster_namespace,
            // Cloned because the PublicationConfig below consumes the other copy.
            cluster_id: cluster_id.clone(),
            interval: Duration::from_secs(cli.interval_secs),
            heartbeat_timeout_secs: cli.heartbeat_timeout_secs,
            dry_run: cli.dry_run,
            publication: PublicationReconciler::new(PublicationConfig {
                cluster_id,
                heartbeat_timeout_secs: cli.heartbeat_timeout_secs,
                iam_endpoint: cli.iam_endpoint,
                fiberlb_endpoint: cli.fiberlb_endpoint,
                flashdns_endpoint: cli.flashdns_endpoint,
                publish_address: cli.publish_address,
                controller_principal_id: cli.controller_principal_id,
                default_org_id: cli.default_org_id,
                default_project_id: cli.default_project_id,
                dry_run: cli.dry_run,
            }),
            once: cli.once,
        }
    }

    /// Entry point: a single pass when `--once` was given, otherwise an
    /// endless loop in which per-pass failures are logged, not propagated.
    async fn run_loop(&self) -> Result<()> {
        if self.once {
            return self.reconcile_once().await;
        }
        loop {
            if let Err(error) = self.reconcile_once().await {
                warn!(error = %error, "fleet scheduler reconciliation failed");
            }
            sleep(self.interval).await;
        }
    }

    /// One full reconciliation pass: load nodes and services, place instances
    /// for every service that has a `schedule` block, then hand the complete
    /// service list to the publication reconciler.
    async fn reconcile_once(&self) -> Result<()> {
        // Fresh connection each pass, so a broken connection from one pass
        // cannot poison later ones.
        let mut client = Client::connect(self.endpoint.clone()).await?;
        let nodes = self.load_cluster_nodes(&mut client).await?;
        let services = self.load_services(&mut client).await?;
        debug!(
            nodes = nodes.len(),
            services = services.len(),
            "loaded scheduler inputs"
        );
        for service in &services {
            // Services without a schedule block are not placed by this
            // scheduler; they may still be handled by publication below.
            if service.schedule.is_none() {
                continue;
            }
            self.reconcile_service(&mut client, &nodes, service).await?;
        }
        self.publication
            .reconcile_all(
                &mut client,
                &self.cluster_namespace,
                &self.cluster_id,
                &services,
                self.dry_run,
            )
            .await?;
        Ok(())
    }

    /// Loads every node record under `<ns>/clusters/<id>/nodes/`. Undecodable
    /// values are logged and skipped; results are sorted by node id so later
    /// placement decisions are deterministic.
    async fn load_cluster_nodes(&self, client: &mut Client) -> Result<Vec<ClusterNodeRecord>> {
        let prefix = format!(
            "{}/clusters/{}/nodes/",
            self.cluster_namespace, self.cluster_id
        );
        let kvs = client.get_prefix(prefix.as_bytes()).await?;
        let mut nodes = Vec::with_capacity(kvs.len());
        for (_key, value) in kvs {
            // A corrupt record must not fail the whole pass.
            match serde_json::from_slice::<ClusterNodeRecord>(&value) {
                Ok(node) => nodes.push(node),
                Err(error) => warn!(error = %error, "failed to decode cluster node"),
            }
        }
        nodes.sort_by(|lhs, rhs| lhs.node_id.cmp(&rhs.node_id));
        Ok(nodes)
    }

    /// Loads every service spec under `<ns>/clusters/<id>/services/`.
    /// Undecodable values are logged and skipped; results are sorted by name.
    async fn load_services(&self, client: &mut Client) -> Result<Vec<ServiceSpec>> {
        let prefix = format!(
            "{}/clusters/{}/services/",
            self.cluster_namespace, self.cluster_id
        );
        let kvs = client.get_prefix(prefix.as_bytes()).await?;
        let mut services = Vec::with_capacity(kvs.len());
        for (_key, value) in kvs {
            match serde_json::from_slice::<ServiceSpec>(&value) {
                Ok(service) => services.push(service),
                Err(error) => warn!(error = %error, "failed to decode service spec"),
            }
        }
        services.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
        Ok(services)
    }

    /// Reconciles one scheduled service: filters eligible nodes, computes the
    /// desired instance set, plans rollout-budgeted changes, and applies the
    /// resulting upserts/deletes (or just logs them under --dry-run).
    async fn reconcile_service(
        &self,
        client: &mut Client,
        nodes: &[ClusterNodeRecord],
        service: &ServiceSpec,
    ) -> Result<()> {
        let schedule = service
            .schedule
            .as_ref()
            .context("service marked schedulable without schedule block")?;
        let eligible_nodes =
            eligible_nodes(nodes, &schedule.placement, self.heartbeat_timeout_secs);
        if eligible_nodes.is_empty() {
            // Nothing can be placed; leave all existing records untouched.
            warn!(service = %service.name, "no eligible nodes for scheduled service");
            return Ok(());
        }
        let existing = self.load_instance_values(client, &service.name).await?;
        let existing_instances = decode_managed_instances(&existing);
        let desired_instances =
            build_desired_instances(service, &eligible_nodes, &existing_instances)?;
        // Daemon mode targets every eligible node; replicated targets `replicas`.
        let target_instances = schedule_target_count(schedule.mode, eligible_nodes.len(), schedule.replicas);
        if desired_instances.len() < target_instances {
            warn!(
                service = %service.name,
                requested = target_instances,
                scheduled = desired_instances.len(),
                mode = ?schedule.mode,
                "insufficient eligible node capacity for requested service instances"
            );
        }
        let plan = plan_managed_reconciliation(
            service,
            &desired_instances,
            &existing,
            &existing_instances,
            self.heartbeat_timeout_secs,
        )?;
        for upsert in plan.upserts {
            let key = instance_key(
                &self.cluster_namespace,
                &self.cluster_id,
                &upsert.instance.service,
                &upsert.instance.instance_id,
            );
            if self.dry_run {
                info!(
                    service = %service.name,
                    instance_id = %upsert.instance.instance_id,
                    node_id = %upsert.instance.node_id,
                    "would upsert managed instance"
                );
            } else {
                client
                    .put(&key, serde_json::to_vec(&upsert.desired_value)?)
                    .await?;
                info!(
                    service = %service.name,
                    instance_id = %upsert.instance.instance_id,
                    node_id = %upsert.instance.node_id,
                    "upserted managed instance"
                );
            }
        }
        if plan.deferred_creates > 0 || plan.deferred_updates > 0 || plan.deferred_deletes > 0 {
            // Deferred work is retried on the next pass when budget frees up.
            info!(
                service = %service.name,
                deferred_creates = plan.deferred_creates,
                deferred_updates = plan.deferred_updates,
                deferred_deletes = plan.deferred_deletes,
                "deferring managed instance changes until rollout budget frees"
            );
        }
        for instance_id in plan.deletes {
            let key = instance_key(
                &self.cluster_namespace,
                &self.cluster_id,
                &service.name,
                &instance_id,
            );
            if self.dry_run {
                info!(
                    service = %service.name,
                    instance_id = %instance_id,
                    "would delete stale managed instance"
                );
            } else if client.delete(&key).await? {
                // Only log when the key actually existed.
                info!(
                    service = %service.name,
                    instance_id = %instance_id,
                    "deleted stale managed instance"
                );
            }
        }
        Ok(())
    }

    /// Loads the raw JSON instance records for `service`, keyed by their
    /// `instance_id` field. Records that fail to parse or lack an
    /// `instance_id` string are logged and skipped.
    async fn load_instance_values(
        &self,
        client: &mut Client,
        service: &str,
    ) -> Result<HashMap<String, Value>> {
        let prefix = format!(
            "{}/clusters/{}/instances/{}/",
            self.cluster_namespace, self.cluster_id, service
        );
        let kvs = client.get_prefix(prefix.as_bytes()).await?;
        let mut instances = HashMap::with_capacity(kvs.len());
        for (_key, value) in kvs {
            let parsed: Value = match serde_json::from_slice(&value) {
                Ok(value) => value,
                Err(error) => {
                    warn!(service = %service, error = %error, "failed to decode instance value");
                    continue;
                }
            };
            let Some(instance_id) = parsed
                .get("instance_id")
                .and_then(|value| value.as_str())
                .map(|value| value.to_string())
            else {
                warn!(service = %service, "instance record missing instance_id");
                continue;
            };
            instances.insert(instance_id, parsed);
        }
        Ok(instances)
    }
}
/// Returns the subset of `nodes` that currently satisfy `placement` and the
/// heartbeat freshness requirement, preserving the input order.
fn eligible_nodes<'a>(
    nodes: &'a [ClusterNodeRecord],
    placement: &PlacementPolicy,
    heartbeat_timeout_secs: u64,
) -> Vec<&'a ClusterNodeRecord> {
    let mut matching = Vec::new();
    for node in nodes {
        if node_is_eligible(node, placement, heartbeat_timeout_secs) {
            matching.push(node);
        }
    }
    matching
}
/// Decides whether a single node may host instances under `placement`.
///
/// A node qualifies when it is marked "active", its heartbeat is fresh (unless
/// the timeout is 0, which disables the check), it carries at least one of the
/// requested roles/pools/node-classes (each filter only applies when the
/// corresponding policy list is non-empty), and every `match_labels` entry is
/// present with the exact value.
fn node_is_eligible(
    node: &ClusterNodeRecord,
    placement: &PlacementPolicy,
    heartbeat_timeout_secs: u64,
) -> bool {
    // Only nodes explicitly marked active are schedulable.
    if node.state.as_deref() != Some("active") {
        return false;
    }
    // Heartbeat freshness; a zero timeout disables this check entirely.
    if heartbeat_timeout_secs > 0 {
        let fresh = match node.last_heartbeat {
            None => false,
            Some(last) => {
                Utc::now().signed_duration_since(last).num_seconds()
                    <= heartbeat_timeout_secs as i64
            }
        };
        if !fresh {
            return false;
        }
    }
    // Role filter: the node must carry at least one requested role.
    if !placement.roles.is_empty() {
        let has_role = placement
            .roles
            .iter()
            .any(|expected| node.roles.iter().any(|role| role == expected));
        if !has_role {
            return false;
        }
    }
    // Pool filter: the node's resolved pool must be one of the requested pools.
    if !placement.pools.is_empty() {
        let pool_ok = matches!(
            node_pool(node),
            Some(pool) if placement.pools.iter().any(|expected| expected == pool)
        );
        if !pool_ok {
            return false;
        }
    }
    // Node-class filter, same shape as the pool filter.
    if !placement.node_classes.is_empty() {
        let class_ok = matches!(
            node_class(node),
            Some(class) if placement.node_classes.iter().any(|expected| expected == class)
        );
        if !class_ok {
            return false;
        }
    }
    // Finally, every requested label must match exactly.
    placement
        .match_labels
        .iter()
        .all(|(key, value)| node.labels.get(key) == Some(value))
}
/// Dispatches desired-instance computation to the strategy selected by the
/// service's schedule mode (replicated vs. one-per-node daemon).
fn build_desired_instances(
    service: &ServiceSpec,
    eligible_nodes: &[&ClusterNodeRecord],
    existing_instances: &[ServiceInstanceSpec],
) -> Result<Vec<ServiceInstanceSpec>> {
    let mode = service
        .schedule
        .as_ref()
        .context("scheduled service missing schedule block")?
        .mode;
    match mode {
        ServiceScheduleMode::Daemon => {
            build_daemon_desired_instances(service, eligible_nodes, existing_instances)
        }
        ServiceScheduleMode::Replicated => {
            build_replicated_desired_instances(service, eligible_nodes, existing_instances)
        }
    }
}
/// Computes the desired instance set for a `Replicated` service: first keeps
/// reusable existing placements (healthiest first), then fills the remaining
/// replica slots on the least-loaded eligible nodes via `pick_next_node`.
/// May return fewer than `replicas` instances when capacity runs out; the
/// caller logs that shortfall.
fn build_replicated_desired_instances(
    service: &ServiceSpec,
    eligible_nodes: &[&ClusterNodeRecord],
    existing_instances: &[ServiceInstanceSpec],
) -> Result<Vec<ServiceInstanceSpec>> {
    let schedule = service
        .schedule
        .as_ref()
        .context("scheduled service missing schedule block")?;
    let port = resolve_instance_port(service).with_context(|| {
        format!(
            "service {} is missing instance_port and service ports",
            service.name
        )
    })?;
    // A configured 0 would forbid placement everywhere; treat it as 1.
    let max_instances_per_node = schedule.placement.max_instances_per_node.max(1);
    let eligible_by_node: HashMap<&str, &ClusterNodeRecord> = eligible_nodes
        .iter()
        .copied()
        .map(|node| (node.node_id.as_str(), node))
        .collect();
    // Instances placed per node so far; every eligible node starts at 0.
    let mut counts: BTreeMap<String, u32> = eligible_nodes
        .iter()
        .map(|node| (node.node_id.clone(), 0))
        .collect();
    // Ordinals already claimed per node, so freshly minted ids cannot collide
    // with the ids of instances kept below.
    let mut used_ordinals: BTreeMap<String, HashSet<u32>> = BTreeMap::new();
    let mut desired = Vec::new();
    // Keep existing placements when possible: only non-unhealthy/failed
    // instances on still-eligible nodes qualify for reuse.
    let mut reusable = existing_instances
        .iter()
        .filter(|instance| eligible_by_node.contains_key(instance.node_id.as_str()))
        .filter(|instance| instance_is_reusable(instance))
        .collect::<Vec<_>>();
    // Healthiest first; instance id as a deterministic tie-breaker.
    reusable.sort_by(|lhs, rhs| {
        instance_state_rank(lhs)
            .cmp(&instance_state_rank(rhs))
            .then_with(|| lhs.instance_id.cmp(&rhs.instance_id))
    });
    for instance in reusable {
        if desired.len() >= schedule.replicas as usize {
            break;
        }
        let Some(node) = eligible_by_node.get(instance.node_id.as_str()).copied() else {
            continue;
        };
        let ordinal = counts.get(&node.node_id).copied().unwrap_or(0);
        if ordinal >= max_instances_per_node {
            // Node is already at its per-node cap.
            continue;
        }
        counts.insert(node.node_id.clone(), ordinal + 1);
        // Record the kept instance's ordinal (when its id follows the
        // render_instance_id scheme) so new ids skip it.
        if let Some(parsed_ordinal) =
            parse_instance_ordinal(&service.name, &node.node_id, &instance.instance_id)
        {
            used_ordinals
                .entry(node.node_id.clone())
                .or_default()
                .insert(parsed_ordinal);
        }
        desired.push(desired_instance(service, node, &instance.instance_id, port));
    }
    // Fill the remaining replica slots with fresh placements; stops early when
    // no node has capacity left.
    while desired.len() < schedule.replicas as usize {
        let Some(node) = pick_next_node(
            eligible_nodes,
            &counts,
            max_instances_per_node,
            schedule.placement.spread_by_label.as_deref(),
            existing_instances,
        ) else {
            break;
        };
        let ordinal = counts.get(&node.node_id).copied().unwrap_or(0);
        counts.insert(node.node_id.clone(), ordinal + 1);
        let instance_id = render_next_instance_id(&service.name, &node.node_id, &mut used_ordinals);
        desired.push(desired_instance(service, node, &instance_id, port));
    }
    Ok(desired)
}
/// Computes the desired instance set for a `Daemon` service: exactly one
/// instance per eligible node, reusing the best existing instance id on a
/// node when one exists, otherwise minting the ordinal-zero id.
fn build_daemon_desired_instances(
    service: &ServiceSpec,
    eligible_nodes: &[&ClusterNodeRecord],
    existing_instances: &[ServiceInstanceSpec],
) -> Result<Vec<ServiceInstanceSpec>> {
    let port = resolve_instance_port(service).with_context(|| {
        format!(
            "service {} is missing instance_port and service ports",
            service.name
        )
    })?;
    let eligible_ids: HashSet<&str> = eligible_nodes
        .iter()
        .map(|node| node.node_id.as_str())
        .collect();
    // Reusable instances on still-eligible nodes, ordered so the healthiest
    // (then lexically smallest) instance id comes first within each node.
    let mut candidates: Vec<&ServiceInstanceSpec> = existing_instances
        .iter()
        .filter(|instance| eligible_ids.contains(instance.node_id.as_str()))
        .filter(|instance| instance_is_reusable(instance))
        .collect();
    candidates.sort_by(|lhs, rhs| {
        lhs.node_id
            .cmp(&rhs.node_id)
            .then_with(|| instance_state_rank(lhs).cmp(&instance_state_rank(rhs)))
            .then_with(|| lhs.instance_id.cmp(&rhs.instance_id))
    });
    let mut per_node: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for candidate in candidates {
        per_node
            .entry(candidate.node_id.clone())
            .or_default()
            .push(candidate.instance_id.clone());
    }
    // One placement per eligible node, in input order.
    let desired: Vec<ServiceInstanceSpec> = eligible_nodes
        .iter()
        .map(|node| {
            let reused = per_node
                .get_mut(&node.node_id)
                .filter(|ids| !ids.is_empty())
                .map(|ids| ids.remove(0));
            let instance_id =
                reused.unwrap_or_else(|| render_instance_id(&service.name, &node.node_id, 0));
            desired_instance(service, node, &instance_id, port)
        })
        .collect();
    Ok(desired)
}
/// Materializes the `ServiceInstanceSpec` for one placement, copying runtime
/// configuration (mesh port, health check, process/container) from the
/// service's schedule block. Status fields are left `None`; on writes,
/// `merge_preserved_fields` restores them from any stored record.
///
/// Panics if the service lacks a schedule block — callers only reach this for
/// scheduled services.
fn desired_instance(
    service: &ServiceSpec,
    node: &ClusterNodeRecord,
    instance_id: &str,
    port: u16,
) -> ServiceInstanceSpec {
    let schedule = service
        .schedule
        .as_ref()
        .expect("scheduled service missing schedule block");
    ServiceInstanceSpec {
        instance_id: instance_id.to_string(),
        service: service.name.clone(),
        node_id: node.node_id.clone(),
        ip: node.ip.clone(),
        port,
        mesh_port: schedule.mesh_port,
        version: None,
        health_check: schedule.health_check.clone(),
        process: schedule.process.clone(),
        container: schedule.container.clone(),
        managed_by: Some(MANAGED_BY.to_string()),
        // Status fields intentionally unset; they are preserved from the
        // stored record at write time rather than authored here.
        state: None,
        last_heartbeat: None,
        observed_at: None,
    }
}
/// Picks the best node for the next fresh placement among nodes still under
/// their per-node cap. Candidates are ranked (ascending) by: total instances
/// in the node's spread domain, the node's own instance count, preference for
/// nodes with reusable history, the spread-domain value, and finally node id
/// for determinism. Returns `None` when every node is at capacity.
fn pick_next_node<'a>(
    eligible_nodes: &'a [&ClusterNodeRecord],
    counts: &BTreeMap<String, u32>,
    max_instances_per_node: u32,
    spread_by_label: Option<&str>,
    existing_instances: &[ServiceInstanceSpec],
) -> Option<&'a ClusterNodeRecord> {
    let count_of = |node: &ClusterNodeRecord| counts.get(&node.node_id).copied().unwrap_or(0);
    eligible_nodes
        .iter()
        .copied()
        .filter(|node| count_of(node) < max_instances_per_node)
        .min_by_key(|node| {
            // Lexicographic tuple ordering reproduces the original chained
            // comparator exactly; ties are impossible past the unique node id.
            (
                spread_count_for_node(eligible_nodes, counts, node, spread_by_label),
                count_of(node),
                node_preference_rank(existing_instances, &node.node_id),
                spread_value(node, spread_by_label),
                node.node_id.clone(),
            )
        })
}
/// Sums the scheduled-instance counts of every eligible node that shares
/// `node`'s spread-domain value. Without a spread label all nodes report 0,
/// letting the caller's remaining tie-breakers decide.
fn spread_count_for_node(
    eligible_nodes: &[&ClusterNodeRecord],
    counts: &BTreeMap<String, u32>,
    node: &ClusterNodeRecord,
    spread_by_label: Option<&str>,
) -> u32 {
    let Some(label) = spread_by_label else {
        return 0;
    };
    let domain = spread_value(node, Some(label));
    let mut total = 0;
    for candidate in eligible_nodes {
        if spread_value(candidate, Some(label)) == domain {
            total += counts.get(&candidate.node_id).copied().unwrap_or(0);
        }
    }
    total
}
/// Resolves the spread-domain value for a node under the given label.
/// Recognized labels ("pool", "node_class", "failure_domain") consult the
/// dedicated node fields with label fallbacks; any other label is looked up
/// directly in the node's labels. `None` yields an empty string (one shared
/// domain for all nodes).
fn spread_value(node: &ClusterNodeRecord, spread_by_label: Option<&str>) -> String {
    let Some(label) = spread_by_label else {
        return String::new();
    };
    let resolved = match label {
        "pool" => node_pool(node).map(ToOwned::to_owned),
        "node_class" => node_class(node).map(ToOwned::to_owned),
        "failure_domain" => node
            .failure_domain
            .clone()
            .or_else(|| node.labels.get("failure_domain").cloned())
            .or_else(|| node.labels.get("topology.kubernetes.io/zone").cloned()),
        other => node.labels.get(other).cloned(),
    };
    // Nodes lacking the attribute fall back to their own id, which places
    // each of them in a singleton spread domain.
    resolved.unwrap_or_else(|| node.node_id.clone())
}
/// Resolves a node's pool: the explicit `pool` field wins, then the "pool"
/// label, then the namespaced "pool.photoncloud.io/name" label.
fn node_pool(node: &ClusterNodeRecord) -> Option<&str> {
    if let Some(pool) = node.pool.as_deref() {
        return Some(pool);
    }
    for key in ["pool", "pool.photoncloud.io/name"] {
        if let Some(value) = node.labels.get(key) {
            return Some(value.as_str());
        }
    }
    None
}
/// Resolves a node's class: the explicit `node_class` field wins, then the
/// "node_class" label, then the namespaced "nodeclass.photoncloud.io/name".
fn node_class(node: &ClusterNodeRecord) -> Option<&str> {
    if let Some(class) = node.node_class.as_deref() {
        return Some(class);
    }
    for key in ["node_class", "nodeclass.photoncloud.io/name"] {
        if let Some(value) = node.labels.get(key) {
            return Some(value.as_str());
        }
    }
    None
}
/// Resolves the port each instance should listen on: the schedule's explicit
/// `instance_port` wins, then the service's HTTP port, then its gRPC port.
fn resolve_instance_port(service: &ServiceSpec) -> Option<u16> {
    if let Some(port) = service.schedule.as_ref().and_then(|schedule| schedule.instance_port) {
        return Some(port);
    }
    let ports = service.ports.as_ref()?;
    ports.http.or(ports.grpc)
}
/// Number of instances a schedule aims for: daemons target every eligible
/// node, replicated services target the configured replica count.
fn schedule_target_count(mode: ServiceScheduleMode, eligible_nodes: usize, replicas: u32) -> usize {
    if matches!(mode, ServiceScheduleMode::Daemon) {
        eligible_nodes
    } else {
        replicas as usize
    }
}
/// Renders the canonical instance id for a (service, node, ordinal) slot.
/// Ordinal 0 is the bare "<service>-<node>"; higher ordinals append a
/// 1-based human-facing index, so ordinal 1 renders as "...-2".
fn render_instance_id(service: &str, node_id: &str, ordinal: u32) -> String {
    match ordinal {
        0 => format!("{service}-{node_id}"),
        n => format!("{service}-{node_id}-{}", n + 1),
    }
}
/// Claims the lowest free ordinal for `node_id`, records it in
/// `used_ordinals`, and renders the corresponding instance id.
fn render_next_instance_id(
    service: &str,
    node_id: &str,
    used_ordinals: &mut BTreeMap<String, HashSet<u32>>,
) -> String {
    let used = used_ordinals.entry(node_id.to_string()).or_default();
    let mut ordinal = 0u32;
    // `insert` returns true exactly when the ordinal was free; the successful
    // probe both reserves the ordinal and ends the scan.
    while !used.insert(ordinal) {
        ordinal += 1;
    }
    render_instance_id(service, node_id, ordinal)
}
/// Inverse of `render_instance_id`: extracts the 0-based ordinal from an
/// instance id. The bare "<service>-<node>" is ordinal 0; "...-N" maps to
/// N-1. Returns `None` for ids that don't follow the scheme (including a
/// "-0" suffix, which the renderer never emits).
fn parse_instance_ordinal(service: &str, node_id: &str, instance_id: &str) -> Option<u32> {
    let base = format!("{service}-{node_id}");
    let suffix = match instance_id.strip_prefix(&base) {
        Some("") => return Some(0),
        Some(rest) => rest.strip_prefix('-')?,
        None => return None,
    };
    suffix.parse::<u32>().ok()?.checked_sub(1)
}
fn decode_managed_instances(existing: &HashMap<String, Value>) -> Vec<ServiceInstanceSpec> {
let mut decoded = Vec::new();
for value in existing.values() {
if !is_managed_by_scheduler(value) {
continue;
}
match serde_json::from_value::<ServiceInstanceSpec>(value.clone()) {
Ok(instance) => decoded.push(instance),
Err(error) => warn!(error = %error, "failed to decode managed instance"),
}
}
decoded.sort_by(|lhs, rhs| lhs.instance_id.cmp(&rhs.instance_id));
decoded
}
/// Plans the writes and deletes needed to converge `service` from its current
/// managed instances to `desired_instances` while honoring the rollout
/// budget: at least `desired - max_unavailable` instances stay available, and
/// at most `desired + max_surge` records exist at once. Changes that would
/// violate those bounds are only counted as deferred for a later pass.
fn plan_managed_reconciliation(
    service: &ServiceSpec,
    desired_instances: &[ServiceInstanceSpec],
    existing_values: &HashMap<String, Value>,
    existing_instances: &[ServiceInstanceSpec],
    heartbeat_timeout_secs: u64,
) -> Result<ReconcilePlan> {
    let schedule = service
        .schedule
        .as_ref()
        .context("scheduled service missing schedule block")?;
    let desired_replicas = desired_instances.len();
    // Availability floor and total-record ceiling from the rollout strategy.
    let min_available = desired_replicas.saturating_sub(schedule.rollout.max_unavailable as usize);
    let max_total_instances = desired_replicas + schedule.rollout.max_surge as usize;
    let mut available_count = existing_instances
        .iter()
        .filter(|instance| instance_is_available(instance, heartbeat_timeout_secs))
        .count();
    let desired_ids: HashSet<_> = desired_instances
        .iter()
        .map(|instance| instance.instance_id.clone())
        .collect();
    let mut plan = ReconcilePlan::default();
    let mut managed_count = existing_instances.len();
    // Instances no longer in the desired set; delete the least healthy first.
    let mut stale_instances = existing_instances
        .iter()
        .filter(|instance| !desired_ids.contains(&instance.instance_id))
        .collect::<Vec<_>>();
    stale_instances.sort_by(|lhs, rhs| {
        instance_state_rank(lhs)
            .cmp(&instance_state_rank(rhs))
            .then_with(|| lhs.instance_id.cmp(&rhs.instance_id))
    });
    for instance in stale_instances {
        // Deleting an available instance must not drop us below the floor.
        if instance_is_available(instance, heartbeat_timeout_secs)
            && available_count.saturating_sub(1) < min_available
        {
            plan.deferred_deletes += 1;
            continue;
        }
        if instance_is_available(instance, heartbeat_timeout_secs) {
            available_count = available_count.saturating_sub(1);
        }
        managed_count = managed_count.saturating_sub(1);
        plan.deletes.push(instance.instance_id.clone());
    }
    // Remaining headroom for new records and for disrupting available ones.
    let mut create_budget = max_total_instances.saturating_sub(managed_count);
    let mut disruption_budget = available_count.saturating_sub(min_available);
    for instance in desired_instances {
        let existing_value = existing_values.get(&instance.instance_id);
        // Carry status fields over from the stored record so a rewrite does
        // not wipe externally-reported state/last_heartbeat/observed_at.
        let desired_value = merge_preserved_fields(serde_json::to_value(instance)?, existing_value);
        if existing_value == Some(&desired_value) {
            // Already up to date; no write needed.
            continue;
        }
        let existing_instance = existing_instances
            .iter()
            .find(|current| current.instance_id == instance.instance_id);
        match existing_instance {
            None => {
                // Brand-new record: consumes surge headroom.
                if create_budget == 0 {
                    plan.deferred_creates += 1;
                    continue;
                }
                create_budget -= 1;
                plan.upserts.push(PlannedUpsert {
                    instance: instance.clone(),
                    desired_value,
                });
            }
            Some(current) => {
                // Rewriting an available instance counts as a disruption.
                if instance_is_available(current, heartbeat_timeout_secs) {
                    if disruption_budget == 0 {
                        plan.deferred_updates += 1;
                        continue;
                    }
                    disruption_budget -= 1;
                }
                plan.upserts.push(PlannedUpsert {
                    instance: instance.clone(),
                    desired_value,
                });
            }
        }
    }
    Ok(plan)
}
/// An instance counts toward the availability floor only when it both reports
/// the "healthy" state and has a fresh heartbeat.
fn instance_is_available(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
    if instance.state.as_deref() != Some("healthy") {
        return false;
    }
    instance_has_fresh_heartbeat(instance, heartbeat_timeout_secs)
}
/// Heartbeat freshness for an instance. A zero timeout disables the check.
/// Uses `last_heartbeat` when present, falling back to `observed_at`; an
/// instance with neither timestamp is considered stale.
fn instance_has_fresh_heartbeat(instance: &ServiceInstanceSpec, heartbeat_timeout_secs: u64) -> bool {
    if heartbeat_timeout_secs == 0 {
        return true;
    }
    match instance.last_heartbeat.or(instance.observed_at) {
        None => false,
        Some(seen) => {
            let age_secs = Utc::now().signed_duration_since(seen).num_seconds();
            age_secs <= heartbeat_timeout_secs as i64
        }
    }
}
/// Whether an existing instance may keep its placement: anything except an
/// explicitly "unhealthy" or "failed" state qualifies (including no state).
fn instance_is_reusable(instance: &ServiceInstanceSpec) -> bool {
    !matches!(instance.state.as_deref(), Some("unhealthy" | "failed"))
}
/// Sort rank for instance states, lower = healthier. Used to prefer keeping
/// healthy instances and to delete the least healthy ones first. Unrecognized
/// states rank worst.
fn instance_state_rank(instance: &ServiceInstanceSpec) -> u8 {
    match instance.state.as_deref() {
        Some("healthy") => 0,
        None | Some("pending" | "provisioning" | "starting") => 1,
        Some("unknown") => 2,
        Some("unhealthy" | "failed") => 3,
        Some(_) => 4,
    }
}
fn node_preference_rank(existing_instances: &[ServiceInstanceSpec], node_id: &str) -> u8 {
let mut saw_failed = false;
for instance in existing_instances
.iter()
.filter(|instance| instance.node_id == node_id)
{
if instance_is_reusable(instance) {
return 0;
}
saw_failed = true;
}
if saw_failed {
2
} else {
1
}
}
fn merge_preserved_fields(mut desired: Value, existing: Option<&Value>) -> Value {
let Some(existing) = existing else {
return desired;
};
let (Some(desired_obj), Some(existing_obj)) = (desired.as_object_mut(), existing.as_object())
else {
return desired;
};
for preserve_key in ["state", "last_heartbeat", "observed_at"] {
if let Some(value) = existing_obj.get(preserve_key) {
match desired_obj.get_mut(preserve_key) {
Some(slot) if slot.is_null() => *slot = value.clone(),
Some(_) => {}
None => {
desired_obj.insert(preserve_key.to_string(), value.clone());
}
}
}
}
desired
}
fn is_managed_by_scheduler(value: &Value) -> bool {
value.get("managed_by").and_then(|value| value.as_str()) == Some(MANAGED_BY)
}
/// Builds the KV key for one instance record:
/// `<namespace>/clusters/<cluster>/instances/<service>/<instance>`.
fn instance_key(
    cluster_namespace: &str,
    cluster_id: &str,
    service: &str,
    instance_id: &str,
) -> String {
    format!("{cluster_namespace}/clusters/{cluster_id}/instances/{service}/{instance_id}")
}
/// Process entry point: installs an env-filtered `tracing` subscriber with an
/// added `info` directive, parses CLI flags, and runs the scheduler loop
/// until it exits (only under --once) or fails fatally.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?))
        .init();
    let scheduler = Scheduler::new(Cli::parse());
    scheduler.run_loop().await
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
use deployer_types::{
ClusterNodeRecord, HealthCheckSpec, PlacementPolicy, ProcessSpec, RolloutStrategySpec,
ServicePorts, ServiceScheduleMode, ServiceScheduleSpec,
};
fn active_node(node_id: &str, roles: &[&str], labels: &[(&str, &str)]) -> ClusterNodeRecord {
ClusterNodeRecord {
node_id: node_id.to_string(),
machine_id: None,
ip: format!("10.0.0.{}", &node_id[node_id.len() - 1..]),
hostname: node_id.to_string(),
roles: roles.iter().map(|role| role.to_string()).collect(),
labels: labels
.iter()
.map(|(key, value)| (key.to_string(), value.to_string()))
.collect(),
pool: Some("general".to_string()),
node_class: Some("worker-linux".to_string()),
failure_domain: Some(format!("rack-{}", &node_id[node_id.len() - 1..])),
nix_profile: Some("profiles/worker-linux".to_string()),
install_plan: None,
hardware_facts: None,
state: Some("active".to_string()),
commission_state: None,
install_state: None,
commissioned_at: None,
last_inventory_hash: None,
power_state: None,
bmc_ref: None,
last_heartbeat: Some(Utc::now() - ChronoDuration::seconds(10)),
}
}
fn scheduled_service() -> ServiceSpec {
ServiceSpec {
name: "api".to_string(),
ports: Some(ServicePorts {
http: Some(8080),
grpc: None,
}),
protocol: Some("http".to_string()),
mtls_required: None,
mesh_mode: None,
schedule: Some(ServiceScheduleSpec {
mode: ServiceScheduleMode::Replicated,
replicas: 2,
placement: PlacementPolicy {
roles: vec!["worker".to_string()],
pools: vec!["general".to_string()],
node_classes: vec!["worker-linux".to_string()],
match_labels: HashMap::from([("tier".to_string(), "general".to_string())]),
spread_by_label: Some("failure_domain".to_string()),
max_instances_per_node: 1,
},
rollout: RolloutStrategySpec::default(),
instance_port: None,
mesh_port: Some(18080),
process: Some(ProcessSpec {
command: "/usr/bin/api".to_string(),
args: vec!["serve".to_string()],
working_dir: None,
env: HashMap::new(),
}),
container: None,
health_check: Some(HealthCheckSpec {
check_type: "http".to_string(),
path: Some("/health".to_string()),
interval_secs: Some(10),
timeout_secs: Some(5),
startup_grace_secs: Some(30),
}),
}),
publish: None,
}
}
#[test]
fn test_node_eligibility_matches_roles_and_labels() {
let node = active_node("node01", &["worker"], &[("tier", "general")]);
let placement = PlacementPolicy {
roles: vec!["worker".to_string()],
pools: vec!["general".to_string()],
node_classes: vec!["worker-linux".to_string()],
match_labels: HashMap::from([("tier".to_string(), "general".to_string())]),
spread_by_label: Some("failure_domain".to_string()),
max_instances_per_node: 1,
};
assert!(node_is_eligible(&node, &placement, 300));
}
#[test]
fn test_node_eligibility_rejects_stale_or_wrong_label() {
let mut stale = active_node("node01", &["worker"], &[("tier", "general")]);
stale.last_heartbeat = Some(Utc::now() - ChronoDuration::seconds(600));
let placement = PlacementPolicy {
roles: vec!["worker".to_string()],
pools: vec!["gpu".to_string()],
node_classes: vec!["gpu-worker".to_string()],
match_labels: HashMap::from([("tier".to_string(), "gpu".to_string())]),
spread_by_label: Some("failure_domain".to_string()),
max_instances_per_node: 1,
};
assert!(!node_is_eligible(&stale, &placement, 300));
}
#[test]
fn test_build_desired_instances_spreads_across_matching_nodes() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let desired = build_desired_instances(&scheduled_service(), &refs, &[]).unwrap();
assert_eq!(desired.len(), 2);
assert_eq!(desired[0].instance_id, "api-node01");
assert_eq!(desired[1].instance_id, "api-node02");
assert_eq!(desired[0].process.as_ref().unwrap().command, "/usr/bin/api");
}
#[test]
fn test_build_desired_instances_honors_max_instances_per_node() {
let nodes = vec![active_node("node01", &["worker"], &[("tier", "general")])];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.replicas = 3;
schedule.placement.max_instances_per_node = 2;
let desired = build_desired_instances(&service, &refs, &[]).unwrap();
assert_eq!(desired.len(), 2);
assert_eq!(desired[0].instance_id, "api-node01");
assert_eq!(desired[1].instance_id, "api-node01-2");
}
#[test]
fn test_build_desired_instances_daemon_places_on_every_eligible_node() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.mode = ServiceScheduleMode::Daemon;
schedule.replicas = 1;
let desired = build_desired_instances(&service, &refs, &[]).unwrap();
assert_eq!(desired.len(), 2);
assert_eq!(desired[0].instance_id, "api-node01");
assert_eq!(desired[1].instance_id, "api-node02");
}
#[test]
fn test_build_desired_instances_daemon_reuses_existing_instance_per_node() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.mode = ServiceScheduleMode::Daemon;
schedule.replicas = 1;
let existing = vec![
ServiceInstanceSpec {
instance_id: "api-node01-2".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: None,
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
service: "api".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: None,
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
];
let desired = build_desired_instances(&service, &refs, &existing).unwrap();
assert_eq!(desired.len(), 2);
assert_eq!(desired[0].instance_id, "api-node01-2");
assert_eq!(desired[1].instance_id, "api-node02");
}
#[test]
fn test_pick_next_node_prefers_less_used_failure_domain() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
active_node("node03", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let counts = BTreeMap::from([
("node01".to_string(), 1),
("node02".to_string(), 0),
("node03".to_string(), 1),
]);
let picked = pick_next_node(&refs, &counts, 2, Some("failure_domain"), &[]).unwrap();
assert_eq!(picked.node_id, "node02");
}
#[test]
fn test_build_desired_instances_preserves_existing_healthy_placement() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let mut service = scheduled_service();
service.schedule.as_mut().unwrap().replicas = 1;
let existing = vec![ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
service: "api".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: None,
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
}];
let desired = build_desired_instances(&service, &refs, &existing).unwrap();
assert_eq!(desired.len(), 1);
assert_eq!(desired[0].instance_id, "api-node02");
assert_eq!(desired[0].node_id, "node02");
}
#[test]
fn test_build_desired_instances_avoids_unhealthy_node_when_spare_exists() {
let nodes = vec![
active_node("node01", &["worker"], &[("tier", "general")]),
active_node("node02", &["worker"], &[("tier", "general")]),
];
let refs: Vec<&ClusterNodeRecord> = nodes.iter().collect();
let mut service = scheduled_service();
service.schedule.as_mut().unwrap().replicas = 1;
let existing = vec![ServiceInstanceSpec {
instance_id: "api-node01".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: None,
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("unhealthy".to_string()),
last_heartbeat: None,
observed_at: None,
}];
let desired = build_desired_instances(&service, &refs, &existing).unwrap();
assert_eq!(desired.len(), 1);
assert_eq!(desired[0].node_id, "node02");
assert_eq!(desired[0].instance_id, "api-node02");
}
#[test]
fn test_plan_reconciliation_defers_delete_until_replacement_is_healthy() {
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.replicas = 1;
schedule.rollout = RolloutStrategySpec {
max_unavailable: 0,
max_surge: 1,
};
let existing_instance = ServiceInstanceSpec {
instance_id: "api-node01".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: schedule.process.clone(),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
};
let existing = HashMap::from([(
existing_instance.instance_id.clone(),
serde_json::to_value(&existing_instance).unwrap(),
)]);
let desired_instances = vec![ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
..existing_instance.clone()
}];
let plan = plan_managed_reconciliation(
&service,
&desired_instances,
&existing,
std::slice::from_ref(&existing_instance),
0,
)
.unwrap();
assert_eq!(plan.upserts.len(), 1);
assert_eq!(plan.upserts[0].instance.instance_id, "api-node02");
assert!(plan.deletes.is_empty());
assert_eq!(plan.deferred_deletes, 1);
}
#[test]
fn test_plan_reconciliation_limits_healthy_updates_by_rollout_budget() {
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.replicas = 2;
schedule.rollout = RolloutStrategySpec {
max_unavailable: 1,
max_surge: 0,
};
let old_process = schedule.process.clone().unwrap();
let mut new_process = old_process.clone();
new_process.args.push("--new-flag".to_string());
schedule.process = Some(new_process.clone());
let existing_instances = vec![
ServiceInstanceSpec {
instance_id: "api-node01".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: Some(old_process.clone()),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
service: "api".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: Some(old_process),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
];
let existing = existing_instances
.iter()
.map(|instance| {
(
instance.instance_id.clone(),
serde_json::to_value(instance).unwrap(),
)
})
.collect::<HashMap<_, _>>();
let desired_instances = existing_instances
.iter()
.map(|instance| ServiceInstanceSpec {
process: Some(new_process.clone()),
..instance.clone()
})
.collect::<Vec<_>>();
let plan = plan_managed_reconciliation(
&service,
&desired_instances,
&existing,
&existing_instances,
0,
)
.unwrap();
assert_eq!(plan.upserts.len(), 1);
assert_eq!(plan.deferred_updates, 1);
}
#[test]
fn test_plan_reconciliation_treats_stale_healthy_instance_as_unavailable() {
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.replicas = 1;
schedule.rollout = RolloutStrategySpec {
max_unavailable: 0,
max_surge: 0,
};
let existing_instance = ServiceInstanceSpec {
instance_id: "api-node01".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: schedule.process.clone(),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: Some(Utc::now() - chrono::Duration::seconds(600)),
observed_at: None,
};
let existing = HashMap::from([(
existing_instance.instance_id.clone(),
serde_json::to_value(&existing_instance).unwrap(),
)]);
let desired_instances = vec![ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
last_heartbeat: Some(Utc::now()),
..existing_instance.clone()
}];
let plan = plan_managed_reconciliation(
&service,
&desired_instances,
&existing,
std::slice::from_ref(&existing_instance),
300,
)
.unwrap();
assert_eq!(plan.upserts.len(), 1);
assert_eq!(plan.deferred_updates, 0);
assert_eq!(plan.deferred_deletes, 0);
assert_eq!(plan.deletes, vec!["api-node01".to_string()]);
}
#[test]
fn test_plan_reconciliation_for_daemon_uses_current_eligible_node_count() {
let mut service = scheduled_service();
let schedule = service.schedule.as_mut().unwrap();
schedule.mode = ServiceScheduleMode::Daemon;
schedule.replicas = 99;
schedule.rollout = RolloutStrategySpec {
max_unavailable: 0,
max_surge: 0,
};
let existing_instances = vec![
ServiceInstanceSpec {
instance_id: "api-node01".to_string(),
service: "api".to_string(),
node_id: "node01".to_string(),
ip: "10.0.0.1".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: schedule.process.clone(),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
ServiceInstanceSpec {
instance_id: "api-node02".to_string(),
service: "api".to_string(),
node_id: "node02".to_string(),
ip: "10.0.0.2".to_string(),
port: 8080,
mesh_port: Some(18080),
version: None,
health_check: None,
process: schedule.process.clone(),
container: None,
managed_by: Some(MANAGED_BY.to_string()),
state: Some("healthy".to_string()),
last_heartbeat: None,
observed_at: None,
},
];
let existing = existing_instances
.iter()
.map(|instance| {
(
instance.instance_id.clone(),
serde_json::to_value(instance).unwrap(),
)
})
.collect::<HashMap<_, _>>();
let desired_instances = vec![existing_instances[1].clone()];
let plan = plan_managed_reconciliation(
&service,
&desired_instances,
&existing,
&existing_instances,
0,
)
.unwrap();
assert!(plan.upserts.is_empty());
assert_eq!(plan.deletes, vec!["api-node01".to_string()]);
assert_eq!(plan.deferred_deletes, 0);
}
#[test]
fn test_merge_preserved_fields_replaces_null_status_fields() {
let desired = serde_json::json!({
"instance_id": "api-node01",
"state": null,
"last_heartbeat": null,
"observed_at": null,
});
let existing = serde_json::json!({
"instance_id": "api-node01",
"state": "healthy",
"last_heartbeat": "2026-03-11T06:59:50Z",
"observed_at": "2026-03-11T06:59:51Z",
});
let merged = merge_preserved_fields(desired, Some(&existing));
assert_eq!(merged.get("state").and_then(Value::as_str), Some("healthy"));
assert_eq!(
merged.get("last_heartbeat").and_then(Value::as_str),
Some("2026-03-11T06:59:50Z")
);
assert_eq!(
merged.get("observed_at").and_then(Value::as_str),
Some("2026-03-11T06:59:51Z")
);
}
#[test]
fn test_render_next_instance_id_skips_used_ordinals() {
let mut used =
BTreeMap::from([("node01".to_string(), HashSet::from([0_u32, 1_u32, 3_u32]))]);
let instance_id = render_next_instance_id("api", "node01", &mut used);
assert_eq!(instance_id, "api-node01-3");
}
}