//! Key-Value store operations use crate::{cf, meta_keys, RocksStore}; use chainfire_types::error::StorageError; use chainfire_types::kv::{KeyRange, KvEntry, Revision}; use rocksdb::WriteBatch; use std::sync::atomic::{AtomicU64, Ordering}; use tracing::{debug, trace}; /// KV store built on RocksDB pub struct KvStore { store: RocksStore, /// Current revision counter revision: AtomicU64, } impl KvStore { /// Create a new KV store pub fn new(store: RocksStore) -> Result { let revision = Self::load_revision(&store)?; Ok(Self { store, revision: AtomicU64::new(revision), }) } /// Load the current revision from storage fn load_revision(store: &RocksStore) -> Result { let cf = store .cf_handle(cf::META) .ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?; match store .db() .get_cf(&cf, meta_keys::REVISION) .map_err(|e| StorageError::RocksDb(e.to_string()))? { Some(bytes) => { let revision: Revision = bincode::deserialize(&bytes) .map_err(|e| StorageError::Serialization(e.to_string()))?; Ok(revision) } None => Ok(0), } } /// Get current revision pub fn current_revision(&self) -> Revision { self.revision.load(Ordering::SeqCst) } /// Increment and return new revision fn next_revision(&self) -> Revision { self.revision.fetch_add(1, Ordering::SeqCst) + 1 } /// Persist current revision fn save_revision(&self, revision: Revision) -> Result<(), StorageError> { let cf = self .store .cf_handle(cf::META) .ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?; let bytes = bincode::serialize(&revision).map_err(|e| StorageError::Serialization(e.to_string()))?; self.store .db() .put_cf(&cf, meta_keys::REVISION, bytes) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(()) } /// Get a single key pub fn get(&self, key: &[u8]) -> Result, StorageError> { let cf = self .store .cf_handle(cf::KV) .ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?; match self .store .db() .get_cf(&cf, key) .map_err(|e| StorageError::RocksDb(e.to_string()))? { Some(bytes) => { let entry: KvEntry = bincode::deserialize(&bytes) .map_err(|e| StorageError::Serialization(e.to_string()))?; Ok(Some(entry)) } None => Ok(None), } } /// Put a key-value pair pub fn put( &self, key: Vec, value: Vec, lease_id: Option, ) -> Result<(Revision, Option), StorageError> { let cf = self .store .cf_handle(cf::KV) .ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?; // Get previous entry let prev = self.get(&key)?; let revision = self.next_revision(); // Create new entry let entry = match &prev { Some(old) => old.update(value, revision), None => { if let Some(lease) = lease_id { KvEntry::with_lease(key.clone(), value, revision, lease) } else { KvEntry::new(key.clone(), value, revision) } } }; // Write to RocksDB let bytes = bincode::serialize(&entry).map_err(|e| StorageError::Serialization(e.to_string()))?; let mut batch = WriteBatch::default(); batch.put_cf(&cf, &key, &bytes); // Also persist revision let meta_cf = self .store .cf_handle(cf::META) .ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?; let rev_bytes = bincode::serialize(&revision) .map_err(|e| StorageError::Serialization(e.to_string()))?; batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes); self.store .db() .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; debug!(key = ?String::from_utf8_lossy(&key), revision, "Put key"); Ok((revision, prev)) } /// Delete a single key pub fn delete(&self, key: &[u8]) -> Result<(Revision, Option), StorageError> { let cf = self .store .cf_handle(cf::KV) .ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?; // Get previous entry let prev = self.get(key)?; if prev.is_none() { return Ok((self.current_revision(), None)); } let revision = self.next_revision(); // Delete from RocksDB let mut batch = WriteBatch::default(); batch.delete_cf(&cf, key); // Persist revision let meta_cf = self .store .cf_handle(cf::META) .ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?; let rev_bytes = bincode::serialize(&revision) .map_err(|e| StorageError::Serialization(e.to_string()))?; batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes); self.store .db() .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; debug!(key = ?String::from_utf8_lossy(key), revision, "Deleted key"); Ok((revision, prev)) } /// Delete a range of keys pub fn delete_range( &self, start: &[u8], end: &[u8], ) -> Result<(Revision, Vec), StorageError> { let cf = self .store .cf_handle(cf::KV) .ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?; // First, collect all keys to delete let entries = self.range(start, Some(end))?; if entries.is_empty() { return Ok((self.current_revision(), Vec::new())); } let revision = self.next_revision(); // Delete all keys let mut batch = WriteBatch::default(); for entry in &entries { batch.delete_cf(&cf, &entry.key); } // Persist revision let meta_cf = self .store .cf_handle(cf::META) .ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?; let rev_bytes = bincode::serialize(&revision) .map_err(|e| StorageError::Serialization(e.to_string()))?; batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes); self.store .db() .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; debug!( start = ?String::from_utf8_lossy(start), end = ?String::from_utf8_lossy(end), deleted = entries.len(), revision, "Deleted range" ); Ok((revision, entries)) } /// Scan a range of keys pub fn range(&self, start: &[u8], end: Option<&[u8]>) -> Result, StorageError> { let cf = self .store .cf_handle(cf::KV) .ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?; let mut entries = Vec::new(); let iter = self.store.db().iterator_cf( &cf, rocksdb::IteratorMode::From(start, rocksdb::Direction::Forward), ); for item in iter { let (key, value) = item.map_err(|e| StorageError::RocksDb(e.to_string()))?; // Check if we've passed the end if let Some(end_key) = end { if key.as_ref() >= end_key { break; } } let entry: KvEntry = bincode::deserialize(&value) .map_err(|e| StorageError::Serialization(e.to_string()))?; entries.push(entry); } trace!( start = ?String::from_utf8_lossy(start), count = entries.len(), "Range scan" ); Ok(entries) } /// Scan keys with a prefix pub fn prefix(&self, prefix: &[u8]) -> Result, StorageError> { let range = KeyRange::prefix(prefix); self.range(&range.start, range.end.as_deref()) } /// Get the underlying store pub fn store(&self) -> &RocksStore { &self.store } } #[cfg(test)] mod tests { use super::*; use tempfile::tempdir; fn create_test_store() -> KvStore { let dir = tempdir().unwrap(); let store = RocksStore::new(dir.path()).unwrap(); KvStore::new(store).unwrap() } #[test] fn test_put_and_get() { let kv = create_test_store(); let (rev, prev) = kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap(); assert_eq!(rev, 1); assert!(prev.is_none()); let entry = kv.get(b"key1").unwrap().unwrap(); assert_eq!(entry.key, b"key1"); assert_eq!(entry.value, b"value1"); assert_eq!(entry.version, 1); } #[test] fn test_update() { let kv = create_test_store(); kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap(); let (rev, prev) = kv.put(b"key1".to_vec(), b"value2".to_vec(), None).unwrap(); assert_eq!(rev, 2); assert!(prev.is_some()); assert_eq!(prev.unwrap().value, b"value1"); let entry = kv.get(b"key1").unwrap().unwrap(); assert_eq!(entry.value, b"value2"); assert_eq!(entry.version, 2); assert_eq!(entry.create_revision, 1); // Unchanged assert_eq!(entry.mod_revision, 2); } #[test] fn test_delete() { let kv = create_test_store(); kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap(); let (rev, prev) = kv.delete(b"key1").unwrap(); assert_eq!(rev, 2); assert!(prev.is_some()); assert_eq!(prev.unwrap().value, b"value1"); let entry = kv.get(b"key1").unwrap(); assert!(entry.is_none()); } #[test] fn test_delete_nonexistent() { let kv = create_test_store(); let (rev, prev) = kv.delete(b"nonexistent").unwrap(); assert_eq!(rev, 0); assert!(prev.is_none()); } #[test] fn test_range() { let kv = create_test_store(); kv.put(b"a".to_vec(), b"1".to_vec(), None).unwrap(); kv.put(b"b".to_vec(), b"2".to_vec(), None).unwrap(); kv.put(b"c".to_vec(), b"3".to_vec(), None).unwrap(); kv.put(b"d".to_vec(), b"4".to_vec(), None).unwrap(); let entries = kv.range(b"b", Some(b"d")).unwrap(); assert_eq!(entries.len(), 2); assert_eq!(entries[0].key, b"b"); assert_eq!(entries[1].key, b"c"); } #[test] fn test_prefix() { let kv = create_test_store(); kv.put(b"/nodes/1".to_vec(), b"node1".to_vec(), None) .unwrap(); kv.put(b"/nodes/2".to_vec(), b"node2".to_vec(), None) .unwrap(); kv.put(b"/tasks/1".to_vec(), b"task1".to_vec(), None) .unwrap(); let entries = kv.prefix(b"/nodes/").unwrap(); assert_eq!(entries.len(), 2); } #[test] fn test_delete_range() { let kv = create_test_store(); kv.put(b"/nodes/1".to_vec(), b"node1".to_vec(), None) .unwrap(); kv.put(b"/nodes/2".to_vec(), b"node2".to_vec(), None) .unwrap(); kv.put(b"/tasks/1".to_vec(), b"task1".to_vec(), None) .unwrap(); let (rev, deleted) = kv.delete_range(b"/nodes/", b"/nodes0").unwrap(); assert_eq!(deleted.len(), 2); // Verify nodes are gone assert!(kv.get(b"/nodes/1").unwrap().is_none()); assert!(kv.get(b"/nodes/2").unwrap().is_none()); // Verify task still exists assert!(kv.get(b"/tasks/1").unwrap().is_some()); } #[test] fn test_revision_persistence() { let dir = tempdir().unwrap(); // Create store and write some data { let store = RocksStore::new(dir.path()).unwrap(); let kv = KvStore::new(store).unwrap(); kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap(); kv.put(b"key2".to_vec(), b"value2".to_vec(), None).unwrap(); assert_eq!(kv.current_revision(), 2); } // Reopen and verify revision is restored { let store = RocksStore::new(dir.path()).unwrap(); let kv = KvStore::new(store).unwrap(); assert_eq!(kv.current_revision(), 2); // Next write should continue from 3 let (rev, _) = kv.put(b"key3".to_vec(), b"value3".to_vec(), None).unwrap(); assert_eq!(rev, 3); } } }