use flaredb_proto::kvrpc::kv_cas_client::KvCasClient; use flaredb_proto::kvrpc::kv_raw_client::KvRawClient; use flaredb_proto::kvrpc::{ CasRequest, DeleteRequest, GetRequest, RawDeleteRequest, RawGetRequest, RawPutRequest, RawScanRequest, }; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Channel; use flaredb_proto::pdpb::pd_client::PdClient; use flaredb_proto::pdpb::tso_client::TsoClient; use flaredb_proto::pdpb::{GetRegionRequest, TsoRequest}; use crate::region_cache::RegionCache; pub struct RdbClient { // We need a map of addr -> Channel/Client to reuse connections // Or just create on fly for MVP? Connection creation is expensive. // Let's cache channels. channels: Arc>>, direct_addr: Option, // Clients for PD (fixed) tso_client: TsoClient, pd_client: PdClient, region_cache: RegionCache, namespace: String, } impl RdbClient { pub async fn connect_with_pd( _server_addr: String, pd_addr: String, ) -> Result { Self::connect_with_pd_namespace(_server_addr, pd_addr, String::new()).await } pub async fn connect_with_pd_namespace( _server_addr: String, pd_addr: String, namespace: impl Into, ) -> Result { // server_addr is intentionally unused for now; once the region cache is populated we route via PD. let pd_ep = format!("http://{}", pd_addr); let pd_channel = Channel::from_shared(pd_ep).unwrap().connect().await?; Ok(Self { channels: Arc::new(Mutex::new(HashMap::new())), direct_addr: None, tso_client: TsoClient::new(pd_channel.clone()), pd_client: PdClient::new(pd_channel), region_cache: RegionCache::new(), namespace: namespace.into(), }) } /// Connect directly to a single FlareDB server without PD/region lookup. pub async fn connect_direct( server_addr: String, namespace: impl Into, ) -> Result { let ep = format!("http://{}", server_addr); let channel = Channel::from_shared(ep).unwrap().connect().await?; Ok(Self { channels: Arc::new(Mutex::new(HashMap::new())), direct_addr: Some(server_addr), tso_client: TsoClient::new(channel.clone()), pd_client: PdClient::new(channel), region_cache: RegionCache::new(), namespace: namespace.into(), }) } async fn resolve_addr(&self, key: &[u8]) -> Result { if let Some(addr) = &self.direct_addr { return Ok(addr.clone()); } if let Some(addr) = self.region_cache.get_store_addr(key).await { return Ok(addr); } let mut pd_c = self.pd_client.clone(); let req = GetRegionRequest { key: key.to_vec() }; let resp = pd_c.get_region(req).await?.into_inner(); if let (Some(region), Some(leader)) = (resp.region, resp.leader) { self.region_cache.update(region, leader.clone()).await; Ok(leader.addr) } else { Err(tonic::Status::not_found("Region not found")) } } async fn get_channel(&self, addr: &str) -> Result { let mut map = self.channels.lock().await; if let Some(chan) = map.get(addr) { return Ok(chan.clone()); } let ep = format!("http://{}", addr); let chan = Channel::from_shared(ep).unwrap().connect().await?; map.insert(addr.to_string(), chan.clone()); Ok(chan) } pub async fn get_tso(&mut self) -> Result { let req = TsoRequest { count: 1 }; let resp = self.tso_client.get_timestamp(req).await?.into_inner(); Ok(resp.timestamp) } pub async fn raw_put(&mut self, key: Vec, value: Vec) -> Result<(), tonic::Status> { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvRawClient::new(channel); let req = RawPutRequest { key, value, namespace: self.namespace.clone(), }; client.raw_put(req).await?; Ok(()) } pub async fn raw_get(&mut self, key: Vec) -> Result>, tonic::Status> { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvRawClient::new(channel); let req = RawGetRequest { key, namespace: self.namespace.clone(), }; let resp = client.raw_get(req).await?.into_inner(); if resp.found { Ok(Some(resp.value)) } else { Ok(None) } } pub async fn raw_delete(&mut self, key: Vec) -> Result { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvRawClient::new(channel); let req = RawDeleteRequest { key, namespace: self.namespace.clone(), }; let resp = client.raw_delete(req).await?.into_inner(); Ok(resp.existed) } /// Scan a range of keys in raw (eventual consistency) mode. /// /// Returns (keys, values, next_key if has_more). pub async fn raw_scan( &mut self, start_key: Vec, end_key: Vec, limit: u32, ) -> Result<(Vec>, Vec>, Option>), tonic::Status> { let addr = self.resolve_addr(&start_key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvRawClient::new(channel); let req = RawScanRequest { start_key, end_key, limit, namespace: self.namespace.clone(), }; let resp = client.raw_scan(req).await?.into_inner(); let next = if resp.has_more { Some(resp.next_key) } else { None }; Ok((resp.keys, resp.values, next)) } pub async fn cas( &mut self, key: Vec, value: Vec, expected_version: u64, ) -> Result<(bool, u64, u64), tonic::Status> { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvCasClient::new(channel); let req = CasRequest { key, value, expected_version, namespace: self.namespace.clone(), }; let resp = client.compare_and_swap(req).await?.into_inner(); Ok((resp.success, resp.current_version, resp.new_version)) } pub async fn cas_get(&mut self, key: Vec) -> Result)>, tonic::Status> { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvCasClient::new(channel); let req = GetRequest { key, namespace: self.namespace.clone(), }; let resp = client.get(req).await?.into_inner(); if resp.found { Ok(Some((resp.version, resp.value))) } else { Ok(None) } } pub async fn cas_scan( &mut self, start_key: Vec, end_key: Vec, limit: u32, ) -> Result<(Vec<(Vec, Vec, u64)>, Option>), tonic::Status> { let addr = self.resolve_addr(&start_key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvCasClient::new(channel); let req = flaredb_proto::kvrpc::ScanRequest { start_key, end_key, limit, namespace: self.namespace.clone(), }; let resp = client.scan(req).await?.into_inner(); let entries: Vec<(Vec, Vec, u64)> = resp .entries .into_iter() .map(|kv| (kv.key, kv.value, kv.version)) .collect(); let next = if resp.has_more { Some(resp.next_key) } else { None }; Ok((entries, next)) } pub async fn cas_delete( &mut self, key: Vec, expected_version: u64, ) -> Result<(bool, u64, bool), tonic::Status> { let addr = self.resolve_addr(&key).await?; let channel = self .get_channel(&addr) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; let mut client = KvCasClient::new(channel); let req = DeleteRequest { key, expected_version, namespace: self.namespace.clone(), }; let resp = client.delete(req).await?.into_inner(); Ok((resp.success, resp.current_version, resp.existed)) } }