//! Metadata storage using FlareDB, PostgreSQL, or SQLite.

use dashmap::DashMap;
use flaredb_client::RdbClient;
use lightningstor_distributed::ReplicatedRepairTask;
use lightningstor_types::{Bucket, BucketId, MultipartUpload, Object, ObjectId, Result};
use serde_json;
use sqlx::pool::PoolOptions;
use sqlx::{Pool, Postgres, Sqlite};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::Code;

/// Storage backend enum
enum StorageBackend {
    FlareDB(Vec<Arc<Mutex<RdbClient>>>),
    Sql(SqlStorageBackend),
    InMemory(Arc<DashMap<String, String>>),
}

enum SqlStorageBackend {
    Postgres(Arc<Pool<Postgres>>),
    Sqlite(Arc<Pool<Sqlite>>),
}

const FLAREDB_CLIENT_POOL_SIZE: usize = 8;

/// Metadata store for buckets and objects
pub struct MetadataStore {
    backend: StorageBackend,
    bucket_cache: Arc<DashMap<String, Bucket>>,
    object_cache: Arc<DashMap<String, Object>>,
}

impl MetadataStore {
    fn flaredb_requires_strong(status: &tonic::Status) -> bool {
        status.code() == Code::FailedPrecondition && status.message().contains("not eventual")
    }

    /// Create a new metadata store with FlareDB backend
    pub async fn new(endpoint: Option<String>) -> Result<Self> {
        Self::new_flaredb(endpoint).await
    }

    /// Create a new metadata store with FlareDB backend
    pub async fn new_flaredb(endpoint: Option<String>) -> Result<Self> {
        Self::new_flaredb_with_pd(endpoint, None).await
    }

    /// Create a new metadata store with FlareDB backend and explicit PD address.
    pub async fn new_flaredb_with_pd(
        endpoint: Option<String>,
        pd_endpoint: Option<String>,
    ) -> Result<Self> {
        let endpoint = endpoint.unwrap_or_else(|| "127.0.0.1:2479".to_string());
        let pd_endpoint = pd_endpoint
            .map(|value| normalize_transport_addr(&value))
            .unwrap_or_else(|| endpoint.clone());
        let mut clients = Vec::with_capacity(FLAREDB_CLIENT_POOL_SIZE);
        for _ in 0..FLAREDB_CLIENT_POOL_SIZE {
            let client = RdbClient::connect_with_pd_namespace(
                endpoint.clone(),
                pd_endpoint.clone(),
                "lightningstor",
            )
            .await
            .map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to connect to FlareDB: {}",
                    e
                ))
            })?;
            clients.push(Arc::new(Mutex::new(client)));
        }
        Ok(Self {
            backend: StorageBackend::FlareDB(clients),
            bucket_cache: Arc::new(DashMap::new()),
            object_cache: Arc::new(DashMap::new()),
        })
    }
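
    // Note on backend selection (summary of the constructors in this impl):
    // `new` / `new_flaredb` / `new_flaredb_with_pd` connect to a FlareDB
    // cluster, `new_sql` targets PostgreSQL or SQLite (SQLite only in
    // single-node mode, and never in-memory), and `new_in_memory` exists for
    // tests.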

    /// Create a metadata store backed by PostgreSQL or SQLite.
    pub async fn new_sql(database_url: &str, single_node: bool) -> Result<Self> {
        let url = database_url.trim();
        if url.is_empty() {
            return Err(lightningstor_types::Error::StorageError(
                "metadata database URL is empty".to_string(),
            ));
        }
        if Self::is_postgres_url(url) {
            let pool = PoolOptions::<Postgres>::new()
                .max_connections(10)
                .connect(url)
                .await
                .map_err(|e| {
                    lightningstor_types::Error::StorageError(format!(
                        "Failed to connect to Postgres: {}",
                        e
                    ))
                })?;
            Self::ensure_sql_schema_postgres(&pool).await?;
            return Ok(Self {
                backend: StorageBackend::Sql(SqlStorageBackend::Postgres(Arc::new(pool))),
                bucket_cache: Arc::new(DashMap::new()),
                object_cache: Arc::new(DashMap::new()),
            });
        }
        if Self::is_sqlite_url(url) {
            if !single_node {
                return Err(lightningstor_types::Error::StorageError(
                    "SQLite is allowed only in single-node mode".to_string(),
                ));
            }
            if url.contains(":memory:") {
                return Err(lightningstor_types::Error::StorageError(
                    "In-memory SQLite is not allowed".to_string(),
                ));
            }
            let pool = PoolOptions::<Sqlite>::new()
                .max_connections(1)
                .connect(url)
                .await
                .map_err(|e| {
                    lightningstor_types::Error::StorageError(format!(
                        "Failed to connect to SQLite: {}",
                        e
                    ))
                })?;
            Self::ensure_sql_schema_sqlite(&pool).await?;
            return Ok(Self {
                backend: StorageBackend::Sql(SqlStorageBackend::Sqlite(Arc::new(pool))),
                bucket_cache: Arc::new(DashMap::new()),
                object_cache: Arc::new(DashMap::new()),
            });
        }
        Err(lightningstor_types::Error::StorageError(
            "Unsupported metadata database URL (use postgres://, postgresql://, or sqlite:)"
                .to_string(),
        ))
    }

    /// Create a new in-memory metadata store (for testing)
    pub fn new_in_memory() -> Self {
        Self {
            backend: StorageBackend::InMemory(Arc::new(DashMap::new())),
            bucket_cache: Arc::new(DashMap::new()),
            object_cache: Arc::new(DashMap::new()),
        }
    }

    fn is_postgres_url(url: &str) -> bool {
        url.starts_with("postgres://") || url.starts_with("postgresql://")
    }

    fn is_sqlite_url(url: &str) -> bool {
        url.starts_with("sqlite:")
    }

    async fn ensure_sql_schema_postgres(pool: &Pool<Postgres>) -> Result<()> {
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS metadata_kv (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )",
        )
        .execute(pool)
        .await
        .map_err(|e| {
            lightningstor_types::Error::StorageError(format!(
                "Failed to initialize Postgres schema: {}",
                e
            ))
        })?;
        Ok(())
    }

    async fn ensure_sql_schema_sqlite(pool: &Pool<Sqlite>) -> Result<()> {
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS metadata_kv (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )",
        )
        .execute(pool)
        .await
        .map_err(|e| {
            lightningstor_types::Error::StorageError(format!(
                "Failed to initialize SQLite schema: {}",
                e
            ))
        })?;
        Ok(())
    }

    fn prefix_end(prefix: &[u8]) -> Vec<u8> {
        let mut end_key = prefix.to_vec();
        if let Some(last) = end_key.last_mut() {
            if *last == 0xff {
                end_key.push(0x00);
            } else {
                *last += 1;
            }
        } else {
            end_key.push(0xff);
        }
        end_key
    }

    fn exclusive_scan_start(key: &[u8]) -> Vec<u8> {
        let mut next = key.to_vec();
        next.push(0);
        next
    }

    fn flaredb_client_for_key<'a>(
        clients: &'a [Arc<Mutex<RdbClient>>],
        key: &[u8],
    ) -> &'a Arc<Mutex<RdbClient>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let index = (hasher.finish() as usize) % clients.len().max(1);
        &clients[index]
    }

    fn flaredb_scan_client(clients: &[Arc<Mutex<RdbClient>>]) -> &Arc<Mutex<RdbClient>> {
        &clients[0]
    }

    async fn flaredb_put_strong(
        client: &Arc<Mutex<RdbClient>>,
        key: &[u8],
        value: &[u8],
    ) -> Result<()> {
        const MAX_RETRIES: usize = 8;
        for _ in 0..MAX_RETRIES {
            let mut c = client.lock().await;
            let expected_version = c
                .cas_get(key.to_vec())
                .await
                .map_err(|e| {
                    lightningstor_types::Error::StorageError(format!(
                        "FlareDB CAS get failed: {}",
                        e
                    ))
                })?
                .map(|(version, _)| version)
                .unwrap_or(0);
            let (success, _current_version, _new_version) = c
                .cas(key.to_vec(), value.to_vec(), expected_version)
                .await
                .map_err(|e| {
                    lightningstor_types::Error::StorageError(format!(
                        "FlareDB CAS put failed: {}",
                        e
                    ))
                })?;
            if success {
                return Ok(());
            }
        }
        Err(lightningstor_types::Error::StorageError(
            "FlareDB CAS put exhausted retries".to_string(),
        ))
    }

    async fn flaredb_get_strong(
        client: &Arc<Mutex<RdbClient>>,
        key: &[u8],
    ) -> Result<Option<String>> {
        let mut c = client.lock().await;
        let result = c.cas_get(key.to_vec()).await.map_err(|e| {
            lightningstor_types::Error::StorageError(format!("FlareDB CAS get failed: {}", e))
        })?;
        Ok(result.map(|(_version, bytes)| String::from_utf8_lossy(&bytes).to_string()))
    }

    async fn flaredb_delete_strong(client: &Arc<Mutex<RdbClient>>, key: &[u8]) -> Result<()> {
        let mut c = client.lock().await;
        c.cas_delete(key.to_vec(), 0).await.map_err(|e| {
            lightningstor_types::Error::StorageError(format!("FlareDB CAS delete failed: {}", e))
        })?;
        Ok(())
    }

    async fn flaredb_scan_strong(
        client: &Arc<Mutex<RdbClient>>,
        start_key: &[u8],
        end_key: &[u8],
        limit: u32,
    ) -> Result<(Vec<(String, String)>, Option<Vec<u8>>)> {
        let mut c = client.lock().await;
        let (entries, next) = c
            .cas_scan(start_key.to_vec(), end_key.to_vec(), limit)
            .await
            .map_err(|e| {
                lightningstor_types::Error::StorageError(format!("FlareDB CAS scan failed: {}", e))
            })?;
        let results = entries
            .into_iter()
            .map(|(key, value, _version)| {
                (
                    String::from_utf8_lossy(&key).to_string(),
                    String::from_utf8_lossy(&value).to_string(),
                )
            })
            .collect();
        Ok((results, next))
    }

    async fn flaredb_put(
        clients: &[Arc<Mutex<RdbClient>>],
        key: &[u8],
        value: &[u8],
    ) -> Result<()> {
        let client = Self::flaredb_client_for_key(clients, key);
        let raw_result = {
            let mut c = client.lock().await;
            c.raw_put(key.to_vec(), value.to_vec()).await
        };
        match raw_result {
            Ok(()) => Ok(()),
            Err(status) if Self::flaredb_requires_strong(&status) => {
                Self::flaredb_put_strong(client, key, value).await
            }
            Err(error) => Err(lightningstor_types::Error::StorageError(format!(
                "FlareDB put failed: {}",
                error
            ))),
        }
    }

    async fn flaredb_get(clients: &[Arc<Mutex<RdbClient>>], key: &[u8]) -> Result<Option<String>> {
        let client = Self::flaredb_client_for_key(clients, key);
        let raw_result = {
            let mut c = client.lock().await;
            c.raw_get(key.to_vec()).await
        };
        match raw_result {
            Ok(result) => Ok(result.map(|bytes| String::from_utf8_lossy(&bytes).to_string())),
            Err(status) if Self::flaredb_requires_strong(&status) => {
                Self::flaredb_get_strong(client, key).await
            }
            Err(error) => Err(lightningstor_types::Error::StorageError(format!(
                "FlareDB get failed: {}",
                error
            ))),
        }
    }

    async fn flaredb_delete(clients: &[Arc<Mutex<RdbClient>>], key: &[u8]) -> Result<()> {
        let client = Self::flaredb_client_for_key(clients, key);
        let raw_result = {
            let mut c = client.lock().await;
            c.raw_delete(key.to_vec()).await
        };
        match raw_result {
            Ok(_) => Ok(()),
            Err(status) if Self::flaredb_requires_strong(&status) => {
                Self::flaredb_delete_strong(client, key).await
            }
            Err(error) => Err(lightningstor_types::Error::StorageError(format!(
                "FlareDB delete failed: {}",
                error
            ))),
        }
    }

    async fn flaredb_scan(
        clients: &[Arc<Mutex<RdbClient>>],
        prefix: &[u8],
        limit: u32,
    ) -> Result<Vec<(String, String)>> {
        let end_key = Self::prefix_end(prefix);
        let mut results = Vec::new();
        let mut start_key = prefix.to_vec();
        loop {
            let client = Self::flaredb_scan_client(clients);
            let (items, next) = match {
                let mut c = client.lock().await;
                c.raw_scan(start_key.clone(), end_key.clone(), limit).await
            } {
                Ok((keys, values, next)) => {
                    let items = keys
                        .into_iter()
                        .zip(values.into_iter())
                        .map(|(key, value)| {
                            (
                                String::from_utf8_lossy(&key).to_string(),
                                String::from_utf8_lossy(&value).to_string(),
                            )
                        })
                        .collect();
                    (items, next)
                }
                Err(status) if Self::flaredb_requires_strong(&status) => {
                    Self::flaredb_scan_strong(client, &start_key, &end_key, limit).await?
                }
                Err(error) => {
                    return Err(lightningstor_types::Error::StorageError(format!(
                        "FlareDB scan failed: {}",
                        error
                    )));
                }
            };
            results.extend(items);
            if let Some(next_key) = next {
                start_key = next_key;
            } else {
                break;
            }
        }
        Ok(results)
    }

    async fn flaredb_scan_page(
        clients: &[Arc<Mutex<RdbClient>>],
        prefix: &[u8],
        start_after: Option<&[u8]>,
        limit: u32,
    ) -> Result<(Vec<(String, String)>, bool)> {
        let end_key = Self::prefix_end(prefix);
        let start_key = start_after
            .map(Self::exclusive_scan_start)
            .unwrap_or_else(|| prefix.to_vec());
        let fetch_limit = limit.saturating_add(1).max(1);
        let client = Self::flaredb_scan_client(clients);
        let (mut items, next) = match {
            let mut c = client.lock().await;
            c.raw_scan(start_key.clone(), end_key.clone(), fetch_limit)
                .await
        } {
            Ok((keys, values, next)) => {
                let items = keys
                    .into_iter()
                    .zip(values.into_iter())
                    .map(|(key, value)| {
                        (
                            String::from_utf8_lossy(&key).to_string(),
                            String::from_utf8_lossy(&value).to_string(),
                        )
                    })
                    .collect::<Vec<_>>();
                (items, next)
            }
            Err(status) if Self::flaredb_requires_strong(&status) => {
                Self::flaredb_scan_strong(client, &start_key, &end_key, fetch_limit).await?
            }
            Err(error) => {
                return Err(lightningstor_types::Error::StorageError(format!(
                    "FlareDB scan failed: {}",
                    error
                )));
            }
        };
        let has_more = if items.len() > limit as usize {
            items.truncate(limit as usize);
            true
        } else {
            next.is_some()
        };
        Ok((items, has_more))
    }

    async fn flaredb_has_prefix(clients: &[Arc<Mutex<RdbClient>>], prefix: &[u8]) -> Result<bool> {
        let end_key = Self::prefix_end(prefix);
        let client = Self::flaredb_scan_client(clients);
        match {
            let mut c = client.lock().await;
            c.raw_scan(prefix.to_vec(), end_key.clone(), 1).await
        } {
            Ok((keys, _, _)) => Ok(!keys.is_empty()),
            Err(status) if Self::flaredb_requires_strong(&status) => {
                let (entries, _) = Self::flaredb_scan_strong(client, prefix, &end_key, 1).await?;
                Ok(!entries.is_empty())
            }
            Err(error) => Err(lightningstor_types::Error::StorageError(format!(
                "FlareDB scan failed: {}",
                error
            ))),
        }
    }

    /// Internal: put a key-value pair
    async fn put(&self, key: &str, value: &str) -> Result<()> {
        match &self.backend {
            StorageBackend::FlareDB(client) => {
                Self::flaredb_put(client, key.as_bytes(), value.as_bytes()).await?;
            }
            StorageBackend::Sql(sql) => match sql {
                SqlStorageBackend::Postgres(pool) => {
                    sqlx::query(
                        "INSERT INTO metadata_kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
                    )
                    .bind(key)
                    .bind(value)
                    .execute(pool.as_ref())
                    .await
                    .map_err(|e| {
                        lightningstor_types::Error::StorageError(format!(
                            "Postgres put failed: {}",
                            e
                        ))
                    })?;
                }
                SqlStorageBackend::Sqlite(pool) => {
                    sqlx::query(
                        "INSERT INTO metadata_kv (key, value) VALUES (?1, ?2) ON CONFLICT(key) DO UPDATE SET value = excluded.value",
                    )
                    .bind(key)
                    .bind(value)
                    .execute(pool.as_ref())
                    .await
                    .map_err(|e| {
                        lightningstor_types::Error::StorageError(format!(
                            "SQLite put failed: {}",
                            e
                        ))
                    })?;
                }
            },
            StorageBackend::InMemory(map) => {
                map.insert(key.to_string(), value.to_string());
            }
        }
        Ok(())
    }

    /// Internal: get a value by key
    async fn get(&self, key: &str) -> Result<Option<String>> {
        match &self.backend {
            StorageBackend::FlareDB(client) => Self::flaredb_get(client, key.as_bytes()).await,
            StorageBackend::Sql(sql) => match sql {
                SqlStorageBackend::Postgres(pool) => {
                    let value: Option<String> =
                        sqlx::query_scalar("SELECT value FROM metadata_kv WHERE key = $1")
                            .bind(key)
                            .fetch_optional(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "Postgres get failed: {}",
                                    e
                                ))
                            })?;
                    Ok(value)
                }
                SqlStorageBackend::Sqlite(pool) => {
                    let value: Option<String> =
                        sqlx::query_scalar("SELECT value FROM metadata_kv WHERE key = ?1")
                            .bind(key)
                            .fetch_optional(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "SQLite get failed: {}",
                                    e
                                ))
                            })?;
                    Ok(value)
                }
            },
            StorageBackend::InMemory(map) => Ok(map.get(key).map(|v| v.value().clone())),
        }
    }

    /// Internal: delete a key
    async fn delete_key(&self, key: &str) -> Result<()> {
        match &self.backend {
            StorageBackend::FlareDB(client) => Self::flaredb_delete(client, key.as_bytes()).await?,
            StorageBackend::Sql(sql) => match sql {
                SqlStorageBackend::Postgres(pool) => {
                    sqlx::query("DELETE FROM metadata_kv WHERE key = $1")
                        .bind(key)
                        .execute(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "Postgres delete failed: {}",
                                e
                            ))
                        })?;
                }
                SqlStorageBackend::Sqlite(pool) => {
                    sqlx::query("DELETE FROM metadata_kv WHERE key = ?1")
                        .bind(key)
                        .execute(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "SQLite delete failed: {}",
                                e
                            ))
                        })?;
                }
            },
            StorageBackend::InMemory(map) => {
                map.remove(key);
            }
        }
        Ok(())
    }

    /// Internal: get all keys with a prefix
    async fn get_prefix(&self, prefix: &str) -> Result<Vec<(String, String)>> {
        match &self.backend {
            StorageBackend::FlareDB(client) => {
                Self::flaredb_scan(client, prefix.as_bytes(), 1000).await
            }
            StorageBackend::Sql(sql) => {
                let like_pattern = format!("{}%", prefix);
                match sql {
                    SqlStorageBackend::Postgres(pool) => {
                        let rows: Vec<(String, String)> = sqlx::query_as(
                            "SELECT key, value FROM metadata_kv WHERE key LIKE $1 ORDER BY key",
                        )
                        .bind(like_pattern)
                        .fetch_all(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "Postgres scan failed: {}",
                                e
                            ))
                        })?;
                        Ok(rows)
                    }
                    SqlStorageBackend::Sqlite(pool) => {
                        let rows: Vec<(String, String)> = sqlx::query_as(
                            "SELECT key, value FROM metadata_kv WHERE key LIKE ?1 ORDER BY key",
                        )
                        .bind(like_pattern)
                        .fetch_all(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "SQLite scan failed: {}",
                                e
                            ))
                        })?;
                        Ok(rows)
                    }
                }
            }
            StorageBackend::InMemory(map) => {
                let mut results = Vec::new();
                for entry in map.iter() {
                    if entry.key().starts_with(prefix) {
                        results.push((entry.key().clone(), entry.value().clone()));
                    }
                }
                results.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
                Ok(results)
            }
        }
    }

    async fn get_prefix_page(
        &self,
        prefix: &str,
        start_after: Option<&str>,
        limit: u32,
    ) -> Result<(Vec<(String, String)>, bool)> {
        if limit == 0 {
            return Ok((Vec::new(), false));
        }
        match &self.backend {
            StorageBackend::FlareDB(client) => {
                Self::flaredb_scan_page(
                    client,
                    prefix.as_bytes(),
                    start_after.map(str::as_bytes),
                    limit,
                )
                .await
            }
            StorageBackend::Sql(sql) => {
                let prefix_end =
                    String::from_utf8(Self::prefix_end(prefix.as_bytes())).map_err(|e| {
                        lightningstor_types::Error::StorageError(format!(
                            "Failed to encode prefix end: {}",
                            e
                        ))
                    })?;
                let fetch_limit = (limit.saturating_add(1)) as i64;
                match sql {
                    SqlStorageBackend::Postgres(pool) => {
                        let rows: Vec<(String, String)> = if let Some(after) = start_after {
                            sqlx::query_as(
                                "SELECT key, value FROM metadata_kv WHERE key >= $1 AND key < $2 AND key > $3 ORDER BY key LIMIT $4",
                            )
                            .bind(prefix)
                            .bind(&prefix_end)
                            .bind(after)
                            .bind(fetch_limit)
                            .fetch_all(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "Postgres paged scan failed: {}",
                                    e
                                ))
                            })?
                        } else {
                            sqlx::query_as(
                                "SELECT key, value FROM metadata_kv WHERE key >= $1 AND key < $2 ORDER BY key LIMIT $3",
                            )
                            .bind(prefix)
                            .bind(&prefix_end)
                            .bind(fetch_limit)
                            .fetch_all(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "Postgres paged scan failed: {}",
                                    e
                                ))
                            })?
                        };
                        let has_more = rows.len() > limit as usize;
                        let items = rows.into_iter().take(limit as usize).collect();
                        Ok((items, has_more))
                    }
                    SqlStorageBackend::Sqlite(pool) => {
                        let rows: Vec<(String, String)> = if let Some(after) = start_after {
                            sqlx::query_as(
                                "SELECT key, value FROM metadata_kv WHERE key >= ?1 AND key < ?2 AND key > ?3 ORDER BY key LIMIT ?4",
                            )
                            .bind(prefix)
                            .bind(&prefix_end)
                            .bind(after)
                            .bind(fetch_limit)
                            .fetch_all(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "SQLite paged scan failed: {}",
                                    e
                                ))
                            })?
                        } else {
                            sqlx::query_as(
                                "SELECT key, value FROM metadata_kv WHERE key >= ?1 AND key < ?2 ORDER BY key LIMIT ?3",
                            )
                            .bind(prefix)
                            .bind(&prefix_end)
                            .bind(fetch_limit)
                            .fetch_all(pool.as_ref())
                            .await
                            .map_err(|e| {
                                lightningstor_types::Error::StorageError(format!(
                                    "SQLite paged scan failed: {}",
                                    e
                                ))
                            })?
                        };
                        let has_more = rows.len() > limit as usize;
                        let items = rows.into_iter().take(limit as usize).collect();
                        Ok((items, has_more))
                    }
                }
            }
            StorageBackend::InMemory(map) => {
                let mut rows: Vec<(String, String)> = map
                    .iter()
                    .filter(|entry| entry.key().starts_with(prefix))
                    .map(|entry| (entry.key().clone(), entry.value().clone()))
                    .collect();
                rows.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
                if let Some(after) = start_after {
                    rows.retain(|(key, _)| key.as_str() > after);
                }
                let has_more = rows.len() > limit as usize;
                let items = rows.into_iter().take(limit as usize).collect();
                Ok((items, has_more))
            }
        }
    }

    /// Internal: check if any key exists with a prefix
    async fn has_prefix(&self, prefix: &str) -> Result<bool> {
        match &self.backend {
            StorageBackend::FlareDB(client) => {
                Self::flaredb_has_prefix(client, prefix.as_bytes()).await
            }
            StorageBackend::Sql(sql) => {
                let like_pattern = format!("{}%", prefix);
                match sql {
                    SqlStorageBackend::Postgres(pool) => {
                        let found: Option<String> = sqlx::query_scalar(
                            "SELECT key FROM metadata_kv WHERE key LIKE $1 LIMIT 1",
                        )
                        .bind(like_pattern)
                        .fetch_optional(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "Postgres scan failed: {}",
                                e
                            ))
                        })?;
                        Ok(found.is_some())
                    }
                    SqlStorageBackend::Sqlite(pool) => {
                        let found: Option<String> = sqlx::query_scalar(
                            "SELECT key FROM metadata_kv WHERE key LIKE ?1 LIMIT 1",
                        )
                        .bind(like_pattern)
                        .fetch_optional(pool.as_ref())
                        .await
                        .map_err(|e| {
                            lightningstor_types::Error::StorageError(format!(
                                "SQLite scan failed: {}",
                                e
                            ))
                        })?;
                        Ok(found.is_some())
                    }
                }
            }
            StorageBackend::InMemory(map) => {
                for entry in map.iter() {
                    if entry.key().starts_with(prefix) {
                        return Ok(true);
                    }
                }
                Ok(false)
            }
        }
    }

    /// Build bucket key
    fn bucket_key(org_id: &str, project_id: &str, bucket_name: &str) -> String {
        format!(
            "/lightningstor/buckets/{}/{}/{}",
            org_id, project_id, bucket_name
        )
    }

    /// Build bucket ID key
    fn bucket_id_key(bucket_id: &BucketId) -> String {
        format!("/lightningstor/bucket_ids/{}", bucket_id)
    }

    /// Build object key
    fn object_key(bucket_id: &BucketId, object_key: &str, version_id: Option<&str>) -> String {
        if let Some(version_id) = version_id {
            format!(
                "/lightningstor/objects/{}/{}/{}",
                bucket_id, object_key, version_id
            )
        } else {
            format!("/lightningstor/objects/{}/{}", bucket_id, object_key)
        }
    }
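
    // Overview of the key namespaces built by the helpers in this section and
    // the multipart/repair builders below (illustrative; the exact formats are
    // the format! strings themselves):
    //   buckets:    /lightningstor/buckets/{org_id}/{project_id}/{bucket_name}
    //   bucket ids: /lightningstor/bucket_ids/{bucket_id}
    //   objects:    /lightningstor/objects/{bucket_id}/{object_key}[/{version_id}]
    //   multipart:  /lightningstor/multipart/uploads/{upload_id}
    //               /lightningstor/multipart/by-bucket/{bucket_id}/{object_key}/{upload_id}
    //               /lightningstor/multipart/objects/{object_id}
    //   repair:     /lightningstor/repair/replicated/{task_id}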

    /// Build object prefix for listing
    fn object_prefix(bucket_id: &BucketId, prefix: &str) -> String {
        format!("/lightningstor/objects/{}/{}", bucket_id, prefix)
    }

    fn multipart_upload_key(upload_id: &str) -> String {
        format!("/lightningstor/multipart/uploads/{}", upload_id)
    }

    fn multipart_upload_prefix() -> &'static str {
        "/lightningstor/multipart/uploads/"
    }

    fn multipart_bucket_key(bucket_id: &str, object_key: &str, upload_id: &str) -> String {
        format!(
            "/lightningstor/multipart/by-bucket/{}/{}/{}",
            bucket_id, object_key, upload_id
        )
    }

    fn multipart_bucket_prefix(bucket_id: &BucketId, prefix: &str) -> String {
        format!(
            "/lightningstor/multipart/by-bucket/{}/{}",
            bucket_id, prefix
        )
    }

    fn multipart_object_key(object_id: &ObjectId) -> String {
        format!("/lightningstor/multipart/objects/{}", object_id)
    }

    fn replicated_repair_task_key(task_id: &str) -> String {
        format!("/lightningstor/repair/replicated/{}", task_id)
    }

    fn replicated_repair_task_prefix() -> &'static str {
        "/lightningstor/repair/replicated/"
    }

    pub async fn save_replicated_repair_task(&self, task: &ReplicatedRepairTask) -> Result<()> {
        let key = Self::replicated_repair_task_key(&task.id);
        let value = serde_json::to_string(task).map_err(|e| {
            lightningstor_types::Error::StorageError(format!(
                "Failed to serialize replicated repair task: {}",
                e
            ))
        })?;
        self.put(&key, &value).await
    }

    pub async fn list_replicated_repair_tasks(
        &self,
        limit: u32,
    ) -> Result<Vec<ReplicatedRepairTask>> {
        let (items, _) = self
            .get_prefix_page(Self::replicated_repair_task_prefix(), None, limit)
            .await?;
        let mut tasks = Vec::new();
        for (_, value) in items {
            let task: ReplicatedRepairTask = serde_json::from_str(&value).map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to deserialize replicated repair task: {}",
                    e
                ))
            })?;
            tasks.push(task);
        }
        Ok(tasks)
    }

    pub async fn delete_replicated_repair_task(&self, task_id: &str) -> Result<()> {
        self.delete_key(&Self::replicated_repair_task_key(task_id))
            .await
    }

    /// Save bucket metadata
    pub async fn save_bucket(&self, bucket: &Bucket) -> Result<()> {
        let key = Self::bucket_key(&bucket.org_id, &bucket.project_id, bucket.name.as_str());
        let value = serde_json::to_string(bucket).map_err(|e| {
            lightningstor_types::Error::StorageError(format!("Failed to serialize bucket: {}", e))
        })?;
        self.put(&key, &value).await?;
        // Also save bucket ID mapping
        let id_key = Self::bucket_id_key(&bucket.id);
        self.put(&id_key, &key).await?;
        self.bucket_cache.insert(key, bucket.clone());
        self.bucket_cache.insert(id_key, bucket.clone());
        Ok(())
    }

    /// Load bucket metadata
    pub async fn load_bucket(
        &self,
        org_id: &str,
        project_id: &str,
        bucket_name: &str,
    ) -> Result<Option<Bucket>> {
        let key = Self::bucket_key(org_id, project_id, bucket_name);
        if let Some(bucket) = self.bucket_cache.get(&key) {
            return Ok(Some(bucket.clone()));
        }
        if let Some(value) = self.get(&key).await? {
            let bucket: Bucket = serde_json::from_str(&value).map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to deserialize bucket: {}",
                    e
                ))
            })?;
            self.bucket_cache.insert(key, bucket.clone());
            Ok(Some(bucket))
        } else {
            Ok(None)
        }
    }

    /// Load bucket by ID
    pub async fn load_bucket_by_id(&self, bucket_id: &BucketId) -> Result<Option<Bucket>> {
        let id_key = Self::bucket_id_key(bucket_id);
        if let Some(bucket) = self.bucket_cache.get(&id_key) {
            return Ok(Some(bucket.clone()));
        }
        if let Some(bucket_key) = self.get(&id_key).await? {
            if let Some(value) = self.get(&bucket_key).await?
            {
                let bucket: Bucket = serde_json::from_str(&value).map_err(|e| {
                    lightningstor_types::Error::StorageError(format!(
                        "Failed to deserialize bucket: {}",
                        e
                    ))
                })?;
                self.bucket_cache.insert(bucket_key.clone(), bucket.clone());
                self.bucket_cache.insert(id_key, bucket.clone());
                Ok(Some(bucket))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    /// Delete bucket metadata
    pub async fn delete_bucket(&self, bucket: &Bucket) -> Result<()> {
        // Only delete bucket metadata; object deletion should be explicit.
        let key = Self::bucket_key(&bucket.org_id, &bucket.project_id, bucket.name.as_str());
        let id_key = Self::bucket_id_key(&bucket.id);
        self.delete_key(&key).await?;
        self.delete_key(&id_key).await?;
        self.bucket_cache.remove(&key);
        self.bucket_cache.remove(&id_key);
        Ok(())
    }

    /// Check whether a bucket has any objects
    pub async fn has_objects(&self, bucket_id: &BucketId) -> Result<bool> {
        let prefix = format!("/lightningstor/objects/{}/", bucket_id);
        self.has_prefix(&prefix).await
    }

    /// List buckets for a tenant
    pub async fn list_buckets(
        &self,
        org_id: &str,
        project_id: Option<&str>,
    ) -> Result<Vec<Bucket>> {
        let prefix = if let Some(project_id) = project_id {
            format!("/lightningstor/buckets/{}/{}/", org_id, project_id)
        } else {
            format!("/lightningstor/buckets/{}/", org_id)
        };
        let items = self.get_prefix(&prefix).await?;
        let mut buckets = Vec::new();
        for (_, value) in items {
            if let Ok(bucket) = serde_json::from_str::<Bucket>(&value) {
                let key =
                    Self::bucket_key(&bucket.org_id, &bucket.project_id, bucket.name.as_str());
                let id_key = Self::bucket_id_key(&bucket.id);
                self.bucket_cache.insert(key, bucket.clone());
                self.bucket_cache.insert(id_key, bucket.clone());
                buckets.push(bucket);
            }
        }
        Ok(buckets)
    }

    /// Save object metadata
    pub async fn save_object(&self, object: &Object) -> Result<()> {
        let version_id = if object.version.is_null() {
            None
        } else {
            Some(object.version.as_str())
        };
        // bucket_id is stored as String in Object, need to parse it
        let bucket_id = BucketId::from_str(&object.bucket_id).map_err(|_| {
            lightningstor_types::Error::InvalidArgument("Invalid bucket ID".to_string())
        })?;
        let key = Self::object_key(&bucket_id, object.key.as_str(), version_id);
        let value = serde_json::to_string(object).map_err(|e| {
            lightningstor_types::Error::StorageError(format!("Failed to serialize object: {}", e))
        })?;
        self.put(&key, &value).await?;
        self.object_cache.insert(key, object.clone());
        Ok(())
    }

    /// Load object metadata
    pub async fn load_object(
        &self,
        bucket_id: &BucketId,
        object_key: &str,
        version_id: Option<&str>,
    ) -> Result<Option<Object>> {
        let key = Self::object_key(bucket_id, object_key, version_id);
        if let Some(object) = self.object_cache.get(&key) {
            return Ok(Some(object.clone()));
        }
        if let Some(value) = self.get(&key).await?
        {
            let object: Object = serde_json::from_str(&value).map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to deserialize object: {}",
                    e
                ))
            })?;
            self.object_cache.insert(key, object.clone());
            Ok(Some(object))
        } else {
            Ok(None)
        }
    }

    /// Delete object metadata
    pub async fn delete_object(
        &self,
        bucket_id: &BucketId,
        object_key: &str,
        version_id: Option<&str>,
    ) -> Result<()> {
        let key = Self::object_key(bucket_id, object_key, version_id);
        self.delete_key(&key).await?;
        self.object_cache.remove(&key);
        Ok(())
    }

    /// List objects in a bucket
    pub async fn list_objects(
        &self,
        bucket_id: &BucketId,
        prefix: &str,
        max_keys: u32,
    ) -> Result<Vec<Object>> {
        if max_keys > 0 {
            return self
                .list_objects_page(bucket_id, prefix, None, max_keys)
                .await
                .map(|(objects, _)| objects);
        }
        let prefix_key = Self::object_prefix(bucket_id, prefix);
        let items = self.get_prefix(&prefix_key).await?;
        let mut objects = Vec::new();
        for (_, value) in items.into_iter() {
            if let Ok(object) = serde_json::from_str::<Object>(&value) {
                objects.push(object);
            }
        }
        // Sort by key for consistent ordering
        objects.sort_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
        if max_keys > 0 && objects.len() > max_keys as usize {
            objects.truncate(max_keys as usize);
        }
        Ok(objects)
    }

    pub async fn list_objects_page(
        &self,
        bucket_id: &BucketId,
        prefix: &str,
        start_after_key: Option<&str>,
        max_keys: u32,
    ) -> Result<(Vec<Object>, bool)> {
        if max_keys == 0 {
            return Ok((Vec::new(), false));
        }
        let prefix_key = Self::object_prefix(bucket_id, prefix);
        let start_after_storage_key =
            start_after_key.map(|key| Self::object_key(bucket_id, key, None));
        let (items, has_more) = self
            .get_prefix_page(&prefix_key, start_after_storage_key.as_deref(), max_keys)
            .await?;
        let mut objects = Vec::new();
        for (_, value) in items {
            if let Ok(object) = serde_json::from_str::<Object>(&value) {
                objects.push(object);
            }
        }
        Ok((objects, has_more))
    }

    pub async fn save_multipart_upload(&self, upload: &MultipartUpload) -> Result<()> {
        let key = Self::multipart_upload_key(upload.upload_id.as_str());
        let value = serde_json::to_string(upload).map_err(|e| {
            lightningstor_types::Error::StorageError(format!(
                "Failed to serialize multipart upload: {}",
                e
            ))
        })?;
        self.put(&key, &value).await?;
        self.put(
            &Self::multipart_bucket_key(
                &upload.bucket_id,
                upload.key.as_str(),
                upload.upload_id.as_str(),
            ),
            &value,
        )
        .await
    }

    pub async fn load_multipart_upload(&self, upload_id: &str) -> Result<Option<MultipartUpload>> {
        let key = Self::multipart_upload_key(upload_id);
        if let Some(value) = self.get(&key).await? {
            let upload: MultipartUpload = serde_json::from_str(&value).map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to deserialize multipart upload: {}",
                    e
                ))
            })?;
            Ok(Some(upload))
        } else {
            Ok(None)
        }
    }

    pub async fn delete_multipart_upload(&self, upload_id: &str) -> Result<()> {
        if let Some(upload) = self.load_multipart_upload(upload_id).await? {
            self.delete_key(&Self::multipart_bucket_key(
                &upload.bucket_id,
                upload.key.as_str(),
                upload.upload_id.as_str(),
            ))
            .await?;
        }
        self.delete_key(&Self::multipart_upload_key(upload_id))
            .await
    }

    pub async fn list_multipart_uploads(
        &self,
        bucket_id: &BucketId,
        prefix: &str,
        max_uploads: u32,
    ) -> Result<Vec<MultipartUpload>> {
        let index_prefix = Self::multipart_bucket_prefix(bucket_id, prefix);
        let items = if max_uploads > 0 {
            self.get_prefix_page(&index_prefix, None, max_uploads)
                .await?
                .0
        } else {
            self.get_prefix(&index_prefix).await?
        };
        let mut uploads = Vec::new();
        for (_, value) in items {
            if let Ok(upload) = serde_json::from_str::<MultipartUpload>(&value) {
                uploads.push(upload);
            }
        }
        if uploads.is_empty() {
            let fallback_items = self.get_prefix(Self::multipart_upload_prefix()).await?;
            for (_, value) in fallback_items {
                if let Ok(upload) = serde_json::from_str::<MultipartUpload>(&value) {
                    if upload.bucket_id == bucket_id.to_string()
                        && upload.key.as_str().starts_with(prefix)
                    {
                        uploads.push(upload);
                    }
                }
            }
        }
        uploads.sort_by(|a, b| {
            a.key
                .as_str()
                .cmp(b.key.as_str())
                .then_with(|| a.initiated.cmp(&b.initiated))
        });
        if max_uploads > 0 && uploads.len() > max_uploads as usize {
            uploads.truncate(max_uploads as usize);
        }
        Ok(uploads)
    }

    pub async fn save_object_multipart_upload(
        &self,
        object_id: &ObjectId,
        upload: &MultipartUpload,
    ) -> Result<()> {
        let key = Self::multipart_object_key(object_id);
        let value = serde_json::to_string(upload).map_err(|e| {
            lightningstor_types::Error::StorageError(format!(
                "Failed to serialize multipart manifest: {}",
                e
            ))
        })?;
        self.put(&key, &value).await
    }

    pub async fn load_object_multipart_upload(
        &self,
        object_id: &ObjectId,
    ) -> Result<Option<MultipartUpload>> {
        let key = Self::multipart_object_key(object_id);
        if let Some(value) = self.get(&key).await? {
            let upload: MultipartUpload = serde_json::from_str(&value).map_err(|e| {
                lightningstor_types::Error::StorageError(format!(
                    "Failed to deserialize multipart manifest: {}",
                    e
                ))
            })?;
            Ok(Some(upload))
        } else {
            Ok(None)
        }
    }

    pub async fn delete_object_multipart_upload(&self, object_id: &ObjectId) -> Result<()> {
        self.delete_key(&Self::multipart_object_key(object_id))
            .await
    }
}

fn normalize_transport_addr(endpoint: &str) -> String {
    endpoint
        .trim()
        .trim_start_matches("http://")
        .trim_start_matches("https://")
        .trim_end_matches('/')
        .to_string()
}

#[cfg(test)]
mod tests {
    use super::*;
    use lightningstor_distributed::ReplicatedRepairTask;
    use lightningstor_types::{BucketName, ETag, ObjectKey};

    #[tokio::test]
    async fn bucket_cache_hits_and_invalidates_on_delete() {
        let store = MetadataStore::new_in_memory();
        let bucket = Bucket::new(
            BucketName::new("bench-bucket").unwrap(),
            "org-a",
            "project-a",
            "default",
        );
        store.save_bucket(&bucket).await.unwrap();
        let cache_key = MetadataStore::bucket_key("org-a", "project-a", "bench-bucket");
        let cache_id_key = MetadataStore::bucket_id_key(&bucket.id);
        assert!(store.bucket_cache.contains_key(&cache_key));
        assert!(store.bucket_cache.contains_key(&cache_id_key));
        let loaded = store
            .load_bucket("org-a", "project-a", "bench-bucket")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded.id, bucket.id);
        let by_id = store.load_bucket_by_id(&bucket.id).await.unwrap().unwrap();
        assert_eq!(by_id.name, bucket.name);
        store.delete_bucket(&bucket).await.unwrap();
        assert!(!store.bucket_cache.contains_key(&cache_key));
        assert!(!store.bucket_cache.contains_key(&cache_id_key));
        assert!(store
            .load_bucket("org-a", "project-a", "bench-bucket")
            .await
            .unwrap()
            .is_none());
    }
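
    // Illustrative unit test for the internal key-range helpers (a sketch; it
    // only exercises the pure functions `prefix_end` and `exclusive_scan_start`
    // defined above).
    #[test]
    fn prefix_end_and_exclusive_scan_start_bound_key_ranges() {
        // Incrementing the last byte turns a prefix into an exclusive upper bound.
        assert_eq!(
            MetadataStore::prefix_end(b"/lightningstor/objects/b1/"),
            b"/lightningstor/objects/b10".to_vec()
        );
        // A trailing 0xff byte cannot be incremented, so a 0x00 byte is appended.
        assert_eq!(
            MetadataStore::prefix_end(&[0x61, 0xff]),
            vec![0x61, 0xff, 0x00]
        );
        // An empty prefix falls back to a single 0xff sentinel.
        assert_eq!(MetadataStore::prefix_end(b""), vec![0xff]);
        // Appending a zero byte yields the smallest key strictly greater than the input.
        assert_eq!(
            MetadataStore::exclusive_scan_start(b"abc"),
            b"abc\0".to_vec()
        );
    }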

    #[tokio::test]
    async fn object_cache_hits_and_invalidates_on_delete() {
        let store = MetadataStore::new_in_memory();
        let bucket = Bucket::new(
            BucketName::new("objects-bucket").unwrap(),
            "org-a",
            "project-a",
            "default",
        );
        store.save_bucket(&bucket).await.unwrap();
        let mut object = Object::new(
            bucket.id.to_string(),
            ObjectKey::new("bench/object.bin").unwrap(),
            ETag::from_md5(&[1u8; 16]),
            4096,
            Some("application/octet-stream".to_string()),
        );
        object.version = lightningstor_types::ObjectVersion::null();
        store.save_object(&object).await.unwrap();
        let cache_key = MetadataStore::object_key(&bucket.id, object.key.as_str(), None);
        assert!(store.object_cache.contains_key(&cache_key));
        let loaded = store
            .load_object(&bucket.id, object.key.as_str(), None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded.id, object.id);
        store
            .delete_object(&bucket.id, object.key.as_str(), None)
            .await
            .unwrap();
        assert!(!store.object_cache.contains_key(&cache_key));
        assert!(store
            .load_object(&bucket.id, object.key.as_str(), None)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn list_objects_page_honors_start_after_and_has_more() {
        let store = MetadataStore::new_in_memory();
        let bucket = Bucket::new(
            BucketName::new("paged-bucket").unwrap(),
            "org-a",
            "project-a",
            "default",
        );
        store.save_bucket(&bucket).await.unwrap();
        for key in ["a.txt", "b.txt", "c.txt"] {
            let mut object = Object::new(
                bucket.id.to_string(),
                ObjectKey::new(key).unwrap(),
                ETag::from_md5(&[7u8; 16]),
                128,
                Some("text/plain".to_string()),
            );
            object.version = lightningstor_types::ObjectVersion::null();
            store.save_object(&object).await.unwrap();
        }
        let (first_page, first_has_more) = store
            .list_objects_page(&bucket.id, "", None, 2)
            .await
            .unwrap();
        assert_eq!(
            first_page
                .iter()
                .map(|object| object.key.as_str().to_string())
                .collect::<Vec<_>>(),
            vec!["a.txt".to_string(), "b.txt".to_string()]
        );
        assert!(first_has_more);
        let (second_page, second_has_more) = store
            .list_objects_page(&bucket.id, "", Some("b.txt"), 2)
            .await
            .unwrap();
        assert_eq!(
            second_page
                .iter()
                .map(|object| object.key.as_str().to_string())
                .collect::<Vec<_>>(),
            vec!["c.txt".to_string()]
        );
        assert!(!second_has_more);
    }

    #[tokio::test]
    async fn list_multipart_uploads_uses_bucket_prefix_index() {
        let store = MetadataStore::new_in_memory();
        let bucket = Bucket::new(
            BucketName::new("multipart-bucket").unwrap(),
            "org-a",
            "project-a",
            "default",
        );
        store.save_bucket(&bucket).await.unwrap();
        let upload_a =
            MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/one.bin").unwrap());
        let upload_b =
            MultipartUpload::new(bucket.id.to_string(), ObjectKey::new("a/two.bin").unwrap());
        let other_bucket = Bucket::new(
            BucketName::new("other-bucket").unwrap(),
            "org-a",
            "project-a",
            "default",
        );
        store.save_bucket(&other_bucket).await.unwrap();
        let upload_other = MultipartUpload::new(
            other_bucket.id.to_string(),
            ObjectKey::new("a/three.bin").unwrap(),
        );
        store.save_multipart_upload(&upload_a).await.unwrap();
        store.save_multipart_upload(&upload_b).await.unwrap();
        store.save_multipart_upload(&upload_other).await.unwrap();
        let uploads = store
            .list_multipart_uploads(&bucket.id, "a/", 10)
            .await
            .unwrap();
        assert_eq!(uploads.len(), 2);
        assert_eq!(
            uploads
                .iter()
                .map(|upload| upload.key.as_str().to_string())
                .collect::<Vec<_>>(),
            vec!["a/one.bin".to_string(), "a/two.bin".to_string()]
        );
    }

    #[tokio::test]
    async fn replicated_repair_tasks_round_trip() {
        let store = MetadataStore::new_in_memory();
        let mut task = ReplicatedRepairTask::new("obj_abc", 0, "quorum write");
        store.save_replicated_repair_task(&task).await.unwrap();
        let tasks = store.list_replicated_repair_tasks(10).await.unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].key, "obj_abc");
        task.schedule_retry("transient failure", 5_000);
        store.save_replicated_repair_task(&task).await.unwrap();
        let tasks = store.list_replicated_repair_tasks(10).await.unwrap();
        assert_eq!(tasks[0].attempt_count, 1);
        assert_eq!(tasks[0].last_error.as_deref(), Some("transient failure"));
        store.delete_replicated_repair_task(&task.id).await.unwrap();
        assert!(store
            .list_replicated_repair_tasks(10)
            .await
            .unwrap()
            .is_empty());
    }
}