Skip to main content

object_storage_proxy/
lib.rs

1//! # object-storage-proxy
2//!
3//! A fast, in-process reverse proxy for **AWS S3**, **IBM Cloud Object Storage (COS)** and other S3-compatible object storage services,
4//! with a Python interface for custom authentication and credential management.
5//!
6//! The proxy is built on top of [Pingora](https://github.com/cloudflare/pingora) and exposed
7//! to Python via [PyO3](https://pyo3.rs). It handles:
8//!
9//! * **AWS Signature Version 4** re-signing — incoming requests are validated and
10//!   then re-signed with backend credentials before being forwarded.
11//! * **Presigned URL enforcement** — optional per-URL usage limits prevent replay abuse.
12//! * **IBM IAM bearer-token exchange** — API keys are automatically exchanged for
13//!   short-lived IAM tokens and cached.
14//! * **Pluggable Python callbacks** — supply an async validator and/or a credential
15//!   fetcher callable from Python to integrate with any auth backend.
16//!
17//! ## Quick start (Python)
18//!
19//! ```python
20//! from object_storage_proxy import ProxyServerConfig, start_server
21//!
22//! config = ProxyServerConfig(
23//!     cos_map={
24//!         "my-bucket": {
25//!             "host": "s3.eu-west-3.amazonaws.com",
26//!             "port": 443,
27//!             "access_key": "AKIAIOSFODNN7EXAMPLE",
28//!             "secret_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
29//!             "region": "eu-west-3",
30//!         }
31//!     },
32//!     http_port=6190,
33//! )
34//! start_server(config)
35//! ```
36
37#![warn(clippy::all)]
38use async_trait::async_trait;
39use bytes::BytesMut;
40use credentials::signer::{
41    self, resign_streaming_request, signature_is_valid_for_presigned,
42    signature_is_valid_for_request,
43};
44use dashmap::DashMap;
45use dotenv::dotenv;
46use http::uri::Authority;
47use http::{StatusCode, Uri};
48use parsers::cos_map::{CosMapItem, parse_cos_map};
49use parsers::keystore::parse_hmac_list;
50use pingora::Result;
51use pingora::http::ResponseHeader;
52use pingora::proxy::{ProxyHttp, Session};
53use pingora::server::Server;
54use pingora::upstreams::peer::HttpPeer;
55use pyo3::prelude::*;
56use pyo3::types::{PyModule, PyModuleMethods};
57use pyo3::{Bound, PyResult, Python, pyclass, pyfunction, pymodule, wrap_pyfunction};
58use std::sync::{
59    Arc,
60    atomic::{AtomicBool, AtomicUsize, Ordering},
61};
62
63// use utils::functions::inspect_callable_signature;
64
65use std::collections::HashMap;
66use std::fmt::Debug;
67
68use std::time::Duration;
69use tokio::sync::RwLock;
70use tracing::{debug, error, info, warn};
71use tracing_subscriber::EnvFilter;
72use tracing_subscriber::fmt::time::ChronoLocal;
73
74pub mod parsers;
75use parsers::credentials::{parse_presigned_params, parse_token_from_header};
76use parsers::path::{parse_path, parse_query};
77
78pub mod credentials;
79use credentials::{
80    secrets_proxy::{SecretsCache, get_bearer, get_credential_for_bucket},
81    signer::sign_request,
82};
83
84pub mod utils;
85use utils::banner::print_banner;
86use utils::response::write_error_response_with_header;
87use utils::validator::{AuthCache, validate_request};
88
89static REQ_COUNTER: AtomicUsize = AtomicUsize::new(0);
90static REQ_COUNTER_ENABLED: AtomicBool = AtomicBool::new(false);
91const DEFAULT_SERVER_NAME: &str = "<osp⚡>";
92
93/// Thread-safe hit counter for presigned URLs.
94///
95/// Tracks how many times each presigned URL has been used so that a configurable
96/// maximum can be enforced.  Regular (re-signed) requests are **not** tracked —
97/// the aws-cli issues parallel range-GET sub-requests for the same object, which
98/// would exhaust a small limit instantly.
99///
100/// Internally backed by a [`DashMap`] so that concurrent access from multiple
101/// Pingora worker threads never requires a global lock.
102#[derive(Clone)]
103pub struct UrlTracker {
104    /// Per-URL hit counters.
105    pub counts: Arc<DashMap<String, usize>>,
106}
107
108impl Default for UrlTracker {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114impl UrlTracker {
115    /// Create a new, empty tracker.
116    pub fn new() -> Self {
117        UrlTracker {
118            counts: Arc::new(DashMap::new()),
119        }
120    }
121
122    /// Increment the hit counter for `url` by one.
123    pub fn track(&self, url: &str) {
124        let mut entry = self.counts.entry(url.to_string()).or_insert(0);
125        *entry += 1;
126        debug!(url, count = *entry, "tracking presigned URL");
127    }
128
129    /// Return the current hit count for `url`, or `None` if it has never been tracked.
130    pub fn get(&self, url: &str) -> Option<usize> {
131        self.counts.get(url).map(|v| *v)
132    }
133
134    /// Return a snapshot of all tracked URLs and their counts.
135    pub fn get_all(&self) -> Vec<(String, usize)> {
136        self.counts
137            .iter()
138            .map(|e| (e.key().clone(), *e.value()))
139            .collect()
140    }
141}
142
143/// Configuration object for :pyfunc:`object_storage_proxy.start_server`.
144///
145/// Parameters
146/// ----------
147/// cos_map:
148///    A dictionary mapping bucket names to their respective COS configuration.
149///   Each entry should contain the following
150///   keys:
151///   - host: The COS endpoint (e.g., "s3.eu-de.cloud-object-storage.appdomain.cloud")
152///   - port: The port number (e.g., 443)
153///   - api_key/apikey: The API key for the bucket (optional)
154///   - ttl/time-to-live: The time-to-live for the API key in seconds (optional)
155///
156/// bucket_creds_fetcher:
157///     Optional Python async callable that fetches the API key for a bucket.
158///     The callable should accept a single argument, the bucket name.
159///     It should return a string containing the API key.
160/// http_port:
161///     The HTTP port to listen on.
162/// https_port:
163///     The HTTPS port to listen on.
164/// validator:
165///     Optional Python async callable that validates the request.
166///     The callable should accept two arguments, the token and the bucket name.
167///     It should return a boolean indicating whether the request is valid.
168/// threads:
169///     Optional number of threads to use for the server.
170///     If not specified, the server will use a single thread.
171///
172#[pyclass]
173#[pyo3(name = "ProxyServerConfig")]
174#[derive(Debug)]
175pub struct ProxyServerConfig {
176    #[pyo3(get, set)]
177    pub bucket_creds_fetcher: Option<Py<PyAny>>,
178
179    #[pyo3(get, set)]
180    pub cos_map: PyObject,
181
182    #[pyo3(get, set)]
183    pub http_port: Option<u16>,
184
185    #[pyo3(get, set)]
186    pub https_port: Option<u16>,
187
188    #[pyo3(get, set)]
189    pub validator: Option<Py<PyAny>>,
190
191    #[pyo3(get, set)]
192    pub threads: Option<usize>,
193
194    #[pyo3(get, set)]
195    pub verify: Option<bool>,
196
197    #[pyo3(get, set)]
198    pub hmac_keystore: PyObject,
199
200    #[pyo3(get, set)]
201    pub skip_signature_validation: Option<bool>,
202
203    #[pyo3(get, set)]
204    pub hmac_fetcher: Option<Py<PyAny>>,
205
206    #[pyo3(get, set)]
207    pub max_presign_url_usage_attempts: Option<usize>,
208
209    #[pyo3(get, set)]
210    pub server_name: String,
211}
212
213impl Default for ProxyServerConfig {
214    fn default() -> Self {
215        ProxyServerConfig {
216            cos_map: Python::with_gil(|py| py.None()),
217            bucket_creds_fetcher: None,
218            http_port: None,
219            https_port: None,
220            validator: None,
221            threads: Some(1),
222            verify: None,
223            hmac_keystore: Python::with_gil(|py| py.None()),
224            skip_signature_validation: Some(false),
225            hmac_fetcher: None,
226            max_presign_url_usage_attempts: Some(3),
227            server_name: "<osp⚡>".to_string(),
228        }
229    }
230}
231
232#[pymethods]
233impl ProxyServerConfig {
234    #[new]
235    #[pyo3(
236        signature = (
237            cos_map,
238            hmac_keystore = None,
239            bucket_creds_fetcher = None,
240            http_port = None,
241            https_port = None,
242            validator = None,
243            threads = Some(1),
244            verify = None,
245            skip_signature_validation = Some(false),
246            hmac_fetcher = None,
247            max_presign_url_usage_attempts = Some(3),
248            server_name = "<osp⚡>".to_string(),
249        )
250    )]
251    #[allow(clippy::too_many_arguments)]
252    pub fn new(
253        cos_map: PyObject,
254        hmac_keystore: Option<PyObject>,
255        bucket_creds_fetcher: Option<PyObject>,
256        http_port: Option<u16>,
257        https_port: Option<u16>,
258        validator: Option<PyObject>,
259        threads: Option<usize>,
260        verify: Option<bool>,
261        skip_signature_validation: Option<bool>,
262        hmac_fetcher: Option<PyObject>,
263        max_presign_url_usage_attempts: Option<usize>,
264        server_name: String,
265    ) -> Self {
266        ProxyServerConfig {
267            cos_map,
268            hmac_keystore: hmac_keystore.unwrap_or_else(|| Python::with_gil(|py| py.None())),
269            bucket_creds_fetcher,
270            http_port,
271            https_port,
272            validator,
273            threads,
274            verify,
275            skip_signature_validation,
276            hmac_fetcher,
277            max_presign_url_usage_attempts,
278            server_name,
279        }
280    }
281
282    fn __repr__(&self) -> PyResult<String> {
283        Ok(format!(
284            "ProxyServerConfig(http_port={}, https_port={}, threads={:?})",
285            self.http_port.unwrap_or(0),
286            self.https_port.unwrap_or(0),
287            self.threads
288        ))
289    }
290}
291
292/// The core Pingora proxy handler.
293///
294/// One instance is created per server and shared (via [`Arc`]) across all worker
295/// threads.  It implements [`ProxyHttp`] and drives the full request lifecycle:
296/// signature validation → authorization → credential injection → upstream routing.
297pub struct MyProxy {
298    cos_endpoint: String,
299    cos_mapping: Arc<RwLock<HashMap<String, CosMapItem>>>,
300    hmac_keystore: Arc<RwLock<HashMap<String, String>>>,
301    secrets_cache: SecretsCache,
302    auth_cache: AuthCache,
303    validator: Option<PyObject>,
304    bucket_creds_fetcher: Option<PyObject>,
305    verify: Option<bool>,
306    skip_signature_validation: Option<bool>,
307    hmac_fetcher: Option<PyObject>,
308    tracker: UrlTracker,
309    max_presign_url_usage_attempts: Option<usize>,
310    #[allow(dead_code)]
311    server_name: String,
312}
313
314/// Per-request context threaded through the Pingora middleware chain.
315///
316/// A fresh `MyCtx` is created by [`MyProxy::new_ctx`] for every incoming
317/// connection and is discarded when the request completes.
318pub struct MyCtx {
319    cos_mapping: Arc<RwLock<HashMap<String, CosMapItem>>>,
320    hmac_keystore: Arc<RwLock<HashMap<String, String>>>,
321    secrets_cache: SecretsCache,
322    auth_cache: AuthCache,
323    validator: Option<PyObject>,
324    bucket_creds_fetcher: Option<PyObject>,
325    hmac_fetcher: Option<PyObject>,
326    is_presigned: Option<bool>,
327    stream_state: Option<signer::StreamingState>,
328}
329
330// impl MyCtx {
331//     fn streaming(&mut self) -> &mut signer::StreamingState {
332//         self.stream_state.as_mut().expect("stream_state not initialised")
333//     }
334// }
335
336#[async_trait]
337impl ProxyHttp for MyProxy {
338    type CTX = MyCtx;
339    fn new_ctx(&self) -> Self::CTX {
340        MyCtx {
341            cos_mapping: Arc::clone(&self.cos_mapping),
342            hmac_keystore: Arc::clone(&self.hmac_keystore),
343            secrets_cache: self.secrets_cache.clone(),
344            auth_cache: self.auth_cache.clone(),
345            validator: self
346                .validator
347                .as_ref()
348                .map(|v| Python::with_gil(|py| v.clone_ref(py))),
349            bucket_creds_fetcher: self
350                .bucket_creds_fetcher
351                .as_ref()
352                .map(|v| Python::with_gil(|py| v.clone_ref(py))),
353            hmac_fetcher: self
354                .hmac_fetcher
355                .as_ref()
356                .map(|v| Python::with_gil(|py| v.clone_ref(py))),
357            is_presigned: None,
358            stream_state: None,
359        }
360    }
361
362    async fn upstream_peer(
363        &self,
364        session: &mut Session,
365        ctx: &mut Self::CTX,
366    ) -> Result<Box<HttpPeer>> {
367        debug!("upstream_peer::start");
368        if REQ_COUNTER_ENABLED.load(Ordering::Relaxed) {
369            let new_val = REQ_COUNTER.fetch_add(1, Ordering::Relaxed) + 1;
370            debug!("Request count: {}", new_val);
371        }
372
373        let path = session.req_header().uri.path();
374
375        let parse_path_result = parse_path(path);
376        if parse_path_result.is_err() {
377            error!("Failed to parse path: {:?}", parse_path_result);
378            return Err(pingora::Error::new_str("Failed to parse path"));
379        }
380
381        let (_, (bucket, _)) = parse_path_result.expect("checked above");
382
383        let hdr_bucket = bucket.to_owned();
384
385        let bucket_config = {
386            let map = ctx.cos_mapping.read().await;
387            map.get(&hdr_bucket).cloned()
388        };
389
390        let addressing_style = bucket_config
391            .clone()
392            .and_then(|config| config.addressing_style)
393            .unwrap_or("virtual".to_string());
394
395        let endpoint = match bucket_config.clone() {
396            Some(config) => {
397                if addressing_style == "path" {
398                    config.host.to_owned()
399                } else {
400                    format!("{}.{}", bucket, config.host)
401                }
402            }
403            None => {
404                format!("{}.{}", bucket, self.cos_endpoint)
405            }
406        };
407
408        let port = bucket_config
409            .clone()
410            .map(|config| config.port)
411            .unwrap_or(443);
412
413        let addr = (endpoint.clone(), port);
414
415        let endpoint_is_tls = bucket_config.and_then(|config| config.tls).unwrap_or(true);
416
417        debug!(endpoint_is_tls, endpoint, "resolved upstream peer");
418
419        let mut peer = Box::new(HttpPeer::new(addr, endpoint_is_tls, endpoint.clone()));
420        debug!(?peer, "upstream peer created");
421
422        // todo: make ths configurable
423
424        peer.options.max_h2_streams = 128;
425        peer.options.h2_ping_interval = Some(Duration::from_secs(30));
426
427        // peer.options.idle_timeout          = Some(Duration::from_secs(300));
428        // peer.options.connection_timeout    = Some(Duration::from_secs(30));
429        // peer.options.read_timeout          = Some(Duration::from_secs(300));
430        // peer.options.write_timeout         = Some(Duration::from_secs(300));
431
432        debug!("peer: {:#?}", &peer);
433
434        if let Some(verify) = self.verify {
435            info!("Verify peer (upstream) certificates disabled!");
436            peer.options.verify_cert = verify;
437            peer.options.verify_hostname = verify;
438        } else {
439            peer.options.verify_cert = true;
440        }
441
442        debug!("peer: {:#?}", &peer);
443
444        debug!("upstream_peer::end");
445        Ok(peer)
446    }
447
448    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
449        debug!("request_filter::start");
450
451        // Tracking the request count for presigned URLs only.
452        // Regular (re-signed) requests must not be counted — aws-cli issues multiple
453        // parallel range-GET requests for the same object (multipart download), so
454        // counting every request would exhaust the limit almost immediately.
455        let url = session.req_header().uri.to_string();
456        let path = session.req_header().uri.path().to_string();
457        let is_presigned_url = session
458            .req_header()
459            .uri
460            .query()
461            .is_some_and(|q| q.contains("X-Amz-Signature"));
462        if is_presigned_url {
463            self.tracker.track(&url);
464        }
465        let tracked_count = self.tracker.get(&url).unwrap_or(0);
466        if is_presigned_url && tracked_count > self.max_presign_url_usage_attempts.unwrap_or(3) {
467            warn!(
468                url,
469                tracked_count,
470                max = self.max_presign_url_usage_attempts.unwrap_or(3),
471                "presigned URL usage limit exceeded, denying"
472            );
473            let msg = format!(
474                "URL ({}) has been tracked too many times: {} (max={}).  Access Denied!",
475                path,
476                tracked_count,
477                self.max_presign_url_usage_attempts.unwrap_or(3)
478            );
479
480            // let mut hdr = ResponseHeader::build(StatusCode::FORBIDDEN, Some(msg.len()))?;
481            // hdr.insert_header("content-type", "text/plain")?;
482            // hdr.insert_header("server", self.server_name.clone())?;
483            // hdr.insert_header("x-content-type-options", "nosniff")?;
484
485            // // Send it
486            // session.write_response_header(Box::new(hdr), false).await?;
487            // // session
488            // //     .write_response_body(Some(msg.into()), true)
489            // //     .await?;
490
491            // session.respond_error_with_body(403, msg.into()).await?;
492            write_error_response_with_header(session, StatusCode::FORBIDDEN, msg).await?;
493            return Ok(true);
494        }
495
496        debug!(summary = ?session.request_summary(), "request summary");
497        debug!(uri = ?session.req_header().uri, "incoming request URI");
498
499        let request_query = session.req_header().uri.query().unwrap_or("");
500        info!("request path: {}", session.req_header().uri.path());
501        info!("request query: {}", request_query);
502        info!("request method : {}", session.req_header().method);
503
504        let parsed_query_result = parse_query(request_query);
505
506        if parsed_query_result.is_err() {
507            error!("Failed to parse query: {:?}", parsed_query_result);
508            return Err(pingora::Error::new_str("Failed to parse query"));
509        }
510        let (rest, mut query_dict) = parsed_query_result.expect("checked above");
511        if rest.is_empty() {
512            info!("Parsed query: {:#?}", query_dict);
513        } else {
514            error!("Failed to parse query: {}", rest);
515        }
516
517        query_dict.insert(
518            "method".to_string(),
519            session.req_header().method.to_string(),
520        );
521        query_dict.insert(
522            "path".to_string(),
523            session.req_header().uri.path().to_string(),
524        );
525        // insert source
526        query_dict.insert(
527            "source".to_string(),
528            session
529                .req_header()
530                .headers
531                .get("x-forwarded-for")
532                .and_then(|h| h.to_str().ok())
533                .unwrap_or_default()
534                .to_string(),
535        );
536
537        info!("---> Parsed query: {:#?}", query_dict);
538
539        if session
540            .req_header()
541            .headers
542            .get("expect")
543            .map(|v| {
544                v.to_str()
545                    .unwrap_or("")
546                    .eq_ignore_ascii_case("100-continue")
547            })
548            .unwrap_or(false)
549        {
550            return Ok(false);
551        };
552
553        let path = session.req_header().uri.path().to_owned();
554
555        let parse_path_result = parse_path(&path);
556        if parse_path_result.is_err() {
557            error!("Failed to parse path: {:?}", parse_path_result);
558            return Err(pingora::Error::new_str("Failed to parse path"));
559        }
560
561        let (_, (bucket, _uri_path)) = parse_path_result.expect("checked above");
562
563        let hdr_bucket = bucket.to_owned();
564
565        let auth_header = session
566            .req_header()
567            .headers
568            .get("authorization")
569            .and_then(|h| h.to_str().ok())
570            .map(ToString::to_string)
571            .unwrap_or_default();
572
573        let ttl = {
574            let map = ctx.cos_mapping.read().await;
575            map.get(bucket).and_then(|c| c.ttl).unwrap_or(0)
576        };
577        let mut access_key: String = String::new();
578
579        if auth_header.is_empty() {
580            if let Some(q) = session.req_header().uri.query()
581                && q.contains("X-Amz-Credential")
582            {
583                let (_, p) = parse_presigned_params(&format!("?{q}"))
584                    .map_err(|_| pingora::Error::new_str("Failed to parse presigned params"))?;
585                access_key = p.access_key.clone();
586            }
587        } else {
588            access_key = parse_token_from_header(&auth_header)
589                .map_err(|_| pingora::Error::new_str("Failed to parse access_key"))?
590                .1
591                .to_string();
592        }
593
594        let is_authorized = if let Some(py_cb) = &ctx.validator {
595            let is_multipart = session
596                .req_header()
597                .uri
598                .query()
599                .is_some_and(|q| q.contains("uploadId="));
600
601            info!("CHECKING SIGNATURE");
602            if let Some(skip) = self.skip_signature_validation {
603                if skip || is_multipart {
604                    info!("Skipping local signature check");
605                    // continue
606                } else {
607                    // presigned
608                    info!("Checking presigned signature");
609                    let uri_q = session.req_header().uri.query().unwrap_or("");
610
611                    if auth_header.is_empty() && uri_q.contains("X-Amz-Signature") {
612                        ctx.is_presigned = Some(true);
613
614                        // ensure we have the secret_key in the keystore
615                        if !ctx.hmac_keystore.read().await.contains_key(&access_key) {
616                            debug!(
617                                "No key in keystore, trying to fetch via hmac_fetcher for ->{}<-",
618                                access_key
619                            );
620                            // fetch via hmac_fetcher exactly as you do below…
621                            if let Some(py_fetcher) = &ctx.hmac_fetcher {
622                                // call Python callback
623                                let cb = py_fetcher;
624                                let secret: PyResult<String> = Python::with_gil(|py| {
625                                    cb.call1(py, (&access_key,)).and_then(|r| r.extract(py))
626                                });
627                                debug!("Got secret: {:#?}", secret);
628                                match secret {
629                                    Ok(secret_key) => {
630                                        debug!("got key and inserting into keystore");
631                                        ctx.hmac_keystore
632                                            .write()
633                                            .await
634                                            .insert(access_key.clone().to_string(), secret_key);
635                                    }
636                                    Err(_) => {
637                                        // no key → unauthorized
638                                        write_error_response_with_header(
639                                            session,
640                                            StatusCode::UNAUTHORIZED,
641                                            "No key found for presigned URL".to_string(),
642                                        )
643                                        .await?;
644                                        // session.respond_error(401).await?;
645                                        return Ok(true);
646                                    }
647                                }
648                            } else {
649                                // session.respond_error(401).await?;
650                                write_error_response_with_header(
651                                    session,
652                                    StatusCode::UNAUTHORIZED,
653                                    "No key found for presigned URL".to_string(),
654                                )
655                                .await?;
656                                return Ok(true);
657                            }
658                        }
659                        debug!("now checking if the signature is valid for presigned...");
660                        let sk = ctx
661                            .hmac_keystore
662                            .read()
663                            .await
664                            .get(&access_key)
665                            .expect("key was just inserted")
666                            .clone();
667                        debug!("got secret {} from keystore", sk);
668                        debug!("RAW_PATH       = {}", &session.req_header().uri);
669                        debug!(
670                            "RAW_HOST_HDR   = {:?}",
671                            &session.req_header().headers.get("host")
672                        );
673                        let ok = match signature_is_valid_for_presigned(session, &sk).await {
674                            Ok(b) => b,
675                            Err(e) => {
676                                error!("presigned-URL validation error: {e}"); // <-- keep the info
677                                return Err(pingora::Error::new_str("Failed to check signature"));
678                            }
679                        };
680                        info!("is signature valid?: {}", ok);
681                        if !ok {
682                            let msg = format!(
683                                "Signature invalid for presigned URL: {}",
684                                &session.req_header().uri.path()
685                            );
686                            session.respond_error_with_body(401, msg.into()).await?;
687                            return Ok(true);
688                        }
689                    } else {
690                        info!("processing a regular request");
691
692                        let has_key = {
693                            let map = ctx.hmac_keystore.read().await;
694                            map.contains_key(&access_key)
695                        };
696                        if !has_key {
697                            if let Some(py_fetcher) = &ctx.hmac_fetcher {
698                                // call Python callback
699                                let cb = py_fetcher;
700                                let secret: PyResult<String> = Python::with_gil(|py| {
701                                    cb.call1(py, (&access_key,)).and_then(|r| r.extract(py))
702                                });
703                                match secret {
704                                    Ok(secret_key) => {
705                                        ctx.hmac_keystore
706                                            .write()
707                                            .await
708                                            .insert(access_key.clone().to_string(), secret_key);
709                                    }
710                                    Err(_) => {
711                                        // no key → unauthorized
712                                        // session.respond_error(401).await?;
713                                        write_error_response_with_header(
714                                            session,
715                                            StatusCode::UNAUTHORIZED,
716                                            "No key found for request".to_string(),
717                                        )
718                                        .await?;
719                                        return Ok(true);
720                                    }
721                                }
722                            } else {
723                                // session.respond_error(401).await?;
724                                write_error_response_with_header(
725                                    session,
726                                    StatusCode::UNAUTHORIZED,
727                                    "No key found for request".to_string(),
728                                )
729                                .await?;
730                                return Ok(true);
731                            }
732                        }
733                        let secret_key = {
734                            let map = ctx.hmac_keystore.read().await;
735                            map.get(&access_key).cloned()
736                        };
737
738                        info!("Checking signature");
739                        let sig_ok = match signature_is_valid_for_request(
740                            &auth_header,
741                            session,
742                            &secret_key.expect("key was just inserted"),
743                        )
744                        .await
745                        {
746                            Ok(true) => true,
747                            Ok(false) => {
748                                info!("Signature invalid");
749                                false
750                            }
751                            Err(err) => {
752                                error!("Signature check error: {}", err);
753                                false
754                            }
755                        };
756
757                        // if signature failed, skip further validation
758                        if !sig_ok {
759                            //  session.respond_error(401).await?;
760                            write_error_response_with_header(
761                                session,
762                                StatusCode::UNAUTHORIZED,
763                                "Signature invalid".to_string(),
764                            )
765                            .await?;
766                            return Ok(true);
767                        }
768                    }
769                }
770            }
771            info!("Signature check passed, continuing now onto the bespoke validation");
772            let cache_key = format!("{}:{}:{:?}", &access_key, bucket, &query_dict);
773            debug!("Cache key: {}", cache_key);
774
775            let bucket_clone = bucket.to_string();
776            let callback_clone: PyObject = Python::with_gil(|py| py_cb.clone_ref(py));
777
778            let move_access_key = access_key.clone();
779            let req = query_dict.clone();
780
781            ctx.auth_cache
782                .get_or_validate(&cache_key, Duration::from_secs(ttl), move || {
783                    let tk = move_access_key.clone();
784                    let bu = bucket_clone.clone();
785                    let cb = Python::with_gil(|py| callback_clone.clone_ref(py));
786                    {
787                        let req_value = req.clone();
788                        async move {
789                            validate_request(&tk, &bu, &req_value, cb)
790                                .await
791                                .map_err(|_| pingora::Error::new_str("Validator error"))
792                        }
793                    }
794                })
795                .await?
796        } else {
797            true
798        };
799
800        if !is_authorized {
801            info!("Access denied for bucket: {}.  End of request.", bucket);
802            // session.respond_error(401).await?;
803            write_error_response_with_header(
804                session,
805                StatusCode::UNAUTHORIZED,
806                format!("Access denied for bucket: {}", bucket),
807            )
808            .await?;
809            return Ok(true);
810        }
811
812        let bucket_config = {
813            let map = ctx.cos_mapping.read().await;
814            map.get(&hdr_bucket).cloned()
815        };
816
817        debug!("Access key: {}", &access_key);
818
819        // we have to check for some available credentials here to be able to return unauthorized already if not
820        match bucket_config.clone() {
821            Some(mut config) => {
822                let fetcher_opt = ctx.bucket_creds_fetcher.as_ref().map(|py_cb| {
823                    // clone the PyObject so the async block is 'static
824                    let cb = Python::with_gil(|py| py_cb.clone_ref(py));
825                    move |bucket: String| async move {
826                        get_credential_for_bucket(&cb, bucket, access_key)
827                            .await
828                            .map_err(|e| e.into()) // Convert PyErr → Box<dyn Error>
829                    }
830                });
831
832                config
833                    .ensure_credentials(&hdr_bucket, fetcher_opt)
834                    .await
835                    .map_err(|e| {
836                        error!("Credential check failed for {hdr_bucket}: {e}");
837                        pingora::Error::new_str("Credential check failed")
838                    })?;
839
840                ctx.cos_mapping
841                    .write()
842                    .await
843                    .insert(hdr_bucket.clone(), config);
844            }
845            None => {
846                error!("No configuration available for bucket: {hdr_bucket}");
847                return Err(pingora::Error::new_str(
848                    "No configuration available for bucket",
849                ));
850            }
851        }
852        debug!(
853            "request_filter::Credentials checked for bucket: {}. End of function.",
854            hdr_bucket
855        );
856        debug!("request_filter::end");
857        Ok(false)
858    }
859
860    async fn upstream_request_filter(
861        &self,
862        _session: &mut Session,
863        upstream_request: &mut pingora::http::RequestHeader,
864        ctx: &mut Self::CTX,
865    ) -> Result<()> {
866        if let Some(presigned) = ctx.is_presigned
867            && presigned
868        {
869            debug!("upstream_request_filter::presigned");
870            let cleaned_q = upstream_request
871                .uri
872                .query()
873                .unwrap_or("")
874                .split('&')
875                .filter(|kv| !kv.starts_with("X-Amz-"))
876                .collect::<Vec<_>>()
877                .join("&");
878
879            let _ = upstream_request.remove_header("authorization");
880
881            let new_path_and_query = if cleaned_q.is_empty() {
882                upstream_request.uri.path().to_owned()
883            } else {
884                format!("{}?{}", upstream_request.uri.path(), cleaned_q)
885            };
886
887            upstream_request.set_uri(
888                new_path_and_query
889                    .try_into()
890                    .map_err(|_| pingora::Error::new_str("invalid URI after query rewrite"))?,
891            );
892        }
893
894        let _ = upstream_request.remove_header("accept-encoding");
895
896        debug!("upstream_request_filter::start");
897
898        let (_, (bucket, my_updated_url)) = parse_path(upstream_request.uri.path())
899            .map_err(|_| pingora::Error::new_str("failed to parse upstream request path"))?;
900
901        debug!(my_updated_url, "parsed upstream path");
902
903        let hdr_bucket = bucket.to_string();
904
905        let my_query = match upstream_request.uri.query() {
906            Some(q) if !q.is_empty() => format!("?{}", q),
907            _ => String::new(),
908        };
909
910        let bucket_config = {
911            let map = ctx.cos_mapping.read().await;
912            map.get(&hdr_bucket).cloned()
913        };
914
915        let addressing_style = bucket_config
916            .clone()
917            .and_then(|config| config.addressing_style)
918            .unwrap_or("virtual".to_string());
919
920        let this_url = match addressing_style.as_str() {
921            "virtual" => my_updated_url,
922            _ => {
923                let u_url = format!("/{}{}", bucket, my_updated_url);
924                debug!(u_url, "using path addressing style");
925                &u_url.clone()
926            }
927        };
928
929        let endpoint = match bucket_config.clone() {
930            Some(cfg) => {
931                let this_host = match addressing_style.as_str() {
932                    "path" => cfg.host.to_owned(),
933                    _ => format!("{}.{}", bucket, cfg.host),
934                };
935                if cfg.port == 443 {
936                    this_host
937                } else {
938                    format!("{}:{}", this_host, cfg.port)
939                }
940            }
941            None => format!("{}.{}", bucket, self.cos_endpoint),
942        };
943
944        debug!("endpoint: {}.", &endpoint);
945
946        // Box:leak the temporary string to get a static reference which will outlive the function
947        let authority = Authority::from_static(Box::leak(endpoint.clone().into_boxed_str()));
948        // if addressing_style == "virtual" {
949
950        let new_uri = Uri::builder()
951            .scheme("https")
952            .authority(authority.clone())
953            .path_and_query(this_url.to_owned() + &my_query)
954            .build()
955            .expect("should build a valid URI");
956
957        upstream_request.set_uri(new_uri.clone());
958        // }
959        upstream_request.insert_header("host", authority.as_str())?;
960
961        let (maybe_hmac, maybe_api_key) = match &bucket_config {
962            Some(cfg) => (cfg.has_hmac(), cfg.api_key.clone()),
963            None => (false, None),
964        };
965
966        let allowed = [
967            "host",
968            "content-length",
969            "x-amz-date",
970            "x-amz-content-sha256",
971            "x-amz-security-token",
972            // "content-md5",
973            "transfer-encoding",
974            "content-encoding",
975            "x-amz-decoded-content-length",
976            "x-amz-trailer",
977            "x-amz-sdk-checksum-algorithm",
978            "range",
979            "expect",
980            // "content-encoding",
981            // "range",
982            // "trailer",
983            // "x-amz-trailer",
984        ];
985
986        let to_check: Vec<String> = upstream_request
987            .headers
988            .iter()
989            .map(|(name, _)| name.as_str().to_owned())
990            .collect();
991
992        for name in to_check {
993            let keep = allowed.contains(&name.as_str()) || name.starts_with("x-amz-checksum-");
994            if !keep {
995                let _ = upstream_request.remove_header(&name);
996            }
997        }
998
999        if maybe_hmac {
1000            debug!("HMAC: Signing request for bucket: {}", hdr_bucket);
1001
1002            let streaming = {
1003                upstream_request
1004                    .headers
1005                    .get("x-amz-content-sha256")
1006                    .map(|v| v.as_bytes().starts_with(b"STREAMING-"))
1007                    .unwrap_or(false)
1008            };
1009
1010            if streaming {
1011                let streaming_header = upstream_request
1012                    .headers
1013                    .get("x-amz-content-sha256")
1014                    .and_then(|v| v.to_str().ok())
1015                    .unwrap_or_default();
1016
1017                debug!(streaming_header, "streaming upload detected");
1018
1019                let cfg = bucket_config.as_ref().ok_or_else(|| {
1020                    pingora::Error::new_str("no bucket config for streaming upload")
1021                })?;
1022                let access_key = cfg.access_key.as_deref().unwrap_or_default().to_string();
1023                let secret_key = cfg.secret_key.as_deref().unwrap_or_default().to_string();
1024                let region = cfg.region.as_deref().unwrap_or_default().to_string();
1025
1026                // let decoded_len = upstream_request
1027                //     .headers
1028                //     .get("x-amz-decoded-content-length")
1029                //     .and_then(|v| v.to_str().ok())
1030                //     .unwrap_or("0")
1031                //     .to_owned();
1032
1033                // remove the original streaming headers we cannot forward.
1034                // upstream_request.remove_header("x-amz-decoded-content-length");
1035
1036                //  stream-chunk.
1037                debug!(headers = ?upstream_request.headers, "upstream request headers before streaming rewrite");
1038                upstream_request.remove_header("content-length");
1039                upstream_request.remove_header("content-md5");
1040                upstream_request.insert_header("transfer-encoding", "chunked")?;
1041                // upstream_request.insert_header("x-amz-decoded-content-length", decoded_len)?;
1042                upstream_request.set_send_end_stream(false);
1043
1044                // produce *seed* signature and signing key that will be reused
1045                //    for every DATA frame in the forthcoming request_body_filter.
1046                let ts = chrono::Utc::now();
1047                resign_streaming_request(upstream_request, &region, &access_key, &secret_key, ts)
1048                    .map_err(|e| {
1049                    error!("Failed to sign request: {e}");
1050                    pingora::Error::new_str("Failed to sign request")
1051                })?;
1052
1053                let seed_sig = upstream_request
1054                    .headers
1055                    .get("authorization")
1056                    .and_then(|v| v.to_str().ok())
1057                    .and_then(|v| v.split("Signature=").nth(1))
1058                    .expect("seed signature missing")
1059                    .to_owned();
1060
1061                // stash everything the body filter will need.
1062                ctx.stream_state = Some(signer::StreamingState::new(
1063                    region.to_string(),
1064                    access_key.to_string(),
1065                    secret_key.to_string(),
1066                    ts,
1067                    seed_sig,
1068                ));
1069            } else {
1070                sign_request(
1071                    upstream_request,
1072                    bucket_config
1073                        .as_ref()
1074                        .ok_or_else(|| pingora::Error::new_str("no bucket config for signing"))?,
1075                )
1076                .await
1077                .map_err(|e| {
1078                    error!("Failed to sign request for {}: {e}", hdr_bucket);
1079                    pingora::Error::new_str("Failed to sign request")
1080                })?;
1081            }
1082
1083            debug!("Request signed for bucket: {}", hdr_bucket);
1084            debug!("{:#?}", &upstream_request.headers);
1085        } else {
1086            debug!("Using API key for bucket: {}", hdr_bucket);
1087            let api_key = match maybe_api_key {
1088                Some(key) => key,
1089                None => {
1090                    // should be impossible because request_filter already
1091                    // called ensure_credentials, but double‑check anyway
1092                    error!("No API key for bucket {hdr_bucket}");
1093                    return Err(pingora::Error::new_str("No API key configured for bucket"));
1094                }
1095            };
1096
1097            // closure captured by SecretsCache
1098            let bearer_fetcher = {
1099                let api_key = api_key.clone();
1100                move || get_bearer(api_key.clone())
1101            };
1102
1103            let bearer_token = ctx
1104                .secrets_cache
1105                .get(&hdr_bucket, bearer_fetcher)
1106                .await
1107                .ok_or_else(|| pingora::Error::new_str("Failed to obtain bearer token"))?;
1108
1109            upstream_request.insert_header("Authorization", format!("Bearer {bearer_token}"))?;
1110        }
1111
1112        // debug!("Sending request to upstream: {}", &new_uri);
1113
1114        debug!("Request sent to upstream.");
1115        debug!("upstream_request_filter::end");
1116
1117        Ok(())
1118    }
1119
1120    async fn response_filter(
1121        &self,
1122        _session: &mut Session,
1123        resp: &mut ResponseHeader,
1124        _ctx: &mut Self::CTX,
1125    ) -> Result<()> {
1126        let _ = resp.remove_header("Server");
1127
1128        let _ = resp.insert_header("Server", DEFAULT_SERVER_NAME);
1129
1130        Ok(())
1131    }
1132
1133    async fn request_body_filter(
1134        &self,
1135        _session: &mut Session,
1136        body: &mut Option<bytes::Bytes>,
1137        end_of_stream: bool,
1138        ctx: &mut Self::CTX,
1139    ) -> Result<()> {
1140        // 0. Only active when we stashed a StreamingState in the request filter
1141        let Some(state) = ctx.stream_state.as_mut() else {
1142            return Ok(());
1143        };
1144
1145        // 1. Flush frames are empty and *not* EOS - just ignore them
1146        let Some(payload) = body.take() else {
1147            return Ok(());
1148        };
1149        if payload.is_empty() && !end_of_stream {
1150            return Ok(());
1151        };
1152
1153        // 2. Build the outgoing buffer
1154        let mut out = BytesMut::new();
1155        if !payload.is_empty() {
1156            out.extend_from_slice(&state.sign_chunk(&payload).map_err(|e| {
1157                error!("Failed to sign chunk: {e}");
1158                pingora::Error::new_str("Failed to sign chunk")
1159            })?);
1160        }
1161        if end_of_stream {
1162            out.extend_from_slice(&state.final_chunk().map_err(|e| {
1163                error!("Failed to sign trailer: {e}");
1164                pingora::Error::new_str("Failed to sign trailer")
1165            })?);
1166            ctx.stream_state = None; // upload finished
1167        }
1168
1169        // 3. Hand the encoded bytes to Pingora
1170        *body = Some(out.freeze());
1171        Ok(())
1172    }
1173}
1174
1175/// Initialise the global [`tracing`] subscriber.
1176///
1177/// Configures a human-readable formatter with RFC 3339 timestamps.  The log
1178/// level is controlled by the `RUST_LOG` environment variable (e.g.
1179/// `RUST_LOG=object_storage_proxy=debug`).
1180///
1181/// This is called automatically by [`run_server`] and should not normally be
1182/// invoked by application code.
1183pub fn init_tracing() {
1184    tracing_subscriber::fmt()
1185        .with_timer(ChronoLocal::rfc_3339())
1186        .with_env_filter(EnvFilter::from_default_env())
1187        .init();
1188}
1189
1190/// Build and run the Pingora proxy server.
1191///
1192/// This is the Rust entry-point called from [`start_server`].  It:
1193/// 1. Initialises tracing.
1194/// 2. Parses the COS map and HMAC keystore from the Python objects in `run_args`.
1195/// 3. Creates the Pingora [`Server`], attaches HTTP and/or HTTPS listeners, and
1196///    enters the run-forever loop (blocking the calling thread).
1197///
1198/// # Panics
1199///
1200/// Panics if `run_args.cos_map` cannot be parsed, or if the TLS certificate /
1201/// key paths are missing when `https_port` is set.
1202pub fn run_server(py: Python, run_args: &ProxyServerConfig) {
1203    print_banner();
1204    init_tracing();
1205
1206    if run_args.http_port.is_none() && run_args.https_port.is_none() {
1207        error!("At least one of http_port or https_port must be specified!");
1208        return;
1209    }
1210
1211    if let Some(http_port) = run_args.http_port {
1212        info!("starting HTTP server on port {}", http_port);
1213    }
1214
1215    if let Some(https_port) = run_args.https_port {
1216        info!("starting HTTPS server on port {}", https_port);
1217    }
1218
1219    let local_hmac_map = if Python::with_gil(|py| run_args.hmac_keystore.is_none(py)) {
1220        HashMap::new()
1221    } else {
1222        parse_hmac_list(py, &run_args.hmac_keystore).unwrap_or_default()
1223    };
1224
1225    debug!("HMAC keys: {:#?}", &local_hmac_map);
1226
1227    let cosmap = Arc::new(RwLock::new(
1228        parse_cos_map(py, &run_args.cos_map).expect("failed to parse cos_map"),
1229    ));
1230    let hmac_keystore = Arc::new(RwLock::new(local_hmac_map));
1231
1232    let mut my_server = Server::new(None).expect("failed to create pingora server");
1233    my_server.bootstrap();
1234
1235    let validator = run_args.validator.as_ref().map(|v| v.clone_ref(py));
1236    let hmac_fetcher = run_args.hmac_fetcher.as_ref().map(|v| v.clone_ref(py));
1237
1238    let mut my_proxy = pingora::proxy::http_proxy_service(
1239        &my_server.configuration,
1240        MyProxy {
1241            cos_endpoint: "s3.eu-de.cloud-object-storage.appdomain.cloud".to_string(),
1242            cos_mapping: Arc::clone(&cosmap),
1243            hmac_keystore: Arc::clone(&hmac_keystore),
1244            secrets_cache: SecretsCache::new(),
1245            auth_cache: AuthCache::new(),
1246            validator,
1247            bucket_creds_fetcher: run_args
1248                .bucket_creds_fetcher
1249                .as_ref()
1250                .map(|v| v.clone_ref(py)),
1251            verify: run_args.verify,
1252            skip_signature_validation: run_args.skip_signature_validation,
1253            hmac_fetcher,
1254            tracker: UrlTracker::new(),
1255            max_presign_url_usage_attempts: run_args.max_presign_url_usage_attempts,
1256            server_name: "<osp⚡>".to_string(),
1257        },
1258    );
1259
1260    if run_args.threads.is_some() {
1261        my_proxy.threads = run_args.threads;
1262    }
1263
1264    debug!("Proxy service threads: {:?}", &my_proxy.threads);
1265
1266    if let Some(http_port) = run_args.http_port {
1267        info!("starting HTTP server on port {}", &http_port);
1268        let addr = format!("0.0.0.0:{}", http_port);
1269        my_proxy.add_tcp(addr.as_str());
1270    }
1271
1272    if let Some(https_port) = run_args.https_port {
1273        let cert_path =
1274            std::env::var("TLS_CERT_PATH").expect("Set TLS_CERT_PATH to the PEM certificate file");
1275        let key_path =
1276            std::env::var("TLS_KEY_PATH").expect("Set TLS_KEY_PATH to the PEM private-key file");
1277
1278        let mut tls = pingora::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path)
1279            .expect("failed to build TLS settings");
1280
1281        tls.enable_h2();
1282        let https_addr = format!("0.0.0.0:{}", https_port);
1283        my_proxy.add_tls_with_settings(https_addr.as_str(), /*tcp_opts*/ None, tls);
1284    }
1285
1286    my_server.add_service(my_proxy);
1287
1288    debug!("{:?}", &my_server.configuration);
1289
1290    py.allow_threads(|| my_server.run_forever());
1291
1292    info!("server running ...");
1293}
1294
1295/// Start an HTTP + HTTPS reverse‑proxy for IBM COS.
1296///
1297/// Equivalent to running ``pingora`` with a custom handler.
1298///
1299/// Parameters
1300/// ----------
1301/// run_args:
1302///    A :py:class:`ProxyServerConfig` object containing the configuration for the server.
1303///     The configuration includes the following parameters:
1304///   - cos_map: A dictionary mapping bucket names to their respective COS configuration.
1305///     Each entry should contain the following
1306///     keys:
1307///        - host: The COS endpoint (e.g., "s3.eu-de.cloud-object-storage.appdomain.cloud")
1308///        - port: The port number (e.g., 443)
1309///        - api_key/apikey: The API key for the bucket (optional)
1310///        - ttl/time-to-live: The time-to-live for the API key in seconds (optional)
1311///   - bucket_creds_fetcher: Optional Python async callable that fetches the API key for a bucket.
1312///     The callable should accept a single argument, the bucket name.
1313///     It should return a string containing the API key.
1314///   - http_port: The HTTP port to listen on.
1315///   - https_port: The HTTPS port to listen on.
1316///   - validator: Optional Python async callable that validates the request.
1317///     The callable should accept two arguments, the access_key and the bucket name.
1318///     It should return a boolean indicating whether the request is valid.
1319///   - threads: Optional number of threads to use for the server.
1320///     If not specified, the server will use a single thread.
1321#[pyfunction]
1322pub fn start_server(py: Python, run_args: &ProxyServerConfig) -> PyResult<()> {
1323    rustls::crypto::ring::default_provider()
1324        .install_default()
1325        .expect("Failed to install rustls crypto provider");
1326
1327    dotenv().ok();
1328
1329    run_server(py, run_args);
1330
1331    Ok(())
1332}
1333
1334/// Enable the global request counter (disabled by default).
1335///
1336/// Once enabled every request proxied increments an atomic counter that can be
1337/// read with [`get_request_count`].  Useful for testing and load-measurement.
1338#[pyfunction]
1339fn enable_request_counting() {
1340    REQ_COUNTER_ENABLED.store(true, Ordering::Relaxed);
1341}
1342
1343/// Disable the global request counter.
1344#[pyfunction]
1345fn disable_request_counting() {
1346    REQ_COUNTER_ENABLED.store(false, Ordering::Relaxed);
1347}
1348
1349/// Return the total number of proxied requests since counting was enabled.
1350#[pyfunction]
1351fn get_request_count() -> PyResult<usize> {
1352    Ok(REQ_COUNTER.load(Ordering::Relaxed))
1353}
1354
1355#[pymodule]
1356fn object_storage_proxy(m: &Bound<'_, PyModule>) -> PyResult<()> {
1357    m.add_function(wrap_pyfunction!(start_server, m)?)?;
1358    m.add_class::<ProxyServerConfig>()?;
1359    m.add_class::<CosMapItem>()?;
1360    m.add_function(wrap_pyfunction!(enable_request_counting, m)?)?;
1361    m.add_function(wrap_pyfunction!(disable_request_counting, m)?)?;
1362    m.add_function(wrap_pyfunction!(get_request_count, m)?)?;
1363    Ok(())
1364}
1365
#[cfg(test)]
mod tests {
    use super::*;

    // ── UrlTracker ────────────────────────────────────────────────────────────

    const KEY_URL: &str = "http://example.com/key";
    const MISSING_URL: &str = "http://example.com/missing";
    const URL_A: &str = "http://example.com/a";
    const URL_B: &str = "http://example.com/b";

    #[test]
    fn url_tracker_new_is_empty() {
        let fresh = UrlTracker::new();
        assert!(fresh.get_all().is_empty());
    }

    #[test]
    fn url_tracker_default_equals_new() {
        let (fresh, defaulted) = (UrlTracker::new(), UrlTracker::default());
        assert_eq!(fresh.get_all().len(), defaulted.get_all().len());
    }

    #[test]
    fn url_tracker_track_increments_count() {
        let tracker = UrlTracker::new();
        assert_eq!(tracker.get(KEY_URL), None);
        for expected in 1..=2 {
            tracker.track(KEY_URL);
            assert_eq!(tracker.get(KEY_URL), Some(expected));
        }
    }

    #[test]
    fn url_tracker_get_returns_none_for_unknown_url() {
        assert_eq!(UrlTracker::new().get(MISSING_URL), None);
    }

    #[test]
    fn url_tracker_get_all_returns_all_tracked_urls() {
        let tracker = UrlTracker::new();
        for url in [URL_A, URL_B, URL_A] {
            tracker.track(url);
        }

        let mut snapshot = tracker.get_all();
        snapshot.sort_by(|left, right| left.0.cmp(&right.0));

        assert_eq!(
            snapshot,
            vec![(URL_A.to_string(), 2), (URL_B.to_string(), 1)]
        );
    }
}