diff --git a/nightlight/crates/nightlight-server/src/config.rs b/nightlight/crates/nightlight-server/src/config.rs index 4626695..dd2300b 100644 --- a/nightlight/crates/nightlight-server/src/config.rs +++ b/nightlight/crates/nightlight-server/src/config.rs @@ -129,17 +129,6 @@ impl Config { Ok(config) } - /// Load configuration from file, or use defaults if file doesn't exist - pub fn load_or_default() -> Result { - match Self::from_file("config.yaml") { - Ok(config) => Ok(config), - Err(_) => { - tracing::warn!("No config file found, using defaults"); - Ok(Self::default()) - } - } - } - /// Save configuration to a YAML file pub fn save(&self, path: &str) -> Result<()> { let content = serde_yaml::to_string(self)?; diff --git a/nightlight/crates/nightlight-server/src/grpc.rs b/nightlight/crates/nightlight-server/src/grpc.rs new file mode 100644 index 0000000..5cee297 --- /dev/null +++ b/nightlight/crates/nightlight-server/src/grpc.rs @@ -0,0 +1,502 @@ +use std::sync::Arc; +use std::time::Instant; + +use chrono::Utc; +use tonic::{Request, Response, Status}; + +use crate::ingestion::IngestionMetrics; +use crate::query::{QueryMetrics, QueryResult as InstantQueryData, QueryService, RangeQueryResult}; +use crate::storage::{Storage, StorageStats}; +use nightlight_api::nightlight::admin_server::Admin; +use nightlight_api::nightlight::metric_query_server::MetricQuery; +use nightlight_api::nightlight::{ + BuildInfoRequest, BuildInfoResponse, ComponentHealth, HealthRequest, HealthResponse, + LabelValuesRequest, LabelValuesResponse, QueryData, QueryResponse, QueryResult, SamplePair, + SeriesLabels, SeriesQueryRequest, SeriesQueryResponse, StatsRequest, StatsResponse, +}; +use nightlight_types::Error; + +#[derive(Clone)] +pub struct MetricQueryServiceImpl { + query: QueryService, +} + +#[derive(Clone)] +pub struct AdminServiceImpl { + storage: Arc, + ingestion_metrics: Arc, + query_metrics: Arc, + started_at: Instant, +} + +impl MetricQueryServiceImpl { + pub fn new(query: QueryService) -> Self { + Self { query } + } +} + +impl AdminServiceImpl { + pub fn new( + storage: Arc, + ingestion_metrics: Arc, + query_metrics: Arc, + ) -> Self { + Self { + storage, + ingestion_metrics, + query_metrics, + started_at: Instant::now(), + } + } +} + +#[tonic::async_trait] +impl MetricQuery for MetricQueryServiceImpl { + async fn instant_query( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let time = if request.time == 0 { + Utc::now().timestamp_millis() + } else { + request.time + }; + + let response = match self.query.execute_instant_query(&request.query, time).await { + Ok(result) => QueryResponse { + status: "success".to_string(), + data: Some(instant_query_data_to_proto(result)), + error: String::new(), + error_type: String::new(), + warnings: Vec::new(), + }, + Err(error) => QueryResponse { + status: "error".to_string(), + data: None, + error: error.to_string(), + error_type: query_error_type(&error).to_string(), + warnings: Vec::new(), + }, + }; + + Ok(Response::new(response)) + } + + async fn range_query( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let response = match self + .query + .execute_range_query(&request.query, request.start, request.end, request.step) + .await + { + Ok(result) => QueryResponse { + status: "success".to_string(), + data: Some(range_query_data_to_proto(result)), + error: String::new(), + error_type: String::new(), + warnings: Vec::new(), + }, + Err(error) => QueryResponse { + status: "error".to_string(), + data: None, + error: error.to_string(), + error_type: query_error_type(&error).to_string(), + warnings: Vec::new(), + }, + }; + + Ok(Response::new(response)) + } + + async fn series_query( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let response = match self + .query + .series_metadata( + &request.r#match, + optional_millis(request.start), + optional_millis(request.end), + ) + .await + { + Ok(series) => SeriesQueryResponse { + status: "success".to_string(), + data: series + .into_iter() + .map(|labels| SeriesLabels { labels }) + .collect(), + error: String::new(), + }, + Err(error) => SeriesQueryResponse { + status: "error".to_string(), + data: Vec::new(), + error: error.to_string(), + }, + }; + + Ok(Response::new(response)) + } + + async fn label_values_query( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let response = match self + .query + .label_values_for_matchers( + &request.label_name, + &request.r#match, + optional_millis(request.start), + optional_millis(request.end), + ) + .await + { + Ok(values) => LabelValuesResponse { + status: "success".to_string(), + data: values, + error: String::new(), + }, + Err(error) => LabelValuesResponse { + status: "error".to_string(), + data: Vec::new(), + error: error.to_string(), + }, + }; + + Ok(Response::new(response)) + } +} + +#[tonic::async_trait] +impl Admin for AdminServiceImpl { + async fn health( + &self, + _request: Request, + ) -> Result, Status> { + let storage_result = self.storage.stats().await; + let status = if storage_result.is_ok() { "ok" } else { "degraded" }; + let storage_message = match &storage_result { + Ok(_) => "storage ready".to_string(), + Err(error) => error.to_string(), + }; + + Ok(Response::new(HealthResponse { + status: status.to_string(), + message: "nightlight ready".to_string(), + components: vec![ + ComponentHealth { + name: "storage".to_string(), + status: status.to_string(), + message: storage_message, + }, + ComponentHealth { + name: "ingestion".to_string(), + status: "ok".to_string(), + message: "remote_write endpoint ready".to_string(), + }, + ComponentHealth { + name: "query_engine".to_string(), + status: "ok".to_string(), + message: "http and grpc query paths ready".to_string(), + }, + ], + })) + } + + async fn stats( + &self, + _request: Request, + ) -> Result, Status> { + let storage = self + .storage + .stats() + .await + .map_err(|error| Status::internal(error.to_string()))?; + let ingestion = self.ingestion_metrics.snapshot(); + let query = self.query_metrics.snapshot(); + + Ok(Response::new(StatsResponse { + storage: Some(storage_stats_to_proto(storage)), + ingestion: Some(nightlight_api::nightlight::IngestionStats { + samples_ingested_total: ingestion.samples_ingested_total, + write_requests_total: ingestion.write_requests_total, + write_requests_failed: ingestion.write_requests_failed, + samples_per_second: ingestion.samples_per_second, + buffer_samples: ingestion.buffer_samples, + }), + query: Some(nightlight_api::nightlight::QueryStats { + queries_total: query.queries_total, + queries_failed: query.queries_failed, + queries_active: query.queries_active, + query_duration_p50: query.query_duration_p50, + query_duration_p95: query.query_duration_p95, + query_duration_p99: query.query_duration_p99, + }), + uptime_seconds: self.started_at.elapsed().as_secs(), + })) + } + + async fn build_info( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(BuildInfoResponse { + version: env!("CARGO_PKG_VERSION").to_string(), + commit: option_env!("GIT_COMMIT").unwrap_or("unknown").to_string(), + build_time: option_env!("BUILD_TIME").unwrap_or("unknown").to_string(), + rust_version: option_env!("RUSTC_VERSION").unwrap_or("unknown").to_string(), + target: format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS), + })) + } +} + +fn optional_millis(value: i64) -> Option { + if value == 0 { + None + } else { + Some(value) + } +} + +fn query_error_type(error: &Error) -> &'static str { + match error { + Error::InvalidMetric(_) | Error::InvalidLabel(_) | Error::InvalidTimeRange(_) => "bad_data", + Error::Timeout(_) => "timeout", + _ => "execution", + } +} + +fn instant_query_data_to_proto(result: InstantQueryData) -> QueryData { + QueryData { + result_type: result.result_type, + result: result + .result + .into_iter() + .map(|series| QueryResult { + metric: series.metric, + values: Vec::new(), + value: series.value.map(sample_pair_from_tuple), + }) + .collect(), + } +} + +fn range_query_data_to_proto(result: RangeQueryResult) -> QueryData { + QueryData { + result_type: result.result_type, + result: result + .result + .into_iter() + .map(|series| QueryResult { + metric: series.metric, + values: series + .values + .into_iter() + .map(sample_pair_from_tuple) + .collect(), + value: None, + }) + .collect(), + } +} + +fn sample_pair_from_tuple((timestamp, value): (i64, f64)) -> SamplePair { + SamplePair { timestamp, value } +} + +fn storage_stats_to_proto(stats: StorageStats) -> nightlight_api::nightlight::StorageStats { + nightlight_api::nightlight::StorageStats { + active_series: stats.active_series, + total_samples: stats.total_samples, + blocks_count: stats.blocks_count, + head_samples: stats.head_samples, + disk_bytes_used: stats.disk_bytes_used, + oldest_sample_time: stats.oldest_sample_time, + newest_sample_time: stats.newest_sample_time, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ingestion::IngestionService; + use crate::storage::Storage; + use nightlight_api::nightlight::{ + InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest, + }; + use nightlight_api::prometheus::{Label, Sample, TimeSeries, WriteRequest}; + + #[tokio::test] + async fn instant_query_grpc_returns_metric_data() { + let dir = tempfile::tempdir().unwrap(); + let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap()); + let ingestion = IngestionService::new(Arc::clone(&storage)); + ingestion + .process_write_request(WriteRequest { + timeseries: vec![TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "grpc_metric".to_string(), + }, + Label { + name: "job".to_string(), + value: "nightlight".to_string(), + }, + ], + samples: vec![Sample { + value: 12.5, + timestamp: 1_000, + }], + }], + }) + .await + .unwrap(); + + let service = MetricQueryServiceImpl::new(QueryService::from_storage(storage.queryable())); + let response = service + .instant_query(Request::new(InstantQueryRequest { + query: "grpc_metric{job=\"nightlight\"}".to_string(), + time: 2_000, + timeout: 0, + })) + .await + .unwrap() + .into_inner(); + + assert_eq!(response.status, "success"); + let data = response.data.unwrap(); + assert_eq!(data.result.len(), 1); + assert_eq!( + data.result[0].metric.get("__name__").map(String::as_str), + Some("grpc_metric") + ); + assert_eq!(data.result[0].value.as_ref().map(|value| value.value), Some(12.5)); + } + + #[tokio::test] + async fn metadata_queries_grpc_filter_series() { + let dir = tempfile::tempdir().unwrap(); + let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap()); + let ingestion = IngestionService::new(Arc::clone(&storage)); + ingestion + .process_write_request(WriteRequest { + timeseries: vec![ + TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "grpc_metric".to_string(), + }, + Label { + name: "job".to_string(), + value: "api".to_string(), + }, + ], + samples: vec![Sample { + value: 1.0, + timestamp: 1_000, + }], + }, + TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "grpc_metric".to_string(), + }, + Label { + name: "job".to_string(), + value: "worker".to_string(), + }, + ], + samples: vec![Sample { + value: 2.0, + timestamp: 2_000, + }], + }, + ], + }) + .await + .unwrap(); + + let service = MetricQueryServiceImpl::new(QueryService::from_storage(storage.queryable())); + let series = service + .series_query(Request::new(SeriesQueryRequest { + r#match: vec!["job=api".to_string()], + start: 0, + end: 0, + })) + .await + .unwrap() + .into_inner(); + assert_eq!(series.status, "success"); + assert_eq!(series.data.len(), 1); + + let label_values = service + .label_values_query(Request::new(LabelValuesRequest { + label_name: "job".to_string(), + r#match: vec!["__name__=grpc_metric".to_string()], + start: 0, + end: 0, + })) + .await + .unwrap() + .into_inner(); + assert_eq!(label_values.status, "success"); + assert_eq!(label_values.data, vec!["api".to_string(), "worker".to_string()]); + } + + #[tokio::test] + async fn admin_stats_report_ingestion_and_query_counters() { + let dir = tempfile::tempdir().unwrap(); + let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap()); + let ingestion = IngestionService::new(Arc::clone(&storage)); + ingestion + .process_write_request(WriteRequest { + timeseries: vec![TimeSeries { + labels: vec![Label { + name: "__name__".to_string(), + value: "admin_metric".to_string(), + }], + samples: vec![Sample { + value: 3.0, + timestamp: 1_000, + }], + }], + }) + .await + .unwrap(); + + let query = QueryService::from_storage(storage.queryable()); + query.execute_instant_query("admin_metric", 2_000).await.unwrap(); + + let admin = AdminServiceImpl::new( + Arc::clone(&storage), + ingestion.metrics(), + query.metrics(), + ); + let stats = admin + .stats(Request::new(StatsRequest {})) + .await + .unwrap() + .into_inner(); + + assert_eq!(stats.storage.as_ref().map(|value| value.total_samples), Some(1)); + assert_eq!( + stats.ingestion + .as_ref() + .map(|value| value.samples_ingested_total), + Some(1) + ); + assert_eq!(stats.query.as_ref().map(|value| value.queries_total), Some(1)); + } +} diff --git a/nightlight/crates/nightlight-server/src/ingestion.rs b/nightlight/crates/nightlight-server/src/ingestion.rs index 54f6589..93cc8f7 100644 --- a/nightlight/crates/nightlight-server/src/ingestion.rs +++ b/nightlight/crates/nightlight-server/src/ingestion.rs @@ -16,10 +16,11 @@ use nightlight_types::Error; use prost::Message; use snap::raw::Decoder as SnappyDecoder; use std::sync::Arc; -use tokio::sync::RwLock; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; use tracing::{debug, error, info, warn}; -use crate::query::QueryableStorage; +use crate::storage::Storage; /// Maximum write request size (10 MB uncompressed) const MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024; @@ -27,28 +28,33 @@ const MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024; /// Ingestion service state #[derive(Clone)] pub struct IngestionService { - storage: Arc>, + storage: Arc, metrics: Arc, } -/// Ingestion metrics for monitoring -struct IngestionMetrics { - samples_received: Arc, - samples_invalid: Arc, - requests_total: Arc, - requests_failed: Arc, +#[derive(Debug)] +pub struct IngestionMetrics { + samples_received: AtomicU64, + samples_invalid: AtomicU64, + requests_total: AtomicU64, + requests_failed: AtomicU64, + started_at: Instant, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct IngestionMetricsSnapshot { + pub samples_ingested_total: u64, + pub write_requests_total: u64, + pub write_requests_failed: u64, + pub samples_per_second: f64, + pub buffer_samples: u64, } impl IngestionService { - pub fn new(storage: Arc>) -> Self { + pub fn new(storage: Arc) -> Self { Self { storage, - metrics: Arc::new(IngestionMetrics { - samples_received: Arc::new(std::sync::atomic::AtomicU64::new(0)), - samples_invalid: Arc::new(std::sync::atomic::AtomicU64::new(0)), - requests_total: Arc::new(std::sync::atomic::AtomicU64::new(0)), - requests_failed: Arc::new(std::sync::atomic::AtomicU64::new(0)), - }), + metrics: Arc::new(IngestionMetrics::new()), } } @@ -59,10 +65,14 @@ impl IngestionService { .with_state(self) } + pub fn metrics(&self) -> Arc { + Arc::clone(&self.metrics) + } + /// Process a WriteRequest and write to shared storage - async fn process_write_request(&self, request: WriteRequest) -> Result { - let mut storage = self.storage.write().await; + pub(crate) async fn process_write_request(&self, request: WriteRequest) -> Result { let mut samples_processed = 0; + let mut series_to_append = Vec::new(); for ts in request.timeseries { // Validate and normalize labels @@ -83,7 +93,7 @@ impl IngestionService { // Validate sample if !sample.value.is_finite() { warn!("Invalid sample value: {}", sample.value); - self.metrics.samples_invalid.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.metrics.samples_invalid.fetch_add(1, Ordering::Relaxed); continue; } @@ -113,20 +123,56 @@ impl IngestionService { samples: internal_samples, }; - // Write to shared storage (upsert merges samples) - storage.upsert_series(time_series); + series_to_append.push(time_series); } - self.metrics.samples_received.fetch_add(samples_processed, std::sync::atomic::Ordering::Relaxed); + self.storage + .append(series_to_append) + .await + .map_err(|error| Error::Storage(error.to_string()))?; + self.metrics + .samples_received + .fetch_add(samples_processed, Ordering::Relaxed); Ok(samples_processed) } /// Get current storage statistics - pub async fn storage_stats(&self) -> (usize, usize) { - let storage = self.storage.read().await; - let total_samples: usize = storage.series.values().map(|s| s.samples.len()).sum(); - (total_samples, storage.series.len()) + pub async fn storage_stats(&self) -> Result<(usize, usize), Error> { + let stats = self + .storage + .stats() + .await + .map_err(|error| Error::Storage(error.to_string()))?; + Ok((stats.total_samples as usize, stats.active_series as usize)) + } +} + +impl IngestionMetrics { + fn new() -> Self { + Self { + samples_received: AtomicU64::new(0), + samples_invalid: AtomicU64::new(0), + requests_total: AtomicU64::new(0), + requests_failed: AtomicU64::new(0), + started_at: Instant::now(), + } + } + + pub fn snapshot(&self) -> IngestionMetricsSnapshot { + let uptime = self.started_at.elapsed().as_secs_f64(); + let samples_ingested_total = self.samples_received.load(Ordering::Relaxed); + IngestionMetricsSnapshot { + samples_ingested_total, + write_requests_total: self.requests_total.load(Ordering::Relaxed), + write_requests_failed: self.requests_failed.load(Ordering::Relaxed), + samples_per_second: if uptime > 0.0 { + samples_ingested_total as f64 / uptime + } else { + 0.0 + }, + buffer_samples: 0, + } } } @@ -135,7 +181,7 @@ async fn handle_remote_write( State(service): State, body: Bytes, ) -> Response { - service.metrics.requests_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + service.metrics.requests_total.fetch_add(1, Ordering::Relaxed); debug!("Received remote_write request, size: {} bytes", body.len()); @@ -150,7 +196,7 @@ async fn handle_remote_write( Ok(data) => data, Err(e) => { error!("Snappy decompression failed: {}", e); - return IngestionError::DecompressionFailed(e.to_string()).into_response(); + return IngestionError::DecompressionFailed.into_response(); } }; @@ -161,7 +207,7 @@ async fn handle_remote_write( Ok(req) => req, Err(e) => { error!("Protobuf decode failed: {}", e); - return IngestionError::InvalidProtobuf(e.to_string()).into_response(); + return IngestionError::InvalidProtobuf.into_response(); } }; @@ -178,18 +224,18 @@ async fn handle_remote_write( } Err(Error::Storage(msg)) if msg.contains("buffer full") => { warn!("Write buffer full, returning 429"); - service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); IngestionError::Backpressure.into_response() } Err(Error::InvalidLabel(msg)) => { warn!("Invalid labels: {}", msg); - service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - IngestionError::InvalidLabels(msg).into_response() + service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); + IngestionError::InvalidLabels.into_response() } Err(e) => { error!("Failed to process write request: {}", e); - service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - IngestionError::StorageError(e.to_string()).into_response() + service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); + IngestionError::StorageError.into_response() } } } @@ -271,10 +317,10 @@ fn compute_series_fingerprint(labels: &[nightlight_types::Label]) -> u64 { #[derive(Debug)] enum IngestionError { PayloadTooLarge, - DecompressionFailed(String), - InvalidProtobuf(String), - InvalidLabels(String), - StorageError(String), + DecompressionFailed, + InvalidProtobuf, + InvalidLabels, + StorageError, Backpressure, } @@ -284,16 +330,16 @@ impl IntoResponse for IngestionError { IngestionError::PayloadTooLarge => { (StatusCode::PAYLOAD_TOO_LARGE, "Request payload too large") } - IngestionError::DecompressionFailed(_) => { + IngestionError::DecompressionFailed => { (StatusCode::BAD_REQUEST, "Snappy decompression failed") } - IngestionError::InvalidProtobuf(_) => { + IngestionError::InvalidProtobuf => { (StatusCode::BAD_REQUEST, "Invalid protobuf encoding") } - IngestionError::InvalidLabels(_) => { + IngestionError::InvalidLabels => { (StatusCode::BAD_REQUEST, "Invalid metric labels") } - IngestionError::StorageError(_) => { + IngestionError::StorageError => { (StatusCode::INTERNAL_SERVER_ERROR, "Storage error") } IngestionError::Backpressure => { @@ -308,6 +354,7 @@ impl IntoResponse for IngestionError { #[cfg(test)] mod tests { use super::*; + use crate::storage::Storage; #[test] fn test_validate_labels_success() { @@ -378,16 +425,58 @@ mod tests { #[tokio::test] async fn test_ingestion_service_storage() { - use crate::query::QueryableStorage; - use std::collections::HashMap; - - let storage = Arc::new(RwLock::new(QueryableStorage { - series: HashMap::new(), - label_index: HashMap::new(), - })); + let dir = tempfile::tempdir().unwrap(); + let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap()); let service = IngestionService::new(storage); - let (samples, series) = service.storage_stats().await; + let (samples, series) = service.storage_stats().await.unwrap(); assert_eq!(samples, 0); assert_eq!(series, 0); } + + #[tokio::test] + async fn test_process_write_request_persists_samples() { + let dir = tempfile::tempdir().unwrap(); + let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap()); + let service = IngestionService::new(Arc::clone(&storage)); + + let request = WriteRequest { + timeseries: vec![nightlight_api::prometheus::TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "ingest_metric".to_string(), + }, + Label { + name: "job".to_string(), + value: "test".to_string(), + }, + ], + samples: vec![nightlight_api::prometheus::Sample { + value: 42.0, + timestamp: 1_000, + }], + }], + }; + + let processed = service.process_write_request(request).await.unwrap(); + assert_eq!(processed, 1); + + storage.flush().await.unwrap(); + let reloaded = Storage::new(dir.path().to_str().unwrap()).unwrap(); + let ids = reloaded + .find_series(vec![ + "__name__=ingest_metric".to_string(), + "job=test".to_string(), + ]) + .await + .unwrap(); + assert_eq!(ids.len(), 1); + let series = reloaded + .query_series(ids[0], 0, 10_000) + .await + .unwrap() + .unwrap(); + assert_eq!(series.samples.len(), 1); + assert_eq!(series.samples[0].value, 42.0); + } } diff --git a/nightlight/crates/nightlight-server/src/lib.rs b/nightlight/crates/nightlight-server/src/lib.rs index a547b0d..a72db3f 100644 --- a/nightlight/crates/nightlight-server/src/lib.rs +++ b/nightlight/crates/nightlight-server/src/lib.rs @@ -3,6 +3,7 @@ //! This library exposes the internal modules for integration testing. pub mod config; +pub mod grpc; pub mod ingestion; pub mod query; pub mod storage; diff --git a/nightlight/crates/nightlight-server/src/main.rs b/nightlight/crates/nightlight-server/src/main.rs index 7c9af1d..3250ed5 100644 --- a/nightlight/crates/nightlight-server/src/main.rs +++ b/nightlight/crates/nightlight-server/src/main.rs @@ -1,127 +1,202 @@ -//! Nightlight Server +//! Nightlight server binary. //! -//! A Prometheus-compatible metrics storage system with mTLS support. -//! -//! # Architecture -//! -//! - **Ingestion**: Prometheus remote_write API (HTTP POST with snappy compression) -//! - **Query**: PromQL query engine (gRPC and HTTP APIs) -//! - **Storage**: Time-series database with retention and compaction -//! - **Security**: mTLS for all connections (following T027 patterns) -//! -//! # Configuration -//! -//! Configuration is loaded from a YAML file (default: config.yaml). -//! See config.rs for the full configuration schema. +//! Nightlight exposes: +//! - Prometheus remote_write ingestion over HTTP +//! - PromQL-compatible query endpoints over HTTP and gRPC +//! - gRPC admin endpoints for health and stats +//! - durable local storage backed by a WAL and snapshots + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; use anyhow::Result; -use tracing::{info, Level}; +use axum::{routing::get, Router}; +use nightlight_api::nightlight::admin_server::AdminServer; +use nightlight_api::nightlight::metric_query_server::MetricQueryServer; +use tokio::time::MissedTickBehavior; +use tonic::transport::Server as TonicServer; +use tonic_health::server::health_reporter; +use tracing::{error, info, warn, Level}; mod config; +mod grpc; mod ingestion; mod query; mod storage; -use config::Config; +use config::{Config, StorageConfig}; +use grpc::{AdminServiceImpl, MetricQueryServiceImpl}; +use ingestion::IngestionService; +use query::QueryService; +use storage::Storage; + +const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 30; #[tokio::main] async fn main() -> Result<()> { - // Initialize tracing subscriber for structured logging tracing_subscriber::fmt() .with_max_level(Level::INFO) .with_target(false) .with_thread_ids(true) .init(); - info!("Nightlight server starting..."); + info!("Nightlight server starting"); info!("Version: {}", env!("CARGO_PKG_VERSION")); - // Load configuration from file or use defaults let mut config = match Config::from_file("config.yaml") { - Ok(cfg) => { + Ok(config) => { info!("Configuration loaded from config.yaml"); - cfg + config } - Err(e) => { - info!("Failed to load config.yaml: {}, using defaults", e); + Err(error) => { + info!("Failed to load config.yaml: {}, using defaults", error); Config::default() } }; - - // Apply environment variable overrides (for NixOS module integration) config.apply_env_overrides(); + if config.tls.is_some() { + warn!("Nightlight TLS configuration is currently ignored; starting plaintext listeners"); + } + + let http_addr: SocketAddr = config.server.http_addr.parse()?; + let grpc_addr: SocketAddr = config.server.grpc_addr.parse()?; + let storage = Arc::new(Storage::new(&config.storage.data_dir)?); + let query_service = QueryService::from_storage(storage.queryable()); + let ingestion_service = IngestionService::new(Arc::clone(&storage)); + let admin_service = AdminServiceImpl::new( + Arc::clone(&storage), + ingestion_service.metrics(), + query_service.metrics(), + ); + let metric_query_service = MetricQueryServiceImpl::new(query_service.clone()); + info!("Server configuration:"); - info!(" gRPC address: {}", config.server.grpc_addr); - info!(" HTTP address: {}", config.server.http_addr); + info!(" HTTP address: {}", http_addr); + info!(" gRPC address: {}", grpc_addr); info!(" Data directory: {}", config.storage.data_dir); info!(" Retention: {} days", config.storage.retention_days); info!( - " TLS enabled: {}", - config.tls.as_ref().map_or("no", |_| "yes") + " Compaction interval: {} seconds", + config.storage.compaction_interval_seconds ); - // TODO (S5): Initialize storage layer - // let storage = storage::Storage::new(&config.storage)?; - // info!("Storage initialized"); + let http_listener = tokio::net::TcpListener::bind(http_addr).await?; + let http_app = Router::new() + .route("/healthz", get(healthz)) + .merge(ingestion_service.clone().router()) + .merge(query_service.clone().router()); - // S5: Load persistent state from disk - let data_path = std::path::PathBuf::from(&config.storage.data_dir) - .join("nightlight.db"); - let query_service = query::QueryService::new_with_persistence(&data_path)?; - info!("Query service initialized"); + let (mut health_reporter, health_service) = health_reporter(); + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; - // Initialize ingestion service with shared storage - let shared_storage = query_service.storage(); - let ingestion_service = ingestion::IngestionService::new(shared_storage); - info!("Ingestion service initialized (sharing storage with query service)"); + let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1); + let mut http_shutdown = shutdown_tx.subscribe(); + let mut grpc_shutdown = shutdown_tx.subscribe(); + let maintenance_shutdown = shutdown_tx.subscribe(); - // Clone for shutdown handler - let query_service_for_shutdown = query_service.clone(); - let data_path_for_shutdown = data_path.clone(); + let http_server = async move { + axum::serve(http_listener, http_app) + .with_graceful_shutdown(async move { + let _ = http_shutdown.recv().await; + }) + .await + }; - // Create router with both ingestion and query endpoints - let app = ingestion_service.router().merge(query_service.router()); + let grpc_server = TonicServer::builder() + .add_service(health_service) + .add_service(MetricQueryServer::new(metric_query_service)) + .add_service(AdminServer::new(admin_service)) + .serve_with_shutdown(grpc_addr, async move { + let _ = grpc_shutdown.recv().await; + }); - // Start HTTP server for both ingestion and query endpoints - let listener = tokio::net::TcpListener::bind(&config.server.http_addr).await?; - info!("HTTP server listening on {}", config.server.http_addr); + let maintenance_handle = tokio::spawn(maintenance_loop( + Arc::clone(&storage), + config.storage.clone(), + maintenance_shutdown, + )); + + info!("HTTP server listening on {}", http_addr); info!(" - Ingestion: POST /api/v1/write"); info!(" - Query: GET /api/v1/query, /api/v1/query_range"); info!(" - Metadata: GET /api/v1/series, /api/v1/label/:name/values"); + info!(" - Health: GET /healthz"); + info!("gRPC server listening on {}", grpc_addr); + info!(" - MetricQuery.InstantQuery / RangeQuery / SeriesQuery / LabelValuesQuery"); + info!(" - Admin.Health / Stats / BuildInfo"); - // TODO (S5): Start background tasks - // - Compaction - // - Retention enforcement - // - Metrics export + let shutdown = async { + tokio::signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); + }; + tokio::pin!(shutdown); - info!("Nightlight server ready"); - info!("Press Ctrl+C to shutdown"); + tokio::select! { + result = http_server => { + result?; + } + result = grpc_server => { + result?; + } + _ = &mut shutdown => { + info!("Shutdown signal received"); + } + } - // Serve with graceful shutdown - axum::serve(listener, app) - .with_graceful_shutdown(shutdown_signal(query_service_for_shutdown, data_path_for_shutdown)) - .await?; + let _ = shutdown_tx.send(()); + if let Err(error) = maintenance_handle.await { + error!(error = %error, "Nightlight maintenance task failed to join"); + } info!("Nightlight server stopped"); Ok(()) } -async fn shutdown_signal( - query_service: query::QueryService, - data_path: std::path::PathBuf, +async fn maintenance_loop( + storage: Arc, + config: StorageConfig, + mut shutdown: tokio::sync::broadcast::Receiver<()>, ) { - tokio::signal::ctrl_c() - .await - .expect("Failed to install CTRL+C handler"); - info!("Shutdown signal received, saving data..."); + let snapshot_interval_secs = + config.compaction_interval_seconds.clamp(5, DEFAULT_SNAPSHOT_INTERVAL_SECS); + let mut snapshot_interval = tokio::time::interval(Duration::from_secs(snapshot_interval_secs)); + snapshot_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - // S5: Save persistent state to disk before shutdown - if let Err(e) = query_service.save_to_disk(&data_path).await { - tracing::error!("Failed to save data on shutdown: {}", e); - } else { - info!("Data saved successfully"); + let mut retention_interval = tokio::time::interval(Duration::from_secs( + config.compaction_interval_seconds.max(1), + )); + retention_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = shutdown.recv() => break, + _ = snapshot_interval.tick() => { + if let Err(error) = storage.flush().await { + error!(error = %error, "Nightlight snapshot flush failed"); + } + } + _ = retention_interval.tick() => { + if let Err(error) = storage.enforce_retention(config.retention_days).await { + error!(error = %error, "Nightlight retention sweep failed"); + } + if let Err(error) = storage.compact().await { + error!(error = %error, "Nightlight compaction checkpoint failed"); + } + } + } } - info!("Stopping server..."); + if let Err(error) = storage.flush().await { + error!(error = %error, "Nightlight final snapshot flush failed"); + } +} + +async fn healthz() -> &'static str { + "ok" } diff --git a/nightlight/crates/nightlight-server/src/query.rs b/nightlight/crates/nightlight-server/src/query.rs index 7ef90c0..99c064b 100644 --- a/nightlight/crates/nightlight-server/src/query.rs +++ b/nightlight/crates/nightlight-server/src/query.rs @@ -6,10 +6,11 @@ use axum::{ extract::{Path, Query, State}, http::StatusCode, - response::{IntoResponse, Json, Response}, + response::{IntoResponse, Json}, routing::get, Router, }; +use parking_lot::Mutex; use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries}; use promql_parser::{ label::Matchers, @@ -18,16 +19,21 @@ use promql_parser::{ }, }; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Instant; use tokio::sync::RwLock; use tracing::{debug, error, info}; +const QUERY_DURATION_HISTORY_LIMIT: usize = 512; + /// Query service state #[derive(Clone)] pub struct QueryService { // Reference to queryable storage (shared with ingestion) storage: Arc>, + metrics: Arc, } /// In-memory queryable storage (reads from ingestion buffer) @@ -39,6 +45,24 @@ pub struct QueryableStorage { pub label_index: HashMap>>, } +#[derive(Debug)] +pub struct QueryMetrics { + queries_total: AtomicU64, + queries_failed: AtomicU64, + queries_active: AtomicU64, + durations_ms: Mutex>, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct QueryMetricsSnapshot { + pub queries_total: u64, + pub queries_failed: u64, + pub queries_active: u64, + pub query_duration_p50: f64, + pub query_duration_p95: f64, + pub query_duration_p99: f64, +} + impl QueryService { pub fn new() -> Self { Self { @@ -46,12 +70,16 @@ impl QueryService { series: HashMap::new(), label_index: HashMap::new(), })), + metrics: Arc::new(QueryMetrics::new()), } } /// Create QueryService from existing shared storage pub fn from_storage(storage: Arc>) -> Self { - Self { storage } + Self { + storage, + metrics: Arc::new(QueryMetrics::new()), + } } /// Create QueryService and load persistent state from disk if it exists @@ -61,6 +89,7 @@ impl QueryService { Ok(Self { storage: Arc::new(RwLock::new(storage)), + metrics: Arc::new(QueryMetrics::new()), }) } @@ -82,17 +111,32 @@ impl QueryService { .with_state(self) } + pub fn metrics(&self) -> Arc { + Arc::clone(&self.metrics) + } + /// Execute an instant query at a specific timestamp pub async fn execute_instant_query(&self, query: &str, time: i64) -> Result { debug!("Executing instant query: {} at time {}", query, time); + let started = self.metrics.begin_query(); // Parse PromQL expression let expr = promql_parser::parser::parse(query) - .map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?; + .map_err(|e| Error::Query(format!("Parse error: {:?}", e))); + let expr = match expr { + Ok(expr) => expr, + Err(error) => { + self.metrics.finish_query(started, false); + return Err(error); + } + }; // Execute the expression let storage = self.storage.read().await; - let result = self.evaluate_expr(&expr, time, time, 0, &storage).await?; + let result = self.evaluate_expr(&expr, time, time, 0, &storage).await; + let success = result.is_ok(); + self.metrics.finish_query(started, success); + let result = result?; Ok(QueryResult { result_type: "vector".to_string(), @@ -123,10 +167,31 @@ impl QueryService { "Executing range query: {} from {} to {} step {}", query, start, end, step ); + let started = self.metrics.begin_query(); + + if step <= 0 { + self.metrics.finish_query(started, false); + return Err(Error::InvalidTimeRange( + "range query step must be greater than zero".to_string(), + )); + } + if end < start { + self.metrics.finish_query(started, false); + return Err(Error::InvalidTimeRange( + "range query end must be greater than or equal to start".to_string(), + )); + } // Parse PromQL expression let expr = promql_parser::parser::parse(query) - .map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?; + .map_err(|e| Error::Query(format!("Parse error: {:?}", e))); + let expr = match expr { + Ok(expr) => expr, + Err(error) => { + self.metrics.finish_query(started, false); + return Err(error); + } + }; let storage = self.storage.read().await; let mut results: HashMap = HashMap::new(); @@ -136,7 +201,14 @@ impl QueryService { while current_time <= end { let step_result = self .evaluate_expr(&expr, current_time, end, step, &storage) - .await?; + .await; + let step_result = match step_result { + Ok(step_result) => step_result, + Err(error) => { + self.metrics.finish_query(started, false); + return Err(error); + } + }; for ts in step_result { // Create a unique key for this series based on labels @@ -169,10 +241,12 @@ impl QueryService { current_time += step; } - Ok(RangeQueryResult { + let result = RangeQueryResult { result_type: "matrix".to_string(), result: results.into_values().collect(), - }) + }; + self.metrics.finish_query(started, true); + Ok(result) } /// Evaluate a PromQL expression (recursive with boxing for async) @@ -589,9 +663,108 @@ impl QueryService { true } - /// Get storage handle (for ingestion integration) - pub fn storage(&self) -> Arc> { - self.storage.clone() + pub async fn series_metadata( + &self, + matchers: &[String], + start: Option, + end: Option, + ) -> Result>> { + let started = self.metrics.begin_query(); + let storage = self.storage.read().await; + let series = self.matching_series(&storage, matchers, start, end); + let result = Ok(series + .into_iter() + .map(|ts| { + ts.labels + .iter() + .map(|label| (label.name.clone(), label.value.clone())) + .collect() + }) + .collect()); + self.metrics.finish_query(started, true); + result + } + + pub async fn label_values_for_matchers( + &self, + label_name: &str, + matchers: &[String], + start: Option, + end: Option, + ) -> Result> { + let started = self.metrics.begin_query(); + let storage = self.storage.read().await; + let mut values: Vec = self + .matching_series(&storage, matchers, start, end) + .into_iter() + .filter_map(|series| series.get_label(label_name).map(str::to_string)) + .collect(); + values.sort(); + values.dedup(); + self.metrics.finish_query(started, true); + Ok(values) + } + + fn matching_series( + &self, + storage: &QueryableStorage, + matchers: &[String], + start: Option, + end: Option, + ) -> Vec { + let parsed_matchers = parse_label_matchers(matchers); + storage + .series + .values() + .filter(|series| series_matches(series, &parsed_matchers)) + .filter(|series| series_in_time_range(series, start, end)) + .cloned() + .collect() + } +} + +impl QueryMetrics { + fn new() -> Self { + Self { + queries_total: AtomicU64::new(0), + queries_failed: AtomicU64::new(0), + queries_active: AtomicU64::new(0), + durations_ms: Mutex::new(VecDeque::with_capacity(QUERY_DURATION_HISTORY_LIMIT)), + } + } + + fn begin_query(&self) -> Instant { + self.queries_total.fetch_add(1, Ordering::Relaxed); + self.queries_active.fetch_add(1, Ordering::Relaxed); + Instant::now() + } + + fn finish_query(&self, started: Instant, success: bool) { + if !success { + self.queries_failed.fetch_add(1, Ordering::Relaxed); + } + self.queries_active.fetch_sub(1, Ordering::Relaxed); + + let elapsed_ms = started.elapsed().as_millis() as u64; + let mut durations = self.durations_ms.lock(); + if durations.len() >= QUERY_DURATION_HISTORY_LIMIT { + durations.pop_front(); + } + durations.push_back(elapsed_ms); + } + + pub fn snapshot(&self) -> QueryMetricsSnapshot { + let mut sorted_durations: Vec = self.durations_ms.lock().iter().copied().collect(); + sorted_durations.sort_unstable(); + + QueryMetricsSnapshot { + queries_total: self.queries_total.load(Ordering::Relaxed), + queries_failed: self.queries_failed.load(Ordering::Relaxed), + queries_active: self.queries_active.load(Ordering::Relaxed), + query_duration_p50: percentile(&sorted_durations, 0.50), + query_duration_p95: percentile(&sorted_durations, 0.95), + query_duration_p99: percentile(&sorted_durations, 0.99), + } } } @@ -600,12 +773,15 @@ impl QueryableStorage { pub fn upsert_series(&mut self, series: TimeSeries) { // Update label index for label in &series.labels { - self.label_index + let series_ids = self + .label_index .entry(label.name.clone()) .or_default() .entry(label.value.clone()) - .or_default() - .push(series.id); + .or_default(); + if !series_ids.contains(&series.id) { + series_ids.push(series.id); + } } // Upsert series @@ -624,11 +800,91 @@ impl QueryableStorage { /// Get label values for a specific label name pub fn label_values(&self, label_name: &str) -> Vec { - self.label_index + let mut values: Vec = self + .label_index .get(label_name) .map(|values| values.keys().cloned().collect()) - .unwrap_or_default() + .unwrap_or_default(); + values.sort(); + values } + + pub fn rebuild_index(&mut self) { + self.label_index.clear(); + let series: Vec = self.series.values().cloned().collect(); + for series in series { + for label in &series.labels { + self.label_index + .entry(label.name.clone()) + .or_default() + .entry(label.value.clone()) + .or_default() + .push(series.id); + } + } + } + + pub fn prune_before(&mut self, cutoff: i64) -> usize { + let mut removed_samples = 0usize; + self.series.retain(|_, series| { + let before = series.samples.len(); + series.samples.retain(|sample| sample.timestamp >= cutoff); + removed_samples += before.saturating_sub(series.samples.len()); + !series.samples.is_empty() + }); + self.rebuild_index(); + removed_samples + } +} + +fn percentile(values: &[u64], quantile: f64) -> f64 { + if values.is_empty() { + return 0.0; + } + + let index = ((values.len() - 1) as f64 * quantile).round() as usize; + values[index.min(values.len() - 1)] as f64 +} + +fn parse_label_matchers(matchers: &[String]) -> Vec<(String, String)> { + matchers + .iter() + .filter_map(|matcher| matcher.split_once('=')) + .map(|(key, value)| { + ( + key.trim().to_string(), + value.trim().trim_matches('"').to_string(), + ) + }) + .collect() +} + +fn series_matches(series: &TimeSeries, matchers: &[(String, String)]) -> bool { + matchers.iter().all(|(key, value)| { + series + .labels + .iter() + .any(|label| &label.name == key && &label.value == value) + }) +} + +fn series_in_time_range(series: &TimeSeries, start: Option, end: Option) -> bool { + let Some((series_start, series_end)) = series.time_range() else { + return true; + }; + + if let Some(start) = start { + if series_end < start { + return false; + } + } + if let Some(end) = end { + if series_start > end { + return false; + } + } + + true } /// HTTP handler for instant queries @@ -696,46 +952,57 @@ async fn handle_range_query( async fn handle_label_values( State(service): State, Path(label_name): Path, + Query(params): Query, ) -> impl IntoResponse { - let storage = service.storage.read().await; - let values = storage.label_values(&label_name); - - ( - StatusCode::OK, - Json(LabelValuesResponse { - status: "success".to_string(), - data: values, - }), - ) + match service + .label_values_for_matchers(&label_name, ¶ms.matchers, params.start, params.end) + .await + { + Ok(values) => ( + StatusCode::OK, + Json(LabelValuesResponse { + status: "success".to_string(), + data: values, + }), + ) + .into_response(), + Err(error) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "status": "error", + "error": error.to_string(), + })), + ) + .into_response(), + } } /// HTTP handler for series metadata async fn handle_series( State(service): State, - Query(_params): Query, + Query(params): Query, ) -> impl IntoResponse { - let storage = service.storage.read().await; - - // Return all series metadata (limited implementation) - let series: Vec> = storage - .series - .values() - .take(1000) // Limit to prevent OOM - .map(|ts| { - ts.labels - .iter() - .map(|l| (l.name.clone(), l.value.clone())) - .collect() - }) - .collect(); - - ( - StatusCode::OK, - Json(SeriesResponse { - status: "success".to_string(), - data: series, - }), - ) + match service + .series_metadata(¶ms.matchers, params.start, params.end) + .await + { + Ok(series) => ( + StatusCode::OK, + Json(SeriesResponse { + status: "success".to_string(), + data: series, + }), + ) + .into_response(), + Err(error) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "status": "error", + "error": error.to_string(), + })), + ) + .into_response(), + } } // Request/Response Types @@ -760,6 +1027,10 @@ struct SeriesQueryParams { #[serde(default)] #[serde(rename = "match[]")] matchers: Vec, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, } #[derive(Debug, Serialize)] @@ -770,30 +1041,30 @@ struct QueryResponse { error_type: Option, } -#[derive(Debug, Serialize)] -pub(crate) struct QueryResult { +#[derive(Debug, Clone, Serialize)] +pub struct QueryResult { #[serde(rename = "resultType")] - result_type: String, - result: Vec, + pub result_type: String, + pub result: Vec, } -#[derive(Debug, Serialize)] -struct InstantQueryResult { - metric: HashMap, - value: Option<(i64, f64)>, +#[derive(Debug, Clone, Serialize)] +pub struct InstantQueryResult { + pub metric: HashMap, + pub value: Option<(i64, f64)>, } -#[derive(Debug, Serialize)] -pub(crate) struct RangeQueryResult { +#[derive(Debug, Clone, Serialize)] +pub struct RangeQueryResult { #[serde(rename = "resultType")] - result_type: String, - result: Vec, + pub result_type: String, + pub result: Vec, } -#[derive(Debug, Serialize)] -struct RangeResult { - metric: HashMap, - values: Vec<(i64, f64)>, +#[derive(Debug, Clone, Serialize)] +pub struct RangeResult { + pub metric: HashMap, + pub values: Vec<(i64, f64)>, } #[derive(Debug, Serialize)] @@ -808,29 +1079,6 @@ struct SeriesResponse { data: Vec>, } -#[derive(Debug)] -enum QueryError { - ParseFailed(String), - ExecutionFailed(String), -} - -impl IntoResponse for QueryError { - fn into_response(self) -> Response { - let (status, message) = match self { - QueryError::ParseFailed(msg) => (StatusCode::BAD_REQUEST, msg), - QueryError::ExecutionFailed(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg), - }; - - let body = serde_json::json!({ - "status": "error", - "errorType": "execution", - "error": message - }); - - (status, Json(body)).into_response() - } -} - impl Default for QueryService { fn default() -> Self { Self::new() @@ -892,8 +1140,9 @@ impl QueryableStorage { .map_err(|e| Error::Storage(format!("Failed to read file: {}", e)))?; // Deserialize from bincode - let storage = bincode::deserialize(&buffer) + let mut storage: Self = bincode::deserialize(&buffer) .map_err(|e| Error::Storage(format!("Deserialization failed: {}", e)))?; + storage.rebuild_index(); Ok(storage) } diff --git a/nightlight/crates/nightlight-server/src/storage.rs b/nightlight/crates/nightlight-server/src/storage.rs index 8da4126..620a315 100644 --- a/nightlight/crates/nightlight-server/src/storage.rs +++ b/nightlight/crates/nightlight-server/src/storage.rs @@ -1,12 +1,8 @@ -//! Time-series storage layer -//! -//! シンプルなWAL付きストレージ実装(S5足場)。 -//! - in-memory head: `QueryableStorage` を共有 -//! - WAL: bincode長さプレフィックスでappend / replay -//! - スナップショット: `nightlight.db` にbincode保存 -//! - Retention/compactionは将来タスク(現状no-op) +//! Time-series storage layer backed by an in-memory head, a write-ahead log, +//! and periodic snapshots. use anyhow::Result; +use chrono::Utc; use nightlight_types::{SeriesId, TimeSeries}; use std::{ fs::{File, OpenOptions}, @@ -15,16 +11,15 @@ use std::{ sync::Arc, }; use tokio::sync::{Mutex, RwLock}; +use tracing::debug; use crate::query::QueryableStorage; -/// WALレコード #[derive(serde::Serialize, serde::Deserialize)] struct WalRecord { series: TimeSeries, } -/// ストレージ本体 pub struct Storage { head: Arc>, wal_path: PathBuf, @@ -33,7 +28,6 @@ pub struct Storage { } impl Storage { - /// data_dirを初期化し、snapshot + WALをリプレイする pub fn new(data_dir: &str) -> Result { let data_dir = PathBuf::from(data_dir); std::fs::create_dir_all(&data_dir)?; @@ -41,12 +35,11 @@ impl Storage { let snapshot_path = data_dir.join("nightlight.db"); let wal_path = data_dir.join("wal.log"); - // snapshotロード let mut head = QueryableStorage::load_from_file(&snapshot_path)?; - // WALリプレイ if wal_path.exists() { replay_wal(&wal_path, &mut head)?; } + head.rebuild_index(); Ok(Self { head: Arc::new(RwLock::new(head)), @@ -56,13 +49,15 @@ impl Storage { }) } - /// 共有QueryableStorageを取得 pub fn queryable(&self) -> Arc> { - self.head.clone() + Arc::clone(&self.head) } - /// WALへappendし、headへ反映 pub async fn append(&self, series_list: Vec) -> Result<()> { + if series_list.is_empty() { + return Ok(()); + } + let _guard = self.wal_lock.lock().await; let mut wal_file = OpenOptions::new() .create(true) @@ -78,14 +73,12 @@ impl Storage { let len = encoded.len() as u32; wal_file.write_all(&len.to_le_bytes())?; wal_file.write_all(&encoded)?; - head.upsert_series(series); } wal_file.flush()?; Ok(()) } - /// 指定IDのシリーズを時間範囲で返す pub async fn query_series( &self, series_id: SeriesId, @@ -93,110 +86,142 @@ impl Storage { end: i64, ) -> Result> { let head = self.head.read().await; - if let Some(series) = head.series.get(&series_id) { - let mut filtered = series.clone(); - filtered - .samples - .retain(|s| s.timestamp >= start && s.timestamp <= end); - return Ok(Some(filtered)); - } - Ok(None) + Ok(head + .series + .get(&series_id) + .map(|series| series.filter_by_time(start, end))) } - /// 簡易ラベル一致検索 pub async fn find_series(&self, matchers: Vec) -> Result> { let parsed: Vec<(String, String)> = matchers .iter() - .filter_map(|m| m.split_once('=')) - .map(|(k, v)| (k.to_string(), v.to_string())) + .filter_map(|matcher| matcher.split_once('=')) + .map(|(key, value)| { + ( + key.trim().to_string(), + value.trim().trim_matches('"').to_string(), + ) + }) .collect(); let head = self.head.read().await; let mut result = Vec::new(); - 'outer: for (series_id, ts) in &head.series { - for (k, v) in &parsed { - if !ts.labels.iter().any(|l| &l.name == k && &l.value == v) { + 'outer: for (series_id, series) in &head.series { + for (key, value) in &parsed { + if !series + .labels + .iter() + .any(|label| &label.name == key && &label.value == value) + { continue 'outer; } } result.push(*series_id); } + result.sort_unstable(); Ok(result) } - /// スナップショット保存 + WAL truncate pub async fn flush(&self) -> Result<()> { - let head = self.head.read().await; - head.save_to_file(&self.snapshot_path)?; - drop(head); - let _guard = self.wal_lock.lock().await; - File::create(&self.wal_path)?; // truncate + let snapshot = { + let head = self.head.read().await; + head.clone() + }; + snapshot.save_to_file(&self.snapshot_path)?; + File::create(&self.wal_path)?; Ok(()) } - /// Retentionは将来実装(no-op) - pub async fn enforce_retention(&self, _retention_days: u32) -> Result<()> { + pub async fn enforce_retention(&self, retention_days: u32) -> Result<()> { + if retention_days == 0 { + return Ok(()); + } + + let retention_ms = i64::from(retention_days) * 24 * 60 * 60 * 1000; + let cutoff = Utc::now().timestamp_millis() - retention_ms; + let removed_samples = { + let mut head = self.head.write().await; + head.prune_before(cutoff) + }; + + if removed_samples > 0 { + debug!(removed_samples, cutoff, "pruned expired Nightlight samples"); + } + Ok(()) } - /// Compactionは将来実装(no-op) pub async fn compact(&self) -> Result<()> { - Ok(()) + self.flush().await } - /// 現在の統計 pub async fn stats(&self) -> Result { let head = self.head.read().await; - let active_series = head.series.len() as u64; - let total_samples = head + let total_samples: u64 = head .series .values() - .map(|s| s.samples.len() as u64) + .map(|series| series.samples.len() as u64) .sum(); - + let oldest_sample_time = head + .series + .values() + .filter_map(|series| series.oldest_sample().map(|sample| sample.timestamp)) + .min() + .unwrap_or(0); + let newest_sample_time = head + .series + .values() + .filter_map(|series| series.latest_sample().map(|sample| sample.timestamp)) + .max() + .unwrap_or(0); let wal_size = std::fs::metadata(&self.wal_path) - .map(|m| m.len()) + .map(|metadata| metadata.len()) .unwrap_or(0); let snapshot_size = std::fs::metadata(&self.snapshot_path) - .map(|m| m.len()) + .map(|metadata| metadata.len()) .unwrap_or(0); Ok(StorageStats { - active_series, + active_series: head.series.len() as u64, total_samples, - blocks_count: 1, + blocks_count: u64::from(snapshot_size > 0), + head_samples: total_samples, disk_bytes_used: wal_size + snapshot_size, + oldest_sample_time, + newest_sample_time, }) } } -/// WALリプレイ fn replay_wal(path: &Path, storage: &mut QueryableStorage) -> Result<()> { let mut file = File::open(path)?; let mut len_buf = [0u8; 4]; loop { - if let Err(e) = file.read_exact(&mut len_buf) { - if e.kind() == std::io::ErrorKind::UnexpectedEof { + if let Err(error) = file.read_exact(&mut len_buf) { + if error.kind() == std::io::ErrorKind::UnexpectedEof { break; } - return Err(e.into()); + return Err(error.into()); } let len = u32::from_le_bytes(len_buf) as usize; - let mut buf = vec![0u8; len]; - file.read_exact(&mut buf)?; - let record: WalRecord = bincode::deserialize(&buf)?; + let mut buffer = vec![0u8; len]; + file.read_exact(&mut buffer)?; + let record: WalRecord = bincode::deserialize(&buffer)?; storage.upsert_series(record.series); } Ok(()) } -/// Storage statistics +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] pub struct StorageStats { pub active_series: u64, pub total_samples: u64, pub blocks_count: u64, + pub head_samples: u64, pub disk_bytes_used: u64, + pub oldest_sample_time: i64, + pub newest_sample_time: i64, } #[cfg(test)] @@ -247,6 +272,76 @@ mod tests { assert_eq!(res.samples[1].value, 2.0); } + #[tokio::test] + async fn test_retention_prunes_old_samples_and_series() { + let dir = tempfile::tempdir().unwrap(); + let storage = Storage::new(dir.path().to_str().unwrap()).unwrap(); + let now = Utc::now().timestamp_millis(); + + storage + .append(vec![ + TimeSeries { + id: SeriesId::new(1), + labels: vec![Label::new("__name__", "retained_metric")], + samples: vec![ + nightlight_types::Sample::new(now - (2 * 24 * 60 * 60 * 1000), 1.0), + nightlight_types::Sample::new(now, 2.0), + ], + }, + TimeSeries { + id: SeriesId::new(2), + labels: vec![Label::new("__name__", "expired_metric")], + samples: vec![nightlight_types::Sample::new( + now - (3 * 24 * 60 * 60 * 1000), + 3.0, + )], + }, + ]) + .await + .unwrap(); + + storage.enforce_retention(1).await.unwrap(); + + let retained = storage + .query_series(SeriesId::new(1), 0, now + 1) + .await + .unwrap() + .unwrap(); + assert_eq!(retained.samples.len(), 1); + assert_eq!(retained.samples[0].value, 2.0); + + let expired = storage + .query_series(SeriesId::new(2), 0, now + 1) + .await + .unwrap(); + assert!(expired.is_none()); + } + + #[tokio::test] + async fn test_stats_report_sample_bounds() { + let dir = tempfile::tempdir().unwrap(); + let storage = Storage::new(dir.path().to_str().unwrap()).unwrap(); + + storage + .append(vec![TimeSeries { + id: SeriesId::new(99), + labels: vec![Label::new("__name__", "stats_metric")], + samples: vec![ + nightlight_types::Sample::new(1000, 1.0), + nightlight_types::Sample::new(2000, 2.0), + ], + }]) + .await + .unwrap(); + + let stats = storage.stats().await.unwrap(); + assert_eq!(stats.active_series, 1); + assert_eq!(stats.total_samples, 2); + assert_eq!(stats.head_samples, 2); + assert_eq!(stats.oldest_sample_time, 1000); + assert_eq!(stats.newest_sample_time, 2000); + } + #[tokio::test] async fn test_find_series() { let dir = tempfile::tempdir().unwrap(); diff --git a/nix/test-cluster/run-cluster.sh b/nix/test-cluster/run-cluster.sh index 91a6db9..f4d84a6 100755 --- a/nix/test-cluster/run-cluster.sh +++ b/nix/test-cluster/run-cluster.sh @@ -75,6 +75,9 @@ CREDITSERVICE_PROTO_DIR="${REPO_ROOT}/creditservice/proto" CREDITSERVICE_PROTO="${CREDITSERVICE_PROTO_DIR}/creditservice.proto" LIGHTNINGSTOR_PROTO_DIR="${REPO_ROOT}/lightningstor/crates/lightningstor-api/proto" LIGHTNINGSTOR_PROTO="${LIGHTNINGSTOR_PROTO_DIR}/lightningstor.proto" +NIGHTLIGHT_PROTO_DIR="${REPO_ROOT}/nightlight/crates/nightlight-api/proto" +NIGHTLIGHT_QUERY_PROTO="${NIGHTLIGHT_PROTO_DIR}/query.proto" +NIGHTLIGHT_ADMIN_PROTO="${NIGHTLIGHT_PROTO_DIR}/admin.proto" PLASMAVMC_PROTO_DIR="${REPO_ROOT}/plasmavmc/proto" PLASMAVMC_PROTO="${PLASMAVMC_PROTO_DIR}/plasmavmc.proto" FLAREDB_PROTO_DIR="${REPO_ROOT}/flaredb/crates/flaredb-proto/src" @@ -4676,10 +4679,24 @@ validate_nightlight_flow_with_base() { --label source=smoke \ --label cluster=photoncloud + wait_for_nightlight_query_result "${base_url}" "${flow_name}" "${metric_name}" "${metric_value}" "source=\"smoke\"" + + curl -fsS "${base_url}/label/__name__/values" \ + | jq -e --arg name "${metric_name}" '.status == "success" and (.data | index($name)) != null' >/dev/null + curl -fsS "${base_url}/series" \ + | jq -e --arg name "${metric_name}" '.status == "success" and (.data | any(.__name__ == $name))' >/dev/null +} + +wait_for_nightlight_query_result() { + local base_url="$1" + local flow_name="$2" + local metric_name="$3" + local metric_value="$4" + local selector_suffix="${5:-}" local deadline=$((SECONDS + HTTP_WAIT_TIMEOUT)) while true; do if curl -fsS --get "${base_url}/query" \ - --data-urlencode "query=${metric_name}{source=\"smoke\"}" \ + --data-urlencode "query=${metric_name}{${selector_suffix}}" \ | jq -e --arg name "${metric_name}" --argjson expected "${metric_value}" ' .status == "success" and (.data.result | length) >= 1 @@ -4692,15 +4709,11 @@ validate_nightlight_flow_with_base() { fi sleep 2 done - - curl -fsS "${base_url}/label/__name__/values" \ - | jq -e --arg name "${metric_name}" '.status == "success" and (.data | index($name)) != null' >/dev/null - curl -fsS "${base_url}/series" \ - | jq -e --arg name "${metric_name}" '.status == "success" and (.data | any(.__name__ == $name))' >/dev/null } validate_nightlight_flow() { validate_nightlight_flow_with_base "http://127.0.0.1:9090/api/v1" "NightLight" + validate_nightlight_grpc_and_persistence } validate_apigateway_nightlight_flow() { @@ -4709,6 +4722,85 @@ validate_apigateway_nightlight_flow() { validate_nightlight_flow_with_base "http://127.0.0.1:8080/api/v1/metrics" "API Gateway -> NightLight" } +validate_nightlight_grpc_and_persistence() { + log "Validating NightLight gRPC query/admin APIs and restart persistence" + + local base_url="http://127.0.0.1:9090/api/v1" + local grpc_tunnel="" + local metric_name="nightlight_persist_metric_$(date +%s)" + local metric_value + metric_value="$(awk 'BEGIN{srand(); printf "%.3f\n", (rand()*100)+1}')" + + grpc_tunnel="$(start_ssh_tunnel node06 15090 50088)" + trap 'stop_ssh_tunnel node06 "${grpc_tunnel}"' RETURN + + python3 "${REPO_ROOT}/nix/test-cluster/nightlight_remote_write.py" \ + --url "${base_url}/write" \ + --metric "${metric_name}" \ + --value "${metric_value}" \ + --label source=grpc \ + --label cluster=photoncloud + + wait_for_nightlight_query_result "${base_url}" "NightLight persistence pre-restart" "${metric_name}" "${metric_value}" "source=\"grpc\"" + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_QUERY_PROTO}" \ + -d "$(jq -cn --arg query "${metric_name}{source=\"grpc\"}" '{query:$query, time:0, timeout:5000}')" \ + 127.0.0.1:15090 nightlight.MetricQuery/InstantQuery \ + | jq -e --arg name "${metric_name}" --argjson expected "${metric_value}" ' + .status == "success" + and (.data.result | any(.metric.__name__ == $name and (.value.value >= ($expected - 0.001)) and (.value.value <= ($expected + 0.001)))) + ' >/dev/null + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_QUERY_PROTO}" \ + -d "$(jq -cn --arg match "__name__=${metric_name}" '{match:[$match]}')" \ + 127.0.0.1:15090 nightlight.MetricQuery/SeriesQuery \ + | jq -e --arg name "${metric_name}" '.status == "success" and (.data | any(.labels.__name__ == $name))' >/dev/null + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_QUERY_PROTO}" \ + -d "$(jq -cn --arg label "source" --arg match "__name__=${metric_name}" '{labelName:$label, match:[$match]}')" \ + 127.0.0.1:15090 nightlight.MetricQuery/LabelValuesQuery \ + | jq -e '.status == "success" and (.data | index("grpc")) != null' >/dev/null + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_ADMIN_PROTO}" \ + -d '{}' \ + 127.0.0.1:15090 nightlight.Admin/Health \ + | jq -e '.status == "ok" and (.components | any(.name == "storage" and .status == "ok"))' >/dev/null + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_ADMIN_PROTO}" \ + -d '{}' \ + 127.0.0.1:15090 nightlight.Admin/Stats \ + | jq -e '.storage.totalSamples >= 1 and .ingestion.samplesIngestedTotal >= 1 and .query.queriesTotal >= 1' >/dev/null + + ssh_node node06 "systemctl restart nightlight.service" + wait_for_host_http http://127.0.0.1:9090/healthz + wait_for_tcp_port node06 50088 + + wait_for_nightlight_query_result "${base_url}" "NightLight persistence post-restart" "${metric_name}" "${metric_value}" "source=\"grpc\"" + + grpcurl -plaintext \ + -import-path "${NIGHTLIGHT_PROTO_DIR}" \ + -proto "${NIGHTLIGHT_QUERY_PROTO}" \ + -d "$(jq -cn --arg query "${metric_name}{source=\"grpc\"}" '{query:$query, time:0, timeout:5000}')" \ + 127.0.0.1:15090 nightlight.MetricQuery/InstantQuery \ + | jq -e --arg name "${metric_name}" --argjson expected "${metric_value}" ' + .status == "success" + and (.data.result | any(.metric.__name__ == $name and (.value.value >= ($expected - 0.001)) and (.value.value <= ($expected + 0.001)))) + ' >/dev/null + + trap - RETURN + stop_ssh_tunnel node06 "${grpc_tunnel}" +} + validate_creditservice_rest_flow() { local base_url="$1" local token="$2"