- Replace form_urlencoded with RFC 3986 compliant URI encoding - Implement aws_uri_encode() matching AWS SigV4 spec exactly - Unreserved chars (A-Z,a-z,0-9,-,_,.,~) not encoded - All other chars percent-encoded with uppercase hex - Preserve slashes in paths, encode in query params - Normalize empty paths to '/' per AWS spec - Fix test expectations (body hash, HMAC values) - Add comprehensive SigV4 signature determinism test This fixes the canonicalization mismatch that caused signature validation failures in T047. Auth can now be enabled for production. Refs: T058.S1
156 lines
4.7 KiB
Rust
156 lines
4.7 KiB
Rust
//! Example: Push metrics to Nightlight using Prometheus remote_write
//!
//! This example demonstrates how to send metrics to a Nightlight server
//! using the Prometheus remote_write protocol with snappy compression
//! and protobuf encoding.
//!
//! # Usage
//!
//! 1. Start the Nightlight server:
//!    ```bash
//!    cargo run --bin nightlight-server
//!    ```
//!
//! 2. In another terminal, run this example:
//!    ```bash
//!    cargo run --example push_metrics
//!    ```
//!
//! # Protocol
//!
//! The remote_write protocol involves:
//! 1. Create WriteRequest protobuf with time series data
//! 2. Encode to protobuf binary format
//! 3. Compress with Snappy compression
//! 4. POST to /api/v1/write endpoint
//!
//! # Expected Output
//!
//! The byte counts (`<N>`, `<M>`) depend on the encoded payload.
//!
//! ```text
//! Pushing metrics to http://127.0.0.1:9101/api/v1/write...
//! Encoded <N> bytes of protobuf data
//! Compressed to <M> bytes with Snappy
//! Response status: 204 No Content
//! Successfully pushed 3 samples across 2 time series
//! ```
|
|
|
|
use nightlight_api::prometheus::{Label, Sample, TimeSeries, WriteRequest};
|
|
use prost::Message;
|
|
use snap::raw::Encoder as SnappyEncoder;
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
// Create HTTP client
|
|
let client = reqwest::Client::new();
|
|
|
|
// Server URL (default Nightlight HTTP address)
|
|
let url = "http://127.0.0.1:9101/api/v1/write";
|
|
|
|
println!("Pushing metrics to {}...", url);
|
|
|
|
// Get current timestamp in milliseconds
|
|
let now = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)?
|
|
.as_millis() as i64;
|
|
|
|
// Create WriteRequest with sample metrics
|
|
let write_request = WriteRequest {
|
|
timeseries: vec![
|
|
// Example 1: HTTP request counter
|
|
TimeSeries {
|
|
labels: vec![
|
|
Label {
|
|
name: "__name__".to_string(),
|
|
value: "http_requests_total".to_string(),
|
|
},
|
|
Label {
|
|
name: "job".to_string(),
|
|
value: "example_app".to_string(),
|
|
},
|
|
Label {
|
|
name: "method".to_string(),
|
|
value: "GET".to_string(),
|
|
},
|
|
Label {
|
|
name: "status".to_string(),
|
|
value: "200".to_string(),
|
|
},
|
|
],
|
|
samples: vec![
|
|
Sample {
|
|
value: 1234.0,
|
|
timestamp: now,
|
|
},
|
|
],
|
|
},
|
|
// Example 2: Request duration histogram
|
|
TimeSeries {
|
|
labels: vec![
|
|
Label {
|
|
name: "__name__".to_string(),
|
|
value: "http_request_duration_seconds".to_string(),
|
|
},
|
|
Label {
|
|
name: "job".to_string(),
|
|
value: "example_app".to_string(),
|
|
},
|
|
Label {
|
|
name: "method".to_string(),
|
|
value: "GET".to_string(),
|
|
},
|
|
],
|
|
samples: vec![
|
|
Sample {
|
|
value: 0.042,
|
|
timestamp: now,
|
|
},
|
|
Sample {
|
|
value: 0.055,
|
|
timestamp: now + 1000, // 1 second later
|
|
},
|
|
],
|
|
},
|
|
],
|
|
};
|
|
|
|
// Count total samples
|
|
let total_samples: usize = write_request
|
|
.timeseries
|
|
.iter()
|
|
.map(|ts| ts.samples.len())
|
|
.sum();
|
|
|
|
// Encode to protobuf
|
|
let mut buf = Vec::new();
|
|
write_request.encode(&mut buf)?;
|
|
println!("Encoded {} bytes of protobuf data", buf.len());
|
|
|
|
// Compress with snappy
|
|
let mut encoder = SnappyEncoder::new();
|
|
let compressed = encoder.compress_vec(&buf)?;
|
|
println!("Compressed to {} bytes with Snappy", compressed.len());
|
|
|
|
// Send to Nightlight
|
|
let response = client
|
|
.post(url)
|
|
.header("Content-Type", "application/x-protobuf")
|
|
.header("Content-Encoding", "snappy")
|
|
.body(compressed)
|
|
.send()
|
|
.await?;
|
|
|
|
println!("Response status: {}", response.status());
|
|
|
|
if response.status().is_success() {
|
|
println!(
|
|
"Successfully pushed {} samples across {} time series",
|
|
total_samples,
|
|
write_request.timeseries.len()
|
|
);
|
|
} else {
|
|
let error_text = response.text().await?;
|
|
eprintln!("Error response: {}", error_text);
|
|
std::process::exit(1);
|
|
}
|
|
|
|
Ok(())
|
|
}
|