From 72a68e8fc47bf54abccfc55a318d0c2033afad9f Mon Sep 17 00:00:00 2001
From: centra
Date: Tue, 5 May 2026 22:49:03 +0900
Subject: [PATCH] =?UTF-8?q?=E3=81=BE=E3=81=A8=E3=82=81=E3=81=A6=E3=82=B3?=
=?UTF-8?q?=E3=83=9F=E3=83=83=E3=83=88?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
README.md | 10 +-
chainfire/chainfire-client/src/client.rs | 113 +-
.../chainfire-api/src/cluster_service.rs | 175 ++-
.../chainfire-api/src/internal_service.rs | 97 +-
.../crates/chainfire-api/src/raft_client.rs | 51 +-
chainfire/crates/chainfire-core/src/lib.rs | 2 +-
chainfire/crates/chainfire-raft/src/core.rs | 1394 ++++++++++++++++-
.../crates/chainfire-raft/src/network.rs | 68 +-
chainfire/crates/chainfire-server/src/node.rs | 81 +-
chainfire/crates/chainfire-server/src/rest.rs | 378 ++++-
.../crates/chainfire-server/src/server.rs | 66 +-
.../chainfire-storage/src/log_storage.rs | 33 +-
chainfire/proto/chainfire.proto | 56 +-
chainfire/proto/internal.proto | 20 +
deployer/Cargo.lock | 11 +
deployer/Cargo.toml | 1 +
deployer/crates/bootstrap-agent/Cargo.toml | 16 +
deployer/crates/bootstrap-agent/src/main.rs | 203 +++
deployer/crates/deployer-ctl/src/chainfire.rs | 2 +
.../crates/deployer-server/src/cloud_init.rs | 2 +
.../crates/deployer-server/src/phone_home.rs | 2 +
deployer/crates/deployer-types/src/lib.rs | 315 ++++
deployer/crates/nix-agent/src/main.rs | 2 +
docs/README.md | 2 +-
docs/control-plane-ops.md | 24 +-
docs/rollout-bundle.md | 2 +-
docs/testing.md | 13 +-
flake.nix | 9 +
nix/ci/workspaces.json | 1 +
nix/iso/ultracloud-iso.nix | 62 +-
nix/test-cluster/README.md | 13 +-
nix/test-cluster/common.nix | 1 +
nix/test-cluster/run-cluster.sh | 723 ++++++++-
.../run-core-control-plane-ops-proof.sh | 4 +-
nix/test-cluster/run-publishable-kvm-suite.sh | 1 +
35 files changed, 3604 insertions(+), 349 deletions(-)
create mode 100644 deployer/crates/bootstrap-agent/Cargo.toml
create mode 100644 deployer/crates/bootstrap-agent/src/main.rs
diff --git a/README.md b/README.md
index 646aeb6..2f6de10 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ The canonical bare-metal bootstrap proof is the ISO-on-QEMU path under [`nix/tes
## Core API Notes
-- `chainfire` ships a fixed-membership cluster API on the supported surface. Public cluster management is `MemberList` plus `Status`, and the internal Raft transport surface is `Vote` plus `AppendEntries`. `chainfire-core` is workspace-internal only; the old embeddable builder and distributed-KV scaffold are not part of the supported product contract.
+- `chainfire` ships a live cluster-management API on the supported surface. Public cluster management is `MemberAdd`, `MemberRemove`, `MemberList`, `Status`, and `LeaderTransfer`, and the internal Raft transport surface is `Vote`, `AppendEntries`, plus `TimeoutNow`. `chainfire-core` is workspace-internal only; the old embeddable builder and distributed-KV scaffold are not part of the supported product contract.
- `flaredb` ships SQL on both gRPC and REST. The supported REST SQL surface is `POST /api/v1/sql` for statement execution and `GET /api/v1/tables` for table discovery, alongside the existing KV and scan endpoints.
- `plasmavmc` ships a KVM-only public VM backend contract. The supported create and recovery surface is the KVM path exercised in `single-node-quickstart`, `fresh-smoke`, and `fresh-matrix`; Firecracker and mvisor remain archived non-product backends outside the supported surface until they have real tenant-network coverage.
- `lightningstor` keeps its optional gRPC surface live: bucket versioning, bucket policy, bucket tagging, and explicit object version listing are part of the supported contract for the canonical optional bundle.
@@ -38,7 +38,7 @@ The canonical bare-metal bootstrap proof is the ISO-on-QEMU path under [`nix/tes
The control-plane operator contract is fixed in [docs/control-plane-ops.md](docs/control-plane-ops.md).
-- ChainFire dynamic membership, replace-node, and scale-out are unsupported on the supported surface; the supported operator path is fixed-membership restore or whole-cluster replacement backed by the `durability-proof` backup/restore baseline.
+- ChainFire supports live membership add, remove, promotion, endpoint replacement, and leader transfer for voters and learners on the public surface, including current-leader removal followed by election on the remaining voters. The supported reconfiguration boundary is sequential one-voter transitions until joint consensus lands. The fallback operator path remains backup plus restore through `durability-proof`, and the dedicated KVM proof lane is `nix run ./nix/test-cluster#cluster -- chainfire-live-membership-proof`.
- FlareDB online migration and schema evolution must start from the durability-proof backup/restore baseline and stay additive-first until a later destructive cleanup window. FlareDB destructive DDL and fully automated online migration remain outside the supported product contract for this release.
- IAM bootstrap hardening requires an explicit admin token, an explicit signing key, and a 32-byte IAM_CRED_MASTER_KEY. Signing-key rotation, credential overlap-and-revoke rotation, and mTLS overlap-and-cutover rotation are part of the supported operator contract; multi-node IAM failover remains outside the supported product contract. The standalone proof is `./nix/test-cluster/run-core-control-plane-ops-proof.sh`.
@@ -93,6 +93,7 @@ nix develop
nix run ./nix/test-cluster#cluster -- fresh-smoke
nix run ./nix/test-cluster#cluster -- fresh-demo-vm-webapp
nix run ./nix/test-cluster#cluster -- fresh-matrix
+nix run ./nix/test-cluster#cluster -- chainfire-live-membership-proof
./nix/test-cluster/run-publishable-kvm-suite.sh ./work/publishable-kvm-suite
```
@@ -100,6 +101,7 @@ The checked-in entrypoint for the publishable nested-KVM suite is the local wrap
For the full supported-surface proof on a local AMD/KVM host, use `./nix/test-cluster/run-supported-surface-final-proof.sh ./work/final-proofs/latest`; it keeps builders local, builds `single-node-trial-vm`, runs `single-node-quickstart`, and captures the publishable KVM suite logs in one place.
`nix run ./nix/test-cluster#cluster -- durability-proof` is the canonical chainfire flaredb deployer backup/restore lane. It persists artifacts under `./work/durability-proof/latest`, proves logical backup/restore for ChainFire keys and FlareDB SQL rows, uses the canonical Deployer admin pre-register request itself as the backup artifact, verifies that the pre-registered node survives a `deployer.service` restart, replays the same request idempotently, and injects CoronaFS plus LightningStor failures against the same live KVM cluster.
`nix run ./nix/test-cluster#cluster -- rollout-soak` is the longer-running control-plane and rollout companion lane. It rebuilds from clean local KVM runtime state, persists artifacts under `./work/rollout-soak/latest`, validates exactly one planned `draining` maintenance cycle and one fail-stop worker-loss cycle on the two native-runtime workers, holds each degraded state for the configured soak window, then restarts `deployer`, `fleet-scheduler`, `node-agent`, `chainfire`, and `flaredb` before revalidating the cluster. The soak root also carries explicit scope markers so the supported boundary is encoded in the proof artifacts rather than only in docs. The steady-state KVM nodes do not run `nix-agent.service`, so the soak lane records explicit `nix-agent` scope markers instead of pretending a live-cluster `nix-agent` restart happened.
+`nix run ./nix/test-cluster#cluster -- chainfire-live-membership-proof` is the focused local-KVM live-reconfiguration lane for ChainFire. It rebuilds from clean local runtime state, starts a temporary ChainFire replica on `node04`, proves learner add plus local replication, voter promotion, live leader transfer, temporary-voter restart and rejoin, current-leader removal followed by re-election, removed-leader re-add, and final scale-in back to the canonical 3-node control-plane shape, and stores the resulting membership or local-read artifacts under `./work/chainfire-live-membership-proof/latest`.
`nix run ./nix/test-cluster#cluster -- provider-vm-reality-proof` is the focused local-KVM reality lane for the provider and VM-hosting bundles. It stores artifacts under `./work/provider-vm-reality-proof/latest`, captures authoritative FlashDNS answers, FiberLB backend drain and restore evidence, and PlasmaVMC KVM shared-storage migration plus post-migration restart state.
The 2026-04-10 local AMD/KVM proof logs are in `./work/final-proofs/32f64c10-1b74-4d8a-8d7d-b2cc6bf6b4f0-final` for `supported-surface-guard`, `single-node-trial-vm`, and `single-node-quickstart`, and in `./work/publishable-kvm-suite` for the final passing `fresh-smoke`, `fresh-demo-vm-webapp`, and `fresh-matrix` run through `./nix/test-cluster/run-publishable-kvm-suite.sh`.
The exact bare-metal check-runner proof from `2026-04-10` is in `./work/baremetal-iso-e2e/0de75570-dabd-471b-95fe-5898c54e2e8c`; its outer `environment.txt` records `execution_model=materialized-check-runner`, and `state/environment.txt` records `vm_accelerator_mode=kvm`.
@@ -108,13 +110,13 @@ The 2026-04-10 longer-running rollout and control-plane soak is in `./work/rollo
The 2026-04-10 provider and VM-hosting reality proof logs are in `./work/provider-vm-reality-proof/20260410T135827+0900`; `result.json` records `success=true`, and the artifact set includes `network-provider/fiberlb-drain-summary.txt`, `network-provider/flashdns-service-authoritative-answer.txt`, `vm-hosting/migration-summary.json`, and `vm-hosting/root-volume-after-post-migration-restart.json`.
Physical-node bring-up now has a canonical preflight wrapper as well: `nix run ./nix/test-cluster#hardware-smoke -- preflight`. It writes `kernel-params.txt`, expected markers, failure markers, and a machine-readable blocked or ready state under `./work/hardware-smoke/latest`, and the same entrypoint can later be rerun as `run` or `capture` when USB or BMC/Redfish transport is actually present.
-Within that suite, `fresh-matrix` is the public provider-bundle proof: it exercises PrismNet VPC/subnet/port flows plus security-group ACL add/remove, FlashDNS record publication, and FiberLB TCP plus TLS-terminated `Https` / `TerminatedHttps` listeners in one tenant-scoped composition run. The published FiberLB L4 algorithms are kept honest with targeted server unit tests in-tree. `provider-vm-reality-proof` is the artifact-producing companion lane for the same bundle and for the VM-hosting path.
+Within that suite, `fresh-matrix` is the public provider-bundle proof: it exercises PrismNet VPC/subnet/port flows plus security-group ACL add/remove, FlashDNS record publication, and FiberLB TCP plus TLS-terminated `Https` / `TerminatedHttps` listeners in one tenant-scoped composition run. The published FiberLB L4 algorithms are kept honest with targeted server unit tests in-tree. `provider-vm-reality-proof` is the artifact-producing companion lane for the same bundle and for the VM-hosting path, and `chainfire-live-membership-proof` is the dedicated control-plane live-reconfiguration companion for ChainFire.
PrismNet real OVS/OVN dataplane validation remains outside the supported local KVM surface. FiberLB native BGP or BFD peer interop plus hardware VIP ownership also remain outside the supported local KVM surface. PlasmaVMC real-hardware migration or storage handoff remains a later hardware proof; the current local-KVM proof fixes the release surface to KVM shared-storage migration on the worker pair.
Project-done release proof now requires both halves of the public validation surface to be green:
- `baremetal-iso` and `baremetal-iso-e2e` for the canonical `deployer -> installer -> nix-agent` bare-metal bootstrap path
-- the KVM publishable suite (`fresh-smoke`, `fresh-demo-vm-webapp`, `fresh-matrix`) for the nested-KVM multi-node VM-hosting path
+- the KVM publishable suite (`fresh-smoke`, `fresh-demo-vm-webapp`, `fresh-matrix`, `chainfire-live-membership-proof`) for the nested-KVM multi-node VM-hosting and live-control-plane path
Canonical bare-metal bootstrap proof:
diff --git a/chainfire/chainfire-client/src/client.rs b/chainfire/chainfire-client/src/client.rs
index edc2d83..f2a4fc6 100644
--- a/chainfire/chainfire-client/src/client.rs
+++ b/chainfire/chainfire-client/src/client.rs
@@ -4,7 +4,8 @@ use crate::error::{ClientError, Result};
use crate::watch::WatchHandle;
use chainfire_proto::proto::{
cluster_client::ClusterClient, compare, kv_client::KvClient, request_op, response_op,
- watch_client::WatchClient, Compare, DeleteRangeRequest, PutRequest, RangeRequest, RequestOp,
+ watch_client::WatchClient, Compare, DeleteRangeRequest, LeaderTransferRequest, Member,
+ MemberAddRequest, MemberListRequest, MemberRemoveRequest, PutRequest, RangeRequest, RequestOp,
StatusRequest, TxnRequest,
};
use std::time::Duration;
@@ -616,6 +617,89 @@ impl Client {
raft_term: resp.raft_term,
})
}
+
+ /// List current cluster members.
+ pub async fn member_list(&mut self) -> Result> {
+ let resp = self
+ .with_cluster_retry(|mut cluster| async move {
+ cluster
+ .member_list(MemberListRequest {})
+ .await
+ .map(|resp| resp.into_inner())
+ })
+ .await?;
+
+ Ok(resp
+ .members
+ .into_iter()
+ .map(ClusterMemberInfo::from)
+ .collect())
+ }
+
+ /// Add or update a cluster member.
+ pub async fn member_add(
+ &mut self,
+ id: u64,
+ name: impl Into,
+ peer_urls: Vec,
+ client_urls: Vec,
+ is_learner: bool,
+ ) -> Result {
+ let name = name.into();
+ let resp = self
+ .with_cluster_retry(|mut cluster| {
+ let request = MemberAddRequest {
+ id,
+ name: name.clone(),
+ peer_urls: peer_urls.clone(),
+ client_urls: client_urls.clone(),
+ is_learner,
+ };
+ async move {
+ cluster
+ .member_add(request)
+ .await
+ .map(|resp| resp.into_inner())
+ }
+ })
+ .await?;
+
+ resp.member
+ .map(ClusterMemberInfo::from)
+ .ok_or_else(|| ClientError::Internal("member_add response missing member".to_string()))
+ }
+
+ /// Remove a cluster member.
+ pub async fn member_remove(&mut self, id: u64) -> Result> {
+ let resp = self
+ .with_cluster_retry(|mut cluster| async move {
+ cluster
+ .member_remove(MemberRemoveRequest { id })
+ .await
+ .map(|resp| resp.into_inner())
+ })
+ .await?;
+
+ Ok(resp
+ .members
+ .into_iter()
+ .map(ClusterMemberInfo::from)
+ .collect())
+ }
+
+ /// Transfer leadership to a specific voting member.
+ pub async fn leader_transfer(&mut self, target_id: u64) -> Result {
+ let resp = self
+ .with_cluster_retry(|mut cluster| async move {
+ cluster
+ .leader_transfer(LeaderTransferRequest { target_id })
+ .await
+ .map(|resp| resp.into_inner())
+ })
+ .await?;
+
+ Ok(resp.leader)
+ }
}
/// Cluster status
@@ -629,6 +713,33 @@ pub struct ClusterStatus {
pub raft_term: u64,
}
+/// Cluster member returned by cluster-management RPCs.
+#[derive(Debug, Clone)]
+pub struct ClusterMemberInfo {
+ /// Unique member ID.
+ pub id: u64,
+ /// Human-readable node name.
+ pub name: String,
+ /// Peer URLs used for Raft replication.
+ pub peer_urls: Vec,
+ /// Client URLs exposed by the node.
+ pub client_urls: Vec,
+ /// Whether this member is configured as a learner.
+ pub is_learner: bool,
+}
+
+impl From for ClusterMemberInfo {
+ fn from(member: Member) -> Self {
+ Self {
+ id: member.id,
+ name: member.name,
+ peer_urls: member.peer_urls,
+ client_urls: member.client_urls,
+ is_learner: member.is_learner,
+ }
+ }
+}
+
/// CAS outcome returned by compare_and_swap
#[derive(Debug, Clone)]
pub struct CasOutcome {
diff --git a/chainfire/crates/chainfire-api/src/cluster_service.rs b/chainfire/crates/chainfire-api/src/cluster_service.rs
index 504294c..3ff1475 100644
--- a/chainfire/crates/chainfire-api/src/cluster_service.rs
+++ b/chainfire/crates/chainfire-api/src/cluster_service.rs
@@ -1,76 +1,153 @@
-//! Cluster management service implementation
+//! Cluster management service implementation.
//!
-//! This service handles cluster operations and status queries.
-//! The supported surface reports the fixed membership that the node booted with.
+//! This service exposes live member add/remove/list/status operations backed by
+//! the replicated membership state in `RaftCore`.
use crate::conversions::make_header;
use crate::proto::{
- cluster_server::Cluster, Member, MemberListRequest, MemberListResponse, StatusRequest,
- StatusResponse,
+ cluster_server::Cluster, LeaderTransferRequest, LeaderTransferResponse, Member,
+ MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
+ MemberRemoveRequest, MemberRemoveResponse, StatusRequest, StatusResponse,
};
-use chainfire_raft::core::RaftCore;
+use chainfire_raft::core::{ClusterMember as CoreClusterMember, ClusterMembership, RaftCore};
use std::sync::Arc;
use tonic::{Request, Response, Status};
use tracing::debug;
-/// Cluster service implementation
+/// Cluster service implementation.
pub struct ClusterServiceImpl {
- /// Raft core
+ /// Raft core.
raft: Arc,
- /// Cluster ID
+ /// Cluster ID.
cluster_id: u64,
- /// Configured members with client and peer URLs
- members: Vec,
- /// Server version
+ /// Server version.
version: String,
}
impl ClusterServiceImpl {
- /// Create a new cluster service
- pub fn new(
- raft: Arc,
- cluster_id: u64,
- members: Vec,
- ) -> Self {
+ /// Create a new cluster service.
+ pub fn new(raft: Arc, cluster_id: u64) -> Self {
Self {
raft,
cluster_id,
- members,
version: env!("CARGO_PKG_VERSION").to_string(),
}
}
- fn make_header(&self, revision: u64) -> crate::proto::ResponseHeader {
- make_header(self.cluster_id, self.raft.node_id(), revision, 0)
+ async fn make_header(&self, revision: u64) -> crate::proto::ResponseHeader {
+ let term = self.raft.current_term().await;
+ make_header(self.cluster_id, self.raft.node_id(), revision, term)
}
- /// Get current members as proto Member list
- /// Return the configured static membership that the server was booted with.
- async fn get_member_list(&self) -> Vec {
- if self.members.is_empty() {
- return vec![Member {
- id: self.raft.node_id(),
- name: format!("node-{}", self.raft.node_id()),
- peer_urls: vec![],
- client_urls: vec![],
- is_learner: false,
- }];
+ fn proto_member(member: &CoreClusterMember) -> Member {
+ Member {
+ id: member.id,
+ name: member.name.clone(),
+ peer_urls: member.peer_urls.clone(),
+ client_urls: member.client_urls.clone(),
+ is_learner: member.is_learner,
+ }
+ }
+
+ fn proto_members(membership: &ClusterMembership) -> Vec {
+ membership.members.iter().map(Self::proto_member).collect()
+ }
+}
+
+fn map_raft_error(error: chainfire_raft::core::RaftError) -> Status {
+ match error {
+ chainfire_raft::core::RaftError::NotLeader { leader_id } => {
+ Status::failed_precondition(format!("not leader; current leader is {:?}", leader_id))
+ }
+ chainfire_raft::core::RaftError::Rejected(message) => Status::failed_precondition(message),
+ chainfire_raft::core::RaftError::StorageError(message)
+ | chainfire_raft::core::RaftError::NetworkError(message) => Status::internal(message),
+ chainfire_raft::core::RaftError::Timeout => {
+ Status::deadline_exceeded("cluster operation timed out")
}
- self.members.clone()
}
}
#[tonic::async_trait]
impl Cluster for ClusterServiceImpl {
+ async fn member_add(
+ &self,
+ request: Request,
+ ) -> Result, Status> {
+ let req = request.into_inner();
+ debug!(member_id = req.id, "Member add request");
+
+ if req.id == 0 {
+ return Err(Status::invalid_argument("member id must be non-zero"));
+ }
+ if req.peer_urls.is_empty() {
+ return Err(Status::invalid_argument(
+ "member add requires at least one peer URL",
+ ));
+ }
+
+ let member = CoreClusterMember {
+ id: req.id,
+ name: if req.name.trim().is_empty() {
+ format!("node-{}", req.id)
+ } else {
+ req.name
+ },
+ peer_urls: req.peer_urls,
+ client_urls: req.client_urls,
+ is_learner: req.is_learner,
+ };
+
+ let membership = self
+ .raft
+ .add_member(member.clone())
+ .await
+ .map_err(map_raft_error)?;
+ let revision = self.raft.last_applied().await;
+ let applied_member = membership.member(member.id).cloned().unwrap_or(member);
+
+ Ok(Response::new(MemberAddResponse {
+ header: Some(self.make_header(revision).await),
+ member: Some(Self::proto_member(&applied_member)),
+ members: Self::proto_members(&membership),
+ }))
+ }
+
+ async fn member_remove(
+ &self,
+ request: Request,
+ ) -> Result, Status> {
+ let req = request.into_inner();
+ debug!(member_id = req.id, "Member remove request");
+
+ if req.id == 0 {
+ return Err(Status::invalid_argument("member id must be non-zero"));
+ }
+
+ let membership = self
+ .raft
+ .remove_member(req.id)
+ .await
+ .map_err(map_raft_error)?;
+ let revision = self.raft.last_applied().await;
+
+ Ok(Response::new(MemberRemoveResponse {
+ header: Some(self.make_header(revision).await),
+ members: Self::proto_members(&membership),
+ }))
+ }
+
async fn member_list(
&self,
_request: Request,
) -> Result, Status> {
debug!("Member list request");
+ let revision = self.raft.last_applied().await;
+ let membership = self.raft.cluster_membership().await;
Ok(Response::new(MemberListResponse {
- header: Some(self.make_header(0)),
- members: self.get_member_list().await,
+ header: Some(self.make_header(revision).await),
+ members: Self::proto_members(&membership),
}))
}
@@ -86,7 +163,7 @@ impl Cluster for ClusterServiceImpl {
let last_applied = self.raft.last_applied().await;
Ok(Response::new(StatusResponse {
- header: Some(self.make_header(last_applied)),
+ header: Some(self.make_header(last_applied).await),
version: self.version.clone(),
db_size: 0,
leader: leader.unwrap_or(0),
@@ -95,4 +172,30 @@ impl Cluster for ClusterServiceImpl {
raft_applied_index: last_applied,
}))
}
+
+ async fn leader_transfer(
+ &self,
+ request: Request,
+ ) -> Result, Status> {
+ let req = request.into_inner();
+ debug!(target_id = req.target_id, "Leader transfer request");
+
+ if req.target_id == 0 {
+ return Err(Status::invalid_argument(
+ "leader transfer target must be non-zero",
+ ));
+ }
+
+ let leader = self
+ .raft
+ .transfer_leader(req.target_id)
+ .await
+ .map_err(map_raft_error)?;
+ let revision = self.raft.last_applied().await;
+
+ Ok(Response::new(LeaderTransferResponse {
+ header: Some(self.make_header(revision).await),
+ leader,
+ }))
+ }
}
diff --git a/chainfire/crates/chainfire-api/src/internal_service.rs b/chainfire/crates/chainfire-api/src/internal_service.rs
index ab77877..0ea29d0 100644
--- a/chainfire/crates/chainfire-api/src/internal_service.rs
+++ b/chainfire/crates/chainfire-api/src/internal_service.rs
@@ -5,8 +5,9 @@
use crate::internal_proto::{
raft_service_server::RaftService, AppendEntriesRequest as ProtoAppendEntriesRequest,
- AppendEntriesResponse as ProtoAppendEntriesResponse, VoteRequest as ProtoVoteRequest,
- VoteResponse as ProtoVoteResponse,
+ AppendEntriesResponse as ProtoAppendEntriesResponse, EntryType as ProtoEntryType,
+ TimeoutNowRequest as ProtoTimeoutNowRequest, TimeoutNowResponse as ProtoTimeoutNowResponse,
+ VoteRequest as ProtoVoteRequest, VoteResponse as ProtoVoteResponse,
};
use chainfire_raft::core::{AppendEntriesRequest, RaftCore, VoteRequest};
use chainfire_storage::{EntryPayload, LogEntry as RaftLogEntry, LogId};
@@ -31,6 +32,32 @@ impl RaftServiceImpl {
}
}
+fn decode_log_entry(
+ entry: crate::internal_proto::LogEntry,
+) -> Result, Status> {
+ let payload = match ProtoEntryType::try_from(entry.entry_type).unwrap_or(ProtoEntryType::Blank)
+ {
+ ProtoEntryType::Blank => EntryPayload::Blank,
+ ProtoEntryType::Normal => {
+ let command = bincode::deserialize::(&entry.data).map_err(|err| {
+ Status::invalid_argument(format!(
+ "failed to decode normal raft entry payload: {err}"
+ ))
+ })?;
+ EntryPayload::Normal(command)
+ }
+ ProtoEntryType::Membership => EntryPayload::Membership(entry.data),
+ };
+
+ Ok(RaftLogEntry {
+ log_id: LogId {
+ term: entry.term,
+ index: entry.index,
+ },
+ payload,
+ })
+}
+
#[tonic::async_trait]
impl RaftService for RaftServiceImpl {
async fn vote(
@@ -91,26 +118,8 @@ impl RaftService for RaftServiceImpl {
let entries: Vec> = req
.entries
.into_iter()
- .map(|e| {
- let payload = if e.data.is_empty() {
- EntryPayload::Blank
- } else {
- // Deserialize the command from the entry data
- match bincode::deserialize::(&e.data) {
- Ok(cmd) => EntryPayload::Normal(cmd),
- Err(_) => EntryPayload::Blank,
- }
- };
-
- RaftLogEntry {
- log_id: LogId {
- term: e.term,
- index: e.index,
- },
- payload,
- }
- })
- .collect();
+ .map(decode_log_entry)
+ .collect::, _>>()?;
let append_req = AppendEntriesRequest {
term: req.term,
@@ -140,4 +149,48 @@ impl RaftService for RaftServiceImpl {
}))
}
+ async fn timeout_now(
+ &self,
+ _request: Request,
+ ) -> Result, Status> {
+ let (resp_tx, resp_rx) = oneshot::channel();
+ self.raft.timeout_now_rpc(resp_tx).await;
+
+ let result = resp_rx.await.map_err(|e| {
+ warn!(error = %e, "TimeoutNow request channel closed");
+ Status::internal("TimeoutNow request failed: channel closed")
+ })?;
+
+ let term = self.raft.current_term().await;
+ match result {
+ Ok(()) => Ok(Response::new(ProtoTimeoutNowResponse {
+ accepted: true,
+ term,
+ })),
+ Err(err) => Err(Status::failed_precondition(err.to_string())),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use chainfire_storage::EntryPayload;
+
+ #[test]
+ fn decode_log_entry_preserves_membership_payloads() {
+ let expected = vec![1, 2, 3, 4];
+ let decoded = decode_log_entry(crate::internal_proto::LogEntry {
+ index: 7,
+ term: 3,
+ data: expected.clone(),
+ entry_type: ProtoEntryType::Membership as i32,
+ })
+ .expect("decode membership entry");
+
+ match decoded.payload {
+ EntryPayload::Membership(bytes) => assert_eq!(bytes, expected),
+ other => panic!("expected membership payload, got {other:?}"),
+ }
+ }
}
diff --git a/chainfire/crates/chainfire-api/src/raft_client.rs b/chainfire/crates/chainfire-api/src/raft_client.rs
index 36a5f3a..acb1f22 100644
--- a/chainfire/crates/chainfire-api/src/raft_client.rs
+++ b/chainfire/crates/chainfire-api/src/raft_client.rs
@@ -5,7 +5,8 @@
use crate::internal_proto::{
raft_service_client::RaftServiceClient, AppendEntriesRequest as ProtoAppendEntriesRequest,
- LogEntry as ProtoLogEntry, VoteRequest as ProtoVoteRequest,
+ EntryType as ProtoEntryType, LogEntry as ProtoLogEntry,
+ TimeoutNowRequest as ProtoTimeoutNowRequest, VoteRequest as ProtoVoteRequest,
};
use chainfire_raft::network::{RaftNetworkError, RaftRpcClient};
use chainfire_types::NodeId;
@@ -241,6 +242,30 @@ impl Default for GrpcRaftClient {
#[async_trait::async_trait]
impl RaftRpcClient for GrpcRaftClient {
+ async fn add_node(&self, target: NodeId, addr: String) -> Result<(), RaftNetworkError> {
+ GrpcRaftClient::add_node(self, target, addr).await;
+ Ok(())
+ }
+
+ async fn remove_node(&self, target: NodeId) -> Result<(), RaftNetworkError> {
+ GrpcRaftClient::remove_node(self, target).await;
+ Ok(())
+ }
+
+ async fn timeout_now(&self, target: NodeId) -> Result<(), RaftNetworkError> {
+ trace!(target = target, "Sending timeout-now request");
+
+ self.with_retry(target, "timeout_now", || async {
+ let mut client = self.get_client(target).await?;
+ client
+ .timeout_now(ProtoTimeoutNowRequest {})
+ .await
+ .map_err(|e| RaftNetworkError::RpcFailed(e.to_string()))?;
+ Ok(())
+ })
+ .await
+ }
+
async fn vote(
&self,
target: NodeId,
@@ -286,17 +311,22 @@ impl RaftRpcClient for GrpcRaftClient {
);
// Clone entries once for potential retries
- let entries_data: Vec<(u64, u64, Vec)> = req
+ let entries_data: Vec<(u64, u64, Vec, i32)> = req
.entries
.iter()
.map(|e| {
use chainfire_storage::EntryPayload;
- let data = match &e.payload {
- EntryPayload::Blank => vec![],
- EntryPayload::Normal(cmd) => bincode::serialize(cmd).unwrap_or_default(),
- EntryPayload::Membership(_) => vec![],
+ let (data, entry_type) = match &e.payload {
+ EntryPayload::Blank => (vec![], ProtoEntryType::Blank as i32),
+ EntryPayload::Normal(cmd) => (
+ bincode::serialize(cmd).unwrap_or_default(),
+ ProtoEntryType::Normal as i32,
+ ),
+ EntryPayload::Membership(bytes) => {
+ (bytes.clone(), ProtoEntryType::Membership as i32)
+ }
};
- (e.log_id.index, e.log_id.term, data)
+ (e.log_id.index, e.log_id.term, data, entry_type)
})
.collect();
@@ -313,7 +343,12 @@ impl RaftRpcClient for GrpcRaftClient {
let entries: Vec = entries_data
.into_iter()
- .map(|(index, term, data)| ProtoLogEntry { index, term, data })
+ .map(|(index, term, data, entry_type)| ProtoLogEntry {
+ index,
+ term,
+ data,
+ entry_type,
+ })
.collect();
let proto_req = ProtoAppendEntriesRequest {
diff --git a/chainfire/crates/chainfire-core/src/lib.rs b/chainfire/crates/chainfire-core/src/lib.rs
index 24a2f79..2a82dbd 100644
--- a/chainfire/crates/chainfire-core/src/lib.rs
+++ b/chainfire/crates/chainfire-core/src/lib.rs
@@ -1,6 +1,6 @@
//! Internal compatibility crate for workspace-local ChainFire types.
//!
-//! The supported ChainFire product surface is the fixed-membership
+//! The supported ChainFire product surface is the live-membership
//! `chainfire-server` / `chainfire-api` contract documented in the repository
//! root. This crate intentionally does not export an embeddable cluster,
//! membership-mutation, or distributed-KV API.
diff --git a/chainfire/crates/chainfire-raft/src/core.rs b/chainfire/crates/chainfire-raft/src/core.rs
index ed31295..74dfd06 100644
--- a/chainfire/crates/chainfire-raft/src/core.rs
+++ b/chainfire/crates/chainfire-raft/src/core.rs
@@ -9,7 +9,8 @@
//! - RaftTimer: Election and heartbeat timeout management
//! - Integration with existing chainfire-storage and network layers
-use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
@@ -24,6 +25,67 @@ pub type NodeId = u64;
pub type Term = u64;
pub type LogIndex = u64;
+/// Public member description replicated through membership-change log entries.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub struct ClusterMember {
+ /// Unique member ID.
+ pub id: NodeId,
+ /// Human-readable member name.
+ pub name: String,
+ /// Peer URLs used for Raft replication.
+ pub peer_urls: Vec,
+ /// Client URLs exposed for public APIs.
+ pub client_urls: Vec,
+ /// Whether this member is a learner.
+ pub is_learner: bool,
+}
+
+/// Replicated cluster membership payload.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub struct ClusterMembership {
+ /// Ordered member set.
+ pub members: Vec,
+}
+
+impl ClusterMembership {
+ /// Return a normalized copy sorted by member ID with duplicate IDs removed.
+ pub fn normalized(&self) -> Self {
+ let mut members = self.members.clone();
+ members.sort_by_key(|member| member.id);
+ members.dedup_by_key(|member| member.id);
+ Self { members }
+ }
+
+ /// Return all voting member IDs.
+ pub fn voter_ids(&self) -> Vec {
+ self.members
+ .iter()
+ .filter(|member| !member.is_learner)
+ .map(|member| member.id)
+ .collect()
+ }
+
+ /// Find a member by ID.
+ pub fn member(&self, id: NodeId) -> Option<&ClusterMember> {
+ self.members.iter().find(|member| member.id == id)
+ }
+
+ /// Insert or replace a member and return the normalized result.
+ pub fn with_member(&self, member: ClusterMember) -> Self {
+ let mut members = self.members.clone();
+ members.retain(|existing| existing.id != member.id);
+ members.push(member);
+ Self { members }.normalized()
+ }
+
+ /// Remove a member by ID and return the normalized result.
+ pub fn without_member(&self, id: NodeId) -> Self {
+ let mut members = self.members.clone();
+ members.retain(|member| member.id != id);
+ Self { members }.normalized()
+ }
+}
+
// ============================================================================
// Core Raft Types
// ============================================================================
@@ -60,7 +122,7 @@ pub struct VolatileState {
#[derive(Debug, Clone)]
pub struct CandidateState {
/// Nodes that have granted votes (includes self)
- pub votes_received: std::collections::HashSet<NodeId>,
+ pub votes_received: HashSet<NodeId>,
}
/// Volatile state on leaders (reinitialized after election)
@@ -144,6 +206,11 @@ pub enum RaftEvent {
command: RaftCommand,
response_tx: oneshot::Sender<Result<(), RaftError>>,
},
+ /// Cluster membership change request.
+ MembershipChange {
+ membership: ClusterMembership,
+ response_tx: oneshot::Sender<Result<LogIndex, RaftError>>,
+ },
/// RequestVote RPC received
VoteRequest {
req: VoteRequest,
@@ -154,6 +221,10 @@ pub enum RaftEvent {
req: AppendEntriesRequest,
response_tx: oneshot::Sender<AppendEntriesResponse>,
},
+ /// Immediate-election request received from the current leader.
+ TimeoutNow {
+ response_tx: oneshot::Sender<Result<(), RaftError>>,
+ },
/// RequestVote RPC response received
VoteResponse { from: NodeId, resp: VoteResponse },
/// AppendEntries RPC response received
@@ -170,6 +241,7 @@ pub enum RaftEvent {
#[derive(Debug, Clone)]
pub enum RaftError {
NotLeader { leader_id: Option<NodeId> },
+ Rejected(String),
StorageError(String),
NetworkError(String),
Timeout,
@@ -181,6 +253,7 @@ impl std::fmt::Display for RaftError {
RaftError::NotLeader { leader_id } => {
write!(f, "Not leader, leader is: {:?}", leader_id)
}
+ RaftError::Rejected(msg) => write!(f, "Rejected: {}", msg),
RaftError::StorageError(msg) => write!(f, "Storage error: {}", msg),
RaftError::NetworkError(msg) => write!(f, "Network error: {}", msg),
RaftError::Timeout => write!(f, "Operation timed out"),
@@ -197,8 +270,12 @@ impl std::error::Error for RaftError {}
pub struct RaftCore {
/// This node's ID
node_id: NodeId,
- /// Cluster members (excluding self)
- peers: Vec,
+ /// Voting peers (excluding self).
+ peers: Arc<RwLock<Vec<NodeId>>>,
+ /// Replication targets (voters and learners, excluding self).
+ replication_peers: Arc<RwLock<Vec<NodeId>>>,
+ /// Current replicated membership information including endpoint metadata.
+ membership: Arc<RwLock<ClusterMembership>>,
/// Persistent state
persistent: Arc<RwLock<PersistentState>>,
@@ -251,17 +328,31 @@ impl Default for RaftConfig {
impl RaftCore {
pub fn new(
node_id: NodeId,
- peers: Vec<NodeId>,
+ membership: ClusterMembership,
storage: Arc,
state_machine: Arc,
network: Arc,
config: RaftConfig,
) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
+ let membership = membership.normalized();
+ let peers = membership
+ .voter_ids()
+ .into_iter()
+ .filter(|id| *id != node_id)
+ .collect();
+ let replication_peers = membership
+ .members
+ .iter()
+ .map(|member| member.id)
+ .filter(|id| *id != node_id)
+ .collect();
Self {
node_id,
- peers,
+ peers: Arc::new(RwLock::new(peers)),
+ replication_peers: Arc::new(RwLock::new(replication_peers)),
+ membership: Arc::new(RwLock::new(membership)),
persistent: Arc::new(RwLock::new(PersistentState {
current_term: 0,
voted_for: None,
@@ -308,6 +399,41 @@ impl RaftCore {
)));
}
}
+
+ match self.storage.read_membership() {
+ Ok(Some(bytes)) => {
+ let membership: ClusterMembership = bincode::deserialize(&bytes).map_err(|e| {
+ RaftError::StorageError(format!(
+ "Failed to deserialize membership payload: {}",
+ e
+ ))
+ })?;
+ self.apply_runtime_membership(membership, false).await?;
+ tracing::info!("Loaded membership from storage");
+ }
+ Ok(None) => {
+ let membership = self.membership.read().await.clone();
+ let bytes = bincode::serialize(&membership).map_err(|e| {
+ RaftError::StorageError(format!(
+ "Failed to serialize initial membership payload: {}",
+ e
+ ))
+ })?;
+ self.storage.save_membership(&bytes).map_err(|e| {
+ RaftError::StorageError(format!(
+ "Failed to persist initial membership payload: {}",
+ e
+ ))
+ })?;
+ }
+ Err(e) => {
+ return Err(RaftError::StorageError(format!(
+ "Failed to load membership: {}",
+ e
+ )));
+ }
+ }
+
Ok(())
}
@@ -327,6 +453,138 @@ impl RaftCore {
Ok(())
}
+ async fn peers_snapshot(&self) -> Vec<NodeId> {
+ self.peers.read().await.clone()
+ }
+
+ async fn replication_targets_snapshot(&self) -> Vec<NodeId> {
+ self.replication_peers.read().await.clone()
+ }
+
+ async fn is_voting_member(&self, node_id: NodeId) -> bool {
+ self.membership
+ .read()
+ .await
+ .member(node_id)
+ .map(|member| !member.is_learner)
+ .unwrap_or(false)
+ }
+
+ async fn self_is_voting_member(&self) -> bool {
+ self.is_voting_member(self.node_id).await
+ }
+
+ fn serialize_membership(membership: &ClusterMembership) -> Result<Vec<u8>, RaftError> {
+ bincode::serialize(membership).map_err(|e| {
+ RaftError::StorageError(format!("Failed to serialize membership payload: {}", e))
+ })
+ }
+
+ async fn apply_runtime_membership(
+ &self,
+ membership: ClusterMembership,
+ persist: bool,
+ ) -> Result<(), RaftError> {
+ let membership = membership.normalized();
+ let old_membership = self.membership.read().await.clone();
+
+ for member in &membership.members {
+ if let Some(peer_url) = member.peer_urls.first() {
+ let addr = peer_url
+ .strip_prefix("http://")
+ .or_else(|| peer_url.strip_prefix("https://"))
+ .unwrap_or(peer_url)
+ .to_string();
+ self.network
+ .add_node(member.id, addr)
+ .await
+ .map_err(|e| RaftError::NetworkError(e.to_string()))?;
+ }
+ }
+
+ for removed in old_membership
+ .members
+ .iter()
+ .filter(|member| membership.member(member.id).is_none())
+ {
+ self.network
+ .remove_node(removed.id)
+ .await
+ .map_err(|e| RaftError::NetworkError(e.to_string()))?;
+ }
+
+ let peers = membership
+ .voter_ids()
+ .into_iter()
+ .filter(|id| *id != self.node_id)
+ .collect::<Vec<_>>();
+ let replication_peers = membership
+ .members
+ .iter()
+ .map(|member| member.id)
+ .filter(|id| *id != self.node_id)
+ .collect::<Vec<_>>();
+
+ {
+ let mut membership_guard = self.membership.write().await;
+ *membership_guard = membership.clone();
+ }
+ {
+ let mut peers_guard = self.peers.write().await;
+ *peers_guard = peers.clone();
+ }
+ {
+ let mut replication_guard = self.replication_peers.write().await;
+ *replication_guard = replication_peers.clone();
+ }
+
+ if !membership
+ .member(self.node_id)
+ .map(|member| !member.is_learner)
+ .unwrap_or(false)
+ {
+ *self.role.write().await = RaftRole::Follower;
+ *self.candidate_state.write().await = None;
+ *self.leader_state.write().await = None;
+ let mut volatile = self.volatile.write().await;
+ if volatile.current_leader == Some(self.node_id) {
+ volatile.current_leader = None;
+ }
+ }
+
+ if persist {
+ let bytes = Self::serialize_membership(&membership)?;
+ self.storage.save_membership(&bytes).map_err(|e| {
+ RaftError::StorageError(format!("Failed to save membership: {}", e))
+ })?;
+ }
+
+ let current_role = *self.role.read().await;
+ if current_role == RaftRole::Leader {
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ let mut leader_state_guard = self.leader_state.write().await;
+ if let Some(leader_state) = leader_state_guard.as_mut() {
+ let old_next = leader_state.next_index.clone();
+ let old_match = leader_state.match_index.clone();
+ leader_state.next_index.clear();
+ leader_state.match_index.clear();
+ for peer_id in &replication_peers {
+ let next_index = old_next
+ .get(peer_id)
+ .copied()
+ .unwrap_or(1)
+ .min(last_log_index + 1);
+ leader_state.next_index.insert(*peer_id, next_index);
+ leader_state
+ .match_index
+ .insert(*peer_id, old_match.get(peer_id).copied().unwrap_or(0));
+ }
+ }
+ }
+
+ Ok(())
+ }
+
/// Start the Raft event loop
pub async fn run(&self) -> Result<(), RaftError> {
eprintln!("[Node {}] EVENT LOOP STARTING", self.node_id);
@@ -353,8 +611,10 @@ impl RaftCore {
RaftEvent::VoteRequest { .. } => "VoteRequest",
RaftEvent::VoteResponse { .. } => "VoteResponse",
RaftEvent::AppendEntries { .. } => "AppendEntries",
+ RaftEvent::TimeoutNow { .. } => "TimeoutNow",
RaftEvent::AppendEntriesResponse { .. } => "AppendEntriesResponse",
RaftEvent::ClientWrite { .. } => "ClientWrite",
+ RaftEvent::MembershipChange { .. } => "MembershipChange",
};
eprintln!("[Node {}] EVENT LOOP received: {}", self.node_id, event_type);
if let Err(e) = self.handle_event(event).await {
@@ -389,6 +649,13 @@ impl RaftCore {
let result = self.handle_client_write(command).await;
let _ = response_tx.send(result);
}
+ RaftEvent::MembershipChange {
+ membership,
+ response_tx,
+ } => {
+ let result = self.handle_membership_change(membership).await;
+ let _ = response_tx.send(result);
+ }
RaftEvent::VoteRequest { req, response_tx } => {
let resp = self.handle_vote_request(req).await?;
let _ = response_tx.send(resp);
@@ -401,6 +668,10 @@ impl RaftCore {
let resp = self.handle_append_entries(req).await?;
let _ = response_tx.send(resp);
}
+ RaftEvent::TimeoutNow { response_tx } => {
+ let result = self.handle_timeout_now().await;
+ let _ = response_tx.send(result);
+ }
RaftEvent::VoteResponse { from, resp } => {
self.handle_vote_response(from, resp).await?;
}
@@ -417,6 +688,10 @@ impl RaftCore {
/// Handle election timeout - transition to candidate and start election
async fn handle_election_timeout(&self) -> Result<(), RaftError> {
+ if !self.self_is_voting_member().await {
+ return Ok(());
+ }
+
let role = *self.role.read().await;
eprintln!(
@@ -463,11 +738,12 @@ impl RaftCore {
});
// Check if already have majority (single-node case)
- let cluster_size = self.peers.len() + 1;
+ let peers = self.peers_snapshot().await;
+ let cluster_size = peers.len() + 1;
let majority = cluster_size / 2 + 1;
eprintln!(
"[Node {}] Cluster size={}, majority={}, peers={:?}",
- self.node_id, cluster_size, majority, self.peers
+ self.node_id, cluster_size, majority, peers
);
if 1 >= majority {
// For single-node cluster, immediately become leader
@@ -491,7 +767,7 @@ impl RaftCore {
};
// Send vote requests in parallel
- for peer_id in &self.peers {
+ for peer_id in &peers {
let peer_id = *peer_id;
let network = self.network.clone();
let req = vote_request.clone();
@@ -515,6 +791,17 @@ impl RaftCore {
Ok(())
}
+ /// Handle TimeoutNow RPC by immediately starting an election on this voter.
+ async fn handle_timeout_now(&self) -> Result<(), RaftError> {
+ if !self.self_is_voting_member().await {
+ return Err(RaftError::NetworkError(
+ "timeout-now requires a voting member target".to_string(),
+ ));
+ }
+
+ self.handle_election_timeout().await
+ }
+
/// Handle RequestVote RPC
async fn handle_vote_request(&self, req: VoteRequest) -> Result {
let mut persistent = self.persistent.write().await;
@@ -538,6 +825,13 @@ impl RaftCore {
persistent = self.persistent.write().await;
}
+ if !self.self_is_voting_member().await {
+ return Ok(VoteResponse {
+ term: persistent.current_term,
+ vote_granted: false,
+ });
+ }
+
// Check if we can grant vote
let can_vote =
persistent.voted_for.is_none() || persistent.voted_for == Some(req.candidate_id);
@@ -605,13 +899,11 @@ impl RaftCore {
// Count votes
if resp.vote_granted {
+ let cluster_size = self.peers_snapshot().await.len() + 1;
+ let majority = cluster_size / 2 + 1;
let mut candidate_state_guard = self.candidate_state.write().await;
if let Some(candidate_state) = candidate_state_guard.as_mut() {
candidate_state.votes_received.insert(from);
-
- // Calculate majority (cluster size = peers + 1 for self)
- let cluster_size = self.peers.len() + 1;
- let majority = cluster_size / 2 + 1;
let votes_count = candidate_state.votes_received.len();
// If received majority, become leader
@@ -645,19 +937,15 @@ impl RaftCore {
match_index: HashMap::new(),
};
- for peer_id in &self.peers {
+ let replication_targets = self.replication_targets_snapshot().await;
+ for peer_id in &replication_targets {
leader_state.next_index.insert(*peer_id, next_index);
leader_state.match_index.insert(*peer_id, 0);
}
*self.leader_state.write().await = Some(leader_state);
- // Start sending heartbeats immediately
- self.event_tx
- .send(RaftEvent::HeartbeatTimeout)
- .map_err(|e| RaftError::NetworkError(format!("Failed to send heartbeat: {}", e)))?;
-
- Ok(())
+ self.append_blank_leader_entry().await
}
/// Step down to follower
@@ -694,13 +982,14 @@ impl RaftCore {
let term = self.persistent.read().await.current_term;
let (last_log_index, _) = self.get_last_log_info().await?;
+ let peers = self.replication_targets_snapshot().await;
eprintln!(
"[Node {}] Sending heartbeat to peers: {:?} (term={})",
- self.node_id, self.peers, term
+ self.node_id, peers, term
);
// Send AppendEntries (with entries if available) to all peers
- for peer_id in &self.peers {
+ for peer_id in &peers {
let peer_id = *peer_id;
// Read commit_index fresh for each peer to ensure it's up-to-date
@@ -1122,16 +1411,22 @@ impl RaftCore {
/// Advance commit index based on majority replication
async fn advance_commit_index(&self) -> Result<(), RaftError> {
- let leader_state = self.leader_state.read().await;
- if leader_state.is_none() {
+ let voter_followers = self.peers_snapshot().await;
+ let Some(match_indices_from_followers) = ({
+ let leader_state = self.leader_state.read().await;
+ leader_state.as_ref().map(|state| {
+ voter_followers
+ .iter()
+ .map(|peer_id| state.match_index.get(peer_id).copied().unwrap_or(0))
+ .collect::<Vec<_>>()
+ })
+ }) else {
return Ok(()); // Not leader
- }
-
- let leader_state = leader_state.as_ref().unwrap();
+ };
// Collect all match_index values plus leader's own log
let (last_log_index, _) = self.get_last_log_info().await?;
- let mut match_indices: Vec<LogIndex> = leader_state.match_index.values().copied().collect();
+ let mut match_indices = match_indices_from_followers;
// Add leader's own index
match_indices.push(last_log_index);
@@ -1183,9 +1478,10 @@ impl RaftCore {
/// Apply committed entries to state machine
async fn apply_committed_entries(&self) -> Result<(), RaftError> {
- let mut volatile = self.volatile.write().await;
- let commit_index = volatile.commit_index;
- let last_applied = volatile.last_applied;
+ let (commit_index, last_applied) = {
+ let volatile = self.volatile.read().await;
+ (volatile.commit_index, volatile.last_applied)
+ };
if commit_index <= last_applied {
return Ok(()); // Nothing to apply
@@ -1201,26 +1497,51 @@ impl RaftCore {
// Apply each entry to state machine
for entry in &stored_entries {
- if let EntryPayload::Normal(data) = &entry.payload {
- // Deserialize the command
- let command: RaftCommand = bincode::deserialize(data).map_err(|e| {
- RaftError::StorageError(format!("Failed to deserialize for apply: {}", e))
- })?;
+ match &entry.payload {
+ EntryPayload::Normal(data) => {
+ // Deserialize the command
+ let command: RaftCommand = bincode::deserialize(data).map_err(|e| {
+ RaftError::StorageError(format!("Failed to deserialize for apply: {}", e))
+ })?;
- self.state_machine.apply(command).map_err(|e| {
- RaftError::StorageError(format!("Failed to apply to state machine: {}", e))
- })?;
+ self.state_machine.apply(command).map_err(|e| {
+ RaftError::StorageError(format!("Failed to apply to state machine: {}", e))
+ })?;
- debug!(
- index = entry.log_id.index,
- term = entry.log_id.term,
- "Applied entry to state machine"
- );
+ debug!(
+ index = entry.log_id.index,
+ term = entry.log_id.term,
+ "Applied entry to state machine"
+ );
+ }
+ EntryPayload::Membership(bytes) => {
+ let membership: ClusterMembership =
+ bincode::deserialize(bytes).map_err(|e| {
+ RaftError::StorageError(format!(
+ "Failed to deserialize membership for apply: {}",
+ e
+ ))
+ })?;
+ let removing_or_demoting_self = membership
+ .member(self.node_id)
+ .map(|member| member.is_learner)
+ .unwrap_or(true);
+ if removing_or_demoting_self && self.role().await == RaftRole::Leader {
+ self.handle_heartbeat_timeout().await?;
+ }
+ self.apply_runtime_membership(membership, true).await?;
+ debug!(
+ index = entry.log_id.index,
+ term = entry.log_id.term,
+ "Applied membership change"
+ );
+ }
+ EntryPayload::Blank => {}
}
}
// Update last_applied
- volatile.last_applied = commit_index;
+ self.volatile.write().await.last_applied = commit_index;
debug!(
last_applied = commit_index,
@@ -1235,11 +1556,66 @@ impl RaftCore {
// P3: Client Requests
// ========================================================================
+ async fn handle_membership_change(
+ &self,
+ membership: ClusterMembership,
+ ) -> Result<LogIndex, RaftError> {
+ let role = *self.role.read().await;
+ if role != RaftRole::Leader {
+ return Err(RaftError::NotLeader {
+ leader_id: self.volatile.read().await.current_leader,
+ });
+ }
+
+ let current_membership = self.membership.read().await.clone();
+ let membership = membership.normalized();
+ Self::validate_membership_transition(¤t_membership, &membership)?;
+ if current_membership == membership {
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ return Ok(last_log_index);
+ }
+ if self.has_pending_membership_change().await? {
+ return Err(RaftError::Rejected(
+ "another membership change is still in flight".to_string(),
+ ));
+ }
+
+ let term = self.persistent.read().await.current_term;
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ let new_index = last_log_index + 1;
+ let payload = Self::serialize_membership(&membership)?;
+ let entry: LogEntry<Vec<u8>> = LogEntry {
+ log_id: LogId {
+ term,
+ index: new_index,
+ },
+ payload: EntryPayload::Membership(payload),
+ };
+
+ self.storage.append(&[entry]).map_err(|e| {
+ RaftError::StorageError(format!("Failed to append membership entry: {}", e))
+ })?;
+
+ self.event_tx
+ .send(RaftEvent::HeartbeatTimeout)
+ .map_err(|e| {
+ RaftError::NetworkError(format!("Failed to trigger membership replication: {}", e))
+ })?;
+
+ if self.peers_snapshot().await.is_empty() {
+ self.advance_commit_index().await?;
+ }
+
+ Ok(new_index)
+ }
+
async fn handle_client_write(&self, command: RaftCommand) -> Result<(), RaftError> {
let role = *self.role.read().await;
if role != RaftRole::Leader {
- return Err(RaftError::NotLeader { leader_id: None });
+ return Err(RaftError::NotLeader {
+ leader_id: self.volatile.read().await.current_leader,
+ });
}
// Get current term and last log index
@@ -1320,7 +1696,7 @@ impl RaftCore {
})?;
// Single-node cluster: immediately commit since we're the only voter
- if self.peers.is_empty() {
+ if self.peers_snapshot().await.is_empty() {
self.advance_commit_index().await?;
}
@@ -1331,6 +1707,171 @@ impl RaftCore {
Ok(())
}
+ async fn append_blank_leader_entry(&self) -> Result<(), RaftError> {
+ let term = self.persistent.read().await.current_term;
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ let new_index = last_log_index + 1;
+ let entry: LogEntry<Vec<u8>> = LogEntry {
+ log_id: LogId {
+ term,
+ index: new_index,
+ },
+ payload: EntryPayload::Blank,
+ };
+
+ self.storage
+ .append(&[entry])
+ .map_err(|e| RaftError::StorageError(format!("Failed to append blank entry: {}", e)))?;
+
+ self.event_tx
+ .send(RaftEvent::HeartbeatTimeout)
+ .map_err(|e| {
+ RaftError::NetworkError(format!("Failed to trigger blank-entry replication: {}", e))
+ })?;
+
+ if self.peers_snapshot().await.is_empty() {
+ self.advance_commit_index().await?;
+ }
+
+ Ok(())
+ }
+
+ async fn has_pending_membership_change(&self) -> Result<bool, RaftError> {
+ let last_applied = self.last_applied().await;
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ if last_log_index <= last_applied {
+ return Ok(false);
+ }
+
+ let entries: Vec<LogEntry<Vec<u8>>> = self
+ .storage
+ .get_log_entries((last_applied + 1)..=last_log_index)
+ .map_err(|e| {
+ RaftError::StorageError(format!(
+ "Failed to inspect pending membership changes: {}",
+ e
+ ))
+ })?;
+
+ Ok(entries
+ .iter()
+ .any(|entry| matches!(entry.payload, EntryPayload::Membership(_))))
+ }
+
+ fn validate_membership_change(membership: &ClusterMembership) -> Result<(), RaftError> {
+ if membership.members.is_empty() {
+ return Err(RaftError::Rejected(
+ "membership change must keep at least one member".to_string(),
+ ));
+ }
+ if membership.voter_ids().is_empty() {
+ return Err(RaftError::Rejected(
+ "membership change must keep at least one voting member".to_string(),
+ ));
+ }
+ Ok(())
+ }
+
+ fn validate_membership_transition(
+ current: &ClusterMembership,
+ target: &ClusterMembership,
+ ) -> Result<(), RaftError> {
+ Self::validate_membership_change(target)?;
+
+ let current_voters = current.voter_ids().into_iter().collect::<HashSet<_>>();
+ let target_voters = target.voter_ids().into_iter().collect::<HashSet<_>>();
+ let added_voters = target_voters
+ .difference(¤t_voters)
+ .copied()
+ .collect::<Vec<_>>();
+ let removed_voters = current_voters
+ .difference(&target_voters)
+ .copied()
+ .collect::<Vec<_>>();
+
+ if added_voters.is_empty() && removed_voters.is_empty() {
+ return Ok(());
+ }
+
+ if !added_voters.is_empty() && !removed_voters.is_empty() {
+ return Err(RaftError::Rejected(format!(
+ "membership change cannot add voters {:?} and remove voters {:?} in the same step; use sequential one-voter transitions until joint consensus is implemented",
+ added_voters, removed_voters
+ )));
+ }
+
+ if added_voters.len() > 1 || removed_voters.len() > 1 {
+ return Err(RaftError::Rejected(format!(
+ "membership change modifies multiple voting members in one step (added {:?}, removed {:?}); use sequential one-voter transitions until joint consensus is implemented",
+ added_voters, removed_voters
+ )));
+ }
+
+ Ok(())
+ }
+
+ async fn wait_for_transfer_target_caught_up(
+ &self,
+ target: NodeId,
+ timeout: Duration,
+ ) -> Result<(), RaftError> {
+ let start = time::Instant::now();
+ loop {
+ if *self.role.read().await != RaftRole::Leader {
+ return Err(RaftError::NotLeader {
+ leader_id: self.volatile.read().await.current_leader,
+ });
+ }
+
+ let (last_log_index, _) = self.get_last_log_info().await?;
+ let match_index = {
+ let leader_state = self.leader_state.read().await;
+ leader_state
+ .as_ref()
+ .and_then(|state| state.match_index.get(&target).copied())
+ .unwrap_or(0)
+ };
+
+ if match_index >= last_log_index {
+ return Ok(());
+ }
+
+ self.event_tx
+ .send(RaftEvent::HeartbeatTimeout)
+ .map_err(|e| {
+ RaftError::NetworkError(format!(
+ "Failed to trigger leader-transfer catch-up heartbeat: {}",
+ e
+ ))
+ })?;
+
+ if start.elapsed() > timeout {
+ return Err(RaftError::Timeout);
+ }
+
+ time::sleep(Duration::from_millis(20)).await;
+ }
+ }
+
+ async fn wait_for_observed_leader(
+ &self,
+ target: NodeId,
+ timeout: Duration,
+ ) -> Result<(), RaftError> {
+ let start = time::Instant::now();
+ loop {
+ if self.leader().await == Some(target) {
+ return Ok(());
+ }
+
+ if start.elapsed() > timeout {
+ return Err(RaftError::Timeout);
+ }
+
+ time::sleep(Duration::from_millis(20)).await;
+ }
+ }
+
// ========================================================================
// Helper Methods
// ========================================================================
@@ -1455,7 +1996,7 @@ impl RaftCore {
req,
response_tx: resp_tx,
});
- if let Err(e) = result {
+ if let Err(_e) = result {
eprintln!(
"[Node {}] ERROR: Failed to send AppendEntries event: channel closed",
self.node_id
@@ -1463,6 +2004,13 @@ impl RaftCore {
}
}
+ /// Inject TimeoutNow RPC (for testing or transport bridges).
+ pub async fn timeout_now_rpc(&self, resp_tx: oneshot::Sender<Result<(), RaftError>>) {
+ let _ = self.event_tx.send(RaftEvent::TimeoutNow {
+ response_tx: resp_tx,
+ });
+ }
+
/// Get current leader
pub async fn leader(&self) -> Option<NodeId> {
self.volatile.read().await.current_leader
@@ -1526,6 +2074,104 @@ impl RaftCore {
}
}
+ /// Submit a membership change and wait until it is committed and applied.
+ pub async fn change_membership(
+ &self,
+ membership: ClusterMembership,
+ ) -> Result<ClusterMembership, RaftError> {
+ let target = membership.normalized();
+ let (tx, rx) = oneshot::channel();
+ self.event_tx
+ .send(RaftEvent::MembershipChange {
+ membership: target.clone(),
+ response_tx: tx,
+ })
+ .map_err(|e| {
+ RaftError::NetworkError(format!("Failed to send membership change: {}", e))
+ })?;
+
+ let target_index = rx.await.map_err(|e| {
+ RaftError::NetworkError(format!("Membership change response lost: {}", e))
+ })??;
+
+ let timeout = tokio::time::Duration::from_secs(10);
+ let start = tokio::time::Instant::now();
+ loop {
+ let applied = self.last_applied().await;
+ let current = self.membership.read().await.clone();
+ if applied >= target_index && current == target {
+ return Ok(current);
+ }
+
+ if start.elapsed() > timeout {
+ return Err(RaftError::Timeout);
+ }
+
+ tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+ }
+ }
+
+ /// Add or update a member and wait for the change to be applied.
+ pub async fn add_member(&self, member: ClusterMember) -> Result<ClusterMembership, RaftError> {
+ let membership = self.membership.read().await.clone().with_member(member);
+ self.change_membership(membership).await
+ }
+
+ /// Remove a member and wait for the change to be applied.
+ pub async fn remove_member(&self, id: NodeId) -> Result<ClusterMembership, RaftError> {
+ let membership = self.membership.read().await.clone().without_member(id);
+ self.change_membership(membership).await
+ }
+
+ /// Transfer leadership to a specific voting member.
+ pub async fn transfer_leader(&self, target: NodeId) -> Result<NodeId, RaftError> {
+ if target == 0 {
+ return Err(RaftError::Rejected(
+ "leader transfer target must be non-zero".to_string(),
+ ));
+ }
+
+ if *self.role.read().await != RaftRole::Leader {
+ return Err(RaftError::NotLeader {
+ leader_id: self.volatile.read().await.current_leader,
+ });
+ }
+
+ if target == self.node_id {
+ return Ok(target);
+ }
+
+ let membership = self.membership.read().await.clone();
+ let Some(member) = membership.member(target) else {
+ return Err(RaftError::Rejected(format!(
+ "leader transfer target {target} is not a cluster member"
+ )));
+ };
+ if member.is_learner {
+ return Err(RaftError::Rejected(format!(
+ "leader transfer target {target} must be a voting member"
+ )));
+ }
+ if self.has_pending_membership_change().await? {
+ return Err(RaftError::Rejected(
+ "cannot transfer leader while a membership change is still in flight".to_string(),
+ ));
+ }
+
+ self.wait_for_transfer_target_caught_up(target, Duration::from_secs(5))
+ .await?;
+
+ self.network
+ .timeout_now(target)
+ .await
+ .map_err(|e| RaftError::NetworkError(e.to_string()))?;
+
+ self.wait_for_observed_leader(target, Duration::from_secs(5))
+ .await?;
+
+ Ok(target)
+ }
+
/// Get current commit index
pub async fn commit_index(&self) -> LogIndex {
self.volatile.read().await.commit_index
@@ -1546,13 +2192,14 @@ impl RaftCore {
Arc::clone(&self.storage)
}
- /// Get current cluster membership as list of node IDs
- /// NOTE: Custom RaftCore uses static membership configured at startup
+ /// Get the current cluster membership details.
+ pub async fn cluster_membership(&self) -> ClusterMembership {
+ self.membership.read().await.clone()
+ }
+
+ /// Get current cluster membership as a sorted list of node IDs.
pub async fn membership(&self) -> Vec<NodeId> {
- let mut members = vec![self.node_id];
- members.extend(self.peers.iter().cloned());
- members.sort();
- members
+ self.cluster_membership().await.voter_ids()
}
}
@@ -1563,6 +2210,14 @@ impl RaftCore {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::network::test_client::{InMemoryRpcClient, RpcMessage};
+ use chainfire_storage::RocksStore;
+ use std::future::Future;
+ use tempfile::{tempdir, TempDir};
+ use tokio::{
+ sync::mpsc,
+ time::{sleep, Duration, Instant},
+ };
#[test]
fn test_vote_request_creation() {
@@ -1577,8 +2232,635 @@ mod tests {
assert_eq!(req.candidate_id, 1);
}
+ fn member(id: NodeId, raft_addr: &str, client_url: &str) -> ClusterMember {
+ ClusterMember {
+ id,
+ name: format!("node-{id}"),
+ peer_urls: vec![raft_addr.to_string()],
+ client_urls: vec![client_url.to_string()],
+ is_learner: false,
+ }
+ }
+
+ fn learner_member(id: NodeId, raft_addr: &str, client_url: &str) -> ClusterMember {
+ ClusterMember {
+ is_learner: true,
+ ..member(id, raft_addr, client_url)
+ }
+ }
+
+ fn quiet_test_config() -> RaftConfig {
+ RaftConfig {
+ election_timeout_min: 10_000,
+ election_timeout_max: 20_000,
+ heartbeat_interval: 25,
+ }
+ }
+
+ struct TestClusterNode {
+ raft: Arc<RaftCore>,
+ _dir: TempDir,
+ }
+
+ async fn spawn_single_node_leader(
+ store: RocksStore,
+ membership: ClusterMembership,
+ ) -> Arc<RaftCore> {
+ let raft = Arc::new(RaftCore::new(
+ 1,
+ membership,
+ Arc::new(LogStorage::new(store.clone())),
+ Arc::new(StateMachine::new(store).expect("state machine")),
+ Arc::new(InMemoryRpcClient::new()) as Arc,
+ quiet_test_config(),
+ ));
+ raft.initialize().await.expect("initialize raft");
+
+ let raft_task = Arc::clone(&raft);
+ tokio::spawn(async move {
+ raft_task.run().await.expect("raft event loop");
+ });
+ sleep(Duration::from_millis(25)).await;
+
+ raft.become_leader().await.expect("become leader");
+ sleep(Duration::from_millis(25)).await;
+ raft
+ }
+
+ async fn spawn_cluster_node(
+ node_id: NodeId,
+ membership: ClusterMembership,
+ network: Arc<InMemoryRpcClient>,
+ ) -> TestClusterNode {
+ let dir = tempdir().expect("tempdir");
+ let store = RocksStore::new(dir.path()).expect("rocksdb store");
+ let raft = Arc::new(RaftCore::new(
+ node_id,
+ membership,
+ Arc::new(LogStorage::new(store.clone())),
+ Arc::new(StateMachine::new(store).expect("state machine")),
+ Arc::clone(&network) as Arc,
+ quiet_test_config(),
+ ));
+ raft.initialize().await.expect("initialize raft");
+
+ let (tx, mut rx) = mpsc::unbounded_channel();
+ network.register(node_id, tx).await;
+
+ let rpc_target = Arc::clone(&raft);
+ tokio::spawn(async move {
+ while let Some(message) = rx.recv().await {
+ match message {
+ RpcMessage::Vote(req, resp_tx) => {
+ rpc_target.request_vote_rpc(req, resp_tx).await;
+ }
+ RpcMessage::AppendEntries(req, resp_tx) => {
+ rpc_target.append_entries_rpc(req, resp_tx).await;
+ }
+ RpcMessage::TimeoutNow(resp_tx) => {
+ let (raft_resp_tx, raft_resp_rx) = tokio::sync::oneshot::channel();
+ rpc_target.timeout_now_rpc(raft_resp_tx).await;
+ let result = raft_resp_rx
+ .await
+ .map_err(|err| format!("TimeoutNow response lost: {err}"))
+ .and_then(|result| result.map_err(|err| err.to_string()));
+ let _ = resp_tx.send(result);
+ }
+ }
+ }
+ });
+
+ let raft_task = Arc::clone(&raft);
+ tokio::spawn(async move {
+ raft_task.run().await.expect("raft event loop");
+ });
+ sleep(Duration::from_millis(25)).await;
+
+ TestClusterNode { raft, _dir: dir }
+ }
+
+ async fn wait_until(label: &str, timeout: Duration, mut predicate: F)
+ where
+ F: FnMut() -> Fut,
+ Fut: Future