diff --git a/nightlight/crates/nightlight-server/src/query.rs b/nightlight/crates/nightlight-server/src/query.rs index 2d7a353..a7a3b52 100644 --- a/nightlight/crates/nightlight-server/src/query.rs +++ b/nightlight/crates/nightlight-server/src/query.rs @@ -10,23 +10,24 @@ use axum::{ routing::get, Router, }; -use parking_lot::Mutex; use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries}; +use parking_lot::Mutex; use promql_parser::{ - label::Matchers, + label::{MatchOp, Matchers}, parser::{ - AggregateExpr, BinaryExpr, Call, Expr, MatrixSelector, VectorSelector, + AggregateExpr, BinModifier, BinaryExpr, Call, Expr, LabelModifier, MatrixSelector, + NumberLiteral, UnaryExpr, VectorMatchCardinality, VectorSelector, }, }; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; use tokio::sync::RwLock; -use tracing::{debug, error}; #[cfg(test)] use tracing::info; +use tracing::{debug, error}; const QUERY_DURATION_HISTORY_LIMIT: usize = 512; @@ -65,6 +66,12 @@ pub struct QueryMetricsSnapshot { pub query_duration_p99: f64, } +#[derive(Debug, Clone)] +enum EvalValue { + Vector(Vec), + Scalar(f64), +} + impl QueryService { pub fn new() -> Self { Self { @@ -88,7 +95,10 @@ impl QueryService { #[cfg(test)] pub fn new_with_persistence(data_path: &std::path::Path) -> Result { let storage = QueryableStorage::load_from_file(data_path)?; - info!("Loaded {} series from persistent storage", storage.series.len()); + info!( + "Loaded {} series from persistent storage", + storage.series.len() + ); Ok(Self { storage: Arc::new(RwLock::new(storage)), @@ -101,7 +111,10 @@ impl QueryService { pub async fn save_to_disk(&self, data_path: &std::path::Path) -> Result<()> { let storage = self.storage.read().await; storage.save_to_file(data_path)?; - debug!("Saved {} series to persistent storage", storage.series.len()); + debug!( + "Saved {} series to persistent storage", + storage.series.len() + ); Ok(()) } @@ -137,26 +150,12 @@ impl QueryService { // Execute the expression let storage = self.storage.read().await; - let result = self.evaluate_expr(&expr, time, time, 0, &storage).await; + let result = self.evaluate_value(&expr, time, time, 0, &storage).await; let success = result.is_ok(); self.metrics.finish_query(started, success); let result = result?; - Ok(QueryResult { - result_type: "vector".to_string(), - result: result - .into_iter() - .map(|ts| { - let metric = ts - .labels - .into_iter() - .map(|l| (l.name, l.value)) - .collect(); - let value = ts.samples.last().map(|s| (s.timestamp, s.value)); - InstantQueryResult { metric, value } - }) - .collect(), - }) + Ok(instant_query_from_eval_value(result, time)) } /// Execute a range query over a time range with step @@ -204,7 +203,7 @@ impl QueryService { let mut current_time = start; while current_time <= end { let step_result = self - .evaluate_expr(&expr, current_time, end, step, &storage) + .evaluate_value(&expr, current_time, end, step, &storage) .await; let step_result = match step_result { Ok(step_result) => step_result, @@ -214,33 +213,7 @@ impl QueryService { } }; - for ts in step_result { - // Create a unique key for this series based on labels - let series_key = ts - .labels - .iter() - .map(|l| format!("{}={}", l.name, l.value)) - .collect::>() - .join(","); - - // Find or create result entry for this series - let entry = results.entry(series_key).or_insert_with(|| { - let metric = ts - .labels - .iter() - .map(|l| (l.name.clone(), l.value.clone())) - .collect(); - RangeResult { - metric, - values: Vec::new(), - } - }); - - // Append samples to existing series - for sample in ts.samples { - entry.values.push((sample.timestamp, sample.value)); - } - } + append_range_step_result(&mut results, step_result, current_time); current_time += step; } @@ -254,28 +227,31 @@ impl QueryService { } /// Evaluate a PromQL expression (recursive with boxing for async) - fn evaluate_expr<'a>( + fn evaluate_value<'a>( &'a self, expr: &'a Expr, time: i64, end_time: i64, step: i64, storage: &'a QueryableStorage, - ) -> std::pin::Pin>> + Send + 'a>> { + ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { match expr { Expr::VectorSelector(selector) => { // Vector selector: metric_name{label="value"} self.evaluate_vector_selector(selector, time, storage) + .map(EvalValue::Vector) } Expr::MatrixSelector(selector) => { // Range selector: metric_name[5m] self.evaluate_matrix_selector(selector, time, storage) + .map(EvalValue::Vector) } Expr::Aggregate(agg) => { // Aggregation: sum(metric), avg(metric), etc. self.evaluate_aggregation(agg, time, end_time, step, storage) .await + .map(EvalValue::Vector) } Expr::Call(call) => { // Function call: rate(metric[5m]), etc. @@ -287,6 +263,15 @@ impl QueryService { self.evaluate_binary(binary, time, end_time, step, storage) .await } + Expr::NumberLiteral(NumberLiteral { val }) => Ok(EvalValue::Scalar(*val)), + Expr::Paren(paren) => { + self.evaluate_value(&paren.expr, time, end_time, step, storage) + .await + } + Expr::Unary(unary) => { + self.evaluate_unary(unary, time, end_time, step, storage) + .await + } _ => Err(Error::Query(format!( "Unsupported expression type: {:?}", expr @@ -306,7 +291,7 @@ impl QueryService { let matching_series: Vec = storage .series .values() - .filter(|ts| self.matches_selectors(ts, &selector.matchers)) + .filter(|ts| self.matches_selector(ts, selector)) .cloned() .map(|mut ts| { // Filter to get sample closest to query time @@ -338,7 +323,7 @@ impl QueryService { let mut series = storage .series .values() - .filter(|ts| self.matches_selectors(ts, &selector.vs.matchers)) + .filter(|ts| self.matches_selector(ts, &selector.vs)) .cloned() .collect::>(); @@ -363,62 +348,47 @@ impl QueryService { step: i64, storage: &QueryableStorage, ) -> Result> { - // Evaluate the input expression - let input_series = self - .evaluate_expr(&agg.expr, time, end_time, step, storage) - .await?; + let input_series = expect_vector( + self.evaluate_value(&agg.expr, time, end_time, step, storage) + .await?, + "aggregation input must be a vector", + )?; if input_series.is_empty() { return Ok(vec![]); } - // Perform aggregation - let aggregated_value = match agg.op.to_string().to_lowercase().as_str() { - "sum" => input_series - .iter() - .flat_map(|ts| ts.samples.iter().map(|s| s.value)) - .sum(), - "avg" => { - let sum: f64 = input_series - .iter() - .flat_map(|ts| ts.samples.iter().map(|s| s.value)) - .sum(); - let count = input_series - .iter() - .map(|ts| ts.samples.len()) - .sum::() as f64; - if count > 0.0 { - sum / count - } else { - 0.0 - } - } - "min" => input_series - .iter() - .flat_map(|ts| ts.samples.iter().map(|s| s.value)) - .fold(f64::INFINITY, f64::min), - "max" => input_series - .iter() - .flat_map(|ts| ts.samples.iter().map(|s| s.value)) - .fold(f64::NEG_INFINITY, f64::max), - "count" => input_series.len() as f64, - op => { - return Err(Error::Query(format!("Unsupported aggregation: {}", op))); - } - }; + let mut groups: BTreeMap, Vec)> = BTreeMap::new(); + for series in input_series { + let Some(sample) = series.samples.last() else { + continue; + }; - // Create result time series - Ok(vec![TimeSeries { - id: SeriesId(0), // Aggregated result has no series ID - labels: vec![Label { - name: "__name__".to_string(), - value: format!("{}()", agg.op), - }], - samples: vec![Sample { - timestamp: time, - value: aggregated_value, - }], - }]) + let labels = aggregation_output_labels(&series.labels, agg.modifier.as_ref()); + let key = labels_key(&labels); + groups + .entry(key) + .or_insert_with(|| (labels, Vec::new())) + .1 + .push(sample.value); + } + + let op = agg.op.to_string().to_lowercase(); + let mut results = Vec::new(); + for (labels, values) in groups.into_values() { + if values.is_empty() { + continue; + } + + let aggregated_value = aggregate_values(&op, &values)?; + results.push(TimeSeries { + id: SeriesId(results.len() as u64), + labels, + samples: vec![Sample::new(time, aggregated_value)], + }); + } + + Ok(results) } /// Evaluate function call (rate, irate, increase, histogram_quantile) @@ -429,23 +399,26 @@ impl QueryService { end_time: i64, step: i64, storage: &QueryableStorage, - ) -> Result> { + ) -> Result { let func_name = &call.func.name; match func_name.to_string().as_str() { - "rate" => self.evaluate_rate(call, time, end_time, step, storage).await, - "irate" => { - self.evaluate_irate(call, time, end_time, step, storage) - .await - } - "increase" => { - self.evaluate_increase(call, time, end_time, step, storage) - .await - } - "histogram_quantile" => { - self.evaluate_histogram_quantile(call, time, end_time, step, storage) - .await - } + "rate" => self + .evaluate_rate(call, time, end_time, step, storage) + .await + .map(EvalValue::Vector), + "irate" => self + .evaluate_irate(call, time, end_time, step, storage) + .await + .map(EvalValue::Vector), + "increase" => self + .evaluate_increase(call, time, end_time, step, storage) + .await + .map(EvalValue::Vector), + "histogram_quantile" => self + .evaluate_histogram_quantile(call, time, end_time, step, storage) + .await + .map(EvalValue::Vector), _ => Err(Error::Query(format!("Unsupported function: {}", func_name))), } } @@ -461,14 +434,17 @@ impl QueryService { ) -> Result> { // rate() expects a range vector (MatrixSelector) as argument if call.args.args.is_empty() { - return Err(Error::Query("rate() requires a range vector argument".into())); + return Err(Error::Query( + "rate() requires a range vector argument".into(), + )); } - // Get the first argument - should be a MatrixSelector let arg = &call.args.args[0]; - - // Evaluate the argument to get range data - let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?; + let series_list = expect_vector( + self.evaluate_value(arg, time, end_time, step, storage) + .await?, + "rate() requires a range vector argument", + )?; // Apply rate calculation to each series let mut result = Vec::new(); @@ -517,14 +493,17 @@ impl QueryService { ) -> Result> { // irate() expects a range vector (MatrixSelector) as argument if call.args.args.is_empty() { - return Err(Error::Query("irate() requires a range vector argument".into())); + return Err(Error::Query( + "irate() requires a range vector argument".into(), + )); } - // Get the first argument - should be a MatrixSelector let arg = &call.args.args[0]; - - // Evaluate the argument to get range data - let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?; + let series_list = expect_vector( + self.evaluate_value(arg, time, end_time, step, storage) + .await?, + "irate() requires a range vector argument", + )?; // Apply irate calculation to each series let mut result = Vec::new(); @@ -572,14 +551,17 @@ impl QueryService { ) -> Result> { // increase() expects a range vector (MatrixSelector) as argument if call.args.args.is_empty() { - return Err(Error::Query("increase() requires a range vector argument".into())); + return Err(Error::Query( + "increase() requires a range vector argument".into(), + )); } - // Get the first argument - should be a MatrixSelector let arg = &call.args.args[0]; - - // Evaluate the argument to get range data - let series_list = Box::pin(self.evaluate_expr(arg, time, end_time, step, storage)).await?; + let series_list = expect_vector( + self.evaluate_value(arg, time, end_time, step, storage) + .await?, + "increase() requires a range vector argument", + )?; // Apply increase calculation to each series let mut result = Vec::new(); @@ -610,61 +592,179 @@ impl QueryService { Ok(result) } - /// Evaluate histogram_quantile() function (simplified - stub for now) + /// Evaluate histogram_quantile() from Prometheus-style cumulative buckets. async fn evaluate_histogram_quantile( &self, - _call: &Call, - _time: i64, - _end_time: i64, - _step: i64, - _storage: &QueryableStorage, + call: &Call, + time: i64, + end_time: i64, + step: i64, + storage: &QueryableStorage, ) -> Result> { - // Simplified implementation - full histogram quantile is complex - // This would require proper histogram bucket handling - Err(Error::Query( - "histogram_quantile() not yet implemented".into(), - )) + if call.args.args.len() != 2 { + return Err(Error::Query( + "histogram_quantile() requires a scalar quantile and a bucket vector".into(), + )); + } + + let quantile = expect_scalar( + self.evaluate_value(&call.args.args[0], time, end_time, step, storage) + .await?, + "histogram_quantile() requires a scalar quantile", + )?; + if !(0.0..=1.0).contains(&quantile) { + return Err(Error::Query(format!( + "histogram_quantile() quantile must be within [0, 1], got {quantile}" + ))); + } + + let buckets = expect_vector( + self.evaluate_value(&call.args.args[1], time, end_time, step, storage) + .await?, + "histogram_quantile() requires a bucket vector", + )?; + + let mut grouped: BTreeMap, Vec<(f64, f64)>)> = BTreeMap::new(); + for series in buckets { + let Some(sample) = series.samples.last() else { + continue; + }; + let Some(le) = series.get_label("le") else { + continue; + }; + let upper_bound = parse_histogram_bound(le)?; + let labels = histogram_output_labels(&series.labels); + let key = labels_key(&labels); + grouped + .entry(key) + .or_insert_with(|| (labels, Vec::new())) + .1 + .push((upper_bound, sample.value)); + } + + let mut results = Vec::new(); + for (labels, mut buckets) in grouped.into_values() { + buckets.sort_by(|(lhs, _), (rhs, _)| lhs.total_cmp(rhs)); + let value = histogram_quantile_value(quantile, &buckets)?; + results.push(TimeSeries { + id: SeriesId(results.len() as u64), + labels, + samples: vec![Sample::new(time, value)], + }); + } + + Ok(results) } - /// Evaluate binary operation (simplified - stub for now) + /// Evaluate binary operations for scalar/vector arithmetic and one-to-one vector matching. async fn evaluate_binary( &self, - _binary: &BinaryExpr, - _time: i64, - _end_time: i64, - _step: i64, - _storage: &QueryableStorage, - ) -> Result> { - // Simplified binary operation evaluation - // Full implementation would require proper vector matching - Err(Error::Query( - "Binary operations not yet fully implemented".into(), - )) + binary: &BinaryExpr, + time: i64, + end_time: i64, + step: i64, + storage: &QueryableStorage, + ) -> Result { + if let Some(modifier) = &binary.modifier { + if !matches!(modifier.card, VectorMatchCardinality::OneToOne) { + return Err(Error::Query( + "many-to-one and many-to-many vector matching are not supported yet".into(), + )); + } + } + + let lhs = self + .evaluate_value(&binary.lhs, time, end_time, step, storage) + .await?; + let rhs = self + .evaluate_value(&binary.rhs, time, end_time, step, storage) + .await?; + + match (lhs, rhs) { + (EvalValue::Scalar(lhs), EvalValue::Scalar(rhs)) => { + evaluate_scalar_binary(binary, lhs, rhs).map(EvalValue::Scalar) + } + (EvalValue::Vector(lhs), EvalValue::Scalar(rhs)) => { + evaluate_vector_scalar_binary(binary, lhs, rhs, true).map(EvalValue::Vector) + } + (EvalValue::Scalar(lhs), EvalValue::Vector(rhs)) => { + evaluate_vector_scalar_binary(binary, rhs, lhs, false).map(EvalValue::Vector) + } + (EvalValue::Vector(lhs), EvalValue::Vector(rhs)) => { + evaluate_vector_vector_binary(binary, lhs, rhs).map(EvalValue::Vector) + } + } } - /// Check if a time series matches label matchers - fn matches_selectors(&self, ts: &TimeSeries, matchers: &Matchers) -> bool { - for matcher in &matchers.matchers { - let label_name = &matcher.name; - let label_value = ts - .labels - .iter() - .find(|l| &l.name == label_name) - .map(|l| l.value.as_str()); + async fn evaluate_unary( + &self, + unary: &UnaryExpr, + time: i64, + end_time: i64, + step: i64, + storage: &QueryableStorage, + ) -> Result { + match self + .evaluate_value(&unary.expr, time, end_time, step, storage) + .await? + { + EvalValue::Scalar(value) => Ok(EvalValue::Scalar(-value)), + EvalValue::Vector(mut series) => { + for entry in &mut series { + for sample in &mut entry.samples { + sample.value = -sample.value; + } + } + Ok(EvalValue::Vector(series)) + } + } + } - // For now, simple exact match (can be enhanced) - let matches = if let Some(value) = label_value { - matcher.value == value - } else { - false - }; - - if !matches { + fn matches_selector(&self, ts: &TimeSeries, selector: &VectorSelector) -> bool { + if let Some(name) = selector.name.as_deref() { + if ts.name() != Some(name) { return false; } } - true + self.matches_matchers(ts, &selector.matchers) + } + + fn matches_matchers(&self, ts: &TimeSeries, matchers: &Matchers) -> bool { + if !matchers + .matchers + .iter() + .all(|matcher| self.matcher_matches(ts, matcher)) + { + return false; + } + + if matchers.or_matchers.is_empty() { + return true; + } + + matchers.or_matchers.iter().any(|group| { + group + .iter() + .all(|matcher| self.matcher_matches(ts, matcher)) + }) + } + + fn matcher_matches(&self, ts: &TimeSeries, matcher: &promql_parser::label::Matcher) -> bool { + let label_value = ts.get_label(&matcher.name); + + match &matcher.op { + MatchOp::Equal => label_value == Some(matcher.value.as_str()), + MatchOp::NotEqual => match label_value { + Some(value) => value != matcher.value, + None => true, + }, + MatchOp::Re(regex) => label_value.is_some_and(|value| regex.is_match(value)), + MatchOp::NotRe(regex) => match label_value { + Some(value) => !regex.is_match(value), + None => true, + }, + } } pub async fn series_metadata( @@ -842,6 +942,385 @@ impl QueryableStorage { } } +fn expect_vector(value: EvalValue, message: &str) -> Result> { + match value { + EvalValue::Vector(series) => Ok(series), + EvalValue::Scalar(_) => Err(Error::Query(message.to_string())), + } +} + +fn expect_scalar(value: EvalValue, message: &str) -> Result { + match value { + EvalValue::Scalar(value) => Ok(value), + EvalValue::Vector(_) => Err(Error::Query(message.to_string())), + } +} + +fn instant_query_from_eval_value(value: EvalValue, time: i64) -> QueryResult { + match value { + EvalValue::Scalar(value) => QueryResult { + result_type: "scalar".to_string(), + result: vec![InstantQueryResult { + metric: HashMap::new(), + value: Some((time, value)), + }], + }, + EvalValue::Vector(series) => QueryResult { + result_type: "vector".to_string(), + result: series + .into_iter() + .map(|ts| InstantQueryResult { + metric: labels_to_map(&ts.labels), + value: ts + .samples + .last() + .map(|sample| (sample.timestamp, sample.value)), + }) + .collect(), + }, + } +} + +fn append_range_step_result( + results: &mut HashMap, + step_result: EvalValue, + time: i64, +) { + match step_result { + EvalValue::Scalar(value) => { + let entry = results + .entry("__scalar__".to_string()) + .or_insert_with(|| RangeResult { + metric: HashMap::new(), + values: Vec::new(), + }); + entry.values.push((time, value)); + } + EvalValue::Vector(series) => { + for ts in series { + let key = labels_key(&ts.labels); + let metric = labels_to_map(&ts.labels); + let entry = results.entry(key).or_insert_with(|| RangeResult { + metric, + values: Vec::new(), + }); + for sample in ts.samples { + entry.values.push((sample.timestamp, sample.value)); + } + } + } + } +} + +fn labels_to_map(labels: &[Label]) -> HashMap { + labels + .iter() + .map(|label| (label.name.clone(), label.value.clone())) + .collect() +} + +fn normalize_labels(mut labels: Vec