From 9dfe86f92a17413ff82c2bb3653b6f818b70b6bc Mon Sep 17 00:00:00 2001
From: centra
Date: Tue, 31 Mar 2026 14:09:45 +0900
Subject: [PATCH] fix cluster resiliency gaps across VM watch, runtime health,
and FlareDB routing
---
chainfire/chainfire-client/src/client.rs | 200 ++++---
chainfire/chainfire-client/src/error.rs | 4 +
deployer/crates/node-agent/src/agent.rs | 210 +++++--
flaredb/crates/flaredb-client/src/client.rs | 351 +++++++++---
flaredb/crates/flaredb-server/src/main.rs | 66 ++-
nix/test-cluster/run-cluster.sh | 94 ++++
plasmavmc/crates/plasmavmc-server/src/main.rs | 57 +-
.../crates/plasmavmc-server/src/storage.rs | 22 +-
.../crates/plasmavmc-server/src/vm_service.rs | 176 +++++-
.../crates/plasmavmc-server/src/watcher.rs | 513 ++++++++++--------
10 files changed, 1229 insertions(+), 464 deletions(-)
diff --git a/chainfire/chainfire-client/src/client.rs b/chainfire/chainfire-client/src/client.rs
index 9a645f2..55f0d08 100644
--- a/chainfire/chainfire-client/src/client.rs
+++ b/chainfire/chainfire-client/src/client.rs
@@ -3,24 +3,13 @@
use crate::error::{ClientError, Result};
use crate::watch::WatchHandle;
use chainfire_proto::proto::{
- cluster_client::ClusterClient,
- compare,
- kv_client::KvClient,
- request_op,
- response_op,
- watch_client::WatchClient,
- Compare,
- DeleteRangeRequest,
- MemberAddRequest,
- PutRequest,
- RangeRequest,
- RequestOp,
- StatusRequest,
- TxnRequest,
+ cluster_client::ClusterClient, compare, kv_client::KvClient, request_op, response_op,
+ watch_client::WatchClient, Compare, DeleteRangeRequest, MemberAddRequest, PutRequest,
+ RangeRequest, RequestOp, StatusRequest, TxnRequest,
};
use std::time::Duration;
-use tonic::Code;
use tonic::transport::Channel;
+use tonic::Code;
use tracing::{debug, warn};
/// Chainfire client
@@ -64,7 +53,9 @@ impl Client {
}
}
- Err(last_error.unwrap_or_else(|| ClientError::Connection("no Chainfire endpoints configured".to_string())))
+ Err(last_error.unwrap_or_else(|| {
+ ClientError::Connection("no Chainfire endpoints configured".to_string())
+ }))
}
async fn with_kv_retry(&mut self, mut op: F) -> Result
@@ -88,14 +79,17 @@ impl Client {
"retrying Chainfire KV RPC on alternate endpoint"
);
last_status = Some(status);
- self.recover_after_status(last_status.as_ref().unwrap()).await?;
+ self.recover_after_status(last_status.as_ref().unwrap())
+ .await?;
tokio::time::sleep(retry_delay(attempt)).await;
}
Err(status) => return Err(status.into()),
}
}
- Err(last_status.unwrap_or_else(|| tonic::Status::unavailable("Chainfire KV retry exhausted")).into())
+ Err(last_status
+ .unwrap_or_else(|| tonic::Status::unavailable("Chainfire KV retry exhausted"))
+ .into())
}
async fn with_cluster_retry(&mut self, mut op: F) -> Result
@@ -119,14 +113,17 @@ impl Client {
"retrying Chainfire cluster RPC on alternate endpoint"
);
last_status = Some(status);
- self.recover_after_status(last_status.as_ref().unwrap()).await?;
+ self.recover_after_status(last_status.as_ref().unwrap())
+ .await?;
tokio::time::sleep(retry_delay(attempt)).await;
}
Err(status) => return Err(status.into()),
}
}
- Err(last_status.unwrap_or_else(|| tonic::Status::unavailable("Chainfire cluster retry exhausted")).into())
+ Err(last_status
+ .unwrap_or_else(|| tonic::Status::unavailable("Chainfire cluster retry exhausted"))
+ .into())
}
async fn recover_after_status(&mut self, status: &tonic::Status) -> Result<()> {
@@ -150,7 +147,9 @@ impl Client {
let endpoint = self
.endpoints
.get(index)
- .ok_or_else(|| ClientError::Connection(format!("invalid Chainfire endpoint index {index}")))?
+ .ok_or_else(|| {
+ ClientError::Connection(format!("invalid Chainfire endpoint index {index}"))
+ })?
.clone();
let (channel, kv, cluster) = connect_endpoint(&endpoint).await?;
self.current_endpoint = index;
@@ -182,7 +181,11 @@ impl Client {
match cluster.status(StatusRequest {}).await {
Ok(response) => {
let status = response.into_inner();
- let member_id = status.header.as_ref().map(|header| header.member_id).unwrap_or(0);
+ let member_id = status
+ .header
+ .as_ref()
+ .map(|header| header.member_id)
+ .unwrap_or(0);
if status.leader != 0 && status.leader == member_id {
return Ok(Some(index));
}
@@ -232,10 +235,7 @@ impl Client {
/// Get a value by key
pub async fn get(&mut self, key: impl AsRef<[u8]>) -> Result