//! PromQL query engine
//!
//! Implements a subset of PromQL for querying time-series data.
//! Provides instant and range query execution with basic aggregations.

use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::{IntoResponse, Json},
    routing::get,
    Router,
};
use nightlight_types::{Error, Label, Result, Sample, SeriesId, TimeSeries};
use parking_lot::Mutex;
use promql_parser::{
    label::{MatchOp, Matchers},
    parser::{
        AggregateExpr, BinModifier, BinaryExpr, Call, Expr, LabelModifier, MatrixSelector,
        NumberLiteral, UnaryExpr, VectorMatchCardinality, VectorSelector,
    },
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
#[cfg(test)]
use tracing::info;
use tracing::{debug, error};

const QUERY_DURATION_HISTORY_LIMIT: usize = 512;

/// Query service state
#[derive(Clone)]
pub struct QueryService {
    // Reference to queryable storage (shared with ingestion)
    storage: Arc<RwLock<QueryableStorage>>,
    metrics: Arc<QueryMetrics>,
}

/// In-memory queryable storage (reads from ingestion buffer)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryableStorage {
    // Series metadata indexed by SeriesId
    pub series: HashMap<SeriesId, TimeSeries>,
    // Inverted index: label name -> label value -> [SeriesId]
    pub label_index: HashMap<String, HashMap<String, Vec<SeriesId>>>,
}

#[derive(Debug)]
pub struct QueryMetrics {
    queries_total: AtomicU64,
    queries_failed: AtomicU64,
    queries_active: AtomicU64,
    durations_ms: Mutex<VecDeque<u64>>,
}

#[derive(Debug, Clone, Copy, Default)]
pub struct QueryMetricsSnapshot {
    pub queries_total: u64,
    pub queries_failed: u64,
    pub queries_active: u64,
    pub query_duration_p50: f64,
    pub query_duration_p95: f64,
    pub query_duration_p99: f64,
}

#[derive(Debug, Clone)]
enum EvalValue {
    Vector(Vec<TimeSeries>),
    Scalar(f64),
}

impl QueryService {
    pub fn new() -> Self {
        Self {
            storage: Arc::new(RwLock::new(QueryableStorage {
                series: HashMap::new(),
                label_index: HashMap::new(),
            })),
            metrics: Arc::new(QueryMetrics::new()),
        }
    }

    /// Create QueryService from existing shared storage
    pub fn from_storage(storage: Arc<RwLock<QueryableStorage>>) -> Self {
        Self {
            storage,
            metrics: Arc::new(QueryMetrics::new()),
        }
    }

    /// Create QueryService and load persistent state from disk if it exists
    #[cfg(test)]
    pub fn new_with_persistence(data_path: &std::path::Path) -> Result<Self> {
        let storage = QueryableStorage::load_from_file(data_path)?;
        info!(
            "Loaded {} series from persistent storage",
            storage.series.len()
        );
        Ok(Self {
            storage: Arc::new(RwLock::new(storage)),
            metrics: Arc::new(QueryMetrics::new()),
        })
    }

    /// Save current storage state to disk
    #[cfg(test)]
    pub async fn save_to_disk(&self, data_path: &std::path::Path) -> Result<()> {
        let storage = self.storage.read().await;
        storage.save_to_file(data_path)?;
        debug!(
            "Saved {} series to persistent storage",
            storage.series.len()
        );
        Ok(())
    }

    /// Create Axum router for query endpoints
    pub fn router(self) -> Router {
        Router::new()
            .route("/api/v1/query", get(handle_instant_query))
            .route("/api/v1/query_range", get(handle_range_query))
            .route("/api/v1/label/:label_name/values", get(handle_label_values))
            .route("/api/v1/series", get(handle_series))
            .with_state(self)
    }

    pub fn metrics(&self) -> Arc<QueryMetrics> {
        Arc::clone(&self.metrics)
    }
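    // Illustrative sketch, not part of the original API: a hypothetical
    // convenience wrapper showing how execute_instant_query (below) would
    // typically be driven with "now" as the evaluation time. Assumes
    // timestamps are milliseconds since the Unix epoch, consistent with
    // Sample.timestamp usage throughout this module.
    #[cfg(test)]
    pub async fn execute_instant_query_now(&self, query: &str) -> Result<QueryResult> {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or_default();
        self.execute_instant_query(query, now_ms).await
    }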
    /// Execute an instant query at a specific timestamp
    pub async fn execute_instant_query(&self, query: &str, time: i64) -> Result<QueryResult> {
        debug!("Executing instant query: {} at time {}", query, time);
        let started = self.metrics.begin_query();

        // Parse the PromQL expression
        let expr = promql_parser::parser::parse(query)
            .map_err(|e| Error::Query(format!("Parse error: {:?}", e)));
        let expr = match expr {
            Ok(expr) => expr,
            Err(error) => {
                self.metrics.finish_query(started, false);
                return Err(error);
            }
        };

        // Execute the expression
        let storage = self.storage.read().await;
        let result = self.evaluate_value(&expr, time, time, 0, &storage).await;
        let success = result.is_ok();
        self.metrics.finish_query(started, success);
        let result = result?;
        Ok(instant_query_from_eval_value(result, time))
    }

    /// Execute a range query over a time range with a fixed step
    pub async fn execute_range_query(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: i64,
    ) -> Result<RangeQueryResult> {
        debug!(
            "Executing range query: {} from {} to {} step {}",
            query, start, end, step
        );
        let started = self.metrics.begin_query();

        if step <= 0 {
            self.metrics.finish_query(started, false);
            return Err(Error::InvalidTimeRange(
                "range query step must be greater than zero".to_string(),
            ));
        }
        if end < start {
            self.metrics.finish_query(started, false);
            return Err(Error::InvalidTimeRange(
                "range query end must be greater than or equal to start".to_string(),
            ));
        }

        // Parse the PromQL expression
        let expr = promql_parser::parser::parse(query)
            .map_err(|e| Error::Query(format!("Parse error: {:?}", e)));
        let expr = match expr {
            Ok(expr) => expr,
            Err(error) => {
                self.metrics.finish_query(started, false);
                return Err(error);
            }
        };

        let storage = self.storage.read().await;
        let mut results: HashMap<String, RangeResult> = HashMap::new();

        // Evaluate the expression at each step
        let mut current_time = start;
        while current_time <= end {
            let step_result = self
                .evaluate_value(&expr, current_time, end, step, &storage)
                .await;
            let step_result = match step_result {
                Ok(step_result) => step_result,
                Err(error) => {
                    self.metrics.finish_query(started, false);
                    return Err(error);
                }
            };
            append_range_step_result(&mut results, step_result, current_time);
            current_time += step;
        }

        let result = RangeQueryResult {
            result_type: "matrix".to_string(),
            result: results.into_values().collect(),
        };
        self.metrics.finish_query(started, true);
        Ok(result)
    }
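    // Illustrative helper (hypothetical, not in the original API): how many
    // evaluation points the inclusive `while current_time <= end` loop in
    // execute_range_query visits. For start=0, end=60_000, step=15_000 it
    // evaluates 5 times, at t = 0s, 15s, 30s, 45s, and 60s.
    #[cfg(test)]
    fn range_step_count(start: i64, end: i64, step: i64) -> i64 {
        if step <= 0 || end < start {
            return 0;
        }
        (end - start) / step + 1
    }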
    /// Evaluate a PromQL expression (recursive; boxed for async recursion)
    fn evaluate_value<'a>(
        &'a self,
        expr: &'a Expr,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &'a QueryableStorage,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<EvalValue>> + Send + 'a>> {
        Box::pin(async move {
            match expr {
                Expr::VectorSelector(selector) => {
                    // Vector selector: metric_name{label="value"}
                    self.evaluate_vector_selector(selector, time, storage)
                        .map(EvalValue::Vector)
                }
                Expr::MatrixSelector(selector) => {
                    // Range selector: metric_name[5m]
                    self.evaluate_matrix_selector(selector, time, storage)
                        .map(EvalValue::Vector)
                }
                Expr::Aggregate(agg) => {
                    // Aggregation: sum(metric), avg(metric), etc.
                    self.evaluate_aggregation(agg, time, end_time, step, storage)
                        .await
                        .map(EvalValue::Vector)
                }
                Expr::Call(call) => {
                    // Function call: rate(metric[5m]), etc.
                    self.evaluate_function(call, time, end_time, step, storage)
                        .await
                }
                Expr::Binary(binary) => {
                    // Binary operation: metric1 + metric2
                    self.evaluate_binary(binary, time, end_time, step, storage)
                        .await
                }
                Expr::NumberLiteral(NumberLiteral { val }) => Ok(EvalValue::Scalar(*val)),
                Expr::Paren(paren) => {
                    self.evaluate_value(&paren.expr, time, end_time, step, storage)
                        .await
                }
                Expr::Unary(unary) => {
                    self.evaluate_unary(unary, time, end_time, step, storage)
                        .await
                }
                _ => Err(Error::Query(format!(
                    "Unsupported expression type: {:?}",
                    expr
                ))),
            }
        })
    }

    /// Evaluate a vector selector
    fn evaluate_vector_selector(
        &self,
        selector: &VectorSelector,
        time: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        // Find all series matching the label matchers
        let matching_series: Vec<TimeSeries> = storage
            .series
            .values()
            .filter(|ts| self.matches_selector(ts, selector))
            .cloned()
            .map(|mut ts| {
                // Keep only the most recent sample at or before the query time
                ts.samples.retain(|s| s.timestamp <= time);
                if let Some(last) = ts.samples.last().cloned() {
                    ts.samples = vec![last];
                }
                ts
            })
            .filter(|ts| !ts.samples.is_empty())
            .collect();
        Ok(matching_series)
    }

    /// Evaluate a matrix selector (range selector)
    fn evaluate_matrix_selector(
        &self,
        selector: &MatrixSelector,
        time: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        // Get the time range in milliseconds
        let range_ms = selector.range.as_millis() as i64;
        let start_time = time - range_ms;

        // Evaluate the underlying vector selector
        let mut series = storage
            .series
            .values()
            .filter(|ts| self.matches_selector(ts, &selector.vs))
            .cloned()
            .collect::<Vec<_>>();

        // Keep only samples within the time range
        for ts in &mut series {
            ts.samples
                .retain(|s| s.timestamp >= start_time && s.timestamp <= time);
        }

        // Drop series left with no samples
        series.retain(|ts| !ts.samples.is_empty());
        Ok(series)
    }

    /// Evaluate an aggregation (sum, avg, min, max, count)
    async fn evaluate_aggregation(
        &self,
        agg: &AggregateExpr,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        let input_series = expect_vector(
            self.evaluate_value(&agg.expr, time, end_time, step, storage)
                .await?,
            "aggregation input must be a vector",
        )?;
        if input_series.is_empty() {
            return Ok(vec![]);
        }

        // Group the latest sample of each series by its output label set
        let mut groups: BTreeMap<String, (Vec<Label>, Vec<f64>)> = BTreeMap::new();
        for series in input_series {
            let Some(sample) = series.samples.last() else {
                continue;
            };
            let labels = aggregation_output_labels(&series.labels, agg.modifier.as_ref());
            let key = labels_key(&labels);
            groups
                .entry(key)
                .or_insert_with(|| (labels, Vec::new()))
                .1
                .push(sample.value);
        }

        let op = agg.op.to_string().to_lowercase();
        let mut results = Vec::new();
        for (labels, values) in groups.into_values() {
            if values.is_empty() {
                continue;
            }
            let aggregated_value = aggregate_values(&op, &values)?;
            results.push(TimeSeries {
                id: SeriesId(results.len() as u64),
                labels,
                samples: vec![Sample::new(time, aggregated_value)],
            });
        }
        Ok(results)
    }
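    // Illustrative sketch (hypothetical helper, not in the original API): the
    // grouping behavior of evaluate_aggregation for a query such as
    // `sum by (job) (metric)`. Series sharing the same output label set
    // collapse into one group whose values are combined by aggregate_values;
    // here, a plain sum per group.
    #[cfg(test)]
    fn sum_by_group_example() -> Vec<(String, f64)> {
        let mut groups: BTreeMap<String, f64> = BTreeMap::new();
        for (job, value) in [("api", 1.0), ("api", 2.0), ("db", 5.0)] {
            *groups.entry(job.to_string()).or_insert(0.0) += value;
        }
        // Yields [("api", 3.0), ("db", 5.0)]
        groups.into_iter().collect()
    }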
    /// Evaluate a function call (rate, irate, increase, histogram_quantile)
    async fn evaluate_function(
        &self,
        call: &Call,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<EvalValue> {
        let func_name = &call.func.name;
        match func_name.to_string().as_str() {
            "rate" => self
                .evaluate_rate(call, time, end_time, step, storage)
                .await
                .map(EvalValue::Vector),
            "irate" => self
                .evaluate_irate(call, time, end_time, step, storage)
                .await
                .map(EvalValue::Vector),
            "increase" => self
                .evaluate_increase(call, time, end_time, step, storage)
                .await
                .map(EvalValue::Vector),
            "histogram_quantile" => self
                .evaluate_histogram_quantile(call, time, end_time, step, storage)
                .await
                .map(EvalValue::Vector),
            _ => Err(Error::Query(format!("Unsupported function: {}", func_name))),
        }
    }

    /// Evaluate rate(): the per-second average rate of increase over the range
    async fn evaluate_rate(
        &self,
        call: &Call,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        // rate() expects a range vector (MatrixSelector) as its argument
        if call.args.args.is_empty() {
            return Err(Error::Query(
                "rate() requires a range vector argument".into(),
            ));
        }
        let arg = &call.args.args[0];
        let series_list = expect_vector(
            self.evaluate_value(arg, time, end_time, step, storage)
                .await?,
            "rate() requires a range vector argument",
        )?;

        // Apply the rate calculation to each series
        let mut result = Vec::new();
        for series in series_list {
            if series.samples.len() < 2 {
                continue; // Need at least 2 samples for a rate calculation
            }

            // Get the first and last samples
            let first = &series.samples[0];
            let last = &series.samples[series.samples.len() - 1];

            // Elapsed time in seconds
            let duration_seconds = (last.timestamp - first.timestamp) as f64 / 1000.0;
            if duration_seconds <= 0.0 {
                continue;
            }

            // Per-second average rate. A full implementation would detect
            // counter resets; this simplified version does not.
            let value_diff = last.value - first.value;
            let rate = value_diff / duration_seconds;

            // Emit a single sample at the query time
            result.push(TimeSeries {
                id: series.id,
                labels: series.labels.clone(),
                samples: vec![Sample {
                    timestamp: time,
                    value: rate.max(0.0), // Counter rates cannot be negative
                }],
            });
        }
        Ok(result)
    }

    /// Evaluate irate(): the instant rate from the last two samples
    async fn evaluate_irate(
        &self,
        call: &Call,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        // irate() expects a range vector (MatrixSelector) as its argument
        if call.args.args.is_empty() {
            return Err(Error::Query(
                "irate() requires a range vector argument".into(),
            ));
        }
        let arg = &call.args.args[0];
        let series_list = expect_vector(
            self.evaluate_value(arg, time, end_time, step, storage)
                .await?,
            "irate() requires a range vector argument",
        )?;

        // Apply the irate calculation to each series
        let mut result = Vec::new();
        for series in series_list {
            if series.samples.len() < 2 {
                continue; // Need at least 2 samples for an irate calculation
            }

            // Get the last two samples
            let second_last = &series.samples[series.samples.len() - 2];
            let last = &series.samples[series.samples.len() - 1];

            // Time difference in seconds
            let duration_seconds = (last.timestamp - second_last.timestamp) as f64 / 1000.0;
            if duration_seconds <= 0.0 {
                continue;
            }

            // Instant rate from the final pair of samples
            let value_diff = last.value - second_last.value;
            let rate = value_diff / duration_seconds;

            // Emit a single sample at the query time
            result.push(TimeSeries {
                id: series.id,
                labels: series.labels.clone(),
                samples: vec![Sample {
                    timestamp: time,
                    value: rate.max(0.0), // Counter rates cannot be negative
                }],
            });
        }
        Ok(result)
    }
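    // Worked example (illustrative only): given samples (0s, 100), (50s, 150),
    // (60s, 180) in the selected range, rate() uses the first and last samples,
    // (180 - 100) / 60s ≈ 1.33/s, while irate() uses only the last two,
    // (180 - 150) / 10s = 3.0/s. This mirrors the arithmetic in evaluate_rate
    // and evaluate_irate above.
    #[cfg(test)]
    fn irate_arithmetic_example() -> f64 {
        let (second_last, last) = ((50_000_i64, 150.0_f64), (60_000_i64, 180.0_f64));
        let duration_seconds = (last.0 - second_last.0) as f64 / 1000.0;
        ((last.1 - second_last.1) / duration_seconds).max(0.0) // 3.0
    }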
    /// Evaluate increase(): the total increase over the time range
    async fn evaluate_increase(
        &self,
        call: &Call,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        // increase() expects a range vector (MatrixSelector) as its argument
        if call.args.args.is_empty() {
            return Err(Error::Query(
                "increase() requires a range vector argument".into(),
            ));
        }
        let arg = &call.args.args[0];
        let series_list = expect_vector(
            self.evaluate_value(arg, time, end_time, step, storage)
                .await?,
            "increase() requires a range vector argument",
        )?;

        // Apply the increase calculation to each series
        let mut result = Vec::new();
        for series in series_list {
            if series.samples.len() < 2 {
                continue; // Need at least 2 samples for an increase calculation
            }

            // Get the first and last samples
            let first = &series.samples[0];
            let last = &series.samples[series.samples.len() - 1];

            // Total increase over the window. A full implementation would
            // detect counter resets; this simplified version does not.
            let increase = last.value - first.value;

            // Emit a single sample at the query time
            result.push(TimeSeries {
                id: series.id,
                labels: series.labels.clone(),
                samples: vec![Sample {
                    timestamp: time,
                    value: increase.max(0.0), // Counter increase cannot be negative
                }],
            });
        }
        Ok(result)
    }

    /// Evaluate histogram_quantile() from Prometheus-style cumulative buckets.
    async fn evaluate_histogram_quantile(
        &self,
        call: &Call,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<Vec<TimeSeries>> {
        if call.args.args.len() != 2 {
            return Err(Error::Query(
                "histogram_quantile() requires a scalar quantile and a bucket vector".into(),
            ));
        }

        let quantile = expect_scalar(
            self.evaluate_value(&call.args.args[0], time, end_time, step, storage)
                .await?,
            "histogram_quantile() requires a scalar quantile",
        )?;
        if !(0.0..=1.0).contains(&quantile) {
            return Err(Error::Query(format!(
                "histogram_quantile() quantile must be within [0, 1], got {quantile}"
            )));
        }

        let buckets = expect_vector(
            self.evaluate_value(&call.args.args[1], time, end_time, step, storage)
                .await?,
            "histogram_quantile() requires a bucket vector",
        )?;

        // Group bucket series by their output label set
        let mut grouped: BTreeMap<String, (Vec<Label>, Vec<(f64, f64)>)> = BTreeMap::new();
        for series in buckets {
            let Some(sample) = series.samples.last() else {
                continue;
            };
            let Some(le) = series.get_label("le") else {
                continue;
            };
            let upper_bound = parse_histogram_bound(le)?;
            let labels = histogram_output_labels(&series.labels);
            let key = labels_key(&labels);
            grouped
                .entry(key)
                .or_insert_with(|| (labels, Vec::new()))
                .1
                .push((upper_bound, sample.value));
        }

        let mut results = Vec::new();
        for (labels, mut buckets) in grouped.into_values() {
            buckets.sort_by(|(lhs, _), (rhs, _)| lhs.total_cmp(rhs));
            let value = histogram_quantile_value(quantile, &buckets)?;
            results.push(TimeSeries {
                id: SeriesId(results.len() as u64),
                labels,
                samples: vec![Sample::new(time, value)],
            });
        }
        Ok(results)
    }
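    // Worked example (illustrative; the real computation lives in
    // histogram_quantile_value, defined elsewhere in this module): with
    // cumulative buckets le=0.1 -> 10, le=0.5 -> 55, le=+Inf -> 60, the 0.9
    // quantile targets rank 0.9 * 60 = 54, which falls in the (0.1, 0.5]
    // bucket. Linear interpolation within that bucket, as in Prometheus,
    // gives 0.1 + (54 - 10) / (55 - 10) * (0.5 - 0.1) ≈ 0.49.
    #[cfg(test)]
    fn histogram_quantile_sketch(q: f64, buckets: &[(f64, f64)]) -> f64 {
        // buckets: ascending (upper_bound, cumulative_count), ending at +Inf
        let total = buckets.last().map(|(_, count)| *count).unwrap_or(0.0);
        let rank = q * total;
        let mut prev = (0.0_f64, 0.0_f64);
        for &(bound, count) in buckets {
            if count >= rank {
                if bound.is_infinite() {
                    return prev.0; // cannot interpolate into the +Inf bucket
                }
                let span = count - prev.1;
                if span <= 0.0 {
                    return bound;
                }
                return prev.0 + (rank - prev.1) / span * (bound - prev.0);
            }
            prev = (bound, count);
        }
        f64::NAN
    }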
    /// Evaluate binary operations for scalar/vector arithmetic and one-to-one vector matching.
    async fn evaluate_binary(
        &self,
        binary: &BinaryExpr,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<EvalValue> {
        if let Some(modifier) = &binary.modifier {
            if !matches!(modifier.card, VectorMatchCardinality::OneToOne) {
                return Err(Error::Query(
                    "many-to-one and many-to-many vector matching are not supported yet".into(),
                ));
            }
        }

        let lhs = self
            .evaluate_value(&binary.lhs, time, end_time, step, storage)
            .await?;
        let rhs = self
            .evaluate_value(&binary.rhs, time, end_time, step, storage)
            .await?;

        match (lhs, rhs) {
            (EvalValue::Scalar(lhs), EvalValue::Scalar(rhs)) => {
                evaluate_scalar_binary(binary, lhs, rhs).map(EvalValue::Scalar)
            }
            (EvalValue::Vector(lhs), EvalValue::Scalar(rhs)) => {
                evaluate_vector_scalar_binary(binary, lhs, rhs, true).map(EvalValue::Vector)
            }
            (EvalValue::Scalar(lhs), EvalValue::Vector(rhs)) => {
                evaluate_vector_scalar_binary(binary, rhs, lhs, false).map(EvalValue::Vector)
            }
            (EvalValue::Vector(lhs), EvalValue::Vector(rhs)) => {
                evaluate_vector_vector_binary(binary, lhs, rhs).map(EvalValue::Vector)
            }
        }
    }

    async fn evaluate_unary(
        &self,
        unary: &UnaryExpr,
        time: i64,
        end_time: i64,
        step: i64,
        storage: &QueryableStorage,
    ) -> Result<EvalValue> {
        match self
            .evaluate_value(&unary.expr, time, end_time, step, storage)
            .await?
        {
            EvalValue::Scalar(value) => Ok(EvalValue::Scalar(-value)),
            EvalValue::Vector(mut series) => {
                for entry in &mut series {
                    for sample in &mut entry.samples {
                        sample.value = -sample.value;
                    }
                }
                Ok(EvalValue::Vector(series))
            }
        }
    }

    fn matches_selector(&self, ts: &TimeSeries, selector: &VectorSelector) -> bool {
        if let Some(name) = selector.name.as_deref() {
            if ts.name() != Some(name) {
                return false;
            }
        }
        self.matches_matchers(ts, &selector.matchers)
    }

    fn matches_matchers(&self, ts: &TimeSeries, matchers: &Matchers) -> bool {
        if !matchers
            .matchers
            .iter()
            .all(|matcher| self.matcher_matches(ts, matcher))
        {
            return false;
        }
        if matchers.or_matchers.is_empty() {
            return true;
        }
        matchers.or_matchers.iter().any(|group| {
            group
                .iter()
                .all(|matcher| self.matcher_matches(ts, matcher))
        })
    }

    fn matcher_matches(&self, ts: &TimeSeries, matcher: &promql_parser::label::Matcher) -> bool {
        let label_value = ts.get_label(&matcher.name);
        match &matcher.op {
            MatchOp::Equal => label_value == Some(matcher.value.as_str()),
            MatchOp::NotEqual => match label_value {
                Some(value) => value != matcher.value,
                None => true,
            },
            MatchOp::Re(regex) => label_value.is_some_and(|value| regex.is_match(value)),
            MatchOp::NotRe(regex) => match label_value {
                Some(value) => !regex.is_match(value),
                None => true,
            },
        }
    }

    pub async fn series_metadata(
        &self,
        matchers: &[String],
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<Vec<HashMap<String, String>>> {
        let started = self.metrics.begin_query();
        let storage = self.storage.read().await;
        let series = self.matching_series(&storage, matchers, start, end);
        let result = Ok(series
            .into_iter()
            .map(|ts| {
                ts.labels
                    .iter()
                    .map(|label| (label.name.clone(), label.value.clone()))
                    .collect()
            })
            .collect());
        self.metrics.finish_query(started, true);
        result
    }

    pub async fn label_values_for_matchers(
        &self,
        label_name: &str,
        matchers: &[String],
        start: Option<i64>,
        end: Option<i64>,
    ) -> Result<Vec<String>> {
        let started = self.metrics.begin_query();
        let storage = self.storage.read().await;
        let mut values: Vec<String> = self
            .matching_series(&storage, matchers, start, end)
            .into_iter()
            .filter_map(|series| series.get_label(label_name).map(str::to_string))
            .collect();
        values.sort();
        values.dedup();
        self.metrics.finish_query(started, true);
        Ok(values)
    }
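    // Illustrative check (hypothetical helper): per matcher_matches above, a
    // negative matcher such as {env!="prod"} matches series that lack the
    // label entirely, while {env="prod"} requires the label to be present
    // and equal.
    #[cfg(test)]
    fn negative_matcher_semantics(label_value: Option<&str>) -> bool {
        // Mirrors the MatchOp::NotEqual arm of matcher_matches
        match label_value {
            Some(value) => value != "prod",
            None => true,
        }
    }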
    fn matching_series(
        &self,
        storage: &QueryableStorage,
        matchers: &[String],
        start: Option<i64>,
        end: Option<i64>,
    ) -> Vec<TimeSeries> {
        let parsed_matchers = parse_label_matchers(matchers);
        storage
            .series
            .values()
            .filter(|series| series_matches(series, &parsed_matchers))
            .filter(|series| series_in_time_range(series, start, end))
            .cloned()
            .collect()
    }
}

impl QueryMetrics {
    fn new() -> Self {
        Self {
            queries_total: AtomicU64::new(0),
            queries_failed: AtomicU64::new(0),
            queries_active: AtomicU64::new(0),
            durations_ms: Mutex::new(VecDeque::with_capacity(QUERY_DURATION_HISTORY_LIMIT)),
        }
    }

    fn begin_query(&self) -> Instant {
        self.queries_total.fetch_add(1, Ordering::Relaxed);
        self.queries_active.fetch_add(1, Ordering::Relaxed);
        Instant::now()
    }

    fn finish_query(&self, started: Instant, success: bool) {
        if !success {
            self.queries_failed.fetch_add(1, Ordering::Relaxed);
        }
        self.queries_active.fetch_sub(1, Ordering::Relaxed);
        let elapsed_ms = started.elapsed().as_millis() as u64;
        let mut durations = self.durations_ms.lock();
        // Keep a bounded history of recent query durations
        if durations.len() >= QUERY_DURATION_HISTORY_LIMIT {
            durations.pop_front();
        }
        durations.push_back(elapsed_ms);
    }

    pub fn snapshot(&self) -> QueryMetricsSnapshot {
        let mut sorted_durations: Vec<u64> = self.durations_ms.lock().iter().copied().collect();
        sorted_durations.sort_unstable();
        QueryMetricsSnapshot {
            queries_total: self.queries_total.load(Ordering::Relaxed),
            queries_failed: self.queries_failed.load(Ordering::Relaxed),
            queries_active: self.queries_active.load(Ordering::Relaxed),
            query_duration_p50: percentile(&sorted_durations, 0.50),
            query_duration_p95: percentile(&sorted_durations, 0.95),
            query_duration_p99: percentile(&sorted_durations, 0.99),
        }
    }
}

impl QueryableStorage {
    /// Add or update a time series in storage
    pub fn upsert_series(&mut self, series: TimeSeries) {
        // Update the label index
        for label in &series.labels {
            let series_ids = self
                .label_index
                .entry(label.name.clone())
                .or_default()
                .entry(label.value.clone())
                .or_default();
            if !series_ids.contains(&series.id) {
                series_ids.push(series.id);
            }
        }

        // Upsert the series itself
        self.series
            .entry(series.id)
            .and_modify(|existing| {
                // Merge samples: append, sort by timestamp, then deduplicate
                existing.samples.extend(series.samples.clone());
                existing.samples.sort_by_key(|s| s.timestamp);
                existing.samples.dedup_by_key(|s| s.timestamp);
            })
            .or_insert(series);
    }

    /// Get label values for a specific label name
    #[cfg(test)]
    pub fn label_values(&self, label_name: &str) -> Vec<String> {
        let mut values: Vec<String> = self
            .label_index
            .get(label_name)
            .map(|values| values.keys().cloned().collect())
            .unwrap_or_default();
        values.sort();
        values
    }

    pub fn rebuild_index(&mut self) {
        self.label_index.clear();
        let series: Vec<TimeSeries> = self.series.values().cloned().collect();
        for series in series {
            for label in &series.labels {
                self.label_index
                    .entry(label.name.clone())
                    .or_default()
                    .entry(label.value.clone())
                    .or_default()
                    .push(series.id);
            }
        }
    }

    pub fn prune_before(&mut self, cutoff: i64) -> usize {
        let mut removed_samples = 0usize;
        self.series.retain(|_, series| {
            let before = series.samples.len();
            series.samples.retain(|sample| sample.timestamp >= cutoff);
            removed_samples += before.saturating_sub(series.samples.len());
            !series.samples.is_empty()
        });
        self.rebuild_index();
        removed_samples
    }
}

fn expect_vector(value: EvalValue, message: &str) -> Result<Vec<TimeSeries>> {
    match value {
        EvalValue::Vector(series) => Ok(series),
        EvalValue::Scalar(_) => Err(Error::Query(message.to_string())),
    }
}

fn expect_scalar(value: EvalValue, message: &str) -> Result<f64> {
    match value {
        EvalValue::Scalar(value) => Ok(value),
        EvalValue::Vector(_) => Err(Error::Query(message.to_string())),
    }
}
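// Illustrative sketch of the sample-merge behavior in
// QueryableStorage::upsert_series: appended samples are sorted by timestamp
// and deduplicated by timestamp, so on a clash the previously stored sample
// wins (the stable sort keeps it first, and dedup_by_key keeps the first of
// each run).
#[cfg(test)]
fn upsert_merge_example() -> Vec<i64> {
    let mut samples = vec![(10_i64, 1.0_f64), (30, 3.0), (20, 2.0), (30, 99.0)];
    samples.sort_by_key(|s| s.0);
    samples.dedup_by_key(|s| s.0);
    // Timestamps after the merge: [10, 20, 30]; the (30, 3.0) sample is kept.
    samples.into_iter().map(|s| s.0).collect()
}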
fn instant_query_from_eval_value(value: EvalValue, time: i64) -> QueryResult {
    match value {
        EvalValue::Scalar(value) => QueryResult {
            result_type: "scalar".to_string(),
            result: vec![InstantQueryResult {
                metric: HashMap::new(),
                value: Some((time, value)),
            }],
        },
        EvalValue::Vector(series) => QueryResult {
            result_type: "vector".to_string(),
            result: series
                .into_iter()
                .map(|ts| InstantQueryResult {
                    metric: labels_to_map(&ts.labels),
                    value: ts
                        .samples
                        .last()
                        .map(|sample| (sample.timestamp, sample.value)),
                })
                .collect(),
        },
    }
}

fn append_range_step_result(
    results: &mut HashMap<String, RangeResult>,
    step_result: EvalValue,
    time: i64,
) {
    match step_result {
        EvalValue::Scalar(value) => {
            let entry = results
                .entry("__scalar__".to_string())
                .or_insert_with(|| RangeResult {
                    metric: HashMap::new(),
                    values: Vec::new(),
                });
            entry.values.push((time, value));
        }
        EvalValue::Vector(series) => {
            for ts in series {
                let key = labels_key(&ts.labels);
                let metric = labels_to_map(&ts.labels);
                let entry = results.entry(key).or_insert_with(|| RangeResult {
                    metric,
                    values: Vec::new(),
                });
                for sample in ts.samples {
                    entry.values.push((sample.timestamp, sample.value));
                }
            }
        }
    }
}

fn labels_to_map(labels: &[Label]) -> HashMap<String, String> {
    labels
        .iter()
        .map(|label| (label.name.clone(), label.value.clone()))
        .collect()
}

fn normalize_labels(mut labels: Vec<Label>