// photoncloud-monorepo/plasmavmc/crates/plasmavmc-server/src/main.rs

//! PlasmaVMC control plane server binary
use clap::Parser;
use iam_service_auth::AuthService;
use metrics_exporter_prometheus::PrometheusBuilder;
use plasmavmc_api::proto::image_service_server::ImageServiceServer;
use plasmavmc_api::proto::node_service_client::NodeServiceClient;
use plasmavmc_api::proto::node_service_server::NodeServiceServer;
use plasmavmc_api::proto::vm_service_server::VmServiceServer;
use plasmavmc_api::proto::volume_service_server::VolumeServiceServer;
use plasmavmc_api::proto::{
HeartbeatNodeRequest, HypervisorType as ProtoHypervisorType, NodeCapacity,
NodeState as ProtoNodeState, VolumeDriverKind as ProtoVolumeDriverKind,
};
use plasmavmc_hypervisor::HypervisorRegistry;
use plasmavmc_kvm::KvmBackend;
use plasmavmc_server::config::{AgentRuntimeConfig, ServerConfig};
use plasmavmc_server::watcher::{StateSynchronizer, StateWatcher, WatcherConfig};
use plasmavmc_server::VmServiceImpl;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, fs};
use tonic::transport::{Certificate, Endpoint, Identity, Server, ServerTlsConfig};
use tonic::{Request, Status};
use tonic_health::server::health_reporter;
use tracing_subscriber::EnvFilter;
/// PlasmaVMC control plane server
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Configuration file path
#[arg(short, long, default_value = "plasmavmc.toml")]
config: PathBuf,
/// Address to listen on (overrides config)
#[arg(short, long)]
addr: Option<String>,
/// Log level (overrides config)
#[arg(short, long)]
log_level: Option<String>,
/// Metrics port for Prometheus scraping
#[arg(long, default_value = "9102")]
metrics_port: u16,
}
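// For reference, a typical invocation might look like this (paths and
// addresses are illustrative, not defaults):
//
//     plasmavmc-server --config /etc/plasmavmc/plasmavmc.toml \
//         --addr 0.0.0.0:50051 --metrics-port 9102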
/// Prefix the endpoint with `http://` when no scheme is present, since a
/// tonic `Endpoint` requires an explicit scheme.
fn normalize_endpoint(endpoint: &str) -> String {
if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
endpoint.to_string()
} else {
format!("http://{endpoint}")
}
}
/// Total system memory in MiB, parsed from the `MemTotal` line of
/// `/proc/meminfo`; returns 0 if the file is missing or unparseable.
fn available_memory_mib() -> u64 {
let Ok(meminfo) = fs::read_to_string("/proc/meminfo") else {
return 0;
};
meminfo
.lines()
.find_map(|line| line.strip_prefix("MemTotal:"))
.and_then(|rest| rest.split_whitespace().next())
.and_then(|value| value.parse::<u64>().ok())
.map(|kib| kib / 1024)
.unwrap_or(0)
}
/// Spawn a background task that periodically reports this node's capacity and
/// state to the control plane. Does nothing unless both `control_plane_addr`
/// and `node_id` are configured.
async fn start_agent_heartbeat(
local_addr: SocketAddr,
supported_volume_drivers: Vec<i32>,
supported_storage_classes: Vec<String>,
shared_live_migration: bool,
agent_config: &AgentRuntimeConfig,
) {
let Some(control_plane_addr) = agent_config
.control_plane_addr
.as_ref()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
else {
return;
};
let Some(node_id) = agent_config
.node_id
.as_ref()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
else {
return;
};
let endpoint = normalize_endpoint(&control_plane_addr);
let advertise_endpoint = agent_config
.advertise_endpoint
.as_ref()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| local_addr.to_string());
let node_name = agent_config
.node_name
.as_ref()
.cloned()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| node_id.clone());
let heartbeat_secs = agent_config.heartbeat_interval_secs.max(1);
tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(heartbeat_secs));
loop {
ticker.tick().await;
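            // A fresh channel is dialed on every tick, trading one reconnect
            // per heartbeat for resilience to control-plane restarts.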
let channel = match Endpoint::from_shared(endpoint.clone()) {
Ok(endpoint) => match endpoint.connect().await {
Ok(channel) => channel,
Err(error) => {
tracing::warn!(%error, "Failed to connect to PlasmaVMC control plane for heartbeat");
continue;
}
},
Err(error) => {
tracing::warn!(%error, "Invalid PlasmaVMC control plane endpoint for heartbeat");
continue;
}
};
let mut client = NodeServiceClient::new(channel);
let mut labels = HashMap::new();
labels.insert("plasmavmc_endpoint".to_string(), advertise_endpoint.clone());
let request = HeartbeatNodeRequest {
node_id: node_id.clone(),
name: node_name.clone(),
state: ProtoNodeState::Ready as i32,
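                // `capacity` and `allocatable` are reported identically for
                // now: the agent does not subtract local reservations, and
                // storage is left unreported (0).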
capacity: Some(NodeCapacity {
vcpus: std::thread::available_parallelism()
.map(|parallelism| parallelism.get() as u32)
.unwrap_or(1),
memory_mib: available_memory_mib(),
storage_gib: 0,
}),
allocatable: Some(NodeCapacity {
vcpus: std::thread::available_parallelism()
.map(|parallelism| parallelism.get() as u32)
.unwrap_or(1),
memory_mib: available_memory_mib(),
storage_gib: 0,
}),
hypervisors: vec![ProtoHypervisorType::Kvm as i32],
labels,
agent_version: env!("CARGO_PKG_VERSION").to_string(),
supported_volume_drivers: supported_volume_drivers.clone(),
supported_storage_classes: supported_storage_classes.clone(),
shared_live_migration,
};
if let Err(error) = client.heartbeat_node(request).await {
tracing::warn!(%error, "Failed to heartbeat PlasmaVMC node");
}
}
});
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
// Load configuration from file or use defaults
let mut config = if args.config.exists() {
let contents = tokio::fs::read_to_string(&args.config).await?;
toml::from_str(&contents)?
} else {
        // The tracing subscriber is not installed yet, so this message must go
        // straight to stderr rather than through `tracing`.
        eprintln!(
            "Config file not found: {}, using defaults",
            args.config.display()
        );
ServerConfig::default()
};
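    // A sketch of a minimal `plasmavmc.toml`, assuming the TOML layout mirrors
    // the `ServerConfig` fields this file reads (all values illustrative):
    //
    //     addr = "0.0.0.0:50051"
    //     http_addr = "0.0.0.0:8080"
    //     log_level = "info"
    //
    //     [auth]
    //     iam_server_addr = "http://127.0.0.1:50061"
    //
    //     [agent]
    //     control_plane_addr = "127.0.0.1:50051"
    //     node_id = "node-a"
    //     heartbeat_interval_secs = 10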
// Apply command line overrides
if let Some(addr_str) = args.addr {
config.addr = addr_str.parse()?;
}
if let Some(log_level) = args.log_level {
config.log_level = log_level;
}
// Initialize tracing
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)),
)
.init();
tracing::info!("Starting PlasmaVMC server on {}", config.addr);
// Initialize Prometheus metrics exporter
let metrics_addr = format!("0.0.0.0:{}", args.metrics_port);
let builder = PrometheusBuilder::new();
builder
.with_http_listener(metrics_addr.parse::<std::net::SocketAddr>()?)
.install()
.expect("Failed to install Prometheus metrics exporter");
tracing::info!(
"Prometheus metrics available at http://{}/metrics",
metrics_addr
);
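    // The exporter serves plain-text metrics over HTTP, so e.g.
    // `curl http://127.0.0.1:9102/metrics` verifies it locally.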
// Create hypervisor registry and register backends
let registry = Arc::new(HypervisorRegistry::new());
// Register KVM backend (always available)
let kvm_backend = Arc::new(KvmBackend::new(
config
.kvm
.qemu_path
.clone()
.unwrap_or_else(|| PathBuf::from("/usr/bin/qemu-system-x86_64")),
config
.kvm
.runtime_dir
.clone()
.unwrap_or_else(|| PathBuf::from("/run/libvirt/plasmavmc")),
));
registry.register(kvm_backend);
    // Firecracker stays outside the supported public PlasmaVMC surface for now.
    let has_kernel = config.firecracker.kernel_path.is_some()
        || std::env::var_os("PLASMAVMC_FIRECRACKER_KERNEL_PATH").is_some();
    let has_rootfs = config.firecracker.rootfs_path.is_some()
        || std::env::var_os("PLASMAVMC_FIRECRACKER_ROOTFS_PATH").is_some();
    if has_kernel || has_rootfs {
        tracing::warn!(
            "Firecracker backend inputs were provided, but the supported PlasmaVMC public backend contract is KVM-only; ignoring Firecracker configuration"
        );
    } else {
        tracing::debug!("Firecracker backend remains outside the supported public surface");
    }
tracing::info!("Registered hypervisors: {:?}", registry.available());
// Initialize IAM authentication service
tracing::info!(
"Connecting to IAM server at {}",
config.auth.iam_server_addr
);
let auth_service = AuthService::new(&config.auth.iam_server_addr)
.await
.map_err(|e| format!("Failed to connect to IAM server: {}", e))?;
let auth_service = Arc::new(auth_service);
// gRPC interceptors are synchronous, so bridge into the current Tokio runtime
// from a blocking section instead of creating a nested runtime that would
// later be dropped from async context during shutdown.
let auth_handle = tokio::runtime::Handle::current();
let make_interceptor = |auth: Arc<AuthService>| {
let handle = auth_handle.clone();
move |mut req: Request<()>| -> Result<Request<()>, Status> {
let auth = auth.clone();
tokio::task::block_in_place(|| {
handle.block_on(async move {
let tenant_context = auth.authenticate_request(&req).await?;
req.extensions_mut().insert(tenant_context);
Ok(req)
})
})
}
};
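    // `block_in_place` panics on a current-thread runtime; this relies on
    // `#[tokio::main]` defaulting to the multi-thread flavor.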
// Create services
let vm_service = Arc::new(
VmServiceImpl::new(
registry,
auth_service.clone(),
config.auth.iam_server_addr.clone(),
&config,
)
.await?,
);
// Optional: start state watcher for multi-instance HA sync
if config.watcher.enabled {
let watcher_config = WatcherConfig {
poll_interval: Duration::from_millis(config.watcher.poll_interval_ms.max(100)),
buffer_size: 256,
};
let (watcher, rx) = StateWatcher::new(vm_service.store(), watcher_config);
let synchronizer = StateSynchronizer::new(vm_service.clone());
tokio::spawn(async move {
if let Err(e) = watcher.start().await {
tracing::error!(error = %e, "State watcher failed to start");
}
});
tokio::spawn(async move {
synchronizer.run(rx).await;
});
tracing::info!("State watcher enabled");
}
// Optional: start health monitor to refresh VM status periodically
    if let Some(secs) = config
        .health
        .vm_monitor_interval_secs
        .filter(|secs| *secs > 0)
    {
        vm_service
            .clone()
            .start_health_monitor(Duration::from_secs(secs));
    }
// Optional: start node health monitor to detect stale heartbeats
    if let Some(interval_secs) = config
        .health
        .node_monitor_interval_secs
        .filter(|secs| *secs > 0)
    {
        vm_service.clone().start_node_health_monitor(
            Duration::from_secs(interval_secs),
            Duration::from_secs(config.health.node_heartbeat_timeout_secs.max(1)),
        );
    }
// Setup health service
let (mut health_reporter, health_service) = health_reporter();
health_reporter
.set_serving::<VmServiceServer<VmServiceImpl>>()
.await;
health_reporter
.set_serving::<ImageServiceServer<VmServiceImpl>>()
.await;
health_reporter
.set_serving::<VolumeServiceServer<VmServiceImpl>>()
.await;
health_reporter
.set_serving::<NodeServiceServer<VmServiceImpl>>()
.await;
    // Address the gRPC server binds to (already parsed as part of the config)
    let addr: SocketAddr = config.addr;
let heartbeat_volume_drivers = vm_service
.supported_volume_drivers()
.into_iter()
.map(|driver| match driver {
plasmavmc_types::VolumeDriverKind::Managed => ProtoVolumeDriverKind::Managed as i32,
plasmavmc_types::VolumeDriverKind::CephRbd => ProtoVolumeDriverKind::CephRbd as i32,
})
.collect();
let heartbeat_storage_classes = vm_service.supported_storage_classes();
let shared_live_migration = vm_service.shared_live_migration();
start_agent_heartbeat(
addr,
heartbeat_volume_drivers,
heartbeat_storage_classes,
shared_live_migration,
&config.agent,
)
.await;
tracing::info!("PlasmaVMC gRPC server listening on {}", addr);
// Configure TLS if enabled
let mut server = Server::builder();
if let Some(tls_config) = &config.tls {
tracing::info!("TLS enabled, loading certificates...");
let cert = tokio::fs::read(&tls_config.cert_file).await?;
let key = tokio::fs::read(&tls_config.key_file).await?;
let server_identity = Identity::from_pem(cert, key);
let tls = if tls_config.require_client_cert {
tracing::info!("mTLS enabled");
let ca_cert = tokio::fs::read(
tls_config
.ca_file
.as_ref()
.ok_or("ca_file required for mTLS")?,
)
.await?;
let ca = Certificate::from_pem(ca_cert);
ServerTlsConfig::new()
.identity(server_identity)
.client_ca_root(ca)
} else {
ServerTlsConfig::new().identity(server_identity)
};
server = server.tls_config(tls)?;
}
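    // For local testing, a self-signed identity can be generated with, e.g.:
    //     openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
    //         -keyout key.pem -out cert.pem -subj "/CN=localhost"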
// gRPC server (clone Arc for gRPC service)
let grpc_vm_service = Arc::clone(&vm_service);
let grpc_server = server
.add_service(health_service)
.add_service(tonic::codegen::InterceptedService::new(
VmServiceServer::from_arc(grpc_vm_service),
make_interceptor(auth_service.clone()),
))
.add_service(tonic::codegen::InterceptedService::new(
ImageServiceServer::from_arc(Arc::clone(&vm_service)),
make_interceptor(auth_service.clone()),
))
.add_service(tonic::codegen::InterceptedService::new(
VolumeServiceServer::from_arc(Arc::clone(&vm_service)),
make_interceptor(auth_service.clone()),
))
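        // Note: NodeService is the only service registered without the auth
        // interceptor; agent heartbeats therefore bypass IAM authentication.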
.add_service(NodeServiceServer::from_arc(Arc::clone(&vm_service)))
.serve(addr);
// HTTP REST API server
let http_addr = config.http_addr;
    let rest_state = plasmavmc_server::rest::RestApiState {
        vm_service,
        auth_service: auth_service.clone(),
    };
let rest_app = plasmavmc_server::rest::build_router(rest_state);
let http_listener = tokio::net::TcpListener::bind(&http_addr).await?;
tracing::info!("PlasmaVMC HTTP REST API server starting on {}", http_addr);
let http_server = async move {
axum::serve(http_listener, rest_app)
.await
.map_err(|e| format!("HTTP server error: {}", e))
};
// Run both servers concurrently
tokio::select! {
result = grpc_server => {
result?;
}
result = http_server => {
result?;
}
}
Ok(())
}