Implement FlareDB SQL routing and service CRUD

This commit is contained in:
centra 2026-04-01 02:07:55 +09:00
parent 23ec8b5edb
commit c17e5a6130
Signed by: centra
GPG key ID: 0C09689D20B25ACA
15 changed files with 1290 additions and 218 deletions

View file

@ -156,7 +156,9 @@ impl LbMetadataStore {
)
.execute(pool)
.await
.map_err(|e| MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e)))?;
.map_err(|e| {
MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e))
})?;
Ok(())
}
@ -169,7 +171,9 @@ impl LbMetadataStore {
)
.execute(pool)
.await
.map_err(|e| MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e)))?;
.map_err(|e| {
MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e))
})?;
Ok(())
}
@ -196,9 +200,7 @@ impl LbMetadataStore {
.bind(value)
.execute(pool.as_ref())
.await
.map_err(|e| {
MetadataError::Storage(format!("Postgres put failed: {}", e))
})?;
.map_err(|e| MetadataError::Storage(format!("Postgres put failed: {}", e)))?;
}
SqlStorageBackend::Sqlite(pool) => {
sqlx::query(
@ -439,6 +441,10 @@ impl LbMetadataStore {
format!("/fiberlb/healthchecks/{}/{}", pool_id, hc_id)
}
/// Metadata key for the global health-check ID index (maps an ID to its
/// pool-scoped storage key).
fn health_check_id_key(hc_id: &HealthCheckId) -> String {
    let mut key = String::from("/fiberlb/healthcheck_ids/");
    key.push_str(&hc_id.to_string());
    key
}
/// Key prefix under which all health checks for `pool_id` are stored.
fn health_check_prefix(pool_id: &PoolId) -> String {
    ["/fiberlb/healthchecks/", &pool_id.to_string(), "/"].concat()
}
@ -865,7 +871,8 @@ impl LbMetadataStore {
MetadataError::Serialization(format!("Failed to serialize health check: {}", e))
})?;
self.put(&key, &value).await
self.put(&key, &value).await?;
self.put(&Self::health_check_id_key(&hc.id), &key).await
}
/// Load health check
@ -886,6 +893,30 @@ impl LbMetadataStore {
}
}
/// Load a health check via the global ID index, without needing the pool ID.
///
/// Returns `Ok(None)` when either the index entry or the health check
/// record itself is absent.
pub async fn load_health_check_by_id(
    &self,
    hc_id: &HealthCheckId,
) -> Result<Option<HealthCheck>> {
    // Resolve the ID-index entry to the pool-scoped storage key.
    let hc_key = match self.get(&Self::health_check_id_key(hc_id)).await? {
        Some(key) => key,
        None => return Ok(None),
    };
    // Fetch the serialized record; the index may point at a deleted key.
    let raw = match self.get(&hc_key).await? {
        Some(value) => value,
        None => return Ok(None),
    };
    let hc: HealthCheck = serde_json::from_str(&raw).map_err(|e| {
        MetadataError::Serialization(format!("Failed to deserialize health check: {}", e))
    })?;
    Ok(Some(hc))
}
/// List health checks for a pool
pub async fn list_health_checks(&self, pool_id: &PoolId) -> Result<Vec<HealthCheck>> {
let prefix = Self::health_check_prefix(pool_id);
@ -907,7 +938,8 @@ impl LbMetadataStore {
/// Delete health check
pub async fn delete_health_check(&self, hc: &HealthCheck) -> Result<()> {
let key = Self::health_check_key(&hc.pool_id, &hc.id);
self.delete_key(&key).await
self.delete_key(&key).await?;
self.delete_key(&Self::health_check_id_key(&hc.id)).await
}
/// Delete all health checks for a pool
@ -1215,7 +1247,7 @@ fn normalize_transport_addr(endpoint: &str) -> String {
#[cfg(test)]
mod tests {
use super::*;
use fiberlb_types::{ListenerProtocol, PoolAlgorithm, PoolProtocol};
use fiberlb_types::{HealthCheck, ListenerProtocol, PoolAlgorithm, PoolProtocol};
#[tokio::test]
async fn test_lb_crud() {
@ -1355,6 +1387,55 @@ mod tests {
assert!(deleted.is_none());
}
#[tokio::test]
async fn test_health_check_crud() {
    // Set up an in-memory store with an LB and a pool to attach the check to.
    let store = LbMetadataStore::new_in_memory();
    let lb = LoadBalancer::new("test-lb", "test-org", "test-project");
    store.save_lb(&lb).await.unwrap();
    let pool = Pool::new(
        "web-pool",
        lb.id,
        PoolAlgorithm::RoundRobin,
        PoolProtocol::Http,
    );
    store.save_pool(&pool).await.unwrap();
    // Create: persist an HTTP health check.
    let check = HealthCheck::new_http("http-check", pool.id, "/healthz");
    store.save_health_check(&check).await.unwrap();
    // Read: pool-scoped lookup returns the saved record.
    let by_pool = store
        .load_health_check(&pool.id, &check.id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(by_pool.id, check.id);
    assert_eq!(by_pool.name, "http-check");
    // Read: the global ID index resolves back to the same pool.
    let by_id = store
        .load_health_check_by_id(&check.id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(by_id.pool_id, pool.id);
    // List: exactly one check registered for the pool.
    let checks = store.list_health_checks(&pool.id).await.unwrap();
    assert_eq!(checks.len(), 1);
    // Delete: both lookup paths must now come back empty.
    store.delete_health_check(&check).await.unwrap();
    let gone = store.load_health_check(&pool.id, &check.id).await.unwrap();
    assert!(gone.is_none());
    let gone_by_id = store.load_health_check_by_id(&check.id).await.unwrap();
    assert!(gone_by_id.is_none());
}
#[tokio::test]
async fn test_cascade_delete() {
let store = LbMetadataStore::new_in_memory();

View file

@ -2,17 +2,15 @@
use std::sync::Arc;
use base64::Engine as _;
use crate::metadata::LbMetadataStore;
use base64::Engine as _;
use fiberlb_api::{
health_check_service_server::HealthCheckService,
CreateHealthCheckRequest, CreateHealthCheckResponse,
DeleteHealthCheckRequest, DeleteHealthCheckResponse,
GetHealthCheckRequest, GetHealthCheckResponse,
ListHealthChecksRequest, ListHealthChecksResponse,
UpdateHealthCheckRequest, UpdateHealthCheckResponse,
HealthCheck as ProtoHealthCheck, HealthCheckType as ProtoHealthCheckType,
HttpHealthConfig as ProtoHttpHealthConfig,
health_check_service_server::HealthCheckService, CreateHealthCheckRequest,
CreateHealthCheckResponse, DeleteHealthCheckRequest, DeleteHealthCheckResponse,
GetHealthCheckRequest, GetHealthCheckResponse, HealthCheck as ProtoHealthCheck,
HealthCheckType as ProtoHealthCheckType, HttpHealthConfig as ProtoHttpHealthConfig,
ListHealthChecksRequest, ListHealthChecksResponse, UpdateHealthCheckRequest,
UpdateHealthCheckResponse,
};
use fiberlb_types::{HealthCheck, HealthCheckId, HealthCheckType, HttpHealthConfig, PoolId};
use iam_service_auth::{get_tenant_context, resource_for_tenant, AuthService};
@ -33,7 +31,10 @@ impl HealthCheckServiceImpl {
}
const ACTION_HEALTHCHECKS_CREATE: &str = "network:healthchecks:create";
const ACTION_HEALTHCHECKS_READ: &str = "network:healthchecks:read";
const ACTION_HEALTHCHECKS_LIST: &str = "network:healthchecks:list";
const ACTION_HEALTHCHECKS_UPDATE: &str = "network:healthchecks:update";
const ACTION_HEALTHCHECKS_DELETE: &str = "network:healthchecks:delete";
/// Convert domain HealthCheck to proto
fn health_check_to_proto(hc: &HealthCheck) -> ProtoHealthCheck {
@ -52,13 +53,11 @@ fn health_check_to_proto(hc: &HealthCheck) -> ProtoHealthCheck {
timeout_seconds: hc.timeout_seconds,
healthy_threshold: hc.healthy_threshold,
unhealthy_threshold: hc.unhealthy_threshold,
http_config: hc.http_config.as_ref().map(|cfg| {
ProtoHttpHealthConfig {
http_config: hc.http_config.as_ref().map(|cfg| ProtoHttpHealthConfig {
method: cfg.method.clone(),
path: cfg.path.clone(),
expected_codes: cfg.expected_codes.iter().map(|&c| c as u32).collect(),
host: cfg.host.clone().unwrap_or_default(),
}
}),
enabled: hc.enabled,
created_at: hc.created_at,
@ -100,7 +99,11 @@ fn proto_to_http_config(cfg: Option<ProtoHttpHealthConfig>) -> Option<HttpHealth
method: c.method,
path: c.path,
expected_codes: c.expected_codes.iter().map(|&c| c as u16).collect(),
host: if c.host.is_empty() { None } else { Some(c.host) },
host: if c.host.is_empty() {
None
} else {
Some(c.host)
},
})
}
@ -123,40 +126,39 @@ impl HealthCheckService for HealthCheckServiceImpl {
let pool_id = parse_pool_id(&req.pool_id)?;
let lbs = self
let pool = self
.metadata
.list_lbs(&tenant.org_id, Some(tenant.project_id.as_str()))
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?;
let mut scope: Option<(String, String)> = None;
for lb in &lbs {
if let Some(_) = self
.metadata
.load_pool(&lb.id, &pool_id)
.load_pool_by_id(&pool_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
{
scope = Some((lb.org_id.clone(), lb.project_id.clone()));
break;
.ok_or_else(|| Status::not_found("pool not found"))?;
let lb = self
.metadata
.load_lb_by_id(&pool.loadbalancer_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("load balancer not found"))?;
if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id {
return Err(Status::permission_denied("pool not in tenant scope"));
}
}
let (lb_org_id, lb_project_id) =
scope.ok_or_else(|| Status::not_found("pool not found"))?;
self.auth
.authorize(
&tenant,
ACTION_HEALTHCHECKS_CREATE,
&resource_for_tenant("health-check", "*", &lb_org_id, &lb_project_id),
&resource_for_tenant("health-check", "*", &lb.org_id, &lb.project_id),
)
.await?;
let check_type = proto_to_check_type(req.r#type);
// Create health check based on type
let mut hc = if check_type == HealthCheckType::Http || check_type == HealthCheckType::Https {
let path = req.http_config.as_ref().map(|c| c.path.as_str()).unwrap_or("/health");
let mut hc = if check_type == HealthCheckType::Http || check_type == HealthCheckType::Https
{
let path = req
.http_config
.as_ref()
.map(|c| c.path.as_str())
.unwrap_or("/health");
HealthCheck::new_http(&req.name, pool_id, path)
} else {
HealthCheck::new_tcp(&req.name, pool_id)
@ -195,18 +197,55 @@ impl HealthCheckService for HealthCheckServiceImpl {
&self,
request: Request<GetHealthCheckRequest>,
) -> Result<Response<GetHealthCheckResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
if req.id.is_empty() {
return Err(Status::invalid_argument("id is required"));
}
let _hc_id = parse_hc_id(&req.id)?;
let hc_id = parse_hc_id(&req.id)?;
let hc = self
.metadata
.load_health_check_by_id(&hc_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("health check not found"))?;
let pool = self
.metadata
.load_pool_by_id(&hc.pool_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("pool not found"))?;
let lb = self
.metadata
.load_lb_by_id(&pool.loadbalancer_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("load balancer not found"))?;
// Need pool_id context to efficiently look up health check
Err(Status::unimplemented(
"get_health_check by ID requires pool_id context; use list_health_checks instead",
))
if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id {
return Err(Status::permission_denied(
"health check not in tenant scope",
));
}
self.auth
.authorize(
&tenant,
ACTION_HEALTHCHECKS_READ,
&resource_for_tenant(
"health-check",
&hc.id.to_string(),
&lb.org_id,
&lb.project_id,
),
)
.await?;
Ok(Response::new(GetHealthCheckResponse {
health_check: Some(health_check_to_proto(&hc)),
}))
}
async fn list_health_checks(
@ -222,33 +261,27 @@ impl HealthCheckService for HealthCheckServiceImpl {
let pool_id = parse_pool_id(&req.pool_id)?;
let lbs = self
let pool = self
.metadata
.list_lbs(&tenant.org_id, Some(tenant.project_id.as_str()))
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?;
let mut scope: Option<(String, String)> = None;
for lb in &lbs {
if let Some(_) = self
.metadata
.load_pool(&lb.id, &pool_id)
.load_pool_by_id(&pool_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
{
scope = Some((lb.org_id.clone(), lb.project_id.clone()));
break;
.ok_or_else(|| Status::not_found("pool not found"))?;
let lb = self
.metadata
.load_lb_by_id(&pool.loadbalancer_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("load balancer not found"))?;
if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id {
return Err(Status::permission_denied("pool not in tenant scope"));
}
}
let (lb_org_id, lb_project_id) =
scope.ok_or_else(|| Status::not_found("pool not found"))?;
self.auth
.authorize(
&tenant,
ACTION_HEALTHCHECKS_LIST,
&resource_for_tenant("health-check", "*", &lb_org_id, &lb_project_id),
&resource_for_tenant("health-check", "*", &lb.org_id, &lb.project_id),
)
.await?;
@ -299,31 +332,141 @@ impl HealthCheckService for HealthCheckServiceImpl {
&self,
request: Request<UpdateHealthCheckRequest>,
) -> Result<Response<UpdateHealthCheckResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
if req.id.is_empty() {
return Err(Status::invalid_argument("id is required"));
}
// Need pool_id context for update
Err(Status::unimplemented(
"update_health_check requires pool_id context; include pool_id in request",
))
let hc_id = parse_hc_id(&req.id)?;
let mut hc = self
.metadata
.load_health_check_by_id(&hc_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("health check not found"))?;
let pool = self
.metadata
.load_pool_by_id(&hc.pool_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("pool not found"))?;
let lb = self
.metadata
.load_lb_by_id(&pool.loadbalancer_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("load balancer not found"))?;
if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id {
return Err(Status::permission_denied(
"health check not in tenant scope",
));
}
self.auth
.authorize(
&tenant,
ACTION_HEALTHCHECKS_UPDATE,
&resource_for_tenant(
"health-check",
&hc.id.to_string(),
&lb.org_id,
&lb.project_id,
),
)
.await?;
if !req.name.is_empty() {
hc.name = req.name;
}
if req.interval_seconds > 0 {
hc.interval_seconds = req.interval_seconds;
}
if req.timeout_seconds > 0 {
hc.timeout_seconds = req.timeout_seconds;
}
if req.healthy_threshold > 0 {
hc.healthy_threshold = req.healthy_threshold;
}
if req.unhealthy_threshold > 0 {
hc.unhealthy_threshold = req.unhealthy_threshold;
}
if req.http_config.is_some() {
hc.http_config = proto_to_http_config(req.http_config);
}
hc.enabled = req.enabled;
hc.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.metadata
.save_health_check(&hc)
.await
.map_err(|e| Status::internal(format!("failed to save health check: {}", e)))?;
Ok(Response::new(UpdateHealthCheckResponse {
health_check: Some(health_check_to_proto(&hc)),
}))
}
async fn delete_health_check(
&self,
request: Request<DeleteHealthCheckRequest>,
) -> Result<Response<DeleteHealthCheckResponse>, Status> {
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
if req.id.is_empty() {
return Err(Status::invalid_argument("id is required"));
}
// Need pool_id context for delete
Err(Status::unimplemented(
"delete_health_check requires pool_id context; include pool_id in request",
))
let hc_id = parse_hc_id(&req.id)?;
let hc = self
.metadata
.load_health_check_by_id(&hc_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("health check not found"))?;
let pool = self
.metadata
.load_pool_by_id(&hc.pool_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("pool not found"))?;
let lb = self
.metadata
.load_lb_by_id(&pool.loadbalancer_id)
.await
.map_err(|e| Status::internal(format!("metadata error: {}", e)))?
.ok_or_else(|| Status::not_found("load balancer not found"))?;
if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id {
return Err(Status::permission_denied(
"health check not in tenant scope",
));
}
self.auth
.authorize(
&tenant,
ACTION_HEALTHCHECKS_DELETE,
&resource_for_tenant(
"health-check",
&hc.id.to_string(),
&lb.org_id,
&lb.project_id,
),
)
.await?;
self.metadata
.delete_health_check(&hc)
.await
.map_err(|e| Status::internal(format!("failed to delete health check: {}", e)))?;
Ok(Response::new(DeleteHealthCheckResponse {}))
}
}

View file

@ -585,7 +585,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("FlareDB server starting with health checks enabled");
// Create SQL service
let sql_service = sql_service::SqlServiceImpl::new(server_config.addr.to_string());
let sql_service =
sql_service::SqlServiceImpl::new(server_config.addr.to_string(), pd_endpoints.clone());
// Configure TLS if enabled
let mut server = Server::builder();

View file

@ -33,8 +33,7 @@ pub fn build_merkle(
let mut start_key = None;
while let Some(Ok((k, v))) = iter.next() {
if (k.len() < 4 || k[..4] != prefix)
&& !k.starts_with(&prefix) {
if (k.len() < 4 || k[..4] != prefix) && !k.starts_with(&prefix) {
break;
}
if start_key.is_none() {

View file

@ -199,9 +199,7 @@ impl PdClient {
}
/// Start watching for metadata changes in the background
pub async fn start_watch(
&mut self,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
pub async fn start_watch(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (tx, rx) = mpsc::channel::<WatchRequest>(32);
// Create watch requests for stores and regions prefixes

View file

@ -1,7 +1,6 @@
use crate::config::NamespaceManager;
use crate::store::Store;
use flaredb_raft::{FlareNode, FlareNodeId, FlareTypeConfig};
use flaredb_proto::raft_server::raft_service_server::RaftService;
use openraft::raft::{AppendEntriesRequest, VoteRequest};
use flaredb_proto::raft_server::{
FetchRangeRequest, FetchRangeResponse, ForwardEventualRequest, GetMerkleRequest,
GetMerkleResponse, GetModeRequest, GetModeResponse, ListNamespaceModesRequest,
@ -10,9 +9,10 @@ use flaredb_proto::raft_server::{
OpenRaftVoteRequest, OpenRaftVoteResponse, RaftMessage, RaftResponse,
UpdateNamespaceModeRequest, UpdateNamespaceModeResponse,
};
use flaredb_raft::{FlareNode, FlareNodeId, FlareTypeConfig};
use openraft::raft::{AppendEntriesRequest, VoteRequest};
use std::sync::Arc;
use tonic::{Request, Response, Status};
use crate::config::NamespaceManager;
pub struct RaftServiceImpl {
store: Arc<Store>,
@ -204,8 +204,8 @@ impl RaftService for RaftServiceImpl {
.await
.ok_or_else(|| Status::failed_precondition("region not found"))?;
let append_req: AppendEntriesRequest<FlareTypeConfig> =
serde_json::from_slice(&req.data).map_err(|e| {
let append_req: AppendEntriesRequest<FlareTypeConfig> = serde_json::from_slice(&req.data)
.map_err(|e| {
Status::invalid_argument(format!("invalid append_entries request: {}", e))
})?;

View file

@ -8,13 +8,13 @@
//! - GET /api/v1/scan - Range scan
//! - GET /health - Health check
use crate::pd_client::PdClient;
use axum::{
extract::{Path, Query, State},
http::StatusCode,
routing::{get, post, put},
Json, Router,
};
use crate::pd_client::PdClient;
use flaredb_client::RdbClient;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -172,7 +172,9 @@ pub fn build_router(state: RestApiState) -> Router {
/// GET /health — liveness probe; always replies 200 with a healthy status body.
async fn health_check() -> (StatusCode, Json<SuccessResponse<serde_json::Value>>) {
    let body = serde_json::json!({ "status": "healthy" });
    (StatusCode::OK, Json(SuccessResponse::new(body)))
}
@ -211,12 +213,24 @@ async fn get_kv(
) -> Result<Json<SuccessResponse<GetResponse>>, (StatusCode, Json<ErrorResponse>)> {
let mut client = RdbClient::connect_direct(state.server_addr.clone(), "default")
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"SERVICE_UNAVAILABLE",
&format!("Failed to connect: {}", e),
)
})?;
let value = client
.raw_get(key.as_bytes().to_vec())
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?
.ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", "Key not found"))?;
Ok(Json(SuccessResponse::new(GetResponse {
@ -230,19 +244,34 @@ async fn put_kv(
State(state): State<RestApiState>,
Path(key): Path<String>,
Json(req): Json<PutRequest>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
let mut client = RdbClient::connect_direct(state.server_addr.clone(), &req.namespace)
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"SERVICE_UNAVAILABLE",
&format!("Failed to connect: {}", e),
)
})?;
client
.raw_put(key.as_bytes().to_vec(), req.value.as_bytes().to_vec())
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
Ok((
StatusCode::OK,
Json(SuccessResponse::new(serde_json::json!({ "key": key, "success": true }))),
Json(SuccessResponse::new(
serde_json::json!({ "key": key, "success": true }),
)),
))
}
@ -253,15 +282,31 @@ async fn scan_kv(
) -> Result<Json<SuccessResponse<ScanResponse>>, (StatusCode, Json<ErrorResponse>)> {
let mut client = RdbClient::connect_direct(state.server_addr.clone(), &params.namespace)
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"SERVICE_UNAVAILABLE",
&format!("Failed to connect: {}", e),
)
})?;
let start_key = params.start.unwrap_or_default();
let end_key = params.end.unwrap_or_else(|| format!("{}~", start_key));
let (keys, values, _next) = client
.raw_scan(start_key.as_bytes().to_vec(), end_key.as_bytes().to_vec(), 100)
.raw_scan(
start_key.as_bytes().to_vec(),
end_key.as_bytes().to_vec(),
100,
)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
let items: Vec<KvItem> = keys
.into_iter()
@ -282,13 +327,31 @@ async fn get_region(
) -> Result<Json<SuccessResponse<RegionResponse>>, (StatusCode, Json<ErrorResponse>)> {
let mut pd_client = PdClient::connect_any(&state.pd_endpoints)
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"PD_UNAVAILABLE",
&format!("Failed to connect to PD: {}", e),
)
})?;
let region = pd_client
.get_region(id)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?
.ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", &format!("Region {} not found", id)))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?
.ok_or_else(|| {
error_response(
StatusCode::NOT_FOUND,
"NOT_FOUND",
&format!("Region {} not found", id),
)
})?;
Ok(Json(SuccessResponse::new(RegionResponse {
id: region.id,
@ -305,23 +368,44 @@ async fn add_peer_to_region(
) -> Result<Json<SuccessResponse<RegionResponse>>, (StatusCode, Json<ErrorResponse>)> {
let mut pd_client = PdClient::connect_any(&state.pd_endpoints)
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"PD_UNAVAILABLE",
&format!("Failed to connect to PD: {}", e),
)
})?;
let mut region = pd_client
.get_region(id)
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?
.ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", &format!("Region {} not found", id)))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?
.ok_or_else(|| {
error_response(
StatusCode::NOT_FOUND,
"NOT_FOUND",
&format!("Region {} not found", id),
)
})?;
// Add peer if not already present
if !region.peers.contains(&req.peer_id) {
region.peers.push(req.peer_id);
region.peers.sort();
pd_client
.put_region(region.clone())
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
pd_client.put_region(region.clone()).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
}
Ok(Json(SuccessResponse::new(RegionResponse {
@ -335,7 +419,8 @@ async fn add_peer_to_region(
async fn add_member_legacy(
State(state): State<RestApiState>,
Json(req): Json<AddMemberRequestLegacy>,
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)> {
) -> Result<(StatusCode, Json<SuccessResponse<serde_json::Value>>), (StatusCode, Json<ErrorResponse>)>
{
let (peer_id, peer_addr) = resolve_join_peer(&state, &req).ok_or_else(|| {
error_response(
StatusCode::BAD_REQUEST,
@ -346,7 +431,13 @@ async fn add_member_legacy(
let mut pd_client = PdClient::connect_any(&state.pd_endpoints)
.await
.map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?;
.map_err(|e| {
error_response(
StatusCode::SERVICE_UNAVAILABLE,
"PD_UNAVAILABLE",
&format!("Failed to connect to PD: {}", e),
)
})?;
let stores = pd_client.list_stores().await;
let already_registered = stores.iter().any(|store| store.id == peer_id);
@ -354,14 +445,26 @@ async fn add_member_legacy(
pd_client
.register_store(peer_id, peer_addr.clone())
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
let mut regions = pd_client.list_regions().await;
if regions.is_empty() {
pd_client
.init_default_region(vec![state.store_id, peer_id])
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
regions = vec![crate::pd_client::RegionInfo {
id: 1,
start_key: Vec::new(),
@ -376,10 +479,13 @@ async fn add_member_legacy(
if !region.peers.contains(&peer_id) {
region.peers.push(peer_id);
region.peers.sort_unstable();
pd_client
.put_region(region.clone())
.await
.map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?;
pd_client.put_region(region.clone()).await.map_err(|e| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
&e.to_string(),
)
})?;
updated_regions.push(region.id);
}
}
@ -403,10 +509,7 @@ async fn add_member_legacy(
))
}
fn resolve_join_peer(
state: &RestApiState,
req: &AddMemberRequestLegacy,
) -> Option<(u64, String)> {
fn resolve_join_peer(state: &RestApiState, req: &AddMemberRequestLegacy) -> Option<(u64, String)> {
if let Ok(peer_id) = req.id.parse::<u64>() {
if let Some(addr) = req
.addr

View file

@ -1,16 +1,19 @@
use crate::config::{
decode_value_with_ts, encode_namespaced_key, encode_value_with_ts, ConsistencyMode,
decode_value_with_ts,
encode_namespaced_key,
encode_value_with_ts,
ConsistencyMode,
NamespaceManager, // Renamed from ServerConfig
};
use crate::store::Store;
use flaredb_raft::FlareRaftNode;
use flaredb_proto::kvrpc::kv_cas_server::KvCas;
use flaredb_proto::kvrpc::kv_raw_server::KvRaw;
use flaredb_proto::kvrpc::{
CasRequest, CasResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, RawDeleteRequest,
RawDeleteResponse, RawGetRequest, RawGetResponse, RawPutRequest, RawPutResponse, RawScanRequest,
RawScanResponse, ScanRequest, ScanResponse, VersionedKv,
CasRequest, CasResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse,
RawDeleteRequest, RawDeleteResponse, RawGetRequest, RawGetResponse, RawPutRequest,
RawPutResponse, RawScanRequest, RawScanResponse, ScanRequest, ScanResponse, VersionedKv,
};
use flaredb_raft::FlareRaftNode;
use flaredb_storage::rocks_engine::RocksEngine;
use flaredb_storage::StorageEngine;
use std::sync::Arc;
@ -30,7 +33,11 @@ pub struct KvServiceImpl {
}
impl KvServiceImpl {
pub fn new(engine: Arc<RocksEngine>, namespace_manager: Arc<NamespaceManager>, store: Arc<Store>) -> Self {
pub fn new(
engine: Arc<RocksEngine>,
namespace_manager: Arc<NamespaceManager>,
store: Arc<Store>,
) -> Self {
Self {
engine,
namespace_manager,
@ -245,9 +252,9 @@ impl KvRaw for KvServiceImpl {
let ts = Self::now_millis();
if let Some(node) = self.route_raft_node(&encoded).await? {
let existed = node.read_kv(ns_id, &req.key).await.is_some();
node.delete_kv(ns_id, req.key, ts)
.await
.map_err(|e| Status::failed_precondition(format!("raft raw_delete failed: {}", e)))?;
node.delete_kv(ns_id, req.key, ts).await.map_err(|e| {
Status::failed_precondition(format!("raft raw_delete failed: {}", e))
})?;
Ok(Response::new(RawDeleteResponse {
success: true,
existed,
@ -573,7 +580,10 @@ mod tests {
let service = KvServiceImpl::new(
engine,
Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())), // Use NamespaceManager directly
Arc::new(NamespaceManager::new(
ConsistencyMode::Strong,
HashMap::new(),
)), // Use NamespaceManager directly
store,
);
@ -635,11 +645,17 @@ mod tests {
let service = KvServiceImpl::new(
engine,
Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())),
Arc::new(NamespaceManager::new(
ConsistencyMode::Strong,
HashMap::new(),
)),
store,
);
for (key, value) in [(b"k1".to_vec(), b"v1".to_vec()), (b"k2".to_vec(), b"v2".to_vec())] {
for (key, value) in [
(b"k1".to_vec(), b"v1".to_vec()),
(b"k2".to_vec(), b"v2".to_vec()),
] {
service
.compare_and_swap(Request::new(CasRequest {
key,

View file

@ -13,11 +13,16 @@ use tonic::{Request, Response, Status};
/// SQL service state: both fields are handed to
/// `RdbClient::connect_with_pd_namespace` when executing a statement.
pub struct SqlServiceImpl {
    /// Address of the local FlareDB server
    server_addr: String,
    /// ChainFire/PD endpoints used for region-aware routing
    pd_endpoints: Vec<String>,
}
impl SqlServiceImpl {
pub fn new(server_addr: String) -> Self {
Self { server_addr }
pub fn new(server_addr: String, pd_endpoints: Vec<String>) -> Self {
Self {
server_addr,
pd_endpoints,
}
}
fn value_to_proto(value: &Value) -> ProtoValue {
@ -41,9 +46,7 @@ impl SqlServiceImpl {
is_null: false,
},
Value::Timestamp(ts) => ProtoValue {
value: Some(flaredb_proto::sqlrpc::sql_value::Value::TimestampValue(
*ts,
)),
value: Some(flaredb_proto::sqlrpc::sql_value::Value::TimestampValue(*ts)),
is_null: false,
},
}
@ -52,14 +55,15 @@ impl SqlServiceImpl {
#[tonic::async_trait]
impl SqlServiceTrait for SqlServiceImpl {
async fn execute(
&self,
request: Request<SqlRequest>,
) -> Result<Response<SqlResponse>, Status> {
async fn execute(&self, request: Request<SqlRequest>) -> Result<Response<SqlResponse>, Status> {
let req = request.into_inner();
// Connect to the local FlareDB server with the requested namespace
let client = RdbClient::connect_direct(self.server_addr.clone(), req.namespace.clone())
// Use PD-backed routing so SQL works for both strong and eventual namespaces.
let client = RdbClient::connect_with_pd_namespace(
self.server_addr.clone(),
self.pd_endpoints.join(","),
req.namespace.clone(),
)
.await
.map_err(|e| Status::internal(format!("Failed to connect to FlareDB: {}", e)))?;
@ -81,9 +85,7 @@ impl SqlServiceTrait for SqlServiceImpl {
},
ExecutionResult::DmlSuccess(rows_affected) => SqlResponse {
result: Some(flaredb_proto::sqlrpc::sql_response::Result::DmlResult(
DmlResult {
rows_affected,
},
DmlResult { rows_affected },
)),
},
ExecutionResult::Query(query_result) => {
@ -103,8 +105,7 @@ impl SqlServiceTrait for SqlServiceImpl {
},
)),
}
}
// Errors are returned via Result::Err and handled by .map_err() above
} // Errors are returned via Result::Err and handled by .map_err() above
};
Ok(Response::new(response))

View file

@ -1,6 +1,6 @@
use crate::error::{Result, SqlError};
use crate::error::Result;
use crate::metadata::MetadataManager;
use crate::parser::{SqlStatement, parse_sql};
use crate::parser::{parse_sql, SqlStatement};
use crate::storage::StorageManager;
use crate::types::QueryResult;
use flaredb_client::RdbClient;
@ -93,12 +93,31 @@ impl SqlExecutor {
Ok(ExecutionResult::Query(result))
}
SqlStatement::Update { .. } => {
Err(SqlError::InvalidOperation("UPDATE not yet implemented".to_string()))
SqlStatement::Update {
table_name,
assignments,
where_clause,
} => {
let table = self.metadata_manager.get_table_metadata(table_name).await?;
let updated = self
.storage_manager
.update_rows(&table, assignments, where_clause.as_ref())
.await?;
Ok(ExecutionResult::DmlSuccess(updated))
}
SqlStatement::Delete { .. } => {
Err(SqlError::InvalidOperation("DELETE not yet implemented".to_string()))
SqlStatement::Delete {
table_name,
where_clause,
} => {
let table = self.metadata_manager.get_table_metadata(table_name).await?;
let deleted = self
.storage_manager
.delete_rows(&table, where_clause.as_ref())
.await?;
Ok(ExecutionResult::DmlSuccess(deleted))
}
}
}
@ -111,7 +130,9 @@ mod tests {
#[tokio::test]
#[ignore] // Requires FlareDB server
async fn test_create_and_query_table() {
let client = RdbClient::connect_direct("127.0.0.1:8001".to_string(), "sqltest".to_string()).await.unwrap();
let client = RdbClient::connect_direct("127.0.0.1:8001".to_string(), "sqltest".to_string())
.await
.unwrap();
let executor = SqlExecutor::new(Arc::new(Mutex::new(client)));
// Create table

View file

@ -1,8 +1,6 @@
use crate::error::{Result, SqlError};
use crate::types::{ColumnDef, DataType, Value};
use sqlparser::ast::{
ColumnDef as AstColumnDef, DataType as AstDataType, Expr, Statement,
};
use sqlparser::ast::{ColumnDef as AstColumnDef, DataType as AstDataType, Expr, Statement};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
@ -85,7 +83,9 @@ fn parse_statement(stmt: &Statement) -> Result<SqlStatement> {
Statement::CreateTable { .. } => parse_create_table(stmt),
Statement::Drop { names, .. } => {
if names.len() != 1 {
return Err(SqlError::ParseError("Expected single table name".to_string()));
return Err(SqlError::ParseError(
"Expected single table name".to_string(),
));
}
Ok(SqlStatement::DropTable {
table_name: names[0].to_string(),
@ -93,12 +93,8 @@ fn parse_statement(stmt: &Statement) -> Result<SqlStatement> {
}
Statement::Insert { .. } => parse_insert(stmt),
Statement::Query(query) => parse_select(query),
Statement::Update { .. } => {
Err(SqlError::ParseError("UPDATE not yet implemented".to_string()))
}
Statement::Delete { .. } => {
Err(SqlError::ParseError("DELETE not yet implemented".to_string()))
}
Statement::Update { .. } => parse_update(stmt),
Statement::Delete { .. } => parse_delete(stmt),
_ => Err(SqlError::ParseError(format!(
"Unsupported statement: {:?}",
stmt
@ -107,8 +103,16 @@ fn parse_statement(stmt: &Statement) -> Result<SqlStatement> {
}
fn parse_create_table(stmt: &Statement) -> Result<SqlStatement> {
let Statement::CreateTable { name, columns: col_defs, constraints, .. } = stmt else {
return Err(SqlError::ParseError("Expected CREATE TABLE statement".to_string()));
let Statement::CreateTable {
name,
columns: col_defs,
constraints,
..
} = stmt
else {
return Err(SqlError::ParseError(
"Expected CREATE TABLE statement".to_string(),
));
};
let table_name = name.to_string();
@ -122,7 +126,12 @@ fn parse_create_table(stmt: &Statement) -> Result<SqlStatement> {
// Extract primary key from constraints
for constraint in constraints {
if let sqlparser::ast::TableConstraint::Unique { columns: pk_cols, is_primary: true, .. } = constraint {
if let sqlparser::ast::TableConstraint::Unique {
columns: pk_cols,
is_primary: true,
..
} = constraint
{
for pk_col in pk_cols {
primary_key.push(pk_col.value.to_string());
}
@ -133,7 +142,10 @@ fn parse_create_table(stmt: &Statement) -> Result<SqlStatement> {
if primary_key.is_empty() {
for column in col_defs {
for option in &column.options {
if matches!(option.option, sqlparser::ast::ColumnOption::Unique { is_primary: true }) {
if matches!(
option.option,
sqlparser::ast::ColumnOption::Unique { is_primary: true }
) {
primary_key.push(column.name.value.to_string());
break;
}
@ -142,9 +154,7 @@ fn parse_create_table(stmt: &Statement) -> Result<SqlStatement> {
}
if primary_key.is_empty() {
return Err(SqlError::ParseError(
"PRIMARY KEY is required".to_string(),
));
return Err(SqlError::ParseError("PRIMARY KEY is required".to_string()));
}
Ok(SqlStatement::CreateTable {
@ -196,8 +206,16 @@ fn parse_data_type(dt: &AstDataType) -> Result<DataType> {
}
fn parse_insert(stmt: &Statement) -> Result<SqlStatement> {
let Statement::Insert { table_name, columns: col_idents, source, .. } = stmt else {
return Err(SqlError::ParseError("Expected INSERT statement".to_string()));
let Statement::Insert {
table_name,
columns: col_idents,
source,
..
} = stmt
else {
return Err(SqlError::ParseError(
"Expected INSERT statement".to_string(),
));
};
let table_name = table_name.to_string();
@ -276,6 +294,145 @@ fn parse_select(query: &sqlparser::ast::Query) -> Result<SqlStatement> {
}
}
fn parse_update(stmt: &Statement) -> Result<SqlStatement> {
let Statement::Update {
table,
assignments,
from,
selection,
returning,
} = stmt
else {
return Err(SqlError::ParseError(
"Expected UPDATE statement".to_string(),
));
};
if from.is_some() {
return Err(SqlError::ParseError(
"UPDATE ... FROM is not supported".to_string(),
));
}
if returning.is_some() {
return Err(SqlError::ParseError(
"UPDATE ... RETURNING is not supported".to_string(),
));
}
if !table.joins.is_empty() {
return Err(SqlError::ParseError(
"UPDATE with joins is not supported".to_string(),
));
}
let table_name = match &table.relation {
sqlparser::ast::TableFactor::Table { name, .. } => name.to_string(),
_ => {
return Err(SqlError::ParseError(
"Complex UPDATE targets are not supported".to_string(),
))
}
};
let mut parsed_assignments = Vec::with_capacity(assignments.len());
for assignment in assignments {
if assignment.id.len() != 1 {
return Err(SqlError::ParseError(
"Only simple column assignments are supported".to_string(),
));
}
let column = assignment.id[0].value.to_string();
let value = parse_expr_as_value(&assignment.value)?;
parsed_assignments.push((column, value));
}
let where_clause = if let Some(expr) = selection {
Some(parse_where_expr(expr)?)
} else {
None
};
Ok(SqlStatement::Update {
table_name,
assignments: parsed_assignments,
where_clause,
})
}
fn parse_delete(stmt: &Statement) -> Result<SqlStatement> {
let Statement::Delete {
tables,
from,
using,
selection,
returning,
order_by,
limit,
} = stmt
else {
return Err(SqlError::ParseError(
"Expected DELETE statement".to_string(),
));
};
if !tables.is_empty() {
return Err(SqlError::ParseError(
"Multi-table DELETE is not supported".to_string(),
));
}
if using.is_some() {
return Err(SqlError::ParseError(
"DELETE ... USING is not supported".to_string(),
));
}
if returning.is_some() {
return Err(SqlError::ParseError(
"DELETE ... RETURNING is not supported".to_string(),
));
}
if !order_by.is_empty() {
return Err(SqlError::ParseError(
"DELETE ... ORDER BY is not supported".to_string(),
));
}
if limit.is_some() {
return Err(SqlError::ParseError(
"DELETE ... LIMIT is not supported".to_string(),
));
}
if from.len() != 1 {
return Err(SqlError::ParseError(
"DELETE must target exactly one table".to_string(),
));
}
let table = &from[0];
if !table.joins.is_empty() {
return Err(SqlError::ParseError(
"DELETE with joins is not supported".to_string(),
));
}
let table_name = match &table.relation {
sqlparser::ast::TableFactor::Table { name, .. } => name.to_string(),
_ => {
return Err(SqlError::ParseError(
"Complex DELETE targets are not supported".to_string(),
))
}
};
let where_clause = if let Some(expr) = selection {
Some(parse_where_expr(expr)?)
} else {
None
};
Ok(SqlStatement::Delete {
table_name,
where_clause,
})
}
fn parse_where_expr(expr: &Expr) -> Result<WhereClause> {
match expr {
Expr::BinaryOp { left, op, right } => {
@ -356,3 +513,64 @@ fn parse_expr_as_value(expr: &Expr) -> Result<Value> {
))),
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_update_statement() {
        let statement = parse_sql("UPDATE users SET name = 'Bob', active = true WHERE id = 7")
            .expect("update should parse");
        match statement {
            SqlStatement::Update {
                table_name,
                assignments,
                where_clause,
            } => {
                assert_eq!(table_name, "users");
                let expected_assignments = vec![
                    ("name".to_string(), Value::Text("Bob".to_string())),
                    ("active".to_string(), Value::Boolean(true)),
                ];
                assert_eq!(assignments, expected_assignments);
                // The WHERE clause must parse as a simple `id = 7` equality.
                assert!(matches!(
                    where_clause,
                    Some(WhereClause::Comparison {
                        column,
                        op: ComparisonOp::Eq,
                        value: Value::Integer(7),
                    }) if column == "id"
                ));
            }
            other => panic!("expected update statement, got {:?}", other),
        }
    }

    #[test]
    fn parses_delete_statement() {
        let statement =
            parse_sql("DELETE FROM users WHERE name = 'Alice'").expect("delete should parse");
        match statement {
            SqlStatement::Delete {
                table_name,
                where_clause,
            } => {
                assert_eq!(table_name, "users");
                // The WHERE clause must parse as a simple `name = 'Alice'` equality.
                assert!(matches!(
                    where_clause,
                    Some(WhereClause::Comparison {
                        column,
                        op: ComparisonOp::Eq,
                        value: Value::Text(value),
                    }) if column == "name" && value == "Alice"
                ));
            }
            other => panic!("expected delete statement, got {:?}", other),
        }
    }
}

View file

@ -13,6 +13,12 @@ pub struct StorageManager {
client: Arc<Mutex<RdbClient>>,
}
/// One row captured by a table scan, paired with the raw KVS key it was
/// stored under and the CAS version observed at read time, so a later
/// update or delete can detect concurrent modification.
struct ScannedRow {
    // Encoded storage key (table prefix + primary-key encoding).
    key: Vec<u8>,
    // Decoded column values for the row.
    row: RowData,
    // CAS version seen during the scan; passed back to cas()/cas_delete().
    version: u64,
}
impl StorageManager {
pub fn new(client: Arc<Mutex<RdbClient>>) -> Self {
Self { client }
@ -52,9 +58,9 @@ impl StorageManager {
.primary_key
.iter()
.map(|pk_col| {
row_data
.get(pk_col)
.ok_or_else(|| SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col)))
row_data.get(pk_col).ok_or_else(|| {
SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col))
})
})
.collect();
let pk_values = pk_values?;
@ -66,8 +72,8 @@ impl StorageManager {
let row = RowData::new(row_data);
// Serialize row
let value = bincode::serialize(&row)
.map_err(|e| SqlError::SerializationError(e.to_string()))?;
let value =
bincode::serialize(&row).map_err(|e| SqlError::SerializationError(e.to_string()))?;
// Store in KVS using CAS (version 0 for new row)
let mut client = self.client.lock().await;
@ -77,7 +83,9 @@ impl StorageManager {
.map_err(|e| SqlError::KvsError(e.to_string()))?;
if !success {
return Err(SqlError::PrimaryKeyViolation("Row with this primary key already exists".to_string()));
return Err(SqlError::PrimaryKeyViolation(
"Row with this primary key already exists".to_string(),
));
}
Ok(())
@ -109,21 +117,9 @@ impl StorageManager {
let mut result = QueryResult::new(result_columns.clone());
// Scan all rows for this table
let start_key = Self::encode_table_prefix(table.table_id);
let end_key = Self::encode_table_prefix(table.table_id + 1);
let mut client = self.client.lock().await;
let (entries, _next_key) = client
.cas_scan(start_key, end_key, 1000)
.await
.map_err(|e| SqlError::KvsError(e.to_string()))?;
// Process each row
for (_key, value_bytes, _version) in entries {
let row: RowData = bincode::deserialize(&value_bytes)
.map_err(|e| SqlError::SerializationError(e.to_string()))?;
for scanned in self.scan_table_rows(table).await? {
let row = scanned.row;
// Apply WHERE filter
if let Some(where_clause) = where_clause {
if !Self::evaluate_where(&row, where_clause)? {
@ -134,10 +130,7 @@ impl StorageManager {
// Extract requested columns
let mut row_values = Vec::new();
for col_name in &result_columns {
let value = row
.get(col_name)
.cloned()
.unwrap_or(Value::Null);
let value = row.get(col_name).cloned().unwrap_or(Value::Null);
row_values.push(value);
}
@ -147,6 +140,138 @@ impl StorageManager {
Ok(result)
}
/// Update rows in a table.
///
/// Scans every row of `table`, applies `assignments` to the rows matching
/// `where_clause` (all rows when `None`), and writes each change back with
/// compare-and-swap against the version observed at scan time. Returns the
/// number of rows updated.
///
/// If an assignment changes a primary-key column the row moves to a new
/// key: the new key is created first (CAS with version 0, so an existing
/// row is never clobbered), then the old key is deleted; if the delete
/// fails, the freshly created key is removed best-effort. Updates are not
/// transactional: a failure mid-loop leaves earlier rows already updated.
pub async fn update_rows(
    &self,
    table: &TableMetadata,
    assignments: &[(String, Value)],
    where_clause: Option<&WhereClause>,
) -> Result<u64> {
    // Validate assignment targets before touching storage.
    for (column, _) in assignments {
        if table.get_column(column).is_none() {
            return Err(SqlError::ColumnNotFound(
                column.clone(),
                table.table_name.clone(),
            ));
        }
    }
    let mut updated = 0u64;
    for scanned in self.scan_table_rows(table).await? {
        // Skip rows the WHERE clause filters out; no clause means all rows.
        if let Some(where_clause) = where_clause {
            if !Self::evaluate_where(&scanned.row, where_clause)? {
                continue;
            }
        }
        let mut row = scanned.row;
        for (column, value) in assignments {
            row.set(column.clone(), value.clone());
        }
        // Re-derive the row key: assignments may have changed PK columns.
        let pk_values = Self::primary_key_values(table, &row)?;
        let new_key = Self::encode_row_key(table.table_id, &pk_values)?;
        let value = Self::serialize_row(&row)?;
        if new_key == scanned.key {
            // In-place update: CAS against the version seen at scan time so
            // a concurrent writer surfaces as a conflict instead of a lost
            // update.
            let mut client = self.client.lock().await;
            let (success, _current_version, _new_version) = client
                .cas(scanned.key.clone(), value, scanned.version)
                .await
                .map_err(|e| SqlError::KvsError(e.to_string()))?;
            if !success {
                return Err(SqlError::InvalidOperation(
                    "Concurrent row update detected".to_string(),
                ));
            }
        } else {
            // Primary key changed: create the new key first (version 0 =
            // must-not-exist), then delete the old one.
            let mut client = self.client.lock().await;
            let (created, _current_version, new_version) = client
                .cas(new_key.clone(), value, 0)
                .await
                .map_err(|e| SqlError::KvsError(e.to_string()))?;
            if !created {
                return Err(SqlError::PrimaryKeyViolation(
                    "Row with this primary key already exists".to_string(),
                ));
            }
            let (deleted, _current_version, _existed) = client
                .cas_delete(scanned.key.clone(), scanned.version)
                .await
                .map_err(|e| SqlError::KvsError(e.to_string()))?;
            if !deleted {
                // Old row changed concurrently: roll back the new key
                // best-effort (errors deliberately ignored) and report the
                // conflict.
                let _ = client.cas_delete(new_key, new_version).await;
                return Err(SqlError::InvalidOperation(
                    "Concurrent row update detected".to_string(),
                ));
            }
        }
        updated += 1;
    }
    Ok(updated)
}
/// Delete rows from a table.
///
/// Scans every row of `table` and removes those matching `where_clause`
/// (all rows when `None`) with compare-and-swap deletes against the version
/// observed at scan time. Returns the number of rows removed; fails if a
/// concurrent writer changed a row between the scan and the delete.
pub async fn delete_rows(
    &self,
    table: &TableMetadata,
    where_clause: Option<&WhereClause>,
) -> Result<u64> {
    let mut removed = 0u64;
    for candidate in self.scan_table_rows(table).await? {
        // Skip rows the WHERE clause filters out; no clause means all rows.
        let selected = match where_clause {
            Some(clause) => Self::evaluate_where(&candidate.row, clause)?,
            None => true,
        };
        if !selected {
            continue;
        }

        // Versioned delete: only succeeds if the row is unchanged since the
        // scan, so concurrent modifications surface as errors.
        let mut client = self.client.lock().await;
        let (ok, _current_version, _previously_existed) = client
            .cas_delete(candidate.key, candidate.version)
            .await
            .map_err(|e| SqlError::KvsError(e.to_string()))?;
        if !ok {
            return Err(SqlError::InvalidOperation(
                "Concurrent row delete detected".to_string(),
            ));
        }
        removed += 1;
    }
    Ok(removed)
}
/// Fetch every stored row for `table`, paging through the KVS scan API in
/// batches of 1000 entries and decoding each one into a [`ScannedRow`].
async fn scan_table_rows(&self, table: &TableMetadata) -> Result<Vec<ScannedRow>> {
    let (start, end) = Self::encode_table_scan_range(table.table_id);
    let mut collected = Vec::new();
    let mut client = self.client.lock().await;

    // Follow continuation keys until the server reports no more pages.
    let mut cursor = Some(start);
    while let Some(from) = cursor {
        let (entries, continuation) = client
            .cas_scan(from, end.clone(), 1000)
            .await
            .map_err(|e| SqlError::KvsError(e.to_string()))?;
        for (key, encoded, version) in entries {
            let row: RowData = bincode::deserialize(&encoded)
                .map_err(|e| SqlError::SerializationError(e.to_string()))?;
            collected.push(ScannedRow { key, row, version });
        }
        cursor = continuation;
    }
    Ok(collected)
}
/// Evaluate WHERE clause against a row
fn evaluate_where(row: &RowData, where_clause: &WhereClause) -> Result<bool> {
match where_clause {
@ -213,4 +338,57 @@ impl StorageManager {
/// Key prefix shared by every row of `table_id`: `"{DATA_PREFIX}:{id}:"`.
fn encode_table_prefix(table_id: u32) -> Vec<u8> {
    let prefix = format!("{}:{}:", DATA_PREFIX, table_id);
    prefix.into_bytes()
}
/// Build the half-open `[start, end)` key range covering all rows of
/// `table_id`: the table prefix and its lexicographic successor.
fn encode_table_scan_range(table_id: u32) -> (Vec<u8>, Vec<u8>) {
    let lower = Self::encode_table_prefix(table_id);
    let upper = Self::prefix_end(&lower);
    (lower, upper)
}
/// Smallest byte string strictly greater than every key that starts with
/// `prefix` — the exclusive upper bound for a prefix scan.
///
/// A trailing `0xff` byte has no successor, so trailing `0xff` bytes are
/// stripped and the carry is applied to the first incrementable byte.
/// (The previous implementation appended `0x00` instead, which produced an
/// end key SMALLER than keys like `prefix + [0x01]`, silently truncating
/// the scan.) An empty or all-`0xff` prefix has no finite successor; we
/// fall back to a single `0xff` byte as a sentinel — callers only pass
/// prefixes ending in `b':'`, so neither degenerate case occurs in
/// practice.
fn prefix_end(prefix: &[u8]) -> Vec<u8> {
    let mut end_key = prefix.to_vec();
    // Walk backwards, dropping 0xff bytes until one can be incremented.
    while let Some(last) = end_key.pop() {
        if last < 0xff {
            end_key.push(last + 1);
            return end_key;
        }
    }
    // Empty or all-0xff prefix: no finite exclusive bound exists.
    vec![0xff]
}
/// Collect references to the row's primary-key values in the order the
/// table's primary key declares them; errors if any PK column is absent.
fn primary_key_values<'a>(
    table: &'a TableMetadata,
    row: &'a RowData,
) -> Result<Vec<&'a Value>> {
    let mut values = Vec::with_capacity(table.primary_key.len());
    for pk_col in &table.primary_key {
        let value = row.get(pk_col).ok_or_else(|| {
            SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col))
        })?;
        values.push(value);
    }
    Ok(values)
}
/// Encode a row for storage using bincode, mapping encoder failures into
/// the SQL error type.
fn serialize_row(row: &RowData) -> Result<Vec<u8>> {
    match bincode::serialize(row) {
        Ok(encoded) => Ok(encoded),
        Err(e) => Err(SqlError::SerializationError(e.to_string())),
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn table_scan_range_uses_prefix_successor() {
        // Table 9's scan range must be [prefix, prefix-successor): the end
        // key is the prefix with its final byte (':' = 0x3a) incremented
        // to ';' (0x3b).
        let (start, end) = StorageManager::encode_table_scan_range(9);
        assert_eq!(start, b"__sql_data:9:".to_vec());
        assert_eq!(end, b"__sql_data:9;".to_vec());
        // A key belonging to table 9 falls inside the range...
        assert!(b"__sql_data:9:1".to_vec() < end);
        // ...while table 10's keys sort lexicographically BELOW table 9's
        // prefix (decimal ids compare as strings), so they are excluded.
        assert!(b"__sql_data:10:1".to_vec() < start);
    }
}

View file

@ -170,7 +170,13 @@ impl ObjectServiceImpl {
end: usize,
) -> <Self as ObjectService>::GetObjectStream {
let storage = self.storage.clone();
let state = (storage, upload, Some(self.object_to_proto(object)), 0usize, 0u64);
let state = (
storage,
upload,
Some(self.object_to_proto(object)),
0usize,
0u64,
);
let range_start = start as u64;
let range_end = end as u64;
let object_size = object.size;
@ -327,6 +333,41 @@ impl ObjectServiceImpl {
.await
}
async fn load_full_object_bytes(&self, object: &Object) -> Result<Bytes, Status> {
if let Some(upload) = self
.metadata
.load_object_multipart_upload(&object.id)
.await
.map_err(Self::to_status)?
{
let mut body = BytesMut::new();
for part in &upload.parts {
let bytes = self
.storage
.get_part(upload.upload_id.as_str(), part.part_number.as_u32())
.await
.map_err(|e| {
Status::internal(format!("Failed to retrieve multipart object part: {}", e))
})?;
body.extend_from_slice(bytes.as_ref());
}
if body.len() as u64 != object.size {
return Err(Status::internal(format!(
"Multipart object {} has inconsistent size: expected {}, got {}",
object.id,
object.size,
body.len()
)));
}
Ok(body.freeze())
} else {
self.storage
.get_object(&object.id)
.await
.map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e)))
}
}
fn multipart_lock(&self, upload_id: &str) -> Arc<Mutex<()>> {
self.multipart_locks
.entry(upload_id.to_string())
@ -613,9 +654,99 @@ impl ObjectService for ObjectServiceImpl {
async fn copy_object(
&self,
_request: Request<CopyObjectRequest>,
request: Request<CopyObjectRequest>,
) -> Result<Response<CopyObjectResponse>, Status> {
Err(Status::unimplemented("CopyObject not yet implemented"))
let tenant = get_tenant_context(&request)?;
let req = request.into_inner();
if req.source_bucket.is_empty() {
return Err(Status::invalid_argument("source_bucket is required"));
}
if req.source_key.is_empty() {
return Err(Status::invalid_argument("source_key is required"));
}
if req.dest_bucket.is_empty() {
return Err(Status::invalid_argument("dest_bucket is required"));
}
if req.dest_key.is_empty() {
return Err(Status::invalid_argument("dest_key is required"));
}
let source_bucket = self
.load_bucket_for_tenant(&tenant, &req.source_bucket)
.await?;
self.authorize_object_action(
&tenant,
ACTION_OBJECTS_READ,
&source_bucket,
&req.source_key,
)
.await?;
let dest_bucket = self
.load_bucket_for_tenant(&tenant, &req.dest_bucket)
.await?;
self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &dest_bucket, &req.dest_key)
.await?;
let source_bucket_id: BucketId = BucketId::from_str(&source_bucket.id.to_string())
.map_err(|_| Status::internal("Invalid source bucket ID"))?;
let dest_key = ObjectKey::new(&req.dest_key)
.map_err(|e| Status::invalid_argument(format!("Invalid destination key: {}", e)))?;
let source_version_id = if req.source_version_id.is_empty() {
None
} else {
Some(req.source_version_id.as_str())
};
let source_object = self
.metadata
.load_object(&source_bucket_id, &req.source_key, source_version_id)
.await
.map_err(Self::to_status)?
.ok_or_else(|| Status::not_found(format!("Object {} not found", req.source_key)))?;
if source_object.is_delete_marker {
return Err(Status::not_found("Source object is a delete marker"));
}
let data = self.load_full_object_bytes(&source_object).await?;
let object_metadata = if req.metadata_directive_replace {
Self::proto_metadata_to_object_metadata(req.metadata)
} else {
source_object.metadata.clone()
};
let mut dest_object = Object::new(
dest_bucket.id.to_string(),
dest_key,
source_object.etag.clone(),
source_object.size,
object_metadata.content_type.clone(),
);
dest_object.metadata = object_metadata;
dest_object.storage_class = source_object.storage_class.clone();
if dest_bucket.versioning == lightningstor_types::Versioning::Enabled {
dest_object.version = ObjectVersion::new();
}
self.storage
.put_object(&dest_object.id, data)
.await
.map_err(|e| Status::internal(format!("Failed to store copied object: {}", e)))?;
self.metadata
.save_object(&dest_object)
.await
.map_err(Self::to_status)?;
Ok(Response::new(CopyObjectResponse {
etag: dest_object.etag.as_str().to_string(),
version_id: dest_object.version.as_str().to_string(),
last_modified: Some(prost_types::Timestamp {
seconds: dest_object.last_modified.timestamp(),
nanos: dest_object.last_modified.timestamp_subsec_nanos() as i32,
}),
}))
}
async fn list_objects(
@ -801,10 +932,14 @@ impl ObjectService for ObjectServiceImpl {
while let Some(chunk) = stream.message().await? {
if !chunk.bucket.is_empty() && chunk.bucket != first.bucket {
return Err(Status::invalid_argument("bucket changed within UploadPart stream"));
return Err(Status::invalid_argument(
"bucket changed within UploadPart stream",
));
}
if !chunk.key.is_empty() && chunk.key != first.key {
return Err(Status::invalid_argument("key changed within UploadPart stream"));
return Err(Status::invalid_argument(
"key changed within UploadPart stream",
));
}
if !chunk.upload_id.is_empty() && chunk.upload_id != first.upload_id {
return Err(Status::invalid_argument(
@ -922,7 +1057,10 @@ impl ObjectService for ObjectServiceImpl {
selected_parts.push(part.clone());
}
let etags: Vec<ETag> = selected_parts.iter().map(|part| part.etag.clone()).collect();
let etags: Vec<ETag> = selected_parts
.iter()
.map(|part| part.etag.clone())
.collect();
let multipart_etag = ETag::multipart(&etags, selected_parts.len());
upload.parts = selected_parts;
@ -1018,7 +1156,11 @@ impl ObjectService for ObjectServiceImpl {
));
}
let max_parts = if req.max_parts > 0 { req.max_parts } else { 1000 };
let max_parts = if req.max_parts > 0 {
req.max_parts
} else {
1000
};
let remaining_count = upload
.parts
.iter()
@ -1064,7 +1206,11 @@ impl ObjectService for ObjectServiceImpl {
let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string())
.map_err(|_| Status::internal("Invalid bucket ID"))?;
let max_uploads = if req.max_uploads > 0 { req.max_uploads } else { 1000 };
let max_uploads = if req.max_uploads > 0 {
req.max_uploads
} else {
1000
};
let uploads = self
.metadata
.list_multipart_uploads(&bucket_id, &req.prefix, max_uploads)

View file

@ -82,6 +82,7 @@ PLASMAVMC_PROTO_DIR="${REPO_ROOT}/plasmavmc/proto"
PLASMAVMC_PROTO="${PLASMAVMC_PROTO_DIR}/plasmavmc.proto"
FLAREDB_PROTO_DIR="${REPO_ROOT}/flaredb/crates/flaredb-proto/src"
FLAREDB_PROTO="${FLAREDB_PROTO_DIR}/kvrpc.proto"
FLAREDB_SQL_PROTO="${FLAREDB_PROTO_DIR}/sqlrpc.proto"
# shellcheck disable=SC2034
NODE_PHASES=(
@ -2507,6 +2508,7 @@ ensure_flaredb_proto_on_node() {
ssh_node "${node}" "install -d -m 0755 ${proto_root}"
scp_to_node "${node}" "${FLAREDB_PROTO}" "${proto_root}/kvrpc.proto"
scp_to_node "${node}" "${FLAREDB_SQL_PROTO}" "${proto_root}/sqlrpc.proto"
}
vm_runtime_dir_path() {
@ -3072,6 +3074,74 @@ while true; do
fi
sleep 1
done
EOS
log "Validating FlareDB SQL DDL/DML execution"
ssh_node_script node01 "${flaredb_proto_root}" "10.100.0.11:2479" <<'EOS'
set -euo pipefail
proto_root="$1"
sql_target="$2"
sql_namespace="validation-sql-$(date +%s)"
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL, active BOOLEAN)" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '.ddlResult.message | contains("created")' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "INSERT INTO users (id, name, active) VALUES (1, 'Alice', true)" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "INSERT INTO users (id, name, active) VALUES (2, 'Bob', false)" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "UPDATE users SET name = 'Carol', active = false WHERE id = 1" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "SELECT id, name, active FROM users WHERE id = 1" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '
.queryResult.columns == ["id", "name", "active"] and
(.queryResult.rows | length) == 1 and
.queryResult.rows[0].values[0].intValue == "1" and
.queryResult.rows[0].values[1].textValue == "Carol" and
((.queryResult.rows[0].values[2].boolValue // false) == false)
' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "DELETE FROM users WHERE id = 2" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null
grpcurl -plaintext \
-import-path "${proto_root}" \
-proto "${proto_root}/sqlrpc.proto" \
-d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "SELECT id, name FROM users" '{namespace:$namespace, sql:$sql}')" \
"${sql_target}" sqlrpc.SqlService/Execute \
| jq -e '
.queryResult.columns == ["id", "name"] and
(.queryResult.rows | length) == 1 and
.queryResult.rows[0].values[0].intValue == "1" and
.queryResult.rows[0].values[1].textValue == "Carol"
' >/dev/null
EOS
}
@ -3328,7 +3398,7 @@ validate_fiberlb_flow() {
local org_id="fiberlb-smoke-org"
local project_id="fiberlb-smoke-project"
local principal_id="fiberlb-smoke-$(date +%s)"
local token lb_id pool_id backend_id listener_id listener_port
local token lb_id pool_id health_check_id backend_id listener_id listener_port
token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")"
listener_port=$((18080 + (RANDOM % 100)))
@ -3350,6 +3420,39 @@ validate_fiberlb_flow() {
| jq -r '.pool.id')"
[[ -n "${pool_id}" && "${pool_id}" != "null" ]] || die "FiberLB CreatePool did not return an ID"
health_check_id="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg name "fiberlb-smoke-health" --arg pool "${pool_id}" '{name:$name, poolId:$pool, type:"HEALTH_CHECK_TYPE_HTTP", intervalSeconds:10, timeoutSeconds:3, healthyThreshold:2, unhealthyThreshold:2, httpConfig:{method:"GET", path:"/health", expectedCodes:[200]}}')" \
127.0.0.1:15085 fiberlb.v1.HealthCheckService/CreateHealthCheck \
| jq -r '.healthCheck.id')"
[[ -n "${health_check_id}" && "${health_check_id}" != "null" ]] || die "FiberLB CreateHealthCheck did not return an ID"
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg id "${health_check_id}" '{id:$id}')" \
127.0.0.1:15085 fiberlb.v1.HealthCheckService/GetHealthCheck \
| jq -e --arg id "${health_check_id}" '.healthCheck.id == $id and .healthCheck.httpConfig.path == "/health"' >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg pool "${pool_id}" '{poolId:$pool}')" \
127.0.0.1:15085 fiberlb.v1.HealthCheckService/ListHealthChecks \
| jq -e --arg id "${health_check_id}" '(.healthChecks | length) == 1 and .healthChecks[0].id == $id' >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg id "${health_check_id}" '{id:$id, name:"fiberlb-smoke-health-updated", intervalSeconds:15, timeoutSeconds:4, healthyThreshold:3, unhealthyThreshold:4, httpConfig:{method:"GET", path:"/readyz", expectedCodes:[200,204]}, enabled:false}')" \
127.0.0.1:15085 fiberlb.v1.HealthCheckService/UpdateHealthCheck \
| jq -e '.healthCheck.name == "fiberlb-smoke-health-updated" and .healthCheck.httpConfig.path == "/readyz" and ((.healthCheck.enabled // false) == false)' >/dev/null
backend_id="$(grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
@ -3442,6 +3545,12 @@ validate_fiberlb_flow() {
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg id "${backend_id}" '{id:$id}')" \
127.0.0.1:15085 fiberlb.v1.BackendService/DeleteBackend >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
-proto "${FIBERLB_PROTO}" \
-d "$(jq -cn --arg id "${health_check_id}" '{id:$id}')" \
127.0.0.1:15085 fiberlb.v1.HealthCheckService/DeleteHealthCheck >/dev/null
grpcurl -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${FIBERLB_PROTO_DIR}" \
@ -3798,11 +3907,12 @@ validate_lightningstor_distributed_storage() {
read -r before_node01 before_node04 before_node05 < <(lightningstor_count_triplet)
local key="replication-check-$(date +%s)"
local copy_key="${key}-copy"
local body="distributed-object-${key}"
local body_b64
body_b64="$(printf '%s' "${body}" | base64 -w0)"
local put_json head_json delete_json output
local put_json head_json copy_head_json delete_json copy_delete_json output
put_json="$(
jq -cn \
--arg bucket "${bucket}" \
@ -3849,7 +3959,54 @@ validate_lightningstor_distributed_storage() {
wait_for_lightningstor_counts_greater_than "${before_node01}" "${before_node04}" "${before_node05}" "generic object replication"
local copy_json copied_body
copy_json="$(
jq -cn \
--arg source_bucket "${bucket}" \
--arg source_key "${key}" \
--arg dest_bucket "${bucket}" \
--arg dest_key "${copy_key}" \
'{sourceBucket:$source_bucket, sourceKey:$source_key, destBucket:$dest_bucket, destKey:$dest_key, metadataDirectiveReplace:true, metadata:{contentType:"text/copied", userMetadata:{copied:"yes"}}}'
)"
log "LightningStor distributed replication: COPY ${bucket}/${key} -> ${bucket}/${copy_key}"
output="$(
grpcurl_capture -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${LIGHTNINGSTOR_PROTO_DIR}" \
-proto "${LIGHTNINGSTOR_PROTO}" \
-d "${copy_json}" \
127.0.0.1:15086 lightningstor.v1.ObjectService/CopyObject
)" || die "failed to copy LightningStor distributed replication probe ${bucket}/${key}: ${output}"
printf '%s\n' "${output}" | jq -e '.etag != "" and .versionId != ""' >/dev/null \
|| die "LightningStor copy response was incomplete: ${output}"
copy_head_json="$(jq -cn --arg bucket "${bucket}" --arg key "${copy_key}" '{bucket:$bucket, key:$key}')"
output="$(
grpcurl_capture -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${LIGHTNINGSTOR_PROTO_DIR}" \
-proto "${LIGHTNINGSTOR_PROTO}" \
-d "${copy_head_json}" \
127.0.0.1:15086 lightningstor.v1.ObjectService/HeadObject
)" || die "failed to head copied LightningStor object ${bucket}/${copy_key}: ${output}"
printf '%s\n' "${output}" \
| jq -e --arg size "$(printf '%s' "${body}" | wc -c | awk '{print $1}')" '.object.metadata.contentType == "text/copied" and (.object.size | tonumber) == ($size | tonumber)' >/dev/null \
|| die "LightningStor copied object returned unexpected metadata: ${output}"
output="$(
grpcurl_capture -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${LIGHTNINGSTOR_PROTO_DIR}" \
-proto "${LIGHTNINGSTOR_PROTO}" \
-d "${copy_head_json}" \
127.0.0.1:15086 lightningstor.v1.ObjectService/GetObject
)" || die "failed to fetch copied LightningStor object ${bucket}/${copy_key}: ${output}"
copied_body="$(printf '%s\n' "${output}" | jq -rsr '[.[] | .bodyChunk? | select(. != null) | @base64d] | join("")')" \
|| die "failed to decode copied LightningStor object ${bucket}/${copy_key}: ${output}"
[[ "${copied_body}" == "${body}" ]] || die "copied LightningStor object payload did not match source"
delete_json="$(jq -cn --arg bucket "${bucket}" --arg key "${key}" '{bucket:$bucket, key:$key}')"
copy_delete_json="$(jq -cn --arg bucket "${bucket}" --arg key "${copy_key}" '{bucket:$bucket, key:$key}')"
log "LightningStor distributed replication: DELETE ${bucket}/${key}"
output="$(
grpcurl_capture -plaintext \
@ -3860,6 +4017,16 @@ validate_lightningstor_distributed_storage() {
127.0.0.1:15086 lightningstor.v1.ObjectService/DeleteObject
)" || die "failed to delete LightningStor distributed replication probe ${bucket}/${key}: ${output}"
log "LightningStor distributed replication: DELETE ${bucket}/${copy_key}"
output="$(
grpcurl_capture -plaintext \
-H "authorization: Bearer ${token}" \
-import-path "${LIGHTNINGSTOR_PROTO_DIR}" \
-proto "${LIGHTNINGSTOR_PROTO}" \
-d "${copy_delete_json}" \
127.0.0.1:15086 lightningstor.v1.ObjectService/DeleteObject
)" || die "failed to delete copied LightningStor object ${bucket}/${copy_key}: ${output}"
wait_for_lightningstor_counts_equal "${before_node01}" "${before_node04}" "${before_node05}" "generic object cleanup"
trap - RETURN