photoncloud-monorepo/plasmavmc/crates/plasmavmc-server/src/storage.rs
centra 3eeb303dcb feat: Batch commit for T039.S3 deployment
Includes all pending changes needed for nixos-anywhere:
- fiberlb: L7 policy, rule, certificate types
- deployer: New service for cluster management
- nix-nos: Generic network modules
- Various service updates and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 04:34:51 +09:00

580 lines
16 KiB
Rust

//! Storage abstraction for VM persistence
use async_trait::async_trait;
use plasmavmc_types::{VmHandle, VirtualMachine};
use std::path::PathBuf;
use thiserror::Error;
/// Selects which persistence backend is used for VM state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageBackend {
    /// ChainFire store; also the fallback when nothing valid is configured.
    ChainFire,
    /// FlareDB store.
    FlareDB,
    /// Local JSON state file.
    File,
}

impl StorageBackend {
    /// Resolve the backend from the `PLASMAVMC_STORAGE_BACKEND` environment variable.
    ///
    /// Recognised values are `chainfire`, `flaredb` and `file`. Matching ignores
    /// ASCII case and surrounding whitespace, so `FlareDB` or ` file ` select the
    /// intended backend instead of silently falling through to the default.
    ///
    /// Returns [`StorageBackend::ChainFire`] when the variable is unset, is not
    /// valid Unicode, or holds an unrecognised value.
    pub fn from_env() -> Self {
        // An unset/invalid variable becomes "", which hits the fallback arm below.
        let raw = std::env::var("PLASMAVMC_STORAGE_BACKEND").unwrap_or_default();
        match raw.trim().to_ascii_lowercase().as_str() {
            "flaredb" => Self::FlareDB,
            "file" => Self::File,
            _ => Self::ChainFire,
        }
    }
}
/// Errors returned by the storage backends in this module.
///
/// Variants marked `#[from]` are produced automatically when `?` is applied to
/// the wrapped error type inside a function returning [`StorageResult`].
#[derive(Debug, Error)]
pub enum StorageError {
/// JSON encoding or decoding of a stored record failed.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// An error reported by the ChainFire client.
#[error("ChainFire error: {0}")]
ChainFire(#[from] chainfire_client::ClientError),
/// A FlareDB failure, carried as an already-formatted message string
/// (the FlareDB store in this file builds it with `format!`).
#[error("FlareDB error: {0}")]
FlareDB(String),
/// A filesystem error, e.g. from reading or writing the state file.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Lock contention with a free-form description.
/// NOTE(review): not constructed anywhere in the reviewed part of this file.
#[error("Lock contention: {0}")]
LockContention(String),
/// A requested item was not found; the payload describes what was missing.
/// NOTE(review): the stores in this file report absence as `Ok(None)` and do
/// not construct this variant — confirm where it is used.
#[error("Not found: {0}")]
NotFound(String),
/// The storage backend is unavailable.
/// NOTE(review): not constructed anywhere in the reviewed part of this file.
#[error("Storage unavailable")]
Unavailable,
}
/// Convenience alias: `Result` with [`StorageError`] as the error type.
pub type StorageResult<T> = Result<T, StorageError>;
/// Persistence interface for VM records and their runtime handles.
///
/// Records are addressed by the triple (`org_id`, `project_id`, `vm_id`).
/// Implementors must be `Send + Sync`; this file provides `ChainFireStore`,
/// `FlareDBStore` and `FileStore`.
#[async_trait]
pub trait VmStore: Send + Sync {
/// Persist `vm`, using the `org_id`, `project_id` and `id` carried by the
/// record itself to address it.
async fn save_vm(&self, vm: &VirtualMachine) -> StorageResult<()>;
/// Load a single VM record.
///
/// Returns `Ok(None)` when no record exists for the given triple.
async fn load_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VirtualMachine>>;
/// Delete a VM record.
///
/// NOTE(review): the implementations in this file differ — `FileStore` also
/// removes the handle stored for `vm_id`, while the ChainFire and FlareDB
/// stores delete only the VM record.
async fn delete_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()>;
/// List every VM record belonging to the tenant (`org_id`, `project_id`).
///
/// NOTE(review): the ChainFire and FlareDB implementations in this file skip
/// records that fail to deserialize instead of returning an error.
async fn list_vms(
&self,
org_id: &str,
project_id: &str,
) -> StorageResult<Vec<VirtualMachine>>;
/// Persist the handle associated with a VM.
async fn save_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
handle: &VmHandle,
) -> StorageResult<()>;
/// Load the handle associated with a VM.
///
/// Returns `Ok(None)` when no handle is stored.
/// NOTE(review): `FileStore` matches handles by `vm_id` only and ignores
/// `org_id` / `project_id`.
async fn load_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VmHandle>>;
/// Delete the handle associated with a VM.
async fn delete_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()>;
}
/// Compose the storage key of a VM record:
/// `/plasmavmc/vms/<org_id>/<project_id>/<vm_id>`.
///
/// The components are inserted verbatim; no escaping is applied.
fn vm_key(org_id: &str, project_id: &str, vm_id: &str) -> String {
    ["/plasmavmc/vms", org_id, project_id, vm_id].join("/")
}
/// Compose the storage key of a VM handle:
/// `/plasmavmc/handles/<org_id>/<project_id>/<vm_id>`.
///
/// The components are inserted verbatim; no escaping is applied.
fn handle_key(org_id: &str, project_id: &str, vm_id: &str) -> String {
    ["/plasmavmc/handles", org_id, project_id, vm_id].join("/")
}
/// Compose the key prefix shared by all VM records of one tenant:
/// `/plasmavmc/vms/<org_id>/<project_id>/` (note the trailing slash).
fn vm_prefix(org_id: &str, project_id: &str) -> String {
    let mut prefix = String::from("/plasmavmc/vms/");
    prefix.push_str(org_id);
    prefix.push('/');
    prefix.push_str(project_id);
    prefix.push('/');
    prefix
}
/// VM store backed by a ChainFire client.
pub struct ChainFireStore {
    /// The client is wrapped in an async mutex; every operation locks it for
    /// the duration of the call.
    client: tokio::sync::Mutex<chainfire_client::Client>,
}

impl ChainFireStore {
    /// Connect to ChainFire and wrap the client in a store.
    ///
    /// The endpoint is taken from, in order: the `endpoint` argument, the
    /// `PLASMAVMC_CHAINFIRE_ENDPOINT` environment variable, and finally the
    /// default `http://127.0.0.1:50051`.
    ///
    /// # Errors
    /// Returns [`StorageError::ChainFire`] when the connection attempt fails.
    pub async fn new(endpoint: Option<String>) -> StorageResult<Self> {
        let endpoint = endpoint
            .or_else(|| std::env::var("PLASMAVMC_CHAINFIRE_ENDPOINT").ok())
            .unwrap_or_else(|| "http://127.0.0.1:50051".to_string());

        let connected = chainfire_client::Client::connect(&endpoint)
            .await
            .map_err(StorageError::ChainFire)?;

        let client = tokio::sync::Mutex::new(connected);
        Ok(Self { client })
    }
}
#[async_trait]
impl VmStore for ChainFireStore {
async fn save_vm(&self, vm: &VirtualMachine) -> StorageResult<()> {
let key = vm_key(&vm.org_id, &vm.project_id, &vm.id.to_string());
let value = serde_json::to_vec(vm)?;
let mut client = self.client.lock().await;
client.put(key.as_bytes(), value).await?;
Ok(())
}
async fn load_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VirtualMachine>> {
let key = vm_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
match client.get(key.as_bytes()).await? {
Some(data) => {
let vm: VirtualMachine = serde_json::from_slice(&data)?;
Ok(Some(vm))
}
None => Ok(None),
}
}
async fn delete_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let key = vm_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
client.delete(key.as_bytes()).await?;
Ok(())
}
async fn list_vms(
&self,
org_id: &str,
project_id: &str,
) -> StorageResult<Vec<VirtualMachine>> {
let prefix = vm_prefix(org_id, project_id);
let mut client = self.client.lock().await;
let kvs = client.get_prefix(prefix.as_bytes()).await?;
let mut vms = Vec::new();
for (_, value) in kvs {
if let Ok(vm) = serde_json::from_slice::<VirtualMachine>(&value) {
vms.push(vm);
}
}
Ok(vms)
}
async fn save_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
handle: &VmHandle,
) -> StorageResult<()> {
let key = handle_key(org_id, project_id, vm_id);
let value = serde_json::to_vec(handle)?;
let mut client = self.client.lock().await;
client.put(key.as_bytes(), value).await?;
Ok(())
}
async fn load_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VmHandle>> {
let key = handle_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
match client.get(key.as_bytes()).await? {
Some(data) => {
let handle: VmHandle = serde_json::from_slice(&data)?;
Ok(Some(handle))
}
None => Ok(None),
}
}
async fn delete_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let key = handle_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
client.delete(key.as_bytes()).await?;
Ok(())
}
}
/// VM store backed by a FlareDB `RdbClient`.
pub struct FlareDBStore {
    /// The client is wrapped in an async mutex; every operation locks it for
    /// the duration of the call.
    client: tokio::sync::Mutex<flaredb_client::RdbClient>,
}

impl FlareDBStore {
    /// Connect to FlareDB (namespace `plasmavmc`) and wrap the client in a store.
    ///
    /// The endpoint is the `endpoint` argument when given, otherwise the
    /// `PLASMAVMC_FLAREDB_ENDPOINT` environment variable, otherwise
    /// `127.0.0.1:2379`.
    ///
    /// # Errors
    /// Returns [`StorageError::FlareDB`] with a descriptive message when the
    /// connection attempt fails.
    pub async fn new(endpoint: Option<String>) -> StorageResult<Self> {
        let endpoint = match endpoint {
            Some(explicit) => explicit,
            None => std::env::var("PLASMAVMC_FLAREDB_ENDPOINT")
                .unwrap_or_else(|_| "127.0.0.1:2379".to_string()),
        };

        // The same address is supplied for both endpoint arguments of the connect call.
        let endpoint_copy = endpoint.clone();
        let connected = flaredb_client::RdbClient::connect_with_pd_namespace(
            endpoint,
            endpoint_copy,
            "plasmavmc",
        )
        .await
        .map_err(|e| StorageError::FlareDB(format!("Failed to connect to FlareDB: {}", e)))?;

        let client = tokio::sync::Mutex::new(connected);
        Ok(Self { client })
    }
}
#[async_trait]
impl VmStore for FlareDBStore {
async fn save_vm(&self, vm: &VirtualMachine) -> StorageResult<()> {
let key = vm_key(&vm.org_id, &vm.project_id, &vm.id.to_string());
let value = serde_json::to_vec(vm)?;
let mut client = self.client.lock().await;
client
.raw_put(key.as_bytes().to_vec(), value)
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB put failed: {}", e)))?;
Ok(())
}
async fn load_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VirtualMachine>> {
let key = vm_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
match client
.raw_get(key.as_bytes().to_vec())
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB get failed: {}", e)))?
{
Some(data) => {
let vm: VirtualMachine = serde_json::from_slice(&data)?;
Ok(Some(vm))
}
None => Ok(None),
}
}
async fn delete_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let key = vm_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
client
.raw_delete(key.as_bytes().to_vec())
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB delete failed: {}", e)))?;
Ok(())
}
async fn list_vms(
&self,
org_id: &str,
project_id: &str,
) -> StorageResult<Vec<VirtualMachine>> {
let prefix = vm_prefix(org_id, project_id);
let mut client = self.client.lock().await;
// Calculate end_key by incrementing the last byte of prefix
let mut end_key = prefix.as_bytes().to_vec();
if let Some(last) = end_key.last_mut() {
if *last == 0xff {
// If last byte is 0xff, append a 0x00
end_key.push(0x00);
} else {
*last += 1;
}
} else {
// Empty prefix - scan everything
end_key.push(0xff);
}
let mut vms = Vec::new();
let mut start_key = prefix.as_bytes().to_vec();
// Pagination loop to get all results
loop {
let (_keys, values, next) = client
.raw_scan(start_key.clone(), end_key.clone(), 1000)
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB scan failed: {}", e)))?;
// Deserialize each value
for value in values {
if let Ok(vm) = serde_json::from_slice::<VirtualMachine>(&value) {
vms.push(vm);
}
}
// Check if there are more results
if let Some(next_key) = next {
start_key = next_key;
} else {
break;
}
}
Ok(vms)
}
async fn save_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
handle: &VmHandle,
) -> StorageResult<()> {
let key = handle_key(org_id, project_id, vm_id);
let value = serde_json::to_vec(handle)?;
let mut client = self.client.lock().await;
client
.raw_put(key.as_bytes().to_vec(), value)
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB put failed: {}", e)))?;
Ok(())
}
async fn load_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VmHandle>> {
let key = handle_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
match client
.raw_get(key.as_bytes().to_vec())
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB get failed: {}", e)))?
{
Some(data) => {
let handle: VmHandle = serde_json::from_slice(&data)?;
Ok(Some(handle))
}
None => Ok(None),
}
}
async fn delete_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let key = handle_key(org_id, project_id, vm_id);
let mut client = self.client.lock().await;
client
.raw_delete(key.as_bytes().to_vec())
.await
.map_err(|e| StorageError::FlareDB(format!("FlareDB delete failed: {}", e)))?;
Ok(())
}
}
/// VM store backed by a single JSON state file, rewritten in full on every
/// change via a temp-file-then-rename sequence.
pub struct FileStore {
    /// Location of the JSON state file.
    state_path: PathBuf,
}

/// On-disk layout of the state file: all VM records and all handles.
#[derive(serde::Serialize, serde::Deserialize)]
struct PersistedState {
    vms: Vec<VirtualMachine>,
    handles: Vec<VmHandle>,
}

impl FileStore {
    /// Create a file store.
    ///
    /// The state file path is `path` when given, otherwise the
    /// `PLASMAVMC_STATE_PATH` environment variable, otherwise
    /// `/var/run/plasmavmc/state.json`. No file is touched until the first
    /// read or write.
    pub fn new(path: Option<PathBuf>) -> Self {
        let state_path = path.unwrap_or_else(|| {
            std::env::var("PLASMAVMC_STATE_PATH")
                .map(PathBuf::from)
                .unwrap_or_else(|_| PathBuf::from("/var/run/plasmavmc/state.json"))
        });
        Self { state_path }
    }

    /// Read and decode the state file.
    ///
    /// # Errors
    /// Returns [`StorageError::Io`] when the file cannot be read (including
    /// when it does not exist) and [`StorageError::Serialization`] when its
    /// contents are not valid state JSON.
    fn load_state(&self) -> StorageResult<PersistedState> {
        let data = std::fs::read(&self.state_path)?;
        let state: PersistedState = serde_json::from_slice(&data)?;
        Ok(state)
    }

    /// Write `state` to the state file, replacing the previous contents.
    ///
    /// The data is written to a sibling temp file, flushed to disk, and then
    /// renamed over the real path, so readers never observe a partially
    /// written file.
    ///
    /// # Errors
    /// Returns [`StorageError::Serialization`] if encoding fails and
    /// [`StorageError::Io`] for any filesystem failure.
    fn save_state(&self, state: &PersistedState) -> StorageResult<()> {
        let serialized = serde_json::to_vec_pretty(state)?;
        if let Some(parent) = self.state_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let temp_path = self.state_path.with_extension("json.tmp");
        {
            let mut temp_file = std::fs::File::create(&temp_path)?;
            std::io::Write::write_all(&mut temp_file, &serialized)?;
            // Without this, a crash shortly after the rename could leave the
            // renamed file empty or truncated, because the data may still be
            // sitting in the OS cache.
            temp_file.sync_all()?;
        }
        std::fs::rename(&temp_path, &self.state_path)?;
        Ok(())
    }
}
#[async_trait]
impl VmStore for FileStore {
async fn save_vm(&self, vm: &VirtualMachine) -> StorageResult<()> {
let mut state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
// Remove existing VM if present
state.vms.retain(|v| v.id.to_string() != vm.id.to_string());
state.vms.push(vm.clone());
self.save_state(&state)?;
Ok(())
}
async fn load_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VirtualMachine>> {
let state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
Ok(state
.vms
.into_iter()
.find(|v| {
v.org_id == org_id && v.project_id == project_id && v.id.to_string() == vm_id
}))
}
async fn delete_vm(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let mut state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
state.vms.retain(|v| {
!(v.org_id == org_id && v.project_id == project_id && v.id.to_string() == vm_id)
});
state.handles.retain(|h| h.vm_id.to_string() != vm_id);
self.save_state(&state)?;
Ok(())
}
async fn list_vms(
&self,
org_id: &str,
project_id: &str,
) -> StorageResult<Vec<VirtualMachine>> {
let state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
Ok(state
.vms
.into_iter()
.filter(|v| v.org_id == org_id && v.project_id == project_id)
.collect())
}
async fn save_handle(
&self,
org_id: &str,
project_id: &str,
vm_id: &str,
handle: &VmHandle,
) -> StorageResult<()> {
let mut state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
state.handles.retain(|h| h.vm_id.to_string() != vm_id);
state.handles.push(handle.clone());
self.save_state(&state)?;
Ok(())
}
async fn load_handle(
&self,
_org_id: &str,
_project_id: &str,
vm_id: &str,
) -> StorageResult<Option<VmHandle>> {
let state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
Ok(state
.handles
.into_iter()
.find(|h| h.vm_id.to_string() == vm_id))
}
async fn delete_handle(
&self,
_org_id: &str,
_project_id: &str,
vm_id: &str,
) -> StorageResult<()> {
let mut state = self.load_state().unwrap_or_else(|_| PersistedState {
vms: Vec::new(),
handles: Vec::new(),
});
state.handles.retain(|h| h.vm_id.to_string() != vm_id);
self.save_state(&state)?;
Ok(())
}
}