841 lines
26 KiB
Rust
841 lines
26 KiB
Rust
//! REST HTTP API handlers for ChainFire
|
|
//!
|
|
//! Implements REST endpoints as specified in T050.S2:
|
|
//! - GET /api/v1/kv/{key} - Get value
|
|
//! - POST /api/v1/kv/{key}/put - Put value
|
|
//! - POST /api/v1/kv/{key}/delete - Delete key
|
|
//! - GET /api/v1/kv?prefix={prefix} - Range scan
|
|
//! - GET /api/v1/cluster/status - Cluster health
|
|
//! - POST /api/v1/cluster/members - Add member
|
|
//! - POST /api/v1/cluster/leader/transfer - Transfer cluster leadership
|
|
|
|
use axum::{
|
|
extract::{Path, Query, State},
|
|
http::StatusCode,
|
|
routing::{delete, get, post},
|
|
Json, Router,
|
|
};
|
|
use chainfire_raft::{
|
|
core::{ClusterMember, RaftError},
|
|
RaftCore,
|
|
};
|
|
use chainfire_types::command::RaftCommand;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::sync::Arc;
|
|
|
|
/// REST API state
|
|
#[derive(Clone)]
|
|
pub struct RestApiState {
|
|
pub raft: Arc<RaftCore>,
|
|
pub cluster_id: u64,
|
|
pub http_client: reqwest::Client,
|
|
pub http_port: u16,
|
|
}
|
|
|
|
/// Standard REST error response
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ErrorResponse {
|
|
pub error: ErrorDetail,
|
|
pub meta: ResponseMeta,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ErrorDetail {
|
|
pub code: String,
|
|
pub message: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub details: Option<serde_json::Value>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ResponseMeta {
|
|
pub request_id: String,
|
|
pub timestamp: String,
|
|
}
|
|
|
|
impl ResponseMeta {
|
|
fn new() -> Self {
|
|
Self {
|
|
request_id: uuid::Uuid::new_v4().to_string(),
|
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Standard REST success response
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct SuccessResponse<T> {
|
|
pub data: T,
|
|
pub meta: ResponseMeta,
|
|
}
|
|
|
|
impl<T> SuccessResponse<T> {
|
|
fn new(data: T) -> Self {
|
|
Self {
|
|
data,
|
|
meta: ResponseMeta::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// KV Put request body
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct PutRequest {
|
|
pub value: String,
|
|
}
|
|
|
|
/// KV Get response
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct GetResponse {
|
|
pub key: String,
|
|
pub value: String,
|
|
}
|
|
|
|
/// KV List response
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ListResponse {
|
|
pub items: Vec<KvItem>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct KvItem {
|
|
pub key: String,
|
|
pub value: String,
|
|
}
|
|
|
|
/// Cluster status response
|
|
#[derive(Debug, Serialize)]
|
|
pub struct ClusterStatusResponse {
|
|
pub node_id: u64,
|
|
pub cluster_id: u64,
|
|
pub term: u64,
|
|
pub role: String,
|
|
pub is_leader: bool,
|
|
}
|
|
|
|
/// Add member request
|
|
#[derive(Debug, Deserialize, Serialize)]
|
|
pub struct AddMemberRequest {
|
|
pub node_id: u64,
|
|
pub raft_addr: String,
|
|
#[serde(default)]
|
|
pub client_url: Option<String>,
|
|
#[serde(default)]
|
|
pub name: Option<String>,
|
|
#[serde(default)]
|
|
pub is_learner: bool,
|
|
}
|
|
|
|
/// Add member request (legacy format from first-boot-automation)
|
|
/// Accepts string id and converts to numeric node_id
|
|
#[derive(Debug, Deserialize, Serialize)]
|
|
pub struct AddMemberRequestLegacy {
|
|
/// Node ID as string (e.g., "node01", "node02")
|
|
pub id: String,
|
|
pub raft_addr: String,
|
|
}
|
|
|
|
/// Remove member request body.
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct RemoveMemberRequest {
|
|
pub node_id: u64,
|
|
}
|
|
|
|
/// Leader-transfer request body.
|
|
#[derive(Debug, Deserialize, Serialize)]
|
|
pub struct LeaderTransferRequest {
|
|
pub target_id: u64,
|
|
}
|
|
|
|
/// Query parameters for prefix scan
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct PrefixQuery {
|
|
pub prefix: Option<String>,
|
|
pub consistency: Option<String>,
|
|
}
|
|
|
|
/// Query parameters for key reads
|
|
#[derive(Debug, Default, Deserialize)]
|
|
pub struct ReadQuery {
|
|
pub consistency: Option<String>,
|
|
}
|
|
|
|
/// Build the REST API router
|
|
pub fn build_router(state: RestApiState) -> Router {
|
|
Router::new()
|
|
// Wildcard route handles all keys (with or without slashes)
|
|
.route(
|
|
"/api/v1/kv/*key",
|
|
get(get_kv_wildcard)
|
|
.put(put_kv_wildcard)
|
|
.delete(delete_kv_wildcard),
|
|
)
|
|
.route("/api/v1/kv", get(list_kv))
|
|
.route("/api/v1/cluster/status", get(cluster_status))
|
|
.route("/api/v1/cluster/members", post(add_member))
|
|
.route("/api/v1/cluster/leader/transfer", post(transfer_leader))
|
|
.route("/api/v1/cluster/members/:node_id", delete(remove_member))
|
|
// Legacy endpoint for first-boot-automation compatibility
|
|
.route("/admin/member/add", post(add_member_legacy))
|
|
.route("/health", get(health_check))
|
|
.with_state(state)
|
|
}
|
|
|
|
/// Health check endpoint
|
|
async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) {
|
|
(
|
|
StatusCode::OK,
|
|
Json(SuccessResponse::new(
|
|
serde_json::json!({ "status": "healthy" }),
|
|
)),
|
|
)
|
|
}
|
|
|
|
/// GET /api/v1/kv/*key - Get value (wildcard for all keys)
|
|
async fn get_kv_wildcard(
|
|
State(state): State<RestApiState>,
|
|
Path(key): Path<String>,
|
|
Query(query): Query<ReadQuery>,
|
|
) -> Result<Json<SuccessResponse<GetResponse>>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Use key as-is for simple keys, prepend / for namespaced keys
|
|
// Keys like "testkey" stay as "testkey", keys like "flaredb/stores/1" become "/flaredb/stores/1"
|
|
let full_key = if key.contains('/') {
|
|
format!("/{}", key)
|
|
} else {
|
|
key.clone()
|
|
};
|
|
if should_proxy_read(query.consistency.as_deref(), &state).await {
|
|
return proxy_read_to_leader(
|
|
&state,
|
|
&format!("/api/v1/kv/{}", full_key.trim_start_matches('/')),
|
|
None,
|
|
)
|
|
.await;
|
|
}
|
|
let sm = state.raft.state_machine();
|
|
let key_bytes = full_key.as_bytes().to_vec();
|
|
|
|
let results = sm.kv().get(&key_bytes).map_err(|e| {
|
|
error_response(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
&e.to_string(),
|
|
)
|
|
})?;
|
|
|
|
let value = results
|
|
.into_iter()
|
|
.next()
|
|
.ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", "Key not found"))?;
|
|
|
|
Ok(Json(SuccessResponse::new(GetResponse {
|
|
key: full_key,
|
|
value: String::from_utf8_lossy(&value.value).to_string(),
|
|
})))
|
|
}
|
|
|
|
/// PUT /api/v1/kv/*key - Put value (wildcard for all keys)
|
|
async fn put_kv_wildcard(
|
|
State(state): State<RestApiState>,
|
|
Path(key): Path<String>,
|
|
Json(req): Json<PutRequest>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
// Use key as-is for simple keys, prepend / for namespaced keys
|
|
let full_key = if key.contains('/') {
|
|
format!("/{}", key)
|
|
} else {
|
|
key.clone()
|
|
};
|
|
let command = RaftCommand::Put {
|
|
key: full_key.as_bytes().to_vec(),
|
|
value: req.value.as_bytes().to_vec(),
|
|
lease_id: None,
|
|
prev_kv: false,
|
|
};
|
|
|
|
submit_rest_write(&state, command, Some(&req), &full_key, reqwest::Method::PUT).await?;
|
|
|
|
Ok((
|
|
StatusCode::OK,
|
|
Json(SuccessResponse::new(
|
|
serde_json::json!({ "key": full_key, "success": true }),
|
|
)),
|
|
))
|
|
}
|
|
|
|
/// DELETE /api/v1/kv/*key - Delete key (wildcard for all keys)
|
|
async fn delete_kv_wildcard(
|
|
State(state): State<RestApiState>,
|
|
Path(key): Path<String>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
// Use key as-is for simple keys, prepend / for namespaced keys
|
|
let full_key = if key.contains('/') {
|
|
format!("/{}", key)
|
|
} else {
|
|
key.clone()
|
|
};
|
|
let command = RaftCommand::Delete {
|
|
key: full_key.as_bytes().to_vec(),
|
|
prev_kv: false,
|
|
};
|
|
|
|
submit_rest_write(&state, command, None, &full_key, reqwest::Method::DELETE).await?;
|
|
|
|
Ok((
|
|
StatusCode::OK,
|
|
Json(SuccessResponse::new(
|
|
serde_json::json!({ "key": full_key, "success": true }),
|
|
)),
|
|
))
|
|
}
|
|
|
|
/// GET /api/v1/kv?prefix={prefix} - Range scan
|
|
async fn list_kv(
|
|
State(state): State<RestApiState>,
|
|
Query(params): Query<PrefixQuery>,
|
|
) -> Result<Json<SuccessResponse<ListResponse>>, (StatusCode, Json<ErrorResponse>)> {
|
|
if should_proxy_read(params.consistency.as_deref(), &state).await {
|
|
let query = params
|
|
.prefix
|
|
.as_ref()
|
|
.map(|prefix| vec![("prefix", prefix.as_str())]);
|
|
return proxy_read_to_leader(&state, "/api/v1/kv", query.as_deref()).await;
|
|
}
|
|
let prefix = params.prefix.unwrap_or_default();
|
|
let sm = state.raft.state_machine();
|
|
|
|
let start_key = prefix.as_bytes().to_vec();
|
|
let end_key = format!("{}~", prefix).as_bytes().to_vec();
|
|
|
|
let results = sm.kv().range(&start_key, Some(&end_key)).map_err(|e| {
|
|
error_response(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
&e.to_string(),
|
|
)
|
|
})?;
|
|
|
|
let items: Vec<KvItem> = results
|
|
.into_iter()
|
|
.map(|kv| KvItem {
|
|
key: String::from_utf8_lossy(&kv.key).to_string(),
|
|
value: String::from_utf8_lossy(&kv.value).to_string(),
|
|
})
|
|
.collect();
|
|
|
|
Ok(Json(SuccessResponse::new(ListResponse { items })))
|
|
}
|
|
|
|
/// GET /api/v1/cluster/status - Cluster health
|
|
async fn cluster_status(
|
|
State(state): State<RestApiState>,
|
|
) -> Result<Json<SuccessResponse<ClusterStatusResponse>>, (StatusCode, Json<ErrorResponse>)> {
|
|
let node_id = state.raft.node_id();
|
|
let role = state.raft.role().await;
|
|
let leader_id = state.raft.leader().await;
|
|
let is_leader = leader_id == Some(node_id);
|
|
let term = state.raft.current_term().await;
|
|
|
|
Ok(Json(SuccessResponse::new(ClusterStatusResponse {
|
|
node_id,
|
|
cluster_id: state.cluster_id,
|
|
term,
|
|
role: format!("{:?}", role),
|
|
is_leader,
|
|
})))
|
|
}
|
|
|
|
/// Convert string node ID to numeric (e.g., "node01" -> 1, "node02" -> 2)
|
|
fn string_to_node_id(s: &str) -> u64 {
|
|
// Try to extract number from string like "node01", "node02"
|
|
if let Some(num_str) = s.strip_prefix("node") {
|
|
if let Ok(num) = num_str.parse::<u64>() {
|
|
return num;
|
|
}
|
|
}
|
|
// Fallback: use hash of the string
|
|
use std::collections::hash_map::DefaultHasher;
|
|
use std::hash::{Hash, Hasher};
|
|
let mut hasher = DefaultHasher::new();
|
|
s.hash(&mut hasher);
|
|
hasher.finish()
|
|
}
|
|
|
|
fn cluster_operation_error(err: &RaftError) -> (StatusCode, &'static str, String) {
|
|
match err {
|
|
RaftError::Rejected(message) => (
|
|
StatusCode::PRECONDITION_FAILED,
|
|
"PRECONDITION_FAILED",
|
|
message.clone(),
|
|
),
|
|
RaftError::Timeout => (
|
|
StatusCode::REQUEST_TIMEOUT,
|
|
"TIMEOUT",
|
|
"cluster operation timed out".to_string(),
|
|
),
|
|
_ => (
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
err.to_string(),
|
|
),
|
|
}
|
|
}
|
|
|
|
/// POST /api/v1/cluster/members - Add member
|
|
async fn add_member(
|
|
State(state): State<RestApiState>,
|
|
Json(req): Json<AddMemberRequest>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
let member = ClusterMember {
|
|
id: req.node_id,
|
|
name: req
|
|
.name
|
|
.clone()
|
|
.filter(|value| !value.trim().is_empty())
|
|
.unwrap_or_else(|| format!("node-{}", req.node_id)),
|
|
peer_urls: vec![normalize_peer_url(&req.raft_addr)],
|
|
client_urls: req.client_url.clone().into_iter().collect(),
|
|
is_learner: req.is_learner,
|
|
};
|
|
|
|
match state.raft.add_member(member).await {
|
|
Ok(membership) => {
|
|
return Ok((
|
|
StatusCode::CREATED,
|
|
Json(SuccessResponse::new(serde_json::json!({
|
|
"node_id": req.node_id,
|
|
"raft_addr": req.raft_addr,
|
|
"members": membership.members.len(),
|
|
"success": true
|
|
}))),
|
|
));
|
|
}
|
|
Err(RaftError::NotLeader { leader_id }) => {
|
|
return proxy_cluster_write_to_leader(
|
|
&state,
|
|
leader_id,
|
|
"/api/v1/cluster/members",
|
|
reqwest::Method::POST,
|
|
Some(serde_json::to_value(&req).map_err(|err| {
|
|
error_response(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
&format!("failed to encode add-member request: {err}"),
|
|
)
|
|
})?),
|
|
)
|
|
.await;
|
|
}
|
|
Err(err) => {
|
|
let (status, code, message) = cluster_operation_error(&err);
|
|
return Err(error_response(status, code, &message));
|
|
}
|
|
}
|
|
}
|
|
|
|
/// POST /admin/member/add - Add member (legacy format for first-boot-automation)
|
|
async fn add_member_legacy(
|
|
State(state): State<RestApiState>,
|
|
Json(req): Json<AddMemberRequestLegacy>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
let node_id = string_to_node_id(&req.id);
|
|
add_member(
|
|
State(state),
|
|
Json(AddMemberRequest {
|
|
node_id,
|
|
raft_addr: req.raft_addr,
|
|
client_url: None,
|
|
name: Some(req.id),
|
|
is_learner: false,
|
|
}),
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// DELETE /api/v1/cluster/members/:node_id - Remove member.
|
|
async fn remove_member(
|
|
State(state): State<RestApiState>,
|
|
Path(node_id): Path<u64>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
match state.raft.remove_member(node_id).await {
|
|
Ok(membership) => Ok((
|
|
StatusCode::OK,
|
|
Json(SuccessResponse::new(serde_json::json!({
|
|
"node_id": node_id,
|
|
"members": membership.members.len(),
|
|
"success": true
|
|
}))),
|
|
)),
|
|
Err(RaftError::NotLeader { leader_id }) => {
|
|
proxy_cluster_write_to_leader(
|
|
&state,
|
|
leader_id,
|
|
&format!("/api/v1/cluster/members/{node_id}"),
|
|
reqwest::Method::DELETE,
|
|
None,
|
|
)
|
|
.await
|
|
}
|
|
Err(err) => {
|
|
let (status, code, message) = cluster_operation_error(&err);
|
|
Err(error_response(status, code, &message))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// POST /api/v1/cluster/leader/transfer - Transfer cluster leadership.
|
|
async fn transfer_leader(
|
|
State(state): State<RestApiState>,
|
|
Json(req): Json<LeaderTransferRequest>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
if req.target_id == 0 {
|
|
return Err(error_response(
|
|
StatusCode::BAD_REQUEST,
|
|
"INVALID_ARGUMENT",
|
|
"leader transfer target must be non-zero",
|
|
));
|
|
}
|
|
|
|
match state.raft.transfer_leader(req.target_id).await {
|
|
Ok(leader) => Ok((
|
|
StatusCode::OK,
|
|
Json(SuccessResponse::new(serde_json::json!({
|
|
"leader": leader,
|
|
"success": true
|
|
}))),
|
|
)),
|
|
Err(RaftError::NotLeader { leader_id }) => {
|
|
proxy_cluster_write_to_leader(
|
|
&state,
|
|
leader_id,
|
|
"/api/v1/cluster/leader/transfer",
|
|
reqwest::Method::POST,
|
|
Some(serde_json::to_value(&req).map_err(|err| {
|
|
error_response(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
&format!("failed to encode leader-transfer request: {err}"),
|
|
)
|
|
})?),
|
|
)
|
|
.await
|
|
}
|
|
Err(err) => {
|
|
let (status, code, message) = cluster_operation_error(&err);
|
|
Err(error_response(status, code, &message))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Helper to create error response
|
|
fn error_response(
|
|
status: StatusCode,
|
|
code: &str,
|
|
message: &str,
|
|
) -> (StatusCode, Json<ErrorResponse>) {
|
|
(
|
|
status,
|
|
Json(ErrorResponse {
|
|
error: ErrorDetail {
|
|
code: code.to_string(),
|
|
message: message.to_string(),
|
|
details: None,
|
|
},
|
|
meta: ResponseMeta::new(),
|
|
}),
|
|
)
|
|
}
|
|
|
|
fn normalize_peer_url(raft_addr: &str) -> String {
|
|
if raft_addr.contains("://") {
|
|
raft_addr.to_string()
|
|
} else {
|
|
format!("http://{raft_addr}")
|
|
}
|
|
}
|
|
|
|
fn http_endpoint_from_peer_url(peer_url: &str, http_port: u16) -> Option<String> {
|
|
let trimmed = peer_url
|
|
.strip_prefix("http://")
|
|
.or_else(|| peer_url.strip_prefix("https://"))
|
|
.unwrap_or(peer_url);
|
|
if let Ok(addr) = trimmed.parse::<std::net::SocketAddr>() {
|
|
return Some(format!("http://{}:{}", addr.ip(), http_port));
|
|
}
|
|
let (host, _) = trimmed.rsplit_once(':')?;
|
|
Some(format!("http://{}:{}", host, http_port))
|
|
}
|
|
|
|
async fn leader_http_addr(
|
|
state: &RestApiState,
|
|
leader_id: u64,
|
|
) -> Result<String, (StatusCode, Json<ErrorResponse>)> {
|
|
let membership = state.raft.cluster_membership().await;
|
|
let leader = membership.member(leader_id).ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
&format!("leader {leader_id} is known but has no membership record"),
|
|
)
|
|
})?;
|
|
let peer_url = leader.peer_urls.first().ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
&format!("leader {leader_id} is known but has no peer URL"),
|
|
)
|
|
})?;
|
|
http_endpoint_from_peer_url(peer_url, state.http_port).ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
&format!("leader {leader_id} peer URL {peer_url} cannot be mapped to HTTP"),
|
|
)
|
|
})
|
|
}
|
|
|
|
async fn submit_rest_write(
|
|
state: &RestApiState,
|
|
command: RaftCommand,
|
|
body: Option<&PutRequest>,
|
|
key: &str,
|
|
method: reqwest::Method,
|
|
) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
|
|
match state.raft.client_write(command).await {
|
|
Ok(()) => Ok(()),
|
|
Err(RaftError::NotLeader { leader_id }) => {
|
|
let resolved_leader = match leader_id {
|
|
Some(leader_id) => Some(leader_id),
|
|
None => state.raft.leader().await,
|
|
};
|
|
proxy_write_to_leader(state, resolved_leader, key, method, body).await
|
|
}
|
|
Err(err) => Err(error_response(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"INTERNAL_ERROR",
|
|
&err.to_string(),
|
|
)),
|
|
}
|
|
}
|
|
|
|
async fn proxy_write_to_leader(
|
|
state: &RestApiState,
|
|
leader_id: Option<u64>,
|
|
key: &str,
|
|
method: reqwest::Method,
|
|
body: Option<&PutRequest>,
|
|
) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
|
|
let leader_id = leader_id.ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
"current node is not the leader and no leader is known yet",
|
|
)
|
|
})?;
|
|
let leader_http_addr = leader_http_addr(state, leader_id).await?;
|
|
let url = format!(
|
|
"{}/api/v1/kv/{}",
|
|
leader_http_addr.trim_end_matches('/'),
|
|
key.trim_start_matches('/')
|
|
);
|
|
let mut request = state.http_client.request(method, &url);
|
|
if let Some(body) = body {
|
|
request = request.json(body);
|
|
}
|
|
let response = request.send().await.map_err(|err| {
|
|
error_response(
|
|
StatusCode::BAD_GATEWAY,
|
|
"LEADER_PROXY_FAILED",
|
|
&format!("failed to forward write to leader {leader_id}: {err}"),
|
|
)
|
|
})?;
|
|
if response.status().is_success() {
|
|
return Ok(());
|
|
}
|
|
let status =
|
|
StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
|
|
let payload = response
|
|
.json::<ErrorResponse>()
|
|
.await
|
|
.unwrap_or_else(|err| ErrorResponse {
|
|
error: ErrorDetail {
|
|
code: "LEADER_PROXY_FAILED".to_string(),
|
|
message: format!("leader {leader_id} returned {status}: {err}"),
|
|
details: None,
|
|
},
|
|
meta: ResponseMeta::new(),
|
|
});
|
|
Err((status, Json(payload)))
|
|
}
|
|
|
|
async fn proxy_cluster_write_to_leader(
|
|
state: &RestApiState,
|
|
leader_id: Option<u64>,
|
|
path: &str,
|
|
method: reqwest::Method,
|
|
body: Option<serde_json::Value>,
|
|
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
|
|
{
|
|
let leader_id = leader_id.ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
"current node is not the leader and no leader is known yet",
|
|
)
|
|
})?;
|
|
let leader_http_addr = leader_http_addr(state, leader_id).await?;
|
|
let url = format!("{}{}", leader_http_addr.trim_end_matches('/'), path);
|
|
let mut request = state.http_client.request(method, &url);
|
|
if let Some(body) = body {
|
|
request = request.json(&body);
|
|
}
|
|
let response = request.send().await.map_err(|err| {
|
|
error_response(
|
|
StatusCode::BAD_GATEWAY,
|
|
"LEADER_PROXY_FAILED",
|
|
&format!("failed to forward cluster write to leader {leader_id}: {err}"),
|
|
)
|
|
})?;
|
|
if response.status().is_success() {
|
|
let status = StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::OK);
|
|
let payload = response
|
|
.json::<SuccessResponse<serde_json::Value>>()
|
|
.await
|
|
.map_err(|err| {
|
|
error_response(
|
|
StatusCode::BAD_GATEWAY,
|
|
"LEADER_PROXY_FAILED",
|
|
&format!("failed to decode leader {leader_id} response: {err}"),
|
|
)
|
|
})?;
|
|
return Ok((status, Json(payload)));
|
|
}
|
|
let status =
|
|
StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
|
|
let payload = response
|
|
.json::<ErrorResponse>()
|
|
.await
|
|
.unwrap_or_else(|err| ErrorResponse {
|
|
error: ErrorDetail {
|
|
code: "LEADER_PROXY_FAILED".to_string(),
|
|
message: format!("leader {leader_id} returned {status}: {err}"),
|
|
details: None,
|
|
},
|
|
meta: ResponseMeta::new(),
|
|
});
|
|
Err((status, Json(payload)))
|
|
}
|
|
|
|
async fn should_proxy_read(consistency: Option<&str>, state: &RestApiState) -> bool {
|
|
let node_id = state.raft.node_id();
|
|
let leader_id = state.raft.leader().await;
|
|
read_requires_leader_proxy(consistency, node_id, leader_id)
|
|
}
|
|
|
|
fn read_requires_leader_proxy(
|
|
consistency: Option<&str>,
|
|
node_id: u64,
|
|
leader_id: Option<u64>,
|
|
) -> bool {
|
|
if matches!(
|
|
consistency,
|
|
Some(mode)
|
|
if mode.eq_ignore_ascii_case("local")
|
|
|| mode.eq_ignore_ascii_case("serializable")
|
|
) {
|
|
return false;
|
|
}
|
|
matches!(leader_id, Some(leader_id) if leader_id != node_id)
|
|
}
|
|
|
|
async fn proxy_read_to_leader<T>(
|
|
state: &RestApiState,
|
|
path: &str,
|
|
query: Option<&[(&str, &str)]>,
|
|
) -> Result<Json<SuccessResponse<T>>, (StatusCode, Json<ErrorResponse>)>
|
|
where
|
|
T: for<'de> Deserialize<'de>,
|
|
{
|
|
let leader_id = state.raft.leader().await.ok_or_else(|| {
|
|
error_response(
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
"NOT_LEADER",
|
|
"current node is not the leader and no leader is known yet",
|
|
)
|
|
})?;
|
|
let leader_http_addr = leader_http_addr(state, leader_id).await?;
|
|
let url = format!("{}{}", leader_http_addr.trim_end_matches('/'), path);
|
|
let mut request = state.http_client.get(&url);
|
|
if let Some(query) = query {
|
|
request = request.query(query);
|
|
}
|
|
let response = request.send().await.map_err(|err| {
|
|
error_response(
|
|
StatusCode::BAD_GATEWAY,
|
|
"LEADER_PROXY_FAILED",
|
|
&format!("failed to forward read to leader {leader_id}: {err}"),
|
|
)
|
|
})?;
|
|
if response.status().is_success() {
|
|
let payload = response.json::<SuccessResponse<T>>().await.map_err(|err| {
|
|
error_response(
|
|
StatusCode::BAD_GATEWAY,
|
|
"LEADER_PROXY_FAILED",
|
|
&format!("failed to decode leader {leader_id} response: {err}"),
|
|
)
|
|
})?;
|
|
return Ok(Json(payload));
|
|
}
|
|
let status =
|
|
StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
|
|
let payload = response
|
|
.json::<ErrorResponse>()
|
|
.await
|
|
.unwrap_or_else(|err| ErrorResponse {
|
|
error: ErrorDetail {
|
|
code: "LEADER_PROXY_FAILED".to_string(),
|
|
message: format!("leader {leader_id} returned {status}: {err}"),
|
|
details: None,
|
|
},
|
|
meta: ResponseMeta::new(),
|
|
});
|
|
Err((status, Json(payload)))
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn read_requires_leader_proxy_defaults_to_leader_consistency() {
|
|
assert!(read_requires_leader_proxy(None, 2, Some(1)));
|
|
assert!(!read_requires_leader_proxy(Some("local"), 2, Some(1)));
|
|
assert!(!read_requires_leader_proxy(
|
|
Some("serializable"),
|
|
2,
|
|
Some(1)
|
|
));
|
|
assert!(!read_requires_leader_proxy(
|
|
Some("SERIALIZABLE"),
|
|
2,
|
|
Some(1)
|
|
));
|
|
assert!(!read_requires_leader_proxy(None, 2, Some(2)));
|
|
assert!(!read_requires_leader_proxy(None, 2, None));
|
|
}
|
|
|
|
#[test]
|
|
fn cluster_operation_error_maps_rejected_to_precondition_failed() {
|
|
let (status, code, message) =
|
|
cluster_operation_error(&RaftError::Rejected("needs sequential reconfigure".into()));
|
|
assert_eq!(status, StatusCode::PRECONDITION_FAILED);
|
|
assert_eq!(code, "PRECONDITION_FAILED");
|
|
assert_eq!(message, "needs sequential reconfigure");
|
|
}
|
|
}
|