//! CreditService gRPC implementation use crate::billing::{PricingRules, UsageMetricsProvider}; use crate::storage::CreditStorage; use chrono::{DateTime, Utc}; use creditservice_proto::{ credit_service_server::CreditService, BillingResult as ProtoBillingResult, CheckQuotaRequest, CheckQuotaResponse, CommitReservationRequest, CommitReservationResponse, CreateWalletRequest, CreateWalletResponse, GetQuotaRequest, GetQuotaResponse, GetTransactionsRequest, GetTransactionsResponse, GetWalletRequest, GetWalletResponse, ListQuotasRequest, ListQuotasResponse, ProcessBillingRequest, ProcessBillingResponse, Quota as ProtoQuota, ReleaseReservationRequest, ReleaseReservationResponse, Reservation as ProtoReservation, ReservationStatus as ProtoReservationStatus, ReserveCreditsRequest, ReserveCreditsResponse, ResourceType as ProtoResourceType, SetQuotaRequest, SetQuotaResponse, TopUpRequest, TopUpResponse, Transaction as ProtoTransaction, TransactionType as ProtoTransactionType, Wallet as ProtoWallet, WalletStatus as ProtoWalletStatus, }; use creditservice_types::{ Quota, Reservation, ReservationStatus, ResourceType, Transaction, TransactionType, Wallet, WalletStatus, }; use photon_auth_client::{ get_tenant_context, resolve_tenant_ids_from_context, resource_for_tenant, AuthService, TenantContext, }; use prost_types::Timestamp; use std::sync::Arc; use tokio::sync::RwLock; use tonic::{Request, Response, Status}; use tracing::{info, warn}; /// CreditService gRPC implementation #[derive(Clone)] pub struct CreditServiceImpl { storage: Arc, usage_provider: Arc>>>, pricing: PricingRules, auth: Option>, } const ACTION_WALLET_READ: &str = "billing:wallets:read"; const ACTION_WALLET_CREATE: &str = "billing:wallets:create"; const ACTION_WALLET_UPDATE: &str = "billing:wallets:update"; const ACTION_TXN_LIST: &str = "billing:transactions:list"; const ACTION_QUOTA_READ: &str = "billing:quotas:read"; const ACTION_QUOTA_SET: &str = "billing:quotas:update"; const ACTION_RESERVATION_CREATE: &str = 
"billing:reservations:create"; const ACTION_RESERVATION_COMMIT: &str = "billing:reservations:commit"; const ACTION_RESERVATION_RELEASE: &str = "billing:reservations:release"; const ACTION_BILLING_PROCESS: &str = "billing:process"; impl CreditServiceImpl { /// Create a new CreditServiceImpl with the given storage backend pub fn new(storage: Arc, auth: Arc) -> Self { Self { storage, usage_provider: Arc::new(RwLock::new(None)), pricing: PricingRules::default(), auth: Some(auth), } } /// Create with custom billing configuration pub fn with_billing( storage: Arc, usage_provider: Arc, pricing: PricingRules, auth: Arc, ) -> Self { Self { storage, usage_provider: Arc::new(RwLock::new(Some(usage_provider))), pricing, auth: Some(auth), } } #[cfg(test)] pub fn new_for_tests(storage: Arc) -> Self { Self { storage, usage_provider: Arc::new(RwLock::new(None)), pricing: PricingRules::default(), auth: None, } } #[cfg(test)] pub fn with_billing_for_tests( storage: Arc, usage_provider: Arc, pricing: PricingRules, ) -> Self { Self { storage, usage_provider: Arc::new(RwLock::new(Some(usage_provider))), pricing, auth: None, } } /// Set usage metrics provider (for late binding, e.g., after S5 is complete) pub async fn set_usage_provider(&self, provider: Arc) { let mut guard = self.usage_provider.write().await; *guard = Some(provider); } fn resolve_project_scope( &self, tenant: Option<&TenantContext>, req_org_id: Option<&str>, req_project_id: &str, ) -> Result<(String, String), Status> { if req_project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } match tenant { Some(tenant) => { let org_id = req_org_id.unwrap_or(""); resolve_tenant_ids_from_context(tenant, org_id, req_project_id) } None => Ok((req_org_id.unwrap_or("").to_string(), req_project_id.to_string())), } } async fn authorize_project_action( &self, tenant: Option<&TenantContext>, action: &str, kind: &str, resource_id: &str, org_id: &str, project_id: &str, ) -> Result<(), Status> { let 
Some(tenant) = tenant else { return Ok(()); }; let auth = self .auth .as_ref() .ok_or_else(|| Status::internal("auth service not configured"))?; auth.authorize( tenant, action, &resource_for_tenant(kind, resource_id, org_id, project_id), ) .await } fn tenant_context(&self, request: &Request) -> Result, Status> { if self.auth.is_some() { Ok(Some(get_tenant_context(request)?)) } else { Ok(request.extensions().get::().cloned()) } } /// Process billing for a single project async fn process_project_billing( &self, project_id: &str, period_start: DateTime, period_end: DateTime, usage_provider: &Arc, ) -> Result { // Get wallet let mut wallet = self .storage .get_wallet(project_id) .await .map_err(map_storage_error)? .ok_or_else(|| Status::not_found(format!("Wallet not found: {}", project_id)))?; // Get usage metrics let usage = usage_provider .get_usage_metrics(project_id, period_start, period_end) .await .map_err(|e| Status::internal(format!("Failed to get usage metrics: {:?}", e)))?; // Calculate charge let charge = self.pricing.calculate_charge(&usage); if charge == 0 { info!(project_id = %project_id, "No charges for billing period"); return Ok(0); } info!( project_id = %project_id, charge = charge, "Processing billing charge" ); // Deduct from wallet wallet.balance -= charge; wallet.total_consumed += charge; wallet.updated_at = Utc::now(); // Suspend wallet if balance drops to zero or below if wallet.balance <= 0 { wallet.status = WalletStatus::Suspended; warn!(project_id = %project_id, balance = wallet.balance, "Wallet suspended due to zero/negative balance"); } // Create transaction let txn = Transaction::new( project_id.to_string(), TransactionType::BillingCharge, -charge, // Negative for charge wallet.balance, format!( "Billing charge for period {} to {}", period_start.format("%Y-%m-%d %H:%M"), period_end.format("%Y-%m-%d %H:%M") ), ); // Persist self.storage .update_wallet(wallet) .await .map_err(map_storage_error)?; self.storage .add_transaction(txn) .await 
.map_err(map_storage_error)?; Ok(charge) } } // Conversion helpers fn wallet_to_proto(wallet: &Wallet) -> ProtoWallet { ProtoWallet { project_id: wallet.project_id.clone(), org_id: wallet.org_id.clone(), balance: wallet.balance, reserved: wallet.reserved, total_deposited: wallet.total_deposited, total_consumed: wallet.total_consumed, status: match wallet.status { WalletStatus::Active => ProtoWalletStatus::Active as i32, WalletStatus::Suspended => ProtoWalletStatus::Suspended as i32, WalletStatus::Closed => ProtoWalletStatus::Closed as i32, }, created_at: Some(datetime_to_timestamp(wallet.created_at)), updated_at: Some(datetime_to_timestamp(wallet.updated_at)), } } fn transaction_to_proto(txn: &Transaction) -> ProtoTransaction { ProtoTransaction { id: txn.id.clone(), project_id: txn.project_id.clone(), r#type: match txn.transaction_type { TransactionType::TopUp => ProtoTransactionType::TopUp as i32, TransactionType::Reservation => ProtoTransactionType::Reservation as i32, TransactionType::Charge => ProtoTransactionType::Charge as i32, TransactionType::Release => ProtoTransactionType::Release as i32, TransactionType::Refund => ProtoTransactionType::Refund as i32, TransactionType::BillingCharge => ProtoTransactionType::BillingCharge as i32, }, amount: txn.amount, balance_after: txn.balance_after, description: txn.description.clone(), resource_id: txn.resource_id.clone().unwrap_or_default(), created_at: Some(datetime_to_timestamp(txn.created_at)), } } fn datetime_to_timestamp(dt: chrono::DateTime) -> Timestamp { Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, } } fn reservation_to_proto(res: &Reservation) -> ProtoReservation { ProtoReservation { id: res.id.clone(), project_id: res.project_id.clone(), amount: res.amount, status: match res.status { ReservationStatus::Pending => ProtoReservationStatus::Pending as i32, ReservationStatus::Committed => ProtoReservationStatus::Committed as i32, ReservationStatus::Released => 
ProtoReservationStatus::Released as i32, ReservationStatus::Expired => ProtoReservationStatus::Expired as i32, }, description: res.description.clone(), expires_at: Some(datetime_to_timestamp(res.expires_at)), created_at: Some(datetime_to_timestamp(res.created_at)), } } fn quota_to_proto(quota: &Quota) -> ProtoQuota { ProtoQuota { project_id: quota.project_id.clone(), resource_type: resource_type_to_proto(quota.resource_type) as i32, limit: quota.limit, current_usage: quota.current_usage, } } fn resource_type_to_proto(rt: ResourceType) -> ProtoResourceType { match rt { ResourceType::VmInstance => ProtoResourceType::VmInstance, ResourceType::VmCpu => ProtoResourceType::VmCpu, ResourceType::VmMemoryGb => ProtoResourceType::VmMemoryGb, ResourceType::StorageGb => ProtoResourceType::StorageGb, ResourceType::NetworkPort => ProtoResourceType::NetworkPort, ResourceType::LoadBalancer => ProtoResourceType::LoadBalancer, ResourceType::DnsZone => ProtoResourceType::DnsZone, ResourceType::DnsRecord => ProtoResourceType::DnsRecord, ResourceType::K8sCluster => ProtoResourceType::K8sCluster, ResourceType::K8sNode => ProtoResourceType::K8sNode, } } fn proto_to_resource_type(proto_rt: i32) -> Result { match ProtoResourceType::try_from(proto_rt) { Ok(ProtoResourceType::VmInstance) => Ok(ResourceType::VmInstance), Ok(ProtoResourceType::VmCpu) => Ok(ResourceType::VmCpu), Ok(ProtoResourceType::VmMemoryGb) => Ok(ResourceType::VmMemoryGb), Ok(ProtoResourceType::StorageGb) => Ok(ResourceType::StorageGb), Ok(ProtoResourceType::NetworkPort) => Ok(ResourceType::NetworkPort), Ok(ProtoResourceType::LoadBalancer) => Ok(ResourceType::LoadBalancer), Ok(ProtoResourceType::DnsZone) => Ok(ResourceType::DnsZone), Ok(ProtoResourceType::DnsRecord) => Ok(ResourceType::DnsRecord), Ok(ProtoResourceType::K8sCluster) => Ok(ResourceType::K8sCluster), Ok(ProtoResourceType::K8sNode) => Ok(ResourceType::K8sNode), _ => Err(Status::invalid_argument("Invalid resource type")), } } fn map_storage_error(err: 
creditservice_types::Error) -> Status { match err { creditservice_types::Error::WalletNotFound(id) => { Status::not_found(format!("Wallet not found: {}", id)) } creditservice_types::Error::WalletAlreadyExists(id) => { Status::already_exists(format!("Wallet already exists: {}", id)) } creditservice_types::Error::InsufficientBalance { available, required, } => Status::failed_precondition(format!( "Insufficient balance: available={}, required={}", available, required )), creditservice_types::Error::ReservationNotFound(id) => { Status::not_found(format!("Reservation not found: {}", id)) } creditservice_types::Error::QuotaExceeded { resource_type, limit, current, } => Status::resource_exhausted(format!( "Quota exceeded for {:?}: limit={}, current={}", resource_type, limit, current )), _ => Status::internal(format!("Internal error: {:?}", err)), } } #[tonic::async_trait] impl CreditService for CreditServiceImpl { async fn get_wallet( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_WALLET_READ, "wallet", &project_id, &org_id, &project_id, ) .await?; info!(project_id = %project_id, "GetWallet request"); let wallet = self .storage .get_wallet(&project_id) .await .map_err(map_storage_error)?; match wallet { Some(w) => Ok(Response::new(GetWalletResponse { wallet: Some(wallet_to_proto(&w)), })), None => Err(Status::not_found(format!( "Wallet not found: {}", req.project_id ))), } } async fn create_wallet( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } if req.initial_balance < 0 { return 
Err(Status::invalid_argument( "initial_balance must be non-negative", )); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), Some(req.org_id.as_str()), &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_WALLET_CREATE, "wallet", &project_id, &org_id, &project_id, ) .await?; info!( project_id = %project_id, org_id = %org_id, initial_balance = req.initial_balance, "CreateWallet request" ); let wallet = Wallet::new(project_id, org_id, req.initial_balance); let created = self .storage .create_wallet(wallet) .await .map_err(map_storage_error)?; if req.initial_balance > 0 { let opening_txn = Transaction::new( created.project_id.clone(), TransactionType::TopUp, req.initial_balance, created.balance, "Initial wallet funding".to_string(), ); self.storage .add_transaction(opening_txn) .await .map_err(map_storage_error)?; } Ok(Response::new(CreateWalletResponse { wallet: Some(wallet_to_proto(&created)), })) } async fn top_up( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_WALLET_UPDATE, "wallet", &project_id, &org_id, &project_id, ) .await?; info!( project_id = %project_id, amount = req.amount, "TopUp request" ); if req.amount <= 0 { return Err(Status::invalid_argument("Amount must be positive")); } // Get current wallet let mut wallet = self .storage .get_wallet(&project_id) .await .map_err(map_storage_error)? 
.ok_or_else(|| Status::not_found(format!("Wallet not found: {}", project_id)))?; // Update balance wallet.balance += req.amount; wallet.total_deposited += req.amount; wallet.updated_at = Utc::now(); // Re-activate if suspended if wallet.status == WalletStatus::Suspended && wallet.balance > 0 { wallet.status = WalletStatus::Active; } // Create transaction let txn = Transaction::new( wallet.project_id.clone(), TransactionType::TopUp, req.amount, wallet.balance, req.description, ); // Persist let updated_wallet = self .storage .update_wallet(wallet) .await .map_err(map_storage_error)?; let saved_txn = self .storage .add_transaction(txn) .await .map_err(map_storage_error)?; Ok(Response::new(TopUpResponse { wallet: Some(wallet_to_proto(&updated_wallet)), transaction: Some(transaction_to_proto(&saved_txn)), })) } async fn get_transactions( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_TXN_LIST, "transaction", &project_id, &org_id, &project_id, ) .await?; info!( project_id = %project_id, page_size = req.page_size, "GetTransactions request" ); // Parse page token as offset (simple pagination) let offset: usize = if req.page_token.is_empty() { 0 } else { req.page_token .parse() .map_err(|_| Status::invalid_argument("Invalid page token"))? 
}; let limit = if req.page_size > 0 { req.page_size as usize } else { 50 // Default page size }; let transactions = self .storage .get_transactions(&project_id, limit + 1, offset) .await .map_err(map_storage_error)?; // Check if there are more results let has_more = transactions.len() > limit; let transactions: Vec<_> = transactions .into_iter() .take(limit) .map(|t| transaction_to_proto(&t)) .collect(); let next_page_token = if has_more { (offset + limit).to_string() } else { String::new() }; Ok(Response::new(GetTransactionsResponse { transactions, next_page_token, })) } async fn check_quota( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_QUOTA_READ, "quota", &project_id, &org_id, &project_id, ) .await?; let resource_type = proto_to_resource_type(req.resource_type)?; info!( project_id = %project_id, resource_type = ?resource_type, quantity = req.quantity, "CheckQuota request" ); // Get wallet let wallet = self .storage .get_wallet(&project_id) .await .map_err(map_storage_error)? 
.ok_or_else(|| Status::not_found(format!("Wallet not found: {}", project_id)))?; // Check balance let available_balance = wallet.available_balance(); let balance_ok = req.estimated_cost <= 0 || available_balance >= req.estimated_cost; // Check quota let quota = self .storage .get_quota(&project_id, resource_type) .await .map_err(map_storage_error)?; let (quota_ok, available_quota) = match "a { Some(q) => (q.allows(req.quantity as i64), q.remaining()), None => (true, i64::MAX), // No quota = unlimited }; let allowed = balance_ok && quota_ok && wallet.status == WalletStatus::Active; let reason = if !allowed { if wallet.status != WalletStatus::Active { format!("Wallet status: {:?}", wallet.status) } else if !balance_ok { format!( "Insufficient balance: available={}, required={}", available_balance, req.estimated_cost ) } else { format!( "Quota exceeded for {:?}: available={}, required={}", resource_type, available_quota, req.quantity ) } } else { String::new() }; Ok(Response::new(CheckQuotaResponse { allowed, reason, available_balance, available_quota, })) } async fn reserve_credits( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); info!( project_id = %req.project_id, amount = req.amount, "ReserveCredits request" ); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_RESERVATION_CREATE, "reservation", "*", &org_id, &project_id, ) .await?; if req.amount <= 0 { return Err(Status::invalid_argument("Amount must be positive")); } // Get wallet and check available balance let mut wallet = self .storage .get_wallet(&project_id) .await .map_err(map_storage_error)? 
.ok_or_else(|| Status::not_found(format!("Wallet not found: {}", project_id)))?; if wallet.status != WalletStatus::Active { return Err(Status::failed_precondition(format!( "Wallet status: {:?}", wallet.status ))); } if wallet.available_balance() < req.amount { return Err(Status::failed_precondition(format!( "Insufficient balance: available={}, required={}", wallet.available_balance(), req.amount ))); } // Create reservation let ttl_seconds = if req.ttl_seconds > 0 { req.ttl_seconds as i64 } else { 300 // Default 5 minutes }; let reservation = Reservation::new(project_id.clone(), req.amount, req.description, ttl_seconds); // Update wallet reserved amount wallet.reserved += req.amount; wallet.updated_at = Utc::now(); // Persist self.storage .update_wallet(wallet) .await .map_err(map_storage_error)?; let created = self .storage .create_reservation(reservation) .await .map_err(map_storage_error)?; Ok(Response::new(ReserveCreditsResponse { reservation: Some(reservation_to_proto(&created)), })) } async fn commit_reservation( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); info!( reservation_id = %req.reservation_id, actual_amount = req.actual_amount, "CommitReservation request" ); // Get reservation let mut reservation = self .storage .get_reservation(&req.reservation_id) .await .map_err(map_storage_error)? .ok_or_else(|| { Status::not_found(format!("Reservation not found: {}", req.reservation_id)) })?; if !reservation.can_commit() { return Err(Status::failed_precondition(format!( "Reservation cannot be committed: status={:?}, expired={}", reservation.status, reservation.is_expired() ))); } // Get wallet let mut wallet = self .storage .get_wallet(&reservation.project_id) .await .map_err(map_storage_error)? 
.ok_or_else(|| { Status::not_found(format!("Wallet not found: {}", reservation.project_id)) })?; let (org_id, project_id) = self.resolve_project_scope( tenant.as_ref(), Some(wallet.org_id.as_str()), &reservation.project_id, )?; self.authorize_project_action( tenant.as_ref(), ACTION_RESERVATION_COMMIT, "reservation", &reservation.id, &org_id, &project_id, ) .await?; // Calculate actual charge (may differ from reserved) let charge_amount = if req.actual_amount > 0 { req.actual_amount } else { reservation.amount }; // Update wallet: deduct from both reserved and balance wallet.reserved -= reservation.amount; wallet.balance -= charge_amount; wallet.total_consumed += charge_amount; wallet.updated_at = Utc::now(); // Suspend if balance drops below zero if wallet.balance <= 0 { wallet.status = WalletStatus::Suspended; } // Create transaction let txn = Transaction::new_with_resource( wallet.project_id.clone(), TransactionType::Charge, -charge_amount, // Negative for deduction wallet.balance, reservation.description.clone(), Some(req.resource_id), ); // Update reservation status reservation.status = ReservationStatus::Committed; // Persist let updated_wallet = self .storage .update_wallet(wallet) .await .map_err(map_storage_error)?; self.storage .update_reservation(reservation) .await .map_err(map_storage_error)?; let saved_txn = self .storage .add_transaction(txn) .await .map_err(map_storage_error)?; Ok(Response::new(CommitReservationResponse { transaction: Some(transaction_to_proto(&saved_txn)), wallet: Some(wallet_to_proto(&updated_wallet)), })) } async fn release_reservation( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); info!( reservation_id = %req.reservation_id, reason = %req.reason, "ReleaseReservation request" ); // Get reservation let mut reservation = self .storage .get_reservation(&req.reservation_id) .await .map_err(map_storage_error)? 
.ok_or_else(|| { Status::not_found(format!("Reservation not found: {}", req.reservation_id)) })?; if reservation.status != ReservationStatus::Pending { return Err(Status::failed_precondition(format!( "Reservation cannot be released: status={:?}", reservation.status ))); } // Get wallet and release reserved amount let mut wallet = self .storage .get_wallet(&reservation.project_id) .await .map_err(map_storage_error)? .ok_or_else(|| { Status::not_found(format!("Wallet not found: {}", reservation.project_id)) })?; let (org_id, project_id) = self.resolve_project_scope( tenant.as_ref(), Some(wallet.org_id.as_str()), &reservation.project_id, )?; self.authorize_project_action( tenant.as_ref(), ACTION_RESERVATION_RELEASE, "reservation", &reservation.id, &org_id, &project_id, ) .await?; wallet.reserved -= reservation.amount; wallet.updated_at = Utc::now(); // Update reservation status reservation.status = ReservationStatus::Released; // Persist self.storage .update_wallet(wallet) .await .map_err(map_storage_error)?; self.storage .update_reservation(reservation) .await .map_err(map_storage_error)?; Ok(Response::new(ReleaseReservationResponse { success: true })) } async fn process_billing( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_BILLING_PROCESS, "billing", &project_id, &org_id, &project_id, ) .await?; // Parse billing period let period_start = req .billing_period_start .map(|ts| { DateTime::from_timestamp(ts.seconds, ts.nanos as u32).unwrap_or_else(Utc::now) }) .unwrap_or_else(|| Utc::now() - chrono::Duration::hours(1)); let period_end = req .billing_period_end .map(|ts| { DateTime::from_timestamp(ts.seconds, ts.nanos as u32).unwrap_or_else(Utc::now) 
}) .unwrap_or_else(Utc::now); info!( project_id = %project_id, period_start = %period_start, period_end = %period_end, "ProcessBilling request" ); // Get usage provider let usage_provider_guard = self.usage_provider.read().await; let usage_provider = match usage_provider_guard.as_ref() { Some(p) => p.clone(), None => { warn!("No usage metrics provider configured, billing will show zero charges"); return Ok(Response::new(ProcessBillingResponse { projects_processed: 0, total_charged: 0, results: vec![], })); } }; drop(usage_provider_guard); // Get list of projects to bill let project_ids = vec![project_id.clone()]; let mut results = Vec::new(); let mut total_charged: i64 = 0; for project_id in &project_ids { let result = self .process_project_billing(project_id, period_start, period_end, &usage_provider) .await; match result { Ok(amount) => { total_charged += amount; results.push(ProtoBillingResult { project_id: project_id.clone(), amount_charged: amount, success: true, error: String::new(), }); } Err(e) => { warn!(project_id = %project_id, error = %e, "Billing failed for project"); results.push(ProtoBillingResult { project_id: project_id.clone(), amount_charged: 0, success: false, error: e.to_string(), }); } } } Ok(Response::new(ProcessBillingResponse { projects_processed: results.len() as i32, total_charged, results, })) } async fn set_quota( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let resource_type = proto_to_resource_type(req.resource_type)?; let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_QUOTA_SET, "quota", &project_id, &org_id, &project_id, ) .await?; info!( project_id = %project_id, resource_type = ?resource_type, limit = req.limit, "SetQuota request" ); let quota = 
Quota::new(project_id, resource_type, req.limit); let saved = self .storage .set_quota(quota) .await .map_err(map_storage_error)?; Ok(Response::new(SetQuotaResponse { quota: Some(quota_to_proto(&saved)), })) } async fn get_quota( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let resource_type = proto_to_resource_type(req.resource_type)?; let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_QUOTA_READ, "quota", &project_id, &org_id, &project_id, ) .await?; info!( project_id = %project_id, resource_type = ?resource_type, "GetQuota request" ); let quota = self .storage .get_quota(&project_id, resource_type) .await .map_err(map_storage_error)?; match quota { Some(q) => Ok(Response::new(GetQuotaResponse { quota: Some(quota_to_proto(&q)), })), None => Err(Status::not_found(format!( "Quota not found for {:?}", resource_type ))), } } async fn list_quotas( &self, request: Request, ) -> Result, Status> { let tenant = self.tenant_context(&request)?; let req = request.into_inner(); if req.project_id.is_empty() { return Err(Status::invalid_argument("project_id is required")); } let (org_id, project_id) = self.resolve_project_scope(tenant.as_ref(), None, &req.project_id)?; self.authorize_project_action( tenant.as_ref(), ACTION_QUOTA_READ, "quota", &project_id, &org_id, &project_id, ) .await?; info!(project_id = %project_id, "ListQuotas request"); let quotas = self .storage .list_quotas(&project_id) .await .map_err(map_storage_error)?; Ok(Response::new(ListQuotasResponse { quotas: quotas.iter().map(quota_to_proto).collect(), })) } } #[cfg(test)] mod tests { use super::*; use crate::storage::InMemoryStorage; #[tokio::test] async fn test_create_and_get_wallet() { let storage = InMemoryStorage::new(); let service 
= CreditServiceImpl::new_for_tests(storage); // Create wallet let create_req = Request::new(CreateWalletRequest { project_id: "proj-test".into(), org_id: "org-test".into(), initial_balance: 10000, }); let create_resp = service.create_wallet(create_req).await.unwrap(); let wallet = create_resp.into_inner().wallet.unwrap(); assert_eq!(wallet.project_id, "proj-test"); assert_eq!(wallet.balance, 10000); // Get wallet let get_req = Request::new(GetWalletRequest { project_id: "proj-test".into(), }); let get_resp = service.get_wallet(get_req).await.unwrap(); let wallet = get_resp.into_inner().wallet.unwrap(); assert_eq!(wallet.balance, 10000); } #[tokio::test] async fn test_create_wallet_records_initial_funding_transaction() { let storage = InMemoryStorage::new(); let service = CreditServiceImpl::new_for_tests(storage); let create_req = Request::new(CreateWalletRequest { project_id: "proj-test".into(), org_id: "org-test".into(), initial_balance: 10000, }); service.create_wallet(create_req).await.unwrap(); let get_txn_req = Request::new(GetTransactionsRequest { project_id: "proj-test".into(), page_size: 10, page_token: String::new(), type_filter: 0, start_time: None, end_time: None, }); let resp = service.get_transactions(get_txn_req).await.unwrap(); let inner = resp.into_inner(); assert_eq!(inner.transactions.len(), 1); assert_eq!(inner.transactions[0].amount, 10000); assert_eq!( inner.transactions[0].r#type, ProtoTransactionType::TopUp as i32 ); } #[tokio::test] async fn test_top_up() { let storage = InMemoryStorage::new(); let service = CreditServiceImpl::new_for_tests(storage); // Create wallet let create_req = Request::new(CreateWalletRequest { project_id: "proj-test".into(), org_id: "org-test".into(), initial_balance: 5000, }); service.create_wallet(create_req).await.unwrap(); // Top up let top_up_req = Request::new(TopUpRequest { project_id: "proj-test".into(), amount: 3000, description: "Test top-up".into(), }); let top_up_resp = 
service.top_up(top_up_req).await.unwrap(); let inner = top_up_resp.into_inner(); let wallet = inner.wallet.unwrap(); let txn = inner.transaction.unwrap(); assert_eq!(wallet.balance, 8000); assert_eq!(txn.amount, 3000); assert_eq!(txn.balance_after, 8000); } #[tokio::test] async fn test_get_transactions() { let storage = InMemoryStorage::new(); let service = CreditServiceImpl::new_for_tests(storage); // Create wallet let create_req = Request::new(CreateWalletRequest { project_id: "proj-test".into(), org_id: "org-test".into(), initial_balance: 10000, }); service.create_wallet(create_req).await.unwrap(); // Multiple top-ups for i in 1..=5 { let top_up_req = Request::new(TopUpRequest { project_id: "proj-test".into(), amount: 1000, description: format!("Top-up #{}", i), }); service.top_up(top_up_req).await.unwrap(); } // Get transactions let get_txn_req = Request::new(GetTransactionsRequest { project_id: "proj-test".into(), page_size: 3, page_token: String::new(), type_filter: 0, start_time: None, end_time: None, }); let resp = service.get_transactions(get_txn_req).await.unwrap(); let inner = resp.into_inner(); assert_eq!(inner.transactions.len(), 3); assert!(!inner.next_page_token.is_empty()); } #[tokio::test] async fn test_wallet_not_found() { let storage = InMemoryStorage::new(); let service = CreditServiceImpl::new_for_tests(storage); let get_req = Request::new(GetWalletRequest { project_id: "nonexistent".into(), }); let result = service.get_wallet(get_req).await; assert!(result.is_err()); let status = result.unwrap_err(); assert_eq!(status.code(), tonic::Code::NotFound); } // S4 Admission Control Tests #[tokio::test] async fn test_check_quota_allowed() { let storage = InMemoryStorage::new(); let service = CreditServiceImpl::new_for_tests(storage); // Create wallet with sufficient balance let create_req = Request::new(CreateWalletRequest { project_id: "proj-test".into(), org_id: "org-test".into(), initial_balance: 100000, }); 
service.create_wallet(create_req).await.unwrap();

        // Check quota - should be allowed
        let check_req = Request::new(CheckQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
            quantity: 1,
            estimated_cost: 5000,
        });
        let resp = service.check_quota(check_req).await.unwrap();
        let inner = resp.into_inner();
        assert!(inner.allowed);
        assert!(inner.reason.is_empty());
    }

    /// A quota check with an estimated cost of 5000 against a wallet holding
    /// 1000 must come back as not allowed, with a reason that mentions
    /// "Insufficient balance".
    #[tokio::test]
    async fn test_check_quota_insufficient_balance() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Seed the project with a wallet that only holds 1000.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 1000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Ask for a single VM whose estimated cost is above the balance.
        let quota_req = CheckQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
            quantity: 1,
            estimated_cost: 5000,
        };
        let verdict = service
            .check_quota(Request::new(quota_req))
            .await
            .unwrap()
            .into_inner();

        assert!(!verdict.allowed);
        assert!(verdict.reason.contains("Insufficient balance"));
    }

    /// Reserving 3000 leaves the balance untouched and shows up as `reserved`;
    /// committing the reservation with an actual amount of 2500 debits the
    /// balance by 2500, clears `reserved` and yields a -2500 transaction.
    #[tokio::test]
    async fn test_reserve_and_commit() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Wallet with 10000 to draw from.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 10000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Place a hold of 3000.
        let hold_req = ReserveCreditsRequest {
            project_id: "proj-test".into(),
            amount: 3000,
            description: "VM creation".into(),
            resource_type: "vm_instance".into(),
            ttl_seconds: 300,
        };
        let hold = service
            .reserve_credits(Request::new(hold_req))
            .await
            .unwrap()
            .into_inner()
            .reservation
            .unwrap();
        assert_eq!(hold.amount, 3000);
        assert_eq!(hold.status, ProtoReservationStatus::Pending as i32);

        // The hold must be visible on the wallet without touching the balance.
        let lookup = GetWalletRequest {
            project_id: "proj-test".into(),
        };
        let fetched = service.get_wallet(Request::new(lookup)).await.unwrap();
        let before_commit = fetched.into_inner().wallet.unwrap();
        assert_eq!(before_commit.balance, 10000);
        assert_eq!(before_commit.reserved, 3000);

        // Commit for slightly less than what was reserved.
        let commit_req = CommitReservationRequest {
            reservation_id: hold.id.clone(),
            actual_amount: 2500,
            resource_id: "vm-123".into(),
        };
        let committed = service
            .commit_reservation(Request::new(commit_req))
            .await
            .unwrap()
            .into_inner();
        let after_commit = committed.wallet.unwrap();
        let ledger_entry = committed.transaction.unwrap();

        assert_eq!(after_commit.balance, 7500); // 10000 - 2500
        assert_eq!(after_commit.reserved, 0);
        assert_eq!(ledger_entry.amount, -2500);
    }

    /// Releasing a pending reservation reports success and returns the wallet
    /// to its initial state: balance 10000, nothing reserved.
    #[tokio::test]
    async fn test_reserve_and_release() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Wallet with 10000 to draw from.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 10000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Place a hold of 5000.
        let hold_req = ReserveCreditsRequest {
            project_id: "proj-test".into(),
            amount: 5000,
            description: "VM creation".into(),
            resource_type: "vm_instance".into(),
            ttl_seconds: 300,
        };
        let hold = service
            .reserve_credits(Request::new(hold_req))
            .await
            .unwrap()
            .into_inner()
            .reservation
            .unwrap();

        // Give the hold back.
        let release_req = ReleaseReservationRequest {
            reservation_id: hold.id.clone(),
            reason: "Creation cancelled".into(),
        };
        let released = service
            .release_reservation(Request::new(release_req))
            .await
            .unwrap()
            .into_inner();
        assert!(released.success);

        // Nothing may remain reserved and the balance must be unchanged.
        let lookup = GetWalletRequest {
            project_id: "proj-test".into(),
        };
        let fetched = service.get_wallet(Request::new(lookup)).await.unwrap();
        let current = fetched.into_inner().wallet.unwrap();
        assert_eq!(current.balance, 10000);
        assert_eq!(current.reserved, 0);
    }

    /// Reserving 5000 from a wallet that holds 1000 is rejected with the gRPC
    /// code `FailedPrecondition`.
    #[tokio::test]
    async fn test_reserve_insufficient_balance() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Wallet that only holds 1000.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 1000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Attempt a hold larger than the balance.
        let hold_req = ReserveCreditsRequest {
            project_id: "proj-test".into(),
            amount: 5000,
            description: "VM creation".into(),
            resource_type: "vm_instance".into(),
            ttl_seconds: 300,
        };
        let outcome = service.reserve_credits(Request::new(hold_req)).await;

        assert!(outcome.is_err());
        let status = outcome.unwrap_err();
        assert_eq!(status.code(), tonic::Code::FailedPrecondition);
    }

    /// Round-trips a quota through `set_quota`, `get_quota` and `list_quotas`
    /// and checks that the limit of 10 is reported each time.
    #[tokio::test]
    async fn test_quota_management() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Store a VM-instance limit of 10.
        let set_req = SetQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
            limit: 10,
        };
        let stored = service
            .set_quota(Request::new(set_req))
            .await
            .unwrap()
            .into_inner()
            .quota
            .unwrap();
        assert_eq!(stored.limit, 10);

        // Read the same quota back.
        let get_req = GetQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
        };
        let loaded = service
            .get_quota(Request::new(get_req))
            .await
            .unwrap()
            .into_inner()
            .quota
            .unwrap();
        assert_eq!(loaded.limit, 10);

        // Listing the project's quotas must yield exactly that one entry.
        let list_req = ListQuotasRequest {
            project_id: "proj-test".into(),
        };
        let listing = service
            .list_quotas(Request::new(list_req))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(listing.quotas.len(), 1);
    }

    /// With a VM-instance quota of 2 in place, a quota check for 3 instances
    /// is denied with a reason that mentions "Quota exceeded", even though
    /// the wallet balance (100000) covers the estimated cost.
    #[tokio::test]
    async fn test_check_quota_with_quota_limit() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Generously funded wallet, so balance is not the limiting factor.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 100000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Tight quota: at most 2 VM instances.
        let set_req = SetQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
            limit: 2,
        };
        service.set_quota(Request::new(set_req)).await.unwrap();

        // Ask for 3 VMs, one more than the quota.
        let quota_req = CheckQuotaRequest {
            project_id: "proj-test".into(),
            resource_type: ProtoResourceType::VmInstance as i32,
            quantity: 3,
            estimated_cost: 1000,
        };
        let verdict = service
            .check_quota(Request::new(quota_req))
            .await
            .unwrap()
            .into_inner();

        assert!(!verdict.allowed);
        assert!(verdict.reason.contains("Quota exceeded"));
    }

    // S6 Billing Tests

    /// A service built without a usage provider answers `process_billing`
    /// with zero projects processed and nothing charged.
    #[tokio::test]
    async fn test_process_billing_no_provider() {
        let service = CreditServiceImpl::new_for_tests(InMemoryStorage::new());

        // Wallet with 10000.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 10000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // No provider is configured, so the run is expected to be empty.
        let run_req = ProcessBillingRequest {
            project_id: "proj-test".into(),
            billing_period_start: None,
            billing_period_end: None,
        };
        let summary = service
            .process_billing(Request::new(run_req))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(summary.projects_processed, 0);
        assert_eq!(summary.total_charged, 0);
    }

    /// Billing 10 VM-instance hours reported by a mock provider charges 1000
    /// credits and leaves a balance of 9000 on a wallet that started at 10000.
    #[tokio::test]
    async fn test_process_billing_with_usage() {
        use crate::billing::{MockUsageMetricsProvider, PricingRules, ResourceUsage, UsageMetrics};
        use std::collections::HashMap;

        let backend = InMemoryStorage::new();
        let mut provider = MockUsageMetricsProvider::new();

        // Mock usage: 10 hours of one VM instance over the last hour.
        let mut resource_usage = HashMap::new();
        resource_usage.insert(
            ResourceType::VmInstance,
            ResourceUsage::new(ResourceType::VmInstance, 10.0, "hours"),
        );
        let metrics = UsageMetrics {
            project_id: "proj-test".into(),
            resource_usage,
            period_start: Utc::now() - chrono::Duration::hours(1),
            period_end: Utc::now(),
        };
        provider.add_usage("proj-test".into(), metrics);

        let service = CreditServiceImpl::with_billing_for_tests(
            backend,
            Arc::new(provider),
            PricingRules::default(),
        );

        // Wallet with 10000.
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 10000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Run billing for the project.
        let run_req = ProcessBillingRequest {
            project_id: "proj-test".into(),
            billing_period_start: None,
            billing_period_end: None,
        };
        let summary = service
            .process_billing(Request::new(run_req))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(summary.projects_processed, 1);
        // 10 hours * 100 credits/hour = 1000 credits
        assert_eq!(summary.total_charged, 1000);
        let first_result = &summary.results[0];
        assert!(first_result.success);
        assert_eq!(first_result.amount_charged, 1000);

        // The charge must have been deducted from the wallet.
        let lookup = GetWalletRequest {
            project_id: "proj-test".into(),
        };
        let fetched = service.get_wallet(Request::new(lookup)).await.unwrap();
        let current = fetched.into_inner().wallet.unwrap();
        assert_eq!(current.balance, 9000); // 10000 - 1000
    }

    /// Billing usage that consumes the entire balance (100 hours, 10000
    /// credits) leaves the wallet at 0 and in the `Suspended` status.
    #[tokio::test]
    async fn test_process_billing_suspends_wallet() {
        use crate::billing::{MockUsageMetricsProvider, PricingRules, ResourceUsage, UsageMetrics};
        use std::collections::HashMap;

        let backend = InMemoryStorage::new();
        let mut provider = MockUsageMetricsProvider::new();

        // Usage large enough to exhaust the wallet.
        let mut resource_usage = HashMap::new();
        resource_usage.insert(
            ResourceType::VmInstance,
            ResourceUsage::new(ResourceType::VmInstance, 100.0, "hours"), // 10000 credits
        );
        let metrics = UsageMetrics {
            project_id: "proj-test".into(),
            resource_usage,
            period_start: Utc::now() - chrono::Duration::hours(1),
            period_end: Utc::now(),
        };
        provider.add_usage("proj-test".into(), metrics);

        let service = CreditServiceImpl::with_billing_for_tests(
            backend,
            Arc::new(provider),
            PricingRules::default(),
        );

        // Wallet holding exactly the amount that will be charged (ends at 0).
        let wallet_req = CreateWalletRequest {
            project_id: "proj-test".into(),
            org_id: "org-test".into(),
            initial_balance: 10000,
        };
        service
            .create_wallet(Request::new(wallet_req))
            .await
            .unwrap();

        // Run billing for the project.
        let run_req = ProcessBillingRequest {
            project_id: "proj-test".into(),
            billing_period_start: None,
            billing_period_end: None,
        };
        let summary = service
            .process_billing(Request::new(run_req))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(summary.total_charged, 10000);

        // The drained wallet must now be suspended.
        let lookup = GetWalletRequest {
            project_id: "proj-test".into(),
        };
        let fetched = service.get_wallet(Request::new(lookup)).await.unwrap();
        let current = fetched.into_inner().wallet.unwrap();
        assert_eq!(current.balance, 0);
        assert_eq!(current.status, ProtoWalletStatus::Suspended as i32);
    }
}