From c17e5a6130282852542e339c6124b8ff83ff75bc Mon Sep 17 00:00:00 2001 From: centra Date: Wed, 1 Apr 2026 02:07:55 +0900 Subject: [PATCH] Implement FlareDB SQL routing and service CRUD --- fiberlb/crates/fiberlb-server/src/metadata.rs | 97 +++++- .../src/services/health_check.rs | 283 +++++++++++++----- flaredb/crates/flaredb-server/src/main.rs | 3 +- flaredb/crates/flaredb-server/src/merkle.rs | 7 +- .../crates/flaredb-server/src/pd_client.rs | 4 +- .../crates/flaredb-server/src/raft_service.rs | 14 +- flaredb/crates/flaredb-server/src/rest.rs | 169 +++++++++-- flaredb/crates/flaredb-server/src/service.rs | 40 ++- .../crates/flaredb-server/src/sql_service.rs | 37 +-- flaredb/crates/flaredb-server/src/store.rs | 4 +- flaredb/crates/flaredb-sql/src/executor.rs | 35 ++- flaredb/crates/flaredb-sql/src/parser.rs | 256 ++++++++++++++-- flaredb/crates/flaredb-sql/src/storage.rs | 226 ++++++++++++-- .../src/object_service.rs | 162 +++++++++- nix/test-cluster/run-cluster.sh | 171 ++++++++++- 15 files changed, 1290 insertions(+), 218 deletions(-) diff --git a/fiberlb/crates/fiberlb-server/src/metadata.rs b/fiberlb/crates/fiberlb-server/src/metadata.rs index b623bcb..6d2f4c2 100644 --- a/fiberlb/crates/fiberlb-server/src/metadata.rs +++ b/fiberlb/crates/fiberlb-server/src/metadata.rs @@ -156,7 +156,9 @@ impl LbMetadataStore { ) .execute(pool) .await - .map_err(|e| MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e)))?; + .map_err(|e| { + MetadataError::Storage(format!("Failed to initialize Postgres schema: {}", e)) + })?; Ok(()) } @@ -169,7 +171,9 @@ impl LbMetadataStore { ) .execute(pool) .await - .map_err(|e| MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e)))?; + .map_err(|e| { + MetadataError::Storage(format!("Failed to initialize SQLite schema: {}", e)) + })?; Ok(()) } @@ -196,9 +200,7 @@ impl LbMetadataStore { .bind(value) .execute(pool.as_ref()) .await - .map_err(|e| { - MetadataError::Storage(format!("Postgres put failed: {}", e)) - })?; + .map_err(|e| MetadataError::Storage(format!("Postgres put failed: {}", e)))?; } SqlStorageBackend::Sqlite(pool) => { sqlx::query( @@ -439,6 +441,10 @@ impl LbMetadataStore { format!("/fiberlb/healthchecks/{}/{}", pool_id, hc_id) } + fn health_check_id_key(hc_id: &HealthCheckId) -> String { + format!("/fiberlb/healthcheck_ids/{}", hc_id) + } + fn health_check_prefix(pool_id: &PoolId) -> String { format!("/fiberlb/healthchecks/{}/", pool_id) } @@ -865,7 +871,8 @@ impl LbMetadataStore { MetadataError::Serialization(format!("Failed to serialize health check: {}", e)) })?; - self.put(&key, &value).await + self.put(&key, &value).await?; + self.put(&Self::health_check_id_key(&hc.id), &key).await } /// Load health check @@ -886,6 +893,30 @@ impl LbMetadataStore { } } + /// Load health check by ID using the global ID index. + pub async fn load_health_check_by_id( + &self, + hc_id: &HealthCheckId, + ) -> Result> { + let id_key = Self::health_check_id_key(hc_id); + + if let Some(hc_key) = self.get(&id_key).await? { + if let Some(value) = self.get(&hc_key).await? { + let hc: HealthCheck = serde_json::from_str(&value).map_err(|e| { + MetadataError::Serialization(format!( + "Failed to deserialize health check: {}", + e + )) + })?; + Ok(Some(hc)) + } else { + Ok(None) + } + } else { + Ok(None) + } + } + /// List health checks for a pool pub async fn list_health_checks(&self, pool_id: &PoolId) -> Result> { let prefix = Self::health_check_prefix(pool_id); @@ -907,7 +938,8 @@ impl LbMetadataStore { /// Delete health check pub async fn delete_health_check(&self, hc: &HealthCheck) -> Result<()> { let key = Self::health_check_key(&hc.pool_id, &hc.id); - self.delete_key(&key).await + self.delete_key(&key).await?; + self.delete_key(&Self::health_check_id_key(&hc.id)).await } /// Delete all health checks for a pool @@ -1215,7 +1247,7 @@ fn normalize_transport_addr(endpoint: &str) -> String { #[cfg(test)] mod tests { use super::*; - use fiberlb_types::{ListenerProtocol, PoolAlgorithm, PoolProtocol}; + use fiberlb_types::{HealthCheck, ListenerProtocol, PoolAlgorithm, PoolProtocol}; #[tokio::test] async fn test_lb_crud() { @@ -1355,6 +1387,55 @@ mod tests { assert!(deleted.is_none()); } + #[tokio::test] + async fn test_health_check_crud() { + let store = LbMetadataStore::new_in_memory(); + + let lb = LoadBalancer::new("test-lb", "test-org", "test-project"); + store.save_lb(&lb).await.unwrap(); + + let pool = Pool::new( + "web-pool", + lb.id, + PoolAlgorithm::RoundRobin, + PoolProtocol::Http, + ); + store.save_pool(&pool).await.unwrap(); + + let hc = HealthCheck::new_http("http-check", pool.id, "/healthz"); + store.save_health_check(&hc).await.unwrap(); + + let loaded = store + .load_health_check(&pool.id, &hc.id) + .await + .unwrap() + .unwrap(); + assert_eq!(loaded.id, hc.id); + assert_eq!(loaded.name, "http-check"); + + let loaded_by_id = store + .load_health_check_by_id(&hc.id) + .await + .unwrap() + .unwrap(); + assert_eq!(loaded_by_id.pool_id, pool.id); + + let checks = store.list_health_checks(&pool.id).await.unwrap(); + assert_eq!(checks.len(), 1); + + store.delete_health_check(&hc).await.unwrap(); + assert!(store + .load_health_check(&pool.id, &hc.id) + .await + .unwrap() + .is_none()); + assert!(store + .load_health_check_by_id(&hc.id) + .await + .unwrap() + .is_none()); + } + #[tokio::test] async fn test_cascade_delete() { let store = LbMetadataStore::new_in_memory(); diff --git a/fiberlb/crates/fiberlb-server/src/services/health_check.rs b/fiberlb/crates/fiberlb-server/src/services/health_check.rs index 8e751bb..afed199 100644 --- a/fiberlb/crates/fiberlb-server/src/services/health_check.rs +++ b/fiberlb/crates/fiberlb-server/src/services/health_check.rs @@ -2,17 +2,15 @@ use std::sync::Arc; -use base64::Engine as _; use crate::metadata::LbMetadataStore; +use base64::Engine as _; use fiberlb_api::{ - health_check_service_server::HealthCheckService, - CreateHealthCheckRequest, CreateHealthCheckResponse, - DeleteHealthCheckRequest, DeleteHealthCheckResponse, - GetHealthCheckRequest, GetHealthCheckResponse, - ListHealthChecksRequest, ListHealthChecksResponse, - UpdateHealthCheckRequest, UpdateHealthCheckResponse, - HealthCheck as ProtoHealthCheck, HealthCheckType as ProtoHealthCheckType, - HttpHealthConfig as ProtoHttpHealthConfig, + health_check_service_server::HealthCheckService, CreateHealthCheckRequest, + CreateHealthCheckResponse, DeleteHealthCheckRequest, DeleteHealthCheckResponse, + GetHealthCheckRequest, GetHealthCheckResponse, HealthCheck as ProtoHealthCheck, + HealthCheckType as ProtoHealthCheckType, HttpHealthConfig as ProtoHttpHealthConfig, + ListHealthChecksRequest, ListHealthChecksResponse, UpdateHealthCheckRequest, + UpdateHealthCheckResponse, }; use fiberlb_types::{HealthCheck, HealthCheckId, HealthCheckType, HttpHealthConfig, PoolId}; use iam_service_auth::{get_tenant_context, resource_for_tenant, AuthService}; @@ -33,7 +31,10 @@ impl HealthCheckServiceImpl { } const ACTION_HEALTHCHECKS_CREATE: &str = "network:healthchecks:create"; +const ACTION_HEALTHCHECKS_READ: &str = "network:healthchecks:read"; const ACTION_HEALTHCHECKS_LIST: &str = "network:healthchecks:list"; +const ACTION_HEALTHCHECKS_UPDATE: &str = "network:healthchecks:update"; +const ACTION_HEALTHCHECKS_DELETE: &str = "network:healthchecks:delete"; /// Convert domain HealthCheck to proto fn health_check_to_proto(hc: &HealthCheck) -> ProtoHealthCheck { @@ -52,13 +53,11 @@ fn health_check_to_proto(hc: &HealthCheck) -> ProtoHealthCheck { timeout_seconds: hc.timeout_seconds, healthy_threshold: hc.healthy_threshold, unhealthy_threshold: hc.unhealthy_threshold, - http_config: hc.http_config.as_ref().map(|cfg| { - ProtoHttpHealthConfig { - method: cfg.method.clone(), - path: cfg.path.clone(), - expected_codes: cfg.expected_codes.iter().map(|&c| c as u32).collect(), - host: cfg.host.clone().unwrap_or_default(), - } + http_config: hc.http_config.as_ref().map(|cfg| ProtoHttpHealthConfig { + method: cfg.method.clone(), + path: cfg.path.clone(), + expected_codes: cfg.expected_codes.iter().map(|&c| c as u32).collect(), + host: cfg.host.clone().unwrap_or_default(), }), enabled: hc.enabled, created_at: hc.created_at, @@ -100,7 +99,11 @@ fn proto_to_http_config(cfg: Option) -> Option = None; - for lb in &lbs { - if let Some(_) = self - .metadata - .load_pool(&lb.id, &pool_id) - .await - .map_err(|e| Status::internal(format!("metadata error: {}", e)))? - { - scope = Some((lb.org_id.clone(), lb.project_id.clone())); - break; - } + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("pool not found"))?; + let lb = self + .metadata + .load_lb_by_id(&pool.loadbalancer_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("load balancer not found"))?; + if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id { + return Err(Status::permission_denied("pool not in tenant scope")); } - let (lb_org_id, lb_project_id) = - scope.ok_or_else(|| Status::not_found("pool not found"))?; - self.auth .authorize( &tenant, ACTION_HEALTHCHECKS_CREATE, - &resource_for_tenant("health-check", "*", &lb_org_id, &lb_project_id), + &resource_for_tenant("health-check", "*", &lb.org_id, &lb.project_id), ) .await?; let check_type = proto_to_check_type(req.r#type); // Create health check based on type - let mut hc = if check_type == HealthCheckType::Http || check_type == HealthCheckType::Https { - let path = req.http_config.as_ref().map(|c| c.path.as_str()).unwrap_or("/health"); + let mut hc = if check_type == HealthCheckType::Http || check_type == HealthCheckType::Https + { + let path = req + .http_config + .as_ref() + .map(|c| c.path.as_str()) + .unwrap_or("/health"); HealthCheck::new_http(&req.name, pool_id, path) } else { HealthCheck::new_tcp(&req.name, pool_id) @@ -195,18 +197,55 @@ impl HealthCheckService for HealthCheckServiceImpl { &self, request: Request, ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; let req = request.into_inner(); if req.id.is_empty() { return Err(Status::invalid_argument("id is required")); } - let _hc_id = parse_hc_id(&req.id)?; + let hc_id = parse_hc_id(&req.id)?; + let hc = self + .metadata + .load_health_check_by_id(&hc_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("health check not found"))?; + let pool = self + .metadata + .load_pool_by_id(&hc.pool_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("pool not found"))?; + let lb = self + .metadata + .load_lb_by_id(&pool.loadbalancer_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("load balancer not found"))?; - // Need pool_id context to efficiently look up health check - Err(Status::unimplemented( - "get_health_check by ID requires pool_id context; use list_health_checks instead", - )) + if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id { + return Err(Status::permission_denied( + "health check not in tenant scope", + )); + } + + self.auth + .authorize( + &tenant, + ACTION_HEALTHCHECKS_READ, + &resource_for_tenant( + "health-check", + &hc.id.to_string(), + &lb.org_id, + &lb.project_id, + ), + ) + .await?; + + Ok(Response::new(GetHealthCheckResponse { + health_check: Some(health_check_to_proto(&hc)), + })) } async fn list_health_checks( @@ -222,33 +261,27 @@ impl HealthCheckService for HealthCheckServiceImpl { let pool_id = parse_pool_id(&req.pool_id)?; - let lbs = self + let pool = self .metadata - .list_lbs(&tenant.org_id, Some(tenant.project_id.as_str())) + .load_pool_by_id(&pool_id) .await - .map_err(|e| Status::internal(format!("metadata error: {}", e)))?; - - let mut scope: Option<(String, String)> = None; - for lb in &lbs { - if let Some(_) = self - .metadata - .load_pool(&lb.id, &pool_id) - .await - .map_err(|e| Status::internal(format!("metadata error: {}", e)))? - { - scope = Some((lb.org_id.clone(), lb.project_id.clone())); - break; - } + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("pool not found"))?; + let lb = self + .metadata + .load_lb_by_id(&pool.loadbalancer_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("load balancer not found"))?; + if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id { + return Err(Status::permission_denied("pool not in tenant scope")); } - let (lb_org_id, lb_project_id) = - scope.ok_or_else(|| Status::not_found("pool not found"))?; - self.auth .authorize( &tenant, ACTION_HEALTHCHECKS_LIST, - &resource_for_tenant("health-check", "*", &lb_org_id, &lb_project_id), + &resource_for_tenant("health-check", "*", &lb.org_id, &lb.project_id), ) .await?; @@ -299,31 +332,141 @@ impl HealthCheckService for HealthCheckServiceImpl { &self, request: Request, ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; let req = request.into_inner(); if req.id.is_empty() { return Err(Status::invalid_argument("id is required")); } - // Need pool_id context for update - Err(Status::unimplemented( - "update_health_check requires pool_id context; include pool_id in request", - )) + let hc_id = parse_hc_id(&req.id)?; + let mut hc = self + .metadata + .load_health_check_by_id(&hc_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("health check not found"))?; + let pool = self + .metadata + .load_pool_by_id(&hc.pool_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("pool not found"))?; + let lb = self + .metadata + .load_lb_by_id(&pool.loadbalancer_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("load balancer not found"))?; + + if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id { + return Err(Status::permission_denied( + "health check not in tenant scope", + )); + } + + self.auth + .authorize( + &tenant, + ACTION_HEALTHCHECKS_UPDATE, + &resource_for_tenant( + "health-check", + &hc.id.to_string(), + &lb.org_id, + &lb.project_id, + ), + ) + .await?; + + if !req.name.is_empty() { + hc.name = req.name; + } + if req.interval_seconds > 0 { + hc.interval_seconds = req.interval_seconds; + } + if req.timeout_seconds > 0 { + hc.timeout_seconds = req.timeout_seconds; + } + if req.healthy_threshold > 0 { + hc.healthy_threshold = req.healthy_threshold; + } + if req.unhealthy_threshold > 0 { + hc.unhealthy_threshold = req.unhealthy_threshold; + } + if req.http_config.is_some() { + hc.http_config = proto_to_http_config(req.http_config); + } + hc.enabled = req.enabled; + hc.updated_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.metadata + .save_health_check(&hc) + .await + .map_err(|e| Status::internal(format!("failed to save health check: {}", e)))?; + + Ok(Response::new(UpdateHealthCheckResponse { + health_check: Some(health_check_to_proto(&hc)), + })) } async fn delete_health_check( &self, request: Request, ) -> Result, Status> { + let tenant = get_tenant_context(&request)?; let req = request.into_inner(); if req.id.is_empty() { return Err(Status::invalid_argument("id is required")); } - // Need pool_id context for delete - Err(Status::unimplemented( - "delete_health_check requires pool_id context; include pool_id in request", - )) + let hc_id = parse_hc_id(&req.id)?; + let hc = self + .metadata + .load_health_check_by_id(&hc_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("health check not found"))?; + let pool = self + .metadata + .load_pool_by_id(&hc.pool_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("pool not found"))?; + let lb = self + .metadata + .load_lb_by_id(&pool.loadbalancer_id) + .await + .map_err(|e| Status::internal(format!("metadata error: {}", e)))? + .ok_or_else(|| Status::not_found("load balancer not found"))?; + + if lb.org_id != tenant.org_id || lb.project_id != tenant.project_id { + return Err(Status::permission_denied( + "health check not in tenant scope", + )); + } + + self.auth + .authorize( + &tenant, + ACTION_HEALTHCHECKS_DELETE, + &resource_for_tenant( + "health-check", + &hc.id.to_string(), + &lb.org_id, + &lb.project_id, + ), + ) + .await?; + + self.metadata + .delete_health_check(&hc) + .await + .map_err(|e| Status::internal(format!("failed to delete health check: {}", e)))?; + + Ok(Response::new(DeleteHealthCheckResponse {})) } } diff --git a/flaredb/crates/flaredb-server/src/main.rs b/flaredb/crates/flaredb-server/src/main.rs index dcddcf0..8280724 100644 --- a/flaredb/crates/flaredb-server/src/main.rs +++ b/flaredb/crates/flaredb-server/src/main.rs @@ -585,7 +585,8 @@ async fn main() -> Result<(), Box> { info!("FlareDB server starting with health checks enabled"); // Create SQL service - let sql_service = sql_service::SqlServiceImpl::new(server_config.addr.to_string()); + let sql_service = + sql_service::SqlServiceImpl::new(server_config.addr.to_string(), pd_endpoints.clone()); // Configure TLS if enabled let mut server = Server::builder(); diff --git a/flaredb/crates/flaredb-server/src/merkle.rs b/flaredb/crates/flaredb-server/src/merkle.rs index 5649806..6f606fb 100644 --- a/flaredb/crates/flaredb-server/src/merkle.rs +++ b/flaredb/crates/flaredb-server/src/merkle.rs @@ -33,10 +33,9 @@ pub fn build_merkle( let mut start_key = None; while let Some(Ok((k, v))) = iter.next() { - if (k.len() < 4 || k[..4] != prefix) - && !k.starts_with(&prefix) { - break; - } + if (k.len() < 4 || k[..4] != prefix) && !k.starts_with(&prefix) { + break; + } if start_key.is_none() { start_key = Some(k.to_vec()); } diff --git a/flaredb/crates/flaredb-server/src/pd_client.rs b/flaredb/crates/flaredb-server/src/pd_client.rs index 9cc4bf9..2d32760 100644 --- a/flaredb/crates/flaredb-server/src/pd_client.rs +++ b/flaredb/crates/flaredb-server/src/pd_client.rs @@ -199,9 +199,7 @@ impl PdClient { } /// Start watching for metadata changes in the background - pub async fn start_watch( - &mut self, - ) -> Result<(), Box> { + pub async fn start_watch(&mut self) -> Result<(), Box> { let (tx, rx) = mpsc::channel::(32); // Create watch requests for stores and regions prefixes diff --git a/flaredb/crates/flaredb-server/src/raft_service.rs b/flaredb/crates/flaredb-server/src/raft_service.rs index 96d80f2..f40ba86 100644 --- a/flaredb/crates/flaredb-server/src/raft_service.rs +++ b/flaredb/crates/flaredb-server/src/raft_service.rs @@ -1,7 +1,6 @@ +use crate::config::NamespaceManager; use crate::store::Store; -use flaredb_raft::{FlareNode, FlareNodeId, FlareTypeConfig}; use flaredb_proto::raft_server::raft_service_server::RaftService; -use openraft::raft::{AppendEntriesRequest, VoteRequest}; use flaredb_proto::raft_server::{ FetchRangeRequest, FetchRangeResponse, ForwardEventualRequest, GetMerkleRequest, GetMerkleResponse, GetModeRequest, GetModeResponse, ListNamespaceModesRequest, @@ -10,9 +9,10 @@ use flaredb_proto::raft_server::{ OpenRaftVoteRequest, OpenRaftVoteResponse, RaftMessage, RaftResponse, UpdateNamespaceModeRequest, UpdateNamespaceModeResponse, }; +use flaredb_raft::{FlareNode, FlareNodeId, FlareTypeConfig}; +use openraft::raft::{AppendEntriesRequest, VoteRequest}; use std::sync::Arc; use tonic::{Request, Response, Status}; -use crate::config::NamespaceManager; pub struct RaftServiceImpl { store: Arc, @@ -204,10 +204,10 @@ impl RaftService for RaftServiceImpl { .await .ok_or_else(|| Status::failed_precondition("region not found"))?; - let append_req: AppendEntriesRequest = - serde_json::from_slice(&req.data).map_err(|e| { - Status::invalid_argument(format!("invalid append_entries request: {}", e)) - })?; + let append_req: AppendEntriesRequest = serde_json::from_slice(&req.data) + .map_err(|e| { + Status::invalid_argument(format!("invalid append_entries request: {}", e)) + })?; let resp = node .raft diff --git a/flaredb/crates/flaredb-server/src/rest.rs b/flaredb/crates/flaredb-server/src/rest.rs index 07dc09a..db5e3cd 100644 --- a/flaredb/crates/flaredb-server/src/rest.rs +++ b/flaredb/crates/flaredb-server/src/rest.rs @@ -8,13 +8,13 @@ //! - GET /api/v1/scan - Range scan //! - GET /health - Health check +use crate::pd_client::PdClient; use axum::{ extract::{Path, Query, State}, http::StatusCode, routing::{get, post, put}, Json, Router, }; -use crate::pd_client::PdClient; use flaredb_client::RdbClient; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -172,7 +172,9 @@ pub fn build_router(state: RestApiState) -> Router { async fn health_check() -> (StatusCode, Json>) { ( StatusCode::OK, - Json(SuccessResponse::new(serde_json::json!({ "status": "healthy" }))), + Json(SuccessResponse::new( + serde_json::json!({ "status": "healthy" }), + )), ) } @@ -211,12 +213,24 @@ async fn get_kv( ) -> Result>, (StatusCode, Json)> { let mut client = RdbClient::connect_direct(state.server_addr.clone(), "default") .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "SERVICE_UNAVAILABLE", + &format!("Failed to connect: {}", e), + ) + })?; let value = client .raw_get(key.as_bytes().to_vec()) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))? + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })? .ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", "Key not found"))?; Ok(Json(SuccessResponse::new(GetResponse { @@ -230,19 +244,34 @@ async fn put_kv( State(state): State, Path(key): Path, Json(req): Json, -) -> Result<(StatusCode, Json>), (StatusCode, Json)> { +) -> Result<(StatusCode, Json>), (StatusCode, Json)> +{ let mut client = RdbClient::connect_direct(state.server_addr.clone(), &req.namespace) .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "SERVICE_UNAVAILABLE", + &format!("Failed to connect: {}", e), + ) + })?; client .raw_put(key.as_bytes().to_vec(), req.value.as_bytes().to_vec()) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; Ok(( StatusCode::OK, - Json(SuccessResponse::new(serde_json::json!({ "key": key, "success": true }))), + Json(SuccessResponse::new( + serde_json::json!({ "key": key, "success": true }), + )), )) } @@ -253,15 +282,31 @@ async fn scan_kv( ) -> Result>, (StatusCode, Json)> { let mut client = RdbClient::connect_direct(state.server_addr.clone(), ¶ms.namespace) .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "SERVICE_UNAVAILABLE", &format!("Failed to connect: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "SERVICE_UNAVAILABLE", + &format!("Failed to connect: {}", e), + ) + })?; let start_key = params.start.unwrap_or_default(); let end_key = params.end.unwrap_or_else(|| format!("{}~", start_key)); let (keys, values, _next) = client - .raw_scan(start_key.as_bytes().to_vec(), end_key.as_bytes().to_vec(), 100) + .raw_scan( + start_key.as_bytes().to_vec(), + end_key.as_bytes().to_vec(), + 100, + ) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; let items: Vec = keys .into_iter() @@ -282,13 +327,31 @@ async fn get_region( ) -> Result>, (StatusCode, Json)> { let mut pd_client = PdClient::connect_any(&state.pd_endpoints) .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "PD_UNAVAILABLE", + &format!("Failed to connect to PD: {}", e), + ) + })?; let region = pd_client .get_region(id) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))? - .ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", &format!("Region {} not found", id)))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })? + .ok_or_else(|| { + error_response( + StatusCode::NOT_FOUND, + "NOT_FOUND", + &format!("Region {} not found", id), + ) + })?; Ok(Json(SuccessResponse::new(RegionResponse { id: region.id, @@ -305,23 +368,44 @@ async fn add_peer_to_region( ) -> Result>, (StatusCode, Json)> { let mut pd_client = PdClient::connect_any(&state.pd_endpoints) .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "PD_UNAVAILABLE", + &format!("Failed to connect to PD: {}", e), + ) + })?; let mut region = pd_client .get_region(id) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))? - .ok_or_else(|| error_response(StatusCode::NOT_FOUND, "NOT_FOUND", &format!("Region {} not found", id)))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })? + .ok_or_else(|| { + error_response( + StatusCode::NOT_FOUND, + "NOT_FOUND", + &format!("Region {} not found", id), + ) + })?; // Add peer if not already present if !region.peers.contains(&req.peer_id) { region.peers.push(req.peer_id); region.peers.sort(); - pd_client - .put_region(region.clone()) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + pd_client.put_region(region.clone()).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; } Ok(Json(SuccessResponse::new(RegionResponse { @@ -335,7 +419,8 @@ async fn add_peer_to_region( async fn add_member_legacy( State(state): State, Json(req): Json, -) -> Result<(StatusCode, Json>), (StatusCode, Json)> { +) -> Result<(StatusCode, Json>), (StatusCode, Json)> +{ let (peer_id, peer_addr) = resolve_join_peer(&state, &req).ok_or_else(|| { error_response( StatusCode::BAD_REQUEST, @@ -346,7 +431,13 @@ async fn add_member_legacy( let mut pd_client = PdClient::connect_any(&state.pd_endpoints) .await - .map_err(|e| error_response(StatusCode::SERVICE_UNAVAILABLE, "PD_UNAVAILABLE", &format!("Failed to connect to PD: {}", e)))?; + .map_err(|e| { + error_response( + StatusCode::SERVICE_UNAVAILABLE, + "PD_UNAVAILABLE", + &format!("Failed to connect to PD: {}", e), + ) + })?; let stores = pd_client.list_stores().await; let already_registered = stores.iter().any(|store| store.id == peer_id); @@ -354,14 +445,26 @@ async fn add_member_legacy( pd_client .register_store(peer_id, peer_addr.clone()) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; let mut regions = pd_client.list_regions().await; if regions.is_empty() { pd_client .init_default_region(vec![state.store_id, peer_id]) .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + .map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; regions = vec![crate::pd_client::RegionInfo { id: 1, start_key: Vec::new(), @@ -376,10 +479,13 @@ async fn add_member_legacy( if !region.peers.contains(&peer_id) { region.peers.push(peer_id); region.peers.sort_unstable(); - pd_client - .put_region(region.clone()) - .await - .map_err(|e| error_response(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR", &e.to_string()))?; + pd_client.put_region(region.clone()).await.map_err(|e| { + error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + &e.to_string(), + ) + })?; updated_regions.push(region.id); } } @@ -403,10 +509,7 @@ async fn add_member_legacy( )) } -fn resolve_join_peer( - state: &RestApiState, - req: &AddMemberRequestLegacy, -) -> Option<(u64, String)> { +fn resolve_join_peer(state: &RestApiState, req: &AddMemberRequestLegacy) -> Option<(u64, String)> { if let Ok(peer_id) = req.id.parse::() { if let Some(addr) = req .addr diff --git a/flaredb/crates/flaredb-server/src/service.rs b/flaredb/crates/flaredb-server/src/service.rs index cf60130..deff204 100644 --- a/flaredb/crates/flaredb-server/src/service.rs +++ b/flaredb/crates/flaredb-server/src/service.rs @@ -1,16 +1,19 @@ use crate::config::{ - decode_value_with_ts, encode_namespaced_key, encode_value_with_ts, ConsistencyMode, + decode_value_with_ts, + encode_namespaced_key, + encode_value_with_ts, + ConsistencyMode, NamespaceManager, // Renamed from ServerConfig }; use crate::store::Store; -use flaredb_raft::FlareRaftNode; use flaredb_proto::kvrpc::kv_cas_server::KvCas; use flaredb_proto::kvrpc::kv_raw_server::KvRaw; use flaredb_proto::kvrpc::{ - CasRequest, CasResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, RawDeleteRequest, - RawDeleteResponse, RawGetRequest, RawGetResponse, RawPutRequest, RawPutResponse, RawScanRequest, - RawScanResponse, ScanRequest, ScanResponse, VersionedKv, + CasRequest, CasResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, + RawDeleteRequest, RawDeleteResponse, RawGetRequest, RawGetResponse, RawPutRequest, + RawPutResponse, RawScanRequest, RawScanResponse, ScanRequest, ScanResponse, VersionedKv, }; +use flaredb_raft::FlareRaftNode; use flaredb_storage::rocks_engine::RocksEngine; use flaredb_storage::StorageEngine; use std::sync::Arc; @@ -30,7 +33,11 @@ pub struct KvServiceImpl { } impl KvServiceImpl { - pub fn new(engine: Arc, namespace_manager: Arc, store: Arc) -> Self { + pub fn new( + engine: Arc, + namespace_manager: Arc, + store: Arc, + ) -> Self { Self { engine, namespace_manager, @@ -245,9 +252,9 @@ impl KvRaw for KvServiceImpl { let ts = Self::now_millis(); if let Some(node) = self.route_raft_node(&encoded).await? { let existed = node.read_kv(ns_id, &req.key).await.is_some(); - node.delete_kv(ns_id, req.key, ts) - .await - .map_err(|e| Status::failed_precondition(format!("raft raw_delete failed: {}", e)))?; + node.delete_kv(ns_id, req.key, ts).await.map_err(|e| { + Status::failed_precondition(format!("raft raw_delete failed: {}", e)) + })?; Ok(Response::new(RawDeleteResponse { success: true, existed, @@ -573,7 +580,10 @@ mod tests { let service = KvServiceImpl::new( engine, - Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())), // Use NamespaceManager directly + Arc::new(NamespaceManager::new( + ConsistencyMode::Strong, + HashMap::new(), + )), // Use NamespaceManager directly store, ); @@ -635,11 +645,17 @@ mod tests { let service = KvServiceImpl::new( engine, - Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())), + Arc::new(NamespaceManager::new( + ConsistencyMode::Strong, + HashMap::new(), + )), store, ); - for (key, value) in [(b"k1".to_vec(), b"v1".to_vec()), (b"k2".to_vec(), b"v2".to_vec())] { + for (key, value) in [ + (b"k1".to_vec(), b"v1".to_vec()), + (b"k2".to_vec(), b"v2".to_vec()), + ] { service .compare_and_swap(Request::new(CasRequest { key, diff --git a/flaredb/crates/flaredb-server/src/sql_service.rs b/flaredb/crates/flaredb-server/src/sql_service.rs index 6457b58..85eda03 100644 --- a/flaredb/crates/flaredb-server/src/sql_service.rs +++ b/flaredb/crates/flaredb-server/src/sql_service.rs @@ -13,11 +13,16 @@ use tonic::{Request, Response, Status}; pub struct SqlServiceImpl { /// Address of the local FlareDB server server_addr: String, + /// ChainFire/PD endpoints used for region-aware routing + pd_endpoints: Vec, } impl SqlServiceImpl { - pub fn new(server_addr: String) -> Self { - Self { server_addr } + pub fn new(server_addr: String, pd_endpoints: Vec) -> Self { + Self { + server_addr, + pd_endpoints, + } } fn value_to_proto(value: &Value) -> ProtoValue { @@ -41,9 +46,7 @@ impl SqlServiceImpl { is_null: false, }, Value::Timestamp(ts) => ProtoValue { - value: Some(flaredb_proto::sqlrpc::sql_value::Value::TimestampValue( - *ts, - )), + value: Some(flaredb_proto::sqlrpc::sql_value::Value::TimestampValue(*ts)), is_null: false, }, } @@ -52,16 +55,17 @@ impl SqlServiceImpl { #[tonic::async_trait] impl SqlServiceTrait for SqlServiceImpl { - async fn execute( - &self, - request: Request, - ) -> Result, Status> { + async fn execute(&self, request: Request) -> Result, Status> { let req = request.into_inner(); - // Connect to the local FlareDB server with the requested namespace - let client = RdbClient::connect_direct(self.server_addr.clone(), req.namespace.clone()) - .await - .map_err(|e| Status::internal(format!("Failed to connect to FlareDB: {}", e)))?; + // Use PD-backed routing so SQL works for both strong and eventual namespaces. + let client = RdbClient::connect_with_pd_namespace( + self.server_addr.clone(), + self.pd_endpoints.join(","), + req.namespace.clone(), + ) + .await + .map_err(|e| Status::internal(format!("Failed to connect to FlareDB: {}", e)))?; // Create executor let executor = SqlExecutor::new(Arc::new(Mutex::new(client))); @@ -81,9 +85,7 @@ impl SqlServiceTrait for SqlServiceImpl { }, ExecutionResult::DmlSuccess(rows_affected) => SqlResponse { result: Some(flaredb_proto::sqlrpc::sql_response::Result::DmlResult( - DmlResult { - rows_affected, - }, + DmlResult { rows_affected }, )), }, ExecutionResult::Query(query_result) => { @@ -103,8 +105,7 @@ impl SqlServiceTrait for SqlServiceImpl { }, )), } - } - // Errors are returned via Result::Err and handled by .map_err() above + } // Errors are returned via Result::Err and handled by .map_err() above }; Ok(Response::new(response)) diff --git a/flaredb/crates/flaredb-server/src/store.rs b/flaredb/crates/flaredb-server/src/store.rs index 571a227..cac90a5 100644 --- a/flaredb/crates/flaredb-server/src/store.rs +++ b/flaredb/crates/flaredb-server/src/store.rs @@ -10,7 +10,7 @@ pub struct Store { engine: Arc, raft_nodes: tokio::sync::RwLock>>, regions: tokio::sync::RwLock>, - config: Arc, // Use Config + config: Arc, // Use Config pub namespace_manager: Arc, // Make public peer_addrs: Arc>, } @@ -21,7 +21,7 @@ impl Store { pub fn new( store_id: u64, engine: Arc, - config: Arc, // Use Config + config: Arc, // Use Config namespace_manager: Arc, // New argument peer_addrs: Arc>, ) -> Self { diff --git a/flaredb/crates/flaredb-sql/src/executor.rs b/flaredb/crates/flaredb-sql/src/executor.rs index efb45c4..0b3b1f2 100644 --- a/flaredb/crates/flaredb-sql/src/executor.rs +++ b/flaredb/crates/flaredb-sql/src/executor.rs @@ -1,6 +1,6 @@ -use crate::error::{Result, SqlError}; +use crate::error::Result; use crate::metadata::MetadataManager; -use crate::parser::{SqlStatement, parse_sql}; +use crate::parser::{parse_sql, SqlStatement}; use crate::storage::StorageManager; use crate::types::QueryResult; use flaredb_client::RdbClient; @@ -93,12 +93,31 @@ impl SqlExecutor { Ok(ExecutionResult::Query(result)) } - SqlStatement::Update { .. } => { - Err(SqlError::InvalidOperation("UPDATE not yet implemented".to_string())) + SqlStatement::Update { + table_name, + assignments, + where_clause, + } => { + let table = self.metadata_manager.get_table_metadata(table_name).await?; + let updated = self + .storage_manager + .update_rows(&table, assignments, where_clause.as_ref()) + .await?; + + Ok(ExecutionResult::DmlSuccess(updated)) } - SqlStatement::Delete { .. } => { - Err(SqlError::InvalidOperation("DELETE not yet implemented".to_string())) + SqlStatement::Delete { + table_name, + where_clause, + } => { + let table = self.metadata_manager.get_table_metadata(table_name).await?; + let deleted = self + .storage_manager + .delete_rows(&table, where_clause.as_ref()) + .await?; + + Ok(ExecutionResult::DmlSuccess(deleted)) } } } @@ -111,7 +130,9 @@ mod tests { #[tokio::test] #[ignore] // Requires FlareDB server async fn test_create_and_query_table() { - let client = RdbClient::connect_direct("127.0.0.1:8001".to_string(), "sqltest".to_string()).await.unwrap(); + let client = RdbClient::connect_direct("127.0.0.1:8001".to_string(), "sqltest".to_string()) + .await + .unwrap(); let executor = SqlExecutor::new(Arc::new(Mutex::new(client))); // Create table diff --git a/flaredb/crates/flaredb-sql/src/parser.rs b/flaredb/crates/flaredb-sql/src/parser.rs index 6c21d08..4d7c48c 100644 --- a/flaredb/crates/flaredb-sql/src/parser.rs +++ b/flaredb/crates/flaredb-sql/src/parser.rs @@ -1,8 +1,6 @@ use crate::error::{Result, SqlError}; use crate::types::{ColumnDef, DataType, Value}; -use sqlparser::ast::{ - ColumnDef as AstColumnDef, DataType as AstDataType, Expr, Statement, -}; +use sqlparser::ast::{ColumnDef as AstColumnDef, DataType as AstDataType, Expr, Statement}; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; @@ -85,7 +83,9 @@ fn parse_statement(stmt: &Statement) -> Result { Statement::CreateTable { .. } => parse_create_table(stmt), Statement::Drop { names, .. } => { if names.len() != 1 { - return Err(SqlError::ParseError("Expected single table name".to_string())); + return Err(SqlError::ParseError( + "Expected single table name".to_string(), + )); } Ok(SqlStatement::DropTable { table_name: names[0].to_string(), @@ -93,12 +93,8 @@ fn parse_statement(stmt: &Statement) -> Result { } Statement::Insert { .. } => parse_insert(stmt), Statement::Query(query) => parse_select(query), - Statement::Update { .. } => { - Err(SqlError::ParseError("UPDATE not yet implemented".to_string())) - } - Statement::Delete { .. } => { - Err(SqlError::ParseError("DELETE not yet implemented".to_string())) - } + Statement::Update { .. } => parse_update(stmt), + Statement::Delete { .. } => parse_delete(stmt), _ => Err(SqlError::ParseError(format!( "Unsupported statement: {:?}", stmt @@ -107,8 +103,16 @@ fn parse_statement(stmt: &Statement) -> Result { } fn parse_create_table(stmt: &Statement) -> Result { - let Statement::CreateTable { name, columns: col_defs, constraints, .. } = stmt else { - return Err(SqlError::ParseError("Expected CREATE TABLE statement".to_string())); + let Statement::CreateTable { + name, + columns: col_defs, + constraints, + .. + } = stmt + else { + return Err(SqlError::ParseError( + "Expected CREATE TABLE statement".to_string(), + )); }; let table_name = name.to_string(); @@ -122,7 +126,12 @@ fn parse_create_table(stmt: &Statement) -> Result { // Extract primary key from constraints for constraint in constraints { - if let sqlparser::ast::TableConstraint::Unique { columns: pk_cols, is_primary: true, .. } = constraint { + if let sqlparser::ast::TableConstraint::Unique { + columns: pk_cols, + is_primary: true, + .. + } = constraint + { for pk_col in pk_cols { primary_key.push(pk_col.value.to_string()); } @@ -133,7 +142,10 @@ fn parse_create_table(stmt: &Statement) -> Result { if primary_key.is_empty() { for column in col_defs { for option in &column.options { - if matches!(option.option, sqlparser::ast::ColumnOption::Unique { is_primary: true }) { + if matches!( + option.option, + sqlparser::ast::ColumnOption::Unique { is_primary: true } + ) { primary_key.push(column.name.value.to_string()); break; } @@ -142,9 +154,7 @@ fn parse_create_table(stmt: &Statement) -> Result { } if primary_key.is_empty() { - return Err(SqlError::ParseError( - "PRIMARY KEY is required".to_string(), - )); + return Err(SqlError::ParseError("PRIMARY KEY is required".to_string())); } Ok(SqlStatement::CreateTable { @@ -196,8 +206,16 @@ fn parse_data_type(dt: &AstDataType) -> Result { } fn parse_insert(stmt: &Statement) -> Result { - let Statement::Insert { table_name, columns: col_idents, source, .. } = stmt else { - return Err(SqlError::ParseError("Expected INSERT statement".to_string())); + let Statement::Insert { + table_name, + columns: col_idents, + source, + .. + } = stmt + else { + return Err(SqlError::ParseError( + "Expected INSERT statement".to_string(), + )); }; let table_name = table_name.to_string(); @@ -276,6 +294,145 @@ fn parse_select(query: &sqlparser::ast::Query) -> Result { } } +fn parse_update(stmt: &Statement) -> Result { + let Statement::Update { + table, + assignments, + from, + selection, + returning, + } = stmt + else { + return Err(SqlError::ParseError( + "Expected UPDATE statement".to_string(), + )); + }; + + if from.is_some() { + return Err(SqlError::ParseError( + "UPDATE ... FROM is not supported".to_string(), + )); + } + if returning.is_some() { + return Err(SqlError::ParseError( + "UPDATE ... RETURNING is not supported".to_string(), + )); + } + if !table.joins.is_empty() { + return Err(SqlError::ParseError( + "UPDATE with joins is not supported".to_string(), + )); + } + + let table_name = match &table.relation { + sqlparser::ast::TableFactor::Table { name, .. } => name.to_string(), + _ => { + return Err(SqlError::ParseError( + "Complex UPDATE targets are not supported".to_string(), + )) + } + }; + + let mut parsed_assignments = Vec::with_capacity(assignments.len()); + for assignment in assignments { + if assignment.id.len() != 1 { + return Err(SqlError::ParseError( + "Only simple column assignments are supported".to_string(), + )); + } + let column = assignment.id[0].value.to_string(); + let value = parse_expr_as_value(&assignment.value)?; + parsed_assignments.push((column, value)); + } + + let where_clause = if let Some(expr) = selection { + Some(parse_where_expr(expr)?) + } else { + None + }; + + Ok(SqlStatement::Update { + table_name, + assignments: parsed_assignments, + where_clause, + }) +} + +fn parse_delete(stmt: &Statement) -> Result { + let Statement::Delete { + tables, + from, + using, + selection, + returning, + order_by, + limit, + } = stmt + else { + return Err(SqlError::ParseError( + "Expected DELETE statement".to_string(), + )); + }; + + if !tables.is_empty() { + return Err(SqlError::ParseError( + "Multi-table DELETE is not supported".to_string(), + )); + } + if using.is_some() { + return Err(SqlError::ParseError( + "DELETE ... USING is not supported".to_string(), + )); + } + if returning.is_some() { + return Err(SqlError::ParseError( + "DELETE ... RETURNING is not supported".to_string(), + )); + } + if !order_by.is_empty() { + return Err(SqlError::ParseError( + "DELETE ... ORDER BY is not supported".to_string(), + )); + } + if limit.is_some() { + return Err(SqlError::ParseError( + "DELETE ... LIMIT is not supported".to_string(), + )); + } + if from.len() != 1 { + return Err(SqlError::ParseError( + "DELETE must target exactly one table".to_string(), + )); + } + + let table = &from[0]; + if !table.joins.is_empty() { + return Err(SqlError::ParseError( + "DELETE with joins is not supported".to_string(), + )); + } + + let table_name = match &table.relation { + sqlparser::ast::TableFactor::Table { name, .. } => name.to_string(), + _ => { + return Err(SqlError::ParseError( + "Complex DELETE targets are not supported".to_string(), + )) + } + }; + + let where_clause = if let Some(expr) = selection { + Some(parse_where_expr(expr)?) + } else { + None + }; + + Ok(SqlStatement::Delete { + table_name, + where_clause, + }) +} + fn parse_where_expr(expr: &Expr) -> Result { match expr { Expr::BinaryOp { left, op, right } => { @@ -356,3 +513,64 @@ fn parse_expr_as_value(expr: &Expr) -> Result { ))), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_update_statement() { + let parsed = parse_sql("UPDATE users SET name = 'Bob', active = true WHERE id = 7") + .expect("update should parse"); + + match parsed { + SqlStatement::Update { + table_name, + assignments, + where_clause, + } => { + assert_eq!(table_name, "users"); + assert_eq!( + assignments, + vec![ + ("name".to_string(), Value::Text("Bob".to_string())), + ("active".to_string(), Value::Boolean(true)), + ] + ); + assert!(matches!( + where_clause, + Some(WhereClause::Comparison { + column, + op: ComparisonOp::Eq, + value: Value::Integer(7), + }) if column == "id" + )); + } + other => panic!("expected update statement, got {:?}", other), + } + } + + #[test] + fn parses_delete_statement() { + let parsed = + parse_sql("DELETE FROM users WHERE name = 'Alice'").expect("delete should parse"); + + match parsed { + SqlStatement::Delete { + table_name, + where_clause, + } => { + assert_eq!(table_name, "users"); + assert!(matches!( + where_clause, + Some(WhereClause::Comparison { + column, + op: ComparisonOp::Eq, + value: Value::Text(value), + }) if column == "name" && value == "Alice" + )); + } + other => panic!("expected delete statement, got {:?}", other), + } + } +} diff --git a/flaredb/crates/flaredb-sql/src/storage.rs b/flaredb/crates/flaredb-sql/src/storage.rs index 99e47a8..b0aca53 100644 --- a/flaredb/crates/flaredb-sql/src/storage.rs +++ b/flaredb/crates/flaredb-sql/src/storage.rs @@ -13,6 +13,12 @@ pub struct StorageManager { client: Arc>, } +struct ScannedRow { + key: Vec, + row: RowData, + version: u64, +} + impl StorageManager { pub fn new(client: Arc>) -> Self { Self { client } @@ -52,9 +58,9 @@ impl StorageManager { .primary_key .iter() .map(|pk_col| { - row_data - .get(pk_col) - .ok_or_else(|| SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col))) + row_data.get(pk_col).ok_or_else(|| { + SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col)) + }) }) .collect(); let pk_values = pk_values?; @@ -66,8 +72,8 @@ impl StorageManager { let row = RowData::new(row_data); // Serialize row - let value = bincode::serialize(&row) - .map_err(|e| SqlError::SerializationError(e.to_string()))?; + let value = + bincode::serialize(&row).map_err(|e| SqlError::SerializationError(e.to_string()))?; // Store in KVS using CAS (version 0 for new row) let mut client = self.client.lock().await; @@ -77,7 +83,9 @@ impl StorageManager { .map_err(|e| SqlError::KvsError(e.to_string()))?; if !success { - return Err(SqlError::PrimaryKeyViolation("Row with this primary key already exists".to_string())); + return Err(SqlError::PrimaryKeyViolation( + "Row with this primary key already exists".to_string(), + )); } Ok(()) @@ -109,21 +117,9 @@ impl StorageManager { let mut result = QueryResult::new(result_columns.clone()); - // Scan all rows for this table - let start_key = Self::encode_table_prefix(table.table_id); - let end_key = Self::encode_table_prefix(table.table_id + 1); - - let mut client = self.client.lock().await; - let (entries, _next_key) = client - .cas_scan(start_key, end_key, 1000) - .await - .map_err(|e| SqlError::KvsError(e.to_string()))?; - // Process each row - for (_key, value_bytes, _version) in entries { - let row: RowData = bincode::deserialize(&value_bytes) - .map_err(|e| SqlError::SerializationError(e.to_string()))?; - + for scanned in self.scan_table_rows(table).await? { + let row = scanned.row; // Apply WHERE filter if let Some(where_clause) = where_clause { if !Self::evaluate_where(&row, where_clause)? { @@ -134,10 +130,7 @@ impl StorageManager { // Extract requested columns let mut row_values = Vec::new(); for col_name in &result_columns { - let value = row - .get(col_name) - .cloned() - .unwrap_or(Value::Null); + let value = row.get(col_name).cloned().unwrap_or(Value::Null); row_values.push(value); } @@ -147,6 +140,138 @@ impl StorageManager { Ok(result) } + /// Update rows in a table + pub async fn update_rows( + &self, + table: &TableMetadata, + assignments: &[(String, Value)], + where_clause: Option<&WhereClause>, + ) -> Result { + for (column, _) in assignments { + if table.get_column(column).is_none() { + return Err(SqlError::ColumnNotFound( + column.clone(), + table.table_name.clone(), + )); + } + } + + let mut updated = 0u64; + for scanned in self.scan_table_rows(table).await? { + if let Some(where_clause) = where_clause { + if !Self::evaluate_where(&scanned.row, where_clause)? { + continue; + } + } + + let mut row = scanned.row; + for (column, value) in assignments { + row.set(column.clone(), value.clone()); + } + + let pk_values = Self::primary_key_values(table, &row)?; + let new_key = Self::encode_row_key(table.table_id, &pk_values)?; + let value = Self::serialize_row(&row)?; + + if new_key == scanned.key { + let mut client = self.client.lock().await; + let (success, _current_version, _new_version) = client + .cas(scanned.key.clone(), value, scanned.version) + .await + .map_err(|e| SqlError::KvsError(e.to_string()))?; + + if !success { + return Err(SqlError::InvalidOperation( + "Concurrent row update detected".to_string(), + )); + } + } else { + let mut client = self.client.lock().await; + let (created, _current_version, new_version) = client + .cas(new_key.clone(), value, 0) + .await + .map_err(|e| SqlError::KvsError(e.to_string()))?; + if !created { + return Err(SqlError::PrimaryKeyViolation( + "Row with this primary key already exists".to_string(), + )); + } + + let (deleted, _current_version, _existed) = client + .cas_delete(scanned.key.clone(), scanned.version) + .await + .map_err(|e| SqlError::KvsError(e.to_string()))?; + if !deleted { + let _ = client.cas_delete(new_key, new_version).await; + return Err(SqlError::InvalidOperation( + "Concurrent row update detected".to_string(), + )); + } + } + + updated += 1; + } + + Ok(updated) + } + + /// Delete rows from a table + pub async fn delete_rows( + &self, + table: &TableMetadata, + where_clause: Option<&WhereClause>, + ) -> Result { + let mut deleted = 0u64; + for scanned in self.scan_table_rows(table).await? { + if let Some(where_clause) = where_clause { + if !Self::evaluate_where(&scanned.row, where_clause)? { + continue; + } + } + + let mut client = self.client.lock().await; + let (success, _current_version, _existed) = client + .cas_delete(scanned.key, scanned.version) + .await + .map_err(|e| SqlError::KvsError(e.to_string()))?; + if !success { + return Err(SqlError::InvalidOperation( + "Concurrent row delete detected".to_string(), + )); + } + deleted += 1; + } + + Ok(deleted) + } + + async fn scan_table_rows(&self, table: &TableMetadata) -> Result> { + let (mut next_key, end_key) = Self::encode_table_scan_range(table.table_id); + let mut rows = Vec::new(); + let mut client = self.client.lock().await; + + loop { + let (entries, continuation) = client + .cas_scan(next_key.clone(), end_key.clone(), 1000) + .await + .map_err(|e| SqlError::KvsError(e.to_string()))?; + + for (key, value_bytes, version) in entries { + let row: RowData = bincode::deserialize(&value_bytes) + .map_err(|e| SqlError::SerializationError(e.to_string()))?; + rows.push(ScannedRow { key, row, version }); + } + + if let Some(continuation) = continuation { + next_key = continuation; + } else { + break; + } + } + + Ok(rows) + } + /// Evaluate WHERE clause against a row fn evaluate_where(row: &RowData, where_clause: &WhereClause) -> Result { match where_clause { @@ -213,4 +338,57 @@ impl StorageManager { fn encode_table_prefix(table_id: u32) -> Vec { format!("{}:{}:", DATA_PREFIX, table_id).into_bytes() } + + fn encode_table_scan_range(table_id: u32) -> (Vec, Vec) { + let start = Self::encode_table_prefix(table_id); + let end = Self::prefix_end(&start); + (start, end) + } + + fn prefix_end(prefix: &[u8]) -> Vec { + let mut end_key = prefix.to_vec(); + if let Some(last) = end_key.last_mut() { + if *last == 0xff { + end_key.push(0x00); + } else { + *last += 1; + } + } else { + end_key.push(0xff); + } + end_key + } + + fn primary_key_values<'a>( + table: &'a TableMetadata, + row: &'a RowData, + ) -> Result> { + table + .primary_key + .iter() + .map(|pk_col| { + row.get(pk_col).ok_or_else(|| { + SqlError::PrimaryKeyViolation(format!("Missing primary key column: {}", pk_col)) + }) + }) + .collect() + } + + fn serialize_row(row: &RowData) -> Result> { + bincode::serialize(row).map_err(|e| SqlError::SerializationError(e.to_string())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn table_scan_range_uses_prefix_successor() { + let (start, end) = StorageManager::encode_table_scan_range(9); + assert_eq!(start, b"__sql_data:9:".to_vec()); + assert_eq!(end, b"__sql_data:9;".to_vec()); + assert!(b"__sql_data:9:1".to_vec() < end); + assert!(b"__sql_data:10:1".to_vec() < start); + } } diff --git a/lightningstor/crates/lightningstor-server/src/object_service.rs b/lightningstor/crates/lightningstor-server/src/object_service.rs index d323ef6..92b822d 100644 --- a/lightningstor/crates/lightningstor-server/src/object_service.rs +++ b/lightningstor/crates/lightningstor-server/src/object_service.rs @@ -170,7 +170,13 @@ impl ObjectServiceImpl { end: usize, ) -> ::GetObjectStream { let storage = self.storage.clone(); - let state = (storage, upload, Some(self.object_to_proto(object)), 0usize, 0u64); + let state = ( + storage, + upload, + Some(self.object_to_proto(object)), + 0usize, + 0u64, + ); let range_start = start as u64; let range_end = end as u64; let object_size = object.size; @@ -327,6 +333,41 @@ impl ObjectServiceImpl { .await } + async fn load_full_object_bytes(&self, object: &Object) -> Result { + if let Some(upload) = self + .metadata + .load_object_multipart_upload(&object.id) + .await + .map_err(Self::to_status)? + { + let mut body = BytesMut::new(); + for part in &upload.parts { + let bytes = self + .storage + .get_part(upload.upload_id.as_str(), part.part_number.as_u32()) + .await + .map_err(|e| { + Status::internal(format!("Failed to retrieve multipart object part: {}", e)) + })?; + body.extend_from_slice(bytes.as_ref()); + } + if body.len() as u64 != object.size { + return Err(Status::internal(format!( + "Multipart object {} has inconsistent size: expected {}, got {}", + object.id, + object.size, + body.len() + ))); + } + Ok(body.freeze()) + } else { + self.storage + .get_object(&object.id) + .await + .map_err(|e| Status::internal(format!("Failed to retrieve object: {}", e))) + } + } + fn multipart_lock(&self, upload_id: &str) -> Arc> { self.multipart_locks .entry(upload_id.to_string()) @@ -613,9 +654,99 @@ impl ObjectService for ObjectServiceImpl { async fn copy_object( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("CopyObject not yet implemented")) + let tenant = get_tenant_context(&request)?; + let req = request.into_inner(); + + if req.source_bucket.is_empty() { + return Err(Status::invalid_argument("source_bucket is required")); + } + if req.source_key.is_empty() { + return Err(Status::invalid_argument("source_key is required")); + } + if req.dest_bucket.is_empty() { + return Err(Status::invalid_argument("dest_bucket is required")); + } + if req.dest_key.is_empty() { + return Err(Status::invalid_argument("dest_key is required")); + } + + let source_bucket = self + .load_bucket_for_tenant(&tenant, &req.source_bucket) + .await?; + self.authorize_object_action( + &tenant, + ACTION_OBJECTS_READ, + &source_bucket, + &req.source_key, + ) + .await?; + + let dest_bucket = self + .load_bucket_for_tenant(&tenant, &req.dest_bucket) + .await?; + self.authorize_object_action(&tenant, ACTION_OBJECTS_CREATE, &dest_bucket, &req.dest_key) + .await?; + + let source_bucket_id: BucketId = BucketId::from_str(&source_bucket.id.to_string()) + .map_err(|_| Status::internal("Invalid source bucket ID"))?; + let dest_key = ObjectKey::new(&req.dest_key) + .map_err(|e| Status::invalid_argument(format!("Invalid destination key: {}", e)))?; + + let source_version_id = if req.source_version_id.is_empty() { + None + } else { + Some(req.source_version_id.as_str()) + }; + let source_object = self + .metadata + .load_object(&source_bucket_id, &req.source_key, source_version_id) + .await + .map_err(Self::to_status)? + .ok_or_else(|| Status::not_found(format!("Object {} not found", req.source_key)))?; + + if source_object.is_delete_marker { + return Err(Status::not_found("Source object is a delete marker")); + } + + let data = self.load_full_object_bytes(&source_object).await?; + let object_metadata = if req.metadata_directive_replace { + Self::proto_metadata_to_object_metadata(req.metadata) + } else { + source_object.metadata.clone() + }; + + let mut dest_object = Object::new( + dest_bucket.id.to_string(), + dest_key, + source_object.etag.clone(), + source_object.size, + object_metadata.content_type.clone(), + ); + dest_object.metadata = object_metadata; + dest_object.storage_class = source_object.storage_class.clone(); + if dest_bucket.versioning == lightningstor_types::Versioning::Enabled { + dest_object.version = ObjectVersion::new(); + } + + self.storage + .put_object(&dest_object.id, data) + .await + .map_err(|e| Status::internal(format!("Failed to store copied object: {}", e)))?; + self.metadata + .save_object(&dest_object) + .await + .map_err(Self::to_status)?; + + Ok(Response::new(CopyObjectResponse { + etag: dest_object.etag.as_str().to_string(), + version_id: dest_object.version.as_str().to_string(), + last_modified: Some(prost_types::Timestamp { + seconds: dest_object.last_modified.timestamp(), + nanos: dest_object.last_modified.timestamp_subsec_nanos() as i32, + }), + })) } async fn list_objects( @@ -801,10 +932,14 @@ impl ObjectService for ObjectServiceImpl { while let Some(chunk) = stream.message().await? { if !chunk.bucket.is_empty() && chunk.bucket != first.bucket { - return Err(Status::invalid_argument("bucket changed within UploadPart stream")); + return Err(Status::invalid_argument( + "bucket changed within UploadPart stream", + )); } if !chunk.key.is_empty() && chunk.key != first.key { - return Err(Status::invalid_argument("key changed within UploadPart stream")); + return Err(Status::invalid_argument( + "key changed within UploadPart stream", + )); } if !chunk.upload_id.is_empty() && chunk.upload_id != first.upload_id { return Err(Status::invalid_argument( @@ -922,7 +1057,10 @@ impl ObjectService for ObjectServiceImpl { selected_parts.push(part.clone()); } - let etags: Vec = selected_parts.iter().map(|part| part.etag.clone()).collect(); + let etags: Vec = selected_parts + .iter() + .map(|part| part.etag.clone()) + .collect(); let multipart_etag = ETag::multipart(&etags, selected_parts.len()); upload.parts = selected_parts; @@ -1018,7 +1156,11 @@ impl ObjectService for ObjectServiceImpl { )); } - let max_parts = if req.max_parts > 0 { req.max_parts } else { 1000 }; + let max_parts = if req.max_parts > 0 { + req.max_parts + } else { + 1000 + }; let remaining_count = upload .parts .iter() @@ -1064,7 +1206,11 @@ impl ObjectService for ObjectServiceImpl { let bucket_id: BucketId = BucketId::from_str(&bucket.id.to_string()) .map_err(|_| Status::internal("Invalid bucket ID"))?; - let max_uploads = if req.max_uploads > 0 { req.max_uploads } else { 1000 }; + let max_uploads = if req.max_uploads > 0 { + req.max_uploads + } else { + 1000 + }; let uploads = self .metadata .list_multipart_uploads(&bucket_id, &req.prefix, max_uploads) diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index 23c900f..d76865f 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -82,6 +82,7 @@ PLASMAVMC_PROTO_DIR="${REPO_ROOT}/plasmavmc/proto" PLASMAVMC_PROTO="${PLASMAVMC_PROTO_DIR}/plasmavmc.proto" FLAREDB_PROTO_DIR="${REPO_ROOT}/flaredb/crates/flaredb-proto/src" FLAREDB_PROTO="${FLAREDB_PROTO_DIR}/kvrpc.proto" +FLAREDB_SQL_PROTO="${FLAREDB_PROTO_DIR}/sqlrpc.proto" # shellcheck disable=SC2034 NODE_PHASES=( @@ -2507,6 +2508,7 @@ ensure_flaredb_proto_on_node() { ssh_node "${node}" "install -d -m 0755 ${proto_root}" scp_to_node "${node}" "${FLAREDB_PROTO}" "${proto_root}/kvrpc.proto" + scp_to_node "${node}" "${FLAREDB_SQL_PROTO}" "${proto_root}/sqlrpc.proto" } vm_runtime_dir_path() { @@ -3072,6 +3074,74 @@ while true; do fi sleep 1 done +EOS + + log "Validating FlareDB SQL DDL/DML execution" + ssh_node_script node01 "${flaredb_proto_root}" "10.100.0.11:2479" <<'EOS' +set -euo pipefail +proto_root="$1" +sql_target="$2" +sql_namespace="validation-sql-$(date +%s)" + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL, active BOOLEAN)" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e '.ddlResult.message | contains("created")' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "INSERT INTO users (id, name, active) VALUES (1, 'Alice', true)" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "INSERT INTO users (id, name, active) VALUES (2, 'Bob', false)" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "UPDATE users SET name = 'Carol', active = false WHERE id = 1" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "SELECT id, name, active FROM users WHERE id = 1" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e ' + .queryResult.columns == ["id", "name", "active"] and + (.queryResult.rows | length) == 1 and + .queryResult.rows[0].values[0].intValue == "1" and + .queryResult.rows[0].values[1].textValue == "Carol" and + ((.queryResult.rows[0].values[2].boolValue // false) == false) + ' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "DELETE FROM users WHERE id = 2" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e '(.dmlResult.rowsAffected | tonumber) == 1' >/dev/null + +grpcurl -plaintext \ + -import-path "${proto_root}" \ + -proto "${proto_root}/sqlrpc.proto" \ + -d "$(jq -cn --arg namespace "${sql_namespace}" --arg sql "SELECT id, name FROM users" '{namespace:$namespace, sql:$sql}')" \ + "${sql_target}" sqlrpc.SqlService/Execute \ + | jq -e ' + .queryResult.columns == ["id", "name"] and + (.queryResult.rows | length) == 1 and + .queryResult.rows[0].values[0].intValue == "1" and + .queryResult.rows[0].values[1].textValue == "Carol" + ' >/dev/null EOS } @@ -3328,7 +3398,7 @@ validate_fiberlb_flow() { local org_id="fiberlb-smoke-org" local project_id="fiberlb-smoke-project" local principal_id="fiberlb-smoke-$(date +%s)" - local token lb_id pool_id backend_id listener_id listener_port + local token lb_id pool_id health_check_id backend_id listener_id listener_port token="$(issue_project_admin_token 15080 "${org_id}" "${project_id}" "${principal_id}")" listener_port=$((18080 + (RANDOM % 100))) @@ -3350,6 +3420,39 @@ validate_fiberlb_flow() { | jq -r '.pool.id')" [[ -n "${pool_id}" && "${pool_id}" != "null" ]] || die "FiberLB CreatePool did not return an ID" + health_check_id="$(grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg name "fiberlb-smoke-health" --arg pool "${pool_id}" '{name:$name, poolId:$pool, type:"HEALTH_CHECK_TYPE_HTTP", intervalSeconds:10, timeoutSeconds:3, healthyThreshold:2, unhealthyThreshold:2, httpConfig:{method:"GET", path:"/health", expectedCodes:[200]}}')" \ + 127.0.0.1:15085 fiberlb.v1.HealthCheckService/CreateHealthCheck \ + | jq -r '.healthCheck.id')" + [[ -n "${health_check_id}" && "${health_check_id}" != "null" ]] || die "FiberLB CreateHealthCheck did not return an ID" + + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg id "${health_check_id}" '{id:$id}')" \ + 127.0.0.1:15085 fiberlb.v1.HealthCheckService/GetHealthCheck \ + | jq -e --arg id "${health_check_id}" '.healthCheck.id == $id and .healthCheck.httpConfig.path == "/health"' >/dev/null + + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg pool "${pool_id}" '{poolId:$pool}')" \ + 127.0.0.1:15085 fiberlb.v1.HealthCheckService/ListHealthChecks \ + | jq -e --arg id "${health_check_id}" '(.healthChecks | length) == 1 and .healthChecks[0].id == $id' >/dev/null + + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg id "${health_check_id}" '{id:$id, name:"fiberlb-smoke-health-updated", intervalSeconds:15, timeoutSeconds:4, healthyThreshold:3, unhealthyThreshold:4, httpConfig:{method:"GET", path:"/readyz", expectedCodes:[200,204]}, enabled:false}')" \ + 127.0.0.1:15085 fiberlb.v1.HealthCheckService/UpdateHealthCheck \ + | jq -e '.healthCheck.name == "fiberlb-smoke-health-updated" and .healthCheck.httpConfig.path == "/readyz" and ((.healthCheck.enabled // false) == false)' >/dev/null + backend_id="$(grpcurl -plaintext \ -H "authorization: Bearer ${token}" \ -import-path "${FIBERLB_PROTO_DIR}" \ @@ -3442,6 +3545,12 @@ validate_fiberlb_flow() { -proto "${FIBERLB_PROTO}" \ -d "$(jq -cn --arg id "${backend_id}" '{id:$id}')" \ 127.0.0.1:15085 fiberlb.v1.BackendService/DeleteBackend >/dev/null + grpcurl -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${FIBERLB_PROTO_DIR}" \ + -proto "${FIBERLB_PROTO}" \ + -d "$(jq -cn --arg id "${health_check_id}" '{id:$id}')" \ + 127.0.0.1:15085 fiberlb.v1.HealthCheckService/DeleteHealthCheck >/dev/null grpcurl -plaintext \ -H "authorization: Bearer ${token}" \ -import-path "${FIBERLB_PROTO_DIR}" \ @@ -3798,11 +3907,12 @@ validate_lightningstor_distributed_storage() { read -r before_node01 before_node04 before_node05 < <(lightningstor_count_triplet) local key="replication-check-$(date +%s)" + local copy_key="${key}-copy" local body="distributed-object-${key}" local body_b64 body_b64="$(printf '%s' "${body}" | base64 -w0)" - local put_json head_json delete_json output + local put_json head_json copy_head_json delete_json copy_delete_json output put_json="$( jq -cn \ --arg bucket "${bucket}" \ @@ -3849,7 +3959,54 @@ validate_lightningstor_distributed_storage() { wait_for_lightningstor_counts_greater_than "${before_node01}" "${before_node04}" "${before_node05}" "generic object replication" + local copy_json copied_body + copy_json="$( + jq -cn \ + --arg source_bucket "${bucket}" \ + --arg source_key "${key}" \ + --arg dest_bucket "${bucket}" \ + --arg dest_key "${copy_key}" \ + '{sourceBucket:$source_bucket, sourceKey:$source_key, destBucket:$dest_bucket, destKey:$dest_key, metadataDirectiveReplace:true, metadata:{contentType:"text/copied", userMetadata:{copied:"yes"}}}' + )" + log "LightningStor distributed replication: COPY ${bucket}/${key} -> ${bucket}/${copy_key}" + output="$( + grpcurl_capture -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${LIGHTNINGSTOR_PROTO_DIR}" \ + -proto "${LIGHTNINGSTOR_PROTO}" \ + -d "${copy_json}" \ + 127.0.0.1:15086 lightningstor.v1.ObjectService/CopyObject + )" || die "failed to copy LightningStor distributed replication probe ${bucket}/${key}: ${output}" + printf '%s\n' "${output}" | jq -e '.etag != "" and .versionId != ""' >/dev/null \ + || die "LightningStor copy response was incomplete: ${output}" + + copy_head_json="$(jq -cn --arg bucket "${bucket}" --arg key "${copy_key}" '{bucket:$bucket, key:$key}')" + output="$( + grpcurl_capture -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${LIGHTNINGSTOR_PROTO_DIR}" \ + -proto "${LIGHTNINGSTOR_PROTO}" \ + -d "${copy_head_json}" \ + 127.0.0.1:15086 lightningstor.v1.ObjectService/HeadObject + )" || die "failed to head copied LightningStor object ${bucket}/${copy_key}: ${output}" + printf '%s\n' "${output}" \ + | jq -e --arg size "$(printf '%s' "${body}" | wc -c | awk '{print $1}')" '.object.metadata.contentType == "text/copied" and (.object.size | tonumber) == ($size | tonumber)' >/dev/null \ + || die "LightningStor copied object returned unexpected metadata: ${output}" + + output="$( + grpcurl_capture -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${LIGHTNINGSTOR_PROTO_DIR}" \ + -proto "${LIGHTNINGSTOR_PROTO}" \ + -d "${copy_head_json}" \ + 127.0.0.1:15086 lightningstor.v1.ObjectService/GetObject + )" || die "failed to fetch copied LightningStor object ${bucket}/${copy_key}: ${output}" + copied_body="$(printf '%s\n' "${output}" | jq -rsr '[.[] | .bodyChunk? | select(. != null) | @base64d] | join("")')" \ + || die "failed to decode copied LightningStor object ${bucket}/${copy_key}: ${output}" + [[ "${copied_body}" == "${body}" ]] || die "copied LightningStor object payload did not match source" + delete_json="$(jq -cn --arg bucket "${bucket}" --arg key "${key}" '{bucket:$bucket, key:$key}')" + copy_delete_json="$(jq -cn --arg bucket "${bucket}" --arg key "${copy_key}" '{bucket:$bucket, key:$key}')" log "LightningStor distributed replication: DELETE ${bucket}/${key}" output="$( grpcurl_capture -plaintext \ @@ -3860,6 +4017,16 @@ validate_lightningstor_distributed_storage() { 127.0.0.1:15086 lightningstor.v1.ObjectService/DeleteObject )" || die "failed to delete LightningStor distributed replication probe ${bucket}/${key}: ${output}" + log "LightningStor distributed replication: DELETE ${bucket}/${copy_key}" + output="$( + grpcurl_capture -plaintext \ + -H "authorization: Bearer ${token}" \ + -import-path "${LIGHTNINGSTOR_PROTO_DIR}" \ + -proto "${LIGHTNINGSTOR_PROTO}" \ + -d "${copy_delete_json}" \ + 127.0.0.1:15086 lightningstor.v1.ObjectService/DeleteObject + )" || die "failed to delete copied LightningStor object ${bucket}/${copy_key}: ${output}" + wait_for_lightningstor_counts_equal "${before_node01}" "${before_node04}" "${before_node05}" "generic object cleanup" trap - RETURN