//! Local chunk storage use dashmap::DashMap; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use thiserror::Error; use tokio::fs; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use tracing::debug; const WRITE_LOCK_STRIPES: usize = 256; /// Errors from chunk storage operations #[derive(Debug, Error)] pub enum StorageError { #[error("IO error: {0}")] Io(#[from] std::io::Error), #[error("Chunk not found: {0}")] NotFound(String), #[error("Storage capacity exceeded")] CapacityExceeded, } pub type StorageResult = Result; /// Local filesystem-based chunk storage pub struct LocalChunkStore { /// Data directory data_dir: PathBuf, /// In-memory index of chunk sizes for fast lookups chunk_sizes: DashMap, /// Actual chunk paths so reads/deletes avoid extra filesystem probes. chunk_paths: DashMap, /// Total bytes stored total_bytes: AtomicU64, /// Maximum capacity (0 = unlimited) max_capacity: u64, /// Number of chunks stored chunk_count: AtomicU64, /// Whether writes should be flushed before they are acknowledged. sync_on_write: bool, /// Monotonic nonce for per-write temporary paths. temp_file_nonce: AtomicU64, /// Striped per-chunk write/delete locks to keep same-key updates coherent. write_locks: Vec>, } impl LocalChunkStore { /// Create a new local chunk store pub async fn new( data_dir: PathBuf, max_capacity: u64, sync_on_write: bool, ) -> StorageResult { // Ensure data directory exists fs::create_dir_all(&data_dir).await?; let store = Self { data_dir, chunk_sizes: DashMap::new(), chunk_paths: DashMap::new(), total_bytes: AtomicU64::new(0), max_capacity, chunk_count: AtomicU64::new(0), sync_on_write, temp_file_nonce: AtomicU64::new(0), write_locks: (0..WRITE_LOCK_STRIPES).map(|_| Mutex::new(())).collect(), }; // Scan existing chunks store.scan_existing_chunks().await?; Ok(store) } /// Scan existing chunks in the data directory async fn scan_existing_chunks(&self) -> StorageResult<()> { let mut pending = vec![self.data_dir.clone()]; let mut total_bytes = 0u64; let mut chunk_count = 0u64; while let Some(dir) = pending.pop() { let mut entries = fs::read_dir(&dir).await?; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); let metadata = entry.metadata().await?; if metadata.is_dir() { pending.push(path); continue; } if metadata.is_file() { if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if name.ends_with(".tmp") || name.starts_with(".tmp.") { continue; } let size = metadata.len(); self.chunk_sizes.insert(name.to_string(), size); self.chunk_paths.insert(name.to_string(), path.clone()); total_bytes += size; chunk_count += 1; } } } } self.total_bytes.store(total_bytes, Ordering::SeqCst); self.chunk_count.store(chunk_count, Ordering::SeqCst); debug!( total_bytes, chunk_count, "Scanned existing chunks" ); Ok(()) } /// Get the path for a chunk fn chunk_path(&self, chunk_id: &str) -> PathBuf { // Sanitize chunk_id to be a valid filename let safe_id = chunk_id.replace(['/', '\\', ':', '*', '?', '"', '<', '>', '|'], "_"); let first = safe_id.get(0..2).unwrap_or("xx"); let second = safe_id.get(2..4).unwrap_or("yy"); self.data_dir.join(first).join(second).join(safe_id) } fn legacy_chunk_path(&self, chunk_id: &str) -> PathBuf { let safe_id = chunk_id.replace(['/', '\\', ':', '*', '?', '"', '<', '>', '|'], "_"); self.data_dir.join(safe_id) } fn temporary_chunk_path(&self, path: &std::path::Path) -> PathBuf { let nonce = self.temp_file_nonce.fetch_add(1, Ordering::Relaxed); let pid = std::process::id(); let file_name = path .file_name() .and_then(|name| name.to_str()) .unwrap_or("chunk"); path.parent() .unwrap_or(&self.data_dir) .join(format!(".tmp.{file_name}.{pid}.{nonce}")) } fn write_lock(&self, chunk_id: &str) -> &Mutex<()> { let mut hasher = DefaultHasher::new(); chunk_id.hash(&mut hasher); let slot = (hasher.finish() as usize) % self.write_locks.len().max(1); &self.write_locks[slot] } async fn resolve_existing_chunk_path(&self, chunk_id: &str) -> StorageResult { if let Some(path) = self.chunk_paths.get(chunk_id) { return Ok(path.clone()); } let path = self.chunk_path(chunk_id); if fs::try_exists(&path).await? { self.chunk_paths.insert(chunk_id.to_string(), path.clone()); return Ok(path); } let legacy_path = self.legacy_chunk_path(chunk_id); if fs::try_exists(&legacy_path).await? { self.chunk_paths .insert(chunk_id.to_string(), legacy_path.clone()); return Ok(legacy_path); } Err(StorageError::NotFound(chunk_id.to_string())) } /// Store a chunk pub async fn put(&self, chunk_id: &str, data: &[u8]) -> StorageResult { let _guard = self.write_lock(chunk_id).lock().await; let size = data.len() as u64; // Check if replacing existing chunk let old_size = self.chunk_sizes.get(chunk_id).map(|v| *v).unwrap_or(0); // Check capacity if self.max_capacity > 0 { let current = self.total_bytes.load(Ordering::SeqCst); let projected = current.saturating_sub(old_size).saturating_add(size); if projected > self.max_capacity { return Err(StorageError::CapacityExceeded); } } let path = self.chunk_path(chunk_id); let temp_path = self.temporary_chunk_path(&path); if let Some(parent) = path.parent() { // Multipart uploads fan out concurrent writes into the same shard // directory. Create the parent path unconditionally so no writer can // observe the directory as "prepared" before it actually exists. fs::create_dir_all(parent).await?; } // Write atomically so readers never see a partially-written chunk. let mut file = fs::File::create(&temp_path).await?; file.write_all(data).await?; if self.sync_on_write { file.sync_data().await?; } drop(file); fs::rename(&temp_path, &path).await?; // Update index self.chunk_sizes.insert(chunk_id.to_string(), size); self.chunk_paths.insert(chunk_id.to_string(), path.clone()); // Update totals if old_size > 0 { // Replacing existing chunk self.total_bytes.fetch_sub(old_size, Ordering::SeqCst); } else { // New chunk self.chunk_count.fetch_add(1, Ordering::SeqCst); } self.total_bytes.fetch_add(size, Ordering::SeqCst); debug!(chunk_id, size, "Stored chunk"); Ok(size) } /// Retrieve a chunk pub async fn get(&self, chunk_id: &str) -> StorageResult> { let path = self.resolve_existing_chunk_path(chunk_id).await?; let data = fs::read(&path).await?; debug!(chunk_id, size = data.len(), "Retrieved chunk"); Ok(data) } /// Delete a chunk pub async fn delete(&self, chunk_id: &str) -> StorageResult<()> { let _guard = self.write_lock(chunk_id).lock().await; if let Some((_, size)) = self.chunk_sizes.remove(chunk_id) { let path = match self.chunk_paths.remove(chunk_id) { Some((_, path)) => path, None => match self.resolve_existing_chunk_path(chunk_id).await { Ok(path) => path, Err(StorageError::NotFound(_)) => return Ok(()), Err(err) => return Err(err), }, }; if fs::try_exists(&path).await? { fs::remove_file(&path).await?; } self.total_bytes.fetch_sub(size, Ordering::SeqCst); self.chunk_count.fetch_sub(1, Ordering::SeqCst); debug!(chunk_id, "Deleted chunk"); } Ok(()) } /// Check if a chunk exists pub fn exists(&self, chunk_id: &str) -> bool { self.chunk_sizes.contains_key(chunk_id) } /// Get the size of a chunk pub fn size(&self, chunk_id: &str) -> Option { self.chunk_sizes.get(chunk_id).map(|v| *v) } /// Get total bytes stored pub fn total_bytes(&self) -> u64 { self.total_bytes.load(Ordering::SeqCst) } /// Get chunk count pub fn chunk_count(&self) -> u64 { self.chunk_count.load(Ordering::SeqCst) } /// Get maximum capacity pub fn max_capacity(&self) -> u64 { self.max_capacity } /// Get available capacity pub fn available_bytes(&self) -> u64 { if self.max_capacity == 0 { u64::MAX } else { self.max_capacity.saturating_sub(self.total_bytes()) } } } #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; use std::sync::Arc; use tokio::sync::Barrier; async fn create_test_store() -> (LocalChunkStore, TempDir) { let temp_dir = TempDir::new().unwrap(); let store = LocalChunkStore::new(temp_dir.path().to_path_buf(), 0, false) .await .unwrap(); (store, temp_dir) } #[tokio::test] async fn test_put_get() { let (store, _temp) = create_test_store().await; let chunk_id = "test-chunk-1"; let data = vec![42u8; 1024]; let size = store.put(chunk_id, &data).await.unwrap(); assert_eq!(size, 1024); let retrieved = store.get(chunk_id).await.unwrap(); assert_eq!(retrieved, data); } #[tokio::test] async fn test_delete() { let (store, _temp) = create_test_store().await; let chunk_id = "test-chunk-2"; let data = vec![42u8; 512]; store.put(chunk_id, &data).await.unwrap(); assert!(store.exists(chunk_id)); store.delete(chunk_id).await.unwrap(); assert!(!store.exists(chunk_id)); let result = store.get(chunk_id).await; assert!(result.is_err()); } #[tokio::test] async fn test_size_tracking() { let (store, _temp) = create_test_store().await; assert_eq!(store.total_bytes(), 0); assert_eq!(store.chunk_count(), 0); store.put("chunk1", &vec![0u8; 100]).await.unwrap(); assert_eq!(store.total_bytes(), 100); assert_eq!(store.chunk_count(), 1); store.put("chunk2", &vec![0u8; 200]).await.unwrap(); assert_eq!(store.total_bytes(), 300); assert_eq!(store.chunk_count(), 2); store.delete("chunk1").await.unwrap(); assert_eq!(store.total_bytes(), 200); assert_eq!(store.chunk_count(), 1); } #[tokio::test] async fn test_capacity_limit() { let temp_dir = TempDir::new().unwrap(); let store = LocalChunkStore::new(temp_dir.path().to_path_buf(), 1000, false) .await .unwrap(); // Should succeed store.put("chunk1", &vec![0u8; 500]).await.unwrap(); // Should fail - would exceed capacity let result = store.put("chunk2", &vec![0u8; 600]).await; assert!(matches!(result, Err(StorageError::CapacityExceeded))); // Should succeed - within remaining capacity store.put("chunk2", &vec![0u8; 400]).await.unwrap(); } #[tokio::test] async fn test_replace_chunk() { let (store, _temp) = create_test_store().await; let chunk_id = "test-chunk"; store.put(chunk_id, &vec![0u8; 100]).await.unwrap(); assert_eq!(store.total_bytes(), 100); assert_eq!(store.chunk_count(), 1); // Replace with larger data store.put(chunk_id, &vec![0u8; 200]).await.unwrap(); assert_eq!(store.total_bytes(), 200); assert_eq!(store.chunk_count(), 1); // Still 1 chunk // Replace with smaller data store.put(chunk_id, &vec![0u8; 50]).await.unwrap(); assert_eq!(store.total_bytes(), 50); assert_eq!(store.chunk_count(), 1); } #[tokio::test] async fn test_scan_preserves_chunk_path_cache() { let temp_dir = TempDir::new().unwrap(); let nested_path = temp_dir.path().join("ab").join("cd").join("abcd-test"); fs::create_dir_all(nested_path.parent().unwrap()).await.unwrap(); fs::write(&nested_path, vec![7u8; 128]).await.unwrap(); let store = LocalChunkStore::new(temp_dir.path().to_path_buf(), 0, false) .await .unwrap(); let resolved = store.resolve_existing_chunk_path("abcd-test").await.unwrap(); assert_eq!(resolved, nested_path); assert_eq!(store.get("abcd-test").await.unwrap(), vec![7u8; 128]); } #[tokio::test] async fn test_concurrent_puts_materialize_shard_directory_once_ready() { let (store, _temp) = create_test_store().await; let store = Arc::new(store); let barrier = Arc::new(Barrier::new(17)); let mut tasks = Vec::new(); for idx in 0..16u8 { let store = Arc::clone(&store); let barrier = Arc::clone(&barrier); tasks.push(tokio::spawn(async move { let chunk_id = format!("abcd-chunk-{idx}"); let data = vec![idx; 4096]; barrier.wait().await; store.put(&chunk_id, &data).await.unwrap(); (chunk_id, data) })); } barrier.wait().await; for task in tasks { let (chunk_id, data) = task.await.unwrap(); assert_eq!(store.get(&chunk_id).await.unwrap(), data); } assert_eq!(store.chunk_count(), 16); } #[tokio::test] async fn test_concurrent_rewrites_same_chunk_use_unique_temp_paths() { let (store, _temp) = create_test_store().await; let store = Arc::new(store); let barrier = Arc::new(Barrier::new(9)); let mut tasks = Vec::new(); for idx in 0..8u8 { let store = Arc::clone(&store); let barrier = Arc::clone(&barrier); tasks.push(tokio::spawn(async move { let payload = vec![idx; 2048]; barrier.wait().await; store.put("shared-chunk", &payload).await.unwrap(); payload })); } barrier.wait().await; let mut expected_payloads = Vec::new(); for task in tasks { expected_payloads.push(task.await.unwrap()); } let stored = store.get("shared-chunk").await.unwrap(); assert!(expected_payloads.iter().any(|payload| payload == &stored)); assert_eq!(store.chunk_count(), 1); } }