// photoncloud-monorepo/deployer/crates/deployer-server/src/local_storage.rs
//! Local file-backed storage for Deployer bootstrap state.
use std::collections::{HashMap, HashSet};
use std::fs::{self, OpenOptions};
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::cluster::ClusterNodeRecord;
use crate::tls::issue_node_cert;
use deployer_types::{NodeConfig, NodeInfo};
/// Entire persisted state; serialized as a single JSON document by `LocalStorage::save`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct LocalState {
    // machine_id -> (node_id, node config), populated via `register_node`.
    machine_configs: HashMap<String, (String, NodeConfig)>,
    // node_id -> latest stored `NodeInfo`.
    nodes: HashMap<String, NodeInfo>,
    // Cluster-node records keyed by the strings built in `cluster_key`
    // ("{ns}/clusters/{cluster}/nodes/{node}"); legacy "{ns}/{cluster}/{node}"
    // keys may also be present and are still read by `list_cluster_nodes`.
    cluster_nodes: HashMap<String, ClusterNodeRecord>,
    // node_id -> private SSH host key text as produced by `ssh-keygen`.
    ssh_host_keys: HashMap<String, String>,
    // node_id -> TLS cert/key pair issued via `issue_node_cert`.
    tls_material: HashMap<String, TlsMaterial>,
}
/// A node's TLS certificate and private key as returned by `issue_node_cert`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TlsMaterial {
    // Certificate text (format determined by `issue_node_cert`; presumably PEM — confirm there).
    cert: String,
    // Matching private key text.
    key: String,
}
/// Local file-backed storage for node state.
///
/// The full state is held in memory and rewritten to `state_path` as one
/// JSON file on every mutating call (see `save`).
pub struct LocalStorage {
    // Resolved path of the JSON state file (see `resolve_state_path`).
    state_path: PathBuf,
    // In-memory copy of the persisted state; source of truth between saves.
    state: LocalState,
}
impl LocalStorage {
    /// Open or create local storage at the given path.
    ///
    /// If the path is a directory, `state.json` will be created within it.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let state_path = resolve_state_path(path.as_ref());
        // Ensure the directory that will hold the state file exists up front.
        if let Some(parent) = state_path.parent() {
            fs::create_dir_all(parent).with_context(|| {
                format!("failed to create local state dir {}", parent.display())
            })?;
        }
        // Load previously persisted state, or start empty on first run.
        let state = if state_path.exists() {
            let contents = fs::read_to_string(&state_path)
                .with_context(|| format!("failed to read local state {}", state_path.display()))?;
            serde_json::from_str::<LocalState>(&contents)
                .with_context(|| format!("failed to parse local state {}", state_path.display()))?
        } else {
            LocalState::default()
        };
        Ok(Self { state_path, state })
    }

    /// Map `machine_id` to `node_id` with its `config`, then persist.
    ///
    /// Fails if the machine is already mapped to a *different* node id;
    /// re-registering with the same node id overwrites the stored config.
    pub fn register_node(
        &mut self,
        machine_id: &str,
        node_id: &str,
        config: &NodeConfig,
    ) -> Result<()> {
        if let Some((existing_id, _)) = self.state.machine_configs.get(machine_id) {
            if existing_id != node_id {
                anyhow::bail!(
                    "machine_id {} already mapped to {}",
                    machine_id,
                    existing_id
                );
            }
        }
        self.state.machine_configs.insert(
            machine_id.to_string(),
            (node_id.to_string(), config.clone()),
        );
        self.save()
    }

    /// Look up the (node_id, config) pair registered for `machine_id`.
    pub fn get_node_config(&self, machine_id: &str) -> Option<(String, NodeConfig)> {
        self.state.machine_configs.get(machine_id).cloned()
    }

    /// Insert or replace the record keyed by `node_info.id`, then persist.
    pub fn store_node_info(&mut self, node_info: &NodeInfo) -> Result<()> {
        self.state
            .nodes
            .insert(node_info.id.clone(), node_info.clone());
        self.save()
    }

    /// Fetch the stored `NodeInfo` for `node_id`, if any.
    pub fn get_node_info(&self, node_id: &str) -> Option<NodeInfo> {
        self.state.nodes.get(node_id).cloned()
    }

    /// All stored node records, in unspecified (HashMap iteration) order.
    pub fn list_nodes(&self) -> Vec<NodeInfo> {
        self.state.nodes.values().cloned().collect()
    }

    /// All machine registrations as (machine_id, node_id, config) tuples.
    pub fn list_machine_configs(&self) -> Vec<(String, String, NodeConfig)> {
        self.state
            .machine_configs
            .iter()
            .map(|(machine_id, (node_id, config))| {
                (machine_id.clone(), node_id.clone(), config.clone())
            })
            .collect()
    }

    /// Store a cluster-node record under its namespaced key, then persist.
    pub fn store_cluster_node(
        &mut self,
        cluster_namespace: &str,
        cluster_id: &str,
        node_id: &str,
        record: &ClusterNodeRecord,
    ) -> Result<()> {
        let key = cluster_key(cluster_namespace, cluster_id, node_id);
        self.state.cluster_nodes.insert(key, record.clone());
        self.save()
    }

    /// List records for one cluster, matching both the current key layout and
    /// the legacy (pre-`/clusters/.../nodes/`) layout.
    ///
    /// De-duplicated by `node_id`; when both layouts hold a node, whichever
    /// entry the map iteration reaches first wins.
    pub fn list_cluster_nodes(
        &self,
        cluster_namespace: &str,
        cluster_id: &str,
    ) -> Vec<ClusterNodeRecord> {
        let prefix = cluster_prefix(cluster_namespace, cluster_id);
        let legacy_prefix = legacy_cluster_prefix(cluster_namespace, cluster_id);
        let mut seen = HashSet::new();
        let mut nodes = Vec::new();
        for (key, record) in self.state.cluster_nodes.iter() {
            if key.starts_with(&prefix) || key.starts_with(&legacy_prefix) {
                // `insert` returns false if this node_id was already collected.
                if seen.insert(record.node_id.clone()) {
                    nodes.push(record.clone());
                }
            }
        }
        nodes
    }

    /// Return the cached SSH host key for `node_id`, generating (and
    /// persisting) a new one via `ssh-keygen` on first use.
    pub fn get_or_generate_ssh_host_key(&mut self, node_id: &str) -> Result<String> {
        if let Some(key) = self.state.ssh_host_keys.get(node_id) {
            return Ok(key.clone());
        }
        // Key generation writes its temp files next to the state file.
        let key = generate_ssh_host_key(node_id, self.state_path.parent())?;
        self.state
            .ssh_host_keys
            .insert(node_id.to_string(), key.clone());
        self.save()?;
        Ok(key)
    }

    /// Return the cached TLS (cert, key) pair for `node_id`, issuing (and
    /// persisting) a new one on first use.
    ///
    /// Note: the cache is keyed only by `node_id` — a later call with a
    /// different `hostname`/`ip` still returns the originally issued cert.
    pub fn get_or_generate_tls_cert(
        &mut self,
        node_id: &str,
        hostname: &str,
        ip: &str,
        ca_cert_path: Option<&str>,
        ca_key_path: Option<&str>,
    ) -> Result<(String, String)> {
        if let Some(entry) = self.state.tls_material.get(node_id) {
            return Ok((entry.cert.clone(), entry.key.clone()));
        }
        let (cert, key) = issue_node_cert(node_id, hostname, ip, ca_cert_path, ca_key_path)?;
        self.state.tls_material.insert(
            node_id.to_string(),
            TlsMaterial {
                cert: cert.clone(),
                key: key.clone(),
            },
        );
        self.save()?;
        Ok((cert, key))
    }

    /// Atomically persist the in-memory state as pretty-printed JSON.
    ///
    /// Writes a sibling temp file (mode 0600 on unix), fsyncs it, then
    /// renames it over the real path so readers never see a partial file.
    fn save(&self) -> Result<()> {
        let data = serde_json::to_vec_pretty(&self.state)?;
        let tmp_path = tmp_path_for(&self.state_path);
        if let Some(parent) = self.state_path.parent() {
            fs::create_dir_all(parent).with_context(|| {
                format!("failed to create local state dir {}", parent.display())
            })?;
        }
        let mut options = OpenOptions::new();
        options.create(true).write(true).truncate(true);
        #[cfg(unix)]
        {
            // Restrict the temp file: it holds private key material.
            // (The effective create mode is still subject to the process umask.)
            options.mode(0o600);
        }
        let mut file = options
            .open(&tmp_path)
            .with_context(|| format!("failed to open temp state {}", tmp_path.display()))?;
        file.write_all(&data)
            .with_context(|| format!("failed to write temp state {}", tmp_path.display()))?;
        file.sync_all()
            .with_context(|| format!("failed to sync temp state {}", tmp_path.display()))?;
        // Same-directory rename replaces the old file atomically.
        // NOTE(review): the parent directory is not fsynced afterwards, so the
        // rename itself may not be durable across a crash — confirm acceptable.
        fs::rename(&tmp_path, &self.state_path)
            .with_context(|| format!("failed to persist state {}", self.state_path.display()))?;
        #[cfg(unix)]
        {
            // Re-assert 0600 explicitly in case the umask widened the create mode.
            fs::set_permissions(&self.state_path, fs::Permissions::from_mode(0o600)).with_context(
                || format!("failed to set permissions on {}", self.state_path.display()),
            )?;
        }
        Ok(())
    }
}
/// Resolve the concrete state-file path for `path`.
///
/// An existing file is used as-is and an existing directory gets
/// `state.json` appended. For a path that does not exist yet (or is some
/// other file type), a path carrying an extension is treated as a file and
/// anything else as a directory.
fn resolve_state_path(path: &Path) -> PathBuf {
    match fs::metadata(path) {
        Ok(meta) if meta.is_file() => path.to_path_buf(),
        Ok(meta) if meta.is_dir() => path.join("state.json"),
        _ => {
            // Fall back to a heuristic: "has an extension" means "is a file".
            if path.extension().is_none() {
                path.join("state.json")
            } else {
                path.to_path_buf()
            }
        }
    }
}
/// Derive the sibling temp-file path used for atomic writes of `path`.
///
/// `state.json` becomes `state.json.tmp`; an extensionless `state`
/// becomes `state.tmp`.
fn tmp_path_for(path: &Path) -> PathBuf {
    // Append ".tmp" to the existing extension (or use plain "tmp").
    let tmp_ext = path
        .extension()
        .and_then(|e| e.to_str())
        .map_or_else(|| "tmp".to_string(), |e| format!("{e}.tmp"));
    let mut tmp = path.to_path_buf();
    tmp.set_extension(tmp_ext);
    tmp
}
/// Key prefix under which one cluster's node records are stored.
fn cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    format!("{cluster_namespace}/clusters/{cluster_id}/nodes/")
}
/// Full storage key for a single node within a cluster.
fn cluster_key(cluster_namespace: &str, cluster_id: &str, node_id: &str) -> String {
    let mut key = cluster_prefix(cluster_namespace, cluster_id);
    key.push_str(node_id);
    key
}
/// Key prefix of the older layout ("{ns}/{cluster}/"), kept so records
/// written before the `/clusters/.../nodes/` layout can still be listed.
fn legacy_cluster_prefix(cluster_namespace: &str, cluster_id: &str) -> String {
    [cluster_namespace, "/", cluster_id, "/"].concat()
}
/// Generate a fresh ed25519 SSH host key by shelling out to `ssh-keygen`,
/// returning the private key text.
///
/// The key is written to a uniquely named temp file under `parent` (or the
/// system temp dir), read back, and both the key and its `.pub` sibling are
/// removed best-effort — including when reading the key fails, so failed
/// runs do not leak key material on disk.
///
/// # Errors
/// Fails if `ssh-keygen` cannot be executed, exits non-zero, or the
/// generated key file cannot be read.
fn generate_ssh_host_key(node_id: &str, parent: Option<&Path>) -> Result<String> {
    let base_dir = parent.map(PathBuf::from).unwrap_or_else(std::env::temp_dir);
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    // Sanitize node_id before embedding it in a filename: a node id
    // containing '/' (or other special characters) must not be able to
    // redirect the temp files outside `base_dir`.
    let safe_id: String = node_id
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect();
    let key_path = base_dir.join(format!("ssh_host_key_{}_{}", safe_id, ts));
    let status = Command::new("ssh-keygen")
        .arg("-q") // keep ssh-keygen's chatter out of the server's output
        .arg("-t")
        .arg("ed25519")
        .arg("-N")
        .arg("") // empty passphrase
        .arg("-f")
        .arg(&key_path)
        .status()
        .with_context(|| "failed to execute ssh-keygen")?;
    // Capture the outcome first, then clean up unconditionally so a failed
    // read does not leave private key material behind.
    let result = if status.success() {
        fs::read_to_string(&key_path)
            .with_context(|| format!("failed to read ssh host key {}", key_path.display()))
    } else {
        Err(anyhow::anyhow!("ssh-keygen failed with status {}", status))
    };
    if let Err(e) = fs::remove_file(&key_path) {
        warn!(error = %e, "failed to remove temporary ssh key file");
    }
    let pub_path = key_path.with_extension("pub");
    if let Err(e) = fs::remove_file(&pub_path) {
        debug!(error = %e, "failed to remove temporary ssh public key file");
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::fs;

    /// Build a unique, not-yet-created directory path under the system temp dir.
    fn temp_state_dir() -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        std::env::temp_dir().join(format!("deployer-local-state-{}", nanos))
    }

    #[test]
    fn test_local_storage_roundtrip() {
        let state_dir = temp_state_dir();

        // Populate a fresh store with one machine mapping and one node record.
        let mut store = LocalStorage::open(&state_dir).expect("open storage");
        let config = NodeConfig {
            hostname: "node01".to_string(),
            role: "control-plane".to_string(),
            ip: "10.0.1.10".to_string(),
            services: vec!["chainfire".to_string()],
            ssh_authorized_keys: vec![],
            labels: HashMap::new(),
            pool: None,
            node_class: None,
            failure_domain: None,
            nix_profile: None,
            install_plan: None,
        };
        store
            .register_node("machine-1", "node01", &config)
            .expect("register node");
        let info = NodeInfo {
            id: "node01".to_string(),
            machine_id: Some("machine-1".to_string()),
            hostname: "node01".to_string(),
            ip: "10.0.1.10".to_string(),
            state: deployer_types::NodeState::Provisioning,
            cluster_config_hash: "hash".to_string(),
            last_heartbeat: chrono::Utc::now(),
            metadata: HashMap::new(),
        };
        store.store_node_info(&info).expect("store node info");

        // Re-open from disk and verify both records survived the roundtrip.
        let reopened = LocalStorage::open(&state_dir).expect("reopen storage");
        let (_, loaded_config) = reopened
            .get_node_config("machine-1")
            .expect("machine config present");
        assert_eq!(loaded_config.hostname, "node01");
        let loaded_node = reopened.get_node_info("node01").expect("node info");
        assert_eq!(loaded_node.hostname, "node01");

        let _ = fs::remove_dir_all(state_dir);
    }
}
}