use crate::config::{ decode_value_with_ts, encode_namespaced_key, encode_value_with_ts, ConsistencyMode, NamespaceManager, // Renamed from ServerConfig }; use crate::store::Store; use flaredb_raft::FlareRaftNode; use flaredb_proto::kvrpc::kv_cas_server::KvCas; use flaredb_proto::kvrpc::kv_raw_server::KvRaw; use flaredb_proto::kvrpc::{ CasRequest, CasResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, RawDeleteRequest, RawDeleteResponse, RawGetRequest, RawGetResponse, RawPutRequest, RawPutResponse, RawScanRequest, RawScanResponse, ScanRequest, ScanResponse, VersionedKv, }; use flaredb_storage::rocks_engine::RocksEngine; use flaredb_storage::StorageEngine; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; /// Default limit for scan operations const DEFAULT_SCAN_LIMIT: usize = 100; /// Maximum limit for scan operations const MAX_SCAN_LIMIT: usize = 10000; #[derive(Clone)] pub struct KvServiceImpl { engine: Arc, namespace_manager: Arc, // Renamed field store: Arc, } impl KvServiceImpl { pub fn new(engine: Arc, namespace_manager: Arc, store: Arc) -> Self { Self { engine, namespace_manager, store, } } fn resolve_namespace<'a>(&self, ns: &'a str) -> (&'a str, ConsistencyMode, u32) { let name = if ns.is_empty() { "default" } else { ns }; let cfg = self.namespace_manager.get_namespace(name); // Use namespace_manager (name, cfg.mode, cfg.id) } fn now_millis() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64 } async fn route_raft_node(&self, key: &[u8]) -> Result>, Status> { let region_id = self .store .route_key(key) .await .ok_or_else(|| Status::failed_precondition("no region found for key"))?; Ok(self.store.get_raft_node(region_id).await) } } #[tonic::async_trait] impl KvRaw for KvServiceImpl { async fn raw_put( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Eventual) { return Err(Status::failed_precondition(format!( "namespace '{}' is not eventual (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); let ts = Self::now_millis(); if let Some(node) = self.route_raft_node(&encoded).await? { node.write_kv(ns_id, req.key, req.value, ts) .await .map_err(|e| Status::failed_precondition(format!("raft raw_put failed: {}", e)))?; } else { let encoded_val = encode_value_with_ts(ts, &req.value); // LWW guard: skip if existing value is newer. if let Ok(existing) = self.engine.get_raw(&encoded).await { if let Some(val) = existing { let (old_ts, _) = decode_value_with_ts(&val); if old_ts > ts { return Ok(Response::new(RawPutResponse { success: true })); } } } self.engine .put_raw(&encoded, &encoded_val) .await .map_err(|e| Status::internal(e.to_string()))?; } Ok(Response::new(RawPutResponse { success: true })) } async fn raw_get( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Eventual) { return Err(Status::failed_precondition(format!( "namespace '{}' is not eventual (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); if let Some(node) = self.route_raft_node(&encoded).await? { let val = node.read_kv(ns_id, &req.key).await; Ok(Response::new(RawGetResponse { found: val.is_some(), value: val.map(|(value, _ts)| value).unwrap_or_default(), })) } else { let val = self .engine .get_raw(&encoded) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(RawGetResponse { found: val.is_some(), value: val.map(|v| decode_value_with_ts(&v).1).unwrap_or_default(), })) } } async fn raw_scan( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Eventual) { return Err(Status::failed_precondition(format!( "namespace '{}' is not eventual (mode={:?})", ns, mode ))); } let limit = if req.limit == 0 || req.limit as usize > MAX_SCAN_LIMIT { DEFAULT_SCAN_LIMIT } else { req.limit as usize }; // Encode keys with namespace prefix for region routing only. let start = encode_namespaced_key(ns_id, &req.start_key); if let Some(node) = self.route_raft_node(&start).await? { let entries = node .scan_kv(ns_id, &req.start_key, &req.end_key, limit + 1) .await; let has_more = entries.len() > limit; let actual_entries = if has_more { &entries[..limit] } else { &entries[..] }; let (keys, values): (Vec<_>, Vec<_>) = actual_entries .iter() .map(|(key, value, _ts)| (key.clone(), value.clone())) .unzip(); let next_key = if has_more { entries[limit].0.clone() } else { vec![] }; Ok(Response::new(RawScanResponse { keys, values, has_more, next_key, })) } else { let end = if req.end_key.is_empty() { // Scan to end of namespace (next namespace prefix) encode_namespaced_key(ns_id + 1, &[]) } else { encode_namespaced_key(ns_id, &req.end_key) }; // Fetch one extra to detect has_more let entries = self .engine .scan_raw(&start, &end, limit + 1) .await .map_err(|e| Status::internal(e.to_string()))?; let has_more = entries.len() > limit; let actual_entries = if has_more { &entries[..limit] } else { &entries[..] }; let (keys, values): (Vec<_>, Vec<_>) = actual_entries .iter() .map(|(k, v)| { let user_key = k[4..].to_vec(); let (_, decoded_value) = decode_value_with_ts(v); (user_key, decoded_value) }) .unzip(); let next_key = if has_more { entries[limit].0[4..].to_vec() } else { vec![] }; Ok(Response::new(RawScanResponse { keys, values, has_more, next_key, })) } } async fn raw_delete( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Eventual) { return Err(Status::failed_precondition(format!( "namespace '{}' is not eventual (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); let ts = Self::now_millis(); if let Some(node) = self.route_raft_node(&encoded).await? { let existed = node.read_kv(ns_id, &req.key).await.is_some(); node.delete_kv(ns_id, req.key, ts) .await .map_err(|e| Status::failed_precondition(format!("raft raw_delete failed: {}", e)))?; Ok(Response::new(RawDeleteResponse { success: true, existed, })) } else { let existed = self .engine .get_raw(&encoded) .await .map_err(|e| Status::internal(e.to_string()))? .is_some(); self.engine .delete_raw(&encoded) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(RawDeleteResponse { success: true, existed, })) } } } #[tonic::async_trait] impl KvCas for KvServiceImpl { async fn compare_and_swap( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Strong) { return Err(Status::failed_precondition(format!( "namespace '{}' is not strong (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); let ts = Self::now_millis(); // CAS is executed atomically within the Raft state machine. // This ensures linearizable semantics - no race between read and write. let node = self .route_raft_node(&encoded) .await? .ok_or_else(|| Status::failed_precondition("region not found for key"))?; if !node.is_leader().await { return Err(Status::failed_precondition( "not leader for strong namespace; redirect required", )); } // Single atomic CAS operation through Raft let response = node .cas_write(ns_id, encoded, req.value, req.expected_version, ts) .await .map_err(|e| Status::internal(format!("raft cas_write failed: {}", e)))?; // Extract CAS result from Raft response match response { flaredb_raft::FlareResponse::CasResult { success, current_version, new_version, } => Ok(Response::new(CasResponse { success, current_version, new_version, })), _ => Err(Status::internal("unexpected raft response for CAS")), } } async fn get(&self, request: Request) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Strong) { return Err(Status::failed_precondition(format!( "namespace '{}' is not strong (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); // For strong consistency, use linearizable read through Raft state machine let node = self .route_raft_node(&encoded) .await? .ok_or_else(|| Status::failed_precondition("region not found for key"))?; // Linearizable read from Raft state machine (CAS data) let val_opt = node .linearizable_read_cas(ns_id, &encoded) .await .map_err(Status::failed_precondition)?; if let Some((value, version, _ts)) = val_opt { Ok(Response::new(GetResponse { found: true, value, version, })) } else { Ok(Response::new(GetResponse { found: false, value: Vec::new(), version: 0, })) } } async fn scan(&self, request: Request) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Strong) { return Err(Status::failed_precondition(format!( "namespace '{}' is not strong (mode={:?})", ns, mode ))); } let limit = if req.limit == 0 || req.limit as usize > MAX_SCAN_LIMIT { DEFAULT_SCAN_LIMIT } else { req.limit as usize }; // Encode keys with namespace prefix let start = encode_namespaced_key(ns_id, &req.start_key); let end = if req.end_key.is_empty() { encode_namespaced_key(ns_id + 1, &[]) } else { encode_namespaced_key(ns_id, &req.end_key) }; // For strong consistency, read from the replicated Raft state machine // after a linearizable barrier. CAS entries are stored with encoded // namespace-prefixed keys in the state machine, so scans must use the // encoded range and then strip the prefix for the user response. if let Some(node) = self.route_raft_node(&start).await? { node.linearizable_read_kv(ns_id, &req.start_key) .await .map_err(Status::failed_precondition)?; let state_machine_end = if req.end_key.is_empty() { Vec::new() } else { end.clone() }; let raw_entries = node .scan_cas(ns_id, &start, &state_machine_end, limit + 1) .await; let has_more = raw_entries.len() > limit; let actual_entries = if has_more { &raw_entries[..limit] } else { &raw_entries[..] }; let entries: Vec = actual_entries .iter() .map(|(key, value, version, _ts)| VersionedKv { key: key[4..].to_vec(), value: value.clone(), version: *version, }) .collect(); let next_key = if has_more { raw_entries[limit].0[4..].to_vec() } else { vec![] }; Ok(Response::new(ScanResponse { entries, has_more, next_key, })) } else { // Single-node / non-Raft fallback. let raw_entries = self .engine .scan_cas(&start, &end, limit + 1) .await .map_err(|e| Status::internal(e.to_string()))?; let has_more = raw_entries.len() > limit; let actual_entries = if has_more { &raw_entries[..limit] } else { &raw_entries[..] }; let entries: Vec = actual_entries .iter() .map(|(k, v, version)| { // Strip namespace prefix (4 bytes) and decode value let user_key = k[4..].to_vec(); let (_, decoded_value) = decode_value_with_ts(v); VersionedKv { key: user_key, value: decoded_value, version: *version, } }) .collect(); let next_key = if has_more { raw_entries[limit].0[4..].to_vec() } else { vec![] }; Ok(Response::new(ScanResponse { entries, has_more, next_key, })) } } async fn delete( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let (ns, mode, ns_id) = self.resolve_namespace(&req.namespace); if !matches!(mode, ConsistencyMode::Strong) { return Err(Status::failed_precondition(format!( "namespace '{}' is not strong (mode={:?})", ns, mode ))); } let encoded = encode_namespaced_key(ns_id, &req.key); let ts = Self::now_millis(); // CAS delete is executed atomically within the Raft state machine let node = self .route_raft_node(&encoded) .await? .ok_or_else(|| Status::failed_precondition("region not found for key"))?; if !node.is_leader().await { return Err(Status::failed_precondition( "not leader for strong namespace; redirect required", )); } // Single atomic CAS delete operation through Raft let response = node .cas_delete(ns_id, encoded, req.expected_version, ts) .await .map_err(|e| Status::internal(format!("raft cas_delete failed: {}", e)))?; // Extract CAS delete result from Raft response match response { flaredb_raft::FlareResponse::CasDeleteResult { success, current_version, existed, } => Ok(Response::new(DeleteResponse { success, current_version, existed, })), _ => Err(Status::internal("unexpected raft response for CAS delete")), } } } #[cfg(test)] mod tests { use super::*; use crate::config::{Config, NamespaceManager}; use flaredb_proto::kvrpc::CasRequest; use flaredb_proto::kvrpc::GetRequest; use flaredb_proto::kvrpc::ScanRequest; use flaredb_types::RegionMeta; use std::collections::HashMap; use std::sync::Arc; use tempfile::TempDir; fn test_peer_addrs() -> Arc> { let mut addrs = HashMap::new(); addrs.insert(1, "127.0.0.1:50051".to_string()); Arc::new(addrs) } #[tokio::test] async fn get_returns_value_and_version() { let dir = TempDir::new().unwrap(); let engine = Arc::new(RocksEngine::new(dir.path().to_str().unwrap()).unwrap()); let config = Config::default(); let namespace_manager = Arc::new(NamespaceManager::from_config(&config)); let store = Arc::new(crate::store::Store::new( 1, engine.clone(), Arc::new(config), namespace_manager, test_peer_addrs(), )); store .bootstrap_regions(vec![( RegionMeta { id: 1, start_key: Vec::new(), end_key: Vec::new(), }, vec![1], )]) .await .unwrap(); // Wait for leader election in single-node cluster if let Some(node) = store.get_raft_node(1).await { node.trigger_election().await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } let service = KvServiceImpl::new( engine, Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())), // Use NamespaceManager directly store, ); // Write via CAS let req = CasRequest { key: b"k1".to_vec(), value: b"v1".to_vec(), expected_version: 0, namespace: "default".to_string(), }; service .compare_and_swap(Request::new(req)) .await .expect("cas"); let resp = service .get(Request::new(GetRequest { key: b"k1".to_vec(), namespace: "default".to_string(), })) .await .expect("get") .into_inner(); assert!(resp.found); assert_eq!(resp.version, 1); assert_eq!(resp.value, b"v1"); } #[tokio::test] async fn scan_returns_decoded_cas_keys() { let dir = TempDir::new().unwrap(); let engine = Arc::new(RocksEngine::new(dir.path().to_str().unwrap()).unwrap()); let config = Config::default(); let namespace_manager = Arc::new(NamespaceManager::from_config(&config)); let store = Arc::new(crate::store::Store::new( 1, engine.clone(), Arc::new(config), namespace_manager, test_peer_addrs(), )); store .bootstrap_regions(vec![( RegionMeta { id: 1, start_key: Vec::new(), end_key: Vec::new(), }, vec![1], )]) .await .unwrap(); if let Some(node) = store.get_raft_node(1).await { node.trigger_election().await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } let service = KvServiceImpl::new( engine, Arc::new(NamespaceManager::new(ConsistencyMode::Strong, HashMap::new())), store, ); for (key, value) in [(b"k1".to_vec(), b"v1".to_vec()), (b"k2".to_vec(), b"v2".to_vec())] { service .compare_and_swap(Request::new(CasRequest { key, value, expected_version: 0, namespace: "default".to_string(), })) .await .expect("cas"); } let resp = service .scan(Request::new(ScanRequest { start_key: b"k".to_vec(), end_key: Vec::new(), limit: 10, namespace: "default".to_string(), })) .await .expect("scan") .into_inner(); assert_eq!(resp.entries.len(), 2); assert_eq!(resp.entries[0].key, b"k1"); assert_eq!(resp.entries[0].value, b"v1"); assert_eq!(resp.entries[1].key, b"k2"); assert_eq!(resp.entries[1].value, b"v2"); } }