#!/usr/bin/env python3 import argparse import struct import sys import time import urllib.request import snappy def encode_varint(value: int) -> bytes: if value < 0: raise ValueError("varint encoder only supports non-negative values") out = bytearray() while True: chunk = value & 0x7F value >>= 7 if value: out.append(chunk | 0x80) else: out.append(chunk) return bytes(out) def field_key(field_number: int, wire_type: int) -> bytes: return encode_varint((field_number << 3) | wire_type) def encode_length_delimited(field_number: int, payload: bytes) -> bytes: return field_key(field_number, 2) + encode_varint(len(payload)) + payload def encode_string(field_number: int, value: str) -> bytes: return encode_length_delimited(field_number, value.encode("utf-8")) def encode_double(field_number: int, value: float) -> bytes: return field_key(field_number, 1) + struct.pack(" bytes: return field_key(field_number, 0) + encode_varint(value) def encode_label(name: str, value: str) -> bytes: return encode_string(1, name) + encode_string(2, value) def encode_sample(value: float, timestamp_ms: int) -> bytes: return encode_double(1, value) + encode_int64(2, timestamp_ms) def encode_timeseries(labels, samples) -> bytes: payload = bytearray() for name, value in labels: payload.extend(encode_length_delimited(1, encode_label(name, value))) for sample_value, timestamp_ms in samples: payload.extend(encode_length_delimited(2, encode_sample(sample_value, timestamp_ms))) return bytes(payload) def encode_write_request(timeseries_list) -> bytes: payload = bytearray() for labels, samples in timeseries_list: payload.extend(encode_length_delimited(1, encode_timeseries(labels, samples))) return bytes(payload) def parse_label(text: str): if "=" not in text: raise ValueError(f"invalid label {text!r}: expected name=value") name, value = text.split("=", 1) if not name: raise ValueError(f"invalid label {text!r}: empty label name") return name, value def main() -> int: parser = argparse.ArgumentParser(description="Send a NightLight remote_write sample") parser.add_argument("--url", required=True) parser.add_argument("--metric", required=True) parser.add_argument("--value", required=True, type=float) parser.add_argument("--timestamp-ms", type=int, default=int(time.time() * 1000)) parser.add_argument("--label", action="append", default=[]) args = parser.parse_args() labels = [("__name__", args.metric)] labels.extend(parse_label(item) for item in args.label) protobuf_payload = encode_write_request( [(labels, [(args.value, args.timestamp_ms)])] ) compressed = snappy.compress(protobuf_payload) request = urllib.request.Request( args.url, data=compressed, method="POST", headers={ "Content-Type": "application/x-protobuf", "Content-Encoding": "snappy", "X-Prometheus-Remote-Write-Version": "0.1.0", }, ) with urllib.request.urlopen(request, timeout=15) as response: if response.status not in (200, 204): raise RuntimeError(f"unexpected HTTP status {response.status}") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(f"nightlight remote_write failed: {exc}", file=sys.stderr) raise