From aba1b73d5beb31505b7549d45e70e4694959254a Mon Sep 17 00:00:00 2001 From: centra Date: Tue, 31 Mar 2026 10:03:40 +0900 Subject: [PATCH] Tighten cluster builds and add record-set DNS publication --- .cargo/config.toml | 2 + .github/workflows/nix.yml | 11 +- Makefile | 6 +- deployer/crates/deployer-types/src/lib.rs | 10 + .../crates/fleet-scheduler/src/publish.rs | 321 +++++++++++++----- flake.nix | 290 +++++++++++++--- .../crates/flashdns-server/src/metadata.rs | 136 ++++++-- iam/crates/iam-server/src/rest.rs | 6 +- .../crates/nightlight-server/src/query.rs | 42 +++ .../crates/nightlight-server/src/storage.rs | 1 + nix-nos/lib/cluster-config-lib.nix | 2 +- nix/ci/flake.nix | 70 +++- nix/ci/workspaces.json | 1 - nix/test-cluster/common.nix | 8 + nix/test-cluster/node01.nix | 1 + nix/test-cluster/node02.nix | 1 + nix/test-cluster/node03.nix | 1 + nix/test-cluster/run-cluster.sh | 74 +++- nix/test-cluster/storage-node01.nix | 1 + nix/test-cluster/storage-node02.nix | 1 + nix/test-cluster/storage-node03.nix | 1 + scripts/ci_changed_workspaces.py | 140 +++++++- 22 files changed, 925 insertions(+), 201 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..013d047 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +target-dir = "work/cargo-target" diff --git a/.github/workflows/nix.yml b/.github/workflows/nix.yml index 9f87315..1770d79 100644 --- a/.github/workflows/nix.yml +++ b/.github/workflows/nix.yml @@ -17,6 +17,7 @@ jobs: any_changed: ${{ steps.detect.outputs.any_changed }} build_changed: ${{ steps.detect.outputs.build_changed }} global_changed: ${{ steps.detect.outputs.global_changed }} + shared_crates: ${{ steps.detect.outputs.shared_crates }} shared_crates_changed: ${{ steps.detect.outputs.shared_crates_changed }} steps: - uses: actions/checkout@v4 @@ -81,15 +82,19 @@ jobs: needs: filter if: ${{ needs.filter.outputs.shared_crates_changed == 'true' }} runs-on: ubuntu-latest - name: gate (shared crates) + strategy: + fail-fast: false + matrix: + crate: ${{ fromJSON(needs.filter.outputs.shared_crates) }} + name: gate (shared crate: ${{ matrix.crate }}) steps: - uses: actions/checkout@v4 - uses: DeterminateSystems/nix-installer-action@v11 - uses: DeterminateSystems/magic-nix-cache-action@v8 - - name: Run Shared Crates Gate + - name: Run Shared Crate Gate run: | - nix run ./nix/ci#gate-ci -- --shared-crates --tier 0 --no-logs + nix run ./nix/ci#gate-ci -- --shared-crate ${{ matrix.crate }} --tier 0 --no-logs # Build server packages (tier 1+) build: diff --git a/Makefile b/Makefile index 6f994ac..a880eba 100644 --- a/Makefile +++ b/Makefile @@ -3,9 +3,11 @@ .PHONY: all build cluster-up cluster-down cluster-status cluster-validate cluster-smoke cluster-matrix cluster-bench-storage clean -# Build all services (using Nix) +PACKAGE ?= default + +# Build a single package by default; set PACKAGE=default to build the full bundle. build: - nix build .#packages.x86_64-linux.default + nix build .#$(PACKAGE) # Cluster Management cluster-up: diff --git a/deployer/crates/deployer-types/src/lib.rs b/deployer/crates/deployer-types/src/lib.rs index 0f7bf75..ae5c125 100644 --- a/deployer/crates/deployer-types/src/lib.rs +++ b/deployer/crates/deployer-types/src/lib.rs @@ -496,6 +496,8 @@ pub enum DnsPublishMode { LoadBalancer, /// Publish the first healthy instance IP directly. Direct, + /// Publish all healthy instance IPs directly as a DNS record set. + DirectMulti, } /// Desired DNS publication for a service. @@ -894,9 +896,15 @@ pub struct PublishedLoadBalancerState { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct PublishedDnsRecordState { pub zone_id: String, + #[serde(default)] pub record_id: String, + #[serde(default)] + pub record_ids: Vec, pub fqdn: String, + #[serde(default)] pub value: String, + #[serde(default)] + pub values: Vec, } /// Observed publication state stored separately from ServiceSpec desired state. @@ -1215,8 +1223,10 @@ mod tests { dns: Some(PublishedDnsRecordState { zone_id: "zone-1".to_string(), record_id: "record-1".to_string(), + record_ids: vec!["record-1".to_string()], fqdn: "api.test.cluster.local".to_string(), value: "10.0.0.50".to_string(), + values: vec!["10.0.0.50".to_string()], }), observed_at: None, }; diff --git a/deployer/crates/fleet-scheduler/src/publish.rs b/deployer/crates/fleet-scheduler/src/publish.rs index be41e0f..9012585 100644 --- a/deployer/crates/fleet-scheduler/src/publish.rs +++ b/deployer/crates/fleet-scheduler/src/publish.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use anyhow::{Context, Result}; use chainfire_client::Client; @@ -354,24 +354,33 @@ impl PublicationController { return Ok(existing.cloned()); }; - let Some(value) = desired_dns_value(spec, healthy_instances, load_balancer) else { + let desired_values = desired_dns_values(spec, healthy_instances, load_balancer); + if desired_values.is_empty() { if let Some(existing) = existing { self.cleanup_dns(auth_token, existing).await?; } return Ok(None); - }; + } let zone_name = normalize_zone_name(&spec.zone); let record_name = record_name_for_service(spec, service); let fqdn = format!("{}.{}", record_name, zone_name); + let primary_value = desired_values.first().cloned().unwrap_or_default(); if self.config.dry_run { - info!(service = %service.name, fqdn = %fqdn, value = %value, "would reconcile native DNS record"); + info!( + service = %service.name, + fqdn = %fqdn, + values = ?desired_values, + "would reconcile native DNS record set" + ); return Ok(existing.cloned().or(Some(PublishedDnsRecordState { zone_id: String::new(), record_id: String::new(), + record_ids: Vec::new(), fqdn, - value, + value: primary_value, + values: desired_values, }))); } @@ -380,22 +389,28 @@ impl PublicationController { let zone = ensure_zone(&mut zone_client, auth_token, &zone_name, org_id, project_id).await?; - let record = ensure_record( + let records = ensure_records( &mut record_client, auth_token, existing, &zone.id, &record_name, spec.ttl, - &value, + &desired_values, ) .await?; + let record_ids = records + .iter() + .map(|record| record.id.clone()) + .collect::>(); Ok(Some(PublishedDnsRecordState { zone_id: zone.id, - record_id: record.id, + record_id: record_ids.first().cloned().unwrap_or_default(), + record_ids, fqdn, - value, + value: primary_value, + values: desired_values, })) } @@ -408,18 +423,25 @@ impl PublicationController { return Ok(()); }; let mut record_client = RecordServiceClient::connect(endpoint.clone()).await?; - match record_client - .delete_record(authorized_request( - DeleteRecordRequest { - id: dns_state.record_id.clone(), - }, - auth_token, - )) - .await - { - Ok(_) => {} - Err(status) if status.code() == Code::NotFound => {} - Err(status) => return Err(status.into()), + let mut record_ids = dns_state.record_ids.clone(); + if record_ids.is_empty() && !dns_state.record_id.is_empty() { + record_ids.push(dns_state.record_id.clone()); + } + record_ids.sort(); + record_ids.dedup(); + + for record_id in record_ids { + match record_client + .delete_record(authorized_request( + DeleteRecordRequest { id: record_id }, + auth_token, + )) + .await + { + Ok(_) => {} + Err(status) if status.code() == Code::NotFound => {} + Err(status) => return Err(status.into()), + } } Ok(()) } @@ -812,15 +834,15 @@ async fn ensure_zone( .context("FlashDNS returned empty CreateZone response")?) } -async fn ensure_record( +async fn ensure_records( client: &mut RecordServiceClient, auth_token: &str, existing: Option<&PublishedDnsRecordState>, zone_id: &str, name: &str, ttl: u32, - value: &str, -) -> Result { + desired_values: &[String], +) -> Result> { let records = client .list_records(authorized_request( ListRecordsRequest { @@ -839,73 +861,96 @@ async fn ensure_record( let mut matching = records .iter() .filter(|record| { - existing.map(|state| state.record_id.as_str()) == Some(record.id.as_str()) - || record.name == name + record.name == name + || existing.map(|state| state.record_id.as_str()) == Some(record.id.as_str()) + || existing + .map(|state| state.record_ids.iter().any(|id| id == &record.id)) + .unwrap_or(false) }) .cloned() .collect::>(); + matching.sort_by(|lhs, rhs| { + record_a_value(lhs) + .cmp(&record_a_value(rhs)) + .then_with(|| lhs.id.cmp(&rhs.id)) + }); - if let Some(record) = matching.first().cloned() { - let record_value = record - .data - .as_ref() - .and_then(|data| data.data.as_ref()) - .and_then(|data| match data { - record_data::Data::A(record) => Some(record.address.clone()), - _ => None, - }); + let mut records_by_value: BTreeMap> = BTreeMap::new(); + for record in matching { + let Some(value) = record_a_value(&record) else { + continue; + }; + records_by_value.entry(value).or_default().push(record); + } - if record_value.as_deref() != Some(value) || record.ttl != ttl { - let updated = client - .update_record(authorized_request( - UpdateRecordRequest { - id: record.id.clone(), - ttl: Some(ttl), + let mut ensured = Vec::new(); + for desired_value in desired_values { + if let Some(record) = records_by_value.get_mut(desired_value).and_then(|records| { + if records.is_empty() { + None + } else { + Some(records.remove(0)) + } + }) { + if record.ttl != ttl || !record.enabled { + let updated = client + .update_record(authorized_request( + UpdateRecordRequest { + id: record.id.clone(), + ttl: Some(ttl), + data: Some(RecordData { + data: Some(record_data::Data::A(ARecord { + address: desired_value.to_string(), + })), + }), + enabled: Some(true), + }, + auth_token, + )) + .await? + .into_inner() + .record + .context("FlashDNS returned empty UpdateRecord response")?; + ensured.push(updated); + } else { + ensured.push(record); + } + continue; + } + + ensured.push( + client + .create_record(authorized_request( + CreateRecordRequest { + zone_id: zone_id.to_string(), + name: name.to_string(), + record_type: "A".to_string(), + ttl, data: Some(RecordData { data: Some(record_data::Data::A(ARecord { - address: value.to_string(), + address: desired_value.to_string(), })), }), - enabled: Some(true), }, auth_token, )) .await? .into_inner() .record - .context("FlashDNS returned empty UpdateRecord response")?; - matching.remove(0); - for extra in matching { - delete_record(client, auth_token, &extra.id).await?; - } - return Ok(updated); - } - - for extra in matching.into_iter().skip(1) { - delete_record(client, auth_token, &extra.id).await?; - } - return Ok(record); + .context("FlashDNS returned empty CreateRecord response")?, + ); } - Ok(client - .create_record(authorized_request( - CreateRecordRequest { - zone_id: zone_id.to_string(), - name: name.to_string(), - record_type: "A".to_string(), - ttl, - data: Some(RecordData { - data: Some(record_data::Data::A(ARecord { - address: value.to_string(), - })), - }), - }, - auth_token, - )) - .await? - .into_inner() - .record - .context("FlashDNS returned empty CreateRecord response")?) + for extra in records_by_value.into_values().flatten() { + delete_record(client, auth_token, &extra.id).await?; + } + + ensured.sort_by(|lhs, rhs| { + record_a_value(lhs) + .cmp(&record_a_value(rhs)) + .then_with(|| lhs.id.cmp(&rhs.id)) + }); + Ok(ensured) } async fn delete_record( @@ -940,23 +985,54 @@ fn resolve_target_port(service: &ServiceSpec, spec: &LoadBalancerPublicationSpec .or_else(|| service.ports.as_ref().and_then(|ports| ports.grpc)) } -fn desired_dns_value( +fn record_a_value(record: &RecordInfo) -> Option { + record + .data + .as_ref() + .and_then(|data| data.data.as_ref()) + .and_then(|data| match data { + record_data::Data::A(record) => Some(record.address.clone()), + _ => None, + }) +} + +fn normalize_dns_values(values: impl IntoIterator) -> Vec { + let mut values = values + .into_iter() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .collect::>(); + values.sort(); + values.dedup(); + values +} + +fn desired_dns_values( spec: &DnsPublicationSpec, healthy_instances: &[ServiceInstanceSpec], load_balancer: Option<&PublishedLoadBalancerState>, -) -> Option { +) -> Vec { match spec.mode { - DnsPublishMode::LoadBalancer => load_balancer - .and_then(|state| state.vip_address.clone()) - .filter(|value| !value.is_empty() && value != "0.0.0.0") - .or_else(|| { - healthy_instances - .first() - .map(|instance| instance.ip.clone()) - }), - DnsPublishMode::Direct => healthy_instances - .first() - .map(|instance| instance.ip.clone()), + DnsPublishMode::LoadBalancer => normalize_dns_values( + load_balancer + .and_then(|state| state.vip_address.clone()) + .filter(|value| !value.is_empty() && value != "0.0.0.0") + .or_else(|| { + healthy_instances + .first() + .map(|instance| instance.ip.clone()) + }) + .into_iter(), + ), + DnsPublishMode::Direct => normalize_dns_values( + healthy_instances + .first() + .map(|instance| instance.ip.clone()) + .into_iter(), + ), + DnsPublishMode::DirectMulti => { + normalize_dns_values(healthy_instances.iter().map(|instance| instance.ip.clone())) + } } } @@ -1153,7 +1229,7 @@ mod tests { } #[test] - fn test_dns_value_falls_back_to_healthy_instance_when_vip_missing() { + fn test_dns_values_fall_back_to_healthy_instance_when_vip_missing() { let spec = DnsPublicationSpec { zone: "native.cluster.test".to_string(), name: Some("api".to_string()), @@ -1178,8 +1254,73 @@ mod tests { }]; assert_eq!( - desired_dns_value(&spec, &instances, None).as_deref(), - Some("10.0.0.11") + desired_dns_values(&spec, &instances, None), + vec!["10.0.0.11".to_string()] + ); + } + + #[test] + fn test_direct_multi_dns_publishes_all_healthy_instance_ips() { + let spec = DnsPublicationSpec { + zone: "native.cluster.test".to_string(), + name: Some("daemon".to_string()), + ttl: 60, + mode: DnsPublishMode::DirectMulti, + }; + let instances = vec![ + ServiceInstanceSpec { + instance_id: "daemon-node02".to_string(), + service: "daemon".to_string(), + node_id: "node02".to_string(), + ip: "10.0.0.12".to_string(), + port: 8080, + mesh_port: None, + version: None, + health_check: None, + process: None, + container: None, + managed_by: None, + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ServiceInstanceSpec { + instance_id: "daemon-node01".to_string(), + service: "daemon".to_string(), + node_id: "node01".to_string(), + ip: "10.0.0.11".to_string(), + port: 8080, + mesh_port: None, + version: None, + health_check: None, + process: None, + container: None, + managed_by: None, + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ServiceInstanceSpec { + instance_id: "daemon-node03".to_string(), + service: "daemon".to_string(), + node_id: "node03".to_string(), + ip: "10.0.0.11".to_string(), + port: 8080, + mesh_port: None, + version: None, + health_check: None, + process: None, + container: None, + managed_by: None, + state: Some("healthy".to_string()), + last_heartbeat: None, + observed_at: None, + }, + ]; + + assert_eq!( + desired_dns_values(&spec, &instances, None), + vec!["10.0.0.11".to_string(), "10.0.0.12".to_string()] ); } diff --git a/flake.nix b/flake.nix index 34aad00..3a42ff4 100644 --- a/flake.nix +++ b/flake.nix @@ -71,37 +71,160 @@ clusterPython = pkgs.python3.withPackages (ps: [ ps.python-snappy ]); - # Keep package builds stable even when docs or archived assets change. - repoSrc = pkgs.lib.cleanSourceWith { - src = ./.; - filter = path: type: - let - rel = pkgs.lib.removePrefix ((toString ./. ) + "/") (toString path); - topLevel = builtins.head (pkgs.lib.splitString "/" rel); - includedTopLevels = [ - "apigateway" - "chainfire" - "coronafs" - "crates" - "creditservice" - "deployer" - "fiberlb" - "flashdns" - "flaredb" - "iam" - "k8shost" - "lightningstor" - "mtls-agent" - "nightlight" - "plasmavmc" - "prismnet" - ]; - in - rel == "" - || builtins.elem rel [ "flake.nix" "flake.lock" ] - || builtins.elem topLevel includedTopLevels; + # Keep Rust package builds stable without invalidating every package on + # unrelated workspace changes. + workspaceSourceRoots = { + chainfire = [ "chainfire" ]; + flaredb = [ "flaredb" ]; + iam = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + ]; + coronafs = [ "coronafs" ]; + plasmavmc = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + "lightningstor" + "plasmavmc" + "prismnet" + ]; + prismnet = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + "prismnet" + ]; + flashdns = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flashdns" + "flaredb" + "iam" + ]; + fiberlb = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "fiberlb" + "flaredb" + "iam" + ]; + lightningstor = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + "lightningstor" + ]; + nightlight = [ "nightlight" ]; + creditservice = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + ]; + apigateway = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "flaredb" + "iam" + ]; + k8shost = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "fiberlb" + "flaredb" + "flashdns" + "iam" + "k8shost" + "lightningstor" + "plasmavmc" + "prismnet" + ]; + deployer = [ + "apigateway" + "chainfire" + "creditservice" + "crates/photon-auth-client" + "crates/photon-config" + "crates/photon-runtime" + "crates/photon-state" + "deployer" + "fiberlb" + "flaredb" + "flashdns" + "iam" + ]; }; + mkWorkspaceSrc = workspaceSubdir: + let + sourceRoots = workspaceSourceRoots.${workspaceSubdir} or [ workspaceSubdir ]; + in + pkgs.lib.cleanSourceWith { + src = ./.; + filter = path: type: + let + rel = pkgs.lib.removePrefix ((toString ./. ) + "/") (toString path); + in + rel == "" + || builtins.elem rel [ "flake.nix" "flake.lock" ] + || builtins.any (root: + rel == root + || pkgs.lib.hasPrefix "${root}/" rel + || pkgs.lib.hasPrefix "${rel}/" root + ) sourceRoots; + }; + flakeBundleSrc = pkgs.lib.cleanSourceWith { src = ./.; filter = path: type: @@ -440,12 +563,12 @@ # workspaceSubdir: subdirectory containing Cargo.toml (e.g., "chainfire") # mainCrate: optional main crate name if different from workspace # description: package description for meta - # doCheck: whether to run tests during build (default: true) - buildRustWorkspace = { name, workspaceSubdir, mainCrate ? null, description ? "", doCheck ? true }: + # doCheck: whether to run tests during build (default: false) + buildRustWorkspace = { name, workspaceSubdir, mainCrate ? null, description ? "", doCheck ? false }: pkgs.rustPlatform.buildRustPackage ({ pname = name; version = "0.1.0"; - src = repoSrc; + src = mkWorkspaceSrc workspaceSubdir; cargoLock = { lockFile = ./${workspaceSubdir}/Cargo.lock; @@ -486,6 +609,43 @@ cargoBuildFlags = [ "-p" mainCrate ]; }); + # Helper function to build multiple binaries from the same workspace in + # one cargo invocation. This is mainly used by the VM cluster builds so + # a single host build can satisfy several services from the same + # workspace. + buildRustWorkspaceBundle = { name, workspaceSubdir, crates, description ? "", doCheck ? false }: + pkgs.rustPlatform.buildRustPackage { + pname = name; + version = "0.1.0"; + src = mkWorkspaceSrc workspaceSubdir; + + cargoLock = { + lockFile = ./${workspaceSubdir}/Cargo.lock; + }; + + buildAndTestSubdir = workspaceSubdir; + + postUnpack = '' + cp $sourceRoot/${workspaceSubdir}/Cargo.lock $sourceRoot/Cargo.lock + ''; + + nativeBuildInputs = commonNativeBuildInputs; + buildInputs = commonBuildInputs; + + inherit (commonEnvVars) LIBCLANG_PATH PROTOC ROCKSDB_LIB_DIR; + inherit doCheck; + + cargoBuildFlags = pkgs.lib.concatMap (crate: [ "-p" crate ]) crates; + + meta = with pkgs.lib; { + description = description; + homepage = "https://github.com/yourorg/plasmacloud"; + license = licenses.asl20; + maintainers = [ ]; + platforms = platforms.linux; + }; + }; + in { # ====================================================================== @@ -658,6 +818,16 @@ description = "LightningStor distributed storage node daemon"; }; + lightningstor-workspace = buildRustWorkspaceBundle { + name = "lightningstor-workspace"; + workspaceSubdir = "lightningstor"; + crates = [ + "lightningstor-server" + "lightningstor-node" + ]; + description = "Combined LightningStor server and node workspace build"; + }; + # -------------------------------------------------------------------- # NightLight: Prometheus-compatible Metrics Store # -------------------------------------------------------------------- @@ -768,6 +938,20 @@ description = "Label-aware service scheduler for PhotonCloud bare-metal fleets"; }; + deployer-workspace = buildRustWorkspaceBundle { + name = "deployer-workspace"; + workspaceSubdir = "deployer"; + crates = [ + "deployer-server" + "deployer-ctl" + "node-agent" + "nix-agent" + "plasmacloud-reconciler" + "fleet-scheduler" + ]; + description = "Combined deployer workspace build for cluster images and checks"; + }; + vmClusterDeployerState = self.nixosConfigurations.node01.config.system.build.plasmacloudDeployerClusterState; @@ -787,18 +971,12 @@ self.packages.${system}.prismnet-server self.packages.${system}.flashdns-server self.packages.${system}.fiberlb-server - self.packages.${system}.lightningstor-server - self.packages.${system}.lightningstor-node + self.packages.${system}.lightningstor-workspace self.packages.${system}.nightlight-server self.packages.${system}.creditservice-server self.packages.${system}.apigateway-server self.packages.${system}.k8shost-server - self.packages.${system}.deployer-server - self.packages.${system}.deployer-ctl - self.packages.${system}.plasmacloud-reconciler - self.packages.${system}.nix-agent - self.packages.${system}.node-agent - self.packages.${system}.fleet-scheduler + self.packages.${system}.deployer-workspace self.packages.${system}.vmClusterDeployerState ]; }; @@ -970,9 +1148,9 @@ PHOTONCLOUD_CHAINFIRE_SERVER_BIN = "${self.packages.${system}.chainfire-server}/bin/chainfire"; PHOTONCLOUD_DEPLOYER_SERVER_BIN = - "${self.packages.${system}.deployer-server}/bin/deployer-server"; + "${self.packages.${system}.deployer-workspace}/bin/deployer-server"; PHOTONCLOUD_DEPLOYER_CTL_BIN = - "${self.packages.${system}.deployer-ctl}/bin/deployer-ctl"; + "${self.packages.${system}.deployer-workspace}/bin/deployer-ctl"; } '' export HOME="$TMPDIR/home" mkdir -p "$HOME" @@ -1007,9 +1185,9 @@ PHOTONCLOUD_CHAINFIRE_SERVER_BIN = "${self.packages.${system}.chainfire-server}/bin/chainfire"; PHOTONCLOUD_DEPLOYER_CTL_BIN = - "${self.packages.${system}.deployer-ctl}/bin/deployer-ctl"; + "${self.packages.${system}.deployer-workspace}/bin/deployer-ctl"; PHOTONCLOUD_PLASMACLOUD_RECONCILER_BIN = - "${self.packages.${system}.plasmacloud-reconciler}/bin/plasmacloud-reconciler"; + "${self.packages.${system}.deployer-workspace}/bin/plasmacloud-reconciler"; } '' export HOME="$TMPDIR/home" mkdir -p "$HOME" @@ -1044,11 +1222,11 @@ PHOTONCLOUD_CHAINFIRE_SERVER_BIN = "${self.packages.${system}.chainfire-server}/bin/chainfire"; PHOTONCLOUD_DEPLOYER_CTL_BIN = - "${self.packages.${system}.deployer-ctl}/bin/deployer-ctl"; + "${self.packages.${system}.deployer-workspace}/bin/deployer-ctl"; PHOTONCLOUD_NODE_AGENT_BIN = - "${self.packages.${system}.node-agent}/bin/node-agent"; + "${self.packages.${system}.deployer-workspace}/bin/node-agent"; PHOTONCLOUD_FLEET_SCHEDULER_BIN = - "${self.packages.${system}.fleet-scheduler}/bin/fleet-scheduler"; + "${self.packages.${system}.deployer-workspace}/bin/fleet-scheduler"; } '' export HOME="$TMPDIR/home" mkdir -p "$HOME" @@ -1229,19 +1407,21 @@ prismnet-server = self.packages.${final.system}.prismnet-server; flashdns-server = self.packages.${final.system}.flashdns-server; fiberlb-server = self.packages.${final.system}.fiberlb-server; - lightningstor-server = self.packages.${final.system}.lightningstor-server; - lightningstor-node = self.packages.${final.system}.lightningstor-node; + lightningstor-workspace = self.packages.${final.system}.lightningstor-workspace; + lightningstor-server = self.packages.${final.system}.lightningstor-workspace; + lightningstor-node = self.packages.${final.system}.lightningstor-workspace; nightlight-server = self.packages.${final.system}.nightlight-server; creditservice-server = self.packages.${final.system}.creditservice-server; apigateway-server = self.packages.${final.system}.apigateway-server; k8shost-server = self.packages.${final.system}.k8shost-server; - deployer-server = self.packages.${final.system}.deployer-server; - deployer-ctl = self.packages.${final.system}.deployer-ctl; - plasmacloud-reconciler = self.packages.${final.system}.plasmacloud-reconciler; + deployer-workspace = self.packages.${final.system}.deployer-workspace; + deployer-server = self.packages.${final.system}.deployer-workspace; + deployer-ctl = self.packages.${final.system}.deployer-workspace; + plasmacloud-reconciler = self.packages.${final.system}.deployer-workspace; plasmacloudFlakeBundle = self.packages.${final.system}.plasmacloudFlakeBundle; - nix-agent = self.packages.${final.system}.nix-agent; - node-agent = self.packages.${final.system}.node-agent; - fleet-scheduler = self.packages.${final.system}.fleet-scheduler; + nix-agent = self.packages.${final.system}.deployer-workspace; + node-agent = self.packages.${final.system}.deployer-workspace; + fleet-scheduler = self.packages.${final.system}.deployer-workspace; }; }; } diff --git a/flashdns/crates/flashdns-server/src/metadata.rs b/flashdns/crates/flashdns-server/src/metadata.rs index 3a5688d..fb9eff3 100644 --- a/flashdns/crates/flashdns-server/src/metadata.rs +++ b/flashdns/crates/flashdns-server/src/metadata.rs @@ -152,7 +152,9 @@ impl DnsMetadataStore { ) .execute(pool) .await - .map_err(|e| MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e)))?; + .map_err(|e| { + MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e)) + })?; Ok(()) } @@ -165,7 +167,9 @@ impl DnsMetadataStore { ) .execute(pool) .await - .map_err(|e| MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e)))?; + .map_err(|e| { + MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e)) + })?; Ok(()) } @@ -192,9 +196,7 @@ impl DnsMetadataStore { .bind(value) .execute(pool.as_ref()) .await - .map_err(|e| { - MetadataError::Storage(format!("Postgres put failed: {}", e)) - })?; + .map_err(|e| MetadataError::Storage(format!("Postgres put failed: {}", e)))?; } SqlStorageBackend::Sqlite(pool) => { sqlx::query( @@ -395,10 +397,15 @@ impl DnsMetadataStore { format!("/flashdns/zone_ids/{}", zone_id) } - fn record_key(zone_id: &ZoneId, record_name: &str, record_type: RecordType) -> String { + fn record_key( + zone_id: &ZoneId, + record_name: &str, + record_type: RecordType, + record_id: &RecordId, + ) -> String { format!( - "/flashdns/records/{}/{}/{}", - zone_id, record_name, record_type + "/flashdns/records/{}/{}/{}/{}", + zone_id, record_name, record_type, record_id ) } @@ -406,6 +413,20 @@ impl DnsMetadataStore { format!("/flashdns/records/{}/", zone_id) } + fn record_type_prefix(zone_id: &ZoneId, record_name: &str, record_type: RecordType) -> String { + format!( + "/flashdns/records/{}/{}/{}/", + zone_id, record_name, record_type + ) + } + + fn legacy_record_key(zone_id: &ZoneId, record_name: &str, record_type: RecordType) -> String { + format!( + "/flashdns/records/{}/{}/{}", + zone_id, record_name, record_type + ) + } + fn record_id_key(record_id: &RecordId) -> String { format!("/flashdns/record_ids/{}", record_id) } @@ -521,7 +542,18 @@ impl DnsMetadataStore { /// Save record pub async fn save_record(&self, record: &Record) -> Result<()> { - let key = Self::record_key(&record.zone_id, &record.name, record.record_type); + let key = Self::record_key( + &record.zone_id, + &record.name, + record.record_type, + &record.id, + ); + let id_key = Self::record_id_key(&record.id); + if let Some(existing_key) = self.get(&id_key).await? { + if existing_key != key { + self.delete_key(&existing_key).await?; + } + } let value = serde_json::to_string(record).map_err(|e| { MetadataError::Serialization(format!("Failed to serialize record: {}", e)) })?; @@ -529,29 +561,40 @@ impl DnsMetadataStore { self.put(&key, &value).await?; // Also save record ID mapping - let id_key = Self::record_id_key(&record.id); self.put(&id_key, &key).await?; Ok(()) } - /// Load record by name and type + /// Load the first record by name and type, preserving compatibility with + /// older single-record keys. pub async fn load_record( &self, zone_id: &ZoneId, record_name: &str, record_type: RecordType, ) -> Result> { - let key = Self::record_key(zone_id, record_name, record_type); + let prefix = Self::record_type_prefix(zone_id, record_name, record_type); + let mut records = self + .get_prefix(&prefix) + .await? + .into_iter() + .filter_map(|(_, value)| serde_json::from_str::(&value).ok()) + .collect::>(); - if let Some(value) = self.get(&key).await? { - let record: Record = serde_json::from_str(&value).map_err(|e| { - MetadataError::Serialization(format!("Failed to deserialize record: {}", e)) - })?; - Ok(Some(record)) - } else { - Ok(None) + if records.is_empty() { + let legacy_key = Self::legacy_record_key(zone_id, record_name, record_type); + if let Some(value) = self.get(&legacy_key).await? { + let record: Record = serde_json::from_str(&value).map_err(|e| { + MetadataError::Serialization(format!("Failed to deserialize record: {}", e)) + })?; + return Ok(Some(record)); + } + return Ok(None); } + + records.sort_by(|lhs, rhs| lhs.id.to_string().cmp(&rhs.id.to_string())); + Ok(records.into_iter().next()) } /// Load record by ID @@ -574,10 +617,24 @@ impl DnsMetadataStore { /// Delete record pub async fn delete_record(&self, record: &Record) -> Result<()> { - let key = Self::record_key(&record.zone_id, &record.name, record.record_type); let id_key = Self::record_id_key(&record.id); - - self.delete_key(&key).await?; + if let Some(key) = self.get(&id_key).await? { + self.delete_key(&key).await?; + } else { + self.delete_key(&Self::record_key( + &record.zone_id, + &record.name, + record.record_type, + &record.id, + )) + .await?; + self.delete_key(&Self::legacy_record_key( + &record.zone_id, + &record.name, + record.record_type, + )) + .await?; + } self.delete_key(&id_key).await?; Ok(()) @@ -601,6 +658,7 @@ impl DnsMetadataStore { a.name .cmp(&b.name) .then(a.record_type.type_code().cmp(&b.record_type.type_code())) + .then(a.id.to_string().cmp(&b.id.to_string())) }); Ok(records) @@ -829,4 +887,38 @@ mod tests { .unwrap(); assert!(deleted.is_none()); } + + #[tokio::test] + async fn test_record_set_supports_multiple_records_with_same_name_and_type() { + let store = DnsMetadataStore::new_in_memory(); + + let zone_name = ZoneName::new("example.com").unwrap(); + let zone = Zone::new(zone_name, "test-org", "test-project"); + store.save_zone(&zone).await.unwrap(); + + let first = Record::new( + zone.id, + "api", + RecordData::a_from_str("192.168.1.10").unwrap(), + ); + let second = Record::new( + zone.id, + "api", + RecordData::a_from_str("192.168.1.11").unwrap(), + ); + + store.save_record(&first).await.unwrap(); + store.save_record(&second).await.unwrap(); + + let records = store.list_records_by_name(&zone.id, "api").await.unwrap(); + assert_eq!(records.len(), 2); + assert!(records.iter().any(|record| record.id == first.id)); + assert!(records.iter().any(|record| record.id == second.id)); + + store.delete_record(&first).await.unwrap(); + + let remaining = store.list_records_by_name(&zone.id, "api").await.unwrap(); + assert_eq!(remaining.len(), 1); + assert_eq!(remaining[0].id, second.id); + } } diff --git a/iam/crates/iam-server/src/rest.rs b/iam/crates/iam-server/src/rest.rs index 369d594..a3327b6 100644 --- a/iam/crates/iam-server/src/rest.rs +++ b/iam/crates/iam-server/src/rest.rs @@ -231,20 +231,20 @@ pub fn build_router(state: RestApiState) -> Router { .route("/api/v1/auth/token", post(issue_token)) .route("/api/v1/auth/verify", post(verify_token)) .route("/api/v1/users", get(list_users).post(create_user)) - .route("/api/v1/users/:id", get(get_user)) + .route("/api/v1/users/{id}", get(get_user)) .route( "/api/v1/orgs", get(list_organizations).post(create_organization), ) .route( - "/api/v1/orgs/:org_id", + "/api/v1/orgs/{org_id}", get(get_organization) .patch(update_organization) .delete(delete_organization), ) .route("/api/v1/projects", get(list_projects).post(create_project)) .route( - "/api/v1/orgs/:org_id/projects/:project_id", + "/api/v1/orgs/{org_id}/projects/{project_id}", get(get_project) .patch(update_project) .delete(delete_project), diff --git a/nightlight/crates/nightlight-server/src/query.rs b/nightlight/crates/nightlight-server/src/query.rs index 5627896..7ef90c0 100644 --- a/nightlight/crates/nightlight-server/src/query.rs +++ b/nightlight/crates/nightlight-server/src/query.rs @@ -902,6 +902,11 @@ impl QueryableStorage { #[cfg(test)] mod tests { use super::*; + use axum::{ + body::{to_bytes, Body}, + http::{Method, Request, StatusCode}, + }; + use tower::ServiceExt; #[tokio::test] async fn test_query_service_creation() { @@ -996,6 +1001,43 @@ mod tests { assert!(values.contains(&"test_job".to_string())); } + #[tokio::test] + async fn test_label_values_route() { + let service = QueryService::new(); + { + let mut storage = service.storage.write().await; + storage.upsert_series(TimeSeries { + id: SeriesId(1), + labels: vec![ + Label::new("__name__", "test_metric"), + Label::new("job", "test_job"), + ], + samples: vec![], + }); + } + + let app: axum::Router = service.router(); + let request = Request::builder() + .method(Method::GET) + .uri("/api/v1/label/job/values") + .body(Body::empty()) + .unwrap(); + let response = app + .oneshot(request) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = to_bytes(response.into_body(), 1024 * 1024).await.unwrap(); + let payload: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(payload["status"], "success"); + assert!(payload["data"] + .as_array() + .unwrap() + .iter() + .any(|value| value == "test_job")); + } + #[test] fn test_persistence_save_load_empty() { use tempfile::tempdir; diff --git a/nightlight/crates/nightlight-server/src/storage.rs b/nightlight/crates/nightlight-server/src/storage.rs index 63e8911..8da4126 100644 --- a/nightlight/crates/nightlight-server/src/storage.rs +++ b/nightlight/crates/nightlight-server/src/storage.rs @@ -202,6 +202,7 @@ pub struct StorageStats { #[cfg(test)] mod tests { use super::*; + use nightlight_types::Label; #[test] fn test_storage_creation() { diff --git a/nix-nos/lib/cluster-config-lib.nix b/nix-nos/lib/cluster-config-lib.nix index 4bef600..1b7292d 100644 --- a/nix-nos/lib/cluster-config-lib.nix +++ b/nix-nos/lib/cluster-config-lib.nix @@ -521,7 +521,7 @@ let }; mode = mkOption { - type = types.enum [ "load_balancer" "direct" ]; + type = types.enum [ "load_balancer" "direct" "direct_multi" ]; default = "load_balancer"; description = "Whether DNS publishes the load balancer VIP or a direct instance address"; }; diff --git a/nix/ci/flake.nix b/nix/ci/flake.nix index 197d60f..f776de1 100644 --- a/nix/ci/flake.nix +++ b/nix/ci/flake.nix @@ -62,7 +62,7 @@ PhotonCloud local CI gates (provider-agnostic) Usage: - photoncloud-gate [--tier 0|1|2] [--workspace ] [--shared-crates] [--no-logs] [--fix] + photoncloud-gate [--tier 0|1|2] [--workspace ] [--shared-crates] [--shared-crate ] [--no-logs] [--fix] Tiers: 0: fmt + clippy + unit tests (lib) (fast, stable default) @@ -79,6 +79,7 @@ tier="0" only_ws="" shared_crates="0" + only_shared_crate="" no_logs="0" fix="0" @@ -90,6 +91,8 @@ only_ws="$2"; shift 2;; --shared-crates) shared_crates="1"; shift 1;; + --shared-crate) + only_shared_crate="$2"; shift 2;; --no-logs) no_logs="1"; shift 1;; --fix) @@ -159,6 +162,8 @@ export LIBCLANG_PATH="${pkgs.llvmPackages.libclang.lib}/lib"; export PROTOC="${pkgs.protobuf}/bin/protoc" export ROCKSDB_LIB_DIR="${pkgs.rocksdb}/lib" + export CARGO_TARGET_DIR="$repo_root/work/cargo-target" + mkdir -p "$CARGO_TARGET_DIR" manifest_has_target_kind() { local manifest="$1"; shift @@ -211,8 +216,46 @@ fi } + run_shared_crate_clippy() { + local crate="$1"; shift + local manifest="$1"; shift + local ran_clippy="0" + + if manifest_has_target_kind "$manifest" "lib"; then + run_shared_crate_cmd "$crate" "$manifest" "clippy (lib)" "$CARGO_CLIPPY clippy --manifest-path \"$manifest\" --lib -- -D warnings" + ran_clippy="1" + fi + if manifest_has_target_kind "$manifest" "bin"; then + run_shared_crate_cmd "$crate" "$manifest" "clippy (bin)" "$CARGO_CLIPPY clippy --manifest-path \"$manifest\" --bins -- -D warnings" + ran_clippy="1" + fi + if [[ "$ran_clippy" == "0" ]]; then + echo "[gate][shared:$crate] WARN: no lib/bin targets for clippy" + fi + } + + run_workspace_clippy() { + local ws="$1"; shift + local manifest="$1"; shift + local ran_clippy="0" + + if manifest_has_target_kind "$manifest" "lib"; then + run_cmd "$ws" "clippy (lib)" "$CARGO_CLIPPY clippy --workspace --lib -- -D warnings" + ran_clippy="1" + fi + if manifest_has_target_kind "$manifest" "bin"; then + run_cmd "$ws" "clippy (bin)" "$CARGO_CLIPPY clippy --workspace --bins -- -D warnings" + ran_clippy="1" + fi + if [[ "$ran_clippy" == "0" ]]; then + echo "[gate][$ws] WARN: no lib/bin targets for clippy" + fi + } + run_shared_crates() { + local only_crate="$1" local manifests=() + local selected_count=0 while IFS= read -r manifest; do manifests+=("$manifest") done < <(find "$repo_root/crates" -mindepth 2 -maxdepth 2 -name Cargo.toml | sort) @@ -226,8 +269,12 @@ local crate local ran_unit_tests crate="$(basename "$(dirname "$manifest")")" + if [[ -n "$only_crate" && "$crate" != "$only_crate" ]]; then + continue + fi + selected_count=$((selected_count + 1)) run_shared_crate_cmd "$crate" "$manifest" "fmt" "$CARGO_FMT fmt --manifest-path \"$manifest\" $fmt_rustfmt_args" - run_shared_crate_cmd "$crate" "$manifest" "clippy" "$CARGO_CLIPPY clippy --manifest-path \"$manifest\" --all-targets -- -D warnings" + run_shared_crate_clippy "$crate" "$manifest" ran_unit_tests="0" if manifest_has_target_kind "$manifest" "lib"; then run_shared_crate_cmd "$crate" "$manifest" "test (tier0 unit lib)" "$CARGO test --manifest-path \"$manifest\" --lib" @@ -249,10 +296,22 @@ run_shared_crate_cmd "$crate" "$manifest" "test (tier2 ignored)" "$CARGO test --manifest-path \"$manifest\" --tests -- --ignored" fi done + + if [[ -n "$only_crate" && "$selected_count" -eq 0 ]]; then + echo "[gate] ERROR: shared crate not found: $only_crate" >&2 + exit 2 + fi } if [[ "$shared_crates" == "1" ]]; then - run_shared_crates + run_shared_crates "$only_shared_crate" + echo "" + echo "[gate] OK (tier=$tier, shared-crates)" + exit 0 + fi + + if [[ -n "$only_shared_crate" ]]; then + run_shared_crates "$only_shared_crate" echo "" echo "[gate] OK (tier=$tier, shared-crates)" exit 0 @@ -274,8 +333,9 @@ # # NOTE: Avoid `--all` here; with path-dependencies it may traverse outside the workspace directory. run_cmd "$ws" "fmt" "$CARGO_FMT fmt $fmt_rustfmt_args" - # Lint gate: call Nix-provided `cargo-clippy` directly (avoid resolving ~/.cargo/bin/cargo-clippy). - run_cmd "$ws" "clippy" "$CARGO_CLIPPY clippy --workspace --all-targets -- -D warnings" + # Tier0 clippy stays on lib/bin targets to avoid dragging in + # heavy integration-only dev-dependencies for every workspace. + run_workspace_clippy "$ws" "$workspace_manifest" ran_unit_tests="0" if manifest_has_target_kind "$workspace_manifest" "lib"; then run_cmd "$ws" "test (tier0 unit lib)" "$CARGO test --workspace --lib" diff --git a/nix/ci/workspaces.json b/nix/ci/workspaces.json index e9ccafe..186125f 100644 --- a/nix/ci/workspaces.json +++ b/nix/ci/workspaces.json @@ -8,7 +8,6 @@ ".github/workflows/nix.yml", "Cargo.toml", "Cargo.lock", - "crates/**", "baremetal/**", "scripts/**", "specifications/**", diff --git a/nix/test-cluster/common.nix b/nix/test-cluster/common.nix index be19a09..ccb5a5b 100644 --- a/nix/test-cluster/common.nix +++ b/nix/test-cluster/common.nix @@ -398,6 +398,14 @@ in timeoutSecs = 3; }; }; + publish = { + dns = { + zone = "native.cluster.test"; + name = "daemon"; + ttl = 30; + mode = "direct_multi"; + }; + }; }; }; }; diff --git a/nix/test-cluster/node01.nix b/nix/test-cluster/node01.nix index b00e225..b591b32 100644 --- a/nix/test-cluster/node01.nix +++ b/nix/test-cluster/node01.nix @@ -164,6 +164,7 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; systemd.services.lightningstor.environment = { diff --git a/nix/test-cluster/node02.nix b/nix/test-cluster/node02.nix index 430fdf9..8215e69 100644 --- a/nix/test-cluster/node02.nix +++ b/nix/test-cluster/node02.nix @@ -68,5 +68,6 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; } diff --git a/nix/test-cluster/node03.nix b/nix/test-cluster/node03.nix index d03704b..5e40f40 100644 --- a/nix/test-cluster/node03.nix +++ b/nix/test-cluster/node03.nix @@ -68,5 +68,6 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; } diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index 03c834d..4bbe6b2 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -45,8 +45,15 @@ STORAGE_BENCHMARK_COMMAND="${PHOTON_VM_STORAGE_BENCH_COMMAND:-bench-storage}" LIGHTNINGSTOR_BENCH_CLIENT_NODE="${PHOTON_VM_LIGHTNINGSTOR_BENCH_CLIENT_NODE:-node06}" STORAGE_SKIP_PLASMAVMC_IMAGE_BENCH="${PHOTON_VM_SKIP_PLASMAVMC_IMAGE_BENCH:-0}" STORAGE_SKIP_PLASMAVMC_GUEST_RUNTIME_BENCH="${PHOTON_VM_SKIP_PLASMAVMC_GUEST_RUNTIME_BENCH:-0}" -CLUSTER_NIX_MAX_JOBS="${PHOTON_CLUSTER_NIX_MAX_JOBS:-2}" -CLUSTER_NIX_BUILD_CORES="${PHOTON_CLUSTER_NIX_BUILD_CORES:-4}" +HOST_CPU_COUNT="$(getconf _NPROCESSORS_ONLN 2>/dev/null || nproc 2>/dev/null || echo 4)" +DEFAULT_CLUSTER_NIX_MAX_JOBS=2 +DEFAULT_CLUSTER_NIX_BUILD_CORES=4 +if [[ "${HOST_CPU_COUNT}" =~ ^[0-9]+$ ]] && (( HOST_CPU_COUNT >= 12 )); then + DEFAULT_CLUSTER_NIX_MAX_JOBS=3 + DEFAULT_CLUSTER_NIX_BUILD_CORES=6 +fi +CLUSTER_NIX_MAX_JOBS="${PHOTON_CLUSTER_NIX_MAX_JOBS:-${DEFAULT_CLUSTER_NIX_MAX_JOBS}}" +CLUSTER_NIX_BUILD_CORES="${PHOTON_CLUSTER_NIX_BUILD_CORES:-${DEFAULT_CLUSTER_NIX_BUILD_CORES}}" BUILD_PROFILE="${PHOTON_CLUSTER_BUILD_PROFILE:-default}" CLUSTER_SKIP_BUILD="${PHOTON_CLUSTER_SKIP_BUILD:-0}" CLUSTER_LOCK_HELD=0 @@ -4894,9 +4901,10 @@ validate_native_runtime_flow() { } native_publication_state() { + local service="$1" native_dump_values "photoncloud/clusters/test-cluster/publications/" \ | sed '/^$/d' \ - | jq -sr 'map(select(.service == "native-web")) | first' + | jq -sr --arg service "${service}" 'map(select(.service == $service)) | first' } wait_for_native_dns_record() { @@ -4916,6 +4924,33 @@ validate_native_runtime_flow() { done } + wait_for_native_dns_records() { + local fqdn="$1" + local timeout="$2" + shift 2 + local expected_json actual_json + local deadline=$((SECONDS + timeout)) + + expected_json="$(printf '%s\n' "$@" | sed '/^$/d' | sort -u | jq -R . | jq -cs 'sort')" + + while true; do + actual_json="$( + ssh_node node01 "dig @127.0.0.1 -p 5353 +short ${fqdn} A" 2>/dev/null \ + | sed '/^$/d' \ + | sort -u \ + | jq -R . \ + | jq -cs 'sort' + )" || actual_json="[]" + if [[ "${actual_json}" == "${expected_json}" ]]; then + return 0 + fi + if (( SECONDS >= deadline )); then + die "timed out waiting for native DNS record set for ${fqdn}: expected ${expected_json}, got ${actual_json}" + fi + sleep 2 + done + } + wait_for_native_lb_backends() { local pool_id="$1" local expected_count="$2" @@ -5023,8 +5058,8 @@ validate_native_runtime_flow() { wait_for_native_dump_count \ "photoncloud/clusters/test-cluster/publications/" \ - 'map(select(.service == "native-web")) | length' \ - "1" \ + 'map(select(.service == "native-web" or .service == "native-daemon")) | length' \ + "2" \ 180 iam_tunnel="$(start_ssh_tunnel node01 15080 50080)" @@ -5041,15 +5076,20 @@ validate_native_runtime_flow() { | jq -e --arg name "${lb_name}" '.loadbalancers | any(.name == $name)' >/dev/null local publication_value publication_fqdn publication_ip publication_pool_id - publication_value="$(native_publication_state)" + local daemon_publication_value daemon_publication_fqdn + publication_value="$(native_publication_state "native-web")" publication_fqdn="$(printf '%s' "${publication_value}" | jq -r '.dns.fqdn')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" - [[ -n "${publication_fqdn}" && "${publication_fqdn}" != "null" ]] || die "native publication missing fqdn" - [[ -n "${publication_ip}" && "${publication_ip}" != "null" ]] || die "native publication missing dns value" - [[ -n "${publication_pool_id}" && "${publication_pool_id}" != "null" ]] || die "native publication missing pool id" + daemon_publication_value="$(native_publication_state "native-daemon")" + daemon_publication_fqdn="$(printf '%s' "${daemon_publication_value}" | jq -r '.dns.fqdn')" + [[ -n "${publication_fqdn}" && "${publication_fqdn}" != "null" ]] || die "native-web publication missing fqdn" + [[ -n "${publication_ip}" && "${publication_ip}" != "null" ]] || die "native-web publication missing dns value" + [[ -n "${publication_pool_id}" && "${publication_pool_id}" != "null" ]] || die "native-web publication missing pool id" + [[ -n "${daemon_publication_fqdn}" && "${daemon_publication_fqdn}" != "null" ]] || die "native-daemon publication missing fqdn" wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 + wait_for_native_dns_records "${daemon_publication_fqdn}" 180 10.100.0.21 10.100.0.22 wait_for_native_lb_backends "${publication_pool_id}" "2" 180 10.100.0.21 10.100.0.22 log "Draining node04 through deployer lifecycle state" @@ -5101,11 +5141,13 @@ validate_native_runtime_flow() { wait_for_http node05 "http://10.100.0.22:18192/" 240 wait_for_http node05 "http://10.100.0.22:18193/" 240 wait_for_http node01 "http://127.0.0.1:18191/" 240 - publication_value="$(native_publication_state)" + publication_value="$(native_publication_state "native-web")" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" + daemon_publication_value="$(native_publication_state "native-daemon")" wait_for_native_lb_backends "${publication_pool_id}" "1" 180 10.100.0.22 wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 + wait_for_native_dns_records "${daemon_publication_fqdn}" 180 10.100.0.22 log "Restoring node04 and ensuring capacity returns without moving healthy singleton work" set_native_node_state "node04" "active" @@ -5147,11 +5189,13 @@ validate_native_runtime_flow() { restored_container_value="$(wait_for_native_instance_node "native-container" "node05" 240)" restored_container_node="$(printf '%s' "${restored_container_value}" | jq -r '.node_id')" [[ "${restored_container_node}" == "node05" ]] || die "native-container unexpectedly moved after node04 returned to service" - publication_value="$(native_publication_state)" + publication_value="$(native_publication_state "native-web")" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" + daemon_publication_value="$(native_publication_state "native-daemon")" wait_for_native_lb_backends "${publication_pool_id}" "2" 180 10.100.0.21 10.100.0.22 wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 + wait_for_native_dns_records "${daemon_publication_fqdn}" 180 10.100.0.21 10.100.0.22 wait_for_http node01 "http://127.0.0.1:18191/" 240 log "Simulating native worker loss and scheduler failover" @@ -5182,11 +5226,13 @@ validate_native_runtime_flow() { failover_container_node="$(printf '%s' "${failover_container_value}" | jq -r '.node_id')" [[ "${failover_container_node}" == "node04" ]] || die "native-container did not fail over to node04 after node05 stopped" wait_for_native_instance_node "native-daemon" "node04" 240 >/dev/null - publication_value="$(native_publication_state)" + publication_value="$(native_publication_state "native-web")" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" + daemon_publication_value="$(native_publication_state "native-daemon")" wait_for_native_lb_backends "${publication_pool_id}" "1" 240 10.100.0.21 wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 + wait_for_native_dns_records "${daemon_publication_fqdn}" 180 10.100.0.21 wait_for_http node04 "http://10.100.0.21:18190/" 240 wait_for_http node04 "http://10.100.0.21:18192/" 240 wait_for_http node04 "http://10.100.0.21:18193/" 240 @@ -5236,11 +5282,13 @@ validate_native_runtime_flow() { recovered_container_value="$(wait_for_native_instance_node "native-container" "node04" 240)" recovered_container_node="$(printf '%s' "${recovered_container_value}" | jq -r '.node_id')" [[ "${recovered_container_node}" == "node04" ]] || die "native-container unexpectedly churned after node05 recovered" - publication_value="$(native_publication_state)" + publication_value="$(native_publication_state "native-web")" publication_pool_id="$(printf '%s' "${publication_value}" | jq -r '.load_balancer.pool_id')" publication_ip="$(printf '%s' "${publication_value}" | jq -r '.dns.value')" + daemon_publication_value="$(native_publication_state "native-daemon")" wait_for_native_lb_backends "${publication_pool_id}" "2" 180 10.100.0.21 10.100.0.22 wait_for_native_dns_record "${publication_fqdn}" "${publication_ip}" 180 + wait_for_native_dns_records "${daemon_publication_fqdn}" 180 10.100.0.21 10.100.0.22 wait_for_http node01 "http://127.0.0.1:18191/" 240 trap - RETURN diff --git a/nix/test-cluster/storage-node01.nix b/nix/test-cluster/storage-node01.nix index d773127..233f08e 100644 --- a/nix/test-cluster/storage-node01.nix +++ b/nix/test-cluster/storage-node01.nix @@ -126,6 +126,7 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; systemd.services.lightningstor.environment = { diff --git a/nix/test-cluster/storage-node02.nix b/nix/test-cluster/storage-node02.nix index 77a12f2..c781538 100644 --- a/nix/test-cluster/storage-node02.nix +++ b/nix/test-cluster/storage-node02.nix @@ -70,5 +70,6 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; } diff --git a/nix/test-cluster/storage-node03.nix b/nix/test-cluster/storage-node03.nix index 4cad017..136c799 100644 --- a/nix/test-cluster/storage-node03.nix +++ b/nix/test-cluster/storage-node03.nix @@ -70,5 +70,6 @@ systemd.services.iam.environment = { IAM_ALLOW_RANDOM_SIGNING_KEY = "1"; + IAM_ALLOW_UNAUTHENTICATED_ADMIN = "true"; }; } diff --git a/scripts/ci_changed_workspaces.py b/scripts/ci_changed_workspaces.py index c3e5bb4..3ed61ba 100644 --- a/scripts/ci_changed_workspaces.py +++ b/scripts/ci_changed_workspaces.py @@ -4,6 +4,7 @@ import argparse import fnmatch import json from pathlib import Path +import tomllib from typing import Any @@ -24,26 +25,149 @@ def matches_any(path: str, patterns: list[str]) -> bool: return any(fnmatch.fnmatchcase(path, pattern) for pattern in patterns) -def detect_changes(config: dict[str, Any], changed_files: list[str]) -> dict[str, Any]: +def workspace_root(workspace: dict[str, Any]) -> str: + for pattern in workspace["paths"]: + root = pattern.split("/", 1)[0] + if root and "*" not in root and "?" not in root: + return root + raise ValueError(f"Could not infer workspace root for {workspace['name']}") + + +def collect_path_dependencies(obj: Any) -> list[str]: + path_dependencies: list[str] = [] + + if isinstance(obj, dict): + path = obj.get("path") + if isinstance(path, str): + path_dependencies.append(path) + for value in obj.values(): + path_dependencies.extend(collect_path_dependencies(value)) + elif isinstance(obj, list): + for value in obj: + path_dependencies.extend(collect_path_dependencies(value)) + + return path_dependencies + + +def build_nodes(config: dict[str, Any], repo_root: Path) -> dict[str, dict[str, Any]]: + nodes: dict[str, dict[str, Any]] = {} + + for workspace in config["workspaces"]: + nodes[workspace["name"]] = { + "name": workspace["name"], + "kind": "workspace", + "root": repo_root / workspace_root(workspace), + } + + crates_dir = repo_root / "crates" + if crates_dir.is_dir(): + for manifest in sorted(crates_dir.glob("*/Cargo.toml")): + crate_name = manifest.parent.name + nodes[f"crate:{crate_name}"] = { + "name": crate_name, + "kind": "shared_crate", + "root": manifest.parent, + } + + return nodes + + +def resolve_node_for_path(path: Path, nodes: dict[str, dict[str, Any]]) -> str | None: + for node_name, node in sorted( + nodes.items(), + key=lambda item: len(str(item[1]["root"])), + reverse=True, + ): + try: + path.relative_to(node["root"]) + except ValueError: + continue + return node_name + return None + + +def build_dependency_graph( + nodes: dict[str, dict[str, Any]], +) -> dict[str, set[str]]: + graph = {node_name: set() for node_name in nodes} + + for node_name, node in nodes.items(): + root = node["root"] + for manifest in root.rglob("Cargo.toml"): + manifest_data = tomllib.loads(manifest.read_text()) + for dependency_path in collect_path_dependencies(manifest_data): + resolved_dependency = (manifest.parent / dependency_path).resolve() + dependency_node = resolve_node_for_path(resolved_dependency, nodes) + if dependency_node is None or dependency_node == node_name: + continue + graph[node_name].add(dependency_node) + + return graph + + +def reverse_graph(graph: dict[str, set[str]]) -> dict[str, set[str]]: + reversed_graph = {node_name: set() for node_name in graph} + for node_name, dependencies in graph.items(): + for dependency in dependencies: + reversed_graph[dependency].add(node_name) + return reversed_graph + + +def impacted_nodes( + changed_nodes: set[str], + reversed_dependencies: dict[str, set[str]], +) -> set[str]: + selected = set(changed_nodes) + queue = list(changed_nodes) + + while queue: + current = queue.pop() + for dependent in reversed_dependencies[current]: + if dependent in selected: + continue + selected.add(dependent) + queue.append(dependent) + + return selected + + +def detect_changes( + config: dict[str, Any], + changed_files: list[str], + repo_root: Path, +) -> dict[str, Any]: workspaces: list[dict[str, Any]] = config["workspaces"] all_workspace_names = [workspace["name"] for workspace in workspaces] + nodes = build_nodes(config, repo_root) + dependency_graph = build_dependency_graph(nodes) + reversed_dependencies = reverse_graph(dependency_graph) global_changed = any( matches_any(path, config["global_paths"]) for path in changed_files ) - shared_crates_changed = any( - matches_any(path, config["shared_crates_paths"]) + directly_changed_nodes = { + node_name for path in changed_files + for node_name in [resolve_node_for_path((repo_root / path).resolve(), nodes)] + if node_name is not None + } + + shared_crates = sorted( + nodes[node_name]["name"] + for node_name in directly_changed_nodes + if nodes[node_name]["kind"] == "shared_crate" ) + shared_crates_changed = bool(shared_crates) if global_changed: changed_workspaces = all_workspace_names else: + selected_nodes = impacted_nodes(directly_changed_nodes, reversed_dependencies) changed_workspaces = [ workspace["name"] for workspace in workspaces - if any(matches_any(path, workspace["paths"]) for path in changed_files) + if workspace["name"] in selected_nodes ] selected_workspaces = set(changed_workspaces) @@ -70,6 +194,7 @@ def detect_changes(config: dict[str, Any], changed_files: list[str]) -> dict[str "any_changed": global_changed or bool(changed_workspaces), "build_changed": bool(build_targets), "global_changed": global_changed, + "shared_crates": shared_crates, "shared_crates_changed": shared_crates_changed, } @@ -81,6 +206,7 @@ def write_github_output(path: Path, result: dict[str, Any]) -> None: "any_changed": str(result["any_changed"]).lower(), "build_changed": str(result["build_changed"]).lower(), "global_changed": str(result["global_changed"]).lower(), + "shared_crates": json.dumps(result["shared_crates"], separators=(",", ":")), "shared_crates_changed": str(result["shared_crates_changed"]).lower(), } @@ -119,9 +245,11 @@ def parse_args() -> argparse.Namespace: def main() -> int: args = parse_args() - config = json.loads(Path(args.config).read_text()) + config_path = Path(args.config).resolve() + repo_root = config_path.parents[2] + config = json.loads(config_path.read_text()) changed_files = load_changed_files(args) - result = detect_changes(config, changed_files) + result = detect_changes(config, changed_files, repo_root) if args.github_output: write_github_output(Path(args.github_output), result)