photoncloud-monorepo/metricstor/crates/metricstor-server/tests/integration_test.rs
centra 5c6eb04a46 T036: Add VM cluster deployment configs for nixos-anywhere
- netboot-base.nix with SSH key auth
- Launch scripts for node01/02/03
- Node configuration.nix and disko.nix
- Nix modules for first-boot automation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-11 09:59:19 +09:00

199 lines
6.8 KiB
Rust

//! Integration tests for Metricstor
//!
//! Tests the full roundtrip: ingestion -> storage -> query
use axum::{
body::Body,
http::{Request, StatusCode},
};
use metricstor_api::prometheus::{Label, Sample, TimeSeries as ProtoTimeSeries, WriteRequest};
use prost::Message;
use snap::raw::Encoder as SnappyEncoder;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tower::ServiceExt; // for oneshot
use metricstor_server::{ingestion::IngestionService, query::QueryService, query::QueryableStorage};
/// Builds a snappy-compressed, protobuf-encoded `WriteRequest` holding one sample.
///
/// The request contains a single time series labelled with `__name__`
/// (set to `metric_name`), `job="test"` and `instance="localhost:9090"`, and
/// that series carries exactly one sample made of `timestamp` and `value`.
///
/// # Panics
///
/// Panics if snappy compression of the encoded message fails.
fn create_write_request_body(metric_name: &str, value: f64, timestamp: i64) -> Vec<u8> {
    // Small constructor so the label list below reads as a table.
    let label = |key: &str, val: &str| Label {
        name: key.to_string(),
        value: val.to_string(),
    };

    let series = ProtoTimeSeries {
        labels: vec![
            label("__name__", metric_name),
            label("job", "test"),
            label("instance", "localhost:9090"),
        ],
        samples: vec![Sample { timestamp, value }],
    };
    let payload = WriteRequest {
        timeseries: vec![series],
    };

    // Protobuf first, then raw snappy; `compress_vec` sizes and trims its own
    // output buffer.
    let encoded = payload.encode_to_vec();
    SnappyEncoder::new().compress_vec(&encoded).unwrap()
}
/// Round-trips one sample: writes it via `POST /api/v1/write`, reads it back
/// via `GET /api/v1/query`, and checks the JSON response field by field.
///
/// The ingestion and query services are built over the same
/// `QueryableStorage`, their routers are merged into a single app, and the
/// requests are driven in-process with `oneshot`.
#[tokio::test]
async fn test_ingestion_query_roundtrip() {
    // A single storage instance backs both the write path and the read path.
    let storage = Arc::new(RwLock::new(QueryableStorage {
        series: HashMap::new(),
        label_index: HashMap::new(),
    }));
    let ingestion = IngestionService::new(Arc::clone(&storage));
    let query = QueryService::from_storage(Arc::clone(&storage));
    let app = ingestion.router().merge(query.router());

    // Step 1: push one sample through the write endpoint.
    let metric_name = "test_metric_total";
    let timestamp = 1_234_567_890_000_i64; // milliseconds
    let value = 42.5;

    let ingest = Request::builder()
        .method("POST")
        .uri("/api/v1/write")
        .header("Content-Type", "application/x-protobuf")
        .body(Body::from(create_write_request_body(
            metric_name,
            value,
            timestamp,
        )))
        .unwrap();
    let ingest_reply = app.clone().oneshot(ingest).await.unwrap();
    assert_eq!(ingest_reply.status(), StatusCode::NO_CONTENT);

    // Step 2: ask for the same metric at the same timestamp.
    let query_uri = format!("/api/v1/query?query={}&time={}", metric_name, timestamp);
    let lookup = Request::builder()
        .method("GET")
        .uri(query_uri)
        .body(Body::empty())
        .unwrap();
    let reply = app.oneshot(lookup).await.unwrap();
    assert_eq!(reply.status(), StatusCode::OK);

    // Decode the JSON payload.
    let raw = axum::body::to_bytes(reply.into_body(), usize::MAX)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&raw).unwrap();

    // Envelope: status plus a vector-typed result list.
    assert_eq!(json["status"], "success");
    let data = &json["data"];
    assert!(data.is_object());
    assert_eq!(data["resultType"], "vector");
    assert!(data["result"].is_array());

    // Exactly one series is expected back.
    let results = data["result"].as_array().unwrap();
    assert_eq!(results.len(), 1, "Expected 1 result, got {}", results.len());
    let entry = &results[0];
    assert!(entry["metric"].is_object());
    assert!(entry["value"].is_array());

    // Every label that was written must come back unchanged.
    let labels = entry["metric"].as_object().unwrap();
    for (key, expected) in [
        ("__name__", metric_name),
        ("job", "test"),
        ("instance", "localhost:9090"),
    ] {
        assert_eq!(labels[key], expected);
    }

    // The sample is reported as a two-element array: [timestamp, value].
    let pair = entry["value"].as_array().unwrap();
    assert_eq!(pair.len(), 2);
    assert_eq!(pair[0].as_i64().unwrap(), timestamp);
    assert_eq!(pair[1].as_f64().unwrap(), value);

    println!("✓ Integration test passed: ingestion → query roundtrip works!");
}
/// Writes several samples of one series and checks that a query at the newest
/// timestamp yields the newest value.
///
/// Despite the test's name, only a single metric (`http_requests_total`) is
/// involved; what varies is the timestamp/value of each ingested sample. Each
/// sample is sent as its own `POST /api/v1/write` request against an app built
/// from the merged ingestion and query routers over shared storage.
#[tokio::test]
async fn test_multiple_metrics_roundtrip() {
    // Create shared storage
    let shared_storage = Arc::new(RwLock::new(QueryableStorage {
        series: HashMap::new(),
        label_index: HashMap::new(),
    }));

    // Create services with shared storage and merge their routers
    let ingestion_service = IngestionService::new(shared_storage.clone());
    let query_service = QueryService::from_storage(shared_storage.clone());
    let app = ingestion_service.router().merge(query_service.router());

    let metric_name = "http_requests_total";

    // (timestamp, value) pairs in ascending timestamp order. Keeping each pair
    // together (instead of two parallel vectors joined by `zip`) means a
    // timestamp can never lose its value when the fixture is edited.
    let samples: [(i64, f64); 5] = [
        (1000, 10.0),
        (2000, 20.0),
        (3000, 30.0),
        (4000, 40.0),
        (5000, 50.0),
    ];

    // Ingest every sample with its own write request
    for (timestamp, value) in samples {
        let write_body = create_write_request_body(metric_name, value, timestamp);
        let write_request = Request::builder()
            .method("POST")
            .uri("/api/v1/write")
            .header("Content-Type", "application/x-protobuf")
            .body(Body::from(write_body))
            .unwrap();
        let write_response = app.clone().oneshot(write_request).await.unwrap();
        assert_eq!(
            write_response.status(),
            StatusCode::NO_CONTENT,
            "write of sample at timestamp {} was not accepted",
            timestamp
        );
    }

    // The query time and the expected value both come from the last fixture
    // entry, so they cannot drift apart from the ingested data.
    let (latest_timestamp, latest_value) = samples[samples.len() - 1];

    // Query the latest value
    let query_request = Request::builder()
        .method("GET")
        .uri(format!(
            "/api/v1/query?query={}&time={}",
            metric_name, latest_timestamp
        ))
        .body(Body::empty())
        .unwrap();
    let query_response = app.oneshot(query_request).await.unwrap();
    assert_eq!(query_response.status(), StatusCode::OK);

    let body_bytes = axum::body::to_bytes(query_response.into_body(), usize::MAX)
        .await
        .unwrap();
    let response_json: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();

    // Verify we got the latest value: one series, whose value array holds the
    // sample value at index 1.
    assert_eq!(response_json["status"], "success");
    let results = response_json["data"]["result"].as_array().unwrap();
    assert_eq!(results.len(), 1);
    let value_array = results[0]["value"].as_array().unwrap();
    assert_eq!(value_array[1].as_f64().unwrap(), latest_value);

    println!("✓ Multiple metrics roundtrip test passed!");
}