# photoncloud-monorepo/deployer/scripts/verify-fleet-scheduler-e2e.sh
#
# NOTE(review): the lines above/below ("420 lines", "9.4 KiB", "Bash",
# "Executable file") are file-browser metadata captured in a paste, not part
# of the script. They are kept here as comments so the file parses; in the
# real file the shebang on the next line must be line 1 for the kernel to
# honor it.
#!/usr/bin/env bash
# End-to-end verification of the fleet scheduler: boots a single-node
# ChainFire control plane, applies a declarative cluster spec, and drives
# the scheduler and node agents through a schedule and a scale-down cycle.
set -euo pipefail

# Repository root: two directories up from this script's location.
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

# Re-exec this script inside the Nix dev shell exactly once; the guard
# variable prevents an infinite re-exec loop.
[[ -n "${PHOTONCLOUD_E2E_IN_NIX:-}" ]] || \
  exec nix develop "$ROOT" -c env PHOTONCLOUD_E2E_IN_NIX=1 bash "$0" "$@"

# Scratch space for configs, logs, and pid files; removed by cleanup().
tmp_dir="$(mktemp -d)"
cf_pid=""
# Best-effort teardown: kill every process recorded in a *.pid file under
# the temp dir, stop the ChainFire server if it was started, then remove
# the scratch directory. Runs on every exit path via the EXIT trap.
cleanup() {
  set +e
  local -a pid_files=()
  if [[ -d "$tmp_dir/pids" ]]; then
    mapfile -d '' -t pid_files \
      < <(find "$tmp_dir/pids" -type f -name '*.pid' -print0 2>/dev/null)
  fi
  local pid_file
  for pid_file in "${pid_files[@]}"; do
    [[ -f "$pid_file" ]] || continue
    kill "$(cat "$pid_file")" 2>/dev/null || true
  done
  if [[ -n "$cf_pid" ]]; then
    kill "$cf_pid" 2>/dev/null || true
    wait "$cf_pid" 2>/dev/null || true
  fi
  rm -rf "$tmp_dir"
}
trap cleanup EXIT
# Print an ephemeral TCP port that is free right now: binding to port 0
# lets the OS pick one. (Small TOCTOU window before the caller binds it.)
free_port() {
  python3 - <<'PY'
import socket

with socket.socket() as sock:
    sock.bind(("127.0.0.1", 0))
    print(sock.getsockname()[1])
PY
}
# Poll until host:port accepts a TCP connection, or fail after the timeout.
# Arguments: $1 host, $2 port, $3 timeout in seconds (default 60).
# Returns 0 once a connection succeeds, 1 (with a message on stderr) on
# timeout. Fix: the embedded Python heredoc had its indentation stripped
# (the `with`/`try` bodies were flush left), which made it a SyntaxError;
# the block structure is reconstructed here.
wait_for_port() {
  local host="$1"
  local port="$2"
  local timeout_secs="${3:-60}"
  # SECONDS is bash's elapsed-time counter; compute an absolute deadline.
  local deadline=$((SECONDS + timeout_secs))
  while (( SECONDS < deadline )); do
    if python3 - "$host" "$port" <<'PY'
import socket
import sys

host = sys.argv[1]
port = int(sys.argv[2])
with socket.socket() as sock:
    sock.settimeout(0.5)
    try:
        sock.connect((host, port))
    except OSError:
        raise SystemExit(1)
raise SystemExit(0)
PY
    then
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for ${host}:${port}" >&2
  return 1
}
# Allocate a fresh ephemeral port for each ChainFire listener so concurrent
# test runs on the same host cannot collide.
api_port="$(free_port)"
http_port="$(free_port)"
raft_port="$(free_port)"
gossip_port="$(free_port)"
# Single-node bootstrap config. The heredoc delimiter is unquoted on
# purpose so $tmp_dir and the port variables expand; every line between
# the markers is written verbatim to the TOML file.
cat >"$tmp_dir/chainfire.toml" <<EOF
[node]
id = 1
name = "chainfire-1"
role = "control_plane"
[storage]
data_dir = "$tmp_dir/chainfire-data"
[network]
api_addr = "127.0.0.1:${api_port}"
http_addr = "127.0.0.1:${http_port}"
raft_addr = "127.0.0.1:${raft_port}"
gossip_addr = "127.0.0.1:${gossip_port}"
[cluster]
id = 1
initial_members = []
bootstrap = true
[raft]
role = "voter"
EOF
# Directory where node agents write <name>.pid files; cleanup() kills
# whatever pids it finds here on exit.
mkdir -p "$tmp_dir/pids"
echo "Starting ChainFire on 127.0.0.1:${api_port}"
# Build (if needed) and run the server in the background; stdout/stderr go
# to a log file in the temp dir for post-mortem inspection.
cargo run --manifest-path "$ROOT/chainfire/Cargo.toml" -p chainfire-server -- \
--config "$tmp_dir/chainfire.toml" \
>"$tmp_dir/chainfire.log" 2>&1 &
cf_pid="$!"
# Generous 120s timeout: a cold `cargo run` may compile the workspace
# before the API port ever opens.
wait_for_port "127.0.0.1" "$api_port" 120
# Initial cluster declaration: two pending nodes and an "api" service with
# two replicas spread across failure domains. The quoted heredoc delimiter
# keeps ${INSTANCE_PORT}/${INSTANCE_IP} literal for the agent to substitute.
# Fix: the YAML indentation was stripped in the source (every key was flush
# left, which is not valid YAML); the nesting is reconstructed below.
# NOTE(review): the nesting of spread_by_label, max_instances_per_node and
# instance_port is inferred from key order — confirm against the deployer's
# cluster schema.
cat >"$tmp_dir/cluster.yaml" <<'EOF'
cluster:
  cluster_id: test-cluster
  environment: dev
  node_classes:
    - name: worker-linux
      description: Standard worker nodes
      nix_profile: profiles/worker-linux
      roles:
        - worker
      labels:
        tier: general
  pools:
    - name: general
      description: Default capacity pool
      node_class: worker-linux
      min_size: 2
      max_size: 10
      labels:
        env: dev
  nodes:
    - node_id: node01
      hostname: node01
      ip: 127.0.0.2
      pool: general
      failure_domain: rack-a
      state: pending
    - node_id: node02
      hostname: node02
      ip: 127.0.0.3
      pool: general
      failure_domain: rack-b
      state: pending
  services:
    - name: api
      ports:
        http: 18080
      protocol: http
      schedule:
        replicas: 2
        placement:
          roles:
            - worker
          pools:
            - general
          node_classes:
            - worker-linux
          match_labels:
            tier: general
          spread_by_label: failure_domain
        max_instances_per_node: 1
      instance_port: 18080
      process:
        command: python3
        args:
          - -m
          - http.server
          - ${INSTANCE_PORT}
          - --bind
          - ${INSTANCE_IP}
      health_check:
        type: http
        path: /
        interval_secs: 1
        timeout_secs: 2
EOF
# Scaled-down declaration: same cluster but nodes already active and the
# "api" service reduced to a single replica, used to verify scale-down.
# Fix: YAML indentation was stripped in the source (flush-left keys are not
# valid YAML); the nesting is reconstructed below.
# NOTE(review): nesting of spread_by_label / max_instances_per_node /
# instance_port is inferred from key order — confirm against the schema.
cat >"$tmp_dir/cluster-scaled.yaml" <<'EOF'
cluster:
  cluster_id: test-cluster
  environment: dev
  node_classes:
    - name: worker-linux
      description: Standard worker nodes
      nix_profile: profiles/worker-linux
      roles:
        - worker
      labels:
        tier: general
  pools:
    - name: general
      description: Default capacity pool
      node_class: worker-linux
      min_size: 2
      max_size: 10
      labels:
        env: dev
  nodes:
    - node_id: node01
      hostname: node01
      ip: 127.0.0.2
      pool: general
      failure_domain: rack-a
      state: active
    - node_id: node02
      hostname: node02
      ip: 127.0.0.3
      pool: general
      failure_domain: rack-b
      state: active
  services:
    - name: api
      ports:
        http: 18080
      protocol: http
      schedule:
        replicas: 1
        placement:
          roles:
            - worker
          pools:
            - general
          node_classes:
            - worker-linux
          match_labels:
            tier: general
          spread_by_label: failure_domain
        max_instances_per_node: 1
      instance_port: 18080
      process:
        command: python3
        args:
          - -m
          - http.server
          - ${INSTANCE_PORT}
          - --bind
          - ${INSTANCE_IP}
      health_check:
        type: http
        path: /
        interval_secs: 1
        timeout_secs: 2
EOF
# Base URL of the ChainFire API started earlier in this script.
endpoint="http://127.0.0.1:${api_port}"
# deployer-ctl wrapper pre-wired with the test cluster's endpoint and id;
# extra arguments are passed through verbatim.
run_deployer_ctl() {
  local -a base_args=(
    --chainfire-endpoint "$endpoint"
    --cluster-id test-cluster
  )
  cargo run --quiet --manifest-path "$ROOT/deployer/Cargo.toml" -p deployer-ctl -- \
    "${base_args[@]}" "$@"
}
# Run a single reconcile pass of the node agent for the given node id,
# giving it a per-node pid directory under the scratch tree.
run_node_agent_once() {
  local agent_node="$1"
  local agent_pid_dir="$tmp_dir/pids/$agent_node"
  mkdir -p "$agent_pid_dir"
  local -a agent_args=(
    --chainfire-endpoint "$endpoint"
    --cluster-id test-cluster
    --node-id "$agent_node"
    --pid-dir "$agent_pid_dir"
    --interval-secs 1
    --apply
    --once
  )
  cargo run --quiet --manifest-path "$ROOT/deployer/Cargo.toml" -p node-agent -- \
    "${agent_args[@]}"
}
# Run a single pass of the fleet scheduler against the test cluster.
run_scheduler_once() {
  local -a sched_args=(
    --chainfire-endpoint "$endpoint"
    --cluster-id test-cluster
    --interval-secs 1
    --once
  )
  cargo run --quiet --manifest-path "$ROOT/deployer/Cargo.toml" -p fleet-scheduler -- \
    "${sched_args[@]}"
}
# --- Phase 1: apply the declaration, activate nodes, schedule, validate ---
echo "Applying cluster declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster.yaml"
echo "Activating nodes through node-agent"
run_node_agent_once node01
run_node_agent_once node02
echo "Scheduling managed instances"
run_scheduler_once
# Several agent passes so processes get started and health checks (1s
# interval in the declaration) have time to flip to healthy.
echo "Reconciling processes and health"
for _ in 1 2 3; do
  run_node_agent_once node01
  run_node_agent_once node02
  sleep 1
done
# Both replicas run `python3 -m http.server`, so each endpoint should serve
# a directory listing. Fix: the Python heredoc's indentation was stripped
# in the source (the for/with bodies were flush left — a SyntaxError); the
# block structure is reconstructed here.
echo "Validating HTTP endpoints"
python3 - <<'PY'
import urllib.request

for address in ("http://127.0.0.2:18080/", "http://127.0.0.3:18080/"):
    with urllib.request.urlopen(address, timeout=5) as response:
        body = response.read().decode("utf-8")
        if response.status != 200:
            raise SystemExit(f"{address} returned {response.status}")
        if "Directory listing" not in body and "DOCTYPE" not in body:
            raise SystemExit(f"{address} returned unexpected body")
print("HTTP endpoints are healthy")
PY
# Dump the per-service instance records out of ChainFire and assert that
# exactly two healthy instances landed on node01 and node02.
# Fix: the Python heredoc's indentation was stripped in the source (the
# with/for/if bodies were flush left — a SyntaxError); reconstructed here.
echo "Inspecting instance state in ChainFire"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances.dump"
python3 - "$tmp_dir/instances.dump" <<'PY'
import json
import sys

path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        # Dump lines look like "key=... value=<json>"; keep only the JSON.
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 2:
    raise SystemExit(f"expected 2 scheduled instances, found {len(instances)}")
node_ids = sorted(instance["node_id"] for instance in instances)
# NOTE(review): if any record lacks "state", sorting None against str raises
# TypeError — acceptable here since either way the check fails loudly.
states = sorted(instance.get("state") for instance in instances)
if node_ids != ["node01", "node02"]:
    raise SystemExit(f"unexpected node placement: {node_ids}")
if states != ["healthy", "healthy"]:
    raise SystemExit(f"unexpected health states: {states}")
print("Observed two healthy scheduled instances across node01 and node02")
PY
# --- Phase 2: scale the service down to one replica and reconcile ---
echo "Applying scaled declaration"
run_deployer_ctl apply --config "$tmp_dir/cluster-scaled.yaml" --prune
echo "Re-running scheduler after scale-down"
run_scheduler_once
# Extra agent passes so the surplus instance is stopped and health settles.
echo "Reconciling processes and health after scale-down"
for reconcile_pass in 1 2 3; do
  run_node_agent_once node01
  run_node_agent_once node02
  sleep 1
done
# After scale-down exactly one healthy instance should remain, on node01.
# Fix: the Python heredoc's indentation was stripped in the source (the
# with/for/if bodies were flush left — a SyntaxError); reconstructed here.
echo "Inspecting scaled instance state in ChainFire"
run_deployer_ctl dump --prefix "photoncloud/clusters/test-cluster/instances/api/" >"$tmp_dir/instances-scaled.dump"
python3 - "$tmp_dir/instances-scaled.dump" <<'PY'
import json
import sys

path = sys.argv[1]
instances = []
with open(path, "r", encoding="utf-8") as handle:
    for line in handle:
        line = line.strip()
        if not line:
            continue
        # Dump lines look like "key=... value=<json>"; keep only the JSON.
        marker = " value="
        if marker not in line:
            continue
        value = line.split(marker, 1)[1]
        instances.append(json.loads(value))
if len(instances) != 1:
    raise SystemExit(f"expected 1 scheduled instance after scale-down, found {len(instances)}")
instance = instances[0]
if instance["node_id"] != "node01":
    raise SystemExit(f"expected remaining instance on node01, found {instance['node_id']}")
if instance.get("state") != "healthy":
    raise SystemExit(f"expected remaining instance to be healthy, found {instance.get('state')}")
print("Observed one healthy scheduled instance on node01 after scale-down")
PY
# node01's endpoint must still serve; node02's must no longer accept TCP
# connections (its instance was pruned).
# Fix: the Python heredoc's indentation was stripped in the source (the
# with/try bodies were flush left — a SyntaxError); reconstructed here.
echo "Validating endpoint convergence after scale-down"
python3 - <<'PY'
import socket
import urllib.request

with urllib.request.urlopen("http://127.0.0.2:18080/", timeout=5) as response:
    if response.status != 200:
        raise SystemExit(f"node01 endpoint returned {response.status}")
sock = socket.socket()
sock.settimeout(1.5)
try:
    sock.connect(("127.0.0.3", 18080))
except OSError:
    # Connection refused/timed out is the expected, converged state.
    pass
else:
    raise SystemExit("node02 endpoint still accepts connections after scale-down")
finally:
    sock.close()
print("Endpoint convergence validated")
PY
echo "Fleet scheduler E2E verification passed"