photoncloud-monorepo/chainfire/crates/chainfire-storage/src/kv_store.rs
centra 8f94aee1fa Fix R8: Convert submodule gitlinks to regular directories
- Remove gitlinks (160000 mode) for chainfire, flaredb, iam
- Add workspace contents as regular tracked files
- Update flake.nix to use simple paths instead of builtins.fetchGit

This resolves the nix build failure where submodule directories
appeared empty in the nix store.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 16:51:20 +09:00

435 lines
13 KiB
Rust

//! Key-Value store operations
use crate::{cf, meta_keys, RocksStore};
use chainfire_types::error::StorageError;
use chainfire_types::kv::{KeyRange, KvEntry, Revision};
use parking_lot::RwLock;
use rocksdb::WriteBatch;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, trace};
/// KV store built on RocksDB
pub struct KvStore {
store: RocksStore,
/// Current revision counter
revision: AtomicU64,
}
impl KvStore {
/// Create a new KV store
pub fn new(store: RocksStore) -> Result<Self, StorageError> {
let revision = Self::load_revision(&store)?;
Ok(Self {
store,
revision: AtomicU64::new(revision),
})
}
/// Load the current revision from storage
fn load_revision(store: &RocksStore) -> Result<Revision, StorageError> {
let cf = store
.cf_handle(cf::META)
.ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?;
match store
.db()
.get_cf(&cf, meta_keys::REVISION)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
{
Some(bytes) => {
let revision: Revision = bincode::deserialize(&bytes)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
Ok(revision)
}
None => Ok(0),
}
}
/// Get current revision
pub fn current_revision(&self) -> Revision {
self.revision.load(Ordering::SeqCst)
}
/// Increment and return new revision
fn next_revision(&self) -> Revision {
self.revision.fetch_add(1, Ordering::SeqCst) + 1
}
/// Persist current revision
fn save_revision(&self, revision: Revision) -> Result<(), StorageError> {
let cf = self
.store
.cf_handle(cf::META)
.ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?;
let bytes =
bincode::serialize(&revision).map_err(|e| StorageError::Serialization(e.to_string()))?;
self.store
.db()
.put_cf(&cf, meta_keys::REVISION, bytes)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
/// Get a single key
pub fn get(&self, key: &[u8]) -> Result<Option<KvEntry>, StorageError> {
let cf = self
.store
.cf_handle(cf::KV)
.ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?;
match self
.store
.db()
.get_cf(&cf, key)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
{
Some(bytes) => {
let entry: KvEntry = bincode::deserialize(&bytes)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
Ok(Some(entry))
}
None => Ok(None),
}
}
/// Put a key-value pair
pub fn put(
&self,
key: Vec<u8>,
value: Vec<u8>,
lease_id: Option<i64>,
) -> Result<(Revision, Option<KvEntry>), StorageError> {
let cf = self
.store
.cf_handle(cf::KV)
.ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?;
// Get previous entry
let prev = self.get(&key)?;
let revision = self.next_revision();
// Create new entry
let entry = match &prev {
Some(old) => old.update(value, revision),
None => {
if let Some(lease) = lease_id {
KvEntry::with_lease(key.clone(), value, revision, lease)
} else {
KvEntry::new(key.clone(), value, revision)
}
}
};
// Write to RocksDB
let bytes =
bincode::serialize(&entry).map_err(|e| StorageError::Serialization(e.to_string()))?;
let mut batch = WriteBatch::default();
batch.put_cf(&cf, &key, &bytes);
// Also persist revision
let meta_cf = self
.store
.cf_handle(cf::META)
.ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?;
let rev_bytes = bincode::serialize(&revision)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes);
self.store
.db()
.write(batch)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
debug!(key = ?String::from_utf8_lossy(&key), revision, "Put key");
Ok((revision, prev))
}
/// Delete a single key
pub fn delete(&self, key: &[u8]) -> Result<(Revision, Option<KvEntry>), StorageError> {
let cf = self
.store
.cf_handle(cf::KV)
.ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?;
// Get previous entry
let prev = self.get(key)?;
if prev.is_none() {
return Ok((self.current_revision(), None));
}
let revision = self.next_revision();
// Delete from RocksDB
let mut batch = WriteBatch::default();
batch.delete_cf(&cf, key);
// Persist revision
let meta_cf = self
.store
.cf_handle(cf::META)
.ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?;
let rev_bytes = bincode::serialize(&revision)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes);
self.store
.db()
.write(batch)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
debug!(key = ?String::from_utf8_lossy(key), revision, "Deleted key");
Ok((revision, prev))
}
/// Delete a range of keys
pub fn delete_range(
&self,
start: &[u8],
end: &[u8],
) -> Result<(Revision, Vec<KvEntry>), StorageError> {
let cf = self
.store
.cf_handle(cf::KV)
.ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?;
// First, collect all keys to delete
let entries = self.range(start, Some(end))?;
if entries.is_empty() {
return Ok((self.current_revision(), Vec::new()));
}
let revision = self.next_revision();
// Delete all keys
let mut batch = WriteBatch::default();
for entry in &entries {
batch.delete_cf(&cf, &entry.key);
}
// Persist revision
let meta_cf = self
.store
.cf_handle(cf::META)
.ok_or_else(|| StorageError::RocksDb("META cf not found".into()))?;
let rev_bytes = bincode::serialize(&revision)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
batch.put_cf(&meta_cf, meta_keys::REVISION, &rev_bytes);
self.store
.db()
.write(batch)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
debug!(
start = ?String::from_utf8_lossy(start),
end = ?String::from_utf8_lossy(end),
deleted = entries.len(),
revision,
"Deleted range"
);
Ok((revision, entries))
}
/// Scan a range of keys
pub fn range(&self, start: &[u8], end: Option<&[u8]>) -> Result<Vec<KvEntry>, StorageError> {
let cf = self
.store
.cf_handle(cf::KV)
.ok_or_else(|| StorageError::RocksDb("KV cf not found".into()))?;
let mut entries = Vec::new();
let iter = self.store.db().iterator_cf(
&cf,
rocksdb::IteratorMode::From(start, rocksdb::Direction::Forward),
);
for item in iter {
let (key, value) = item.map_err(|e| StorageError::RocksDb(e.to_string()))?;
// Check if we've passed the end
if let Some(end_key) = end {
if key.as_ref() >= end_key {
break;
}
}
let entry: KvEntry = bincode::deserialize(&value)
.map_err(|e| StorageError::Serialization(e.to_string()))?;
entries.push(entry);
}
trace!(
start = ?String::from_utf8_lossy(start),
count = entries.len(),
"Range scan"
);
Ok(entries)
}
/// Scan keys with a prefix
pub fn prefix(&self, prefix: &[u8]) -> Result<Vec<KvEntry>, StorageError> {
let range = KeyRange::prefix(prefix);
self.range(&range.start, range.end.as_deref())
}
/// Get the underlying store
pub fn store(&self) -> &RocksStore {
&self.store
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_store() -> KvStore {
let dir = tempdir().unwrap();
let store = RocksStore::new(dir.path()).unwrap();
KvStore::new(store).unwrap()
}
#[test]
fn test_put_and_get() {
let kv = create_test_store();
let (rev, prev) = kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap();
assert_eq!(rev, 1);
assert!(prev.is_none());
let entry = kv.get(b"key1").unwrap().unwrap();
assert_eq!(entry.key, b"key1");
assert_eq!(entry.value, b"value1");
assert_eq!(entry.version, 1);
}
#[test]
fn test_update() {
let kv = create_test_store();
kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap();
let (rev, prev) = kv.put(b"key1".to_vec(), b"value2".to_vec(), None).unwrap();
assert_eq!(rev, 2);
assert!(prev.is_some());
assert_eq!(prev.unwrap().value, b"value1");
let entry = kv.get(b"key1").unwrap().unwrap();
assert_eq!(entry.value, b"value2");
assert_eq!(entry.version, 2);
assert_eq!(entry.create_revision, 1); // Unchanged
assert_eq!(entry.mod_revision, 2);
}
#[test]
fn test_delete() {
let kv = create_test_store();
kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap();
let (rev, prev) = kv.delete(b"key1").unwrap();
assert_eq!(rev, 2);
assert!(prev.is_some());
assert_eq!(prev.unwrap().value, b"value1");
let entry = kv.get(b"key1").unwrap();
assert!(entry.is_none());
}
#[test]
fn test_delete_nonexistent() {
let kv = create_test_store();
let (rev, prev) = kv.delete(b"nonexistent").unwrap();
assert_eq!(rev, 0);
assert!(prev.is_none());
}
#[test]
fn test_range() {
let kv = create_test_store();
kv.put(b"a".to_vec(), b"1".to_vec(), None).unwrap();
kv.put(b"b".to_vec(), b"2".to_vec(), None).unwrap();
kv.put(b"c".to_vec(), b"3".to_vec(), None).unwrap();
kv.put(b"d".to_vec(), b"4".to_vec(), None).unwrap();
let entries = kv.range(b"b", Some(b"d")).unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].key, b"b");
assert_eq!(entries[1].key, b"c");
}
#[test]
fn test_prefix() {
let kv = create_test_store();
kv.put(b"/nodes/1".to_vec(), b"node1".to_vec(), None)
.unwrap();
kv.put(b"/nodes/2".to_vec(), b"node2".to_vec(), None)
.unwrap();
kv.put(b"/tasks/1".to_vec(), b"task1".to_vec(), None)
.unwrap();
let entries = kv.prefix(b"/nodes/").unwrap();
assert_eq!(entries.len(), 2);
}
#[test]
fn test_delete_range() {
let kv = create_test_store();
kv.put(b"/nodes/1".to_vec(), b"node1".to_vec(), None)
.unwrap();
kv.put(b"/nodes/2".to_vec(), b"node2".to_vec(), None)
.unwrap();
kv.put(b"/tasks/1".to_vec(), b"task1".to_vec(), None)
.unwrap();
let (rev, deleted) = kv.delete_range(b"/nodes/", b"/nodes0").unwrap();
assert_eq!(deleted.len(), 2);
// Verify nodes are gone
assert!(kv.get(b"/nodes/1").unwrap().is_none());
assert!(kv.get(b"/nodes/2").unwrap().is_none());
// Verify task still exists
assert!(kv.get(b"/tasks/1").unwrap().is_some());
}
#[test]
fn test_revision_persistence() {
let dir = tempdir().unwrap();
// Create store and write some data
{
let store = RocksStore::new(dir.path()).unwrap();
let kv = KvStore::new(store).unwrap();
kv.put(b"key1".to_vec(), b"value1".to_vec(), None).unwrap();
kv.put(b"key2".to_vec(), b"value2".to_vec(), None).unwrap();
assert_eq!(kv.current_revision(), 2);
}
// Reopen and verify revision is restored
{
let store = RocksStore::new(dir.path()).unwrap();
let kv = KvStore::new(store).unwrap();
assert_eq!(kv.current_revision(), 2);
// Next write should continue from 3
let (rev, _) = kv.put(b"key3".to_vec(), b"value3".to_vec(), None).unwrap();
assert_eq!(rev, 3);
}
}
}