nightlight: align metadata matcher semantics
This commit is contained in:
parent
9b26deee9b
commit
b07bcb3772
1 changed files with 169 additions and 50 deletions
|
|
@ -19,6 +19,7 @@ use promql_parser::{
|
|||
NumberLiteral, UnaryExpr, VectorMatchCardinality, VectorSelector,
|
||||
},
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
|
@ -72,6 +73,13 @@ enum EvalValue {
|
|||
Scalar(f64),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct MetadataMatcher {
|
||||
name: String,
|
||||
op: MatchOp,
|
||||
value: String,
|
||||
}
|
||||
|
||||
impl QueryService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
|
|
@ -751,20 +759,7 @@ impl QueryService {
|
|||
}
|
||||
|
||||
fn matcher_matches(&self, ts: &TimeSeries, matcher: &promql_parser::label::Matcher) -> bool {
|
||||
let label_value = ts.get_label(&matcher.name);
|
||||
|
||||
match &matcher.op {
|
||||
MatchOp::Equal => label_value == Some(matcher.value.as_str()),
|
||||
MatchOp::NotEqual => match label_value {
|
||||
Some(value) => value != matcher.value,
|
||||
None => true,
|
||||
},
|
||||
MatchOp::Re(regex) => label_value.is_some_and(|value| regex.is_match(value)),
|
||||
MatchOp::NotRe(regex) => match label_value {
|
||||
Some(value) => !regex.is_match(value),
|
||||
None => true,
|
||||
},
|
||||
}
|
||||
label_matcher_matches(ts.get_label(&matcher.name), &matcher.op, &matcher.value)
|
||||
}
|
||||
|
||||
pub async fn series_metadata(
|
||||
|
|
@ -776,16 +771,19 @@ impl QueryService {
|
|||
let started = self.metrics.begin_query();
|
||||
let storage = self.storage.read().await;
|
||||
let series = self.matching_series(&storage, matchers, start, end);
|
||||
let result = Ok(series
|
||||
.into_iter()
|
||||
.map(|ts| {
|
||||
ts.labels
|
||||
.iter()
|
||||
.map(|label| (label.name.clone(), label.value.clone()))
|
||||
.collect()
|
||||
})
|
||||
.collect());
|
||||
self.metrics.finish_query(started, true);
|
||||
let success = series.is_ok();
|
||||
let result = series.map(|series| {
|
||||
series
|
||||
.into_iter()
|
||||
.map(|ts| {
|
||||
ts.labels
|
||||
.iter()
|
||||
.map(|label| (label.name.clone(), label.value.clone()))
|
||||
.collect()
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
self.metrics.finish_query(started, success);
|
||||
result
|
||||
}
|
||||
|
||||
|
|
@ -798,15 +796,19 @@ impl QueryService {
|
|||
) -> Result<Vec<String>> {
|
||||
let started = self.metrics.begin_query();
|
||||
let storage = self.storage.read().await;
|
||||
let mut values: Vec<String> = self
|
||||
let values = self
|
||||
.matching_series(&storage, matchers, start, end)
|
||||
.into_iter()
|
||||
.filter_map(|series| series.get_label(label_name).map(str::to_string))
|
||||
.collect();
|
||||
values.sort();
|
||||
values.dedup();
|
||||
self.metrics.finish_query(started, true);
|
||||
Ok(values)
|
||||
.map(|series| {
|
||||
let mut values: Vec<String> = series
|
||||
.into_iter()
|
||||
.filter_map(|series| series.get_label(label_name).map(str::to_string))
|
||||
.collect();
|
||||
values.sort();
|
||||
values.dedup();
|
||||
values
|
||||
});
|
||||
self.metrics.finish_query(started, values.is_ok());
|
||||
values
|
||||
}
|
||||
|
||||
fn matching_series(
|
||||
|
|
@ -815,15 +817,15 @@ impl QueryService {
|
|||
matchers: &[String],
|
||||
start: Option<i64>,
|
||||
end: Option<i64>,
|
||||
) -> Vec<TimeSeries> {
|
||||
let parsed_matchers = parse_label_matchers(matchers);
|
||||
storage
|
||||
) -> Result<Vec<TimeSeries>> {
|
||||
let parsed_matchers = parse_metadata_matchers(matchers)?;
|
||||
Ok(storage
|
||||
.series
|
||||
.values()
|
||||
.filter(|series| series_matches(series, &parsed_matchers))
|
||||
.filter(|series| series_in_time_range(series, start, end))
|
||||
.cloned()
|
||||
.collect()
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1330,28 +1332,65 @@ fn percentile(values: &[u64], quantile: f64) -> f64 {
|
|||
values[index.min(values.len() - 1)] as f64
|
||||
}
|
||||
|
||||
fn parse_label_matchers(matchers: &[String]) -> Vec<(String, String)> {
|
||||
fn parse_metadata_matchers(matchers: &[String]) -> Result<Vec<MetadataMatcher>> {
|
||||
matchers
|
||||
.iter()
|
||||
.filter_map(|matcher| matcher.split_once('='))
|
||||
.map(|(key, value)| {
|
||||
(
|
||||
key.trim().to_string(),
|
||||
value.trim().trim_matches('"').to_string(),
|
||||
)
|
||||
})
|
||||
.map(|matcher| parse_metadata_matcher(matcher))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn series_matches(series: &TimeSeries, matchers: &[(String, String)]) -> bool {
|
||||
matchers.iter().all(|(key, value)| {
|
||||
series
|
||||
.labels
|
||||
.iter()
|
||||
.any(|label| &label.name == key && &label.value == value)
|
||||
fn parse_metadata_matcher(input: &str) -> Result<MetadataMatcher> {
|
||||
for operator in ["!~", "=~", "!=", "="] {
|
||||
if let Some((name, value)) = input.split_once(operator) {
|
||||
let name = name.trim();
|
||||
let raw_value = value.trim().trim_matches('"');
|
||||
if name.is_empty() {
|
||||
return Err(Error::Query(format!(
|
||||
"invalid metadata matcher {input:?}: empty label name"
|
||||
)));
|
||||
}
|
||||
|
||||
let op = match operator {
|
||||
"=" => MatchOp::Equal,
|
||||
"!=" => MatchOp::NotEqual,
|
||||
"=~" => MatchOp::Re(Regex::new(raw_value).map_err(|error| {
|
||||
Error::Query(format!("invalid regex in matcher {input:?}: {error}"))
|
||||
})?),
|
||||
"!~" => MatchOp::NotRe(Regex::new(raw_value).map_err(|error| {
|
||||
Error::Query(format!("invalid regex in matcher {input:?}: {error}"))
|
||||
})?),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
return Ok(MetadataMatcher {
|
||||
name: name.to_string(),
|
||||
op,
|
||||
value: raw_value.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::Query(format!(
|
||||
"invalid metadata matcher {input:?}: expected one of =, !=, =~, !~"
|
||||
)))
|
||||
}
|
||||
|
||||
fn series_matches(series: &TimeSeries, matchers: &[MetadataMatcher]) -> bool {
|
||||
matchers.iter().all(|matcher| {
|
||||
label_matcher_matches(series.get_label(&matcher.name), &matcher.op, &matcher.value)
|
||||
})
|
||||
}
|
||||
|
||||
fn label_matcher_matches(label_value: Option<&str>, op: &MatchOp, expected: &str) -> bool {
|
||||
let value = label_value.unwrap_or("");
|
||||
match op {
|
||||
MatchOp::Equal => value == expected,
|
||||
MatchOp::NotEqual => value != expected,
|
||||
MatchOp::Re(regex) => regex.is_match(value),
|
||||
MatchOp::NotRe(regex) => !regex.is_match(value),
|
||||
}
|
||||
}
|
||||
|
||||
fn series_in_time_range(series: &TimeSeries, start: Option<i64>, end: Option<i64>) -> bool {
|
||||
let Some((series_start, series_end)) = series.time_range() else {
|
||||
return true;
|
||||
|
|
@ -2033,6 +2072,86 @@ mod tests {
|
|||
.any(|value| value == "test_job"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_queries_support_regex_and_negative_matchers() {
|
||||
let service = QueryService::new();
|
||||
seed_series(
|
||||
&service,
|
||||
vec![
|
||||
test_series(
|
||||
1,
|
||||
&[
|
||||
("__name__", "http_requests_total"),
|
||||
("job", "api"),
|
||||
("instance", "a"),
|
||||
],
|
||||
&[(1000, 1.0)],
|
||||
),
|
||||
test_series(
|
||||
2,
|
||||
&[
|
||||
("__name__", "http_requests_total"),
|
||||
("job", "worker"),
|
||||
("instance", "b"),
|
||||
],
|
||||
&[(1000, 1.0)],
|
||||
),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let series = service
|
||||
.series_metadata(&["job=~\"a.*|api\"".to_string()], None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(series.len(), 1);
|
||||
assert_eq!(series[0].get("job"), Some(&"api".to_string()));
|
||||
|
||||
let values = service
|
||||
.label_values_for_matchers("instance", &["job!=\"worker\"".to_string()], None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(values, vec!["a".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_label_matchers_treat_missing_labels_as_empty() {
|
||||
let service = QueryService::new();
|
||||
seed_series(
|
||||
&service,
|
||||
vec![test_series(
|
||||
1,
|
||||
&[("__name__", "up"), ("job", "api")],
|
||||
&[(1000, 1.0)],
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
|
||||
let equal_empty = service
|
||||
.execute_instant_query("up{zone=\"\"}", 1000)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(equal_empty.result.len(), 1);
|
||||
|
||||
let regex_empty = service
|
||||
.execute_instant_query("up{zone=~\".*\"}", 1000)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(regex_empty.result.len(), 1);
|
||||
|
||||
let not_equal_empty = service
|
||||
.execute_instant_query("up{zone!=\"\"}", 1000)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(not_equal_empty.result.is_empty());
|
||||
|
||||
let metadata = service
|
||||
.series_metadata(&["zone=\"\"".to_string()], None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(metadata.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_persistence_save_load_empty() {
|
||||
use tempfile::tempdir;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue