//! SQL storage implementation for CreditService (Postgres/SQLite). use async_trait::async_trait; use creditservice_types::{Error, Quota, Reservation, ResourceType, Result, Transaction, Wallet}; use serde::{Deserialize, Serialize}; use sqlx::pool::PoolOptions; use sqlx::{Pool, Postgres, Sqlite}; use std::sync::Arc; use super::CreditStorage; enum SqlBackend { Postgres(Arc>), Sqlite(Arc>), } /// SQL storage implementation for CreditService data pub struct SqlStorage { backend: SqlBackend, } impl SqlStorage { /// Create a new SQL storage from `postgres://...` or `sqlite:...`. pub async fn new(database_url: &str, single_node: bool) -> Result> { let url = database_url.trim(); if url.is_empty() { return Err(Error::Storage("database URL is empty".to_string())); } if Self::is_postgres_url(url) { let pool = PoolOptions::::new() .max_connections(10) .connect(url) .await .map_err(|e| Error::Storage(format!("Failed to connect to Postgres: {}", e)))?; Self::ensure_schema_postgres(&pool).await?; return Ok(Arc::new(Self { backend: SqlBackend::Postgres(Arc::new(pool)), })); } if Self::is_sqlite_url(url) { if !single_node { return Err(Error::Storage( "SQLite is allowed only in single-node mode".to_string(), )); } if url.contains(":memory:") { return Err(Error::Storage("In-memory SQLite is not allowed".to_string())); } let pool = PoolOptions::::new() .max_connections(1) .connect(url) .await .map_err(|e| Error::Storage(format!("Failed to connect to SQLite: {}", e)))?; Self::ensure_schema_sqlite(&pool).await?; return Ok(Arc::new(Self { backend: SqlBackend::Sqlite(Arc::new(pool)), })); } Err(Error::Storage( "Unsupported database URL (use postgres://, postgresql://, or sqlite:)".to_string(), )) } fn is_postgres_url(url: &str) -> bool { url.starts_with("postgres://") || url.starts_with("postgresql://") } fn is_sqlite_url(url: &str) -> bool { url.starts_with("sqlite:") } async fn ensure_schema_postgres(pool: &Pool) -> Result<()> { sqlx::query( "CREATE TABLE IF NOT EXISTS creditservice_kv ( key TEXT PRIMARY KEY, value BYTEA NOT NULL )", ) .execute(pool) .await .map_err(|e| Error::Storage(format!("Failed to initialize Postgres schema: {}", e)))?; Ok(()) } async fn ensure_schema_sqlite(pool: &Pool) -> Result<()> { sqlx::query( "CREATE TABLE IF NOT EXISTS creditservice_kv ( key TEXT PRIMARY KEY, value BLOB NOT NULL )", ) .execute(pool) .await .map_err(|e| Error::Storage(format!("Failed to initialize SQLite schema: {}", e)))?; Ok(()) } fn wallet_key(project_id: &str) -> String { format!("/creditservice/wallets/{}", project_id) } fn transaction_key(project_id: &str, transaction_id: &str, timestamp_nanos: u64) -> String { format!( "/creditservice/transactions/{}/{}_{}", project_id, timestamp_nanos, transaction_id ) } fn reservation_key(id: &str) -> String { format!("/creditservice/reservations/{}", id) } fn quota_key(project_id: &str, resource_type: ResourceType) -> String { format!("/creditservice/quotas/{}/{}", project_id, resource_type.as_str()) } fn transactions_prefix(project_id: &str) -> String { format!("/creditservice/transactions/{}/", project_id) } fn quotas_prefix(project_id: &str) -> String { format!("/creditservice/quotas/{}/", project_id) } fn reservations_prefix() -> String { "/creditservice/reservations/".to_string() } fn serialize(value: &T) -> Result> { serde_json::to_vec(value) .map_err(|e| Error::Storage(format!("Failed to serialize data: {}", e))) } fn deserialize Deserialize<'de>>(bytes: &[u8]) -> Result { serde_json::from_slice(bytes) .map_err(|e| Error::Storage(format!("Failed to deserialize data: {}", e))) } async fn put(&self, key: &str, value: &[u8]) -> Result<()> { match &self.backend { SqlBackend::Postgres(pool) => { sqlx::query( "INSERT INTO creditservice_kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", ) .bind(key) .bind(value) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("Postgres put failed: {}", e)))?; } SqlBackend::Sqlite(pool) => { sqlx::query( "INSERT INTO creditservice_kv (key, value) VALUES (?1, ?2) ON CONFLICT(key) DO UPDATE SET value = excluded.value", ) .bind(key) .bind(value) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("SQLite put failed: {}", e)))?; } } Ok(()) } async fn put_if_absent(&self, key: &str, value: &[u8]) -> Result { let rows_affected = match &self.backend { SqlBackend::Postgres(pool) => { sqlx::query("INSERT INTO creditservice_kv (key, value) VALUES ($1, $2) ON CONFLICT DO NOTHING") .bind(key) .bind(value) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("Postgres insert failed: {}", e)))? .rows_affected() } SqlBackend::Sqlite(pool) => { sqlx::query("INSERT OR IGNORE INTO creditservice_kv (key, value) VALUES (?1, ?2)") .bind(key) .bind(value) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("SQLite insert failed: {}", e)))? .rows_affected() } }; Ok(rows_affected > 0) } async fn get(&self, key: &str) -> Result>> { match &self.backend { SqlBackend::Postgres(pool) => { let value: Option> = sqlx::query_scalar("SELECT value FROM creditservice_kv WHERE key = $1") .bind(key) .fetch_optional(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("Postgres get failed: {}", e)))?; Ok(value) } SqlBackend::Sqlite(pool) => { let value: Option> = sqlx::query_scalar("SELECT value FROM creditservice_kv WHERE key = ?1") .bind(key) .fetch_optional(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("SQLite get failed: {}", e)))?; Ok(value) } } } async fn delete(&self, key: &str) -> Result { let rows_affected = match &self.backend { SqlBackend::Postgres(pool) => { sqlx::query("DELETE FROM creditservice_kv WHERE key = $1") .bind(key) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("Postgres delete failed: {}", e)))? .rows_affected() } SqlBackend::Sqlite(pool) => { sqlx::query("DELETE FROM creditservice_kv WHERE key = ?1") .bind(key) .execute(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("SQLite delete failed: {}", e)))? .rows_affected() } }; Ok(rows_affected > 0) } async fn scan_prefix_values(&self, prefix: &str) -> Result>> { let like_pattern = format!("{}%", prefix); match &self.backend { SqlBackend::Postgres(pool) => { let values: Vec> = sqlx::query_scalar( "SELECT value FROM creditservice_kv WHERE key LIKE $1 ORDER BY key", ) .bind(like_pattern) .fetch_all(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("Postgres scan failed: {}", e)))?; Ok(values) } SqlBackend::Sqlite(pool) => { let values: Vec> = sqlx::query_scalar( "SELECT value FROM creditservice_kv WHERE key LIKE ?1 ORDER BY key", ) .bind(like_pattern) .fetch_all(pool.as_ref()) .await .map_err(|e| Error::Storage(format!("SQLite scan failed: {}", e)))?; Ok(values) } } } } #[async_trait] impl CreditStorage for SqlStorage { async fn get_wallet(&self, project_id: &str) -> Result> { let key = Self::wallet_key(project_id); self.get(&key) .await? .map(|v| Self::deserialize(v.as_slice())) .transpose() } async fn create_wallet(&self, wallet: Wallet) -> Result { let key = Self::wallet_key(&wallet.project_id); let value = Self::serialize(&wallet)?; if self.put_if_absent(&key, &value).await? { Ok(wallet) } else { Err(Error::WalletAlreadyExists(wallet.project_id)) } } async fn update_wallet(&self, wallet: Wallet) -> Result { let key = Self::wallet_key(&wallet.project_id); let value = Self::serialize(&wallet)?; self.put(&key, &value).await?; Ok(wallet) } async fn delete_wallet(&self, project_id: &str) -> Result { let key = Self::wallet_key(project_id); self.delete(&key).await } async fn add_transaction(&self, transaction: Transaction) -> Result { let key = Self::transaction_key( &transaction.project_id, &transaction.id, transaction.created_at.timestamp_nanos() as u64, ); let value = Self::serialize(&transaction)?; self.put(&key, &value).await?; Ok(transaction) } async fn get_transactions( &self, project_id: &str, limit: usize, offset: usize, ) -> Result> { let prefix = Self::transactions_prefix(project_id); let mut transactions: Vec = self .scan_prefix_values(&prefix) .await? .into_iter() .filter_map(|v| Self::deserialize(v.as_slice()).ok()) .collect(); transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); Ok(transactions.into_iter().skip(offset).take(limit).collect()) } async fn get_reservation(&self, id: &str) -> Result> { let key = Self::reservation_key(id); self.get(&key) .await? .map(|v| Self::deserialize(v.as_slice())) .transpose() } async fn create_reservation(&self, reservation: Reservation) -> Result { let key = Self::reservation_key(&reservation.id); let value = Self::serialize(&reservation)?; self.put(&key, &value).await?; Ok(reservation) } async fn update_reservation(&self, reservation: Reservation) -> Result { let key = Self::reservation_key(&reservation.id); let value = Self::serialize(&reservation)?; self.put(&key, &value).await?; Ok(reservation) } async fn delete_reservation(&self, id: &str) -> Result { let key = Self::reservation_key(id); self.delete(&key).await } async fn get_pending_reservations(&self, project_id: &str) -> Result> { let prefix = Self::reservations_prefix(); let reservations: Vec = self .scan_prefix_values(&prefix) .await? .into_iter() .filter_map(|v| Self::deserialize(v.as_slice()).ok()) .filter(|r: &Reservation| { r.status == creditservice_types::ReservationStatus::Pending && r.project_id == project_id }) .collect(); Ok(reservations) } async fn get_quota(&self, project_id: &str, resource_type: ResourceType) -> Result> { let key = Self::quota_key(project_id, resource_type); self.get(&key) .await? .map(|v| Self::deserialize(v.as_slice())) .transpose() } async fn set_quota(&self, quota: Quota) -> Result { let key = Self::quota_key("a.project_id, quota.resource_type); let value = Self::serialize("a)?; self.put(&key, &value).await?; Ok(quota) } async fn list_quotas(&self, project_id: &str) -> Result> { let prefix = Self::quotas_prefix(project_id); let quotas: Vec = self .scan_prefix_values(&prefix) .await? .into_iter() .filter_map(|v| Self::deserialize(v.as_slice()).ok()) .collect(); Ok(quotas) } }