//! Integration tests for Prometheus remote_write ingestion endpoint //! //! These tests verify the end-to-end functionality of the ingestion service, //! including HTTP handling, snappy compression, and protobuf encoding. use axum::body::Body; use axum::http::{Request, StatusCode}; use metricstor_api::prometheus::{Label, Sample, TimeSeries, WriteRequest}; use prost::Message; use snap::raw::Encoder as SnappyEncoder; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use tower::ServiceExt; // For oneshot use metricstor_server::query::QueryableStorage; /// Helper function to create shared storage for tests fn create_shared_storage() -> Arc> { Arc::new(RwLock::new(QueryableStorage { series: HashMap::new(), label_index: HashMap::new(), })) } /// Helper function to create a valid WriteRequest fn create_test_write_request() -> WriteRequest { WriteRequest { timeseries: vec![TimeSeries { labels: vec![ Label { name: "__name__".to_string(), value: "test_metric".to_string(), }, Label { name: "job".to_string(), value: "test".to_string(), }, Label { name: "instance".to_string(), value: "localhost:9090".to_string(), }, ], samples: vec![ Sample { value: 42.0, timestamp: 1234567890000, }, Sample { value: 43.0, timestamp: 1234567891000, }, ], }], } } /// Helper function to encode and compress a WriteRequest fn encode_and_compress(request: &WriteRequest) -> Vec { // Encode to protobuf let mut buf = Vec::new(); request.encode(&mut buf).unwrap(); // Compress with snappy let mut encoder = SnappyEncoder::new(); encoder.compress_vec(&buf).unwrap() } #[tokio::test] async fn test_remote_write_valid_request() { // Create a valid WriteRequest let write_request = create_test_write_request(); // Encode and compress let compressed = encode_and_compress(&write_request); // Create HTTP request let request = Request::builder() .method("POST") .uri("/api/v1/write") .header("Content-Type", "application/x-protobuf") .header("Content-Encoding", "snappy") .body(Body::from(compressed)) .unwrap(); // Create service and send request let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 204 No Content on success assert_eq!(response.status(), StatusCode::NO_CONTENT); } #[tokio::test] async fn test_remote_write_missing_name_label() { // Create WriteRequest without __name__ label let write_request = WriteRequest { timeseries: vec![TimeSeries { labels: vec![Label { name: "job".to_string(), value: "test".to_string(), }], samples: vec![Sample { value: 42.0, timestamp: 1234567890000, }], }], }; let compressed = encode_and_compress(&write_request); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 400 Bad Request for invalid labels assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] async fn test_remote_write_invalid_label_name() { // Create WriteRequest with invalid label name (starts with digit) let write_request = WriteRequest { timeseries: vec![TimeSeries { labels: vec![ Label { name: "__name__".to_string(), value: "test_metric".to_string(), }, Label { name: "123invalid".to_string(), // Invalid: starts with digit value: "value".to_string(), }, ], samples: vec![Sample { value: 42.0, timestamp: 1234567890000, }], }], }; let compressed = encode_and_compress(&write_request); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 400 Bad Request for invalid label name assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] async fn test_remote_write_invalid_protobuf() { // Send invalid protobuf data (but properly snappy-compressed) let invalid_data = b"this is not valid protobuf data"; let mut encoder = SnappyEncoder::new(); let compressed = encoder.compress_vec(invalid_data).unwrap(); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 400 Bad Request for invalid protobuf assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] async fn test_remote_write_invalid_snappy() { // Send data that's not snappy-compressed let invalid_data = b"not snappy compressed data"; let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(invalid_data.to_vec())) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 400 Bad Request for invalid snappy compression assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] async fn test_remote_write_multiple_series() { // Create WriteRequest with multiple time series let write_request = WriteRequest { timeseries: vec![ TimeSeries { labels: vec![ Label { name: "__name__".to_string(), value: "http_requests_total".to_string(), }, Label { name: "method".to_string(), value: "GET".to_string(), }, ], samples: vec![Sample { value: 100.0, timestamp: 1234567890000, }], }, TimeSeries { labels: vec![ Label { name: "__name__".to_string(), value: "http_requests_total".to_string(), }, Label { name: "method".to_string(), value: "POST".to_string(), }, ], samples: vec![Sample { value: 50.0, timestamp: 1234567890000, }], }, ], }; let compressed = encode_and_compress(&write_request); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // Should return 204 No Content on success assert_eq!(response.status(), StatusCode::NO_CONTENT); } #[tokio::test] async fn test_remote_write_nan_value() { // Create WriteRequest with NaN value (should be rejected) let write_request = WriteRequest { timeseries: vec![TimeSeries { labels: vec![ Label { name: "__name__".to_string(), value: "test_metric".to_string(), }, ], samples: vec![Sample { value: f64::NAN, // Invalid value timestamp: 1234567890000, }], }], }; let compressed = encode_and_compress(&write_request); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); let app = service.router(); let response = app.oneshot(request).await.unwrap(); // NaN values are filtered out, but request still succeeds // (just with 0 samples ingested) assert_eq!(response.status(), StatusCode::NO_CONTENT); } #[tokio::test] async fn test_buffer_stats() { let service = metricstor_server::ingestion::IngestionService::new(create_shared_storage()); // Initially buffer should be empty let (samples, series) = service.storage_stats().await; assert_eq!(samples, 0); assert_eq!(series, 0); // Send a write request let write_request = create_test_write_request(); let compressed = encode_and_compress(&write_request); let request = Request::builder() .method("POST") .uri("/api/v1/write") .body(Body::from(compressed)) .unwrap(); let app = service.clone().router(); let response = app.oneshot(request).await.unwrap(); assert_eq!(response.status(), StatusCode::NO_CONTENT); // Buffer should now contain samples let (samples, series) = service.storage_stats().await; assert_eq!(samples, 2); // 2 samples in the test request assert_eq!(series, 1); // 1 time series }