use crate::error::{Result, SqlError}; use crate::types::{ColumnDef, DataType, TableMetadata}; use flaredb_client::RdbClient; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{Mutex, RwLock}; const META_PREFIX: &str = "__sql_meta"; const TABLES_KEY_PREFIX: &str = "__sql_meta:tables"; const NEXT_TABLE_ID_KEY: &str = "__sql_meta:next_table_id"; /// Metadata manager for SQL tables pub struct MetadataManager { client: Arc>, cache: Arc>>, } impl MetadataManager { pub fn new(client: Arc>) -> Self { Self { client, cache: Arc::new(RwLock::new(std::collections::HashMap::new())), } } /// Create a new table pub async fn create_table( &self, table_name: String, columns: Vec, primary_key: Vec, ) -> Result { // Check if table already exists if self.table_exists(&table_name).await? { return Err(SqlError::TableAlreadyExists(table_name)); } // Validate primary key columns exist for pk_col in &primary_key { if !columns.iter().any(|c| &c.name == pk_col) { return Err(SqlError::ColumnNotFound( pk_col.clone(), table_name.clone(), )); } } // Allocate table ID let table_id = self.allocate_table_id().await?; // Create table metadata let metadata = TableMetadata { table_id, table_name: table_name.clone(), columns, primary_key, created_at: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(), }; // Store metadata in KVS self.store_metadata(&metadata).await?; // Update cache self.cache .write() .await .insert(table_name.clone(), metadata.clone()); Ok(metadata) } /// Drop an existing table pub async fn drop_table(&self, table_name: &str) -> Result<()> { // Check if table exists if !self.table_exists(table_name).await? { return Err(SqlError::TableNotFound(table_name.to_string())); } // Get table metadata to find table_id let metadata = self.get_table_metadata(table_name).await?; // Delete metadata from KVS let key = format!("{}:{}", TABLES_KEY_PREFIX, table_name); let mut client = self.client.lock().await; client .raw_delete(key.as_bytes().to_vec()) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; // Remove from cache self.cache.write().await.remove(table_name); // Note: We don't delete table data here for simplicity // In production, we'd need to scan and delete all rows with table_id prefix Ok(()) } /// Get table metadata pub async fn get_table_metadata(&self, table_name: &str) -> Result { // Check cache first { let cache = self.cache.read().await; if let Some(metadata) = cache.get(table_name) { return Ok(metadata.clone()); } } // Load from KVS let key = format!("{}:{}", TABLES_KEY_PREFIX, table_name); let mut client = self.client.lock().await; let value = client .cas_get(key.as_bytes().to_vec()) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; if let Some((_version, ref bytes)) = value { let metadata: TableMetadata = bincode::deserialize(&bytes) .map_err(|e| SqlError::SerializationError(e.to_string()))?; // Update cache self.cache .write() .await .insert(table_name.to_string(), metadata.clone()); Ok(metadata) } else { Err(SqlError::TableNotFound(table_name.to_string())) } } /// Check if table exists pub async fn table_exists(&self, table_name: &str) -> Result { // Check cache first { let cache = self.cache.read().await; if cache.contains_key(table_name) { return Ok(true); } } // Check KVS let key = format!("{}:{}", TABLES_KEY_PREFIX, table_name); let mut client = self.client.lock().await; let value = client .cas_get(key.as_bytes().to_vec()) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; Ok(value.is_some()) } /// List all tables in the namespace pub async fn list_tables(&self) -> Result> { let start_key = format!("{}:", TABLES_KEY_PREFIX); let end_key = format!("{}~", TABLES_KEY_PREFIX); // '~' is after ':' in ASCII let mut client = self.client.lock().await; let (entries, _next_key) = client .cas_scan(start_key.as_bytes().to_vec(), end_key.as_bytes().to_vec(), 1000) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; let mut tables = Vec::new(); for (key, _value, _version) in entries { if let Ok(key_str) = String::from_utf8(key) { if let Some(table_name) = key_str.strip_prefix(&format!("{}:", TABLES_KEY_PREFIX)) { tables.push(table_name.to_string()); } } } Ok(tables) } /// Store table metadata in KVS async fn store_metadata(&self, metadata: &TableMetadata) -> Result<()> { let key = format!("{}:{}", TABLES_KEY_PREFIX, metadata.table_name); let value = bincode::serialize(metadata).map_err(|e| SqlError::SerializationError(e.to_string()))?; let mut client = self.client.lock().await; // Use version 0 for new table (we already checked it doesn't exist) let (success, _current_version, _new_version) = client .cas(key.as_bytes().to_vec(), value, 0) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; if !success { return Err(SqlError::InternalError("Failed to store table metadata".to_string())); } Ok(()) } /// Allocate a new table ID using CAS for atomicity async fn allocate_table_id(&self) -> Result { let mut client = self.client.lock().await; let key = NEXT_TABLE_ID_KEY.as_bytes().to_vec(); // Retry loop for CAS for _attempt in 0..10 { // Get current counter with version let current = client .cas_get(key.clone()) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; let (next_id, expected_version) = if let Some((version, bytes)) = current { let current_id = u32::from_be_bytes( bytes .try_into() .map_err(|_| SqlError::InternalError("Invalid table ID format".to_string()))?, ); (current_id + 1, version) } else { (1u32, 0u64) // Start from 1 if no counter exists }; // Try to CAS the incremented counter let value = next_id.to_be_bytes().to_vec(); let (success, _current_version, _new_version) = client .cas(key.clone(), value, expected_version) .await .map_err(|e| SqlError::KvsError(e.to_string()))?; if success { return Ok(next_id); } // CAS failed, retry } Err(SqlError::InternalError("Failed to allocate table ID after retries".to_string())) } } #[cfg(test)] mod tests { use super::*; use crate::types::DataType; // Note: These tests require a running FlareDB instance // For now, we'll keep them as examples #[tokio::test] #[ignore] // Requires FlareDB server async fn test_create_table() { let client = RdbClient::connect_direct("127.0.0.1:8001".to_string(), "sqltest".to_string()).await.unwrap(); let manager = MetadataManager::new(Arc::new(Mutex::new(client))); let columns = vec![ ColumnDef { name: "id".to_string(), data_type: DataType::Integer, nullable: false, default_value: None, }, ColumnDef { name: "name".to_string(), data_type: DataType::Text, nullable: false, default_value: None, }, ]; let metadata = manager .create_table("users".to_string(), columns, vec!["id".to_string()]) .await .unwrap(); assert_eq!(metadata.table_name, "users"); assert_eq!(metadata.columns.len(), 2); assert_eq!(metadata.primary_key, vec!["id".to_string()]); } }