//! Lease storage for TTL-based key expiration //! //! Manages lease lifecycle: grant, revoke, refresh, expiration. use chainfire_types::error::StorageError; use chainfire_types::lease::{Lease, LeaseData, LeaseId}; use dashmap::DashMap; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tracing::{debug, info}; /// Store for managing leases pub struct LeaseStore { /// Active leases: lease_id -> Lease leases: DashMap, /// ID generator for new leases next_id: AtomicI64, /// Channel to notify of expired leases (lease_id, keys_to_delete) expiration_tx: Option>)>>, } impl LeaseStore { /// Create a new lease store pub fn new() -> Self { Self { leases: DashMap::new(), next_id: AtomicI64::new(1), expiration_tx: None, } } /// Set the expiration notification channel pub fn set_expiration_sender(&mut self, tx: mpsc::UnboundedSender<(LeaseId, Vec>)>) { self.expiration_tx = Some(tx); } /// Grant a new lease pub fn grant(&self, id: LeaseId, ttl: i64) -> Result { let lease_id = if id == 0 { self.next_id.fetch_add(1, Ordering::SeqCst) } else { // Check if ID is already in use if self.leases.contains_key(&id) { return Err(StorageError::LeaseError(format!("Lease {} already exists", id))); } // Update next_id if necessary let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst); id }; let lease = Lease::new(lease_id, ttl); self.leases.insert(lease_id, lease.clone()); debug!(lease_id, ttl, "Lease granted"); Ok(lease) } /// Revoke a lease and return keys to delete pub fn revoke(&self, id: LeaseId) -> Result>, StorageError> { match self.leases.remove(&id) { Some((_, lease)) => { info!(lease_id = id, keys_count = lease.keys.len(), "Lease revoked"); Ok(lease.keys) } None => Err(StorageError::LeaseError(format!("Lease {} not found", id))), } } /// Refresh a lease (keep-alive) pub fn refresh(&self, id: LeaseId) -> Result { match self.leases.get_mut(&id) { Some(mut lease) => { lease.refresh(); let ttl = lease.ttl; debug!(lease_id = id, ttl, "Lease refreshed"); Ok(ttl) } None => Err(StorageError::LeaseError(format!("Lease {} not found", id))), } } /// Get a lease by ID pub fn get(&self, id: LeaseId) -> Option { self.leases.get(&id).map(|l| l.clone()) } /// Get remaining TTL for a lease pub fn time_to_live(&self, id: LeaseId) -> Option<(i64, i64, Vec>)> { self.leases.get(&id).map(|lease| { (lease.remaining(), lease.ttl, lease.keys.clone()) }) } /// List all lease IDs pub fn list(&self) -> Vec { self.leases.iter().map(|entry| *entry.key()).collect() } /// Attach a key to a lease pub fn attach_key(&self, lease_id: LeaseId, key: Vec) -> Result<(), StorageError> { match self.leases.get_mut(&lease_id) { Some(mut lease) => { lease.attach_key(key); Ok(()) } None => Err(StorageError::LeaseError(format!("Lease {} not found", lease_id))), } } /// Detach a key from a lease pub fn detach_key(&self, lease_id: LeaseId, key: &[u8]) { if let Some(mut lease) = self.leases.get_mut(&lease_id) { lease.detach_key(key); } } /// Check for expired leases and return their IDs and keys pub fn collect_expired(&self) -> Vec<(LeaseId, Vec>)> { let mut expired = Vec::new(); for entry in self.leases.iter() { if entry.is_expired() { expired.push((*entry.key(), entry.keys.clone())); } } // Remove expired leases for (id, _) in &expired { self.leases.remove(id); } expired } /// Export all leases for snapshot pub fn export(&self) -> Vec { self.leases .iter() .map(|entry| LeaseData::from_lease(&entry)) .collect() } /// Import leases from snapshot pub fn import(&self, leases: Vec) { self.leases.clear(); for data in leases { let id = data.id; let lease = data.to_lease(); self.leases.insert(id, lease); // Update next_id let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst); } } } impl Default for LeaseStore { fn default() -> Self { Self::new() } } /// Background worker that checks for expired leases pub struct LeaseExpirationWorker { store: Arc, interval: Duration, shutdown_rx: mpsc::Receiver<()>, } impl LeaseExpirationWorker { /// Create a new expiration worker pub fn new( store: Arc, interval: Duration, shutdown_rx: mpsc::Receiver<()>, ) -> Self { Self { store, interval, shutdown_rx, } } /// Run the expiration worker pub async fn run(mut self, expire_callback: impl Fn(LeaseId, Vec>) + Send + 'static) { let mut interval = tokio::time::interval(self.interval); loop { tokio::select! { _ = interval.tick() => { let expired = self.store.collect_expired(); for (lease_id, keys) in expired { info!(lease_id, keys_count = keys.len(), "Lease expired"); expire_callback(lease_id, keys); } } _ = self.shutdown_rx.recv() => { info!("Lease expiration worker shutting down"); break; } } } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_lease_grant() { let store = LeaseStore::new(); let lease = store.grant(0, 10).unwrap(); assert!(lease.id > 0); assert_eq!(lease.ttl, 10); } #[test] fn test_lease_grant_with_id() { let store = LeaseStore::new(); let lease = store.grant(42, 10).unwrap(); assert_eq!(lease.id, 42); } #[test] fn test_lease_revoke() { let store = LeaseStore::new(); let lease = store.grant(0, 10).unwrap(); let id = lease.id; // Attach some keys store.attach_key(id, b"key1".to_vec()).unwrap(); store.attach_key(id, b"key2".to_vec()).unwrap(); let keys = store.revoke(id).unwrap(); assert_eq!(keys.len(), 2); // Lease should be gone assert!(store.get(id).is_none()); } #[test] fn test_lease_refresh() { let store = LeaseStore::new(); let lease = store.grant(0, 10).unwrap(); let id = lease.id; let ttl = store.refresh(id).unwrap(); assert_eq!(ttl, 10); } #[test] fn test_lease_list() { let store = LeaseStore::new(); store.grant(1, 10).unwrap(); store.grant(2, 10).unwrap(); store.grant(3, 10).unwrap(); let ids = store.list(); assert_eq!(ids.len(), 3); } #[test] fn test_lease_attach_detach() { let store = LeaseStore::new(); let lease = store.grant(0, 10).unwrap(); let id = lease.id; store.attach_key(id, b"key1".to_vec()).unwrap(); store.attach_key(id, b"key2".to_vec()).unwrap(); let lease = store.get(id).unwrap(); assert_eq!(lease.keys.len(), 2); store.detach_key(id, b"key1"); let lease = store.get(id).unwrap(); assert_eq!(lease.keys.len(), 1); } }