photoncloud-monorepo/chainfire/crates/chainfire-server/src/rest.rs

597 lines
18 KiB
Rust

//! REST HTTP API handlers for ChainFire
//!
//! Implements REST endpoints as specified in T050.S2:
//! - GET /api/v1/kv/{key} - Get value
//! - PUT /api/v1/kv/{key} - Put value
//! - DELETE /api/v1/kv/{key} - Delete key
//! - GET /api/v1/kv?prefix={prefix} - Range scan
//! - GET /api/v1/cluster/status - Cluster health
//! - POST /api/v1/cluster/members - Add member
use axum::{
extract::{Path, Query, State},
http::StatusCode,
routing::{get, post},
Json, Router,
};
use chainfire_api::GrpcRaftClient;
use chainfire_raft::{core::RaftError, RaftCore};
use chainfire_types::command::RaftCommand;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// REST API state
///
/// Shared, cloneable state that every handler receives through the `State` extractor.
#[derive(Clone)]
pub struct RestApiState {
/// Raft node handle; handlers use it for state-machine reads, client writes,
/// and node-id / leader / role / term lookups.
pub raft: Arc<RaftCore>,
/// Cluster identifier echoed back by the cluster status endpoint.
pub cluster_id: u64,
/// RPC client whose routing table the add-member endpoints update.
/// Those endpoints answer 503 SERVICE_UNAVAILABLE when this is `None`.
pub rpc_client: Option<Arc<GrpcRaftClient>>,
/// HTTP client used to forward reads and writes to the leader's REST endpoint.
pub http_client: reqwest::Client,
/// Node ID -> HTTP base address, consulted when a request has to be proxied to the leader.
pub peer_http_addrs: Arc<HashMap<u64, String>>,
}
/// Standard REST error response
///
/// JSON envelope returned with every non-success status. It is also deserialized
/// when relaying an error body received from the leader.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
/// Machine-readable code plus human-readable message.
pub error: ErrorDetail,
/// Request ID and timestamp attached to this response.
pub meta: ResponseMeta,
}
/// Body of an error: a stable code, a message, and optional structured details.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorDetail {
/// Short error code such as `NOT_FOUND`, `NOT_LEADER` or `INTERNAL_ERROR`.
pub code: String,
/// Human-readable description of what went wrong.
pub message: String,
/// Optional extra payload; left out of the serialized JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<serde_json::Value>,
}
/// Metadata attached to every success and error envelope.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseMeta {
/// Identifier for this response; `ResponseMeta::new` fills it with a random UUID v4.
pub request_id: String,
/// Creation time; `ResponseMeta::new` fills it with the current UTC time in RFC 3339 form.
pub timestamp: String,
}
impl ResponseMeta {
    /// Creates metadata with a freshly generated UUID v4 request ID and the
    /// current UTC time formatted as RFC 3339.
    fn new() -> Self {
        let request_id = uuid::Uuid::new_v4().to_string();
        let timestamp = chrono::Utc::now().to_rfc3339();
        Self {
            request_id,
            timestamp,
        }
    }
}
/// Standard REST success response
///
/// JSON envelope wrapping a handler-specific payload `T` together with response metadata.
#[derive(Debug, Serialize, Deserialize)]
pub struct SuccessResponse<T> {
/// Handler-specific payload.
pub data: T,
/// Request ID and timestamp attached to this response.
pub meta: ResponseMeta,
}
impl<T> SuccessResponse<T> {
    /// Wraps `data` in a success envelope, stamping it with new response metadata.
    fn new(data: T) -> Self {
        let meta = ResponseMeta::new();
        Self { data, meta }
    }
}
/// KV Put request body
#[derive(Debug, Serialize, Deserialize)]
pub struct PutRequest {
/// Value to store; the put handler writes its UTF-8 bytes.
pub value: String,
}
/// KV Get response
#[derive(Debug, Serialize, Deserialize)]
pub struct GetResponse {
/// Key that was looked up, after the handler's leading-slash normalization.
pub key: String,
/// Stored value, decoded from bytes with lossy UTF-8 conversion.
pub value: String,
}
/// KV List response
#[derive(Debug, Serialize, Deserialize)]
pub struct ListResponse {
/// Entries returned by the prefix scan.
pub items: Vec<KvItem>,
}
/// One key/value pair in a list response.
#[derive(Debug, Serialize, Deserialize)]
pub struct KvItem {
/// Key, decoded from bytes with lossy UTF-8 conversion.
pub key: String,
/// Value, decoded from bytes with lossy UTF-8 conversion.
pub value: String,
}
/// Cluster status response
#[derive(Debug, Serialize)]
pub struct ClusterStatusResponse {
/// ID of the node answering the request.
pub node_id: u64,
/// Cluster ID taken from the REST API state.
pub cluster_id: u64,
/// Current Raft term as reported by this node.
pub term: u64,
/// `Debug` rendering of this node's Raft role.
pub role: String,
/// True when the leader known to this node is the node itself.
pub is_leader: bool,
}
/// Add member request
#[derive(Debug, Deserialize)]
pub struct AddMemberRequest {
/// Numeric ID of the node being registered.
pub node_id: u64,
/// Address registered for the node in the RPC client's routing table.
pub raft_addr: String,
}
/// Add member request (legacy format from first-boot-automation)
/// Accepts string id and converts to numeric node_id
#[derive(Debug, Deserialize)]
pub struct AddMemberRequestLegacy {
/// Node ID as string (e.g., "node01", "node02")
pub id: String,
/// Address registered for the node in the RPC client's routing table.
pub raft_addr: String,
}
/// Query parameters for prefix scan
#[derive(Debug, Deserialize)]
pub struct PrefixQuery {
/// Key prefix to scan; a missing prefix is treated as the empty string.
pub prefix: Option<String>,
/// Read consistency mode. `local` (case-insensitive) serves the read from this
/// node; any other value, or none, proxies to the leader when this node is a follower.
pub consistency: Option<String>,
}
/// Query parameters for key reads
#[derive(Debug, Default, Deserialize)]
pub struct ReadQuery {
/// Read consistency mode. `local` (case-insensitive) serves the read from this
/// node; any other value, or none, proxies to the leader when this node is a follower.
pub consistency: Option<String>,
}
/// Assembles the axum router for the REST API and binds `state` to it.
///
/// Routes: KV get/put/delete on `/api/v1/kv/*key`, prefix listing on `/api/v1/kv`,
/// cluster status and member registration, the legacy `/admin/member/add`
/// endpoint, and `/health`.
pub fn build_router(state: RestApiState) -> Router {
    // The wildcard segment captures the whole remainder of the path, so keys
    // with or without slashes land on the same handlers.
    let kv_key_routes = get(get_kv_wildcard)
        .put(put_kv_wildcard)
        .delete(delete_kv_wildcard);

    let router = Router::new()
        .route("/api/v1/kv/*key", kv_key_routes)
        .route("/api/v1/kv", get(list_kv))
        .route("/api/v1/cluster/status", get(cluster_status))
        .route("/api/v1/cluster/members", post(add_member))
        // Kept for first-boot-automation, which still posts the legacy payload.
        .route("/admin/member/add", post(add_member_legacy))
        .route("/health", get(health_check));

    router.with_state(state)
}
/// GET /health - liveness probe.
///
/// Always answers 200 OK with `{"status": "healthy"}` in the success envelope.
async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) {
    let body = SuccessResponse::new(serde_json::json!({ "status": "healthy" }));
    (StatusCode::OK, Json(body))
}
/// GET /api/v1/kv/*key - fetch a single value.
///
/// The read is forwarded to the leader unless `consistency=local` was requested
/// or this node is the leader (or no leader is known). Otherwise it is served
/// from the local state machine.
///
/// Errors: 404 `NOT_FOUND` when the key is absent, 500 `INTERNAL_ERROR` when the
/// store lookup fails, plus whatever the leader proxy returns.
async fn get_kv_wildcard(
    State(state): State<RestApiState>,
    Path(key): Path<String>,
    Query(query): Query<ReadQuery>,
) -> Result<Json<SuccessResponse<GetResponse>>, (StatusCode, Json<ErrorResponse>)> {
    // Simple keys ("testkey") are used verbatim; namespaced keys
    // ("flaredb/stores/1") get a leading slash ("/flaredb/stores/1").
    let full_key = if !key.contains('/') {
        key.clone()
    } else {
        format!("/{}", key)
    };

    if should_proxy_read(query.consistency.as_deref(), &state).await {
        // The leader applies the same normalization, so send the key without its leading slash.
        let leader_path = format!("/api/v1/kv/{}", full_key.trim_start_matches('/'));
        return proxy_read_to_leader(&state, &leader_path, None).await;
    }

    let store = state.raft.state_machine();
    let raw_key = full_key.as_bytes().to_vec();
    let first_match = match store.kv().get(&raw_key) {
        Ok(found) => found.into_iter().next(),
        Err(e) => {
            return Err(error_response(
                StatusCode::INTERNAL_SERVER_ERROR,
                "INTERNAL_ERROR",
                &e.to_string(),
            ))
        }
    };

    let entry = match first_match {
        Some(entry) => entry,
        None => {
            return Err(error_response(
                StatusCode::NOT_FOUND,
                "NOT_FOUND",
                "Key not found",
            ))
        }
    };

    let response = GetResponse {
        key: full_key,
        value: String::from_utf8_lossy(&entry.value).to_string(),
    };
    Ok(Json(SuccessResponse::new(response)))
}
/// PUT /api/v1/kv/*key - store a value under the given key.
///
/// The write goes through Raft. If this node is not the leader, the request is
/// forwarded to the leader's REST endpoint by `submit_rest_write`. Responds
/// 200 OK with the normalized key on success.
async fn put_kv_wildcard(
    State(state): State<RestApiState>,
    Path(key): Path<String>,
    Json(req): Json<PutRequest>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
    // Simple keys are used verbatim; namespaced keys get a leading slash.
    let full_key = if !key.contains('/') {
        key.clone()
    } else {
        format!("/{}", key)
    };

    let key_bytes = full_key.as_bytes().to_vec();
    let value_bytes = req.value.as_bytes().to_vec();
    let command = RaftCommand::Put {
        key: key_bytes,
        value: value_bytes,
        lease_id: None,
        prev_kv: false,
    };

    submit_rest_write(&state, command, Some(&req), &full_key, reqwest::Method::PUT).await?;

    let body = SuccessResponse::new(serde_json::json!({ "key": full_key, "success": true }));
    Ok((StatusCode::OK, Json(body)))
}
/// DELETE /api/v1/kv/*key - remove a key.
///
/// The delete goes through Raft. If this node is not the leader, the request is
/// forwarded to the leader's REST endpoint by `submit_rest_write`. Responds
/// 200 OK with the normalized key on success.
async fn delete_kv_wildcard(
    State(state): State<RestApiState>,
    Path(key): Path<String>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
    // Simple keys are used verbatim; namespaced keys get a leading slash.
    let full_key = if !key.contains('/') {
        key.clone()
    } else {
        format!("/{}", key)
    };

    let key_bytes = full_key.as_bytes().to_vec();
    let command = RaftCommand::Delete {
        key: key_bytes,
        prev_kv: false,
    };

    submit_rest_write(&state, command, None, &full_key, reqwest::Method::DELETE).await?;

    let body = SuccessResponse::new(serde_json::json!({ "key": full_key, "success": true }));
    Ok((StatusCode::OK, Json(body)))
}
/// GET /api/v1/kv?prefix={prefix} - Range scan
///
/// Lists every key that starts with `prefix` (all keys when the prefix is
/// missing or empty). The read is forwarded to the leader unless
/// `consistency=local` was requested or this node is the leader (or no leader
/// is known).
///
/// Errors: 500 `INTERNAL_ERROR` when the store scan fails, plus whatever the
/// leader proxy returns.
async fn list_kv(
    State(state): State<RestApiState>,
    Query(params): Query<PrefixQuery>,
) -> Result<Json<SuccessResponse<ListResponse>>, (StatusCode, Json<ErrorResponse>)> {
    if should_proxy_read(params.consistency.as_deref(), &state).await {
        let query = params
            .prefix
            .as_ref()
            .map(|prefix| vec![("prefix", prefix.as_str())]);
        return proxy_read_to_leader(&state, "/api/v1/kv", query.as_deref()).await;
    }

    let prefix = params.prefix.unwrap_or_default();
    let sm = state.raft.state_machine();
    let start_key = prefix.as_bytes().to_vec();

    // Upper bound = lexicographic successor of the prefix: drop trailing 0xFF
    // bytes, then increment the last remaining byte. Appending '~' (0x7E), as
    // this handler used to do, silently skipped every key whose next byte is
    // >= 0x7E, which includes all non-ASCII UTF-8 keys.
    let mut end_key = start_key.clone();
    while let Some(last) = end_key.pop() {
        if last < u8::MAX {
            end_key.push(last + 1);
            break;
        }
    }
    if end_key.is_empty() {
        // Empty prefix: there is no finite successor. 0xFF never occurs in
        // UTF-8, so this bound still sorts after every key written as text.
        end_key.push(u8::MAX);
    }

    let results = sm.kv().range(&start_key, Some(&end_key)).map_err(|e| {
        error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            "INTERNAL_ERROR",
            &e.to_string(),
        )
    })?;

    let items: Vec<KvItem> = results
        .into_iter()
        // Enforce prefix semantics explicitly, in case the store treats the
        // upper bound as inclusive (TODO confirm against the KV store).
        .filter(|kv| kv.key.starts_with(start_key.as_slice()))
        .map(|kv| KvItem {
            key: String::from_utf8_lossy(&kv.key).to_string(),
            value: String::from_utf8_lossy(&kv.value).to_string(),
        })
        .collect();

    Ok(Json(SuccessResponse::new(ListResponse { items })))
}
/// GET /api/v1/cluster/status - report this node's view of the cluster.
///
/// Returns the node ID, cluster ID, current term, the `Debug` rendering of the
/// Raft role, and whether the known leader is this node.
async fn cluster_status(
    State(state): State<RestApiState>,
) -> Result<Json<SuccessResponse<ClusterStatusResponse>>, (StatusCode, Json<ErrorResponse>)> {
    let raft = &state.raft;

    let node_id = raft.node_id();
    let role = format!("{:?}", raft.role().await);
    let is_leader = matches!(raft.leader().await, Some(leader) if leader == node_id);
    let term = raft.current_term().await;

    let status = ClusterStatusResponse {
        node_id,
        cluster_id: state.cluster_id,
        term,
        role,
        is_leader,
    };
    Ok(Json(SuccessResponse::new(status)))
}
/// Maps a textual node ID to a numeric one.
///
/// IDs of the form `node<N>` (e.g. "node01", "node02") yield `N`. Anything else
/// is hashed with the standard library's `DefaultHasher`.
fn string_to_node_id(s: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let numeric_suffix = s
        .strip_prefix("node")
        .and_then(|digits| digits.parse::<u64>().ok());

    match numeric_suffix {
        Some(id) => id,
        None => {
            // No parsable numeric suffix: derive the ID from a hash of the whole string.
            let mut state = DefaultHasher::new();
            s.hash(&mut state);
            state.finish()
        }
    }
}
/// POST /api/v1/cluster/members - register a node with the RPC client.
///
/// Responds 201 CREATED on success, or 503 `SERVICE_UNAVAILABLE` when no RPC
/// client is configured.
async fn add_member(
    State(state): State<RestApiState>,
    Json(req): Json<AddMemberRequest>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
    let rpc_client = match state.rpc_client.as_ref() {
        Some(client) => client,
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "SERVICE_UNAVAILABLE",
                "RPC client not available",
            ))
        }
    };

    // Only the RPC client's routing table is updated here. RaftCore has no
    // add_peer(), so no Raft configuration change is triggered; a full
    // implementation would have to do that.
    rpc_client
        .add_node(req.node_id, req.raft_addr.clone())
        .await;

    let body = serde_json::json!({
        "node_id": req.node_id,
        "raft_addr": req.raft_addr,
        "success": true,
        "note": "Node registered in RPC client routing table"
    });
    Ok((StatusCode::CREATED, Json(SuccessResponse::new(body))))
}
/// POST /admin/member/add - legacy member registration used by first-boot-automation.
///
/// Converts the string `id` to a numeric node ID and registers the node with
/// the RPC client. Responds 201 CREATED on success, or 503
/// `SERVICE_UNAVAILABLE` when no RPC client is configured.
async fn add_member_legacy(
    State(state): State<RestApiState>,
    Json(req): Json<AddMemberRequestLegacy>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
    let node_id = string_to_node_id(&req.id);

    let rpc_client = match state.rpc_client.as_ref() {
        Some(client) => client,
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "SERVICE_UNAVAILABLE",
                "RPC client not available",
            ))
        }
    };

    // Only the RPC client's routing table is updated here.
    rpc_client.add_node(node_id, req.raft_addr.clone()).await;

    let body = serde_json::json!({
        "id": req.id,
        "node_id": node_id,
        "raft_addr": req.raft_addr,
        "success": true,
        "note": "Node registered in RPC client routing table (legacy API)"
    });
    Ok((StatusCode::CREATED, Json(SuccessResponse::new(body))))
}
/// Helper to create error response
fn error_response(
status: StatusCode,
code: &str,
message: &str,
) -> (StatusCode, Json<ErrorResponse>) {
(
status,
Json(ErrorResponse {
error: ErrorDetail {
code: code.to_string(),
message: message.to_string(),
details: None,
},
meta: ResponseMeta::new(),
}),
)
}
/// Submits a write through the local Raft node, forwarding it to the leader
/// over HTTP when this node is not the leader.
///
/// `key`, `method` and `body` describe the original REST request so it can be
/// replayed against the leader. Any Raft error other than "not leader" becomes
/// 500 `INTERNAL_ERROR`.
async fn submit_rest_write(
    state: &RestApiState,
    command: RaftCommand,
    body: Option<&PutRequest>,
    key: &str,
    method: reqwest::Method,
) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
    let failure = match state.raft.client_write(command).await {
        Ok(()) => return Ok(()),
        Err(failure) => failure,
    };

    match failure {
        RaftError::NotLeader { leader_id } => {
            // Prefer the leader hint carried by the error; otherwise ask the
            // local node who it currently believes the leader is.
            let target = if leader_id.is_some() {
                leader_id
            } else {
                state.raft.leader().await
            };
            proxy_write_to_leader(state, target, key, method, body).await
        }
        other => Err(error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            "INTERNAL_ERROR",
            &other.to_string(),
        )),
    }
}
/// Replays a KV write against the leader's REST endpoint.
///
/// Errors:
/// - 503 `NOT_LEADER` when no leader is known or it has no HTTP address mapping.
/// - 502 `LEADER_PROXY_FAILED` when the HTTP request itself fails.
/// - Otherwise the leader's own status and error body are relayed. If that body
///   cannot be decoded, a `LEADER_PROXY_FAILED` body is synthesized.
async fn proxy_write_to_leader(
    state: &RestApiState,
    leader_id: Option<u64>,
    key: &str,
    method: reqwest::Method,
    body: Option<&PutRequest>,
) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
    let leader_id = match leader_id {
        Some(id) => id,
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "NOT_LEADER",
                "current node is not the leader and no leader is known yet",
            ))
        }
    };

    let leader_base = match state.peer_http_addrs.get(&leader_id) {
        Some(addr) => addr.trim_end_matches('/'),
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "NOT_LEADER",
                &format!("leader {leader_id} is known but has no HTTP endpoint mapping"),
            ))
        }
    };

    // The leader re-adds the leading slash for namespaced keys, so strip it here.
    let url = format!(
        "{}/api/v1/kv/{}",
        leader_base,
        key.trim_start_matches('/')
    );

    let request = state.http_client.request(method, &url);
    let request = match body {
        Some(body) => request.json(body),
        None => request,
    };

    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => {
            return Err(error_response(
                StatusCode::BAD_GATEWAY,
                "LEADER_PROXY_FAILED",
                &format!("failed to forward write to leader {leader_id}: {err}"),
            ))
        }
    };

    let upstream_status = response.status();
    if upstream_status.is_success() {
        return Ok(());
    }

    // Convert via the numeric code; fall back to 502 if it cannot be represented.
    let status =
        StatusCode::from_u16(upstream_status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
    let payload = match response.json::<ErrorResponse>().await {
        Ok(decoded) => decoded,
        Err(err) => ErrorResponse {
            error: ErrorDetail {
                code: "LEADER_PROXY_FAILED".to_string(),
                message: format!("leader {leader_id} returned {status}: {err}"),
                details: None,
            },
            meta: ResponseMeta::new(),
        },
    };
    Err((status, Json(payload)))
}
/// Decides whether a read with the given `consistency` mode must be forwarded
/// to the leader, based on this node's ID and the leader it currently knows.
async fn should_proxy_read(consistency: Option<&str>, state: &RestApiState) -> bool {
    let raft = &state.raft;
    let local_node = raft.node_id();
    let known_leader = raft.leader().await;
    read_requires_leader_proxy(consistency, local_node, known_leader)
}
/// Pure decision rule behind `should_proxy_read`.
///
/// Returns `true` only when a leader is known, that leader is a different node,
/// and the caller did not ask for `local` consistency (matched case-insensitively).
fn read_requires_leader_proxy(
    consistency: Option<&str>,
    node_id: u64,
    leader_id: Option<u64>,
) -> bool {
    let wants_local = consistency.map_or(false, |mode| mode.eq_ignore_ascii_case("local"));
    match leader_id {
        Some(leader) if !wants_local => leader != node_id,
        _ => false,
    }
}
/// Forwards a GET to the leader's REST endpoint and decodes its success envelope.
///
/// `path` is appended to the leader's HTTP base address; `query`, when present,
/// is sent as URL query parameters.
///
/// Errors:
/// - 503 `NOT_LEADER` when no leader is known or it has no HTTP address mapping.
/// - 502 `LEADER_PROXY_FAILED` when the request fails or a success body cannot be decoded.
/// - Otherwise the leader's own status and error body are relayed. If that body
///   cannot be decoded, a `LEADER_PROXY_FAILED` body is synthesized.
async fn proxy_read_to_leader<T>(
    state: &RestApiState,
    path: &str,
    query: Option<&[(&str, &str)]>,
) -> Result<Json<SuccessResponse<T>>, (StatusCode, Json<ErrorResponse>)>
where
    T: for<'de> Deserialize<'de>,
{
    let leader_id = match state.raft.leader().await {
        Some(id) => id,
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "NOT_LEADER",
                "current node is not the leader and no leader is known yet",
            ))
        }
    };

    let leader_base = match state.peer_http_addrs.get(&leader_id) {
        Some(addr) => addr.trim_end_matches('/'),
        None => {
            return Err(error_response(
                StatusCode::SERVICE_UNAVAILABLE,
                "NOT_LEADER",
                &format!("leader {leader_id} is known but has no HTTP endpoint mapping"),
            ))
        }
    };

    let url = format!("{}{}", leader_base, path);
    let request = state.http_client.get(&url);
    let request = match query {
        Some(query) => request.query(query),
        None => request,
    };

    let response = match request.send().await {
        Ok(response) => response,
        Err(err) => {
            return Err(error_response(
                StatusCode::BAD_GATEWAY,
                "LEADER_PROXY_FAILED",
                &format!("failed to forward read to leader {leader_id}: {err}"),
            ))
        }
    };

    let upstream_status = response.status();
    if upstream_status.is_success() {
        return match response.json::<SuccessResponse<T>>().await {
            Ok(payload) => Ok(Json(payload)),
            Err(err) => Err(error_response(
                StatusCode::BAD_GATEWAY,
                "LEADER_PROXY_FAILED",
                &format!("failed to decode leader {leader_id} response: {err}"),
            )),
        };
    }

    // Convert via the numeric code; fall back to 502 if it cannot be represented.
    let status =
        StatusCode::from_u16(upstream_status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
    let payload = match response.json::<ErrorResponse>().await {
        Ok(decoded) => decoded,
        Err(err) => ErrorResponse {
            error: ErrorDetail {
                code: "LEADER_PROXY_FAILED".to_string(),
                message: format!("leader {leader_id} returned {status}: {err}"),
                details: None,
            },
            meta: ResponseMeta::new(),
        },
    };
    Err((status, Json(payload)))
}
// Unit tests for the pure leader-proxy decision rule.
#[cfg(test)]
mod tests {
use super::*;
/// With no consistency mode given, reads go to the leader whenever a different
/// node is the known leader; `local` opts out.
#[test]
fn read_requires_leader_proxy_defaults_to_leader_consistency() {
// Follower (node 2) with known leader 1 and no mode: proxy.
assert!(read_requires_leader_proxy(None, 2, Some(1)));
// Explicit local read on a follower: serve locally.
assert!(!read_requires_leader_proxy(Some("local"), 2, Some(1)));
// This node is the leader: nothing to proxy to.
assert!(!read_requires_leader_proxy(None, 2, Some(2)));
// No leader known: serve locally rather than proxy.
assert!(!read_requires_leader_proxy(None, 2, None));
}
}