photoncloud-monorepo/nightlight/crates/nightlight-server/src/query.rs
centra 3eeb303dcb feat: Batch commit for T039.S3 deployment
Includes all pending changes needed for nixos-anywhere:
- fiberlb: L7 policy, rule, certificate types
- deployer: New service for cluster management
- nix-nos: Generic network modules
- Various service updates and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 04:34:51 +09:00

1122 lines
35 KiB
Rust

//! PromQL query engine
//!
//! Implements a subset of PromQL for querying time-series data.
//! Provides instant and range query execution with basic aggregations.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Json, Response},
routing::get,
Router,
};
use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries};
use promql_parser::{
label::Matchers,
parser::{
AggregateExpr, BinaryExpr, Call, Expr, MatrixSelector, VectorSelector,
},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info};
/// Query service state
#[derive(Clone)]
pub struct QueryService {
// Reference to queryable storage (shared with ingestion)
storage: Arc<RwLock<QueryableStorage>>,
}
/// In-memory queryable storage (reads from ingestion buffer)
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QueryableStorage {
// Series metadata indexed by SeriesId
pub series: HashMap<SeriesId, TimeSeries>,
// Inverted index: label name -> label value -> [SeriesId]
pub label_index: HashMap<String, HashMap<String, Vec<SeriesId>>>,
}
impl QueryService {
pub fn new() -> Self {
Self {
storage: Arc::new(RwLock::new(QueryableStorage {
series: HashMap::new(),
label_index: HashMap::new(),
})),
}
}
/// Create QueryService from existing shared storage
pub fn from_storage(storage: Arc<RwLock<QueryableStorage>>) -> Self {
Self { storage }
}
/// Create QueryService and load persistent state from disk if it exists
pub fn new_with_persistence(data_path: &std::path::Path) -> Result<Self> {
let storage = QueryableStorage::load_from_file(data_path)?;
info!("Loaded {} series from persistent storage", storage.series.len());
Ok(Self {
storage: Arc::new(RwLock::new(storage)),
})
}
/// Save current storage state to disk
pub async fn save_to_disk(&self, data_path: &std::path::Path) -> Result<()> {
let storage = self.storage.read().await;
storage.save_to_file(data_path)?;
debug!("Saved {} series to persistent storage", storage.series.len());
Ok(())
}
/// Create Axum router for query endpoints
pub fn router(self) -> Router {
Router::new()
.route("/api/v1/query", get(handle_instant_query))
.route("/api/v1/query_range", get(handle_range_query))
.route("/api/v1/label/:label_name/values", get(handle_label_values))
.route("/api/v1/series", get(handle_series))
.with_state(self)
}
/// Execute an instant query at a specific timestamp
pub async fn execute_instant_query(&self, query: &str, time: i64) -> Result<QueryResult> {
debug!("Executing instant query: {} at time {}", query, time);
// Parse PromQL expression
let expr = promql_parser::parser::parse(query)
.map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?;
// Execute the expression
let storage = self.storage.read().await;
let result = self.evaluate_expr(&expr, time, time, 0, &storage).await?;
Ok(QueryResult {
result_type: "vector".to_string(),
result: result
.into_iter()
.map(|ts| {
let metric = ts
.labels
.into_iter()
.map(|l| (l.name, l.value))
.collect();
let value = ts.samples.last().map(|s| (s.timestamp, s.value));
InstantQueryResult { metric, value }
})
.collect(),
})
}
/// Execute a range query over a time range with step
pub async fn execute_range_query(
&self,
query: &str,
start: i64,
end: i64,
step: i64,
) -> Result<RangeQueryResult> {
debug!(
"Executing range query: {} from {} to {} step {}",
query, start, end, step
);
// Parse PromQL expression
let expr = promql_parser::parser::parse(query)
.map_err(|e| Error::Query(format!("Parse error: {:?}", e)))?;
let storage = self.storage.read().await;
let mut results: HashMap<String, RangeResult> = HashMap::new();
// Evaluate at each step
let mut current_time = start;
while current_time <= end {
let step_result = self
.evaluate_expr(&expr, current_time, end, step, &storage)
.await?;
for ts in step_result {
// Create a unique key for this series based on labels
let series_key = ts
.labels
.iter()
.map(|l| format!("{}={}", l.name, l.value))
.collect::<Vec<_>>()
.join(",");
// Find or create result entry for this series
let entry = results.entry(series_key).or_insert_with(|| {
let metric = ts
.labels
.iter()
.map(|l| (l.name.clone(), l.value.clone()))
.collect();
RangeResult {
metric,
values: Vec::new(),
}
});
// Append samples to existing series
for sample in ts.samples {
entry.values.push((sample.timestamp, sample.value));
}
}
current_time += step;
}
Ok(RangeQueryResult {
result_type: "matrix".to_string(),
result: results.into_values().collect(),
})
}
/// Evaluate a PromQL expression (recursive with boxing for async)
fn evaluate_expr<'a>(
&'a self,
expr: &'a Expr,
time: i64,
end_time: i64,
step: i64,
storage: &'a QueryableStorage,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<TimeSeries>>> + Send + 'a>> {
Box::pin(async move {
match expr {
Expr::VectorSelector(selector) => {
// Vector selector: metric_name{label="value"}
self.evaluate_vector_selector(selector, time, storage)
}
Expr::MatrixSelector(selector) => {
// Range selector: metric_name[5m]
self.evaluate_matrix_selector(selector, time, storage)
}
Expr::Aggregate(agg) => {
// Aggregation: sum(metric), avg(metric), etc.
self.evaluate_aggregation(agg, time, end_time, step, storage)
.await
}
Expr::Call(call) => {
// Function call: rate(metric[5m]), etc.
self.evaluate_function(call, time, end_time, step, storage)
.await
}
Expr::Binary(binary) => {
// Binary operation: metric1 + metric2
self.evaluate_binary(binary, time, end_time, step, storage)
.await
}
_ => Err(Error::Query(format!(
"Unsupported expression type: {:?}",
expr
))),
}
})
}
/// Evaluate vector selector
fn evaluate_vector_selector(
&self,
selector: &VectorSelector,
time: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// Find all series matching the label matchers
let matching_series: Vec<TimeSeries> = storage
.series
.values()
.filter(|ts| self.matches_selectors(ts, &selector.matchers))
.cloned()
.map(|mut ts| {
// Filter to get sample closest to query time
ts.samples.retain(|s| s.timestamp <= time);
// Keep only the most recent sample
if let Some(last) = ts.samples.last().cloned() {
ts.samples = vec![last];
}
ts
})
.filter(|ts| !ts.samples.is_empty())
.collect();
Ok(matching_series)
}
/// Evaluate matrix selector (range selector)
fn evaluate_matrix_selector(
&self,
selector: &MatrixSelector,
time: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// Get the time range in milliseconds
let range_ms = selector.range.as_millis() as i64;
let start_time = time - range_ms;
// Evaluate underlying vector selector
let mut series = storage
.series
.values()
.filter(|ts| self.matches_selectors(ts, &selector.vs.matchers))
.cloned()
.collect::<Vec<_>>();
// Filter samples to the time range
for ts in &mut series {
ts.samples
.retain(|s| s.timestamp >= start_time && s.timestamp <= time);
}
// Remove empty series
series.retain(|ts| !ts.samples.is_empty());
Ok(series)
}
/// Evaluate aggregation (sum, avg, min, max, count)
async fn evaluate_aggregation(
&self,
agg: &AggregateExpr,
time: i64,
end_time: i64,
step: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// Evaluate the input expression
let input_series = self
.evaluate_expr(&agg.expr, time, end_time, step, storage)
.await?;
if input_series.is_empty() {
return Ok(vec![]);
}
// Perform aggregation
let aggregated_value = match agg.op.to_string().to_lowercase().as_str() {
"sum" => input_series
.iter()
.flat_map(|ts| ts.samples.iter().map(|s| s.value))
.sum(),
"avg" => {
let sum: f64 = input_series
.iter()
.flat_map(|ts| ts.samples.iter().map(|s| s.value))
.sum();
let count = input_series
.iter()
.map(|ts| ts.samples.len())
.sum::<usize>() as f64;
if count > 0.0 {
sum / count
} else {
0.0
}
}
"min" => input_series
.iter()
.flat_map(|ts| ts.samples.iter().map(|s| s.value))
.fold(f64::INFINITY, f64::min),
"max" => input_series
.iter()
.flat_map(|ts| ts.samples.iter().map(|s| s.value))
.fold(f64::NEG_INFINITY, f64::max),
"count" => input_series.len() as f64,
op => {
return Err(Error::Query(format!("Unsupported aggregation: {}", op)));
}
};
// Create result time series
Ok(vec![TimeSeries {
id: SeriesId(0), // Aggregated result has no series ID
labels: vec![Label {
name: "__name__".to_string(),
value: format!("{}()", agg.op),
}],
samples: vec![Sample {
timestamp: time,
value: aggregated_value,
}],
}])
}
/// Evaluate function call (rate, irate, increase, histogram_quantile)
async fn evaluate_function(
&self,
call: &Call,
time: i64,
end_time: i64,
step: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
let func_name = &call.func.name;
match func_name.to_string().as_str() {
"rate" => self.evaluate_rate(call, time, end_time, step, storage).await,
"irate" => {
self.evaluate_irate(call, time, end_time, step, storage)
.await
}
"increase" => {
self.evaluate_increase(call, time, end_time, step, storage)
.await
}
"histogram_quantile" => {
self.evaluate_histogram_quantile(call, time, end_time, step, storage)
.await
}
_ => Err(Error::Query(format!("Unsupported function: {}", func_name))),
}
}
/// Evaluate rate() function - calculates per-second average rate of increase
async fn evaluate_rate(
&self,
call: &Call,
time: i64,
end_time: i64,
step: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// rate() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
return Err(Error::Query("rate() requires a range vector argument".into()));
}
// Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
// Evaluate the argument to get range data
let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
// Apply rate calculation to each series
let mut result = Vec::new();
for series in series_list {
if series.samples.len() < 2 {
continue; // Need at least 2 samples for rate calculation
}
// Get first and last samples
let first = &series.samples[0];
let last = &series.samples[series.samples.len() - 1];
// Calculate time range in seconds
let duration_seconds = (last.timestamp - first.timestamp) as f64 / 1000.0;
if duration_seconds <= 0.0 {
continue;
}
// Calculate rate (per-second average)
// For counter metrics, we should handle resets, but simplified here
let value_diff = last.value - first.value;
let rate = value_diff / duration_seconds;
// Create result series with single sample at query time
result.push(TimeSeries {
id: series.id,
labels: series.labels.clone(),
samples: vec![Sample {
timestamp: time,
value: rate.max(0.0), // Rates can't be negative for counters
}],
});
}
Ok(result)
}
/// Evaluate irate() function - calculates instant rate using last two samples
async fn evaluate_irate(
&self,
call: &Call,
time: i64,
end_time: i64,
step: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// irate() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
return Err(Error::Query("irate() requires a range vector argument".into()));
}
// Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
// Evaluate the argument to get range data
let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
// Apply irate calculation to each series
let mut result = Vec::new();
for series in series_list {
if series.samples.len() < 2 {
continue; // Need at least 2 samples for irate calculation
}
// Get last two samples
let second_last = &series.samples[series.samples.len() - 2];
let last = &series.samples[series.samples.len() - 1];
// Calculate time difference in seconds
let duration_seconds = (last.timestamp - second_last.timestamp) as f64 / 1000.0;
if duration_seconds <= 0.0 {
continue;
}
// Calculate instant rate
let value_diff = last.value - second_last.value;
let rate = value_diff / duration_seconds;
// Create result series with single sample at query time
result.push(TimeSeries {
id: series.id,
labels: series.labels.clone(),
samples: vec![Sample {
timestamp: time,
value: rate.max(0.0), // Rates can't be negative for counters
}],
});
}
Ok(result)
}
/// Evaluate increase() function - calculates total increase over time range
async fn evaluate_increase(
&self,
call: &Call,
time: i64,
end_time: i64,
step: i64,
storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// increase() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
return Err(Error::Query("increase() requires a range vector argument".into()));
}
// Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
// Evaluate the argument to get range data
let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
// Apply increase calculation to each series
let mut result = Vec::new();
for series in series_list {
if series.samples.len() < 2 {
continue; // Need at least 2 samples for increase calculation
}
// Get first and last samples
let first = &series.samples[0];
let last = &series.samples[series.samples.len() - 1];
// Calculate total increase
// For counter metrics, we should handle resets, but simplified here
let increase = last.value - first.value;
// Create result series with single sample at query time
result.push(TimeSeries {
id: series.id,
labels: series.labels.clone(),
samples: vec![Sample {
timestamp: time,
value: increase.max(0.0), // Increase can't be negative for counters
}],
});
}
Ok(result)
}
/// Evaluate histogram_quantile() function (simplified - stub for now)
async fn evaluate_histogram_quantile(
&self,
_call: &Call,
_time: i64,
_end_time: i64,
_step: i64,
_storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// Simplified implementation - full histogram quantile is complex
// This would require proper histogram bucket handling
Err(Error::Query(
"histogram_quantile() not yet implemented".into(),
))
}
/// Evaluate binary operation (simplified - stub for now)
async fn evaluate_binary(
&self,
_binary: &BinaryExpr,
_time: i64,
_end_time: i64,
_step: i64,
_storage: &QueryableStorage,
) -> Result<Vec<TimeSeries>> {
// Simplified binary operation evaluation
// Full implementation would require proper vector matching
Err(Error::Query(
"Binary operations not yet fully implemented".into(),
))
}
/// Check if a time series matches label matchers
fn matches_selectors(&self, ts: &TimeSeries, matchers: &Matchers) -> bool {
for matcher in &matchers.matchers {
let label_name = &matcher.name;
let label_value = ts
.labels
.iter()
.find(|l| &l.name == label_name)
.map(|l| l.value.as_str());
// For now, simple exact match (can be enhanced)
let matches = if let Some(value) = label_value {
matcher.value == value
} else {
false
};
if !matches {
return false;
}
}
true
}
/// Get storage handle (for ingestion integration)
pub fn storage(&self) -> Arc<RwLock<QueryableStorage>> {
self.storage.clone()
}
}
impl QueryableStorage {
/// Add or update a time series in storage
pub fn upsert_series(&mut self, series: TimeSeries) {
// Update label index
for label in &series.labels {
self.label_index
.entry(label.name.clone())
.or_default()
.entry(label.value.clone())
.or_default()
.push(series.id);
}
// Upsert series
self.series
.entry(series.id)
.and_modify(|existing| {
// Merge samples (append new samples)
existing.samples.extend(series.samples.clone());
// Sort by timestamp
existing.samples.sort_by_key(|s| s.timestamp);
// Deduplicate
existing.samples.dedup_by_key(|s| s.timestamp);
})
.or_insert(series);
}
/// Get label values for a specific label name
pub fn label_values(&self, label_name: &str) -> Vec<String> {
self.label_index
.get(label_name)
.map(|values| values.keys().cloned().collect())
.unwrap_or_default()
}
}
/// HTTP handler for instant queries
#[axum::debug_handler]
async fn handle_instant_query(
State(service): State<QueryService>,
Query(params): Query<InstantQueryParams>,
) -> (StatusCode, Json<QueryResponse>) {
let time = params
.time
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
let response = match service.execute_instant_query(&params.query, time).await {
Ok(result) => QueryResponse {
status: "success".to_string(),
data: Some(serde_json::to_value(result).unwrap()),
error: None,
error_type: None,
},
Err(e) => {
error!("Query failed: {}", e);
QueryResponse {
status: "error".to_string(),
data: None,
error: Some(e.to_string()),
error_type: Some("execution".to_string()),
}
}
};
(StatusCode::OK, Json(response))
}
/// HTTP handler for range queries
#[axum::debug_handler]
async fn handle_range_query(
State(service): State<QueryService>,
Query(params): Query<RangeQueryParams>,
) -> (StatusCode, Json<QueryResponse>) {
let response = match service
.execute_range_query(&params.query, params.start, params.end, params.step)
.await
{
Ok(result) => QueryResponse {
status: "success".to_string(),
data: Some(serde_json::to_value(result).unwrap()),
error: None,
error_type: None,
},
Err(e) => {
error!("Range query failed: {}", e);
QueryResponse {
status: "error".to_string(),
data: None,
error: Some(e.to_string()),
error_type: Some("execution".to_string()),
}
}
};
(StatusCode::OK, Json(response))
}
/// HTTP handler for label values
async fn handle_label_values(
State(service): State<QueryService>,
Path(label_name): Path<String>,
) -> impl IntoResponse {
let storage = service.storage.read().await;
let values = storage.label_values(&label_name);
(
StatusCode::OK,
Json(LabelValuesResponse {
status: "success".to_string(),
data: values,
}),
)
}
/// HTTP handler for series metadata
async fn handle_series(
State(service): State<QueryService>,
Query(_params): Query<SeriesQueryParams>,
) -> impl IntoResponse {
let storage = service.storage.read().await;
// Return all series metadata (limited implementation)
let series: Vec<HashMap<String, String>> = storage
.series
.values()
.take(1000) // Limit to prevent OOM
.map(|ts| {
ts.labels
.iter()
.map(|l| (l.name.clone(), l.value.clone()))
.collect()
})
.collect();
(
StatusCode::OK,
Json(SeriesResponse {
status: "success".to_string(),
data: series,
}),
)
}
// Request/Response Types
#[derive(Debug, Deserialize)]
struct InstantQueryParams {
query: String,
#[serde(default)]
time: Option<i64>,
}
#[derive(Debug, Deserialize)]
struct RangeQueryParams {
query: String,
start: i64,
end: i64,
step: i64,
}
#[derive(Debug, Deserialize)]
struct SeriesQueryParams {
#[serde(default)]
#[serde(rename = "match[]")]
matchers: Vec<String>,
}
#[derive(Debug, Serialize)]
struct QueryResponse {
status: String,
data: Option<serde_json::Value>,
error: Option<String>,
error_type: Option<String>,
}
#[derive(Debug, Serialize)]
pub(crate) struct QueryResult {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<InstantQueryResult>,
}
#[derive(Debug, Serialize)]
struct InstantQueryResult {
metric: HashMap<String, String>,
value: Option<(i64, f64)>,
}
#[derive(Debug, Serialize)]
pub(crate) struct RangeQueryResult {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<RangeResult>,
}
#[derive(Debug, Serialize)]
struct RangeResult {
metric: HashMap<String, String>,
values: Vec<(i64, f64)>,
}
#[derive(Debug, Serialize)]
struct LabelValuesResponse {
status: String,
data: Vec<String>,
}
#[derive(Debug, Serialize)]
struct SeriesResponse {
status: String,
data: Vec<HashMap<String, String>>,
}
#[derive(Debug)]
enum QueryError {
ParseFailed(String),
ExecutionFailed(String),
}
impl IntoResponse for QueryError {
fn into_response(self) -> Response {
let (status, message) = match self {
QueryError::ParseFailed(msg) => (StatusCode::BAD_REQUEST, msg),
QueryError::ExecutionFailed(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
};
let body = serde_json::json!({
"status": "error",
"errorType": "execution",
"error": message
});
(status, Json(body)).into_response()
}
}
impl Default for QueryService {
fn default() -> Self {
Self::new()
}
}
impl QueryableStorage {
/// Save storage state to disk using bincode serialization
pub fn save_to_file(&self, path: &std::path::Path) -> Result<()> {
use std::fs::File;
use std::io::Write;
// Serialize to bincode
let encoded = bincode::serialize(self)
.map_err(|e| Error::Storage(format!("Serialization failed: {}", e)))?;
// Create parent directory if needed
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Storage(format!("Failed to create directory: {}", e)))?;
}
// Write to file atomically (write to temp, then rename)
let temp_path = path.with_extension("tmp");
let mut file = File::create(&temp_path)
.map_err(|e| Error::Storage(format!("Failed to create file: {}", e)))?;
file.write_all(&encoded)
.map_err(|e| Error::Storage(format!("Failed to write file: {}", e)))?;
file.sync_all()
.map_err(|e| Error::Storage(format!("Failed to sync file: {}", e)))?;
std::fs::rename(&temp_path, path)
.map_err(|e| Error::Storage(format!("Failed to rename file: {}", e)))?;
Ok(())
}
/// Load storage state from disk using bincode deserialization
pub fn load_from_file(path: &std::path::Path) -> Result<Self> {
use std::fs::File;
use std::io::Read;
// Check if file exists
if !path.exists() {
return Ok(Self {
series: HashMap::new(),
label_index: HashMap::new(),
});
}
// Read file
let mut file = File::open(path)
.map_err(|e| Error::Storage(format!("Failed to open file: {}", e)))?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.map_err(|e| Error::Storage(format!("Failed to read file: {}", e)))?;
// Deserialize from bincode
let storage = bincode::deserialize(&buffer)
.map_err(|e| Error::Storage(format!("Deserialization failed: {}", e)))?;
Ok(storage)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_query_service_creation() {
let service = QueryService::new();
// Verify service can be created
assert!(service.storage.read().await.series.is_empty());
}
#[test]
fn test_simple_selector_parsing() {
// Test that we can parse a simple PromQL query
let query = "http_requests_total";
let result = promql_parser::parser::parse(query);
assert!(result.is_ok());
}
#[test]
fn test_label_selector_parsing() {
let query = "http_requests_total{method=\"GET\"}";
let result = promql_parser::parser::parse(query);
assert!(result.is_ok());
}
#[test]
fn test_aggregation_parsing() {
let query = "sum(http_requests_total)";
let result = promql_parser::parser::parse(query);
assert!(result.is_ok());
}
#[test]
fn test_rate_function_parsing() {
let query = "rate(http_requests_total[5m])";
let result = promql_parser::parser::parse(query);
assert!(result.is_ok());
}
#[tokio::test]
async fn test_instant_query_empty_storage() {
let service = QueryService::new();
let result = service.execute_instant_query("up", 1000).await;
assert!(result.is_ok());
let query_result = result.unwrap();
assert_eq!(query_result.result_type, "vector");
}
#[tokio::test]
async fn test_range_query_empty_storage() {
let service = QueryService::new();
let result = service
.execute_range_query("up", 1000, 2000, 100)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_storage_upsert() {
let service = QueryService::new();
let mut storage = service.storage.write().await;
let series = TimeSeries {
id: SeriesId(1),
labels: vec![
Label::new("__name__", "test_metric"),
Label::new("job", "test"),
],
samples: vec![Sample::new(1000, 42.0)],
};
storage.upsert_series(series);
assert_eq!(storage.series.len(), 1);
}
#[tokio::test]
async fn test_label_values() {
let service = QueryService::new();
let mut storage = service.storage.write().await;
let series = TimeSeries {
id: SeriesId(1),
labels: vec![
Label::new("__name__", "test_metric"),
Label::new("job", "test_job"),
],
samples: vec![],
};
storage.upsert_series(series);
let values = storage.label_values("job");
assert_eq!(values.len(), 1);
assert!(values.contains(&"test_job".to_string()));
}
#[test]
fn test_persistence_save_load_empty() {
use tempfile::tempdir;
// Create temporary directory
let dir = tempdir().unwrap();
let path = dir.path().join("test.db");
// Create empty storage and save
let storage = QueryableStorage {
series: HashMap::new(),
label_index: HashMap::new(),
};
storage.save_to_file(&path).unwrap();
assert!(path.exists());
// Load back
let loaded = QueryableStorage::load_from_file(&path).unwrap();
assert_eq!(loaded.series.len(), 0);
assert_eq!(loaded.label_index.len(), 0);
}
#[test]
fn test_persistence_save_load_with_data() {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let path = dir.path().join("test.db");
// Create storage with data
let mut storage = QueryableStorage {
series: HashMap::new(),
label_index: HashMap::new(),
};
let series1 = TimeSeries {
id: SeriesId(1),
labels: vec![
Label::new("__name__", "metric1"),
Label::new("job", "test"),
],
samples: vec![Sample::new(1000, 10.0), Sample::new(2000, 20.0)],
};
let series2 = TimeSeries {
id: SeriesId(2),
labels: vec![
Label::new("__name__", "metric2"),
Label::new("job", "prod"),
],
samples: vec![Sample::new(1000, 30.0)],
};
storage.upsert_series(series1.clone());
storage.upsert_series(series2.clone());
// Save to disk
storage.save_to_file(&path).unwrap();
assert!(path.exists());
// Load back
let loaded = QueryableStorage::load_from_file(&path).unwrap();
assert_eq!(loaded.series.len(), 2);
assert_eq!(loaded.label_index.len(), 2); // __name__ and job
// Verify series data
let loaded_series1 = loaded.series.get(&SeriesId(1)).unwrap();
assert_eq!(loaded_series1.labels.len(), 2);
assert_eq!(loaded_series1.samples.len(), 2);
assert_eq!(loaded_series1.samples[0].value, 10.0);
// Verify label index
let job_values = loaded.label_values("job");
assert_eq!(job_values.len(), 2);
assert!(job_values.contains(&"test".to_string()));
assert!(job_values.contains(&"prod".to_string()));
}
#[test]
fn test_persistence_load_nonexistent_file() {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let path = dir.path().join("nonexistent.db");
// Loading non-existent file should return empty storage
let loaded = QueryableStorage::load_from_file(&path).unwrap();
assert_eq!(loaded.series.len(), 0);
assert_eq!(loaded.label_index.len(), 0);
}
#[tokio::test]
async fn test_query_service_persistence() {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let path = dir.path().join("service_test.db");
// Create service and add some data
let service = QueryService::new();
{
let mut storage = service.storage.write().await;
let series = TimeSeries {
id: SeriesId(42),
labels: vec![Label::new("__name__", "test_metric")],
samples: vec![Sample::new(1000, 99.5)],
};
storage.upsert_series(series);
}
// Save to disk
service.save_to_disk(&path).await.unwrap();
// Create new service loading from disk
let new_service = QueryService::new_with_persistence(&path).unwrap();
let storage = new_service.storage.read().await;
// Verify data was persisted
assert_eq!(storage.series.len(), 1);
let loaded_series = storage.series.get(&SeriesId(42)).unwrap();
assert_eq!(loaded_series.samples[0].value, 99.5);
}
}