// photoncloud-monorepo/deployer/crates/node-agent/src/process.rs
//
// 721 lines
// 23 KiB
// Rust

use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::{self, OpenOptions};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use deployer_types::{ContainerSpec, ProcessSpec};
use serde::{Deserialize, Serialize};
use tokio::process::{Child, Command};
use tracing::{info, warn};
/// Translates a configured pull policy into the value handed to the
/// runtime's `--pull` flag.
///
/// Only the `if-not-present` spelling is rewritten (to `missing`); every
/// other value is returned unchanged.
fn normalized_pull_policy(policy: &str) -> &str {
    if policy == "if-not-present" {
        "missing"
    } else {
        policy
    }
}
/// Builds the container name `<service>-<instance_id>`, replacing every
/// character that is not an ASCII letter, ASCII digit, `.`, `_` or `-`
/// with `-`.
fn sanitize_container_name(service: &str, instance_id: &str) -> String {
    let raw = format!("{service}-{instance_id}");
    let mut name = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '.' || ch == '_' || ch == '-';
        name.push(if allowed { ch } else { '-' });
    }
    name
}
/// Renders a [`ContainerSpec`] into the `ProcessSpec` of a
/// `<runtime> run --rm ...` invocation.
///
/// * `service` / `instance_id` - combined and sanitized into the `--name`.
/// * `host_port` - the port assigned to this instance; used for publishing
///   when the container does not say otherwise.
/// * `container` - image, runtime and the remaining container settings.
///
/// Arguments are emitted in a fixed order: name, `--replace` (podman only),
/// pull policy, network, workdir, environment (sorted by key so the order
/// does not depend on map iteration), volumes, published ports, image,
/// command, args. Nothing is published when the network mode is `host`.
///
/// The returned spec carries no working directory or environment of its
/// own; those settings are passed to the runtime as flags instead.
pub fn render_container_process_spec(
    service: &str,
    instance_id: &str,
    host_port: u16,
    container: &ContainerSpec,
) -> ProcessSpec {
    /// Appends `flag` followed by its `value`.
    fn push_flag(args: &mut Vec<String>, flag: &str, value: String) {
        args.push(flag.to_string());
        args.push(value);
    }

    // Podman is the runtime when the spec does not name one.
    let runtime = container.runtime.as_deref().unwrap_or("podman").to_string();
    let network_mode = container.network_mode.as_deref();
    let uses_host_network = network_mode == Some("host");

    let mut args = vec!["run".to_string(), "--rm".to_string()];
    push_flag(
        &mut args,
        "--name",
        sanitize_container_name(service, instance_id),
    );
    if runtime == "podman" {
        args.push("--replace".to_string());
    }
    if let Some(policy) = container.pull_policy.as_deref() {
        push_flag(
            &mut args,
            "--pull",
            normalized_pull_policy(policy).to_string(),
        );
    }
    if let Some(mode) = network_mode {
        push_flag(&mut args, "--network", mode.to_string());
    }
    if let Some(dir) = container.working_dir.as_deref() {
        push_flag(&mut args, "--workdir", dir.to_string());
    }

    let mut sorted_env: Vec<_> = container.env.iter().collect();
    sorted_env.sort_by(|a, b| a.0.cmp(b.0));
    for (name, value) in sorted_env {
        push_flag(&mut args, "--env", format!("{name}={value}"));
    }

    for volume in &container.volumes {
        let suffix = if volume.read_only { ":ro" } else { "" };
        push_flag(
            &mut args,
            "--volume",
            format!("{}:{}{suffix}", volume.source, volume.target),
        );
    }

    if !uses_host_network {
        if container.ports.is_empty() {
            // No explicit ports: expose the instance port one-to-one.
            push_flag(&mut args, "--publish", format!("{host_port}:{host_port}"));
        } else {
            for (index, port) in container.ports.iter().enumerate() {
                // An explicit host port wins; otherwise the first entry gets
                // the instance port and later entries reuse their container
                // port.
                let published = match port.host_port {
                    Some(explicit) => explicit,
                    None if index == 0 => host_port,
                    None => port.container_port,
                };
                let mapping = match port.protocol.as_deref() {
                    Some(protocol) => {
                        format!("{published}:{}/{protocol}", port.container_port)
                    }
                    None => format!("{published}:{}", port.container_port),
                };
                push_flag(&mut args, "--publish", mapping);
            }
        }
    }

    args.push(container.image.clone());
    args.extend_from_slice(&container.command);
    args.extend_from_slice(&container.args);

    ProcessSpec {
        command: runtime,
        args,
        working_dir: None,
        env: Default::default(),
    }
}
/// One supervised process instance together with the on-disk state files
/// that let it be found again when no child handle is held.
#[derive(Debug)]
pub struct ManagedProcess {
/// Service name; part of the PID file name and of every log event.
pub service: String,
/// Instance identifier within the service; also part of the PID file name.
pub instance_id: String,
/// Command, arguments, working directory and environment used by `start`.
pub spec: ProcessSpec,
/// Handle of the child spawned by `start`. `None` before the first start,
/// after `stop`, and once `is_running` has observed the child's exit.
pub child: Option<Child>,
/// Set by `start`, or to the time `is_running` first finds the process
/// through its PID file; cleared by `stop`.
pub started_at: Option<DateTime<Utc>>,
/// `<pid_dir>/<service>-<instance_id>.pid`, holding the PID as text.
pub pid_file: PathBuf,
/// PID file path with `.meta.json` appended; JSON `ManagedProcessMetadata`.
pub metadata_file: PathBuf,
/// PID file path with `.log` appended; stdout and stderr are appended here.
pub log_file: PathBuf,
}
/// JSON sidecar written next to the PID file by `ManagedProcess::start` and
/// read back by `ProcessManager::stop_unmanaged` to identify which instance
/// a PID file belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ManagedProcessMetadata {
/// Service the process belongs to.
service: String,
/// Instance identifier within the service.
instance_id: String,
/// The spec's command as given to `start`. Deserializes to `None` when the
/// field is absent from the JSON.
#[serde(default)]
command: Option<String>,
}
/// Returns the metadata sidecar path for `pid_file`: the same path with
/// `.meta.json` appended.
///
/// The suffix is appended to the raw OS string, so paths that are not valid
/// UTF-8 are preserved byte-for-byte instead of being lossily re-encoded.
fn metadata_file_path(pid_file: &Path) -> PathBuf {
    let mut raw = pid_file.as_os_str().to_os_string();
    raw.push(".meta.json");
    PathBuf::from(raw)
}
/// Returns the log file path for `pid_file`: the same path with `.log`
/// appended.
///
/// The suffix is appended to the raw OS string, so paths that are not valid
/// UTF-8 are preserved byte-for-byte instead of being lossily re-encoded.
fn log_file_path(pid_file: &Path) -> PathBuf {
    let mut raw = pid_file.as_os_str().to_os_string();
    raw.push(".log");
    PathBuf::from(raw)
}
/// Directories appended to the executable search path by
/// `default_runtime_path`, after the spec's own `PATH` and the agent's
/// inherited `PATH`. Order matters: earlier entries are searched first.
// NOTE(review): the leading entries look like Nix/NixOS profile locations,
// presumably so commands resolve when the agent starts with a minimal PATH —
// confirm against the deployment targets.
const FALLBACK_EXEC_PATHS: &[&str] = &[
"/run/current-system/sw/bin",
"/run/current-system/sw/sbin",
"/nix/var/nix/profiles/default/bin",
"/nix/var/nix/profiles/default/sbin",
"/usr/local/bin",
"/usr/local/sbin",
"/usr/bin",
"/usr/sbin",
"/bin",
"/sbin",
];
/// Splits `value` as a `PATH`-style list and appends each entry to `paths`,
/// in order, skipping empty entries and entries already recorded in `seen`.
///
/// Every appended entry is also inserted into `seen`.
fn append_search_paths(paths: &mut Vec<PathBuf>, seen: &mut HashSet<PathBuf>, value: &str) {
    let fresh = env::split_paths(value)
        // Empty entries are dropped before they can be recorded in `seen`.
        .filter(|entry| !entry.as_os_str().is_empty())
        // `insert` returns false for duplicates, which filters them out.
        .filter(|entry| seen.insert(entry.clone()));
    paths.extend(fresh);
}
/// Builds the `PATH` value used to resolve and run a process.
///
/// Entries come from three sources, in priority order: the spec's own
/// `PATH` variable, the agent's inherited `PATH`, and
/// `FALLBACK_EXEC_PATHS`. Duplicates keep their first position. The result
/// is joined with `:`; non-UTF-8 entries are rendered lossily.
fn default_runtime_path(spec_env: &HashMap<String, String>) -> String {
    let mut seen = HashSet::new();
    let mut ordered = Vec::new();

    if let Some(spec_path) = spec_env.get("PATH") {
        append_search_paths(&mut ordered, &mut seen, spec_path);
    }
    if let Ok(inherited) = env::var("PATH") {
        append_search_paths(&mut ordered, &mut seen, &inherited);
    }
    for fallback in FALLBACK_EXEC_PATHS.iter().copied().map(PathBuf::from) {
        if seen.insert(fallback.clone()) {
            ordered.push(fallback);
        }
    }

    ordered
        .iter()
        .map(|dir| dir.to_string_lossy())
        .collect::<Vec<_>>()
        .join(":")
}
/// Returns `true` when `path` is a regular file (symlinks are followed)
/// with at least one execute permission bit set.
///
/// Any error reading the metadata, including a missing path, yields `false`.
fn is_executable_file(path: &Path) -> bool {
    match fs::metadata(path) {
        Ok(meta) => {
            let any_exec_bit = meta.permissions().mode() & 0o111 != 0;
            meta.is_file() && any_exec_bit
        }
        Err(_) => false,
    }
}
/// Resolves `command` to the path that should be executed.
///
/// A command containing `/` is returned as written. A bare name is looked
/// up in each directory of `runtime_path`, in order, and the first
/// executable regular file wins. When nothing matches, the bare name is
/// returned unchanged.
fn resolve_command_path(command: &str, runtime_path: &str) -> PathBuf {
    let is_bare_name = !command.contains('/');
    if is_bare_name {
        let hit = env::split_paths(runtime_path)
            .map(|dir| dir.join(command))
            .find(|candidate| is_executable_file(candidate));
        if let Some(found) = hit {
            return found;
        }
    }
    PathBuf::from(command)
}
impl ManagedProcess {
pub fn new(service: String, instance_id: String, spec: ProcessSpec, pid_dir: &PathBuf) -> Self {
let pid_file = pid_dir.join(format!("{}-{}.pid", service, instance_id));
let metadata_file = metadata_file_path(&pid_file);
let log_file = log_file_path(&pid_file);
Self {
service,
instance_id,
spec,
child: None,
started_at: None,
pid_file,
metadata_file,
log_file,
}
}
pub async fn start(&mut self) -> Result<()> {
if self.is_running().await? {
info!(
service = %self.service,
instance_id = %self.instance_id,
"process already running"
);
return Ok(());
}
info!(
service = %self.service,
instance_id = %self.instance_id,
command = %self.spec.command,
"starting process"
);
let runtime_path = default_runtime_path(&self.spec.env);
let resolved_command = resolve_command_path(&self.spec.command, &runtime_path);
let mut cmd = Command::new(&resolved_command);
cmd.args(&self.spec.args);
if let Some(ref wd) = self.spec.working_dir {
cmd.current_dir(wd);
}
cmd.env("PATH", &runtime_path);
for (k, v) in &self.spec.env {
if k == "PATH" {
continue;
}
cmd.env(k, v);
}
if let Some(parent) = self.pid_file.parent() {
fs::create_dir_all(parent).ok();
}
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_file)
.with_context(|| format!("failed to open process log {:?}", self.log_file))?;
let stderr_log = log_file
.try_clone()
.with_context(|| format!("failed to clone process log {:?}", self.log_file))?;
cmd.stdout(Stdio::from(log_file))
.stderr(Stdio::from(stderr_log));
let child = cmd.spawn().with_context(|| {
format!(
"failed to spawn process for {}/{} using {} (PATH={})",
self.service,
self.instance_id,
resolved_command.display(),
runtime_path
)
})?;
let pid = child.id().context("failed to get child PID")?;
fs::write(&self.pid_file, pid.to_string())
.with_context(|| format!("failed to write PID file {:?}", self.pid_file))?;
let metadata = ManagedProcessMetadata {
service: self.service.clone(),
instance_id: self.instance_id.clone(),
command: Some(self.spec.command.clone()),
};
fs::write(&self.metadata_file, serde_json::to_vec(&metadata)?).with_context(|| {
format!("failed to write process metadata {:?}", self.metadata_file)
})?;
self.child = Some(child);
self.started_at = Some(Utc::now());
info!(
service = %self.service,
instance_id = %self.instance_id,
pid = pid,
log_file = %self.log_file.display(),
"process started"
);
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if !self.is_running().await? {
info!(
service = %self.service,
instance_id = %self.instance_id,
"process not running"
);
return Ok(());
}
info!(
service = %self.service,
instance_id = %self.instance_id,
"stopping process"
);
if let Some(mut child) = self.child.take() {
child.kill().await.ok();
child.wait().await.ok();
} else {
// PIDファイルからPIDを読み取って停止
if let Ok(pid_str) = fs::read_to_string(&self.pid_file) {
if let Ok(pid) = pid_str.trim().parse::<u32>() {
Command::new("kill")
.arg(pid.to_string())
.output()
.await
.ok();
for _ in 0..10 {
let still_running = Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.output()
.await
.map(|output| output.status.success())
.unwrap_or(false);
if !still_running {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
let still_running = Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.output()
.await
.map(|output| output.status.success())
.unwrap_or(false);
if still_running {
Command::new("kill")
.arg("-9")
.arg(pid.to_string())
.output()
.await
.ok();
}
}
}
}
// PIDファイルを削除
fs::remove_file(&self.pid_file).ok();
fs::remove_file(&self.metadata_file).ok();
self.started_at = None;
info!(
service = %self.service,
instance_id = %self.instance_id,
"process stopped"
);
Ok(())
}
pub async fn is_running(&mut self) -> Result<bool> {
if let Some(child) = &mut self.child {
if let Some(_status) = child.try_wait().with_context(|| {
format!(
"failed to check child status for {}/{}",
self.service, self.instance_id
)
})? {
self.child = None;
fs::remove_file(&self.pid_file).ok();
return Ok(false);
}
return Ok(true);
}
// PIDファイルが存在するかチェック
if !self.pid_file.exists() {
return Ok(false);
}
// PIDファイルからPIDを読み取り
let pid_str = fs::read_to_string(&self.pid_file)
.with_context(|| format!("failed to read PID file {:?}", self.pid_file))?;
let pid = pid_str
.trim()
.parse::<u32>()
.with_context(|| format!("invalid PID in file {:?}", self.pid_file))?;
// プロセスが存在するかチェック(簡易実装)
let output = Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.output()
.await
.with_context(|| format!("failed to check process {}", pid))?;
if !output.status.success() {
return Ok(false);
}
// PID再利用対策: /proc からコマンドラインを確認
let cmdline_path = format!("/proc/{}/cmdline", pid);
if let Ok(cmdline) = fs::read_to_string(&cmdline_path) {
let cmdline = cmdline.replace('\0', " ");
if !cmdline.contains(&self.spec.command) {
return Ok(false);
}
}
if self.started_at.is_none() {
self.started_at = Some(Utc::now());
}
Ok(true)
}
pub async fn restart(&mut self) -> Result<()> {
self.stop().await?;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
self.start().await?;
Ok(())
}
}
/// Registry of the process instances this agent supervises.
pub struct ProcessManager {
/// Tracked processes, keyed by `"<service>/<instance_id>"`.
processes: HashMap<String, ManagedProcess>,
/// Directory holding the PID, metadata and log files of every instance;
/// also the directory scanned by `stop_unmanaged`.
pid_dir: PathBuf,
}
// Unit tests for container argument rendering and executable resolution.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::os::unix::fs::PermissionsExt;
// Host networking with podman: expects `--replace`, `--network host`, the
// normalized pull policy and the image among the rendered arguments.
#[test]
fn test_render_container_process_spec_with_host_network() {
let spec = render_container_process_spec(
"api",
"api-node01",
8080,
&ContainerSpec {
image: "ghcr.io/example/api:latest".to_string(),
runtime: Some("podman".to_string()),
command: vec!["/bin/api".to_string()],
args: vec!["serve".to_string()],
env: HashMap::from([("RUST_LOG".to_string(), "info".to_string())]),
ports: vec![],
volumes: vec![],
network_mode: Some("host".to_string()),
pull_policy: Some("if-not-present".to_string()),
working_dir: Some("/srv/api".to_string()),
},
);
assert_eq!(spec.command, "podman");
assert!(spec.args.contains(&"--replace".to_string()));
assert!(spec.args.contains(&"--network".to_string()));
assert!(spec.args.contains(&"host".to_string()));
assert!(spec.args.contains(&"--pull".to_string()));
assert!(spec.args.contains(&"missing".to_string()));
assert!(spec
.args
.contains(&"ghcr.io/example/api:latest".to_string()));
}
// No explicit ports and no host networking: the instance port is published
// one-to-one. The instance id deliberately contains a `/`.
#[test]
fn test_render_container_process_spec_publishes_instance_port() {
let spec = render_container_process_spec(
"web",
"web/node01",
18080,
&ContainerSpec {
image: "nginx:latest".to_string(),
runtime: Some("docker".to_string()),
command: vec![],
args: vec![],
env: HashMap::new(),
ports: vec![],
volumes: vec![],
network_mode: None,
pull_policy: None,
working_dir: None,
},
);
assert_eq!(spec.command, "docker");
assert!(spec.args.contains(&"--publish".to_string()));
assert!(spec.args.contains(&"18080:18080".to_string()));
}
// The fallback directories are present even when the spec has no PATH.
#[test]
fn test_default_runtime_path_includes_fallback_system_profile() {
let runtime_path = default_runtime_path(&HashMap::new());
assert!(runtime_path.contains("/run/current-system/sw/bin"));
}
// An executable in a directory named by the spec's PATH is resolved to its
// full path. The temp directory is only removed when the assertion passes.
#[test]
fn test_resolve_command_path_uses_spec_path_before_fallbacks() {
let temp = env::temp_dir().join(format!(
"node-agent-process-test-{}-{}",
std::process::id(),
chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default()
));
let _ = fs::remove_dir_all(&temp);
let bin_dir = temp.join("bin");
fs::create_dir_all(&bin_dir).unwrap();
let executable = bin_dir.join("demo-command");
fs::write(&executable, "#!/bin/sh\nexit 0\n").unwrap();
let mut permissions = fs::metadata(&executable).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&executable, permissions).unwrap();
let mut env = HashMap::new();
env.insert("PATH".to_string(), bin_dir.to_string_lossy().to_string());
let runtime_path = default_runtime_path(&env);
let resolved = resolve_command_path("demo-command", &runtime_path);
assert_eq!(resolved, executable);
let _ = fs::remove_dir_all(&temp);
}
}
impl ProcessManager {
    /// Creates an empty manager whose state files live under `pid_dir`.
    pub fn new(pid_dir: PathBuf) -> Self {
        Self {
            processes: HashMap::new(),
            pid_dir,
        }
    }

    /// Builds the map key for an instance: `"<service>/<instance_id>"`.
    fn instance_key(service: &str, instance_id: &str) -> String {
        format!("{}/{}", service, instance_id)
    }

    /// Registers an instance without starting it. An existing entry with
    /// the same service and instance id is replaced.
    pub fn add(&mut self, service: String, instance_id: String, spec: ProcessSpec) {
        let key = Self::instance_key(&service, &instance_id);
        let process = ManagedProcess::new(service, instance_id, spec, &self.pid_dir);
        self.processes.insert(key, process);
    }

    /// Returns `(service, instance_id)` for every tracked instance, in no
    /// particular order.
    ///
    /// The pairs are read from the stored processes rather than re-split
    /// from the map key, so a service name containing `/` is reported
    /// correctly.
    pub fn list_instances(&self) -> Vec<(String, String)> {
        self.processes
            .values()
            .map(|process| (process.service.clone(), process.instance_id.clone()))
            .collect()
    }

    /// Stops tracking an instance and returns it; the process itself is
    /// left untouched.
    pub fn remove(&mut self, service: &str, instance_id: &str) -> Option<ManagedProcess> {
        self.processes
            .remove(&Self::instance_key(service, instance_id))
    }

    /// Stops tracking an instance and stops its process. Unknown instances
    /// are ignored.
    pub async fn stop_and_remove(&mut self, service: &str, instance_id: &str) -> Result<()> {
        if let Some(mut process) = self.remove(service, instance_id) {
            process.stop().await?;
        }
        Ok(())
    }

    /// Returns mutable access to a tracked instance, if present.
    pub fn get_mut(&mut self, service: &str, instance_id: &str) -> Option<&mut ManagedProcess> {
        self.processes
            .get_mut(&Self::instance_key(service, instance_id))
    }

    /// Reports whether a tracked instance is alive; unknown instances are
    /// reported as not running.
    pub async fn is_running(&mut self, service: &str, instance_id: &str) -> Result<bool> {
        match self.get_mut(service, instance_id) {
            Some(process) => process.is_running().await,
            None => Ok(false),
        }
    }

    /// Returns the recorded start time of a tracked instance, if any.
    pub fn started_at(&self, service: &str, instance_id: &str) -> Option<DateTime<Utc>> {
        self.processes
            .get(&Self::instance_key(service, instance_id))
            .and_then(|process| process.started_at)
    }

    /// Stops processes found in the PID directory that are not in
    /// `desired_keys` (`"<service>/<instance_id>"` strings).
    ///
    /// Each `*.pid.meta.json` file directly under the PID directory names
    /// the instance it belongs to. Files that cannot be decoded are logged
    /// and skipped. For every undesired instance the process is stopped and
    /// its PID and metadata files are removed.
    ///
    /// # Errors
    ///
    /// Fails when the directory cannot be listed. A failure to stop one
    /// instance does not abort the scan: the remaining entries are still
    /// processed and the first such error is returned at the end.
    pub async fn stop_unmanaged(&mut self, desired_keys: &HashSet<String>) -> Result<()> {
        let entries = match fs::read_dir(&self.pid_dir) {
            Ok(entries) => entries,
            Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(error) => {
                return Err(error)
                    .with_context(|| format!("failed to read {}", self.pid_dir.display()))
            }
        };

        let mut first_error: Option<anyhow::Error> = None;

        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            let Some(name) = path.file_name().and_then(|value| value.to_str()) else {
                continue;
            };
            // Only `<something>.pid.meta.json` is a metadata sidecar; the
            // stem left after stripping `.meta.json` is the PID file name.
            let Some(pid_name) = name
                .strip_suffix(".meta.json")
                .filter(|stem| stem.ends_with(".pid"))
            else {
                continue;
            };

            let metadata: ManagedProcessMetadata = match fs::read(&path)
                .ok()
                .and_then(|bytes| serde_json::from_slice(&bytes).ok())
            {
                Some(metadata) => metadata,
                None => {
                    warn!(path = %path.display(), "failed to decode process metadata");
                    continue;
                }
            };

            let key = Self::instance_key(&metadata.service, &metadata.instance_id);
            if desired_keys.contains(&key) {
                continue;
            }

            // Swap only the file name, so the directory part of the path is
            // never passed through a lossy string conversion.
            let pid_file = path.with_file_name(pid_name);
            let log_file = log_file_path(&pid_file);
            let mut process = ManagedProcess {
                service: metadata.service,
                instance_id: metadata.instance_id,
                spec: ProcessSpec {
                    command: metadata.command.unwrap_or_default(),
                    args: Vec::new(),
                    working_dir: None,
                    env: Default::default(),
                },
                child: None,
                started_at: None,
                pid_file,
                metadata_file: path.clone(),
                log_file,
            };

            match process.stop().await {
                Ok(()) => {
                    // Make sure the state files are gone even when the
                    // process had already exited; otherwise this entry is
                    // rediscovered on every scan.
                    fs::remove_file(&process.pid_file).ok();
                    fs::remove_file(&process.metadata_file).ok();
                    info!(
                        service = %process.service,
                        instance_id = %process.instance_id,
                        "stopped stale unmanaged process from pid-dir"
                    );
                }
                Err(error) => {
                    warn!(
                        service = %process.service,
                        instance_id = %process.instance_id,
                        error = ?error,
                        "failed to stop stale unmanaged process"
                    );
                    if first_error.is_none() {
                        first_error = Some(error);
                    }
                }
            }
        }

        match first_error {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Starts every tracked instance that is not currently running.
    ///
    /// Failures to check or start an instance are logged and do not stop
    /// the pass; the method itself always returns `Ok`.
    pub async fn reconcile(&mut self) -> Result<()> {
        for process in self.processes.values_mut() {
            match process.is_running().await {
                Ok(true) => {
                    // The process is running; nothing to do.
                }
                Ok(false) => {
                    // The process has stopped, so start it again.
                    warn!(
                        service = %process.service,
                        instance_id = %process.instance_id,
                        "process is not running, restarting"
                    );
                    if let Err(e) = process.start().await {
                        warn!(
                            service = %process.service,
                            instance_id = %process.instance_id,
                            error = ?e,
                            "failed to restart process"
                        );
                    }
                }
                Err(e) => {
                    warn!(
                        service = %process.service,
                        instance_id = %process.instance_id,
                        error = ?e,
                        "failed to check process status"
                    );
                }
            }
        }
        Ok(())
    }
}