photoncloud-monorepo/creditservice/crates/creditservice-api/src/nightlight.rs
centra d2149b6249 fix(lightningstor): Fix SigV4 canonicalization for AWS S3 auth
- Replace form_urlencoded with RFC 3986 compliant URI encoding
- Implement aws_uri_encode() matching AWS SigV4 spec exactly
- Unreserved chars (A-Z,a-z,0-9,-,_,.,~) not encoded
- All other chars percent-encoded with uppercase hex
- Preserve slashes in paths, encode in query params
- Normalize empty paths to '/' per AWS spec
- Fix test expectations (body hash, HMAC values)
- Add comprehensive SigV4 signature determinism test

This fixes the canonicalization mismatch that caused signature
validation failures in T047. Auth can now be enabled for production.

Refs: T058.S1
2025-12-12 06:23:46 +09:00

421 lines
14 KiB
Rust

//! NightLight (Nightlight) integration for usage metrics
//!
//! This module provides a client for querying usage metrics from NightLight,
//! enabling the billing batch process to calculate charges based on actual
//! resource consumption.
use crate::billing::{ResourceUsage, UsageMetrics, UsageMetricsProvider};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use creditservice_types::{Error, ResourceType, Result};
use reqwest::Client;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// NightLight (Nightlight) client for usage metrics queries
#[derive(Clone)]
pub struct NightLightClient {
client: Client,
base_url: String,
}
/// Prometheus API response format
#[derive(Debug, Deserialize)]
struct PrometheusResponse {
status: String,
data: Option<PrometheusData>,
error: Option<String>,
}
#[derive(Debug, Deserialize)]
struct PrometheusData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<PrometheusResult>,
}
#[derive(Debug, Clone, Deserialize)]
struct PrometheusResult {
metric: HashMap<String, String>,
value: Option<(f64, String)>, // For instant queries
values: Option<Vec<(f64, String)>>, // For range queries
}
impl NightLightClient {
/// Create a new NightLight client
pub fn new(endpoint: &str) -> Self {
Self {
client: Client::new(),
base_url: endpoint.trim_end_matches('/').to_string(),
}
}
/// Create a NightLight client wrapped in Arc for sharing
pub fn new_shared(endpoint: &str) -> Arc<Self> {
Arc::new(Self::new(endpoint))
}
/// Query usage for a specific resource type
async fn query_resource_usage(
&self,
project_id: &str,
resource_type: ResourceType,
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
) -> Result<Option<ResourceUsage>> {
let (query, unit) = Self::build_promql(project_id, resource_type, period_start, period_end);
debug!(
project_id = %project_id,
resource_type = ?resource_type,
query = %query,
"Executing PromQL query"
);
let response = self
.client
.get(format!("{}/api/v1/query", self.base_url))
.query(&[
("query", query.as_str()),
("time", &period_end.timestamp_millis().to_string()),
])
.send()
.await
.map_err(|e| Error::Internal(format!("NightLight request failed: {}", e)))?;
if !response.status().is_success() {
return Err(Error::Internal(format!(
"NightLight returned error status: {}",
response.status()
)));
}
let prom_response: PrometheusResponse = response
.json()
.await
.map_err(|e| Error::Internal(format!("Failed to parse NightLight response: {}", e)))?;
if prom_response.status != "success" {
return Err(Error::Internal(format!(
"NightLight query failed: {}",
prom_response.error.unwrap_or_default()
)));
}
// Extract the value from the response
let quantity = prom_response
.data
.and_then(|d| d.result.first().cloned())
.and_then(|r| r.value)
.map(|(_, v)| v.parse::<f64>().unwrap_or(0.0))
.unwrap_or(0.0);
if quantity > 0.0 {
Ok(Some(ResourceUsage {
resource_type,
quantity,
unit,
}))
} else {
Ok(None)
}
}
/// Build PromQL query for a resource type
fn build_promql(
project_id: &str,
resource_type: ResourceType,
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
) -> (String, String) {
let duration_hours = (period_end - period_start).num_hours().max(1);
let duration_str = format!("{}h", duration_hours);
match resource_type {
ResourceType::VmCpu => {
// CPU hours: sum of CPU seconds converted to hours
let query = format!(
r#"sum by (project_id) (increase(vm_cpu_seconds_total{{project_id="{}"}}[{}])) / 3600"#,
project_id, duration_str
);
(query, "cpu-hours".to_string())
}
ResourceType::VmMemoryGb => {
// Memory GB-hours: average memory over time
let query = format!(
r#"sum by (project_id) (avg_over_time(vm_memory_bytes{{project_id="{}"}}[{}])) / (1024*1024*1024)"#,
project_id, duration_str
);
(query, "gb-hours".to_string())
}
ResourceType::StorageGb => {
// Storage GB-hours: average storage over time
let query = format!(
r#"sum by (project_id) (avg_over_time(storage_bytes_total{{project_id="{}"}}[{}])) / (1024*1024*1024)"#,
project_id, duration_str
);
(query, "gb-hours".to_string())
}
ResourceType::VmInstance => {
// Instance hours: count of running instances over time
let query = format!(
r#"sum by (project_id) (count_over_time(vm_instance_running{{project_id="{}"}}[{}])) / (60 * {})"#,
project_id, duration_str, duration_hours
);
(query, "instance-hours".to_string())
}
ResourceType::NetworkPort => {
let query = format!(
r#"sum by (project_id) (count_over_time(network_port_active{{project_id="{}"}}[{}])) / (60 * {})"#,
project_id, duration_str, duration_hours
);
(query, "port-hours".to_string())
}
ResourceType::LoadBalancer => {
let query = format!(
r#"sum by (project_id) (count_over_time(lb_instance_active{{project_id="{}"}}[{}])) / (60 * {})"#,
project_id, duration_str, duration_hours
);
(query, "lb-hours".to_string())
}
ResourceType::DnsZone => {
let query = format!(
r#"count(dns_zone_active{{project_id="{}"}})"#,
project_id
);
(query, "zones".to_string())
}
ResourceType::DnsRecord => {
let query = format!(
r#"count(dns_record_active{{project_id="{}"}})"#,
project_id
);
(query, "records".to_string())
}
ResourceType::K8sCluster => {
let query = format!(
r#"sum by (project_id) (count_over_time(k8s_cluster_running{{project_id="{}"}}[{}])) / (60 * {})"#,
project_id, duration_str, duration_hours
);
(query, "cluster-hours".to_string())
}
ResourceType::K8sNode => {
let query = format!(
r#"sum by (project_id) (count_over_time(k8s_node_running{{project_id="{}"}}[{}])) / (60 * {})"#,
project_id, duration_str, duration_hours
);
(query, "node-hours".to_string())
}
}
}
/// Health check - verify NightLight connectivity
pub async fn health_check(&self) -> Result<()> {
let response = self
.client
.get(format!("{}/api/v1/query", self.base_url))
.query(&[("query", "up")])
.send()
.await
.map_err(|e| Error::Internal(format!("NightLight health check failed: {}", e)))?;
if response.status().is_success() {
Ok(())
} else {
Err(Error::Internal(format!(
"NightLight health check returned: {}",
response.status()
)))
}
}
}
#[async_trait]
impl UsageMetricsProvider for NightLightClient {
async fn get_usage_metrics(
&self,
project_id: &str,
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
) -> Result<UsageMetrics> {
info!(
project_id = %project_id,
period_start = %period_start,
period_end = %period_end,
"Querying NightLight for usage metrics"
);
let mut resource_usage = HashMap::new();
// Query each resource type
for resource_type in [
ResourceType::VmInstance,
ResourceType::VmCpu,
ResourceType::VmMemoryGb,
ResourceType::StorageGb,
ResourceType::NetworkPort,
ResourceType::LoadBalancer,
ResourceType::DnsZone,
ResourceType::DnsRecord,
ResourceType::K8sCluster,
ResourceType::K8sNode,
] {
match self
.query_resource_usage(project_id, resource_type, period_start, period_end)
.await
{
Ok(Some(usage)) => {
resource_usage.insert(resource_type, usage);
}
Ok(None) => {
// No usage for this resource type
}
Err(e) => {
warn!(
project_id = %project_id,
resource_type = ?resource_type,
error = %e,
"Failed to query resource usage, skipping"
);
}
}
}
Ok(UsageMetrics {
project_id: project_id.to_string(),
resource_usage,
period_start,
period_end,
})
}
async fn list_projects_with_usage(
&self,
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
) -> Result<Vec<String>> {
let duration_hours = (period_end - period_start).num_hours().max(1);
let duration_str = format!("{}h", duration_hours);
// Query for all project_ids with any metric in the period
let query = format!(
r#"group by (project_id) ({{project_id=~".+"}}[{}])"#,
duration_str
);
debug!(query = %query, "Listing projects with usage");
let response = self
.client
.get(format!("{}/api/v1/query", self.base_url))
.query(&[
("query", query.as_str()),
("time", &period_end.timestamp_millis().to_string()),
])
.send()
.await
.map_err(|e| Error::Internal(format!("NightLight request failed: {}", e)))?;
if !response.status().is_success() {
return Err(Error::Internal(format!(
"NightLight returned error status: {}",
response.status()
)));
}
let prom_response: PrometheusResponse = response
.json()
.await
.map_err(|e| Error::Internal(format!("Failed to parse NightLight response: {}", e)))?;
if prom_response.status != "success" {
return Err(Error::Internal(format!(
"NightLight query failed: {}",
prom_response.error.unwrap_or_default()
)));
}
// Extract project_ids from results
let project_ids: Vec<String> = prom_response
.data
.map(|d| {
d.result
.into_iter()
.filter_map(|r| r.metric.get("project_id").cloned())
.collect()
})
.unwrap_or_default();
Ok(project_ids)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_promql_cpu() {
let start = DateTime::parse_from_rfc3339("2025-12-11T00:00:00Z")
.unwrap()
.with_timezone(&Utc);
let end = DateTime::parse_from_rfc3339("2025-12-11T01:00:00Z")
.unwrap()
.with_timezone(&Utc);
let (query, unit) =
NightLightClient::build_promql("proj-1", ResourceType::VmCpu, start, end);
assert!(query.contains("vm_cpu_seconds_total"));
assert!(query.contains("project_id=\"proj-1\""));
assert!(query.contains("[1h]"));
assert_eq!(unit, "cpu-hours");
}
#[test]
fn test_build_promql_storage() {
let start = DateTime::parse_from_rfc3339("2025-12-11T00:00:00Z")
.unwrap()
.with_timezone(&Utc);
let end = DateTime::parse_from_rfc3339("2025-12-11T12:00:00Z")
.unwrap()
.with_timezone(&Utc);
let (query, unit) =
NightLightClient::build_promql("proj-2", ResourceType::StorageGb, start, end);
assert!(query.contains("storage_bytes_total"));
assert!(query.contains("project_id=\"proj-2\""));
assert!(query.contains("[12h]"));
assert_eq!(unit, "gb-hours");
}
#[test]
fn test_build_promql_vm_instance() {
let start = DateTime::parse_from_rfc3339("2025-12-11T00:00:00Z")
.unwrap()
.with_timezone(&Utc);
let end = DateTime::parse_from_rfc3339("2025-12-11T06:00:00Z")
.unwrap()
.with_timezone(&Utc);
let (query, unit) =
NightLightClient::build_promql("proj-3", ResourceType::VmInstance, start, end);
assert!(query.contains("vm_instance_running"));
assert!(query.contains("project_id=\"proj-3\""));
assert!(query.contains("[6h]"));
assert_eq!(unit, "instance-hours");
}
#[test]
fn test_client_creation() {
let client = NightLightClient::new("http://nightlight:8080");
assert_eq!(client.base_url, "http://nightlight:8080");
let client2 = NightLightClient::new("http://nightlight:8080/");
assert_eq!(client2.base_url, "http://nightlight:8080");
}
}