Skip to main content

object_storage_proxy/utils/
metrics.rs

//! Prometheus metrics registry and collection helpers.
//!
//! Enabled only when the `metrics` Cargo feature is active.
//! Exposes a global [`Registry`] with the following instruments:
//!
//! | Name | Type | Labels | Description |
//! |---|---|---|---|
//! | `osp_requests_total` | Counter | method, bucket, status | Total requests |
//! | `osp_request_errors_total` | Counter | method, bucket, error | Total errors |
//! | `osp_active_connections` | Gauge | — | Current open connections |
//! | `osp_request_duration_seconds` | Histogram | method, bucket | Latency |
//! | `osp_response_size_bytes` | Histogram | method, bucket | Response body size |
//! | `osp_transfer_bytes_total` | Counter | direction (rx/tx), bucket | Bytes transferred |
//! | `osp_presigned_url_hits_total` | Counter | bucket | Presigned URL hits |
//! | `osp_presigned_url_rejected_total` | Counter | bucket | Presigned URL requests rejected (usage limit) |
//! | `osp_memory_bytes` | Gauge | — | RSS memory usage (sampled on Linux only) |
//! | `osp_build_info` | Gauge | version, rustc | Static build metadata |

use std::time::Duration;

use once_cell::sync::Lazy;
use prometheus::{
    Encoder, Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry,
    TextEncoder,
};

23/// The global Prometheus registry for all OSP metrics.
24pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
25
// ── Counters ──────────────────────────────────────────────────────────────────

28/// Total number of proxied requests, labelled by HTTP method, bucket and
29/// upstream response status code.
30pub static REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
31    let opts = Opts::new("osp_requests_total", "Total proxied requests").namespace("osp");
32    let counter = IntCounterVec::new(opts, &["method", "bucket", "status"])
33        .expect("osp_requests_total metric created");
34    REGISTRY
35        .register(Box::new(counter.clone()))
36        .expect("register");
37    counter
38});
39
40/// Total number of requests that resulted in an error (4xx/5xx or internal).
41pub static REQUEST_ERRORS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
42    let opts = Opts::new("osp_request_errors_total", "Total request errors").namespace("osp");
43    let counter = IntCounterVec::new(opts, &["method", "bucket", "error"])
44        .expect("osp_request_errors_total metric created");
45    REGISTRY
46        .register(Box::new(counter.clone()))
47        .expect("register");
48    counter
49});
50
51/// Bytes transferred, labelled by direction (`rx` = client->proxy,
52/// `tx` = proxy->client) and bucket.
53pub static TRANSFER_BYTES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
54    let opts = Opts::new("osp_transfer_bytes_total", "Total bytes transferred").namespace("osp");
55    let counter = IntCounterVec::new(opts, &["direction", "bucket"])
56        .expect("osp_transfer_bytes_total metric created");
57    REGISTRY
58        .register(Box::new(counter.clone()))
59        .expect("register");
60    counter
61});
62
63/// Total presigned-URL hits per bucket.
64pub static PRESIGNED_URL_HITS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
65    let opts = Opts::new(
66        "osp_presigned_url_hits_total",
67        "Total presigned URL requests",
68    )
69    .namespace("osp");
70    let counter =
71        IntCounterVec::new(opts, &["bucket"]).expect("osp_presigned_url_hits_total metric created");
72    REGISTRY
73        .register(Box::new(counter.clone()))
74        .expect("register");
75    counter
76});
77
78/// Total presigned-URL rejections (usage limit exceeded).
79pub static PRESIGNED_URL_REJECTED_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
80    let opts = Opts::new(
81        "osp_presigned_url_rejected_total",
82        "Presigned URL requests rejected due to usage limit",
83    )
84    .namespace("osp");
85    let counter = IntCounterVec::new(opts, &["bucket"])
86        .expect("osp_presigned_url_rejected_total metric created");
87    REGISTRY
88        .register(Box::new(counter.clone()))
89        .expect("register");
90    counter
91});
92
// ── Gauges ────────────────────────────────────────────────────────────────────

95/// Number of currently active (in-flight) connections.
96pub static ACTIVE_CONNECTIONS: Lazy<IntGauge> = Lazy::new(|| {
97    let opts = Opts::new("osp_active_connections", "Current active connections").namespace("osp");
98    let gauge = IntGauge::with_opts(opts).expect("osp_active_connections metric created");
99    REGISTRY
100        .register(Box::new(gauge.clone()))
101        .expect("register");
102    gauge
103});
104
105/// Resident Set Size in bytes (sampled at scrape time via [`update_memory_gauge`]).
106pub static MEMORY_BYTES: Lazy<Gauge> = Lazy::new(|| {
107    let opts = Opts::new("osp_memory_bytes", "Resident set size in bytes").namespace("osp");
108    let gauge = Gauge::with_opts(opts).expect("osp_memory_bytes metric created");
109    REGISTRY
110        .register(Box::new(gauge.clone()))
111        .expect("register");
112    gauge
113});
114
115/// Static build-info gauge — always 1, used to expose version labels.
116pub static BUILD_INFO: Lazy<GaugeVec> = Lazy::new(|| {
117    let opts = Opts::new("osp_build_info", "Static build metadata (always 1)").namespace("osp");
118    let gauge = GaugeVec::new(opts, &["version", "rustc"]).expect("osp_build_info metric created");
119    REGISTRY
120        .register(Box::new(gauge.clone()))
121        .expect("register");
122    gauge
123});
124
// ── Histograms ────────────────────────────────────────────────────────────────

127/// End-to-end request latency in seconds, labelled by method and bucket.
128pub static REQUEST_DURATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
129    let buckets = vec![
130        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
131    ];
132    let opts = HistogramOpts::new(
133        "osp_request_duration_seconds",
134        "End-to-end request latency in seconds",
135    )
136    .namespace("osp")
137    .buckets(buckets);
138    let hist = HistogramVec::new(opts, &["method", "bucket"])
139        .expect("osp_request_duration_seconds metric created");
140    REGISTRY.register(Box::new(hist.clone())).expect("register");
141    hist
142});
143
144/// Response body size histogram (bytes), labelled by method and bucket.
145pub static RESPONSE_SIZE_BYTES: Lazy<HistogramVec> = Lazy::new(|| {
146    let buckets = prometheus::exponential_buckets(1024.0, 4.0, 10).expect("valid bucket spec");
147    let opts = HistogramOpts::new("osp_response_size_bytes", "Response body size in bytes")
148        .namespace("osp")
149        .buckets(buckets);
150    let hist = HistogramVec::new(opts, &["method", "bucket"])
151        .expect("osp_response_size_bytes metric created");
152    REGISTRY.register(Box::new(hist.clone())).expect("register");
153    hist
154});
155
// ── Init ──────────────────────────────────────────────────────────────────────

158/// Force all `Lazy` statics to initialise and record the build-info gauge.
159///
160/// Call this once from [`crate::run_server`] before the server starts accepting
161/// connections.
162pub fn init_metrics() {
163    // Touch each lazy to ensure it is registered.
164    Lazy::force(&REQUESTS_TOTAL);
165    Lazy::force(&REQUEST_ERRORS_TOTAL);
166    Lazy::force(&TRANSFER_BYTES_TOTAL);
167    Lazy::force(&PRESIGNED_URL_HITS_TOTAL);
168    Lazy::force(&PRESIGNED_URL_REJECTED_TOTAL);
169    Lazy::force(&ACTIVE_CONNECTIONS);
170    Lazy::force(&MEMORY_BYTES);
171    Lazy::force(&BUILD_INFO);
172    Lazy::force(&REQUEST_DURATION_SECONDS);
173    Lazy::force(&RESPONSE_SIZE_BYTES);
174
175    BUILD_INFO
176        .with_label_values(&[env!("CARGO_PKG_VERSION"), "stable"])
177        .set(1.0);
178}
179
// ── Memory sampling ───────────────────────────────────────────────────────────

182/// Read `/proc/self/status` on Linux to obtain RSS; no-op on other platforms.
183pub fn update_memory_gauge() {
184    #[cfg(target_os = "linux")]
185    {
186        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
187            for line in status.lines() {
188                if line.starts_with("VmRSS:") {
189                    if let Some(kb) = line
190                        .split_whitespace()
191                        .nth(1)
192                        .and_then(|v| v.parse::<f64>().ok())
193                    {
194                        MEMORY_BYTES.set(kb * 1024.0);
195                    }
196                    break;
197                }
198            }
199        }
200    }
201}
202
// ── Scrape endpoint ───────────────────────────────────────────────────────────

205/// Encode the global registry to the Prometheus text format.
206pub fn gather_metrics() -> String {
207    update_memory_gauge();
208    let encoder = TextEncoder::new();
209    let mut buf = Vec::new();
210    encoder
211        .encode(&REGISTRY.gather(), &mut buf)
212        .expect("metric encode");
213    String::from_utf8(buf).unwrap_or_default()
214}
215
216/// Spawn a minimal Tokio HTTP server on `port` that serves `/metrics`.
217///
218/// The server runs in a background task and never blocks the Pingora thread.
219pub async fn serve_metrics(port: u16) {
220    use tokio::io::{AsyncReadExt, AsyncWriteExt};
221    use tokio::net::TcpListener;
222
223    let listener = TcpListener::bind(format!("0.0.0.0:{}", port))
224        .await
225        .expect("failed to bind metrics port");
226
227    tracing::info!(port, "Prometheus metrics endpoint listening");
228
229    loop {
230        let Ok((mut stream, _addr)) = listener.accept().await else {
231            continue;
232        };
233
234        tokio::spawn(async move {
235            let mut buf = [0u8; 4096];
236            // Read just enough to identify the request path.
237            let _ = stream.read(&mut buf).await;
238            let req = String::from_utf8_lossy(&buf);
239
240            let (status, body) = if req.contains("GET /metrics") {
241                ("200 OK", gather_metrics())
242            } else {
243                ("404 Not Found", String::from("not found\n"))
244            };
245
246            let response = format!(
247                "HTTP/1.1 {}\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
248                status,
249                body.len(),
250                body
251            );
252            let _ = stream.write_all(response.as_bytes()).await;
253        });
254    }
255}