//! Prometheus remote_write ingestion endpoint //! //! Implements the Prometheus remote_write protocol v1.0 for push-based //! metric ingestion with snappy compression and protobuf encoding. use axum::{ body::Bytes, extract::State, http::StatusCode, response::{IntoResponse, Response}, routing::post, Router, }; use nightlight_api::prometheus::{Label, WriteRequest}; use nightlight_types::Error; use prost::Message; use snap::raw::Decoder as SnappyDecoder; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use tracing::{debug, error, info, warn}; use crate::storage::Storage; /// Maximum write request size (10 MB uncompressed) const MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024; /// Ingestion service state #[derive(Clone)] pub struct IngestionService { storage: Arc, metrics: Arc, } #[derive(Debug)] pub struct IngestionMetrics { samples_received: AtomicU64, samples_invalid: AtomicU64, requests_total: AtomicU64, requests_failed: AtomicU64, started_at: Instant, } #[derive(Debug, Clone, Copy, Default)] pub struct IngestionMetricsSnapshot { pub samples_ingested_total: u64, pub write_requests_total: u64, pub write_requests_failed: u64, pub samples_per_second: f64, pub buffer_samples: u64, } impl IngestionService { pub fn new(storage: Arc) -> Self { Self { storage, metrics: Arc::new(IngestionMetrics::new()), } } /// Create Axum router for ingestion endpoints pub fn router(self) -> Router { Router::new() .route("/api/v1/write", post(handle_remote_write)) .with_state(self) } pub fn metrics(&self) -> Arc { Arc::clone(&self.metrics) } /// Process a WriteRequest and write to shared storage pub(crate) async fn process_write_request(&self, request: WriteRequest) -> Result { let mut samples_processed = 0; let mut series_to_append = Vec::new(); for ts in request.timeseries { // Validate and normalize labels let labels = validate_labels(ts.labels)?; // Convert to internal types let internal_labels: Vec = labels .into_iter() .map(|l| nightlight_types::Label { name: l.name, value: l.value, }) .collect(); // Process samples let mut internal_samples = Vec::new(); for sample in ts.samples { // Validate sample if !sample.value.is_finite() { warn!("Invalid sample value: {}", sample.value); self.metrics.samples_invalid.fetch_add(1, Ordering::Relaxed); continue; } // Convert to internal type let internal_sample = nightlight_types::Sample { timestamp: sample.timestamp, value: sample.value, }; internal_samples.push(internal_sample); samples_processed += 1; } // Skip if no valid samples if internal_samples.is_empty() { continue; } // Store series with samples in shared storage let series_id = nightlight_types::SeriesId( compute_series_fingerprint(&internal_labels) ); let time_series = nightlight_types::TimeSeries { id: series_id, labels: internal_labels, samples: internal_samples, }; series_to_append.push(time_series); } self.storage .append(series_to_append) .await .map_err(|error| Error::Storage(error.to_string()))?; self.metrics .samples_received .fetch_add(samples_processed, Ordering::Relaxed); Ok(samples_processed) } /// Get current storage statistics #[cfg(test)] pub async fn storage_stats(&self) -> Result<(usize, usize), Error> { let stats = self .storage .stats() .await .map_err(|error| Error::Storage(error.to_string()))?; Ok((stats.total_samples as usize, stats.active_series as usize)) } } impl IngestionMetrics { fn new() -> Self { Self { samples_received: AtomicU64::new(0), samples_invalid: AtomicU64::new(0), requests_total: AtomicU64::new(0), requests_failed: AtomicU64::new(0), started_at: Instant::now(), } } pub fn snapshot(&self) -> IngestionMetricsSnapshot { let uptime = self.started_at.elapsed().as_secs_f64(); let samples_ingested_total = self.samples_received.load(Ordering::Relaxed); IngestionMetricsSnapshot { samples_ingested_total, write_requests_total: self.requests_total.load(Ordering::Relaxed), write_requests_failed: self.requests_failed.load(Ordering::Relaxed), samples_per_second: if uptime > 0.0 { samples_ingested_total as f64 / uptime } else { 0.0 }, buffer_samples: 0, } } } /// Axum handler for /api/v1/write endpoint async fn handle_remote_write( State(service): State, body: Bytes, ) -> Response { service.metrics.requests_total.fetch_add(1, Ordering::Relaxed); debug!("Received remote_write request, size: {} bytes", body.len()); // Check request size if body.len() > MAX_REQUEST_SIZE { warn!("Request too large: {} bytes", body.len()); return IngestionError::PayloadTooLarge.into_response(); } // Decompress snappy-encoded payload let decompressed = match decompress_snappy(&body) { Ok(data) => data, Err(e) => { error!("Snappy decompression failed: {}", e); return IngestionError::DecompressionFailed.into_response(); } }; debug!("Decompressed payload: {} bytes", decompressed.len()); // Decode protobuf WriteRequest let write_request = match WriteRequest::decode(&decompressed[..]) { Ok(req) => req, Err(e) => { error!("Protobuf decode failed: {}", e); return IngestionError::InvalidProtobuf.into_response(); } }; info!( "Decoded WriteRequest with {} time series", write_request.timeseries.len() ); // Process the request match service.process_write_request(write_request).await { Ok(samples_count) => { info!("Successfully ingested {} samples", samples_count); (StatusCode::NO_CONTENT, "").into_response() } Err(Error::Storage(msg)) if msg.contains("buffer full") => { warn!("Write buffer full, returning 429"); service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); IngestionError::Backpressure.into_response() } Err(Error::InvalidLabel(msg)) => { warn!("Invalid labels: {}", msg); service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); IngestionError::InvalidLabels.into_response() } Err(e) => { error!("Failed to process write request: {}", e); service.metrics.requests_failed.fetch_add(1, Ordering::Relaxed); IngestionError::StorageError.into_response() } } } /// Decompress snappy-encoded data fn decompress_snappy(compressed: &[u8]) -> Result, Error> { let mut decoder = SnappyDecoder::new(); let decompressed_len = snap::raw::decompress_len(compressed) .map_err(|e| Error::InvalidMetric(format!("Invalid snappy data: {}", e)))?; let mut decompressed = vec![0u8; decompressed_len]; decoder .decompress(compressed, &mut decompressed) .map_err(|e| Error::InvalidMetric(format!("Snappy decompression failed: {}", e)))?; Ok(decompressed) } /// Validate and normalize Prometheus labels fn validate_labels(labels: Vec