From 9b26deee9ba2db0b731c27b47b3836e3b922bb13 Mon Sep 17 00:00:00 2001
From: centra
Date: Wed, 1 Apr 2026 15:46:11 +0900
Subject: [PATCH] nightlight: implement promql arithmetic and quantiles
---
.../crates/nightlight-server/src/query.rs | 1119 ++++++++++++++---
nix/test-cluster/run-cluster.sh | 163 ++-
2 files changed, 1086 insertions(+), 196 deletions(-)
diff --git a/nightlight/crates/nightlight-server/src/query.rs b/nightlight/crates/nightlight-server/src/query.rs
index 2d7a353..a7a3b52 100644
--- a/nightlight/crates/nightlight-server/src/query.rs
+++ b/nightlight/crates/nightlight-server/src/query.rs
@@ -10,23 +10,24 @@ use axum::{
routing::get,
Router,
};
-use parking_lot::Mutex;
use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries};
+use parking_lot::Mutex;
use promql_parser::{
- label::Matchers,
+ label::{MatchOp, Matchers},
parser::{
- AggregateExpr, BinaryExpr, Call, Expr, MatrixSelector, VectorSelector,
+ AggregateExpr, BinModifier, BinaryExpr, Call, Expr, LabelModifier, MatrixSelector,
+ NumberLiteral, UnaryExpr, VectorMatchCardinality, VectorSelector,
},
};
use serde::{Deserialize, Serialize};
-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
-use tracing::{debug, error};
#[cfg(test)]
use tracing::info;
+use tracing::{debug, error};
const QUERY_DURATION_HISTORY_LIMIT: usize = 512;
@@ -65,6 +66,12 @@ pub struct QueryMetricsSnapshot {
pub query_duration_p99: f64,
}
+#[derive(Debug, Clone)]
+enum EvalValue {
+ Vector(Vec),
+ Scalar(f64),
+}
+
impl QueryService {
pub fn new() -> Self {
Self {
@@ -88,7 +95,10 @@ impl QueryService {
#[cfg(test)]
pub fn new_with_persistence(data_path: &std::path::Path) -> Result {
let storage = QueryableStorage::load_from_file(data_path)?;
- info!("Loaded {} series from persistent storage", storage.series.len());
+ info!(
+ "Loaded {} series from persistent storage",
+ storage.series.len()
+ );
Ok(Self {
storage: Arc::new(RwLock::new(storage)),
@@ -101,7 +111,10 @@ impl QueryService {
pub async fn save_to_disk(&self, data_path: &std::path::Path) -> Result<()> {
let storage = self.storage.read().await;
storage.save_to_file(data_path)?;
- debug!("Saved {} series to persistent storage", storage.series.len());
+ debug!(
+ "Saved {} series to persistent storage",
+ storage.series.len()
+ );
Ok(())
}
@@ -137,26 +150,12 @@ impl QueryService {
// Execute the expression
let storage = self.storage.read().await;
- let result = self.evaluate_expr(&expr, time, time, 0, &storage).await;
+ let result = self.evaluate_value(&expr, time, time, 0, &storage).await;
let success = result.is_ok();
self.metrics.finish_query(started, success);
let result = result?;
- Ok(QueryResult {
- result_type: "vector".to_string(),
- result: result
- .into_iter()
- .map(|ts| {
- let metric = ts
- .labels
- .into_iter()
- .map(|l| (l.name, l.value))
- .collect();
- let value = ts.samples.last().map(|s| (s.timestamp, s.value));
- InstantQueryResult { metric, value }
- })
- .collect(),
- })
+ Ok(instant_query_from_eval_value(result, time))
}
/// Execute a range query over a time range with step
@@ -204,7 +203,7 @@ impl QueryService {
let mut current_time = start;
while current_time <= end {
let step_result = self
- .evaluate_expr(&expr, current_time, end, step, &storage)
+ .evaluate_value(&expr, current_time, end, step, &storage)
.await;
let step_result = match step_result {
Ok(step_result) => step_result,
@@ -214,33 +213,7 @@ impl QueryService {
}
};
- for ts in step_result {
- // Create a unique key for this series based on labels
- let series_key = ts
- .labels
- .iter()
- .map(|l| format!("{}={}", l.name, l.value))
- .collect::>()
- .join(",");
-
- // Find or create result entry for this series
- let entry = results.entry(series_key).or_insert_with(|| {
- let metric = ts
- .labels
- .iter()
- .map(|l| (l.name.clone(), l.value.clone()))
- .collect();
- RangeResult {
- metric,
- values: Vec::new(),
- }
- });
-
- // Append samples to existing series
- for sample in ts.samples {
- entry.values.push((sample.timestamp, sample.value));
- }
- }
+ append_range_step_result(&mut results, step_result, current_time);
current_time += step;
}
@@ -254,28 +227,31 @@ impl QueryService {
}
/// Evaluate a PromQL expression (recursive with boxing for async)
- fn evaluate_expr<'a>(
+ fn evaluate_value<'a>(
&'a self,
expr: &'a Expr,
time: i64,
end_time: i64,
step: i64,
storage: &'a QueryableStorage,
- ) -> std::pin::Pin>> + Send + 'a>> {
+ ) -> std::pin::Pin> + Send + 'a>> {
Box::pin(async move {
match expr {
Expr::VectorSelector(selector) => {
// Vector selector: metric_name{label="value"}
self.evaluate_vector_selector(selector, time, storage)
+ .map(EvalValue::Vector)
}
Expr::MatrixSelector(selector) => {
// Range selector: metric_name[5m]
self.evaluate_matrix_selector(selector, time, storage)
+ .map(EvalValue::Vector)
}
Expr::Aggregate(agg) => {
// Aggregation: sum(metric), avg(metric), etc.
self.evaluate_aggregation(agg, time, end_time, step, storage)
.await
+ .map(EvalValue::Vector)
}
Expr::Call(call) => {
// Function call: rate(metric[5m]), etc.
@@ -287,6 +263,15 @@ impl QueryService {
self.evaluate_binary(binary, time, end_time, step, storage)
.await
}
+ Expr::NumberLiteral(NumberLiteral { val }) => Ok(EvalValue::Scalar(*val)),
+ Expr::Paren(paren) => {
+ self.evaluate_value(&paren.expr, time, end_time, step, storage)
+ .await
+ }
+ Expr::Unary(unary) => {
+ self.evaluate_unary(unary, time, end_time, step, storage)
+ .await
+ }
_ => Err(Error::Query(format!(
"Unsupported expression type: {:?}",
expr
@@ -306,7 +291,7 @@ impl QueryService {
let matching_series: Vec = storage
.series
.values()
- .filter(|ts| self.matches_selectors(ts, &selector.matchers))
+ .filter(|ts| self.matches_selector(ts, selector))
.cloned()
.map(|mut ts| {
// Filter to get sample closest to query time
@@ -338,7 +323,7 @@ impl QueryService {
let mut series = storage
.series
.values()
- .filter(|ts| self.matches_selectors(ts, &selector.vs.matchers))
+ .filter(|ts| self.matches_selector(ts, &selector.vs))
.cloned()
.collect::>();
@@ -363,62 +348,47 @@ impl QueryService {
step: i64,
storage: &QueryableStorage,
) -> Result> {
- // Evaluate the input expression
- let input_series = self
- .evaluate_expr(&agg.expr, time, end_time, step, storage)
- .await?;
+ let input_series = expect_vector(
+ self.evaluate_value(&agg.expr, time, end_time, step, storage)
+ .await?,
+ "aggregation input must be a vector",
+ )?;
if input_series.is_empty() {
return Ok(vec![]);
}
- // Perform aggregation
- let aggregated_value = match agg.op.to_string().to_lowercase().as_str() {
- "sum" => input_series
- .iter()
- .flat_map(|ts| ts.samples.iter().map(|s| s.value))
- .sum(),
- "avg" => {
- let sum: f64 = input_series
- .iter()
- .flat_map(|ts| ts.samples.iter().map(|s| s.value))
- .sum();
- let count = input_series
- .iter()
- .map(|ts| ts.samples.len())
- .sum::() as f64;
- if count > 0.0 {
- sum / count
- } else {
- 0.0
- }
- }
- "min" => input_series
- .iter()
- .flat_map(|ts| ts.samples.iter().map(|s| s.value))
- .fold(f64::INFINITY, f64::min),
- "max" => input_series
- .iter()
- .flat_map(|ts| ts.samples.iter().map(|s| s.value))
- .fold(f64::NEG_INFINITY, f64::max),
- "count" => input_series.len() as f64,
- op => {
- return Err(Error::Query(format!("Unsupported aggregation: {}", op)));
- }
- };
+ let mut groups: BTreeMap, Vec)> = BTreeMap::new();
+ for series in input_series {
+ let Some(sample) = series.samples.last() else {
+ continue;
+ };
- // Create result time series
- Ok(vec![TimeSeries {
- id: SeriesId(0), // Aggregated result has no series ID
- labels: vec![Label {
- name: "__name__".to_string(),
- value: format!("{}()", agg.op),
- }],
- samples: vec![Sample {
- timestamp: time,
- value: aggregated_value,
- }],
- }])
+ let labels = aggregation_output_labels(&series.labels, agg.modifier.as_ref());
+ let key = labels_key(&labels);
+ groups
+ .entry(key)
+ .or_insert_with(|| (labels, Vec::new()))
+ .1
+ .push(sample.value);
+ }
+
+ let op = agg.op.to_string().to_lowercase();
+ let mut results = Vec::new();
+ for (labels, values) in groups.into_values() {
+ if values.is_empty() {
+ continue;
+ }
+
+ let aggregated_value = aggregate_values(&op, &values)?;
+ results.push(TimeSeries {
+ id: SeriesId(results.len() as u64),
+ labels,
+ samples: vec![Sample::new(time, aggregated_value)],
+ });
+ }
+
+ Ok(results)
}
/// Evaluate function call (rate, irate, increase, histogram_quantile)
@@ -429,23 +399,26 @@ impl QueryService {
end_time: i64,
step: i64,
storage: &QueryableStorage,
- ) -> Result> {
+ ) -> Result {
let func_name = &call.func.name;
match func_name.to_string().as_str() {
- "rate" => self.evaluate_rate(call, time, end_time, step, storage).await,
- "irate" => {
- self.evaluate_irate(call, time, end_time, step, storage)
- .await
- }
- "increase" => {
- self.evaluate_increase(call, time, end_time, step, storage)
- .await
- }
- "histogram_quantile" => {
- self.evaluate_histogram_quantile(call, time, end_time, step, storage)
- .await
- }
+ "rate" => self
+ .evaluate_rate(call, time, end_time, step, storage)
+ .await
+ .map(EvalValue::Vector),
+ "irate" => self
+ .evaluate_irate(call, time, end_time, step, storage)
+ .await
+ .map(EvalValue::Vector),
+ "increase" => self
+ .evaluate_increase(call, time, end_time, step, storage)
+ .await
+ .map(EvalValue::Vector),
+ "histogram_quantile" => self
+ .evaluate_histogram_quantile(call, time, end_time, step, storage)
+ .await
+ .map(EvalValue::Vector),
_ => Err(Error::Query(format!("Unsupported function: {}", func_name))),
}
}
@@ -461,14 +434,17 @@ impl QueryService {
) -> Result> {
// rate() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
- return Err(Error::Query("rate() requires a range vector argument".into()));
+ return Err(Error::Query(
+ "rate() requires a range vector argument".into(),
+ ));
}
- // Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
-
- // Evaluate the argument to get range data
- let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
+ let series_list = expect_vector(
+ self.evaluate_value(arg, time, end_time, step, storage)
+ .await?,
+ "rate() requires a range vector argument",
+ )?;
// Apply rate calculation to each series
let mut result = Vec::new();
@@ -517,14 +493,17 @@ impl QueryService {
) -> Result> {
// irate() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
- return Err(Error::Query("irate() requires a range vector argument".into()));
+ return Err(Error::Query(
+ "irate() requires a range vector argument".into(),
+ ));
}
- // Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
-
- // Evaluate the argument to get range data
- let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
+ let series_list = expect_vector(
+ self.evaluate_value(arg, time, end_time, step, storage)
+ .await?,
+ "irate() requires a range vector argument",
+ )?;
// Apply irate calculation to each series
let mut result = Vec::new();
@@ -572,14 +551,17 @@ impl QueryService {
) -> Result> {
// increase() expects a range vector (MatrixSelector) as argument
if call.args.args.is_empty() {
- return Err(Error::Query("increase() requires a range vector argument".into()));
+ return Err(Error::Query(
+ "increase() requires a range vector argument".into(),
+ ));
}
- // Get the first argument - should be a MatrixSelector
let arg = &call.args.args[0];
-
- // Evaluate the argument to get range data
- let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?;
+ let series_list = expect_vector(
+ self.evaluate_value(arg, time, end_time, step, storage)
+ .await?,
+ "increase() requires a range vector argument",
+ )?;
// Apply increase calculation to each series
let mut result = Vec::new();
@@ -610,61 +592,179 @@ impl QueryService {
Ok(result)
}
- /// Evaluate histogram_quantile() function (simplified - stub for now)
+ /// Evaluate histogram_quantile() from Prometheus-style cumulative buckets.
async fn evaluate_histogram_quantile(
&self,
- _call: &Call,
- _time: i64,
- _end_time: i64,
- _step: i64,
- _storage: &QueryableStorage,
+ call: &Call,
+ time: i64,
+ end_time: i64,
+ step: i64,
+ storage: &QueryableStorage,
) -> Result> {
- // Simplified implementation - full histogram quantile is complex
- // This would require proper histogram bucket handling
- Err(Error::Query(
- "histogram_quantile() not yet implemented".into(),
- ))
+ if call.args.args.len() != 2 {
+ return Err(Error::Query(
+ "histogram_quantile() requires a scalar quantile and a bucket vector".into(),
+ ));
+ }
+
+ let quantile = expect_scalar(
+ self.evaluate_value(&call.args.args[0], time, end_time, step, storage)
+ .await?,
+ "histogram_quantile() requires a scalar quantile",
+ )?;
+ if !(0.0..=1.0).contains(&quantile) {
+ return Err(Error::Query(format!(
+ "histogram_quantile() quantile must be within [0, 1], got {quantile}"
+ )));
+ }
+
+ let buckets = expect_vector(
+ self.evaluate_value(&call.args.args[1], time, end_time, step, storage)
+ .await?,
+ "histogram_quantile() requires a bucket vector",
+ )?;
+
+ let mut grouped: BTreeMap, Vec<(f64, f64)>)> = BTreeMap::new();
+ for series in buckets {
+ let Some(sample) = series.samples.last() else {
+ continue;
+ };
+ let Some(le) = series.get_label("le") else {
+ continue;
+ };
+ let upper_bound = parse_histogram_bound(le)?;
+ let labels = histogram_output_labels(&series.labels);
+ let key = labels_key(&labels);
+ grouped
+ .entry(key)
+ .or_insert_with(|| (labels, Vec::new()))
+ .1
+ .push((upper_bound, sample.value));
+ }
+
+ let mut results = Vec::new();
+ for (labels, mut buckets) in grouped.into_values() {
+ buckets.sort_by(|(lhs, _), (rhs, _)| lhs.total_cmp(rhs));
+ let value = histogram_quantile_value(quantile, &buckets)?;
+ results.push(TimeSeries {
+ id: SeriesId(results.len() as u64),
+ labels,
+ samples: vec![Sample::new(time, value)],
+ });
+ }
+
+ Ok(results)
}
- /// Evaluate binary operation (simplified - stub for now)
+ /// Evaluate binary operations for scalar/vector arithmetic and one-to-one vector matching.
async fn evaluate_binary(
&self,
- _binary: &BinaryExpr,
- _time: i64,
- _end_time: i64,
- _step: i64,
- _storage: &QueryableStorage,
- ) -> Result> {
- // Simplified binary operation evaluation
- // Full implementation would require proper vector matching
- Err(Error::Query(
- "Binary operations not yet fully implemented".into(),
- ))
+ binary: &BinaryExpr,
+ time: i64,
+ end_time: i64,
+ step: i64,
+ storage: &QueryableStorage,
+ ) -> Result {
+ if let Some(modifier) = &binary.modifier {
+ if !matches!(modifier.card, VectorMatchCardinality::OneToOne) {
+ return Err(Error::Query(
+ "many-to-one and many-to-many vector matching are not supported yet".into(),
+ ));
+ }
+ }
+
+ let lhs = self
+ .evaluate_value(&binary.lhs, time, end_time, step, storage)
+ .await?;
+ let rhs = self
+ .evaluate_value(&binary.rhs, time, end_time, step, storage)
+ .await?;
+
+ match (lhs, rhs) {
+ (EvalValue::Scalar(lhs), EvalValue::Scalar(rhs)) => {
+ evaluate_scalar_binary(binary, lhs, rhs).map(EvalValue::Scalar)
+ }
+ (EvalValue::Vector(lhs), EvalValue::Scalar(rhs)) => {
+ evaluate_vector_scalar_binary(binary, lhs, rhs, true).map(EvalValue::Vector)
+ }
+ (EvalValue::Scalar(lhs), EvalValue::Vector(rhs)) => {
+ evaluate_vector_scalar_binary(binary, rhs, lhs, false).map(EvalValue::Vector)
+ }
+ (EvalValue::Vector(lhs), EvalValue::Vector(rhs)) => {
+ evaluate_vector_vector_binary(binary, lhs, rhs).map(EvalValue::Vector)
+ }
+ }
}
- /// Check if a time series matches label matchers
- fn matches_selectors(&self, ts: &TimeSeries, matchers: &Matchers) -> bool {
- for matcher in &matchers.matchers {
- let label_name = &matcher.name;
- let label_value = ts
- .labels
- .iter()
- .find(|l| &l.name == label_name)
- .map(|l| l.value.as_str());
+ async fn evaluate_unary(
+ &self,
+ unary: &UnaryExpr,
+ time: i64,
+ end_time: i64,
+ step: i64,
+ storage: &QueryableStorage,
+ ) -> Result {
+ match self
+ .evaluate_value(&unary.expr, time, end_time, step, storage)
+ .await?
+ {
+ EvalValue::Scalar(value) => Ok(EvalValue::Scalar(-value)),
+ EvalValue::Vector(mut series) => {
+ for entry in &mut series {
+ for sample in &mut entry.samples {
+ sample.value = -sample.value;
+ }
+ }
+ Ok(EvalValue::Vector(series))
+ }
+ }
+ }
- // For now, simple exact match (can be enhanced)
- let matches = if let Some(value) = label_value {
- matcher.value == value
- } else {
- false
- };
-
- if !matches {
+ fn matches_selector(&self, ts: &TimeSeries, selector: &VectorSelector) -> bool {
+ if let Some(name) = selector.name.as_deref() {
+ if ts.name() != Some(name) {
return false;
}
}
- true
+ self.matches_matchers(ts, &selector.matchers)
+ }
+
+ fn matches_matchers(&self, ts: &TimeSeries, matchers: &Matchers) -> bool {
+ if !matchers
+ .matchers
+ .iter()
+ .all(|matcher| self.matcher_matches(ts, matcher))
+ {
+ return false;
+ }
+
+ if matchers.or_matchers.is_empty() {
+ return true;
+ }
+
+ matchers.or_matchers.iter().any(|group| {
+ group
+ .iter()
+ .all(|matcher| self.matcher_matches(ts, matcher))
+ })
+ }
+
+ fn matcher_matches(&self, ts: &TimeSeries, matcher: &promql_parser::label::Matcher) -> bool {
+ let label_value = ts.get_label(&matcher.name);
+
+ match &matcher.op {
+ MatchOp::Equal => label_value == Some(matcher.value.as_str()),
+ MatchOp::NotEqual => match label_value {
+ Some(value) => value != matcher.value,
+ None => true,
+ },
+ MatchOp::Re(regex) => label_value.is_some_and(|value| regex.is_match(value)),
+ MatchOp::NotRe(regex) => match label_value {
+ Some(value) => !regex.is_match(value),
+ None => true,
+ },
+ }
}
pub async fn series_metadata(
@@ -842,6 +942,385 @@ impl QueryableStorage {
}
}
+fn expect_vector(value: EvalValue, message: &str) -> Result> {
+ match value {
+ EvalValue::Vector(series) => Ok(series),
+ EvalValue::Scalar(_) => Err(Error::Query(message.to_string())),
+ }
+}
+
+fn expect_scalar(value: EvalValue, message: &str) -> Result {
+ match value {
+ EvalValue::Scalar(value) => Ok(value),
+ EvalValue::Vector(_) => Err(Error::Query(message.to_string())),
+ }
+}
+
+fn instant_query_from_eval_value(value: EvalValue, time: i64) -> QueryResult {
+ match value {
+ EvalValue::Scalar(value) => QueryResult {
+ result_type: "scalar".to_string(),
+ result: vec![InstantQueryResult {
+ metric: HashMap::new(),
+ value: Some((time, value)),
+ }],
+ },
+ EvalValue::Vector(series) => QueryResult {
+ result_type: "vector".to_string(),
+ result: series
+ .into_iter()
+ .map(|ts| InstantQueryResult {
+ metric: labels_to_map(&ts.labels),
+ value: ts
+ .samples
+ .last()
+ .map(|sample| (sample.timestamp, sample.value)),
+ })
+ .collect(),
+ },
+ }
+}
+
+fn append_range_step_result(
+ results: &mut HashMap,
+ step_result: EvalValue,
+ time: i64,
+) {
+ match step_result {
+ EvalValue::Scalar(value) => {
+ let entry = results
+ .entry("__scalar__".to_string())
+ .or_insert_with(|| RangeResult {
+ metric: HashMap::new(),
+ values: Vec::new(),
+ });
+ entry.values.push((time, value));
+ }
+ EvalValue::Vector(series) => {
+ for ts in series {
+ let key = labels_key(&ts.labels);
+ let metric = labels_to_map(&ts.labels);
+ let entry = results.entry(key).or_insert_with(|| RangeResult {
+ metric,
+ values: Vec::new(),
+ });
+ for sample in ts.samples {
+ entry.values.push((sample.timestamp, sample.value));
+ }
+ }
+ }
+ }
+}
+
+fn labels_to_map(labels: &[Label]) -> HashMap {
+ labels
+ .iter()
+ .map(|label| (label.name.clone(), label.value.clone()))
+ .collect()
+}
+
+fn normalize_labels(mut labels: Vec