//! Metadata-oriented KV facade for Chainfire (and test backends). //! //! This module exists to standardize how UltraCloud services interact with //! control-plane metadata: versioned reads, CAS, prefix scans, etc. use async_trait::async_trait; use bytes::Bytes; use std::collections::BTreeMap; use std::sync::RwLock; use thiserror::Error; use tokio::sync::Mutex; use crate::{CasOutcome, Client as CfClient, ClientError as CfClientError}; #[derive(Debug, Error)] pub enum MetadataError { #[error("Connection error: {0}")] Connection(String), #[error("Backend error: {0}")] Backend(String), #[error("Conflict: expected version {expected}, actual {actual}")] Conflict { expected: u64, actual: u64 }, #[error("Not found")] NotFound, #[error("Serialization error: {0}")] Serialization(String), } pub type Result = std::result::Result; /// Key-value pair with version #[derive(Debug, Clone)] pub struct KvPair { pub key: Bytes, pub value: Bytes, pub version: u64, } /// Result of a CAS (Compare-And-Swap) operation #[derive(Debug, Clone)] pub enum CasResult { /// CAS succeeded, returning the new version Success(u64), /// CAS failed due to version mismatch or not found Conflict { expected: u64, actual: u64 }, /// Key not found (when expected version > 0) NotFound, } #[async_trait] pub trait MetadataClient: Send + Sync { /// Get a value by key async fn get(&self, key: &[u8]) -> Result>; /// Put a value (unconditional write) async fn put(&self, key: &[u8], value: &[u8]) -> Result; /// Compare-and-swap write /// - If expected_version is 0, only succeeds if key doesn't exist /// - Otherwise, only succeeds if current version matches expected_version async fn cas(&self, key: &[u8], expected_version: u64, value: &[u8]) -> Result; /// Delete a key async fn delete(&self, key: &[u8]) -> Result; /// Scan keys with a prefix async fn scan_prefix(&self, prefix: &[u8], limit: u32) -> Result>; /// Scan keys in a range [start, end) async fn scan_range(&self, start: &[u8], end: &[u8], limit: u32) -> Result>; /// Scan all keys with a prefix (best-effort pagination using `scan_range`). /// /// This exists because `scan_prefix` is intentionally bounded by a `limit` but many /// control-plane callers need "list everything under a prefix" semantics. async fn scan_prefix_all(&self, prefix: &[u8]) -> Result> { const PAGE_SIZE: u32 = 1024; let end = prefix_end(prefix); if end.is_empty() { // Prefix has no lexicographic successor (or is empty). Fall back to a single page. return self.scan_prefix(prefix, PAGE_SIZE).await; } let mut out = Vec::new(); let mut start = prefix.to_vec(); loop { let batch = self.scan_range(&start, &end, PAGE_SIZE).await?; if batch.is_empty() { break; } let last_key = batch .last() .map(|kv| kv.key.clone()) .unwrap_or_else(Bytes::new); out.extend(batch); let next = next_key_after(last_key.as_ref()); if next <= start { // Defensive: avoid infinite loops if the backend returns unsorted/duplicate keys. break; } start = next; } Ok(out) } } fn prefix_end(prefix: &[u8]) -> Vec { let mut end = prefix.to_vec(); for i in (0..end.len()).rev() { if end[i] < 0xff { end[i] += 1; end.truncate(i + 1); return end; } } Vec::new() } fn next_key_after(key: &[u8]) -> Vec { let mut next = key.to_vec(); next.push(0); next } // ============================================================================ // Chainfire Implementation // ============================================================================ /// Thread-safe metadata client backed by the Chainfire gRPC client. pub struct ChainfireClient { client: Mutex, } impl ChainfireClient { pub async fn new(endpoints: Vec) -> Result { let client = Self::connect_any(&endpoints).await?; Ok(Self { client: Mutex::new(client), }) } async fn connect_any(endpoints: &[String]) -> Result { let mut last_err = None; for ep in endpoints { let addr = if ep.starts_with("http://") || ep.starts_with("https://") { ep.clone() } else { format!("http://{}", ep) }; match CfClient::connect(addr.clone()).await { Ok(client) => return Ok(client), Err(e) => { last_err = Some(e); } } } Err(MetadataError::Connection( last_err .map(|e| e.to_string()) .unwrap_or_else(|| "no endpoints available".into()), )) } } #[async_trait] impl MetadataClient for ChainfireClient { async fn get(&self, key: &[u8]) -> Result> { let mut client = self.client.lock().await; let result = client .get_with_revision(key) .await .map_err(map_chainfire_error)?; Ok(result.map(|(v, rev)| (Bytes::from(v), rev))) } async fn put(&self, key: &[u8], value: &[u8]) -> Result { let mut client = self.client.lock().await; client.put(key, value).await.map_err(map_chainfire_error) } async fn cas(&self, key: &[u8], expected_version: u64, value: &[u8]) -> Result { let mut client = self.client.lock().await; let outcome: CasOutcome = client .compare_and_swap(key, expected_version, value) .await .map_err(map_chainfire_error)?; if outcome.success { return Ok(CasResult::Success(outcome.new_version)); } if expected_version == 0 { if outcome.current_version == 0 { Ok(CasResult::NotFound) } else { Ok(CasResult::Conflict { expected: 0, actual: outcome.current_version, }) } } else { Ok(CasResult::Conflict { expected: expected_version, actual: outcome.current_version, }) } } async fn delete(&self, key: &[u8]) -> Result { let mut client = self.client.lock().await; client.delete(key).await.map_err(map_chainfire_error) } async fn scan_prefix(&self, prefix: &[u8], limit: u32) -> Result> { let mut client = self.client.lock().await; let (results, _) = client .scan_prefix(prefix, limit as i64) .await .map_err(map_chainfire_error)?; Ok(results .into_iter() .map(|(k, v, ver)| KvPair { key: Bytes::from(k), value: Bytes::from(v), version: ver, }) .collect()) } async fn scan_range(&self, start: &[u8], end: &[u8], limit: u32) -> Result> { let mut client = self.client.lock().await; let (results, _) = client .scan_range(start, end, limit as i64) .await .map_err(map_chainfire_error)?; Ok(results .into_iter() .map(|(k, v, ver)| KvPair { key: Bytes::from(k), value: Bytes::from(v), version: ver, }) .collect()) } } fn map_chainfire_error(err: CfClientError) -> MetadataError { match err { CfClientError::Connection(msg) => MetadataError::Connection(msg), CfClientError::Transport(e) => MetadataError::Connection(e.to_string()), CfClientError::Rpc(status) => MetadataError::Backend(status.to_string()), other => MetadataError::Backend(other.to_string()), } } // ============================================================================ // Memory Implementation // ============================================================================ pub struct MemoryClient { data: RwLock, (Vec, u64)>>, version_counter: RwLock, } impl MemoryClient { pub fn new() -> Self { Self { data: RwLock::new(BTreeMap::new()), version_counter: RwLock::new(0), } } fn next_version(&self) -> u64 { let mut counter = self.version_counter.write().unwrap(); *counter += 1; *counter } } impl Default for MemoryClient { fn default() -> Self { Self::new() } } #[async_trait] impl MetadataClient for MemoryClient { async fn get(&self, key: &[u8]) -> Result> { let data = self.data.read().unwrap(); Ok(data .get(key) .map(|(v, ver)| (Bytes::copy_from_slice(v), *ver))) } async fn put(&self, key: &[u8], value: &[u8]) -> Result { let version = self.next_version(); let mut data = self.data.write().unwrap(); data.insert(key.to_vec(), (value.to_vec(), version)); Ok(version) } async fn cas(&self, key: &[u8], expected_version: u64, value: &[u8]) -> Result { let mut data = self.data.write().unwrap(); match data.get(key) { Some((_, current_version)) => { if *current_version != expected_version { return Ok(CasResult::Conflict { expected: expected_version, actual: *current_version, }); } } None => { if expected_version != 0 { return Ok(CasResult::NotFound); } } } let version = self.next_version(); data.insert(key.to_vec(), (value.to_vec(), version)); Ok(CasResult::Success(version)) } async fn delete(&self, key: &[u8]) -> Result { let mut data = self.data.write().unwrap(); Ok(data.remove(key).is_some()) } async fn scan_prefix(&self, prefix: &[u8], limit: u32) -> Result> { let data = self.data.read().unwrap(); let mut results = Vec::new(); for (k, (v, ver)) in data.range(prefix.to_vec()..) { if !k.starts_with(prefix) { break; } results.push(KvPair { key: Bytes::copy_from_slice(k), value: Bytes::copy_from_slice(v), version: *ver, }); if results.len() >= limit as usize { break; } } Ok(results) } async fn scan_range(&self, start: &[u8], end: &[u8], limit: u32) -> Result> { let data = self.data.read().unwrap(); let mut results = Vec::new(); for (k, (v, ver)) in data.range(start.to_vec()..end.to_vec()) { results.push(KvPair { key: Bytes::copy_from_slice(k), value: Bytes::copy_from_slice(v), version: *ver, }); if results.len() >= limit as usize { break; } } Ok(results) } }