fix cluster resiliency gaps across VM watch, runtime health, and FlareDB routing

This commit is contained in:
centra 2026-03-31 14:09:45 +09:00
parent 1698009062
commit 9dfe86f92a
Signed by: centra
GPG key ID: 0C09689D20B25ACA
10 changed files with 1229 additions and 464 deletions

View file

@ -3,24 +3,13 @@
use crate::error::{ClientError, Result}; use crate::error::{ClientError, Result};
use crate::watch::WatchHandle; use crate::watch::WatchHandle;
use chainfire_proto::proto::{ use chainfire_proto::proto::{
cluster_client::ClusterClient, cluster_client::ClusterClient, compare, kv_client::KvClient, request_op, response_op,
compare, watch_client::WatchClient, Compare, DeleteRangeRequest, MemberAddRequest, PutRequest,
kv_client::KvClient, RangeRequest, RequestOp, StatusRequest, TxnRequest,
request_op,
response_op,
watch_client::WatchClient,
Compare,
DeleteRangeRequest,
MemberAddRequest,
PutRequest,
RangeRequest,
RequestOp,
StatusRequest,
TxnRequest,
}; };
use std::time::Duration; use std::time::Duration;
use tonic::Code;
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic::Code;
use tracing::{debug, warn}; use tracing::{debug, warn};
/// Chainfire client /// Chainfire client
@ -64,7 +53,9 @@ impl Client {
} }
} }
Err(last_error.unwrap_or_else(|| ClientError::Connection("no Chainfire endpoints configured".to_string()))) Err(last_error.unwrap_or_else(|| {
ClientError::Connection("no Chainfire endpoints configured".to_string())
}))
} }
async fn with_kv_retry<T, F, Fut>(&mut self, mut op: F) -> Result<T> async fn with_kv_retry<T, F, Fut>(&mut self, mut op: F) -> Result<T>
@ -88,14 +79,17 @@ impl Client {
"retrying Chainfire KV RPC on alternate endpoint" "retrying Chainfire KV RPC on alternate endpoint"
); );
last_status = Some(status); last_status = Some(status);
self.recover_after_status(last_status.as_ref().unwrap()).await?; self.recover_after_status(last_status.as_ref().unwrap())
.await?;
tokio::time::sleep(retry_delay(attempt)).await; tokio::time::sleep(retry_delay(attempt)).await;
} }
Err(status) => return Err(status.into()), Err(status) => return Err(status.into()),
} }
} }
Err(last_status.unwrap_or_else(|| tonic::Status::unavailable("Chainfire KV retry exhausted")).into()) Err(last_status
.unwrap_or_else(|| tonic::Status::unavailable("Chainfire KV retry exhausted"))
.into())
} }
async fn with_cluster_retry<T, F, Fut>(&mut self, mut op: F) -> Result<T> async fn with_cluster_retry<T, F, Fut>(&mut self, mut op: F) -> Result<T>
@ -119,14 +113,17 @@ impl Client {
"retrying Chainfire cluster RPC on alternate endpoint" "retrying Chainfire cluster RPC on alternate endpoint"
); );
last_status = Some(status); last_status = Some(status);
self.recover_after_status(last_status.as_ref().unwrap()).await?; self.recover_after_status(last_status.as_ref().unwrap())
.await?;
tokio::time::sleep(retry_delay(attempt)).await; tokio::time::sleep(retry_delay(attempt)).await;
} }
Err(status) => return Err(status.into()), Err(status) => return Err(status.into()),
} }
} }
Err(last_status.unwrap_or_else(|| tonic::Status::unavailable("Chainfire cluster retry exhausted")).into()) Err(last_status
.unwrap_or_else(|| tonic::Status::unavailable("Chainfire cluster retry exhausted"))
.into())
} }
async fn recover_after_status(&mut self, status: &tonic::Status) -> Result<()> { async fn recover_after_status(&mut self, status: &tonic::Status) -> Result<()> {
@ -150,7 +147,9 @@ impl Client {
let endpoint = self let endpoint = self
.endpoints .endpoints
.get(index) .get(index)
.ok_or_else(|| ClientError::Connection(format!("invalid Chainfire endpoint index {index}")))? .ok_or_else(|| {
ClientError::Connection(format!("invalid Chainfire endpoint index {index}"))
})?
.clone(); .clone();
let (channel, kv, cluster) = connect_endpoint(&endpoint).await?; let (channel, kv, cluster) = connect_endpoint(&endpoint).await?;
self.current_endpoint = index; self.current_endpoint = index;
@ -182,7 +181,11 @@ impl Client {
match cluster.status(StatusRequest {}).await { match cluster.status(StatusRequest {}).await {
Ok(response) => { Ok(response) => {
let status = response.into_inner(); let status = response.into_inner();
let member_id = status.header.as_ref().map(|header| header.member_id).unwrap_or(0); let member_id = status
.header
.as_ref()
.map(|header| header.member_id)
.unwrap_or(0);
if status.leader != 0 && status.leader == member_id { if status.leader != 0 && status.leader == member_id {
return Ok(Some(index)); return Ok(Some(index));
} }
@ -232,10 +235,7 @@ impl Client {
/// Get a value by key /// Get a value by key
pub async fn get(&mut self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> { pub async fn get(&mut self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
Ok(self Ok(self.get_with_revision(key).await?.map(|(value, _)| value))
.get_with_revision(key)
.await?
.map(|(value, _)| value))
} }
/// Get a value by key along with its current revision /// Get a value by key along with its current revision
@ -263,13 +263,14 @@ impl Client {
}) })
.await?; .await?;
Ok(resp.kvs.into_iter().next().map(|kv| (kv.value, kv.mod_revision as u64))) Ok(resp
.kvs
.into_iter()
.next()
.map(|kv| (kv.value, kv.mod_revision as u64)))
} }
/// Put a key-value pair only if the key's mod_revision matches. /// Put a key-value pair only if the key's mod_revision matches.
///
/// This is a best-effort compare-and-set. The server may not return
/// a reliable success flag, so callers should treat this as "attempted".
pub async fn put_if_revision( pub async fn put_if_revision(
&mut self, &mut self,
key: impl AsRef<[u8]>, key: impl AsRef<[u8]>,
@ -277,6 +278,7 @@ impl Client {
expected_mod_revision: u64, expected_mod_revision: u64,
) -> Result<()> { ) -> Result<()> {
let key_bytes = key.as_ref().to_vec(); let key_bytes = key.as_ref().to_vec();
let key_display = String::from_utf8_lossy(&key_bytes).to_string();
let compare = Compare { let compare = Compare {
result: compare::CompareResult::Equal as i32, result: compare::CompareResult::Equal as i32,
target: compare::CompareTarget::Mod as i32, target: compare::CompareTarget::Mod as i32,
@ -288,21 +290,35 @@ impl Client {
let put_op = RequestOp { let put_op = RequestOp {
request: Some(request_op::Request::RequestPut(PutRequest { request: Some(request_op::Request::RequestPut(PutRequest {
key: key_bytes, key: key_bytes.clone(),
value: value.as_ref().to_vec(), value: value.as_ref().to_vec(),
lease: 0, lease: 0,
prev_kv: false, prev_kv: false,
})), })),
}; };
self.with_kv_retry(|mut kv| { let read_on_fail = RequestOp {
request: Some(request_op::Request::RequestRange(RangeRequest {
key: key_bytes.clone(),
range_end: vec![],
limit: 1,
revision: 0,
keys_only: false,
count_only: false,
serializable: false,
})),
};
let resp = self
.with_kv_retry(|mut kv| {
let compare = compare.clone(); let compare = compare.clone();
let put_op = put_op.clone(); let put_op = put_op.clone();
let read_on_fail = read_on_fail.clone();
async move { async move {
kv.txn(TxnRequest { kv.txn(TxnRequest {
compare: vec![compare], compare: vec![compare],
success: vec![put_op], success: vec![put_op],
failure: vec![], failure: vec![read_on_fail],
}) })
.await .await
.map(|resp| resp.into_inner()) .map(|resp| resp.into_inner())
@ -310,7 +326,27 @@ impl Client {
}) })
.await?; .await?;
Ok(()) if resp.succeeded {
return Ok(());
}
let current_revision = resp
.responses
.into_iter()
.filter_map(|op| match op.response {
Some(response_op::Response::ResponseRange(range)) => range
.kvs
.into_iter()
.next()
.map(|kv| kv.mod_revision as u64),
_ => None,
})
.next()
.unwrap_or(0);
Err(ClientError::Conflict(format!(
"mod_revision mismatch for key {key_display}: expected {expected_mod_revision}, current {current_revision}"
)))
} }
/// Get a value as string /// Get a value as string
@ -341,7 +377,10 @@ impl Client {
} }
/// Get all keys with a prefix /// Get all keys with a prefix
pub async fn get_prefix(&mut self, prefix: impl AsRef<[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> { pub async fn get_prefix(
&mut self,
prefix: impl AsRef<[u8]>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let prefix = prefix.as_ref(); let prefix = prefix.as_ref();
let range_end = prefix_end(prefix); let range_end = prefix_end(prefix);
@ -404,8 +443,7 @@ impl Client {
.map(|kv| (kv.key, kv.value, kv.mod_revision as u64)) .map(|kv| (kv.key, kv.value, kv.mod_revision as u64))
.collect(); .collect();
let next_key = if more { let next_key = if more {
kvs.last() kvs.last().map(|(k, _, _)| {
.map(|(k, _, _)| {
let mut nk = k.clone(); let mut nk = k.clone();
nk.push(0); nk.push(0);
nk nk
@ -451,8 +489,7 @@ impl Client {
.map(|kv| (kv.key, kv.value, kv.mod_revision as u64)) .map(|kv| (kv.key, kv.value, kv.mod_revision as u64))
.collect(); .collect();
let next_key = if more { let next_key = if more {
kvs.last() kvs.last().map(|(k, _, _)| {
.map(|(k, _, _)| {
let mut nk = k.clone(); let mut nk = k.clone();
nk.push(0); nk.push(0);
nk nk
@ -519,11 +556,7 @@ impl Client {
.await?; .await?;
if resp.succeeded { if resp.succeeded {
let new_version = resp let new_version = resp.header.as_ref().map(|h| h.revision as u64).unwrap_or(0);
.header
.as_ref()
.map(|h| h.revision as u64)
.unwrap_or(0);
return Ok(CasOutcome { return Ok(CasOutcome {
success: true, success: true,
current_version: new_version, current_version: new_version,
@ -536,11 +569,9 @@ impl Client {
.responses .responses
.into_iter() .into_iter()
.filter_map(|op| match op.response { .filter_map(|op| match op.response {
Some(response_op::Response::ResponseRange(r)) => r Some(response_op::Response::ResponseRange(r)) => {
.kvs r.kvs.into_iter().next().map(|kv| kv.mod_revision as u64)
.into_iter() }
.next()
.map(|kv| kv.mod_revision as u64),
_ => None, _ => None,
}) })
.next() .next()
@ -594,7 +625,12 @@ impl Client {
/// ///
/// # Returns /// # Returns
/// The node ID of the added member /// The node ID of the added member
pub async fn member_add(&mut self, node_id: u64, peer_url: impl AsRef<str>, is_learner: bool) -> Result<u64> { pub async fn member_add(
&mut self,
node_id: u64,
peer_url: impl AsRef<str>,
is_learner: bool,
) -> Result<u64> {
let peer_url = peer_url.as_ref().to_string(); let peer_url = peer_url.as_ref().to_string();
let resp = self let resp = self
.with_cluster_retry(|mut cluster| { .with_cluster_retry(|mut cluster| {
@ -660,7 +696,9 @@ fn parse_endpoints(input: &str) -> Result<Vec<String>> {
.collect(); .collect();
if endpoints.is_empty() { if endpoints.is_empty() {
return Err(ClientError::Connection("no Chainfire endpoints configured".to_string())); return Err(ClientError::Connection(
"no Chainfire endpoints configured".to_string(),
));
} }
Ok(endpoints) Ok(endpoints)
@ -674,7 +712,9 @@ fn normalize_endpoint(endpoint: &str) -> String {
} }
} }
async fn connect_endpoint(endpoint: &str) -> Result<(Channel, KvClient<Channel>, ClusterClient<Channel>)> { async fn connect_endpoint(
endpoint: &str,
) -> Result<(Channel, KvClient<Channel>, ClusterClient<Channel>)> {
let channel = Channel::from_shared(endpoint.to_string()) let channel = Channel::from_shared(endpoint.to_string())
.map_err(|e| ClientError::Connection(e.to_string()))? .map_err(|e| ClientError::Connection(e.to_string()))?
.connect() .connect()
@ -693,7 +733,11 @@ fn retry_delay(attempt: usize) -> Duration {
fn is_retryable_status(status: &tonic::Status) -> bool { fn is_retryable_status(status: &tonic::Status) -> bool {
matches!( matches!(
status.code(), status.code(),
Code::Unavailable | Code::DeadlineExceeded | Code::Internal | Code::Aborted | Code::FailedPrecondition Code::Unavailable
| Code::DeadlineExceeded
| Code::Internal
| Code::Aborted
| Code::FailedPrecondition
) || retryable_message(status.message()) ) || retryable_message(status.message())
} }
@ -734,8 +778,14 @@ mod tests {
#[test] #[test]
fn normalize_endpoint_adds_http_scheme() { fn normalize_endpoint_adds_http_scheme() {
assert_eq!(normalize_endpoint("127.0.0.1:2379"), "http://127.0.0.1:2379"); assert_eq!(
assert_eq!(normalize_endpoint("http://127.0.0.1:2379"), "http://127.0.0.1:2379"); normalize_endpoint("127.0.0.1:2379"),
"http://127.0.0.1:2379"
);
assert_eq!(
normalize_endpoint("http://127.0.0.1:2379"),
"http://127.0.0.1:2379"
);
} }
#[test] #[test]

View file

@ -31,4 +31,8 @@ pub enum ClientError {
/// Internal error /// Internal error
#[error("Internal error: {0}")] #[error("Internal error: {0}")]
Internal(String), Internal(String),
/// Compare-and-set conflict
#[error("Conflict: {0}")]
Conflict(String),
} }

View file

@ -5,7 +5,7 @@ use std::process::Stdio;
use std::time::Duration; use std::time::Duration;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chainfire_client::Client; use chainfire_client::{Client, ClientError};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use deployer_types::{ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec}; use deployer_types::{ContainerSpec, HealthCheckSpec, ProcessSpec, ServiceInstanceSpec};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -254,7 +254,11 @@ impl Agent {
} }
} }
fn render_container_spec(&self, spec: &ContainerSpec, inst: &ServiceInstanceSpec) -> ContainerSpec { fn render_container_spec(
&self,
spec: &ContainerSpec,
inst: &ServiceInstanceSpec,
) -> ContainerSpec {
let mut rendered = spec.clone(); let mut rendered = spec.clone();
rendered.image = self.render_template_value(&rendered.image, inst); rendered.image = self.render_template_value(&rendered.image, inst);
rendered.command = rendered rendered.command = rendered
@ -283,10 +287,7 @@ impl Agent {
rendered rendered
} }
fn desired_process_spec( fn desired_process_spec(&self, inst: &ServiceInstanceSpec) -> Option<ProcessSpec> {
&self,
inst: &ServiceInstanceSpec,
) -> Option<ProcessSpec> {
match (&inst.container, &inst.process) { match (&inst.container, &inst.process) {
(Some(container), maybe_process) => { (Some(container), maybe_process) => {
if maybe_process.is_some() { if maybe_process.is_some() {
@ -309,6 +310,93 @@ impl Agent {
} }
} }
/// Stamps health bookkeeping fields onto a JSON instance document in place.
///
/// Does nothing when `inst_value` is not a JSON object. Otherwise:
/// - `observed_at` is set to `started_at` (RFC 3339) only when the key is
///   missing or currently `null`; any other existing value is left untouched.
/// - `state` is always overwritten with `health_status`.
/// - `last_heartbeat` is always overwritten with `heartbeat_at` (RFC 3339).
fn apply_instance_health_fields(
    inst_value: &mut Value,
    started_at: &DateTime<Utc>,
    heartbeat_at: &DateTime<Utc>,
    health_status: &str,
) {
    if let Some(fields) = inst_value.as_object_mut() {
        // A missing key and an explicit null both count as "never observed".
        let never_observed = fields.get("observed_at").map_or(true, Value::is_null);
        if never_observed {
            fields.insert(
                "observed_at".to_string(),
                Value::String(started_at.to_rfc3339()),
            );
        }

        let state = Value::String(health_status.to_string());
        fields.insert("state".to_string(), state);

        let last_heartbeat = Value::String(heartbeat_at.to_rfc3339());
        fields.insert("last_heartbeat".to_string(), last_heartbeat);
    }
}
/// Writes the current health status of an instance back to its key using a
/// revision-guarded put, retrying when another writer wins the race.
///
/// Each attempt stamps `inst_value` via `apply_instance_health_fields` with a
/// fresh heartbeat timestamp, serializes it, and calls `put_if_revision` with
/// `mod_revision`. The loop makes at most three attempts.
///
/// # Errors
/// Returns the serialization error, any non-conflict client error, a conflict
/// on the third attempt, a failure to re-read the key, or a failure to parse
/// the re-read JSON.
///
/// Returns `Ok(())` without writing when the key no longer exists at the time
/// of a retry.
async fn persist_instance_health(
&self,
client: &mut Client,
key: &[u8],
inst: &ServiceInstanceSpec,
mut inst_value: Value,
mut mod_revision: u64,
started_at: &DateTime<Utc>,
health_status: &str,
) -> Result<()> {
for attempt in 0..3 {
// The heartbeat is re-sampled on every attempt so a retried write does not
// carry the timestamp of the failed one.
let heartbeat_at = Utc::now();
Self::apply_instance_health_fields(
&mut inst_value,
started_at,
&heartbeat_at,
health_status,
);
let updated = serde_json::to_vec(&inst_value)?;
match client.put_if_revision(key, &updated, mod_revision).await {
Ok(()) => return Ok(()),
// Only the first two attempts retry on conflict; a conflict on the last
// attempt falls through to the generic error arm below.
Err(ClientError::Conflict(error)) if attempt < 2 => {
warn!(
service = %inst.service,
instance_id = %inst.instance_id,
mod_revision,
attempt = attempt + 1,
error = %error,
"instance health update raced with another writer; retrying with fresh revision"
);
// Re-read the stored document so the next attempt applies the health
// fields on top of the other writer's version instead of overwriting it.
let Some((latest_bytes, latest_revision)) =
client.get_with_revision(key).await?
else {
warn!(
service = %inst.service,
instance_id = %inst.instance_id,
"instance disappeared while retrying health update"
);
// Key is gone: nothing to update, and this is not treated as an error.
return Ok(());
};
inst_value = serde_json::from_slice(&latest_bytes).with_context(|| {
format!(
"failed to parse refreshed instance JSON for {}",
inst.instance_id
)
})?;
mod_revision = latest_revision;
}
Err(error) => return Err(error.into()),
}
}
// NOTE(review): appears unreachable, since the final iteration always returns
// from one of the match arms above; kept to satisfy the return type.
Ok(())
}
/// ローカルファイル (/etc/photoncloud/instances.json) から ServiceInstance 定義を読み、 /// ローカルファイル (/etc/photoncloud/instances.json) から ServiceInstance 定義を読み、
/// Chainfire 上の `photoncloud/clusters/{cluster_id}/instances/{service}/{instance_id}` に upsert する。 /// Chainfire 上の `photoncloud/clusters/{cluster_id}/instances/{service}/{instance_id}` に upsert する。
async fn sync_local_instances(&self, client: &mut Client) -> Result<()> { async fn sync_local_instances(&self, client: &mut Client) -> Result<()> {
@ -348,7 +436,9 @@ impl Agent {
{ {
for preserve_key in ["state", "last_heartbeat", "observed_at"] { for preserve_key in ["state", "last_heartbeat", "observed_at"] {
if let Some(value) = existing_obj.get(preserve_key) { if let Some(value) = existing_obj.get(preserve_key) {
desired_obj.entry(preserve_key.to_string()).or_insert(value.clone()); desired_obj
.entry(preserve_key.to_string())
.or_insert(value.clone());
} }
} }
} }
@ -398,7 +488,6 @@ impl Agent {
// Desired Stateに基づいてプロセスを管理 // Desired Stateに基づいてプロセスを管理
for (service, instance_id, proc_spec) in desired_instances { for (service, instance_id, proc_spec) in desired_instances {
if let Some(process) = self.process_manager.get_mut(&service, &instance_id) { if let Some(process) = self.process_manager.get_mut(&service, &instance_id) {
if process.spec != proc_spec { if process.spec != proc_spec {
process.spec = proc_spec; process.spec = proc_spec;
@ -526,7 +615,7 @@ impl Agent {
let mut seen = HashSet::new(); let mut seen = HashSet::new();
for (key, value, mod_revision) in kvs { for (key, value, mod_revision) in kvs {
let mut inst_value: Value = match serde_json::from_slice(&value) { let inst_value: Value = match serde_json::from_slice(&value) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
warn!(error = %e, "failed to parse instance json"); warn!(error = %e, "failed to parse instance json");
@ -582,22 +671,18 @@ impl Agent {
"healthy".to_string() // デフォルトはhealthy "healthy".to_string() // デフォルトはhealthy
}; };
// Chainfire上のServiceInstanceに状態を反映 if let Err(e) = self
if let Some(obj) = inst_value.as_object_mut() { .persist_instance_health(
obj.entry("observed_at".to_string()) client,
.or_insert_with(|| Value::String(started_at.to_rfc3339())); &key,
obj.insert( &inst,
"state".to_string(), inst_value,
Value::String(health_status.clone()), mod_revision,
); &started_at,
obj.insert( &health_status,
"last_heartbeat".to_string(), )
Value::String(now.to_rfc3339()), .await
); {
}
let updated = serde_json::to_vec(&inst_value)?;
if let Err(e) = client.put_if_revision(&key, &updated, mod_revision).await {
warn!( warn!(
service = %inst.service, service = %inst.service,
instance_id = %inst.instance_id, instance_id = %inst.instance_id,
@ -615,8 +700,7 @@ impl Agent {
); );
} }
self.next_health_checks self.next_health_checks.retain(|key, _| seen.contains(key));
.retain(|key, _| seen.contains(key));
Ok(()) Ok(())
} }
@ -732,7 +816,10 @@ mod tests {
assert_eq!(rendered.args[2], "18080"); assert_eq!(rendered.args[2], "18080");
assert_eq!(rendered.args[4], "127.0.0.2"); assert_eq!(rendered.args[4], "127.0.0.2");
assert_eq!(rendered.working_dir.as_deref(), Some("/srv/api")); assert_eq!(rendered.working_dir.as_deref(), Some("/srv/api"));
assert_eq!(rendered.env.get("INSTANCE").map(String::as_str), Some("api-node01")); assert_eq!(
rendered.env.get("INSTANCE").map(String::as_str),
Some("api-node01")
);
} }
#[test] #[test]
@ -779,4 +866,59 @@ mod tests {
.insert(key, Utc::now() - chrono::Duration::seconds(1)); .insert(key, Utc::now() - chrono::Duration::seconds(1));
assert!(agent.health_check_due(&instance, &health_check)); assert!(agent.health_check_due(&instance, &health_check));
} }
// Verifies `Agent::apply_instance_health_fields` in two scenarios:
// 1. all three fields start as JSON null and must be filled in;
// 2. `observed_at` already holds a value and must be preserved while `state`
//    is overwritten.
#[test]
fn test_apply_instance_health_fields_replaces_nulls_and_preserves_observed_at() {
let started_at = DateTime::parse_from_rfc3339("2026-03-31T03:00:00Z")
.unwrap()
.with_timezone(&Utc);
let heartbeat_at = DateTime::parse_from_rfc3339("2026-03-31T03:00:05Z")
.unwrap()
.with_timezone(&Utc);
let original_observed_at = "2026-03-31T02:59:59Z";
// Scenario 1: every health field is an explicit null.
let mut null_observed = serde_json::json!({
"observed_at": null,
"state": null,
"last_heartbeat": null
});
Agent::apply_instance_health_fields(
&mut null_observed,
&started_at,
&heartbeat_at,
"healthy",
);
// The expected strings use the "+00:00" offset form rather than the "Z"
// suffix used in the inputs above.
assert_eq!(
null_observed.get("observed_at").and_then(Value::as_str),
Some("2026-03-31T03:00:00+00:00")
);
assert_eq!(
null_observed.get("state").and_then(Value::as_str),
Some("healthy")
);
assert_eq!(
null_observed.get("last_heartbeat").and_then(Value::as_str),
Some("2026-03-31T03:00:05+00:00")
);
// Scenario 2: `observed_at` is already populated and must not be replaced.
let mut existing_observed = serde_json::json!({
"observed_at": original_observed_at,
"state": "starting",
"last_heartbeat": null
});
Agent::apply_instance_health_fields(
&mut existing_observed,
&started_at,
&heartbeat_at,
"healthy",
);
assert_eq!(
existing_observed.get("observed_at").and_then(Value::as_str),
Some(original_observed_at)
);
assert_eq!(
existing_observed.get("state").and_then(Value::as_str),
Some("healthy")
);
}
} }

View file

@ -7,10 +7,10 @@ use flaredb_proto::kvrpc::{
RawScanRequest, RawScanRequest,
}; };
use flaredb_proto::pdpb::Store; use flaredb_proto::pdpb::Store;
use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH}; use std::time::{Instant, SystemTime, UNIX_EPOCH};
use serde::Deserialize;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tonic::transport::Channel; use tonic::transport::Channel;
@ -61,6 +61,74 @@ struct ChainfireRouteSnapshot {
fetched_at: Instant, fetched_at: Instant,
} }
/// Result of resolving a key against a Chainfire route snapshot.
#[derive(Debug, Clone)]
struct ResolvedRoute {
/// The region whose key range contains the looked-up key.
region: Region,
/// The store selected as the primary target: the region's recorded leader
/// when that store is present in the snapshot, otherwise the first peer
/// store that is present.
leader: Store,
/// De-duplicated store addresses to try, in order: the leader's address
/// first (when known), followed by the addresses of the region's peers.
candidate_addrs: Vec<String>,
}
/// Appends `addr` to `addrs` unless an equal address is already in the list,
/// so the list keeps first-seen order without duplicates.
fn push_unique_addr(addrs: &mut Vec<String>, addr: &str) {
    let already_listed = addrs.iter().map(String::as_str).any(|known| known == addr);
    if already_listed {
        return;
    }
    addrs.push(addr.to_owned());
}
/// Resolves `key` to a region and an ordered list of store addresses using a
/// previously fetched Chainfire route snapshot.
///
/// A region matches when `key` lies in `[start_key, end_key)`; an empty
/// `start_key` or `end_key` leaves that side unbounded. Stores are looked up
/// in the order "recorded leader, then peers"; the first one present in the
/// snapshot becomes the returned `leader`, and every present store contributes
/// its address (without duplicates) to `candidate_addrs`.
///
/// # Errors
/// `NotFound` when no region covers the key, when neither the leader nor any
/// peer has a store entry in the snapshot, or when the candidate list ends up
/// empty.
fn resolve_chainfire_route_from_snapshot(
    key: &[u8],
    snapshot: &ChainfireRouteSnapshot,
) -> Result<ResolvedRoute, tonic::Status> {
    let matching_region = snapshot
        .regions
        .iter()
        .find(|candidate| {
            let at_or_after_start =
                candidate.start_key.is_empty() || key >= candidate.start_key.as_slice();
            let before_end = candidate.end_key.is_empty() || key < candidate.end_key.as_slice();
            at_or_after_start && before_end
        })
        .cloned();
    let Some(region) = matching_region else {
        return Err(tonic::Status::not_found("region not found"));
    };

    // Walk the leader id first and then every peer id, skipping ids that have
    // no store entry in the snapshot.
    let mut candidate_addrs = Vec::new();
    let mut selected_store = None;
    let lookup_order = std::iter::once(&region.leader_id).chain(&region.peers);
    for store in lookup_order.filter_map(|store_id| snapshot.stores.get(store_id)) {
        if selected_store.is_none() {
            selected_store = Some(store.clone());
        }
        push_unique_addr(&mut candidate_addrs, &store.addr);
    }

    let Some(leader) = selected_store else {
        return Err(tonic::Status::not_found("region peer store not found"));
    };
    if candidate_addrs.is_empty() {
        return Err(tonic::Status::not_found(
            "region has no candidate store addresses",
        ));
    }

    let region = Region {
        id: region.id,
        start_key: region.start_key,
        end_key: region.end_key,
        peers: region.peers,
        leader_id: region.leader_id,
    };
    let leader = Store {
        id: leader.id,
        addr: leader.addr,
    };
    Ok(ResolvedRoute {
        region,
        leader,
        candidate_addrs,
    })
}
impl RdbClient { impl RdbClient {
const ROUTE_RETRY_LIMIT: usize = 12; const ROUTE_RETRY_LIMIT: usize = 12;
const ROUTE_RETRY_BASE_DELAY_MS: u64 = 100; const ROUTE_RETRY_BASE_DELAY_MS: u64 = 100;
@ -116,11 +184,9 @@ impl RdbClient {
.await; .await;
clients = Some(match probe { clients = Some(match probe {
Err(status) if status.code() == tonic::Code::Unimplemented => ( Err(status) if status.code() == tonic::Code::Unimplemented => {
None, (None, None, Some(ChainfireKvClient::new(pd_channel)))
None, }
Some(ChainfireKvClient::new(pd_channel)),
),
_ => ( _ => (
Some(TsoClient::new(pd_channel.clone())), Some(TsoClient::new(pd_channel.clone())),
Some(PdClient::new(pd_channel)), Some(PdClient::new(pd_channel)),
@ -134,13 +200,11 @@ impl RdbClient {
} else if let Some(error) = last_error { } else if let Some(error) = last_error {
return Err(error); return Err(error);
} else { } else {
return Err( return Err(Channel::from_shared("http://127.0.0.1:1".to_string())
Channel::from_shared("http://127.0.0.1:1".to_string())
.unwrap() .unwrap()
.connect() .connect()
.await .await
.expect_err("unreachable fallback endpoint should fail to connect"), .expect_err("unreachable fallback endpoint should fail to connect"));
);
} }
}; };
@ -187,13 +251,11 @@ impl RdbClient {
} else if let Some(error) = last_error { } else if let Some(error) = last_error {
return Err(error); return Err(error);
} else { } else {
return Err( return Err(Channel::from_shared("http://127.0.0.1:1".to_string())
Channel::from_shared("http://127.0.0.1:1".to_string())
.unwrap() .unwrap()
.connect() .connect()
.await .await
.expect_err("unreachable fallback endpoint should fail to connect"), .expect_err("unreachable fallback endpoint should fail to connect"));
);
}; };
let channel = channel.expect("direct connect should produce a channel when selected"); let channel = channel.expect("direct connect should produce a channel when selected");
@ -219,7 +281,13 @@ impl RdbClient {
} }
if let Some(chainfire_kv_client) = &self.chainfire_kv_client { if let Some(chainfire_kv_client) = &self.chainfire_kv_client {
return self.resolve_addr_via_chainfire(key, chainfire_kv_client.clone()).await; let route = self
.resolve_route_via_chainfire(key, chainfire_kv_client.clone(), false)
.await?;
self.region_cache
.update(route.region.clone(), route.leader.clone())
.await;
return Ok(route.leader.addr);
} }
if let Some(pd_client) = &self.pd_client { if let Some(pd_client) = &self.pd_client {
@ -244,7 +312,13 @@ impl RdbClient {
self.invalidate_chainfire_route_cache().await; self.invalidate_chainfire_route_cache().await;
if let Some(chainfire_kv_client) = &self.chainfire_kv_client { if let Some(chainfire_kv_client) = &self.chainfire_kv_client {
return self.resolve_addr_via_chainfire(key, chainfire_kv_client.clone()).await; let route = self
.resolve_route_via_chainfire(key, chainfire_kv_client.clone(), true)
.await?;
self.region_cache
.update(route.region.clone(), route.leader.clone())
.await;
return Ok(route.leader.addr);
} }
if let Some(pd_client) = &self.pd_client { if let Some(pd_client) = &self.pd_client {
@ -310,53 +384,21 @@ impl RdbClient {
Ok(snapshot) Ok(snapshot)
} }
fn resolve_addr_from_chainfire_snapshot(
&self,
key: &[u8],
snapshot: &ChainfireRouteSnapshot,
) -> Result<(Region, Store), tonic::Status> {
let region = snapshot
.regions
.iter()
.find(|region| {
let start_ok = region.start_key.is_empty() || key >= region.start_key.as_slice();
let end_ok = region.end_key.is_empty() || key < region.end_key.as_slice();
start_ok && end_ok
})
.cloned()
.ok_or_else(|| tonic::Status::not_found("region not found"))?;
let leader = snapshot
.stores
.get(&region.leader_id)
.cloned()
.ok_or_else(|| tonic::Status::not_found("leader store not found"))?;
Ok((
Region {
id: region.id,
start_key: region.start_key,
end_key: region.end_key,
peers: region.peers,
leader_id: region.leader_id,
},
Store {
id: leader.id,
addr: leader.addr,
},
))
}
async fn with_routed_addr<T, F, Fut>(&self, key: &[u8], mut op: F) -> Result<T, tonic::Status> async fn with_routed_addr<T, F, Fut>(&self, key: &[u8], mut op: F) -> Result<T, tonic::Status>
where where
F: FnMut(String) -> Fut, F: FnMut(String) -> Fut,
Fut: Future<Output = Result<T, tonic::Status>>, Fut: Future<Output = Result<T, tonic::Status>>,
{ {
let mut addr = self.resolve_addr(key).await?; let mut candidate_addrs = self.resolve_route_candidates(key, false).await?;
let mut candidate_idx = 0usize;
let mut refreshed = false; let mut refreshed = false;
let mut last_status = None; let mut last_status = None;
for attempt in 0..Self::ROUTE_RETRY_LIMIT { for attempt in 0..Self::ROUTE_RETRY_LIMIT {
let addr = candidate_addrs
.get(candidate_idx)
.cloned()
.ok_or_else(|| tonic::Status::internal("routing candidate list exhausted"))?;
match tokio::time::timeout(Self::ROUTED_RPC_TIMEOUT, op(addr.clone())).await { match tokio::time::timeout(Self::ROUTED_RPC_TIMEOUT, op(addr.clone())).await {
Err(_) => { Err(_) => {
Self::evict_channel_from_map(&self.channels, &addr).await; Self::evict_channel_from_map(&self.channels, &addr).await;
@ -366,10 +408,19 @@ impl RdbClient {
Self::ROUTED_RPC_TIMEOUT.as_millis() Self::ROUTED_RPC_TIMEOUT.as_millis()
)); ));
if candidate_idx + 1 < candidate_addrs.len() {
candidate_idx += 1;
last_status = Some(status);
tokio::time::sleep(Self::retry_delay(attempt)).await;
continue;
}
if !refreshed && self.direct_addr.is_none() { if !refreshed && self.direct_addr.is_none() {
refreshed = true; refreshed = true;
if let Ok(fresh_addr) = self.resolve_addr_uncached(key).await { if let Ok(fresh_candidates) = self.resolve_route_candidates(key, true).await
addr = fresh_addr; {
candidate_addrs = fresh_candidates;
candidate_idx = 0;
last_status = Some(status); last_status = Some(status);
tokio::time::sleep(Self::retry_delay(attempt)).await; tokio::time::sleep(Self::retry_delay(attempt)).await;
continue; continue;
@ -391,20 +442,33 @@ impl RdbClient {
.override_store_addr(key, redirect_addr.clone()) .override_store_addr(key, redirect_addr.clone())
.await; .await;
if redirect_addr != addr { if redirect_addr != addr {
addr = redirect_addr; candidate_addrs.retain(|candidate| candidate != &redirect_addr);
candidate_addrs.insert(0, redirect_addr);
candidate_idx = 0;
last_status = Some(status); last_status = Some(status);
tokio::time::sleep(Self::retry_delay(attempt)).await; tokio::time::sleep(Self::retry_delay(attempt)).await;
continue; continue;
} }
} }
if (transport_error || Self::is_retryable_route_error(&status))
&& candidate_idx + 1 < candidate_addrs.len()
{
candidate_idx += 1;
last_status = Some(status);
tokio::time::sleep(Self::retry_delay(attempt)).await;
continue;
}
if !refreshed if !refreshed
&& self.direct_addr.is_none() && self.direct_addr.is_none()
&& Self::is_retryable_route_error(&status) && Self::is_retryable_route_error(&status)
{ {
refreshed = true; refreshed = true;
if let Ok(fresh_addr) = self.resolve_addr_uncached(key).await { if let Ok(fresh_candidates) = self.resolve_route_candidates(key, true).await
addr = fresh_addr; {
candidate_addrs = fresh_candidates;
candidate_idx = 0;
last_status = Some(status); last_status = Some(status);
tokio::time::sleep(Self::retry_delay(attempt)).await; tokio::time::sleep(Self::retry_delay(attempt)).await;
continue; continue;
@ -425,13 +489,54 @@ impl RdbClient {
return Err(status); return Err(status);
} }
Ok(Ok(value)) => return Ok(value), Ok(Ok(value)) => {
if candidate_idx > 0 {
self.region_cache.override_store_addr(key, addr).await;
}
return Ok(value);
}
} }
} }
Err(last_status.unwrap_or_else(|| tonic::Status::internal("routing retry exhausted"))) Err(last_status.unwrap_or_else(|| tonic::Status::internal("routing retry exhausted")))
} }
async fn resolve_route_candidates(
&self,
key: &[u8],
force_refresh: bool,
) -> Result<Vec<String>, tonic::Status> {
if let Some(addr) = &self.direct_addr {
return Ok(vec![addr.clone()]);
}
if !force_refresh {
if let Some(addr) = self.region_cache.get_store_addr(key).await {
return Ok(vec![addr]);
}
} else {
self.region_cache.clear().await;
self.invalidate_chainfire_route_cache().await;
}
if let Some(chainfire_kv_client) = &self.chainfire_kv_client {
let route = self
.resolve_route_via_chainfire(key, chainfire_kv_client.clone(), force_refresh)
.await?;
self.region_cache
.update(route.region.clone(), route.leader.clone())
.await;
return Ok(route.candidate_addrs);
}
let addr = if force_refresh {
self.resolve_addr_uncached(key).await?
} else {
self.resolve_addr(key).await?
};
Ok(vec![addr])
}
fn is_retryable_route_error(status: &tonic::Status) -> bool { fn is_retryable_route_error(status: &tonic::Status) -> bool {
if !matches!( if !matches!(
status.code(), status.code(),
@ -454,9 +559,7 @@ impl RdbClient {
} }
fn retry_delay(attempt: usize) -> Duration { fn retry_delay(attempt: usize) -> Duration {
Duration::from_millis( Duration::from_millis(Self::ROUTE_RETRY_BASE_DELAY_MS.saturating_mul((attempt as u64) + 1))
Self::ROUTE_RETRY_BASE_DELAY_MS.saturating_mul((attempt as u64) + 1),
)
} }
fn is_transport_error(status: &tonic::Status) -> bool { fn is_transport_error(status: &tonic::Status) -> bool {
@ -691,7 +794,11 @@ impl RdbClient {
.into_iter() .into_iter()
.map(|kv| (kv.key, kv.value, kv.version)) .map(|kv| (kv.key, kv.value, kv.version))
.collect(); .collect();
let next = if resp.has_more { Some(resp.next_key) } else { None }; let next = if resp.has_more {
Some(resp.next_key)
} else {
None
};
Ok((entries, next)) Ok((entries, next))
} }
}) })
@ -727,20 +834,25 @@ impl RdbClient {
.await .await
} }
async fn resolve_addr_via_chainfire( async fn resolve_route_via_chainfire(
&self, &self,
key: &[u8], key: &[u8],
kv_client: ChainfireKvClient<Channel>, kv_client: ChainfireKvClient<Channel>,
) -> Result<String, tonic::Status> { force_refresh: bool,
for force_refresh in [false, true] { ) -> Result<ResolvedRoute, tonic::Status> {
if force_refresh {
let snapshot = self let snapshot = self
.chainfire_route_snapshot(kv_client.clone(), force_refresh) .chainfire_route_snapshot(kv_client, true)
.await?; .await?;
if let Ok((region, leader)) = return resolve_chainfire_route_from_snapshot(key, &snapshot);
self.resolve_addr_from_chainfire_snapshot(key, &snapshot) }
{
self.region_cache.update(region, leader.clone()).await; for refresh in [false, true] {
return Ok(leader.addr); let snapshot = self
.chainfire_route_snapshot(kv_client.clone(), refresh)
.await?;
if let Ok(route) = resolve_chainfire_route_from_snapshot(key, &snapshot) {
return Ok(route);
} }
} }
@ -833,7 +945,13 @@ async fn list_chainfire_regions(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{RdbClient, normalize_transport_addr, parse_transport_endpoints}; use super::{
normalize_transport_addr, parse_transport_endpoints,
resolve_chainfire_route_from_snapshot, ChainfireRegionInfo, ChainfireRouteSnapshot,
ChainfireStoreInfo, RdbClient,
};
use std::collections::HashMap;
use std::time::Instant;
#[test] #[test]
fn unknown_transport_errors_are_treated_as_retryable_routes() { fn unknown_transport_errors_are_treated_as_retryable_routes() {
@ -864,4 +982,77 @@ mod tests {
"10.0.0.1:2479".to_string() "10.0.0.1:2479".to_string()
); );
} }
/// The reported leader must be the first candidate, followed by the
/// remaining peers in the order the region lists them.
#[test]
fn chainfire_routes_try_leader_then_other_peers() {
    // Three known stores; the single region covers the whole keyspace
    // (empty start/end keys) and reports store 2 as its leader.
    let mut stores = HashMap::new();
    stores.insert(
        1,
        ChainfireStoreInfo {
            id: 1,
            addr: "10.0.0.1:2479".to_string(),
        },
    );
    stores.insert(
        2,
        ChainfireStoreInfo {
            id: 2,
            addr: "10.0.0.2:2479".to_string(),
        },
    );
    stores.insert(
        3,
        ChainfireStoreInfo {
            id: 3,
            addr: "10.0.0.3:2479".to_string(),
        },
    );
    let regions = vec![ChainfireRegionInfo {
        id: 1,
        start_key: Vec::new(),
        end_key: Vec::new(),
        peers: vec![1, 2, 3],
        leader_id: 2,
    }];
    let snapshot = ChainfireRouteSnapshot {
        stores,
        regions,
        fetched_at: Instant::now(),
    };

    let route = resolve_chainfire_route_from_snapshot(b"tenant/key", &snapshot).unwrap();

    let expected_order = vec![
        "10.0.0.2:2479".to_string(),
        "10.0.0.1:2479".to_string(),
        "10.0.0.3:2479".to_string(),
    ];
    assert_eq!(route.leader.id, 2);
    assert_eq!(route.candidate_addrs, expected_order);
}
/// When the region's reported leader has no entry in the store map, the
/// route must fall back to a peer whose store is known.
#[test]
fn chainfire_routes_fall_back_when_reported_leader_store_is_missing() {
    // Only store 1 is registered, but the region names store 2 as leader.
    let mut stores = HashMap::new();
    stores.insert(
        1,
        ChainfireStoreInfo {
            id: 1,
            addr: "10.0.0.1:2479".to_string(),
        },
    );
    let regions = vec![ChainfireRegionInfo {
        id: 1,
        start_key: Vec::new(),
        end_key: Vec::new(),
        peers: vec![1, 2],
        leader_id: 2,
    }];
    let snapshot = ChainfireRouteSnapshot {
        stores,
        regions,
        fetched_at: Instant::now(),
    };

    let route = resolve_chainfire_route_from_snapshot(b"tenant/key", &snapshot).unwrap();

    let expected_addrs = vec!["10.0.0.1:2479".to_string()];
    assert_eq!(route.leader.id, 1);
    assert_eq!(route.candidate_addrs, expected_addrs);
}
} }

View file

@ -1,3 +1,4 @@
use anyhow::Result;
use clap::Parser; use clap::Parser;
use flaredb_proto::kvrpc::kv_cas_server::KvCasServer; use flaredb_proto::kvrpc::kv_cas_server::KvCasServer;
use flaredb_proto::kvrpc::kv_raw_server::KvRawServer; use flaredb_proto::kvrpc::kv_raw_server::KvRawServer;
@ -17,8 +18,7 @@ use tokio::time::{sleep, Duration};
use tonic::transport::{Certificate, Channel, Identity, Server, ServerTlsConfig}; use tonic::transport::{Certificate, Channel, Identity, Server, ServerTlsConfig};
use tonic_health::server::health_reporter; use tonic_health::server::health_reporter;
use tracing::{info, warn}; // Import warn use tracing::{info, warn}; // Import warn
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter; // Import anyhow
use anyhow::Result; // Import anyhow
mod heartbeat; mod heartbeat;
mod merkle; mod merkle;
@ -278,7 +278,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
peer_addrs.clone(), peer_addrs.clone(),
)); ));
let service = service::KvServiceImpl::new(engine.clone(), namespace_manager.clone(), store.clone()); let service =
service::KvServiceImpl::new(engine.clone(), namespace_manager.clone(), store.clone());
let raft_service = raft_service::RaftServiceImpl::new(store.clone(), server_config.store_id); let raft_service = raft_service::RaftServiceImpl::new(store.clone(), server_config.store_id);
let pd_endpoints = server_config.resolved_pd_endpoints(); let pd_endpoints = server_config.resolved_pd_endpoints();
@ -389,6 +390,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store_id = server_config.store_id; let store_id = server_config.store_id;
let server_addr_string = server_config.addr.to_string(); let server_addr_string = server_config.addr.to_string();
tokio::spawn(async move { tokio::spawn(async move {
let mut last_reported_leaders: HashMap<u64, u64> = HashMap::new();
let client = Arc::new(Mutex::new( let client = Arc::new(Mutex::new(
ChainfirePdClient::connect_any(&pd_endpoints_for_task) ChainfirePdClient::connect_any(&pd_endpoints_for_task)
.await .await
@ -399,8 +401,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut guard = client.lock().await; let mut guard = client.lock().await;
if let Some(ref mut c) = *guard { if let Some(ref mut c) = *guard {
// Send heartbeat // Send heartbeat
let heartbeat_ok = let heartbeat_ok = match c.heartbeat(store_id, server_addr_string.clone()).await
match c.heartbeat(store_id, server_addr_string.clone()).await { {
Ok(_) => true, Ok(_) => true,
Err(e) => { Err(e) => {
warn!("Heartbeat failed: {}", e); warn!("Heartbeat failed: {}", e);
@ -417,15 +419,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Report observed leader status so routing metadata converges // Report observed leader status so routing metadata converges
// even when followers are the first nodes to notice a leadership change. // even when followers are the first nodes to notice a leadership change.
let region_ids = store_clone.list_region_ids().await; let region_ids = store_clone.list_region_ids().await;
let mut observed_regions = HashMap::new();
for region_id in region_ids { for region_id in region_ids {
if let Some(node) = store_clone.get_raft_node(region_id).await { if let Some(node) = store_clone.get_raft_node(region_id).await {
if let Some(observed_leader) = node.leader_id().await { if let Some(observed_leader) = node.leader_id().await {
if let Err(e) = c.report_leader(region_id, observed_leader).await { observed_regions.insert(region_id, observed_leader);
warn!("Report leader failed: {}", e); let previous = last_reported_leaders.get(&region_id).copied();
if previous != Some(observed_leader) {
info!(
region_id,
previous_leader = ?previous,
observed_leader,
"Reporting FlareDB region leader to PD"
);
}
match c.report_leader(region_id, observed_leader).await {
Ok(_) => {
last_reported_leaders.insert(region_id, observed_leader);
}
Err(e) => {
warn!(
region_id,
observed_leader,
error = %e,
"Report leader failed"
);
} }
} }
} }
} }
}
last_reported_leaders
.retain(|region_id, _| observed_regions.contains_key(region_id));
// Refresh regions from PD (from cache, updated via watch) // Refresh regions from PD (from cache, updated via watch)
let regions = c.list_regions().await; let regions = c.list_regions().await;
@ -577,12 +602,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tls = if tls_config.require_client_cert { let tls = if tls_config.require_client_cert {
info!("mTLS enabled, requiring client certificates"); info!("mTLS enabled, requiring client certificates");
let ca_cert = tokio::fs::read( let ca_cert = tokio::fs::read(tls_config.ca_file.as_ref().ok_or_else(|| {
tls_config anyhow::anyhow!("ca_file required when require_client_cert=true")
.ca_file })?)
.as_ref()
.ok_or_else(|| anyhow::anyhow!("ca_file required when require_client_cert=true"))?,
)
.await .await
.map_err(|e| anyhow::anyhow!("Failed to read CA file: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to read CA file: {}", e))?;
let ca = Certificate::from_pem(ca_cert); let ca = Certificate::from_pem(ca_cert);
@ -650,6 +672,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
fn init_logging(level: &str) { fn init_logging(level: &str) {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level))) .with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)),
)
.init(); .init();
} }

View file

@ -2633,6 +2633,91 @@ try_get_volume_json() {
127.0.0.1:${vm_port} plasmavmc.v1.VolumeService/GetVolume 127.0.0.1:${vm_port} plasmavmc.v1.VolumeService/GetVolume
} }
# Start a background WatchVm stream for one VM on a remote node.
#
# Arguments:
#   $1 node         node to run grpcurl on (passed to ssh_node_script)
#   $2 proto_root   directory on the node that contains plasmavmc/plasmavmc.proto
#   $3 token        bearer token sent in the authorization header
#   $4 org_id       organisation owning the VM
#   $5 project_id   project owning the VM
#   $6 vm_id        VM to watch
#   $7 output_path  file on the node that receives the streamed events
#
# Files written on the node:
#   <output_path>         stdout of grpcurl (the event stream)
#   <output_path>.stderr  stderr of grpcurl
#   <output_path>.pid     PID of the backgrounded command
#
# The watch is wrapped in `timeout 600`, so it is bounded even if nothing
# ever stops it.
start_plasmavmc_vm_watch() {
local node="$1"
local proto_root="$2"
local token="$3"
local org_id="$4"
local project_id="$5"
local vm_id="$6"
local output_path="$7"
# The quoted heredoc delimiter keeps the script body unexpanded locally; the
# values are handed over as positional arguments instead.
ssh_node_script "${node}" "${proto_root}" "${token}" "${org_id}" "${project_id}" "${vm_id}" "${output_path}" <<'EOS'
set -euo pipefail
proto_root="$1"
token="$2"
org_id="$3"
project_id="$4"
vm_id="$5"
output_path="$6"
rm -f "${output_path}" "${output_path}.pid" "${output_path}.stderr"
nohup timeout 600 grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${proto_root}/plasmavmc" \
-proto "${proto_root}/plasmavmc/plasmavmc.proto" \
-d "$(jq -cn --arg org "${org_id}" --arg project "${project_id}" --arg vm "${vm_id}" '{orgId:$org, projectId:$project, vmId:$vm}')" \
127.0.0.1:50082 plasmavmc.v1.VmService/WatchVm \
>"${output_path}" 2>"${output_path}.stderr" &
echo $! >"${output_path}.pid"
EOS
}
# Block until the background WatchVm stream recorded under <output_path>.pid
# on the given node has exited.
#
# Arguments:
#   $1 node         node the watch was started on
#   $2 output_path  same path that was given when the watch was started
#   $3 timeout      seconds to wait before giving up (default: 60)
#
# Returns 0 once the recorded PID is no longer alive (or no PID file exists).
# On timeout, dumps the captured stderr and stdout of the watch to our stderr
# and aborts via `die`.
wait_for_plasmavmc_vm_watch_completion() {
  local node="$1"
  local output_path="$2"
  local timeout="${3:-60}"
  local deadline=$((SECONDS + timeout))

  # The diagnostic commands below are spliced into a remote shell command
  # string, so the paths must be shell-quoted to survive word splitting and
  # metacharacters on the remote side.
  local quoted_output quoted_stderr
  quoted_output="$(printf '%q' "${output_path}")"
  quoted_stderr="$(printf '%q' "${output_path}.stderr")"

  while true; do
    # Remote probe: exit 0 = watcher finished, exit 1 = still running.
    if ssh_node_script "${node}" "${output_path}" <<'EOS'
set -euo pipefail
output_path="$1"
if [[ ! -f "${output_path}.pid" ]]; then
  exit 0
fi
pid="$(cat "${output_path}.pid")"
if kill -0 "${pid}" >/dev/null 2>&1; then
  exit 1
fi
EOS
    then
      return 0
    fi
    if (( SECONDS >= deadline )); then
      # Best-effort diagnostics; never let these mask the timeout itself.
      ssh_node "${node}" "test -f ${quoted_stderr} && cat ${quoted_stderr} || true" >&2 || true
      ssh_node "${node}" "test -f ${quoted_output} && cat ${quoted_output} || true" >&2 || true
      die "timed out waiting for PlasmaVMC watch stream to exit"
    fi
    sleep 1
  done
}
# Assert that the captured WatchVm stream for a VM contains the expected
# lifecycle events.
#
# Arguments:
#   $1 node         node holding the capture files
#   $2 output_path  file containing the streamed events (JSON objects)
#   $3 vm_id        VM the events must refer to
#
# Required events (all matched on vmId):
#   - STATE_CHANGED with vm.state == VM_STATE_RUNNING
#   - STATE_CHANGED with vm.state == VM_STATE_STOPPED
#   - DELETED
#
# On failure the captured output and stderr are printed to stderr and the
# remote script exits 1.
assert_plasmavmc_vm_watch_events() {
  local node="$1"
  local output_path="$2"
  local vm_id="$3"
  ssh_node_script "${node}" "${output_path}" "${vm_id}" <<'EOS'
set -euo pipefail
output_path="$1"
vm_id="$2"
[[ -s "${output_path}" ]] || {
  echo "PlasmaVMC watch output is empty" >&2
  test -f "${output_path}.stderr" && cat "${output_path}.stderr" >&2 || true
  exit 1
}
# -e is required: without it jq exits 0 even when the filter evaluates to
# false, which would turn this check into a no-op.
jq -e -s --arg vm "${vm_id}" '
  any(.vmId == $vm and .eventType == "VM_EVENT_TYPE_STATE_CHANGED" and .vm.state == "VM_STATE_RUNNING") and
  any(.vmId == $vm and .eventType == "VM_EVENT_TYPE_STATE_CHANGED" and .vm.state == "VM_STATE_STOPPED") and
  any(.vmId == $vm and .eventType == "VM_EVENT_TYPE_DELETED")
' "${output_path}" >/dev/null || {
  cat "${output_path}" >&2
  test -f "${output_path}.stderr" && cat "${output_path}.stderr" >&2 || true
  exit 1
}
EOS
}
wait_requested() { wait_requested() {
local nodes local nodes
mapfile -t nodes < <(all_or_requested_nodes "$@") mapfile -t nodes < <(all_or_requested_nodes "$@")
@ -3707,6 +3792,7 @@ validate_vm_storage_flow() {
node04_coronafs_tunnel="$(start_ssh_tunnel node04 25088 "${CORONAFS_API_PORT}")" node04_coronafs_tunnel="$(start_ssh_tunnel node04 25088 "${CORONAFS_API_PORT}")"
node05_coronafs_tunnel="$(start_ssh_tunnel node05 35088 "${CORONAFS_API_PORT}")" node05_coronafs_tunnel="$(start_ssh_tunnel node05 35088 "${CORONAFS_API_PORT}")"
local image_source_path="" local image_source_path=""
local vm_watch_output=""
local node01_proto_root="/var/lib/plasmavmc/test-protos" local node01_proto_root="/var/lib/plasmavmc/test-protos"
local vpc_id="" subnet_id="" port_id="" port_ip="" port_mac="" local vpc_id="" subnet_id="" port_id="" port_ip="" port_mac=""
cleanup_vm_storage_flow() { cleanup_vm_storage_flow() {
@ -3737,6 +3823,9 @@ validate_vm_storage_flow() {
if [[ -n "${image_source_path}" && "${image_source_path}" != /nix/store/* ]]; then if [[ -n "${image_source_path}" && "${image_source_path}" != /nix/store/* ]]; then
ssh_node node01 "rm -f ${image_source_path}" >/dev/null 2>&1 || true ssh_node node01 "rm -f ${image_source_path}" >/dev/null 2>&1 || true
fi fi
if [[ -n "${vm_watch_output}" ]]; then
ssh_node node01 "rm -f ${vm_watch_output} ${vm_watch_output}.pid ${vm_watch_output}.stderr" >/dev/null 2>&1 || true
fi
stop_ssh_tunnel node05 "${node05_coronafs_tunnel}" stop_ssh_tunnel node05 "${node05_coronafs_tunnel}"
stop_ssh_tunnel node04 "${node04_coronafs_tunnel}" stop_ssh_tunnel node04 "${node04_coronafs_tunnel}"
stop_ssh_tunnel node01 "${coronafs_tunnel}" stop_ssh_tunnel node01 "${coronafs_tunnel}"
@ -3993,6 +4082,9 @@ EOS
)" )"
vm_id="$(printf '%s' "${create_response}" | jq -r '.id')" vm_id="$(printf '%s' "${create_response}" | jq -r '.id')"
[[ -n "${vm_id}" && "${vm_id}" != "null" ]] || die "failed to create VM through PlasmaVMC" [[ -n "${vm_id}" && "${vm_id}" != "null" ]] || die "failed to create VM through PlasmaVMC"
vm_watch_output="/tmp/plasmavmc-watch-${vm_id}.json"
start_plasmavmc_vm_watch node01 "${node01_proto_root}" "${token}" "${org_id}" "${project_id}" "${vm_id}" "${vm_watch_output}"
sleep 2
local get_vm_json local get_vm_json
get_vm_json="$(jq -cn --arg org "${org_id}" --arg project "${project_id}" --arg vm "${vm_id}" '{orgId:$org, projectId:$project, vmId:$vm}')" get_vm_json="$(jq -cn --arg org "${org_id}" --arg project "${project_id}" --arg vm "${vm_id}" '{orgId:$org, projectId:$project, vmId:$vm}')"
@ -4420,6 +4512,8 @@ EOS
fi fi
sleep 2 sleep 2
done done
wait_for_plasmavmc_vm_watch_completion node01 "${vm_watch_output}" 60
assert_plasmavmc_vm_watch_events node01 "${vm_watch_output}" "${vm_id}"
wait_for_prismnet_port_detachment "${token}" "${org_id}" "${project_id}" "${subnet_id}" "${port_id}" >/dev/null wait_for_prismnet_port_detachment "${token}" "${org_id}" "${project_id}" "${subnet_id}" "${port_id}" >/dev/null
ssh_node "${node_id}" "bash -lc '[[ ! -d $(printf '%q' "$(vm_runtime_dir_path "${vm_id}")") ]]'" ssh_node "${node_id}" "bash -lc '[[ ! -d $(printf '%q' "$(vm_runtime_dir_path "${vm_id}")") ]]'"

View file

@ -1,32 +1,32 @@
//! PlasmaVMC control plane server binary //! PlasmaVMC control plane server binary
use clap::Parser; use clap::Parser;
use iam_service_auth::AuthService;
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
use plasmavmc_api::proto::image_service_server::ImageServiceServer; use plasmavmc_api::proto::image_service_server::ImageServiceServer;
use plasmavmc_api::proto::node_service_server::NodeServiceServer;
use plasmavmc_api::proto::node_service_client::NodeServiceClient; use plasmavmc_api::proto::node_service_client::NodeServiceClient;
use plasmavmc_api::proto::volume_service_server::VolumeServiceServer; use plasmavmc_api::proto::node_service_server::NodeServiceServer;
use plasmavmc_api::proto::vm_service_server::VmServiceServer; use plasmavmc_api::proto::vm_service_server::VmServiceServer;
use plasmavmc_api::proto::volume_service_server::VolumeServiceServer;
use plasmavmc_api::proto::{ use plasmavmc_api::proto::{
HeartbeatNodeRequest, HypervisorType as ProtoHypervisorType, NodeCapacity, HeartbeatNodeRequest, HypervisorType as ProtoHypervisorType, NodeCapacity,
NodeState as ProtoNodeState, VolumeDriverKind as ProtoVolumeDriverKind, NodeState as ProtoNodeState, VolumeDriverKind as ProtoVolumeDriverKind,
}; };
use plasmavmc_firecracker::FireCrackerBackend;
use plasmavmc_hypervisor::HypervisorRegistry; use plasmavmc_hypervisor::HypervisorRegistry;
use plasmavmc_kvm::KvmBackend; use plasmavmc_kvm::KvmBackend;
use plasmavmc_firecracker::FireCrackerBackend;
use iam_service_auth::AuthService;
use plasmavmc_server::config::ServerConfig; use plasmavmc_server::config::ServerConfig;
use plasmavmc_server::VmServiceImpl;
use plasmavmc_server::watcher::{StateSynchronizer, StateWatcher, WatcherConfig}; use plasmavmc_server::watcher::{StateSynchronizer, StateWatcher, WatcherConfig};
use plasmavmc_server::VmServiceImpl;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tonic::transport::{Certificate, Endpoint, Identity, Server, ServerTlsConfig};
use tonic_health::server::health_reporter;
use tonic::{Request, Status};
use tracing_subscriber::EnvFilter;
use std::time::Duration; use std::time::Duration;
use std::{collections::HashMap, fs}; use std::{collections::HashMap, fs};
use tonic::transport::{Certificate, Endpoint, Identity, Server, ServerTlsConfig};
use tonic::{Request, Status};
use tonic_health::server::health_reporter;
use tracing_subscriber::EnvFilter;
/// PlasmaVMC control plane server /// PlasmaVMC control plane server
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -175,7 +175,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let contents = tokio::fs::read_to_string(&args.config).await?; let contents = tokio::fs::read_to_string(&args.config).await?;
toml::from_str(&contents)? toml::from_str(&contents)?
} else { } else {
tracing::info!("Config file not found: {}, using defaults", args.config.display()); tracing::info!(
"Config file not found: {}, using defaults",
args.config.display()
);
ServerConfig::default() ServerConfig::default()
}; };
@ -246,13 +249,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::debug!("FireCracker backend not available (missing kernel/rootfs paths)"); tracing::debug!("FireCracker backend not available (missing kernel/rootfs paths)");
} }
tracing::info!( tracing::info!("Registered hypervisors: {:?}", registry.available());
"Registered hypervisors: {:?}",
registry.available()
);
// Initialize IAM authentication service // Initialize IAM authentication service
tracing::info!("Connecting to IAM server at {}", config.auth.iam_server_addr); tracing::info!(
"Connecting to IAM server at {}",
config.auth.iam_server_addr
);
let auth_service = AuthService::new(&config.auth.iam_server_addr) let auth_service = AuthService::new(&config.auth.iam_server_addr)
.await .await
.map_err(|e| format!("Failed to connect to IAM server: {}", e))?; .map_err(|e| format!("Failed to connect to IAM server: {}", e))?;
@ -278,7 +281,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create services // Create services
let vm_service = Arc::new( let vm_service = Arc::new(
VmServiceImpl::new(registry, auth_service.clone(), config.auth.iam_server_addr.clone()) VmServiceImpl::new(
registry,
auth_service.clone(),
config.auth.iam_server_addr.clone(),
)
.await?, .await?,
); );
@ -288,7 +295,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap_or(false) .unwrap_or(false)
{ {
let config = WatcherConfig::default(); let config = WatcherConfig::default();
let (watcher, rx) = StateWatcher::new(config); let (watcher, rx) = StateWatcher::new(vm_service.store(), config);
let synchronizer = StateSynchronizer::new(vm_service.clone()); let synchronizer = StateSynchronizer::new(vm_service.clone());
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = watcher.start().await { if let Err(e) = watcher.start().await {
@ -307,7 +314,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.and_then(|v| v.parse::<u64>().ok()) .and_then(|v| v.parse::<u64>().ok())
{ {
if secs > 0 { if secs > 0 {
vm_service.clone().start_health_monitor(Duration::from_secs(secs)); vm_service
.clone()
.start_health_monitor(Duration::from_secs(secs));
} }
} }
@ -321,9 +330,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.ok() .ok()
.and_then(|v| v.parse::<u64>().ok()) .and_then(|v| v.parse::<u64>().ok())
.unwrap_or(60); .unwrap_or(60);
vm_service vm_service.clone().start_node_health_monitor(
.clone()
.start_node_health_monitor(
Duration::from_secs(interval_secs), Duration::from_secs(interval_secs),
Duration::from_secs(timeout_secs), Duration::from_secs(timeout_secs),
); );

View file

@ -68,6 +68,9 @@ pub trait VmStore: Send + Sync {
/// List all VMs for a tenant /// List all VMs for a tenant
async fn list_vms(&self, org_id: &str, project_id: &str) -> StorageResult<Vec<VirtualMachine>>; async fn list_vms(&self, org_id: &str, project_id: &str) -> StorageResult<Vec<VirtualMachine>>;
/// List all VMs across every tenant.
async fn list_all_vms(&self) -> StorageResult<Vec<VirtualMachine>>;
/// Save a VM handle /// Save a VM handle
async fn save_handle( async fn save_handle(
&self, &self,
@ -431,6 +434,17 @@ impl VmStore for FlareDBStore {
Ok(vms) Ok(vms)
} }
/// List every VM stored under the `/plasmavmc/vms/` prefix, regardless of
/// tenant.
///
/// Values that do not deserialize as a `VirtualMachine` are skipped rather
/// than reported as an error.
///
/// # Errors
/// Propagates the error from the underlying prefix scan.
async fn list_all_vms(&self) -> StorageResult<Vec<VirtualMachine>> {
    let raw_values = self.cas_scan_values("/plasmavmc/vms/").await?;
    let decoded = raw_values
        .into_iter()
        .filter_map(|raw| serde_json::from_slice::<VirtualMachine>(&raw).ok())
        .collect();
    Ok(decoded)
}
async fn save_handle( async fn save_handle(
&self, &self,
org_id: &str, org_id: &str,
@ -580,7 +594,9 @@ impl VmStore for FlareDBStore {
client client
.cas(key.as_bytes().to_vec(), value, expected_version) .cas(key.as_bytes().to_vec(), value, expected_version)
.await .await
.map_err(|e| StorageError::FlareDB(format!("FlareDB CAS volume save failed: {}", e)))? .map_err(|e| {
StorageError::FlareDB(format!("FlareDB CAS volume save failed: {}", e))
})?
}; };
Ok(success) Ok(success)
} }
@ -714,6 +730,10 @@ impl VmStore for FileStore {
.collect()) .collect())
} }
/// List every VM in the file-backed state, regardless of tenant.
///
/// If the state cannot be loaded, the default (empty) state is used, so
/// this never returns an error.
async fn list_all_vms(&self) -> StorageResult<Vec<VirtualMachine>> {
    let state = self.load_state().unwrap_or_default();
    Ok(state.vms)
}
async fn save_handle( async fn save_handle(
&self, &self,
_org_id: &str, _org_id: &str,

View file

@ -30,10 +30,11 @@ use plasmavmc_api::proto::{
NodeState as ProtoNodeState, OsType as ProtoOsType, PrepareVmMigrationRequest, RebootVmRequest, NodeState as ProtoNodeState, OsType as ProtoOsType, PrepareVmMigrationRequest, RebootVmRequest,
RecoverVmRequest, RegisterExternalVolumeRequest, ResetVmRequest, ResizeVolumeRequest, RecoverVmRequest, RegisterExternalVolumeRequest, ResetVmRequest, ResizeVolumeRequest,
StartVmRequest, StopVmRequest, UncordonNodeRequest, UpdateImageRequest, UpdateVmRequest, StartVmRequest, StopVmRequest, UncordonNodeRequest, UpdateImageRequest, UpdateVmRequest,
VirtualMachine, Visibility as ProtoVisibility, VmEvent, VmSpec as ProtoVmSpec, VirtualMachine, Visibility as ProtoVisibility, VmEvent, VmEventType as ProtoVmEventType,
VmState as ProtoVmState, VmStatus as ProtoVmStatus, Volume as ProtoVolume, VmSpec as ProtoVmSpec, VmState as ProtoVmState, VmStatus as ProtoVmStatus,
VolumeBacking as ProtoVolumeBacking, VolumeDriverKind as ProtoVolumeDriverKind, Volume as ProtoVolume, VolumeBacking as ProtoVolumeBacking,
VolumeFormat as ProtoVolumeFormat, VolumeStatus as ProtoVolumeStatus, WatchVmRequest, VolumeDriverKind as ProtoVolumeDriverKind, VolumeFormat as ProtoVolumeFormat,
VolumeStatus as ProtoVolumeStatus, WatchVmRequest,
}; };
use plasmavmc_hypervisor::HypervisorRegistry; use plasmavmc_hypervisor::HypervisorRegistry;
use plasmavmc_types::{ use plasmavmc_types::{
@ -247,6 +248,10 @@ impl VmServiceImpl {
.unwrap_or(true) .unwrap_or(true)
} }
/// Return a shared handle to the VM store backing this service.
///
/// Only the reference count is bumped; the store itself is not copied.
pub fn store(&self) -> Arc<dyn VmStore> {
    self.store.clone()
}
fn to_status_code(err: plasmavmc_types::Error) -> Status { fn to_status_code(err: plasmavmc_types::Error) -> Status {
Status::internal(err.to_string()) Status::internal(err.to_string())
} }
@ -287,6 +292,61 @@ impl VmServiceImpl {
.as_secs() .as_secs()
} }
/// Interval between store polls for a `WatchVm` stream.
///
/// Read from `PLASMAVMC_VM_WATCH_POLL_INTERVAL_MS`. A missing or unparsable
/// value falls back to 500 ms, and the result is never lower than 100 ms.
fn watch_poll_interval() -> Duration {
    const DEFAULT_MS: u64 = 500;
    const FLOOR_MS: u64 = 100;

    let configured_ms = match std::env::var("PLASMAVMC_VM_WATCH_POLL_INTERVAL_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    Duration::from_millis(std::cmp::max(configured_ms, FLOOR_MS))
}
fn vm_values_differ(
previous: &plasmavmc_types::VirtualMachine,
current: &plasmavmc_types::VirtualMachine,
) -> Result<bool, Status> {
let previous = serde_json::to_value(previous).map_err(|error| {
Status::internal(format!("failed to serialize VM snapshot: {error}"))
})?;
let current = serde_json::to_value(current).map_err(|error| {
Status::internal(format!("failed to serialize VM snapshot: {error}"))
})?;
Ok(previous != current)
}
/// Turn two consecutive observations of a VM into a watch event.
///
/// Classification:
/// - absent -> present: `Created`
/// - present -> absent: `Deleted`
/// - `state` or `status.actual_state` changed: `StateChanged`
/// - any other serialized difference: `Updated`
/// - no difference, or absent in both observations: no event (`Ok(None)`)
///
/// The event carries the current snapshot when there is one, otherwise the
/// previous snapshot (the deletion case).
///
/// # Errors
/// Returns `Status::internal` if the snapshots cannot be compared.
fn build_vm_event(
    previous: Option<&plasmavmc_types::VirtualMachine>,
    current: Option<&plasmavmc_types::VirtualMachine>,
) -> Result<Option<VmEvent>, Status> {
    let kind = match (previous, current) {
        (None, None) => return Ok(None),
        (None, Some(_)) => ProtoVmEventType::Created,
        (Some(_), None) => ProtoVmEventType::Deleted,
        (Some(before), Some(after)) => {
            // A state transition takes precedence over any other change.
            let state_moved = before.state != after.state
                || before.status.actual_state != after.status.actual_state;
            if state_moved {
                ProtoVmEventType::StateChanged
            } else if Self::vm_values_differ(before, after)? {
                ProtoVmEventType::Updated
            } else {
                return Ok(None);
            }
        }
    };

    // Prefer the fresh snapshot; fall back to the last seen one on deletion.
    let subject = match current.or(previous) {
        Some(vm) => vm,
        None => {
            return Err(Status::internal(
                "watch event builder received an empty VM snapshot",
            ))
        }
    };

    let event = VmEvent {
        vm_id: subject.id.to_string(),
        event_type: kind as i32,
        vm: Some(Self::to_proto_vm(subject, subject.status.clone())),
        timestamp: Self::now_epoch() as i64,
    };
    Ok(Some(event))
}
fn endpoint_host(endpoint: &str) -> Result<String, Status> { fn endpoint_host(endpoint: &str) -> Result<String, Status> {
let authority = endpoint let authority = endpoint
.split("://") .split("://")
@ -1925,6 +1985,7 @@ impl VmServiceImpl {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use plasmavmc_types::VmSpec;
#[test] #[test]
fn unspecified_disk_cache_defaults_to_writeback() { fn unspecified_disk_cache_defaults_to_writeback() {
@ -1945,6 +2006,47 @@ mod tests {
prismnet_api::proto::DeviceType::None as i32 prismnet_api::proto::DeviceType::None as i32
); );
} }
/// Walk one VM through create -> metadata update -> state change -> delete
/// and check the event type produced at each step.
#[test]
fn build_vm_event_classifies_lifecycle_changes() {
    let base_vm = plasmavmc_types::VirtualMachine::new(
        "watch-vm",
        "watch-org",
        "watch-project",
        VmSpec::default(),
    );

    // Absent -> present.
    let created_event = VmServiceImpl::build_vm_event(None, Some(&base_vm))
        .unwrap()
        .unwrap();
    assert_eq!(created_event.event_type, ProtoVmEventType::Created as i32);

    // Same state, different metadata and timestamp.
    let mut annotated_vm = base_vm.clone();
    annotated_vm.updated_at += 1;
    annotated_vm
        .metadata
        .insert(String::from("note"), String::from("changed"));
    let updated_event = VmServiceImpl::build_vm_event(Some(&base_vm), Some(&annotated_vm))
        .unwrap()
        .unwrap();
    assert_eq!(updated_event.event_type, ProtoVmEventType::Updated as i32);

    // Desired and actual state both move to Running.
    let mut started_vm = annotated_vm.clone();
    started_vm.state = VmState::Running;
    started_vm.status.actual_state = VmState::Running;
    let state_event = VmServiceImpl::build_vm_event(Some(&annotated_vm), Some(&started_vm))
        .unwrap()
        .unwrap();
    assert_eq!(
        state_event.event_type,
        ProtoVmEventType::StateChanged as i32
    );

    // Present -> absent.
    let deleted_event = VmServiceImpl::build_vm_event(Some(&started_vm), None)
        .unwrap()
        .unwrap();
    assert_eq!(deleted_event.event_type, ProtoVmEventType::Deleted as i32);
}
} }
impl StateSink for VmServiceImpl { impl StateSink for VmServiceImpl {
@ -3742,7 +3844,7 @@ impl VmService for VmServiceImpl {
vm_id = %req.vm_id, vm_id = %req.vm_id,
org_id = %req.org_id, org_id = %req.org_id,
project_id = %req.project_id, project_id = %req.project_id,
"WatchVm request (stub implementation)" "WatchVm request"
); );
self.auth self.auth
.authorize( .authorize(
@ -3752,8 +3854,68 @@ impl VmService for VmServiceImpl {
) )
.await?; .await?;
// TODO: Implement VM watch via ChainFire watch let initial_vm = self
Err(Status::unimplemented("VM watch not yet implemented")) .ensure_vm_loaded(&req.org_id, &req.project_id, &req.vm_id)
.await
.ok_or_else(|| Status::not_found("VM not found"))?;
let poll_interval = Self::watch_poll_interval();
let store = Arc::clone(&self.store);
let org_id = req.org_id.clone();
let project_id = req.project_id.clone();
let vm_id = req.vm_id.clone();
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
let mut last_seen = Some(initial_vm);
let mut ticker = tokio::time::interval(poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let next_vm = match tokio::time::timeout(
STORE_OP_TIMEOUT,
store.load_vm(&org_id, &project_id, &vm_id),
)
.await
{
Ok(Ok(vm)) => vm,
Ok(Err(error)) => {
let _ = tx
.send(Err(Status::unavailable(format!(
"VM watch poll failed: {error}"
))))
.await;
break;
}
Err(_) => {
let _ = tx
.send(Err(Status::deadline_exceeded("VM watch poll timed out")))
.await;
break;
}
};
let event = match Self::build_vm_event(last_seen.as_ref(), next_vm.as_ref()) {
Ok(event) => event,
Err(status) => {
let _ = tx.send(Err(status)).await;
break;
}
};
if let Some(event) = event {
if tx.send(Ok(event)).await.is_err() {
break;
}
}
match next_vm {
Some(vm) => last_seen = Some(vm),
None => break,
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
} }
} }

View file

@ -1,14 +1,23 @@
//! ChainFire state watcher for PlasmaVMC //! Storage-backed state watcher for PlasmaVMC.
//! //!
//! Provides state synchronization across multiple PlasmaVMC instances //! PlasmaVMC persists VM intent and node heartbeats in the shared metadata
//! by watching ChainFire for VM and handle changes made by other nodes. //! store. This watcher polls that store and mirrors external changes into each
//! process-local cache so multiple control-plane or agent instances converge on
//! the same durable view.
use chainfire_client::{Client as ChainFireClient, EventType, WatchEvent}; use crate::storage::VmStore;
use plasmavmc_types::{Node, VirtualMachine, VmHandle}; use plasmavmc_types::{Node, VirtualMachine, VmHandle};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
type VmSnapshot = HashMap<VmKey, VirtualMachine>;
type NodeSnapshot = HashMap<String, Node>;
/// Event types from the state watcher /// Event types from the state watcher
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum StateEvent { pub enum StateEvent {
@ -39,255 +48,246 @@ pub enum StateEvent {
vm_id: String, vm_id: String,
}, },
/// A node was updated /// A node was updated
NodeUpdated { NodeUpdated { node_id: String, node: Node },
node_id: String,
node: Node,
},
/// A node was deleted /// A node was deleted
NodeDeleted { NodeDeleted { node_id: String },
node_id: String,
},
} }
/// Configuration for the state watcher /// Configuration for the state watcher
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WatcherConfig { pub struct WatcherConfig {
/// ChainFire endpoint /// Poll interval for metadata refresh.
pub chainfire_endpoint: String, pub poll_interval: Duration,
/// Channel buffer size for events /// Channel buffer size for events
pub buffer_size: usize, pub buffer_size: usize,
} }
impl Default for WatcherConfig { impl Default for WatcherConfig {
fn default() -> Self { fn default() -> Self {
let poll_interval_ms = std::env::var("PLASMAVMC_STATE_WATCHER_POLL_INTERVAL_MS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.unwrap_or(1000)
.max(100);
Self { Self {
chainfire_endpoint: std::env::var("PLASMAVMC_CHAINFIRE_ENDPOINT") poll_interval: Duration::from_millis(poll_interval_ms),
.unwrap_or_else(|_| "http://127.0.0.1:2379".to_string()),
buffer_size: 256, buffer_size: 256,
} }
} }
} }
/// State watcher that monitors ChainFire for external changes /// State watcher that monitors the shared VM store for external changes.
pub struct StateWatcher { pub struct StateWatcher {
store: Arc<dyn VmStore>,
config: WatcherConfig, config: WatcherConfig,
event_tx: mpsc::Sender<StateEvent>, event_tx: mpsc::Sender<StateEvent>,
} }
impl StateWatcher { impl StateWatcher {
/// Create a new state watcher and return the event receiver /// Create a new state watcher and return the event receiver.
pub fn new(config: WatcherConfig) -> (Self, mpsc::Receiver<StateEvent>) { pub fn new(
store: Arc<dyn VmStore>,
config: WatcherConfig,
) -> (Self, mpsc::Receiver<StateEvent>) {
let (event_tx, event_rx) = mpsc::channel(config.buffer_size); let (event_tx, event_rx) = mpsc::channel(config.buffer_size);
(Self { config, event_tx }, event_rx) (
Self {
store,
config,
event_tx,
},
event_rx,
)
} }
/// Start watching for state changes /// Start watching for state changes.
/// ///
/// This spawns background tasks that watch: /// VM handles remain local process state because they point at ephemeral
/// - `/plasmavmc/vms/` prefix for VM changes /// runtime artifacts such as QMP sockets and per-node directories. Durable
/// - `/plasmavmc/handles/` prefix for handle changes /// VM intent and node heartbeats are the shared state synchronized here.
/// - `/plasmavmc/nodes/` prefix for node changes
pub async fn start(&self) -> Result<(), WatcherError> { pub async fn start(&self) -> Result<(), WatcherError> {
info!("Starting PlasmaVMC state watcher"); info!(
poll_interval_ms = self.config.poll_interval.as_millis(),
"Starting storage-backed PlasmaVMC state watcher"
);
// Connect to ChainFire let mut vm_snapshot = self.load_vms().await?;
let mut client = ChainFireClient::connect(&self.config.chainfire_endpoint) let mut node_snapshot = self.load_nodes().await?;
.await let mut ticker = tokio::time::interval(self.config.poll_interval);
.map_err(|e| WatcherError::Connection(e.to_string()))?; ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Start watching VMs loop {
let vm_watch = client ticker.tick().await;
.watch_prefix(b"/plasmavmc/vms/")
.await
.map_err(|e| WatcherError::Watch(e.to_string()))?;
let event_tx_vm = self.event_tx.clone(); let next_vm_snapshot = match self.load_vms().await {
tokio::spawn(async move { Ok(snapshot) => snapshot,
Self::watch_loop(vm_watch, event_tx_vm, WatchType::Vm).await; Err(error) => {
}); warn!(error = %error, "Failed to refresh PlasmaVMC VM snapshot");
continue;
// Connect again for second watch (each watch uses its own stream)
let mut client2 = ChainFireClient::connect(&self.config.chainfire_endpoint)
.await
.map_err(|e| WatcherError::Connection(e.to_string()))?;
// Start watching handles
let handle_watch = client2
.watch_prefix(b"/plasmavmc/handles/")
.await
.map_err(|e| WatcherError::Watch(e.to_string()))?;
let event_tx_handle = self.event_tx.clone();
tokio::spawn(async move {
Self::watch_loop(handle_watch, event_tx_handle, WatchType::Handle).await;
});
// Connect again for node watch
let mut client3 = ChainFireClient::connect(&self.config.chainfire_endpoint)
.await
.map_err(|e| WatcherError::Connection(e.to_string()))?;
let node_watch = client3
.watch_prefix(b"/plasmavmc/nodes/")
.await
.map_err(|e| WatcherError::Watch(e.to_string()))?;
let event_tx_node = self.event_tx.clone();
tokio::spawn(async move {
Self::watch_loop(node_watch, event_tx_node, WatchType::Node).await;
});
info!("State watcher started successfully");
Ok(())
} }
/// Watch loop for processing events
async fn watch_loop(
mut watch: chainfire_client::WatchHandle,
event_tx: mpsc::Sender<StateEvent>,
watch_type: WatchType,
) {
debug!(?watch_type, "Starting watch loop");
while let Some(event) = watch.recv().await {
match Self::process_event(&event, &watch_type) {
Ok(Some(state_event)) => {
if event_tx.send(state_event).await.is_err() {
warn!("Event receiver dropped, stopping watch loop");
break;
}
}
Ok(None) => {
// Event was filtered or not relevant
}
Err(e) => {
warn!(?watch_type, error = %e, "Failed to process watch event");
}
}
}
debug!(?watch_type, "Watch loop ended");
}
/// Process a watch event into a state event
fn process_event(
event: &WatchEvent,
watch_type: &WatchType,
) -> Result<Option<StateEvent>, WatcherError> {
let key_str = String::from_utf8_lossy(&event.key);
// Parse the key to extract org_id, project_id, vm_id
let (org_id, project_id, vm_id) = match watch_type {
WatchType::Vm => parse_vm_key(&key_str)?,
WatchType::Handle => parse_handle_key(&key_str)?,
WatchType::Node => (String::new(), String::new(), parse_node_key(&key_str)?),
}; };
match diff_vm_snapshots(&vm_snapshot, &next_vm_snapshot) {
Ok(events) => {
for event in events {
if self.event_tx.send(event).await.is_err() {
info!("State watcher receiver dropped, stopping");
return Ok(());
}
}
vm_snapshot = next_vm_snapshot;
}
Err(error) => {
warn!(error = %error, "Failed to diff PlasmaVMC VM snapshot");
}
}
match event.event_type { let next_node_snapshot = match self.load_nodes().await {
EventType::Put => { Ok(snapshot) => snapshot,
match watch_type { Err(error) => {
WatchType::Vm => { warn!(error = %error, "Failed to refresh PlasmaVMC node snapshot");
let vm: VirtualMachine = serde_json::from_slice(&event.value) continue;
.map_err(|e| WatcherError::Deserialize(e.to_string()))?;
Ok(Some(StateEvent::VmUpdated {
org_id,
project_id,
vm_id,
vm,
}))
} }
WatchType::Handle => { };
let handle: VmHandle = serde_json::from_slice(&event.value) match diff_node_snapshots(&node_snapshot, &next_node_snapshot) {
.map_err(|e| WatcherError::Deserialize(e.to_string()))?; Ok(events) => {
Ok(Some(StateEvent::HandleUpdated { for event in events {
org_id, if self.event_tx.send(event).await.is_err() {
project_id, info!("State watcher receiver dropped, stopping");
vm_id, return Ok(());
handle,
}))
}
WatchType::Node => {
let node: Node = serde_json::from_slice(&event.value)
.map_err(|e| WatcherError::Deserialize(e.to_string()))?;
Ok(Some(StateEvent::NodeUpdated {
node_id: vm_id,
node,
}))
} }
} }
node_snapshot = next_node_snapshot;
} }
EventType::Delete => { Err(error) => {
match watch_type { warn!(error = %error, "Failed to diff PlasmaVMC node snapshot");
WatchType::Vm => Ok(Some(StateEvent::VmDeleted {
org_id,
project_id,
vm_id,
})),
WatchType::Handle => Ok(Some(StateEvent::HandleDeleted {
org_id,
project_id,
vm_id,
})),
WatchType::Node => Ok(Some(StateEvent::NodeDeleted { node_id: vm_id })),
}
} }
} }
} }
} }
#[derive(Debug, Clone, Copy)] async fn load_vms(&self) -> Result<VmSnapshot, WatcherError> {
enum WatchType { let vms = self
Vm, .store
Handle, .list_all_vms()
Node, .await
.map_err(|error| WatcherError::Storage(error.to_string()))?;
let mut snapshot = HashMap::with_capacity(vms.len());
for vm in vms {
snapshot.insert(VmKey::from_vm(&vm), vm);
}
Ok(snapshot)
} }
/// Parse VM key: /plasmavmc/vms/{org_id}/{project_id}/{vm_id} async fn load_nodes(&self) -> Result<NodeSnapshot, WatcherError> {
fn parse_vm_key(key: &str) -> Result<(String, String, String), WatcherError> { let nodes = self
let parts: Vec<&str> = key.trim_start_matches('/').split('/').collect(); .store
if parts.len() < 5 || parts[0] != "plasmavmc" || parts[1] != "vms" { .list_nodes()
return Err(WatcherError::InvalidKey(key.to_string())); .await
.map_err(|error| WatcherError::Storage(error.to_string()))?;
let mut snapshot = HashMap::with_capacity(nodes.len());
for node in nodes {
snapshot.insert(node.id.to_string(), node);
}
Ok(snapshot)
} }
Ok((
parts[2].to_string(),
parts[3].to_string(),
parts[4].to_string(),
))
} }
/// Parse handle key: /plasmavmc/handles/{org_id}/{project_id}/{vm_id} #[derive(Debug, Clone, PartialEq, Eq, Hash)]
fn parse_handle_key(key: &str) -> Result<(String, String, String), WatcherError> { struct VmKey {
let parts: Vec<&str> = key.trim_start_matches('/').split('/').collect(); org_id: String,
if parts.len() < 5 || parts[0] != "plasmavmc" || parts[1] != "handles" { project_id: String,
return Err(WatcherError::InvalidKey(key.to_string())); vm_id: String,
}
Ok((
parts[2].to_string(),
parts[3].to_string(),
parts[4].to_string(),
))
} }
/// Parse node key: /plasmavmc/nodes/{node_id} impl VmKey {
fn parse_node_key(key: &str) -> Result<String, WatcherError> { fn from_vm(vm: &VirtualMachine) -> Self {
let parts: Vec<&str> = key.trim_start_matches('/').split('/').collect(); Self {
if parts.len() < 3 || parts[0] != "plasmavmc" || parts[1] != "nodes" { org_id: vm.org_id.clone(),
return Err(WatcherError::InvalidKey(key.to_string())); project_id: vm.project_id.clone(),
vm_id: vm.id.to_string(),
} }
Ok(parts[2].to_string()) }
}
/// Computes the events that describe how `current` differs from `previous`.
///
/// A `StateEvent::VmUpdated` is produced for every VM in `current` that has
/// no entry in `previous` or that `values_differ` reports as changed. A
/// `StateEvent::VmDeleted` is produced for every key of `previous` that is
/// missing from `current`. All updates are listed before any deletion.
///
/// # Errors
///
/// Propagates the error returned by `values_differ` when comparing a VM fails;
/// no events are returned in that case.
fn diff_vm_snapshots(
    previous: &VmSnapshot,
    current: &VmSnapshot,
) -> Result<Vec<StateEvent>, WatcherError> {
    let mut events = Vec::new();

    for (key, vm) in current {
        // A VM without a previous entry is never "unchanged".
        let unchanged = match previous.get(key) {
            Some(old_vm) => !values_differ(old_vm, vm)?,
            None => false,
        };
        if unchanged {
            continue;
        }
        events.push(StateEvent::VmUpdated {
            org_id: key.org_id.clone(),
            project_id: key.project_id.clone(),
            vm_id: key.vm_id.clone(),
            vm: vm.clone(),
        });
    }

    let removed_keys = previous.keys().filter(|key| !current.contains_key(*key));
    events.extend(removed_keys.map(|key| StateEvent::VmDeleted {
        org_id: key.org_id.clone(),
        project_id: key.project_id.clone(),
        vm_id: key.vm_id.clone(),
    }));

    Ok(events)
}
/// Computes the events that describe how `current` differs from `previous`.
///
/// A `StateEvent::NodeUpdated` is produced for every node in `current` that
/// has no entry in `previous` or that `values_differ` reports as changed. A
/// `StateEvent::NodeDeleted` is produced for every node id of `previous` that
/// is missing from `current`. All updates are listed before any deletion.
///
/// # Errors
///
/// Propagates the error returned by `values_differ` when comparing a node
/// fails; no events are returned in that case.
fn diff_node_snapshots(
    previous: &NodeSnapshot,
    current: &NodeSnapshot,
) -> Result<Vec<StateEvent>, WatcherError> {
    let mut events = Vec::new();

    for (node_id, node) in current {
        // A node without a previous entry is never "unchanged".
        let unchanged = match previous.get(node_id) {
            Some(old_node) => !values_differ(old_node, node)?,
            None => false,
        };
        if unchanged {
            continue;
        }
        events.push(StateEvent::NodeUpdated {
            node_id: node_id.clone(),
            node: node.clone(),
        });
    }

    let removed_ids = previous
        .keys()
        .filter(|node_id| !current.contains_key(*node_id));
    events.extend(removed_ids.map(|node_id| StateEvent::NodeDeleted {
        node_id: node_id.clone(),
    }));

    Ok(events)
}
/// Reports whether `lhs` and `rhs` differ once both have been converted with
/// `serde_json::to_value`.
///
/// The comparison is made on the converted JSON values, so `T` only has to
/// implement `Serialize`.
///
/// # Errors
///
/// Returns `WatcherError::Serialize` carrying the error text when either
/// value cannot be converted.
fn values_differ<T: Serialize>(lhs: &T, rhs: &T) -> Result<bool, WatcherError> {
// `lhs` is converted first; a failure here returns before `rhs` is touched.
let lhs =
serde_json::to_value(lhs).map_err(|error| WatcherError::Serialize(error.to_string()))?;
let rhs =
serde_json::to_value(rhs).map_err(|error| WatcherError::Serialize(error.to_string()))?;
Ok(lhs != rhs)
} }
/// Watcher errors /// Watcher errors
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum WatcherError { pub enum WatcherError {
#[error("Connection error: {0}")] #[error("Storage error: {0}")]
Connection(String), Storage(String),
#[error("Watch error: {0}")] #[error("Serialization error: {0}")]
Watch(String), Serialize(String),
#[error("Invalid key format: {0}")]
InvalidKey(String),
#[error("Deserialization error: {0}")]
Deserialize(String),
} }
/// State synchronizer that applies watch events to local state /// State synchronizer that applies watch events to local state
@ -323,20 +323,42 @@ impl<S: StateSink> StateSynchronizer<S> {
while let Some(event) = event_rx.recv().await { while let Some(event) = event_rx.recv().await {
match event { match event {
StateEvent::VmUpdated { org_id, project_id, vm_id, vm } => { StateEvent::VmUpdated {
org_id,
project_id,
vm_id,
vm,
} => {
debug!(org_id, project_id, vm_id, "External VM update received"); debug!(org_id, project_id, vm_id, "External VM update received");
self.sink.on_vm_updated(&org_id, &project_id, &vm_id, vm); self.sink.on_vm_updated(&org_id, &project_id, &vm_id, vm);
} }
StateEvent::VmDeleted { org_id, project_id, vm_id } => { StateEvent::VmDeleted {
org_id,
project_id,
vm_id,
} => {
debug!(org_id, project_id, vm_id, "External VM deletion received"); debug!(org_id, project_id, vm_id, "External VM deletion received");
self.sink.on_vm_deleted(&org_id, &project_id, &vm_id); self.sink.on_vm_deleted(&org_id, &project_id, &vm_id);
} }
StateEvent::HandleUpdated { org_id, project_id, vm_id, handle } => { StateEvent::HandleUpdated {
org_id,
project_id,
vm_id,
handle,
} => {
debug!(org_id, project_id, vm_id, "External handle update received"); debug!(org_id, project_id, vm_id, "External handle update received");
self.sink.on_handle_updated(&org_id, &project_id, &vm_id, handle); self.sink
.on_handle_updated(&org_id, &project_id, &vm_id, handle);
} }
StateEvent::HandleDeleted { org_id, project_id, vm_id } => { StateEvent::HandleDeleted {
debug!(org_id, project_id, vm_id, "External handle deletion received"); org_id,
project_id,
vm_id,
} => {
debug!(
org_id,
project_id, vm_id, "External handle deletion received"
);
self.sink.on_handle_deleted(&org_id, &project_id, &vm_id); self.sink.on_handle_deleted(&org_id, &project_id, &vm_id);
} }
StateEvent::NodeUpdated { node_id, node } => { StateEvent::NodeUpdated { node_id, node } => {
@ -357,33 +379,82 @@ impl<S: StateSink> StateSynchronizer<S> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use plasmavmc_types::{VmSpec, VmState};
#[test] fn sample_vm() -> VirtualMachine {
fn test_parse_vm_key() { VirtualMachine::new("vm-a", "org-a", "project-a", VmSpec::default())
let (org, proj, vm) = parse_vm_key("/plasmavmc/vms/org1/proj1/vm-123").unwrap(); }
assert_eq!(org, "org1");
assert_eq!(proj, "proj1"); fn sample_node() -> Node {
assert_eq!(vm, "vm-123"); Node::new("node-a")
} }
#[test] #[test]
fn test_parse_handle_key() { fn vm_snapshot_diff_emits_update_for_new_and_changed_vms() {
let (org, proj, vm) = parse_handle_key("/plasmavmc/handles/org1/proj1/vm-123").unwrap(); let mut previous = VmSnapshot::new();
assert_eq!(org, "org1"); let mut current = VmSnapshot::new();
assert_eq!(proj, "proj1"); let vm = sample_vm();
assert_eq!(vm, "vm-123"); let key = VmKey::from_vm(&vm);
current.insert(key.clone(), vm.clone());
let events = diff_vm_snapshots(&previous, &current).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::VmUpdated { vm_id, .. }] if vm_id == &vm.id.to_string()
));
previous = current.clone();
let mut updated_vm = vm.clone();
updated_vm.state = VmState::Running;
updated_vm.status.actual_state = VmState::Running;
current.insert(key, updated_vm.clone());
let events = diff_vm_snapshots(&previous, &current).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::VmUpdated { vm, .. }] if vm.state == VmState::Running
));
} }
#[test] #[test]
fn test_parse_node_key() { fn vm_snapshot_diff_emits_delete_for_removed_vms() {
let node_id = parse_node_key("/plasmavmc/nodes/node-1").unwrap(); let vm = sample_vm();
assert_eq!(node_id, "node-1"); let key = VmKey::from_vm(&vm);
let previous = HashMap::from([(key, vm.clone())]);
let current = VmSnapshot::new();
let events = diff_vm_snapshots(&previous, &current).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::VmDeleted { vm_id, .. }] if vm_id == &vm.id.to_string()
));
} }
#[test] #[test]
fn test_invalid_key() { fn node_snapshot_diff_emits_update_and_delete() {
assert!(parse_vm_key("/invalid/key").is_err()); let node = sample_node();
assert!(parse_handle_key("/plasmavmc/wrong/a/b/c").is_err()); let mut previous = NodeSnapshot::new();
assert!(parse_node_key("/plasmavmc/wrong").is_err()); let mut current = NodeSnapshot::from([(node.id.to_string(), node.clone())]);
let events = diff_node_snapshots(&previous, &current).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::NodeUpdated { node_id, .. }] if node_id == &node.id.to_string()
));
previous = current.clone();
let mut updated_node = node.clone();
updated_node.last_heartbeat = 42;
current.insert(updated_node.id.to_string(), updated_node);
let events = diff_node_snapshots(&previous, &current).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::NodeUpdated { node, .. }] if node.last_heartbeat == 42
));
let events = diff_node_snapshots(&current, &NodeSnapshot::new()).unwrap();
assert!(matches!(
events.as_slice(),
[StateEvent::NodeDeleted { node_id }] if node_id == &node.id.to_string()
));
} }
} }