From bd09761def130c502499bf88c83e82e528d05f5b Mon Sep 17 00:00:00 2001
From: centra
Date: Tue, 31 Mar 2026 21:29:14 +0900
Subject: [PATCH] nightlight: add durable grpc runtime
---
.../crates/nightlight-server/src/config.rs | 11 -
.../crates/nightlight-server/src/grpc.rs | 502 ++++++++++++++++++
.../crates/nightlight-server/src/ingestion.rs | 189 +++++--
.../crates/nightlight-server/src/lib.rs | 1 +
.../crates/nightlight-server/src/main.rs | 219 +++++---
.../crates/nightlight-server/src/query.rs | 429 +++++++++++----
.../crates/nightlight-server/src/storage.rs | 211 ++++++--
nix/test-cluster/run-cluster.sh | 104 +++-
8 files changed, 1379 insertions(+), 287 deletions(-)
create mode 100644 nightlight/crates/nightlight-server/src/grpc.rs
diff --git a/nightlight/crates/nightlight-server/src/config.rs b/nightlight/crates/nightlight-server/src/config.rs
index 4626695..dd2300b 100644
--- a/nightlight/crates/nightlight-server/src/config.rs
+++ b/nightlight/crates/nightlight-server/src/config.rs
@@ -129,17 +129,6 @@ impl Config {
Ok(config)
}
- /// Load configuration from file, or use defaults if file doesn't exist
+ pub fn load_or_default() -> Result<Self> {
- match Self::from_file("config.yaml") {
- Ok(config) => Ok(config),
- Err(_) => {
- tracing::warn!("No config file found, using defaults");
- Ok(Self::default())
- }
- }
- }
-
/// Save configuration to a YAML file
pub fn save(&self, path: &str) -> Result<()> {
let content = serde_yaml::to_string(self)?;
diff --git a/nightlight/crates/nightlight-server/src/grpc.rs b/nightlight/crates/nightlight-server/src/grpc.rs
new file mode 100644
index 0000000..5cee297
--- /dev/null
+++ b/nightlight/crates/nightlight-server/src/grpc.rs
@@ -0,0 +1,502 @@
+use std::sync::Arc;
+use std::time::Instant;
+
+use chrono::Utc;
+use tonic::{Request, Response, Status};
+
+use crate::ingestion::IngestionMetrics;
+use crate::query::{QueryMetrics, QueryResult as InstantQueryData, QueryService, RangeQueryResult};
+use crate::storage::{Storage, StorageStats};
+use nightlight_api::nightlight::admin_server::Admin;
+use nightlight_api::nightlight::metric_query_server::MetricQuery;
+use nightlight_api::nightlight::{
+ BuildInfoRequest, BuildInfoResponse, ComponentHealth, HealthRequest, HealthResponse, InstantQueryRequest,
+ LabelValuesRequest, LabelValuesResponse, QueryData, QueryResponse, QueryResult, RangeQueryRequest,
+ SamplePair, SeriesLabels, SeriesQueryRequest, SeriesQueryResponse, StatsRequest, StatsResponse,
+};
+use nightlight_types::Error;
+
+#[derive(Clone)]
+pub struct MetricQueryServiceImpl {
+ query: QueryService,
+}
+
+#[derive(Clone)]
+pub struct AdminServiceImpl {
+ storage: Arc<Storage>,
+ ingestion_metrics: Arc<IngestionMetrics>,
+ query_metrics: Arc<QueryMetrics>,
+ started_at: Instant,
+}
+
+impl MetricQueryServiceImpl {
+ pub fn new(query: QueryService) -> Self {
+ Self { query }
+ }
+}
+
+impl AdminServiceImpl {
+ pub fn new(
+ storage: Arc<Storage>,
+ ingestion_metrics: Arc<IngestionMetrics>,
+ query_metrics: Arc<QueryMetrics>,
+ ) -> Self {
+ Self {
+ storage,
+ ingestion_metrics,
+ query_metrics,
+ started_at: Instant::now(),
+ }
+ }
+}
+
+#[tonic::async_trait]
+impl MetricQuery for MetricQueryServiceImpl {
+ async fn instant_query(
+ &self,
+ request: Request<InstantQueryRequest>,
+ ) -> Result<Response<QueryResponse>, Status> {
+ let request = request.into_inner();
+ let time = if request.time == 0 {
+ Utc::now().timestamp_millis()
+ } else {
+ request.time
+ };
+
+ let response = match self.query.execute_instant_query(&request.query, time).await {
+ Ok(result) => QueryResponse {
+ status: "success".to_string(),
+ data: Some(instant_query_data_to_proto(result)),
+ error: String::new(),
+ error_type: String::new(),
+ warnings: Vec::new(),
+ },
+ Err(error) => QueryResponse {
+ status: "error".to_string(),
+ data: None,
+ error: error.to_string(),
+ error_type: query_error_type(&error).to_string(),
+ warnings: Vec::new(),
+ },
+ };
+
+ Ok(Response::new(response))
+ }
+
+ async fn range_query(
+ &self,
+ request: Request<RangeQueryRequest>,
+ ) -> Result<Response<QueryResponse>, Status> {
+ let request = request.into_inner();
+
+ let response = match self
+ .query
+ .execute_range_query(&request.query, request.start, request.end, request.step)
+ .await
+ {
+ Ok(result) => QueryResponse {
+ status: "success".to_string(),
+ data: Some(range_query_data_to_proto(result)),
+ error: String::new(),
+ error_type: String::new(),
+ warnings: Vec::new(),
+ },
+ Err(error) => QueryResponse {
+ status: "error".to_string(),
+ data: None,
+ error: error.to_string(),
+ error_type: query_error_type(&error).to_string(),
+ warnings: Vec::new(),
+ },
+ };
+
+ Ok(Response::new(response))
+ }
+
+ async fn series_query(
+ &self,
+ request: Request<SeriesQueryRequest>,
+ ) -> Result<Response<SeriesQueryResponse>, Status> {
+ let request = request.into_inner();
+ let response = match self
+ .query
+ .series_metadata(
+ &request.r#match,
+ optional_millis(request.start),
+ optional_millis(request.end),
+ )
+ .await
+ {
+ Ok(series) => SeriesQueryResponse {
+ status: "success".to_string(),
+ data: series
+ .into_iter()
+ .map(|labels| SeriesLabels { labels })
+ .collect(),
+ error: String::new(),
+ },
+ Err(error) => SeriesQueryResponse {
+ status: "error".to_string(),
+ data: Vec::new(),
+ error: error.to_string(),
+ },
+ };
+
+ Ok(Response::new(response))
+ }
+
+ async fn label_values_query(
+ &self,
+ request: Request<LabelValuesRequest>,
+ ) -> Result<Response<LabelValuesResponse>, Status> {
+ let request = request.into_inner();
+ let response = match self
+ .query
+ .label_values_for_matchers(
+ &request.label_name,
+ &request.r#match,
+ optional_millis(request.start),
+ optional_millis(request.end),
+ )
+ .await
+ {
+ Ok(values) => LabelValuesResponse {
+ status: "success".to_string(),
+ data: values,
+ error: String::new(),
+ },
+ Err(error) => LabelValuesResponse {
+ status: "error".to_string(),
+ data: Vec::new(),
+ error: error.to_string(),
+ },
+ };
+
+ Ok(Response::new(response))
+ }
+}
+
+#[tonic::async_trait]
+impl Admin for AdminServiceImpl {
+ async fn health(
+ &self,
+ _request: Request<HealthRequest>,
+ ) -> Result<Response<HealthResponse>, Status> {
+ let storage_result = self.storage.stats().await;
+ let status = if storage_result.is_ok() { "ok" } else { "degraded" };
+ let storage_message = match &storage_result {
+ Ok(_) => "storage ready".to_string(),
+ Err(error) => error.to_string(),
+ };
+
+ Ok(Response::new(HealthResponse {
+ status: status.to_string(),
+ message: "nightlight ready".to_string(),
+ components: vec![
+ ComponentHealth {
+ name: "storage".to_string(),
+ status: status.to_string(),
+ message: storage_message,
+ },
+ ComponentHealth {
+ name: "ingestion".to_string(),
+ status: "ok".to_string(),
+ message: "remote_write endpoint ready".to_string(),
+ },
+ ComponentHealth {
+ name: "query_engine".to_string(),
+ status: "ok".to_string(),
+ message: "http and grpc query paths ready".to_string(),
+ },
+ ],
+ }))
+ }
+
+ async fn stats(
+ &self,
+ _request: Request<StatsRequest>,
+ ) -> Result<Response<StatsResponse>, Status> {
+ let storage = self
+ .storage
+ .stats()
+ .await
+ .map_err(|error| Status::internal(error.to_string()))?;
+ let ingestion = self.ingestion_metrics.snapshot();
+ let query = self.query_metrics.snapshot();
+
+ Ok(Response::new(StatsResponse {
+ storage: Some(storage_stats_to_proto(storage)),
+ ingestion: Some(nightlight_api::nightlight::IngestionStats {
+ samples_ingested_total: ingestion.samples_ingested_total,
+ write_requests_total: ingestion.write_requests_total,
+ write_requests_failed: ingestion.write_requests_failed,
+ samples_per_second: ingestion.samples_per_second,
+ buffer_samples: ingestion.buffer_samples,
+ }),
+ query: Some(nightlight_api::nightlight::QueryStats {
+ queries_total: query.queries_total,
+ queries_failed: query.queries_failed,
+ queries_active: query.queries_active,
+ query_duration_p50: query.query_duration_p50,
+ query_duration_p95: query.query_duration_p95,
+ query_duration_p99: query.query_duration_p99,
+ }),
+ uptime_seconds: self.started_at.elapsed().as_secs(),
+ }))
+ }
+
+ async fn build_info(
+ &self,
+ _request: Request<BuildInfoRequest>,
+ ) -> Result<Response<BuildInfoResponse>, Status> {
+ Ok(Response::new(BuildInfoResponse {
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ commit: option_env!("GIT_COMMIT").unwrap_or("unknown").to_string(),
+ build_time: option_env!("BUILD_TIME").unwrap_or("unknown").to_string(),
+ rust_version: option_env!("RUSTC_VERSION").unwrap_or("unknown").to_string(),
+ target: format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS),
+ }))
+ }
+}
+
+fn optional_millis(value: i64) -> Option<i64> {
+ if value == 0 {
+ None
+ } else {
+ Some(value)
+ }
+}
+
+fn query_error_type(error: &Error) -> &'static str {
+ match error {
+ Error::InvalidMetric(_) | Error::InvalidLabel(_) | Error::InvalidTimeRange(_) => "bad_data",
+ Error::Timeout(_) => "timeout",
+ _ => "execution",
+ }
+}
+
+fn instant_query_data_to_proto(result: InstantQueryData) -> QueryData {
+ QueryData {
+ result_type: result.result_type,
+ result: result
+ .result
+ .into_iter()
+ .map(|series| QueryResult {
+ metric: series.metric,
+ values: Vec::new(),
+ value: series.value.map(sample_pair_from_tuple),
+ })
+ .collect(),
+ }
+}
+
+fn range_query_data_to_proto(result: RangeQueryResult) -> QueryData {
+ QueryData {
+ result_type: result.result_type,
+ result: result
+ .result
+ .into_iter()
+ .map(|series| QueryResult {
+ metric: series.metric,
+ values: series
+ .values
+ .into_iter()
+ .map(sample_pair_from_tuple)
+ .collect(),
+ value: None,
+ })
+ .collect(),
+ }
+}
+
+fn sample_pair_from_tuple((timestamp, value): (i64, f64)) -> SamplePair {
+ SamplePair { timestamp, value }
+}
+
+fn storage_stats_to_proto(stats: StorageStats) -> nightlight_api::nightlight::StorageStats {
+ nightlight_api::nightlight::StorageStats {
+ active_series: stats.active_series,
+ total_samples: stats.total_samples,
+ blocks_count: stats.blocks_count,
+ head_samples: stats.head_samples,
+ disk_bytes_used: stats.disk_bytes_used,
+ oldest_sample_time: stats.oldest_sample_time,
+ newest_sample_time: stats.newest_sample_time,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::ingestion::IngestionService;
+ use crate::storage::Storage;
+ use nightlight_api::nightlight::{
+ InstantQueryRequest, LabelValuesRequest, SeriesQueryRequest,
+ };
+ use nightlight_api::prometheus::{Label, Sample, TimeSeries, WriteRequest};
+
+ #[tokio::test]
+ async fn instant_query_grpc_returns_metric_data() {
+ let dir = tempfile::tempdir().unwrap();
+ let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap());
+ let ingestion = IngestionService::new(Arc::clone(&storage));
+ ingestion
+ .process_write_request(WriteRequest {
+ timeseries: vec![TimeSeries {
+ labels: vec![
+ Label {
+ name: "__name__".to_string(),
+ value: "grpc_metric".to_string(),
+ },
+ Label {
+ name: "job".to_string(),
+ value: "nightlight".to_string(),
+ },
+ ],
+ samples: vec![Sample {
+ value: 12.5,
+ timestamp: 1_000,
+ }],
+ }],
+ })
+ .await
+ .unwrap();
+
+ let service = MetricQueryServiceImpl::new(QueryService::from_storage(storage.queryable()));
+ let response = service
+ .instant_query(Request::new(InstantQueryRequest {
+ query: "grpc_metric{job=\"nightlight\"}".to_string(),
+ time: 2_000,
+ timeout: 0,
+ }))
+ .await
+ .unwrap()
+ .into_inner();
+
+ assert_eq!(response.status, "success");
+ let data = response.data.unwrap();
+ assert_eq!(data.result.len(), 1);
+ assert_eq!(
+ data.result[0].metric.get("__name__").map(String::as_str),
+ Some("grpc_metric")
+ );
+ assert_eq!(data.result[0].value.as_ref().map(|value| value.value), Some(12.5));
+ }
+
+ #[tokio::test]
+ async fn metadata_queries_grpc_filter_series() {
+ let dir = tempfile::tempdir().unwrap();
+ let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap());
+ let ingestion = IngestionService::new(Arc::clone(&storage));
+ ingestion
+ .process_write_request(WriteRequest {
+ timeseries: vec![
+ TimeSeries {
+ labels: vec![
+ Label {
+ name: "__name__".to_string(),
+ value: "grpc_metric".to_string(),
+ },
+ Label {
+ name: "job".to_string(),
+ value: "api".to_string(),
+ },
+ ],
+ samples: vec![Sample {
+ value: 1.0,
+ timestamp: 1_000,
+ }],
+ },
+ TimeSeries {
+ labels: vec![
+ Label {
+ name: "__name__".to_string(),
+ value: "grpc_metric".to_string(),
+ },
+ Label {
+ name: "job".to_string(),
+ value: "worker".to_string(),
+ },
+ ],
+ samples: vec![Sample {
+ value: 2.0,
+ timestamp: 2_000,
+ }],
+ },
+ ],
+ })
+ .await
+ .unwrap();
+
+ let service = MetricQueryServiceImpl::new(QueryService::from_storage(storage.queryable()));
+ let series = service
+ .series_query(Request::new(SeriesQueryRequest {
+ r#match: vec!["job=api".to_string()],
+ start: 0,
+ end: 0,
+ }))
+ .await
+ .unwrap()
+ .into_inner();
+ assert_eq!(series.status, "success");
+ assert_eq!(series.data.len(), 1);
+
+ let label_values = service
+ .label_values_query(Request::new(LabelValuesRequest {
+ label_name: "job".to_string(),
+ r#match: vec!["__name__=grpc_metric".to_string()],
+ start: 0,
+ end: 0,
+ }))
+ .await
+ .unwrap()
+ .into_inner();
+ assert_eq!(label_values.status, "success");
+ assert_eq!(label_values.data, vec!["api".to_string(), "worker".to_string()]);
+ }
+
+ #[tokio::test]
+ async fn admin_stats_report_ingestion_and_query_counters() {
+ let dir = tempfile::tempdir().unwrap();
+ let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap());
+ let ingestion = IngestionService::new(Arc::clone(&storage));
+ ingestion
+ .process_write_request(WriteRequest {
+ timeseries: vec![TimeSeries {
+ labels: vec![Label {
+ name: "__name__".to_string(),
+ value: "admin_metric".to_string(),
+ }],
+ samples: vec![Sample {
+ value: 3.0,
+ timestamp: 1_000,
+ }],
+ }],
+ })
+ .await
+ .unwrap();
+
+ let query = QueryService::from_storage(storage.queryable());
+ query.execute_instant_query("admin_metric", 2_000).await.unwrap();
+
+ let admin = AdminServiceImpl::new(
+ Arc::clone(&storage),
+ ingestion.metrics(),
+ query.metrics(),
+ );
+ let stats = admin
+ .stats(Request::new(StatsRequest {}))
+ .await
+ .unwrap()
+ .into_inner();
+
+ assert_eq!(stats.storage.as_ref().map(|value| value.total_samples), Some(1));
+ assert_eq!(
+ stats.ingestion
+ .as_ref()
+ .map(|value| value.samples_ingested_total),
+ Some(1)
+ );
+ assert_eq!(stats.query.as_ref().map(|value| value.queries_total), Some(1));
+ }
+}
diff --git a/nightlight/crates/nightlight-server/src/ingestion.rs b/nightlight/crates/nightlight-server/src/ingestion.rs
index 54f6589..93cc8f7 100644
--- a/nightlight/crates/nightlight-server/src/ingestion.rs
+++ b/nightlight/crates/nightlight-server/src/ingestion.rs
@@ -16,10 +16,11 @@ use nightlight_types::Error;
use prost::Message;
use snap::raw::Decoder as SnappyDecoder;
use std::sync::Arc;
-use tokio::sync::RwLock;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Instant;
use tracing::{debug, error, info, warn};
-use crate::query::QueryableStorage;
+use crate::storage::Storage;
/// Maximum write request size (10 MB uncompressed)
const MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024;
@@ -27,28 +28,33 @@ const MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024;
/// Ingestion service state
#[derive(Clone)]
pub struct IngestionService {
- storage: Arc<RwLock<QueryableStorage>>,
+ storage: Arc<Storage>,
metrics: Arc,
}
-/// Ingestion metrics for monitoring
-struct IngestionMetrics {
- samples_received: Arc,
- samples_invalid: Arc,
- requests_total: Arc,
- requests_failed: Arc,
+#[derive(Debug)]
+pub struct IngestionMetrics {
+ samples_received: AtomicU64,
+ samples_invalid: AtomicU64,
+ requests_total: AtomicU64,
+ requests_failed: AtomicU64,
+ started_at: Instant,
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct IngestionMetricsSnapshot {
+ pub samples_ingested_total: u64,
+ pub write_requests_total: u64,
+ pub write_requests_failed: u64,
+ pub samples_per_second: f64,
+ pub buffer_samples: u64,
}
impl IngestionService {
- pub fn new(storage: Arc<RwLock<QueryableStorage>>) -> Self {
+ pub fn new(storage: Arc<Storage>) -> Self {
Self {
storage,
- metrics: Arc::new(IngestionMetrics {
- samples_received: Arc::new(std::sync::atomic::AtomicU64::new(0)),
- samples_invalid: Arc::new(std::sync::atomic::AtomicU64::new(0)),
- requests_total: Arc::new(std::sync::atomic::AtomicU64::new(0)),
- requests_failed: Arc::new(std::sync::atomic::AtomicU64::new(0)),
- }),
+ metrics: Arc::new(IngestionMetrics::new()),
}
}
@@ -59,10 +65,14 @@ impl IngestionService {
.with_state(self)
}
+ pub fn metrics(&self) -> Arc<IngestionMetrics> {
+ Arc::clone(&self.metrics)
+ }
+
/// Process a WriteRequest and write to shared storage
- async fn process_write_request(&self, request: WriteRequest) -> Result<u64> {
- let mut storage = self.storage.write().await;
+ pub(crate) async fn process_write_request(&self, request: WriteRequest) -> Result<u64> {
let mut samples_processed = 0;
+ let mut series_to_append = Vec::new();
for ts in request.timeseries {
// Validate and normalize labels
@@ -83,7 +93,7 @@ impl IngestionService {
// Validate sample
if !sample.value.is_finite() {
warn!("Invalid sample value: {}", sample.value);
- self.metrics.samples_invalid.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ self.metrics.samples_invalid.fetch_add(1, Ordering::Relaxed);
continue;
}
@@ -113,20 +123,56 @@ impl IngestionService {
samples: internal_samples,
};
- // Write to shared storage (upsert merges samples)
- storage.upsert_series(time_series);
+ series_to_append.push(time_series);
}
- self.metrics.samples_received.fetch_add(samples_processed, std::sync::atomic::Ordering::Relaxed);
+ self.storage
+ .append(series_to_append)
+ .await
+ .map_err(|error| Error::Storage(error.to_string()))?;
+ self.metrics
+ .samples_received
+ .fetch_add(samples_processed, Ordering::Relaxed);
Ok(samples_processed)
}
/// Get current storage statistics
- pub async fn storage_stats(&self) -> (usize, usize) {
- let storage = self.storage.read().await;
- let total_samples: usize = storage.series.values().map(|s| s.samples.len()).sum();
- (total_samples, storage.series.len())
+ pub async fn storage_stats(&self) -> Result<(usize, usize), Error> {
+ let stats = self
+ .storage
+ .stats()
+ .await
+ .map_err(|error| Error::Storage(error.to_string()))?;
+ Ok((stats.total_samples as usize, stats.active_series as usize))
+ }
+}
+
+impl IngestionMetrics {
+ fn new() -> Self {
+ Self {
+ samples_received: AtomicU64::new(0),
+ samples_invalid: AtomicU64::new(0),
+ requests_total: AtomicU64::new(0),
+ requests_failed: AtomicU64::new(0),
+ started_at: Instant::now(),
+ }
+ }
+
+ pub fn snapshot(&self) -> IngestionMetricsSnapshot {
+ let uptime = self.started_at.elapsed().as_secs_f64();
+ let samples_ingested_total = self.samples_received.load(Ordering::Relaxed);
+ IngestionMetricsSnapshot {
+ samples_ingested_total,
+ write_requests_total: self.requests_total.load(Ordering::Relaxed),
+ write_requests_failed: self.requests_failed.load(Ordering::Relaxed),
+ samples_per_second: if uptime > 0.0 {
+ samples_ingested_total as f64 / uptime
+ } else {
+ 0.0
+ },
+ buffer_samples: 0,
+ }
}
}
@@ -135,7 +181,7 @@ async fn handle_remote_write(
State(service): State<IngestionService>,
body: Bytes,
) -> Response {
- service.metrics.requests_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ service.metrics.requests_total.fetch_add(1, Ordering::Relaxed);
debug!("Received remote_write request, size: {} bytes", body.len());
@@ -150,7 +196,7 @@ async fn handle_remote_write(
Ok(data) => data,
Err(e) => {
error!("Snappy decompression failed: {}", e);
- return IngestionError::DecompressionFailed(e.to_string()).into_response();
+ return IngestionError::DecompressionFailed.into_response();
}
};
@@ -161,7 +207,7 @@ async fn handle_remote_write(
Ok(req) => req,
Err(e) => {
error!("Protobuf decode failed: {}", e);
- return IngestionError::InvalidProtobuf(e.to_string()).into_response();
+ return IngestionError::InvalidProtobuf.into_response();
}
};
@@ -178,18 +224,18 @@ async fn handle_remote_write(
}
Err(Error::Storage(msg)) if msg.contains("buffer full") => {
warn!("Write buffer full, returning 429");
- service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
IngestionError::Backpressure.into_response()
}
Err(Error::InvalidLabel(msg)) => {
warn!("Invalid labels: {}", msg);
- service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- IngestionError::InvalidLabels(msg).into_response()
+ service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
+ IngestionError::InvalidLabels.into_response()
}
Err(e) => {
error!("Failed to process write request: {}", e);
- service.metrics.requests_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- IngestionError::StorageError(e.to_string()).into_response()
+ service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed);
+ IngestionError::StorageError.into_response()
}
}
}
@@ -271,10 +317,10 @@ fn compute_series_fingerprint(labels: &[nightlight_types::Label]) -> u64 {
#[derive(Debug)]
enum IngestionError {
PayloadTooLarge,
- DecompressionFailed(String),
- InvalidProtobuf(String),
- InvalidLabels(String),
- StorageError(String),
+ DecompressionFailed,
+ InvalidProtobuf,
+ InvalidLabels,
+ StorageError,
Backpressure,
}
@@ -284,16 +330,16 @@ impl IntoResponse for IngestionError {
IngestionError::PayloadTooLarge => {
(StatusCode::PAYLOAD_TOO_LARGE, "Request payload too large")
}
- IngestionError::DecompressionFailed(_) => {
+ IngestionError::DecompressionFailed => {
(StatusCode::BAD_REQUEST, "Snappy decompression failed")
}
- IngestionError::InvalidProtobuf(_) => {
+ IngestionError::InvalidProtobuf => {
(StatusCode::BAD_REQUEST, "Invalid protobuf encoding")
}
- IngestionError::InvalidLabels(_) => {
+ IngestionError::InvalidLabels => {
(StatusCode::BAD_REQUEST, "Invalid metric labels")
}
- IngestionError::StorageError(_) => {
+ IngestionError::StorageError => {
(StatusCode::INTERNAL_SERVER_ERROR, "Storage error")
}
IngestionError::Backpressure => {
@@ -308,6 +354,7 @@ impl IntoResponse for IngestionError {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::storage::Storage;
#[test]
fn test_validate_labels_success() {
@@ -378,16 +425,58 @@ mod tests {
#[tokio::test]
async fn test_ingestion_service_storage() {
- use crate::query::QueryableStorage;
- use std::collections::HashMap;
-
- let storage = Arc::new(RwLock::new(QueryableStorage {
- series: HashMap::new(),
- label_index: HashMap::new(),
- }));
+ let dir = tempfile::tempdir().unwrap();
+ let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap());
let service = IngestionService::new(storage);
- let (samples, series) = service.storage_stats().await;
+ let (samples, series) = service.storage_stats().await.unwrap();
assert_eq!(samples, 0);
assert_eq!(series, 0);
}
+
+ #[tokio::test]
+ async fn test_process_write_request_persists_samples() {
+ let dir = tempfile::tempdir().unwrap();
+ let storage = Arc::new(Storage::new(dir.path().to_str().unwrap()).unwrap());
+ let service = IngestionService::new(Arc::clone(&storage));
+
+ let request = WriteRequest {
+ timeseries: vec![nightlight_api::prometheus::TimeSeries {
+ labels: vec![
+ Label {
+ name: "__name__".to_string(),
+ value: "ingest_metric".to_string(),
+ },
+ Label {
+ name: "job".to_string(),
+ value: "test".to_string(),
+ },
+ ],
+ samples: vec![nightlight_api::prometheus::Sample {
+ value: 42.0,
+ timestamp: 1_000,
+ }],
+ }],
+ };
+
+ let processed = service.process_write_request(request).await.unwrap();
+ assert_eq!(processed, 1);
+
+ storage.flush().await.unwrap();
+ let reloaded = Storage::new(dir.path().to_str().unwrap()).unwrap();
+ let ids = reloaded
+ .find_series(vec![
+ "__name__=ingest_metric".to_string(),
+ "job=test".to_string(),
+ ])
+ .await
+ .unwrap();
+ assert_eq!(ids.len(), 1);
+ let series = reloaded
+ .query_series(ids[0], 0, 10_000)
+ .await
+ .unwrap()
+ .unwrap();
+ assert_eq!(series.samples.len(), 1);
+ assert_eq!(series.samples[0].value, 42.0);
+ }
}
diff --git a/nightlight/crates/nightlight-server/src/lib.rs b/nightlight/crates/nightlight-server/src/lib.rs
index a547b0d..a72db3f 100644
--- a/nightlight/crates/nightlight-server/src/lib.rs
+++ b/nightlight/crates/nightlight-server/src/lib.rs
@@ -3,6 +3,7 @@
//! This library exposes the internal modules for integration testing.
pub mod config;
+pub mod grpc;
pub mod ingestion;
pub mod query;
pub mod storage;
diff --git a/nightlight/crates/nightlight-server/src/main.rs b/nightlight/crates/nightlight-server/src/main.rs
index 7c9af1d..3250ed5 100644
--- a/nightlight/crates/nightlight-server/src/main.rs
+++ b/nightlight/crates/nightlight-server/src/main.rs
@@ -1,127 +1,202 @@
-//! Nightlight Server
+//! Nightlight server binary.
//!
-//! A Prometheus-compatible metrics storage system with mTLS support.
-//!
-//! # Architecture
-//!
-//! - **Ingestion**: Prometheus remote_write API (HTTP POST with snappy compression)
-//! - **Query**: PromQL query engine (gRPC and HTTP APIs)
-//! - **Storage**: Time-series database with retention and compaction
-//! - **Security**: mTLS for all connections (following T027 patterns)
-//!
-//! # Configuration
-//!
-//! Configuration is loaded from a YAML file (default: config.yaml).
-//! See config.rs for the full configuration schema.
+//! Nightlight exposes:
+//! - Prometheus remote_write ingestion over HTTP
+//! - PromQL-compatible query endpoints over HTTP and gRPC
+//! - gRPC admin endpoints for health and stats
+//! - durable local storage backed by a WAL and snapshots
+
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
use anyhow::Result;
-use tracing::{info, Level};
+use axum::{routing::get, Router};
+use nightlight_api::nightlight::admin_server::AdminServer;
+use nightlight_api::nightlight::metric_query_server::MetricQueryServer;
+use tokio::time::MissedTickBehavior;
+use tonic::transport::Server as TonicServer;
+use tonic_health::server::health_reporter;
+use tracing::{error, info, warn, Level};
mod config;
+mod grpc;
mod ingestion;
mod query;
mod storage;
-use config::Config;
+use config::{Config, StorageConfig};
+use grpc::{AdminServiceImpl, MetricQueryServiceImpl};
+use ingestion::IngestionService;
+use query::QueryService;
+use storage::Storage;
+
+const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 30;
#[tokio::main]
async fn main() -> Result<()> {
- // Initialize tracing subscriber for structured logging
tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.with_target(false)
.with_thread_ids(true)
.init();
- info!("Nightlight server starting...");
+ info!("Nightlight server starting");
info!("Version: {}", env!("CARGO_PKG_VERSION"));
- // Load configuration from file or use defaults
let mut config = match Config::from_file("config.yaml") {
- Ok(cfg) => {
+ Ok(config) => {
info!("Configuration loaded from config.yaml");
- cfg
+ config
}
- Err(e) => {
- info!("Failed to load config.yaml: {}, using defaults", e);
+ Err(error) => {
+ info!("Failed to load config.yaml: {}, using defaults", error);
Config::default()
}
};
-
- // Apply environment variable overrides (for NixOS module integration)
config.apply_env_overrides();
+ if config.tls.is_some() {
+ warn!("Nightlight TLS configuration is currently ignored; starting plaintext listeners");
+ }
+
+ let http_addr: SocketAddr = config.server.http_addr.parse()?;
+ let grpc_addr: SocketAddr = config.server.grpc_addr.parse()?;
+ let storage = Arc::new(Storage::new(&config.storage.data_dir)?);
+ let query_service = QueryService::from_storage(storage.queryable());
+ let ingestion_service = IngestionService::new(Arc::clone(&storage));
+ let admin_service = AdminServiceImpl::new(
+ Arc::clone(&storage),
+ ingestion_service.metrics(),
+ query_service.metrics(),
+ );
+ let metric_query_service = MetricQueryServiceImpl::new(query_service.clone());
+
info!("Server configuration:");
- info!(" gRPC address: {}", config.server.grpc_addr);
- info!(" HTTP address: {}", config.server.http_addr);
+ info!(" HTTP address: {}", http_addr);
+ info!(" gRPC address: {}", grpc_addr);
info!(" Data directory: {}", config.storage.data_dir);
info!(" Retention: {} days", config.storage.retention_days);
info!(
- " TLS enabled: {}",
- config.tls.as_ref().map_or("no", |_| "yes")
+ " Compaction interval: {} seconds",
+ config.storage.compaction_interval_seconds
);
- // TODO (S5): Initialize storage layer
- // let storage = storage::Storage::new(&config.storage)?;
- // info!("Storage initialized");
+ let http_listener = tokio::net::TcpListener::bind(http_addr).await?;
+ let http_app = Router::new()
+ .route("/healthz", get(healthz))
+ .merge(ingestion_service.clone().router())
+ .merge(query_service.clone().router());
- // S5: Load persistent state from disk
- let data_path = std::path::PathBuf::from(&config.storage.data_dir)
- .join("nightlight.db");
- let query_service = query::QueryService::new_with_persistence(&data_path)?;
- info!("Query service initialized");
+ let (mut health_reporter, health_service) = health_reporter();
+ health_reporter
+ .set_serving::<MetricQueryServer<MetricQueryServiceImpl>>()
+ .await;
+ health_reporter
+ .set_serving::<AdminServer<AdminServiceImpl>>()
+ .await;
- // Initialize ingestion service with shared storage
- let shared_storage = query_service.storage();
- let ingestion_service = ingestion::IngestionService::new(shared_storage);
- info!("Ingestion service initialized (sharing storage with query service)");
+ let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
+ let mut http_shutdown = shutdown_tx.subscribe();
+ let mut grpc_shutdown = shutdown_tx.subscribe();
+ let maintenance_shutdown = shutdown_tx.subscribe();
- // Clone for shutdown handler
- let query_service_for_shutdown = query_service.clone();
- let data_path_for_shutdown = data_path.clone();
+ let http_server = async move {
+ axum::serve(http_listener, http_app)
+ .with_graceful_shutdown(async move {
+ let _ = http_shutdown.recv().await;
+ })
+ .await
+ };
- // Create router with both ingestion and query endpoints
- let app = ingestion_service.router().merge(query_service.router());
+ let grpc_server = TonicServer::builder()
+ .add_service(health_service)
+ .add_service(MetricQueryServer::new(metric_query_service))
+ .add_service(AdminServer::new(admin_service))
+ .serve_with_shutdown(grpc_addr, async move {
+ let _ = grpc_shutdown.recv().await;
+ });
- // Start HTTP server for both ingestion and query endpoints
- let listener = tokio::net::TcpListener::bind(&config.server.http_addr).await?;
- info!("HTTP server listening on {}", config.server.http_addr);
+ let maintenance_handle = tokio::spawn(maintenance_loop(
+ Arc::clone(&storage),
+ config.storage.clone(),
+ maintenance_shutdown,
+ ));
+
+ info!("HTTP server listening on {}", http_addr);
info!(" - Ingestion: POST /api/v1/write");
info!(" - Query: GET /api/v1/query, /api/v1/query_range");
info!(" - Metadata: GET /api/v1/series, /api/v1/label/:name/values");
+ info!(" - Health: GET /healthz");
+ info!("gRPC server listening on {}", grpc_addr);
+ info!(" - MetricQuery.InstantQuery / RangeQuery / SeriesQuery / LabelValuesQuery");
+ info!(" - Admin.Health / Stats / BuildInfo");
- // TODO (S5): Start background tasks
- // - Compaction
- // - Retention enforcement
- // - Metrics export
+ let shutdown = async {
+ tokio::signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
+ };
+ tokio::pin!(shutdown);
- info!("Nightlight server ready");
- info!("Press Ctrl+C to shutdown");
+ tokio::select! {
+ result = http_server => {
+ result?;
+ }
+ result = grpc_server => {
+ result?;
+ }
+ _ = &mut shutdown => {
+ info!("Shutdown signal received");
+ }
+ }
- // Serve with graceful shutdown
- axum::serve(listener, app)
- .with_graceful_shutdown(shutdown_signal(query_service_for_shutdown, data_path_for_shutdown))
- .await?;
+ let _ = shutdown_tx.send(());
+ if let Err(error) = maintenance_handle.await {
+ error!(error = %error, "Nightlight maintenance task failed to join");
+ }
info!("Nightlight server stopped");
Ok(())
}
-async fn shutdown_signal(
- query_service: query::QueryService,
- data_path: std::path::PathBuf,
+async fn maintenance_loop(
+ storage: Arc,
+ config: StorageConfig,
+ mut shutdown: tokio::sync::broadcast::Receiver<()>,
) {
- tokio::signal::ctrl_c()
- .await
- .expect("Failed to install CTRL+C handler");
- info!("Shutdown signal received, saving data...");
+ let snapshot_interval_secs =
+ config.compaction_interval_seconds.clamp(5, DEFAULT_SNAPSHOT_INTERVAL_SECS);
+ let mut snapshot_interval = tokio::time::interval(Duration::from_secs(snapshot_interval_secs));
+ snapshot_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
- // S5: Save persistent state to disk before shutdown
- if let Err(e) = query_service.save_to_disk(&data_path).await {
- tracing::error!("Failed to save data on shutdown: {}", e);
- } else {
- info!("Data saved successfully");
+ let mut retention_interval = tokio::time::interval(Duration::from_secs(
+ config.compaction_interval_seconds.max(1),
+ ));
+ retention_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+ loop {
+ tokio::select! {
+ _ = shutdown.recv() => break,
+ _ = snapshot_interval.tick() => {
+ if let Err(error) = storage.flush().await {
+ error!(error = %error, "Nightlight snapshot flush failed");
+ }
+ }
+ _ = retention_interval.tick() => {
+ if let Err(error) = storage.enforce_retention(config.retention_days).await {
+ error!(error = %error, "Nightlight retention sweep failed");
+ }
+ if let Err(error) = storage.compact().await {
+ error!(error = %error, "Nightlight compaction checkpoint failed");
+ }
+ }
+ }
}
- info!("Stopping server...");
+ if let Err(error) = storage.flush().await {
+ error!(error = %error, "Nightlight final snapshot flush failed");
+ }
+}
+
+async fn healthz() -> &'static str {
+ "ok"
}
diff --git a/nightlight/crates/nightlight-server/src/query.rs b/nightlight/crates/nightlight-server/src/query.rs
index 7ef90c0..99c064b 100644
--- a/nightlight/crates/nightlight-server/src/query.rs
+++ b/nightlight/crates/nightlight-server/src/query.rs
@@ -6,10 +6,11 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
- response::{IntoResponse, Json, Response},
+ response::{IntoResponse, Json},
routing::get,
Router,
};
+use parking_lot::Mutex;
use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries};
use promql_parser::{
label::Matchers,
@@ -18,16 +19,21 @@ use promql_parser::{
},
};
use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
+use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
+use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, error, info};
+const QUERY_DURATION_HISTORY_LIMIT: usize = 512;
+
/// Query service state
#[derive(Clone)]
pub struct QueryService {
// Reference to queryable storage (shared with ingestion)
storage: Arc>,
+ metrics: Arc,
}
/// In-memory queryable storage (reads from ingestion buffer)
@@ -39,6 +45,24 @@ pub struct QueryableStorage {
pub label_index: HashMap>>,
}
+/// Query-path counters plus a bounded history of recent query latencies
+/// (at most `QUERY_DURATION_HISTORY_LIMIT` entries, oldest evicted first).
+#[derive(Debug)]
+pub struct QueryMetrics {
+    queries_total: AtomicU64,
+    queries_failed: AtomicU64,
+    queries_active: AtomicU64,
+    // Recent query durations in milliseconds.
+    durations_ms: Mutex<VecDeque<u64>>,
+}
+
+/// Point-in-time copy of `QueryMetrics`, with nearest-rank latency
+/// percentiles derived from the recorded duration history.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct QueryMetricsSnapshot {
+    pub queries_total: u64,
+    pub queries_failed: u64,
+    pub queries_active: u64,
+    pub query_duration_p50: f64,
+    pub query_duration_p95: f64,
+    pub query_duration_p99: f64,
+}
+
impl QueryService {
pub fn new() -> Self {
Self {
@@ -46,12 +70,16 @@ impl QueryService {
series: HashMap::new(),
label_index: HashMap::new(),
})),
+ metrics: Arc::new(QueryMetrics::new()),
}
}
/// Create QueryService from existing shared storage
pub fn from_storage(storage: Arc>) -> Self {
- Self { storage }
+ Self {
+ storage,
+ metrics: Arc::new(QueryMetrics::new()),
+ }
}
/// Create QueryService and load persistent state from disk if it exists
@@ -61,6 +89,7 @@ impl QueryService {
Ok(Self {
storage: Arc::new(RwLock::new(storage)),
+ metrics: Arc::new(QueryMetrics::new()),
})
}
@@ -82,17 +111,32 @@ impl QueryService {
.with_state(self)
}
+    /// Returns a shared handle to this service's query metrics.
+    pub fn metrics(&self) -> Arc<QueryMetrics> {
+        Arc::clone(&self.metrics)
+    }
+
/// Execute an instant query at a specific timestamp
pub async fn execute_instant_query(&self, query: &str, time: i64) -> Result {
debug!("Executing instant query: {} at time {}", query, time);
+ let started = self.metrics.begin_query();
// Parse PromQL expression
let expr = promql_parser::parser::parse(query)
- .map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?;
+ .map_err(|e| Error::Query(format!("Parse error: {:?}", e)));
+ let expr = match expr {
+ Ok(expr) => expr,
+ Err(error) => {
+ self.metrics.finish_query(started, false);
+ return Err(error);
+ }
+ };
// Execute the expression
let storage = self.storage.read().await;
- let result = self.evaluate_expr(&expr, time, time, 0, &storage).await?;
+ let result = self.evaluate_expr(&expr, time, time, 0, &storage).await;
+ let success = result.is_ok();
+ self.metrics.finish_query(started, success);
+ let result = result?;
Ok(QueryResult {
result_type: "vector".to_string(),
@@ -123,10 +167,31 @@ impl QueryService {
"Executing range query: {} from {} to {} step {}",
query, start, end, step
);
+ let started = self.metrics.begin_query();
+
+ if step <= 0 {
+ self.metrics.finish_query(started, false);
+ return Err(Error::InvalidTimeRange(
+ "range query step must be greater than zero".to_string(),
+ ));
+ }
+ if end < start {
+ self.metrics.finish_query(started, false);
+ return Err(Error::InvalidTimeRange(
+ "range query end must be greater than or equal to start".to_string(),
+ ));
+ }
// Parse PromQL expression
let expr = promql_parser::parser::parse(query)
- .map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?;
+ .map_err(|e| Error::Query(format!("Parse error: {:?}", e)));
+ let expr = match expr {
+ Ok(expr) => expr,
+ Err(error) => {
+ self.metrics.finish_query(started, false);
+ return Err(error);
+ }
+ };
let storage = self.storage.read().await;
let mut results: HashMap = HashMap::new();
@@ -136,7 +201,14 @@ impl QueryService {
while current_time <= end {
let step_result = self
.evaluate_expr(&expr, current_time, end, step, &storage)
- .await?;
+ .await;
+ let step_result = match step_result {
+ Ok(step_result) => step_result,
+ Err(error) => {
+ self.metrics.finish_query(started, false);
+ return Err(error);
+ }
+ };
for ts in step_result {
// Create a unique key for this series based on labels
@@ -169,10 +241,12 @@ impl QueryService {
current_time += step;
}
- Ok(RangeQueryResult {
+ let result = RangeQueryResult {
result_type: "matrix".to_string(),
result: results.into_values().collect(),
- })
+ };
+ self.metrics.finish_query(started, true);
+ Ok(result)
}
/// Evaluate a PromQL expression (recursive with boxing for async)
@@ -589,9 +663,108 @@ impl QueryService {
true
}
- /// Get storage handle (for ingestion integration)
- pub fn storage(&self) -> Arc> {
- self.storage.clone()
+    /// Returns one `{label name -> value}` map per stored series that
+    /// matches `matchers` and overlaps the optional `[start, end]` range.
+    /// The lookup is recorded in the query metrics (always as a success).
+    pub async fn series_metadata(
+        &self,
+        matchers: &[String],
+        start: Option<i64>,
+        end: Option<i64>,
+    ) -> Result<Vec<HashMap<String, String>>> {
+        let started = self.metrics.begin_query();
+        let storage = self.storage.read().await;
+        let series = self.matching_series(&storage, matchers, start, end);
+        let result = Ok(series
+            .into_iter()
+            .map(|ts| {
+                ts.labels
+                    .iter()
+                    .map(|label| (label.name.clone(), label.value.clone()))
+                    .collect()
+            })
+            .collect());
+        self.metrics.finish_query(started, true);
+        result
+    }
+
+    /// Returns the sorted, deduplicated values of `label_name` across all
+    /// series matching `matchers` within the optional time range.
+    pub async fn label_values_for_matchers(
+        &self,
+        label_name: &str,
+        matchers: &[String],
+        start: Option<i64>,
+        end: Option<i64>,
+    ) -> Result<Vec<String>> {
+        let started = self.metrics.begin_query();
+        let storage = self.storage.read().await;
+        let mut values: Vec<String> = self
+            .matching_series(&storage, matchers, start, end)
+            .into_iter()
+            .filter_map(|series| series.get_label(label_name).map(str::to_string))
+            .collect();
+        values.sort();
+        values.dedup();
+        self.metrics.finish_query(started, true);
+        Ok(values)
+    }
+
+    /// Filters stored series by equality matchers and time-range overlap,
+    /// cloning the matches out of the shared storage.
+    fn matching_series(
+        &self,
+        storage: &QueryableStorage,
+        matchers: &[String],
+        start: Option<i64>,
+        end: Option<i64>,
+    ) -> Vec<TimeSeries> {
+        let parsed_matchers = parse_label_matchers(matchers);
+        storage
+            .series
+            .values()
+            .filter(|series| series_matches(series, &parsed_matchers))
+            .filter(|series| series_in_time_range(series, start, end))
+            .cloned()
+            .collect()
+    }
+}
+
+impl QueryMetrics {
+    /// Creates zeroed counters with a preallocated duration history.
+    fn new() -> Self {
+        Self {
+            queries_total: AtomicU64::new(0),
+            queries_failed: AtomicU64::new(0),
+            queries_active: AtomicU64::new(0),
+            durations_ms: Mutex::new(VecDeque::with_capacity(QUERY_DURATION_HISTORY_LIMIT)),
+        }
+    }
+
+    /// Marks a query as started and returns its start instant.
+    /// Relaxed ordering throughout: counters are independent best-effort stats.
+    fn begin_query(&self) -> Instant {
+        self.queries_total.fetch_add(1, Ordering::Relaxed);
+        self.queries_active.fetch_add(1, Ordering::Relaxed);
+        Instant::now()
+    }
+
+    /// Marks a query as finished, tallying failures and recording the
+    /// elapsed milliseconds into the bounded history (oldest entry evicted
+    /// once `QUERY_DURATION_HISTORY_LIMIT` is reached).
+    fn finish_query(&self, started: Instant, success: bool) {
+        if !success {
+            self.queries_failed.fetch_add(1, Ordering::Relaxed);
+        }
+        self.queries_active.fetch_sub(1, Ordering::Relaxed);
+
+        let elapsed_ms = started.elapsed().as_millis() as u64;
+        let mut durations = self.durations_ms.lock();
+        if durations.len() >= QUERY_DURATION_HISTORY_LIMIT {
+            durations.pop_front();
+        }
+        durations.push_back(elapsed_ms);
+    }
+
+    /// Copies the current counters and computes p50/p95/p99 latency via
+    /// nearest-rank percentile over a sorted copy of the history.
+    pub fn snapshot(&self) -> QueryMetricsSnapshot {
+        let mut sorted_durations: Vec<u64> = self.durations_ms.lock().iter().copied().collect();
+        sorted_durations.sort_unstable();
+
+        QueryMetricsSnapshot {
+            queries_total: self.queries_total.load(Ordering::Relaxed),
+            queries_failed: self.queries_failed.load(Ordering::Relaxed),
+            queries_active: self.queries_active.load(Ordering::Relaxed),
+            query_duration_p50: percentile(&sorted_durations, 0.50),
+            query_duration_p95: percentile(&sorted_durations, 0.95),
+            query_duration_p99: percentile(&sorted_durations, 0.99),
+        }
     }
 }
@@ -600,12 +773,15 @@ impl QueryableStorage {
pub fn upsert_series(&mut self, series: TimeSeries) {
// Update label index
for label in &series.labels {
- self.label_index
+ let series_ids = self
+ .label_index
.entry(label.name.clone())
.or_default()
.entry(label.value.clone())
- .or_default()
- .push(series.id);
+ .or_default();
+ if !series_ids.contains(&series.id) {
+ series_ids.push(series.id);
+ }
}
// Upsert series
@@ -624,11 +800,91 @@ impl QueryableStorage {
/// Get label values for a specific label name
pub fn label_values(&self, label_name: &str) -> Vec {
- self.label_index
+ let mut values: Vec = self
+ .label_index
.get(label_name)
.map(|values| values.keys().cloned().collect())
- .unwrap_or_default()
+ .unwrap_or_default();
+ values.sort();
+ values
}
+
+    /// Rebuilds `label_index` from scratch out of the current series map.
+    /// Used after deserialization or pruning, when the index may be stale.
+    pub fn rebuild_index(&mut self) {
+        self.label_index.clear();
+        // NOTE(review): cloning every series sidesteps borrowing `series`
+        // and `label_index` simultaneously; disjoint field borrows would
+        // avoid the copy — worth revisiting.
+        let series: Vec<TimeSeries> = self.series.values().cloned().collect();
+        for series in series {
+            for label in &series.labels {
+                self.label_index
+                    .entry(label.name.clone())
+                    .or_default()
+                    .entry(label.value.clone())
+                    .or_default()
+                    .push(series.id);
+            }
+        }
+    }
+
+    /// Drops all samples strictly older than `cutoff`, removes any series
+    /// left empty, rebuilds the label index, and returns the number of
+    /// samples removed.
+    pub fn prune_before(&mut self, cutoff: i64) -> usize {
+        let mut removed_samples = 0usize;
+        self.series.retain(|_, series| {
+            let before = series.samples.len();
+            series.samples.retain(|sample| sample.timestamp >= cutoff);
+            removed_samples += before.saturating_sub(series.samples.len());
+            // Drop the series entirely once it has no samples left.
+            !series.samples.is_empty()
+        });
+        self.rebuild_index();
+        removed_samples
+    }
+}
+
+/// Nearest-rank percentile over an ascending-sorted slice.
+/// Returns 0.0 for an empty slice.
+fn percentile(values: &[u64], quantile: f64) -> f64 {
+    if values.is_empty() {
+        return 0.0;
+    }
+
+    // Round to the nearest rank, clamped to the final element.
+    let index = ((values.len() - 1) as f64 * quantile).round() as usize;
+    values[index.min(values.len() - 1)] as f64
+}
+
+/// Parses `name="value"` matcher strings into `(name, value)` pairs.
+/// Splits on the first '='; entries without '=' are silently dropped, and
+/// surrounding whitespace/quotes are trimmed. Only plain equality is
+/// handled — `!=`/`=~` style matchers are not recognized here.
+fn parse_label_matchers(matchers: &[String]) -> Vec<(String, String)> {
+    matchers
+        .iter()
+        .filter_map(|matcher| matcher.split_once('='))
+        .map(|(key, value)| {
+            (
+                key.trim().to_string(),
+                value.trim().trim_matches('"').to_string(),
+            )
+        })
+        .collect()
+}
+
+/// True iff every `(name, value)` matcher has an exactly-equal label on the
+/// series; vacuously true when `matchers` is empty.
+fn series_matches(series: &TimeSeries, matchers: &[(String, String)]) -> bool {
+    matchers.iter().all(|(key, value)| {
+        series
+            .labels
+            .iter()
+            .any(|label| &label.name == key && &label.value == value)
+    })
+}
+
+/// Overlap test between a series' sample span and an optional time window.
+/// A series with no samples (no time range) always passes.
+fn series_in_time_range(series: &TimeSeries, start: Option<i64>, end: Option<i64>) -> bool {
+    let Some((series_start, series_end)) = series.time_range() else {
+        return true;
+    };
+
+    if let Some(start) = start {
+        if series_end < start {
+            return false;
+        }
+    }
+    if let Some(end) = end {
+        if series_start > end {
+            return false;
+        }
+    }
+
+    true
 }
/// HTTP handler for instant queries
@@ -696,46 +952,57 @@ async fn handle_range_query(
async fn handle_label_values(
State(service): State,
Path(label_name): Path,
+ Query(params): Query,
) -> impl IntoResponse {
- let storage = service.storage.read().await;
- let values = storage.label_values(&label_name);
-
- (
- StatusCode::OK,
- Json(LabelValuesResponse {
- status: "success".to_string(),
- data: values,
- }),
- )
+ match service
+ .label_values_for_matchers(&label_name, ¶ms.matchers, params.start, params.end)
+ .await
+ {
+ Ok(values) => (
+ StatusCode::OK,
+ Json(LabelValuesResponse {
+ status: "success".to_string(),
+ data: values,
+ }),
+ )
+ .into_response(),
+ Err(error) => (
+ StatusCode::BAD_REQUEST,
+ Json(serde_json::json!({
+ "status": "error",
+ "error": error.to_string(),
+ })),
+ )
+ .into_response(),
+ }
}
/// HTTP handler for series metadata
async fn handle_series(
State(service): State,
- Query(_params): Query,
+ Query(params): Query,
) -> impl IntoResponse {
- let storage = service.storage.read().await;
-
- // Return all series metadata (limited implementation)
- let series: Vec> = storage
- .series
- .values()
- .take(1000) // Limit to prevent OOM
- .map(|ts| {
- ts.labels
- .iter()
- .map(|l| (l.name.clone(), l.value.clone()))
- .collect()
- })
- .collect();
-
- (
- StatusCode::OK,
- Json(SeriesResponse {
- status: "success".to_string(),
- data: series,
- }),
- )
+ match service
+ .series_metadata(¶ms.matchers, params.start, params.end)
+ .await
+ {
+ Ok(series) => (
+ StatusCode::OK,
+ Json(SeriesResponse {
+ status: "success".to_string(),
+ data: series,
+ }),
+ )
+ .into_response(),
+ Err(error) => (
+ StatusCode::BAD_REQUEST,
+ Json(serde_json::json!({
+ "status": "error",
+ "error": error.to_string(),
+ })),
+ )
+ .into_response(),
+ }
}
// Request/Response Types
@@ -760,6 +1027,10 @@ struct SeriesQueryParams {
#[serde(default)]
#[serde(rename = "match[]")]
matchers: Vec,
+ #[serde(default)]
+ start: Option,
+ #[serde(default)]
+ end: Option,
}
#[derive(Debug, Serialize)]
@@ -770,30 +1041,30 @@ struct QueryResponse {
error_type: Option,
}
-#[derive(Debug, Serialize)]
-pub(crate) struct QueryResult {
+/// Instant-query payload ("vector" result type) returned to API clients.
+#[derive(Debug, Clone, Serialize)]
+pub struct QueryResult {
     #[serde(rename = "resultType")]
-    result_type: String,
-    result: Vec<InstantQueryResult>,
+    pub result_type: String,
+    pub result: Vec<InstantQueryResult>,
 }
-#[derive(Debug, Serialize)]
-struct InstantQueryResult {
-    metric: HashMap<String, String>,
-    value: Option<(i64, f64)>,
+/// One instant-query series: its label set and optional (timestamp, value).
+#[derive(Debug, Clone, Serialize)]
+pub struct InstantQueryResult {
+    pub metric: HashMap<String, String>,
+    pub value: Option<(i64, f64)>,
 }
-#[derive(Debug, Serialize)]
-pub(crate) struct RangeQueryResult {
+/// Range-query payload ("matrix" result type) returned to API clients.
+#[derive(Debug, Clone, Serialize)]
+pub struct RangeQueryResult {
     #[serde(rename = "resultType")]
-    result_type: String,
-    result: Vec<RangeResult>,
+    pub result_type: String,
+    pub result: Vec<RangeResult>,
 }
-#[derive(Debug, Serialize)]
-struct RangeResult {
-    metric: HashMap<String, String>,
-    values: Vec<(i64, f64)>,
+/// One range-query series: its label set and its (timestamp, value) points.
+#[derive(Debug, Clone, Serialize)]
+pub struct RangeResult {
+    pub metric: HashMap<String, String>,
+    pub values: Vec<(i64, f64)>,
 }
#[derive(Debug, Serialize)]
@@ -808,29 +1079,6 @@ struct SeriesResponse {
data: Vec>,
}
-#[derive(Debug)]
-enum QueryError {
- ParseFailed(String),
- ExecutionFailed(String),
-}
-
-impl IntoResponse for QueryError {
- fn into_response(self) -> Response {
- let (status, message) = match self {
- QueryError::ParseFailed(msg) => (StatusCode::BAD_REQUEST, msg),
- QueryError::ExecutionFailed(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
- };
-
- let body = serde_json::json!({
- "status": "error",
- "errorType": "execution",
- "error": message
- });
-
- (status, Json(body)).into_response()
- }
-}
-
impl Default for QueryService {
fn default() -> Self {
Self::new()
@@ -892,8 +1140,9 @@ impl QueryableStorage {
.map_err(|e| Error::Storage(format!("Failed to read file: {}", e)))?;
// Deserialize from bincode
- let storage = bincode::deserialize(&buffer)
+ let mut storage: Self = bincode::deserialize(&buffer)
.map_err(|e| Error::Storage(format!("Deserialization failed: {}", e)))?;
+ storage.rebuild_index();
Ok(storage)
}
diff --git a/nightlight/crates/nightlight-server/src/storage.rs b/nightlight/crates/nightlight-server/src/storage.rs
index 8da4126..620a315 100644
--- a/nightlight/crates/nightlight-server/src/storage.rs
+++ b/nightlight/crates/nightlight-server/src/storage.rs
@@ -1,12 +1,8 @@
-//! Time-series storage layer
-//!
-//! シンプルなWAL付きストレージ実装(S5足場)。
-//! - in-memory head: `QueryableStorage` を共有
-//! - WAL: bincode長さプレフィックスでappend / replay
-//! - スナップショット: `nightlight.db` にbincode保存
-//! - Retention/compactionは将来タスク(現状no-op)
+//! Time-series storage layer backed by an in-memory head, a write-ahead log,
+//! and periodic snapshots.
use anyhow::Result;
+use chrono::Utc;
use nightlight_types::{SeriesId, TimeSeries};
use std::{
fs::{File, OpenOptions},
@@ -15,16 +11,15 @@ use std::{
sync::Arc,
};
use tokio::sync::{Mutex, RwLock};
+use tracing::debug;
use crate::query::QueryableStorage;
-/// WALレコード
+/// A single write-ahead-log record: one serialized time series.
 #[derive(serde::Serialize, serde::Deserialize)]
 struct WalRecord {
     series: TimeSeries,
 }
-/// ストレージ本体
pub struct Storage {
head: Arc>,
wal_path: PathBuf,
@@ -33,7 +28,6 @@ pub struct Storage {
}
impl Storage {
- /// data_dirを初期化し、snapshot + WALをリプレイする
pub fn new(data_dir: &str) -> Result {
let data_dir = PathBuf::from(data_dir);
std::fs::create_dir_all(&data_dir)?;
@@ -41,12 +35,11 @@ impl Storage {
let snapshot_path = data_dir.join("nightlight.db");
let wal_path = data_dir.join("wal.log");
- // snapshotロード
let mut head = QueryableStorage::load_from_file(&snapshot_path)?;
- // WALリプレイ
if wal_path.exists() {
replay_wal(&wal_path, &mut head)?;
}
+ head.rebuild_index();
Ok(Self {
head: Arc::new(RwLock::new(head)),
@@ -56,13 +49,15 @@ impl Storage {
})
}
- /// 共有QueryableStorageを取得
pub fn queryable(&self) -> Arc> {
- self.head.clone()
+ Arc::clone(&self.head)
}
- /// WALへappendし、headへ反映
pub async fn append(&self, series_list: Vec) -> Result<()> {
+ if series_list.is_empty() {
+ return Ok(());
+ }
+
let _guard = self.wal_lock.lock().await;
let mut wal_file = OpenOptions::new()
.create(true)
@@ -78,14 +73,12 @@ impl Storage {
let len = encoded.len() as u32;
wal_file.write_all(&len.to_le_bytes())?;
wal_file.write_all(&encoded)?;
-
head.upsert_series(series);
}
wal_file.flush()?;
Ok(())
}
- /// 指定IDのシリーズを時間範囲で返す
pub async fn query_series(
&self,
series_id: SeriesId,
@@ -93,110 +86,142 @@ impl Storage {
end: i64,
) -> Result