Includes all pending changes needed for nixos-anywhere: - fiberlb: L7 policy, rule, certificate types - deployer: New service for cluster management - nix-nos: Generic network modules - Various service updates and fixes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
280 lines
8 KiB
Rust
280 lines
8 KiB
Rust
//! Lease storage for TTL-based key expiration
|
|
//!
|
|
//! Manages lease lifecycle: grant, revoke, refresh, expiration.
|
|
|
|
use chainfire_types::error::StorageError;
|
|
use chainfire_types::lease::{Lease, LeaseData, LeaseId};
|
|
use dashmap::DashMap;
|
|
use std::sync::atomic::{AtomicI64, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tracing::{debug, info};
|
|
|
|
/// Store for managing leases
|
|
pub struct LeaseStore {
|
|
/// Active leases: lease_id -> Lease
|
|
leases: DashMap<LeaseId, Lease>,
|
|
/// ID generator for new leases
|
|
next_id: AtomicI64,
|
|
/// Channel to notify of expired leases (lease_id, keys_to_delete)
|
|
expiration_tx: Option<mpsc::UnboundedSender<(LeaseId, Vec<Vec<u8>>)>>,
|
|
}
|
|
|
|
impl LeaseStore {
|
|
/// Create a new lease store
|
|
pub fn new() -> Self {
|
|
Self {
|
|
leases: DashMap::new(),
|
|
next_id: AtomicI64::new(1),
|
|
expiration_tx: None,
|
|
}
|
|
}
|
|
|
|
/// Set the expiration notification channel
|
|
pub fn set_expiration_sender(&mut self, tx: mpsc::UnboundedSender<(LeaseId, Vec<Vec<u8>>)>) {
|
|
self.expiration_tx = Some(tx);
|
|
}
|
|
|
|
/// Grant a new lease
|
|
pub fn grant(&self, id: LeaseId, ttl: i64) -> Result<Lease, StorageError> {
|
|
let lease_id = if id == 0 {
|
|
self.next_id.fetch_add(1, Ordering::SeqCst)
|
|
} else {
|
|
// Check if ID is already in use
|
|
if self.leases.contains_key(&id) {
|
|
return Err(StorageError::LeaseError(format!("Lease {} already exists", id)));
|
|
}
|
|
// Update next_id if necessary
|
|
let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst);
|
|
id
|
|
};
|
|
|
|
let lease = Lease::new(lease_id, ttl);
|
|
self.leases.insert(lease_id, lease.clone());
|
|
|
|
debug!(lease_id, ttl, "Lease granted");
|
|
Ok(lease)
|
|
}
|
|
|
|
/// Revoke a lease and return keys to delete
|
|
pub fn revoke(&self, id: LeaseId) -> Result<Vec<Vec<u8>>, StorageError> {
|
|
match self.leases.remove(&id) {
|
|
Some((_, lease)) => {
|
|
info!(lease_id = id, keys_count = lease.keys.len(), "Lease revoked");
|
|
Ok(lease.keys)
|
|
}
|
|
None => Err(StorageError::LeaseError(format!("Lease {} not found", id))),
|
|
}
|
|
}
|
|
|
|
/// Refresh a lease (keep-alive)
|
|
pub fn refresh(&self, id: LeaseId) -> Result<i64, StorageError> {
|
|
match self.leases.get_mut(&id) {
|
|
Some(mut lease) => {
|
|
lease.refresh();
|
|
let ttl = lease.ttl;
|
|
debug!(lease_id = id, ttl, "Lease refreshed");
|
|
Ok(ttl)
|
|
}
|
|
None => Err(StorageError::LeaseError(format!("Lease {} not found", id))),
|
|
}
|
|
}
|
|
|
|
/// Get a lease by ID
|
|
pub fn get(&self, id: LeaseId) -> Option<Lease> {
|
|
self.leases.get(&id).map(|l| l.clone())
|
|
}
|
|
|
|
/// Get remaining TTL for a lease
|
|
pub fn time_to_live(&self, id: LeaseId) -> Option<(i64, i64, Vec<Vec<u8>>)> {
|
|
self.leases.get(&id).map(|lease| {
|
|
(lease.remaining(), lease.ttl, lease.keys.clone())
|
|
})
|
|
}
|
|
|
|
/// List all lease IDs
|
|
pub fn list(&self) -> Vec<LeaseId> {
|
|
self.leases.iter().map(|entry| *entry.key()).collect()
|
|
}
|
|
|
|
/// Attach a key to a lease
|
|
pub fn attach_key(&self, lease_id: LeaseId, key: Vec<u8>) -> Result<(), StorageError> {
|
|
match self.leases.get_mut(&lease_id) {
|
|
Some(mut lease) => {
|
|
lease.attach_key(key);
|
|
Ok(())
|
|
}
|
|
None => Err(StorageError::LeaseError(format!("Lease {} not found", lease_id))),
|
|
}
|
|
}
|
|
|
|
/// Detach a key from a lease
|
|
pub fn detach_key(&self, lease_id: LeaseId, key: &[u8]) {
|
|
if let Some(mut lease) = self.leases.get_mut(&lease_id) {
|
|
lease.detach_key(key);
|
|
}
|
|
}
|
|
|
|
/// Check for expired leases and return their IDs and keys
|
|
pub fn collect_expired(&self) -> Vec<(LeaseId, Vec<Vec<u8>>)> {
|
|
let mut expired = Vec::new();
|
|
|
|
for entry in self.leases.iter() {
|
|
if entry.is_expired() {
|
|
expired.push((*entry.key(), entry.keys.clone()));
|
|
}
|
|
}
|
|
|
|
// Remove expired leases
|
|
for (id, _) in &expired {
|
|
self.leases.remove(id);
|
|
}
|
|
|
|
expired
|
|
}
|
|
|
|
/// Export all leases for snapshot
|
|
pub fn export(&self) -> Vec<LeaseData> {
|
|
self.leases
|
|
.iter()
|
|
.map(|entry| LeaseData::from_lease(&entry))
|
|
.collect()
|
|
}
|
|
|
|
/// Import leases from snapshot
|
|
pub fn import(&self, leases: Vec<LeaseData>) {
|
|
self.leases.clear();
|
|
for data in leases {
|
|
let id = data.id;
|
|
let lease = data.to_lease();
|
|
self.leases.insert(id, lease);
|
|
// Update next_id
|
|
let _ = self.next_id.fetch_max(id + 1, Ordering::SeqCst);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for LeaseStore {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
/// Background worker that checks for expired leases
|
|
pub struct LeaseExpirationWorker {
|
|
store: Arc<LeaseStore>,
|
|
interval: Duration,
|
|
shutdown_rx: mpsc::Receiver<()>,
|
|
}
|
|
|
|
impl LeaseExpirationWorker {
|
|
/// Create a new expiration worker
|
|
pub fn new(
|
|
store: Arc<LeaseStore>,
|
|
interval: Duration,
|
|
shutdown_rx: mpsc::Receiver<()>,
|
|
) -> Self {
|
|
Self {
|
|
store,
|
|
interval,
|
|
shutdown_rx,
|
|
}
|
|
}
|
|
|
|
/// Run the expiration worker
|
|
pub async fn run(mut self, expire_callback: impl Fn(LeaseId, Vec<Vec<u8>>) + Send + 'static) {
|
|
let mut interval = tokio::time::interval(self.interval);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = interval.tick() => {
|
|
let expired = self.store.collect_expired();
|
|
for (lease_id, keys) in expired {
|
|
info!(lease_id, keys_count = keys.len(), "Lease expired");
|
|
expire_callback(lease_id, keys);
|
|
}
|
|
}
|
|
_ = self.shutdown_rx.recv() => {
|
|
info!("Lease expiration worker shutting down");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_lease_grant() {
|
|
let store = LeaseStore::new();
|
|
let lease = store.grant(0, 10).unwrap();
|
|
assert!(lease.id > 0);
|
|
assert_eq!(lease.ttl, 10);
|
|
}
|
|
|
|
#[test]
|
|
fn test_lease_grant_with_id() {
|
|
let store = LeaseStore::new();
|
|
let lease = store.grant(42, 10).unwrap();
|
|
assert_eq!(lease.id, 42);
|
|
}
|
|
|
|
#[test]
|
|
fn test_lease_revoke() {
|
|
let store = LeaseStore::new();
|
|
let lease = store.grant(0, 10).unwrap();
|
|
let id = lease.id;
|
|
|
|
// Attach some keys
|
|
store.attach_key(id, b"key1".to_vec()).unwrap();
|
|
store.attach_key(id, b"key2".to_vec()).unwrap();
|
|
|
|
let keys = store.revoke(id).unwrap();
|
|
assert_eq!(keys.len(), 2);
|
|
|
|
// Lease should be gone
|
|
assert!(store.get(id).is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn test_lease_refresh() {
|
|
let store = LeaseStore::new();
|
|
let lease = store.grant(0, 10).unwrap();
|
|
let id = lease.id;
|
|
|
|
let ttl = store.refresh(id).unwrap();
|
|
assert_eq!(ttl, 10);
|
|
}
|
|
|
|
#[test]
|
|
fn test_lease_list() {
|
|
let store = LeaseStore::new();
|
|
store.grant(1, 10).unwrap();
|
|
store.grant(2, 10).unwrap();
|
|
store.grant(3, 10).unwrap();
|
|
|
|
let ids = store.list();
|
|
assert_eq!(ids.len(), 3);
|
|
}
|
|
|
|
#[test]
|
|
fn test_lease_attach_detach() {
|
|
let store = LeaseStore::new();
|
|
let lease = store.grant(0, 10).unwrap();
|
|
let id = lease.id;
|
|
|
|
store.attach_key(id, b"key1".to_vec()).unwrap();
|
|
store.attach_key(id, b"key2".to_vec()).unwrap();
|
|
|
|
let lease = store.get(id).unwrap();
|
|
assert_eq!(lease.keys.len(), 2);
|
|
|
|
store.detach_key(id, b"key1");
|
|
let lease = store.get(id).unwrap();
|
|
assert_eq!(lease.keys.len(), 1);
|
|
}
|
|
}
|