photoncloud-monorepo/chainfire/crates/chainfire-storage/src/lease_store.rs
centra 3eeb303dcb feat: Batch commit for T039.S3 deployment
Includes all pending changes needed for nixos-anywhere:
- fiberlb: L7 policy, rule, certificate types
- deployer: New service for cluster management
- nix-nos: Generic network modules
- Various service updates and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 04:34:51 +09:00

280 lines
8 KiB
Rust

//! Lease storage for TTL-based key expiration
//!
//! Manages lease lifecycle: grant, revoke, refresh, expiration.
use chainfire_types::error::StorageError;
use chainfire_types::lease::{Lease, LeaseData, LeaseId};
use dashmap::DashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, info};
/// Store for managing leases
///
/// Holds every active lease in a concurrent map keyed by lease ID, plus the
/// counter used to hand out IDs when a caller does not supply one.
pub struct LeaseStore {
/// Active leases: lease_id -> Lease
leases: DashMap<LeaseId, Lease>,
/// ID generator for new leases
///
/// Holds the next ID to hand out for auto-assigned leases; it is bumped past
/// any explicitly granted or imported ID so generated IDs do not collide.
next_id: AtomicI64,
/// Channel to notify of expired leases (lease_id, keys_to_delete)
///
/// NOTE(review): this is only ever assigned (via `set_expiration_sender`);
/// no code visible in this file sends on it — confirm whether it is still
/// needed or whether the expiration path was meant to use it.
expiration_tx: Option<mpsc::UnboundedSender<(LeaseId, Vec<Vec<u8>>)>>,
}
impl LeaseStore {
/// Create a new lease store
pub fn new() -> Self {
Self {
leases: DashMap::new(),
next_id: AtomicI64::new(1),
expiration_tx: None,
}
}
/// Set the expiration notification channel
pub fn set_expiration_sender(&mut self, tx: mpsc::UnboundedSender<(LeaseId, Vec<Vec<u8>>)>) {
self.expiration_tx = Some(tx);
}
/// Grant a new lease
pub fn grant(&self, id: LeaseId, ttl: i64) -> Result<Lease, StorageError> {
let lease_id = if id == 0 {
self.next_id.fetch_add(1, Ordering::SeqCst)
} else {
// Check if ID is already in use
if self.leases.contains_key(&id) {
return Err(StorageError::LeaseError(format!("Lease {} already exists", id)));
}
// Update next_id if necessary
let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst);
id
};
let lease = Lease::new(lease_id, ttl);
self.leases.insert(lease_id, lease.clone());
debug!(lease_id, ttl, "Lease granted");
Ok(lease)
}
/// Revoke a lease and return keys to delete
pub fn revoke(&self, id: LeaseId) -> Result<Vec<Vec<u8>>, StorageError> {
match self.leases.remove(&id) {
Some((_, lease)) => {
info!(lease_id = id, keys_count = lease.keys.len(), "Lease revoked");
Ok(lease.keys)
}
None => Err(StorageError::LeaseError(format!("Lease {} not found", id))),
}
}
/// Refresh a lease (keep-alive)
pub fn refresh(&self, id: LeaseId) -> Result<i64, StorageError> {
match self.leases.get_mut(&id) {
Some(mut lease) => {
lease.refresh();
let ttl = lease.ttl;
debug!(lease_id = id, ttl, "Lease refreshed");
Ok(ttl)
}
None => Err(StorageError::LeaseError(format!("Lease {} not found", id))),
}
}
/// Get a lease by ID
pub fn get(&self, id: LeaseId) -> Option<Lease> {
self.leases.get(&id).map(|l| l.clone())
}
/// Get remaining TTL for a lease
pub fn time_to_live(&self, id: LeaseId) -> Option<(i64, i64, Vec<Vec<u8>>)> {
self.leases.get(&id).map(|lease| {
(lease.remaining(), lease.ttl, lease.keys.clone())
})
}
/// List all lease IDs
pub fn list(&self) -> Vec<LeaseId> {
self.leases.iter().map(|entry| *entry.key()).collect()
}
/// Attach a key to a lease
pub fn attach_key(&self, lease_id: LeaseId, key: Vec<u8>) -> Result<(), StorageError> {
match self.leases.get_mut(&lease_id) {
Some(mut lease) => {
lease.attach_key(key);
Ok(())
}
None => Err(StorageError::LeaseError(format!("Lease {} not found", lease_id))),
}
}
/// Detach a key from a lease
pub fn detach_key(&self, lease_id: LeaseId, key: &[u8]) {
if let Some(mut lease) = self.leases.get_mut(&lease_id) {
lease.detach_key(key);
}
}
/// Check for expired leases and return their IDs and keys
pub fn collect_expired(&self) -> Vec<(LeaseId, Vec<Vec<u8>>)> {
let mut expired = Vec::new();
for entry in self.leases.iter() {
if entry.is_expired() {
expired.push((*entry.key(), entry.keys.clone()));
}
}
// Remove expired leases
for (id, _) in &expired {
self.leases.remove(id);
}
expired
}
/// Export all leases for snapshot
pub fn export(&self) -> Vec<LeaseData> {
self.leases
.iter()
.map(|entry| LeaseData::from_lease(&entry))
.collect()
}
/// Import leases from snapshot
pub fn import(&self, leases: Vec<LeaseData>) {
self.leases.clear();
for data in leases {
let id = data.id;
let lease = data.to_lease();
self.leases.insert(id, lease);
// Update next_id
let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst);
}
}
}
impl Default for LeaseStore {
fn default() -> Self {
Self::new()
}
}
/// Background worker that checks for expired leases
///
/// Periodically calls `LeaseStore::collect_expired` on the shared store and
/// hands each expired lease to a caller-supplied callback (see `run`).
pub struct LeaseExpirationWorker {
/// Shared lease store that is scanned for expired leases
store: Arc<LeaseStore>,
/// Time between two expiration scans
interval: Duration,
/// Receiving side of the shutdown signal; `run` exits once `recv` completes
shutdown_rx: mpsc::Receiver<()>,
}
impl LeaseExpirationWorker {
    /// Create a new expiration worker.
    ///
    /// * `store` - lease store to scan for expired leases.
    /// * `interval` - time between two scans.
    /// * `shutdown_rx` - channel whose message (or closure) stops [`run`](Self::run).
    pub fn new(
        store: Arc<LeaseStore>,
        interval: Duration,
        shutdown_rx: mpsc::Receiver<()>,
    ) -> Self {
        Self {
            store,
            interval,
            shutdown_rx,
        }
    }

    /// Run the expiration worker until a shutdown signal arrives.
    ///
    /// On every tick the store's expired leases are collected (and thereby
    /// removed from the store) and `expire_callback` is invoked once per
    /// expired lease with its ID and attached keys.
    ///
    /// The loop ends when `shutdown_rx.recv()` completes, i.e. when a message
    /// is received or when every sender has been dropped.
    pub async fn run(mut self, expire_callback: impl Fn(LeaseId, Vec<Vec<u8>>) + Send + 'static) {
        // `tokio::time::interval` panics on a zero period, which would kill
        // the worker task at startup; clamp to a small positive period instead.
        let period = self.interval.max(Duration::from_millis(1));
        let mut interval = tokio::time::interval(period);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let expired = self.store.collect_expired();
                    for (lease_id, keys) in expired {
                        info!(lease_id, keys_count = keys.len(), "Lease expired");
                        expire_callback(lease_id, keys);
                    }
                }
                _ = self.shutdown_rx.recv() => {
                    info!("Lease expiration worker shutting down");
                    break;
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An auto-assigned lease gets a positive ID and keeps the requested TTL.
    #[test]
    fn test_lease_grant() {
        let store = LeaseStore::new();
        let lease = store.grant(0, 10).unwrap();
        assert!(lease.id > 0);
        assert_eq!(lease.ttl, 10);
    }

    /// An explicit ID is honoured as-is.
    #[test]
    fn test_lease_grant_with_id() {
        let store = LeaseStore::new();
        let lease = store.grant(42, 10).unwrap();
        assert_eq!(lease.id, 42);
    }

    /// Granting an ID that is already in use is rejected and leaves the
    /// existing lease in place.
    #[test]
    fn test_lease_grant_duplicate_id() {
        let store = LeaseStore::new();
        store.grant(42, 10).unwrap();
        assert!(store.grant(42, 20).is_err());
        assert_eq!(store.get(42).unwrap().ttl, 10);
        assert_eq!(store.list().len(), 1);
    }

    /// Auto-assigned IDs never collide with a previously granted explicit ID.
    #[test]
    fn test_lease_grant_auto_id_after_explicit() {
        let store = LeaseStore::new();
        store.grant(42, 10).unwrap();
        let auto = store.grant(0, 10).unwrap();
        assert!(auto.id > 42);
        assert_eq!(store.list().len(), 2);
    }

    /// Revoking returns the attached keys and removes the lease.
    #[test]
    fn test_lease_revoke() {
        let store = LeaseStore::new();
        let lease = store.grant(0, 10).unwrap();
        let id = lease.id;
        // Attach some keys
        store.attach_key(id, b"key1".to_vec()).unwrap();
        store.attach_key(id, b"key2".to_vec()).unwrap();
        let keys = store.revoke(id).unwrap();
        assert_eq!(keys.len(), 2);
        // Lease should be gone
        assert!(store.get(id).is_none());
    }

    /// Refreshing reports the lease's TTL.
    #[test]
    fn test_lease_refresh() {
        let store = LeaseStore::new();
        let lease = store.grant(0, 10).unwrap();
        let id = lease.id;
        let ttl = store.refresh(id).unwrap();
        assert_eq!(ttl, 10);
    }

    /// Every operation on an unknown lease reports "not found" instead of
    /// silently succeeding.
    #[test]
    fn test_lease_not_found() {
        let store = LeaseStore::new();
        assert!(store.revoke(999).is_err());
        assert!(store.refresh(999).is_err());
        assert!(store.attach_key(999, b"key".to_vec()).is_err());
        assert!(store.get(999).is_none());
        assert!(store.time_to_live(999).is_none());
        // Detaching from an unknown lease is a documented no-op.
        store.detach_key(999, b"key");
        assert!(store.list().is_empty());
    }

    /// Listing returns exactly the granted IDs.
    #[test]
    fn test_lease_list() {
        let store = LeaseStore::new();
        store.grant(1, 10).unwrap();
        store.grant(2, 10).unwrap();
        store.grant(3, 10).unwrap();
        let mut ids = store.list();
        ids.sort_unstable();
        assert_eq!(ids, vec![1, 2, 3]);
    }

    /// Keys can be attached to and detached from a lease.
    #[test]
    fn test_lease_attach_detach() {
        let store = LeaseStore::new();
        let lease = store.grant(0, 10).unwrap();
        let id = lease.id;
        store.attach_key(id, b"key1".to_vec()).unwrap();
        store.attach_key(id, b"key2".to_vec()).unwrap();
        let lease = store.get(id).unwrap();
        assert_eq!(lease.keys.len(), 2);
        store.detach_key(id, b"key1");
        let lease = store.get(id).unwrap();
        assert_eq!(lease.keys.len(), 1);
    }

    /// Importing a snapshot replaces the store's contents and moves the ID
    /// generator past the imported IDs.
    #[test]
    fn test_lease_export_import() {
        let source = LeaseStore::new();
        source.grant(5, 10).unwrap();
        source.grant(7, 10).unwrap();

        let restored = LeaseStore::new();
        restored.grant(100, 10).unwrap();
        restored.import(source.export());

        let mut ids = restored.list();
        ids.sort_unstable();
        assert_eq!(ids, vec![5, 7]);

        let fresh = restored.grant(0, 10).unwrap();
        assert!(fresh.id > 7);
    }
}