1#![warn(clippy::all)]
38use async_trait::async_trait;
39use bytes::BytesMut;
40use credentials::signer::{
41 self, resign_streaming_request, signature_is_valid_for_presigned,
42 signature_is_valid_for_request,
43};
44use dashmap::DashMap;
45use dotenv::dotenv;
46use http::uri::Authority;
47use http::{StatusCode, Uri};
48use parsers::cos_map::{CosMapItem, parse_cos_map};
49use parsers::keystore::parse_hmac_list;
50use pingora::Result;
51use pingora::http::ResponseHeader;
52use pingora::proxy::{ProxyHttp, Session};
53use pingora::server::Server;
54use pingora::upstreams::peer::HttpPeer;
55use pyo3::prelude::*;
56use pyo3::types::{PyModule, PyModuleMethods};
57use pyo3::{Bound, PyResult, Python, pyclass, pyfunction, pymodule, wrap_pyfunction};
58use std::sync::{
59 Arc,
60 atomic::{AtomicBool, AtomicUsize, Ordering},
61};
62
63use std::collections::HashMap;
66use std::fmt::Debug;
67
68use std::time::Duration;
69use tokio::sync::RwLock;
70use tracing::{debug, error, info, warn};
71use tracing_subscriber::EnvFilter;
72use tracing_subscriber::fmt::time::ChronoLocal;
73
74pub mod parsers;
75use parsers::credentials::{parse_presigned_params, parse_token_from_header};
76use parsers::path::{parse_path, parse_query};
77
78pub mod credentials;
79use credentials::{
80 secrets_proxy::{SecretsCache, get_bearer, get_credential_for_bucket},
81 signer::sign_request,
82};
83
84pub mod utils;
85use utils::banner::print_banner;
86use utils::response::write_error_response_with_header;
87use utils::validator::{AuthCache, validate_request};
88
/// Number of requests seen by `upstream_peer` while counting is enabled.
static REQ_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Whether `upstream_peer` increments [`REQ_COUNTER`]; off until enabled from Python.
static REQ_COUNTER_ENABLED: AtomicBool = AtomicBool::new(false);
/// Server name used when nothing else is configured.
const DEFAULT_SERVER_NAME: &str = "<osp⚡>";
92
93#[derive(Clone)]
103pub struct UrlTracker {
104 pub counts: Arc<DashMap<String, usize>>,
106}
107
108impl Default for UrlTracker {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl UrlTracker {
115 pub fn new() -> Self {
117 UrlTracker {
118 counts: Arc::new(DashMap::new()),
119 }
120 }
121
122 pub fn track(&self, url: &str) {
124 let mut entry = self.counts.entry(url.to_string()).or_insert(0);
125 *entry += 1;
126 debug!(url, count = *entry, "tracking presigned URL");
127 }
128
129 pub fn get(&self, url: &str) -> Option<usize> {
131 self.counts.get(url).map(|v| *v)
132 }
133
134 pub fn get_all(&self) -> Vec<(String, usize)> {
136 self.counts
137 .iter()
138 .map(|e| (e.key().clone(), *e.value()))
139 .collect()
140 }
141}
142
143#[pyclass]
173#[pyo3(name = "ProxyServerConfig")]
174#[derive(Debug)]
175pub struct ProxyServerConfig {
176 #[pyo3(get, set)]
177 pub bucket_creds_fetcher: Option<Py<PyAny>>,
178
179 #[pyo3(get, set)]
180 pub cos_map: PyObject,
181
182 #[pyo3(get, set)]
183 pub http_port: Option<u16>,
184
185 #[pyo3(get, set)]
186 pub https_port: Option<u16>,
187
188 #[pyo3(get, set)]
189 pub validator: Option<Py<PyAny>>,
190
191 #[pyo3(get, set)]
192 pub threads: Option<usize>,
193
194 #[pyo3(get, set)]
195 pub verify: Option<bool>,
196
197 #[pyo3(get, set)]
198 pub hmac_keystore: PyObject,
199
200 #[pyo3(get, set)]
201 pub skip_signature_validation: Option<bool>,
202
203 #[pyo3(get, set)]
204 pub hmac_fetcher: Option<Py<PyAny>>,
205
206 #[pyo3(get, set)]
207 pub max_presign_url_usage_attempts: Option<usize>,
208
209 #[pyo3(get, set)]
210 pub server_name: String,
211}
212
213impl Default for ProxyServerConfig {
214 fn default() -> Self {
215 ProxyServerConfig {
216 cos_map: Python::with_gil(|py| py.None()),
217 bucket_creds_fetcher: None,
218 http_port: None,
219 https_port: None,
220 validator: None,
221 threads: Some(1),
222 verify: None,
223 hmac_keystore: Python::with_gil(|py| py.None()),
224 skip_signature_validation: Some(false),
225 hmac_fetcher: None,
226 max_presign_url_usage_attempts: Some(3),
227 server_name: "<osp⚡>".to_string(),
228 }
229 }
230}
231
232#[pymethods]
233impl ProxyServerConfig {
234 #[new]
235 #[pyo3(
236 signature = (
237 cos_map,
238 hmac_keystore = None,
239 bucket_creds_fetcher = None,
240 http_port = None,
241 https_port = None,
242 validator = None,
243 threads = Some(1),
244 verify = None,
245 skip_signature_validation = Some(false),
246 hmac_fetcher = None,
247 max_presign_url_usage_attempts = Some(3),
248 server_name = "<osp⚡>".to_string(),
249 )
250 )]
251 #[allow(clippy::too_many_arguments)]
252 pub fn new(
253 cos_map: PyObject,
254 hmac_keystore: Option<PyObject>,
255 bucket_creds_fetcher: Option<PyObject>,
256 http_port: Option<u16>,
257 https_port: Option<u16>,
258 validator: Option<PyObject>,
259 threads: Option<usize>,
260 verify: Option<bool>,
261 skip_signature_validation: Option<bool>,
262 hmac_fetcher: Option<PyObject>,
263 max_presign_url_usage_attempts: Option<usize>,
264 server_name: String,
265 ) -> Self {
266 ProxyServerConfig {
267 cos_map,
268 hmac_keystore: hmac_keystore.unwrap_or_else(|| Python::with_gil(|py| py.None())),
269 bucket_creds_fetcher,
270 http_port,
271 https_port,
272 validator,
273 threads,
274 verify,
275 skip_signature_validation,
276 hmac_fetcher,
277 max_presign_url_usage_attempts,
278 server_name,
279 }
280 }
281
282 fn __repr__(&self) -> PyResult<String> {
283 Ok(format!(
284 "ProxyServerConfig(http_port={}, https_port={}, threads={:?})",
285 self.http_port.unwrap_or(0),
286 self.https_port.unwrap_or(0),
287 self.threads
288 ))
289 }
290}
291
292pub struct MyProxy {
298 cos_endpoint: String,
299 cos_mapping: Arc<RwLock<HashMap<String, CosMapItem>>>,
300 hmac_keystore: Arc<RwLock<HashMap<String, String>>>,
301 secrets_cache: SecretsCache,
302 auth_cache: AuthCache,
303 validator: Option<PyObject>,
304 bucket_creds_fetcher: Option<PyObject>,
305 verify: Option<bool>,
306 skip_signature_validation: Option<bool>,
307 hmac_fetcher: Option<PyObject>,
308 tracker: UrlTracker,
309 max_presign_url_usage_attempts: Option<usize>,
310 #[allow(dead_code)]
311 server_name: String,
312}
313
314pub struct MyCtx {
319 cos_mapping: Arc<RwLock<HashMap<String, CosMapItem>>>,
320 hmac_keystore: Arc<RwLock<HashMap<String, String>>>,
321 secrets_cache: SecretsCache,
322 auth_cache: AuthCache,
323 validator: Option<PyObject>,
324 bucket_creds_fetcher: Option<PyObject>,
325 hmac_fetcher: Option<PyObject>,
326 is_presigned: Option<bool>,
327 stream_state: Option<signer::StreamingState>,
328}
329
330#[async_trait]
337impl ProxyHttp for MyProxy {
338 type CTX = MyCtx;
339 fn new_ctx(&self) -> Self::CTX {
340 MyCtx {
341 cos_mapping: Arc::clone(&self.cos_mapping),
342 hmac_keystore: Arc::clone(&self.hmac_keystore),
343 secrets_cache: self.secrets_cache.clone(),
344 auth_cache: self.auth_cache.clone(),
345 validator: self
346 .validator
347 .as_ref()
348 .map(|v| Python::with_gil(|py| v.clone_ref(py))),
349 bucket_creds_fetcher: self
350 .bucket_creds_fetcher
351 .as_ref()
352 .map(|v| Python::with_gil(|py| v.clone_ref(py))),
353 hmac_fetcher: self
354 .hmac_fetcher
355 .as_ref()
356 .map(|v| Python::with_gil(|py| v.clone_ref(py))),
357 is_presigned: None,
358 stream_state: None,
359 }
360 }
361
362 async fn upstream_peer(
363 &self,
364 session: &mut Session,
365 ctx: &mut Self::CTX,
366 ) -> Result<Box<HttpPeer>> {
367 debug!("upstream_peer::start");
368 if REQ_COUNTER_ENABLED.load(Ordering::Relaxed) {
369 let new_val = REQ_COUNTER.fetch_add(1, Ordering::Relaxed) + 1;
370 debug!("Request count: {}", new_val);
371 }
372
373 let path = session.req_header().uri.path();
374
375 let parse_path_result = parse_path(path);
376 if parse_path_result.is_err() {
377 error!("Failed to parse path: {:?}", parse_path_result);
378 return Err(pingora::Error::new_str("Failed to parse path"));
379 }
380
381 let (_, (bucket, _)) = parse_path_result.expect("checked above");
382
383 let hdr_bucket = bucket.to_owned();
384
385 let bucket_config = {
386 let map = ctx.cos_mapping.read().await;
387 map.get(&hdr_bucket).cloned()
388 };
389
390 let addressing_style = bucket_config
391 .clone()
392 .and_then(|config| config.addressing_style)
393 .unwrap_or("virtual".to_string());
394
395 let endpoint = match bucket_config.clone() {
396 Some(config) => {
397 if addressing_style == "path" {
398 config.host.to_owned()
399 } else {
400 format!("{}.{}", bucket, config.host)
401 }
402 }
403 None => {
404 format!("{}.{}", bucket, self.cos_endpoint)
405 }
406 };
407
408 let port = bucket_config
409 .clone()
410 .map(|config| config.port)
411 .unwrap_or(443);
412
413 let addr = (endpoint.clone(), port);
414
415 let endpoint_is_tls = bucket_config.and_then(|config| config.tls).unwrap_or(true);
416
417 debug!(endpoint_is_tls, endpoint, "resolved upstream peer");
418
419 let mut peer = Box::new(HttpPeer::new(addr, endpoint_is_tls, endpoint.clone()));
420 debug!(?peer, "upstream peer created");
421
422 peer.options.max_h2_streams = 128;
425 peer.options.h2_ping_interval = Some(Duration::from_secs(30));
426
427 debug!("peer: {:#?}", &peer);
433
434 if let Some(verify) = self.verify {
435 info!("Verify peer (upstream) certificates disabled!");
436 peer.options.verify_cert = verify;
437 peer.options.verify_hostname = verify;
438 } else {
439 peer.options.verify_cert = true;
440 }
441
442 debug!("peer: {:#?}", &peer);
443
444 debug!("upstream_peer::end");
445 Ok(peer)
446 }
447
448 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
449 debug!("request_filter::start");
450
451 let url = session.req_header().uri.to_string();
456 let path = session.req_header().uri.path().to_string();
457 let is_presigned_url = session
458 .req_header()
459 .uri
460 .query()
461 .is_some_and(|q| q.contains("X-Amz-Signature"));
462 if is_presigned_url {
463 self.tracker.track(&url);
464 }
465 let tracked_count = self.tracker.get(&url).unwrap_or(0);
466 if is_presigned_url && tracked_count > self.max_presign_url_usage_attempts.unwrap_or(3) {
467 warn!(
468 url,
469 tracked_count,
470 max = self.max_presign_url_usage_attempts.unwrap_or(3),
471 "presigned URL usage limit exceeded, denying"
472 );
473 let msg = format!(
474 "URL ({}) has been tracked too many times: {} (max={}). Access Denied!",
475 path,
476 tracked_count,
477 self.max_presign_url_usage_attempts.unwrap_or(3)
478 );
479
480 write_error_response_with_header(session, StatusCode::FORBIDDEN, msg).await?;
493 return Ok(true);
494 }
495
496 debug!(summary = ?session.request_summary(), "request summary");
497 debug!(uri = ?session.req_header().uri, "incoming request URI");
498
499 let request_query = session.req_header().uri.query().unwrap_or("");
500 info!("request path: {}", session.req_header().uri.path());
501 info!("request query: {}", request_query);
502 info!("request method : {}", session.req_header().method);
503
504 let parsed_query_result = parse_query(request_query);
505
506 if parsed_query_result.is_err() {
507 error!("Failed to parse query: {:?}", parsed_query_result);
508 return Err(pingora::Error::new_str("Failed to parse query"));
509 }
510 let (rest, mut query_dict) = parsed_query_result.expect("checked above");
511 if rest.is_empty() {
512 info!("Parsed query: {:#?}", query_dict);
513 } else {
514 error!("Failed to parse query: {}", rest);
515 }
516
517 query_dict.insert(
518 "method".to_string(),
519 session.req_header().method.to_string(),
520 );
521 query_dict.insert(
522 "path".to_string(),
523 session.req_header().uri.path().to_string(),
524 );
525 query_dict.insert(
527 "source".to_string(),
528 session
529 .req_header()
530 .headers
531 .get("x-forwarded-for")
532 .and_then(|h| h.to_str().ok())
533 .unwrap_or_default()
534 .to_string(),
535 );
536
537 info!("---> Parsed query: {:#?}", query_dict);
538
539 if session
540 .req_header()
541 .headers
542 .get("expect")
543 .map(|v| {
544 v.to_str()
545 .unwrap_or("")
546 .eq_ignore_ascii_case("100-continue")
547 })
548 .unwrap_or(false)
549 {
550 return Ok(false);
551 };
552
553 let path = session.req_header().uri.path().to_owned();
554
555 let parse_path_result = parse_path(&path);
556 if parse_path_result.is_err() {
557 error!("Failed to parse path: {:?}", parse_path_result);
558 return Err(pingora::Error::new_str("Failed to parse path"));
559 }
560
561 let (_, (bucket, _uri_path)) = parse_path_result.expect("checked above");
562
563 let hdr_bucket = bucket.to_owned();
564
565 let auth_header = session
566 .req_header()
567 .headers
568 .get("authorization")
569 .and_then(|h| h.to_str().ok())
570 .map(ToString::to_string)
571 .unwrap_or_default();
572
573 let ttl = {
574 let map = ctx.cos_mapping.read().await;
575 map.get(bucket).and_then(|c| c.ttl).unwrap_or(0)
576 };
577 let mut access_key: String = String::new();
578
579 if auth_header.is_empty() {
580 if let Some(q) = session.req_header().uri.query()
581 && q.contains("X-Amz-Credential")
582 {
583 let (_, p) = parse_presigned_params(&format!("?{q}"))
584 .map_err(|_| pingora::Error::new_str("Failed to parse presigned params"))?;
585 access_key = p.access_key.clone();
586 }
587 } else {
588 access_key = parse_token_from_header(&auth_header)
589 .map_err(|_| pingora::Error::new_str("Failed to parse access_key"))?
590 .1
591 .to_string();
592 }
593
594 let is_authorized = if let Some(py_cb) = &ctx.validator {
595 let is_multipart = session
596 .req_header()
597 .uri
598 .query()
599 .is_some_and(|q| q.contains("uploadId="));
600
601 info!("CHECKING SIGNATURE");
602 if let Some(skip) = self.skip_signature_validation {
603 if skip || is_multipart {
604 info!("Skipping local signature check");
605 } else {
607 info!("Checking presigned signature");
609 let uri_q = session.req_header().uri.query().unwrap_or("");
610
611 if auth_header.is_empty() && uri_q.contains("X-Amz-Signature") {
612 ctx.is_presigned = Some(true);
613
614 if !ctx.hmac_keystore.read().await.contains_key(&access_key) {
616 debug!(
617 "No key in keystore, trying to fetch via hmac_fetcher for ->{}<-",
618 access_key
619 );
620 if let Some(py_fetcher) = &ctx.hmac_fetcher {
622 let cb = py_fetcher;
624 let secret: PyResult<String> = Python::with_gil(|py| {
625 cb.call1(py, (&access_key,)).and_then(|r| r.extract(py))
626 });
627 debug!("Got secret: {:#?}", secret);
628 match secret {
629 Ok(secret_key) => {
630 debug!("got key and inserting into keystore");
631 ctx.hmac_keystore
632 .write()
633 .await
634 .insert(access_key.clone().to_string(), secret_key);
635 }
636 Err(_) => {
637 write_error_response_with_header(
639 session,
640 StatusCode::UNAUTHORIZED,
641 "No key found for presigned URL".to_string(),
642 )
643 .await?;
644 return Ok(true);
646 }
647 }
648 } else {
649 write_error_response_with_header(
651 session,
652 StatusCode::UNAUTHORIZED,
653 "No key found for presigned URL".to_string(),
654 )
655 .await?;
656 return Ok(true);
657 }
658 }
659 debug!("now checking if the signature is valid for presigned...");
660 let sk = ctx
661 .hmac_keystore
662 .read()
663 .await
664 .get(&access_key)
665 .expect("key was just inserted")
666 .clone();
667 debug!("got secret {} from keystore", sk);
668 debug!("RAW_PATH = {}", &session.req_header().uri);
669 debug!(
670 "RAW_HOST_HDR = {:?}",
671 &session.req_header().headers.get("host")
672 );
673 let ok = match signature_is_valid_for_presigned(session, &sk).await {
674 Ok(b) => b,
675 Err(e) => {
676 error!("presigned-URL validation error: {e}"); return Err(pingora::Error::new_str("Failed to check signature"));
678 }
679 };
680 info!("is signature valid?: {}", ok);
681 if !ok {
682 let msg = format!(
683 "Signature invalid for presigned URL: {}",
684 &session.req_header().uri.path()
685 );
686 session.respond_error_with_body(401, msg.into()).await?;
687 return Ok(true);
688 }
689 } else {
690 info!("processing a regular request");
691
692 let has_key = {
693 let map = ctx.hmac_keystore.read().await;
694 map.contains_key(&access_key)
695 };
696 if !has_key {
697 if let Some(py_fetcher) = &ctx.hmac_fetcher {
698 let cb = py_fetcher;
700 let secret: PyResult<String> = Python::with_gil(|py| {
701 cb.call1(py, (&access_key,)).and_then(|r| r.extract(py))
702 });
703 match secret {
704 Ok(secret_key) => {
705 ctx.hmac_keystore
706 .write()
707 .await
708 .insert(access_key.clone().to_string(), secret_key);
709 }
710 Err(_) => {
711 write_error_response_with_header(
714 session,
715 StatusCode::UNAUTHORIZED,
716 "No key found for request".to_string(),
717 )
718 .await?;
719 return Ok(true);
720 }
721 }
722 } else {
723 write_error_response_with_header(
725 session,
726 StatusCode::UNAUTHORIZED,
727 "No key found for request".to_string(),
728 )
729 .await?;
730 return Ok(true);
731 }
732 }
733 let secret_key = {
734 let map = ctx.hmac_keystore.read().await;
735 map.get(&access_key).cloned()
736 };
737
738 info!("Checking signature");
739 let sig_ok = match signature_is_valid_for_request(
740 &auth_header,
741 session,
742 &secret_key.expect("key was just inserted"),
743 )
744 .await
745 {
746 Ok(true) => true,
747 Ok(false) => {
748 info!("Signature invalid");
749 false
750 }
751 Err(err) => {
752 error!("Signature check error: {}", err);
753 false
754 }
755 };
756
757 if !sig_ok {
759 write_error_response_with_header(
761 session,
762 StatusCode::UNAUTHORIZED,
763 "Signature invalid".to_string(),
764 )
765 .await?;
766 return Ok(true);
767 }
768 }
769 }
770 }
771 info!("Signature check passed, continuing now onto the bespoke validation");
772 let cache_key = format!("{}:{}:{:?}", &access_key, bucket, &query_dict);
773 debug!("Cache key: {}", cache_key);
774
775 let bucket_clone = bucket.to_string();
776 let callback_clone: PyObject = Python::with_gil(|py| py_cb.clone_ref(py));
777
778 let move_access_key = access_key.clone();
779 let req = query_dict.clone();
780
781 ctx.auth_cache
782 .get_or_validate(&cache_key, Duration::from_secs(ttl), move || {
783 let tk = move_access_key.clone();
784 let bu = bucket_clone.clone();
785 let cb = Python::with_gil(|py| callback_clone.clone_ref(py));
786 {
787 let req_value = req.clone();
788 async move {
789 validate_request(&tk, &bu, &req_value, cb)
790 .await
791 .map_err(|_| pingora::Error::new_str("Validator error"))
792 }
793 }
794 })
795 .await?
796 } else {
797 true
798 };
799
800 if !is_authorized {
801 info!("Access denied for bucket: {}. End of request.", bucket);
802 write_error_response_with_header(
804 session,
805 StatusCode::UNAUTHORIZED,
806 format!("Access denied for bucket: {}", bucket),
807 )
808 .await?;
809 return Ok(true);
810 }
811
812 let bucket_config = {
813 let map = ctx.cos_mapping.read().await;
814 map.get(&hdr_bucket).cloned()
815 };
816
817 debug!("Access key: {}", &access_key);
818
819 match bucket_config.clone() {
821 Some(mut config) => {
822 let fetcher_opt = ctx.bucket_creds_fetcher.as_ref().map(|py_cb| {
823 let cb = Python::with_gil(|py| py_cb.clone_ref(py));
825 move |bucket: String| async move {
826 get_credential_for_bucket(&cb, bucket, access_key)
827 .await
828 .map_err(|e| e.into()) }
830 });
831
832 config
833 .ensure_credentials(&hdr_bucket, fetcher_opt)
834 .await
835 .map_err(|e| {
836 error!("Credential check failed for {hdr_bucket}: {e}");
837 pingora::Error::new_str("Credential check failed")
838 })?;
839
840 ctx.cos_mapping
841 .write()
842 .await
843 .insert(hdr_bucket.clone(), config);
844 }
845 None => {
846 error!("No configuration available for bucket: {hdr_bucket}");
847 return Err(pingora::Error::new_str(
848 "No configuration available for bucket",
849 ));
850 }
851 }
852 debug!(
853 "request_filter::Credentials checked for bucket: {}. End of function.",
854 hdr_bucket
855 );
856 debug!("request_filter::end");
857 Ok(false)
858 }
859
860 async fn upstream_request_filter(
861 &self,
862 _session: &mut Session,
863 upstream_request: &mut pingora::http::RequestHeader,
864 ctx: &mut Self::CTX,
865 ) -> Result<()> {
866 if let Some(presigned) = ctx.is_presigned
867 && presigned
868 {
869 debug!("upstream_request_filter::presigned");
870 let cleaned_q = upstream_request
871 .uri
872 .query()
873 .unwrap_or("")
874 .split('&')
875 .filter(|kv| !kv.starts_with("X-Amz-"))
876 .collect::<Vec<_>>()
877 .join("&");
878
879 let _ = upstream_request.remove_header("authorization");
880
881 let new_path_and_query = if cleaned_q.is_empty() {
882 upstream_request.uri.path().to_owned()
883 } else {
884 format!("{}?{}", upstream_request.uri.path(), cleaned_q)
885 };
886
887 upstream_request.set_uri(
888 new_path_and_query
889 .try_into()
890 .map_err(|_| pingora::Error::new_str("invalid URI after query rewrite"))?,
891 );
892 }
893
894 let _ = upstream_request.remove_header("accept-encoding");
895
896 debug!("upstream_request_filter::start");
897
898 let (_, (bucket, my_updated_url)) = parse_path(upstream_request.uri.path())
899 .map_err(|_| pingora::Error::new_str("failed to parse upstream request path"))?;
900
901 debug!(my_updated_url, "parsed upstream path");
902
903 let hdr_bucket = bucket.to_string();
904
905 let my_query = match upstream_request.uri.query() {
906 Some(q) if !q.is_empty() => format!("?{}", q),
907 _ => String::new(),
908 };
909
910 let bucket_config = {
911 let map = ctx.cos_mapping.read().await;
912 map.get(&hdr_bucket).cloned()
913 };
914
915 let addressing_style = bucket_config
916 .clone()
917 .and_then(|config| config.addressing_style)
918 .unwrap_or("virtual".to_string());
919
920 let this_url = match addressing_style.as_str() {
921 "virtual" => my_updated_url,
922 _ => {
923 let u_url = format!("/{}{}", bucket, my_updated_url);
924 debug!(u_url, "using path addressing style");
925 &u_url.clone()
926 }
927 };
928
929 let endpoint = match bucket_config.clone() {
930 Some(cfg) => {
931 let this_host = match addressing_style.as_str() {
932 "path" => cfg.host.to_owned(),
933 _ => format!("{}.{}", bucket, cfg.host),
934 };
935 if cfg.port == 443 {
936 this_host
937 } else {
938 format!("{}:{}", this_host, cfg.port)
939 }
940 }
941 None => format!("{}.{}", bucket, self.cos_endpoint),
942 };
943
944 debug!("endpoint: {}.", &endpoint);
945
946 let authority = Authority::from_static(Box::leak(endpoint.clone().into_boxed_str()));
948 let new_uri = Uri::builder()
951 .scheme("https")
952 .authority(authority.clone())
953 .path_and_query(this_url.to_owned() + &my_query)
954 .build()
955 .expect("should build a valid URI");
956
957 upstream_request.set_uri(new_uri.clone());
958 upstream_request.insert_header("host", authority.as_str())?;
960
961 let (maybe_hmac, maybe_api_key) = match &bucket_config {
962 Some(cfg) => (cfg.has_hmac(), cfg.api_key.clone()),
963 None => (false, None),
964 };
965
966 let allowed = [
967 "host",
968 "content-length",
969 "x-amz-date",
970 "x-amz-content-sha256",
971 "x-amz-security-token",
972 "transfer-encoding",
974 "content-encoding",
975 "x-amz-decoded-content-length",
976 "x-amz-trailer",
977 "x-amz-sdk-checksum-algorithm",
978 "range",
979 "expect",
980 ];
985
986 let to_check: Vec<String> = upstream_request
987 .headers
988 .iter()
989 .map(|(name, _)| name.as_str().to_owned())
990 .collect();
991
992 for name in to_check {
993 let keep = allowed.contains(&name.as_str()) || name.starts_with("x-amz-checksum-");
994 if !keep {
995 let _ = upstream_request.remove_header(&name);
996 }
997 }
998
999 if maybe_hmac {
1000 debug!("HMAC: Signing request for bucket: {}", hdr_bucket);
1001
1002 let streaming = {
1003 upstream_request
1004 .headers
1005 .get("x-amz-content-sha256")
1006 .map(|v| v.as_bytes().starts_with(b"STREAMING-"))
1007 .unwrap_or(false)
1008 };
1009
1010 if streaming {
1011 let streaming_header = upstream_request
1012 .headers
1013 .get("x-amz-content-sha256")
1014 .and_then(|v| v.to_str().ok())
1015 .unwrap_or_default();
1016
1017 debug!(streaming_header, "streaming upload detected");
1018
1019 let cfg = bucket_config.as_ref().ok_or_else(|| {
1020 pingora::Error::new_str("no bucket config for streaming upload")
1021 })?;
1022 let access_key = cfg.access_key.as_deref().unwrap_or_default().to_string();
1023 let secret_key = cfg.secret_key.as_deref().unwrap_or_default().to_string();
1024 let region = cfg.region.as_deref().unwrap_or_default().to_string();
1025
1026 debug!(headers = ?upstream_request.headers, "upstream request headers before streaming rewrite");
1038 upstream_request.remove_header("content-length");
1039 upstream_request.remove_header("content-md5");
1040 upstream_request.insert_header("transfer-encoding", "chunked")?;
1041 upstream_request.set_send_end_stream(false);
1043
1044 let ts = chrono::Utc::now();
1047 resign_streaming_request(upstream_request, ®ion, &access_key, &secret_key, ts)
1048 .map_err(|e| {
1049 error!("Failed to sign request: {e}");
1050 pingora::Error::new_str("Failed to sign request")
1051 })?;
1052
1053 let seed_sig = upstream_request
1054 .headers
1055 .get("authorization")
1056 .and_then(|v| v.to_str().ok())
1057 .and_then(|v| v.split("Signature=").nth(1))
1058 .expect("seed signature missing")
1059 .to_owned();
1060
1061 ctx.stream_state = Some(signer::StreamingState::new(
1063 region.to_string(),
1064 access_key.to_string(),
1065 secret_key.to_string(),
1066 ts,
1067 seed_sig,
1068 ));
1069 } else {
1070 sign_request(
1071 upstream_request,
1072 bucket_config
1073 .as_ref()
1074 .ok_or_else(|| pingora::Error::new_str("no bucket config for signing"))?,
1075 )
1076 .await
1077 .map_err(|e| {
1078 error!("Failed to sign request for {}: {e}", hdr_bucket);
1079 pingora::Error::new_str("Failed to sign request")
1080 })?;
1081 }
1082
1083 debug!("Request signed for bucket: {}", hdr_bucket);
1084 debug!("{:#?}", &upstream_request.headers);
1085 } else {
1086 debug!("Using API key for bucket: {}", hdr_bucket);
1087 let api_key = match maybe_api_key {
1088 Some(key) => key,
1089 None => {
1090 error!("No API key for bucket {hdr_bucket}");
1093 return Err(pingora::Error::new_str("No API key configured for bucket"));
1094 }
1095 };
1096
1097 let bearer_fetcher = {
1099 let api_key = api_key.clone();
1100 move || get_bearer(api_key.clone())
1101 };
1102
1103 let bearer_token = ctx
1104 .secrets_cache
1105 .get(&hdr_bucket, bearer_fetcher)
1106 .await
1107 .ok_or_else(|| pingora::Error::new_str("Failed to obtain bearer token"))?;
1108
1109 upstream_request.insert_header("Authorization", format!("Bearer {bearer_token}"))?;
1110 }
1111
1112 debug!("Request sent to upstream.");
1115 debug!("upstream_request_filter::end");
1116
1117 Ok(())
1118 }
1119
1120 async fn response_filter(
1121 &self,
1122 _session: &mut Session,
1123 resp: &mut ResponseHeader,
1124 _ctx: &mut Self::CTX,
1125 ) -> Result<()> {
1126 let _ = resp.remove_header("Server");
1127
1128 let _ = resp.insert_header("Server", DEFAULT_SERVER_NAME);
1129
1130 Ok(())
1131 }
1132
1133 async fn request_body_filter(
1134 &self,
1135 _session: &mut Session,
1136 body: &mut Option<bytes::Bytes>,
1137 end_of_stream: bool,
1138 ctx: &mut Self::CTX,
1139 ) -> Result<()> {
1140 let Some(state) = ctx.stream_state.as_mut() else {
1142 return Ok(());
1143 };
1144
1145 let Some(payload) = body.take() else {
1147 return Ok(());
1148 };
1149 if payload.is_empty() && !end_of_stream {
1150 return Ok(());
1151 };
1152
1153 let mut out = BytesMut::new();
1155 if !payload.is_empty() {
1156 out.extend_from_slice(&state.sign_chunk(&payload).map_err(|e| {
1157 error!("Failed to sign chunk: {e}");
1158 pingora::Error::new_str("Failed to sign chunk")
1159 })?);
1160 }
1161 if end_of_stream {
1162 out.extend_from_slice(&state.final_chunk().map_err(|e| {
1163 error!("Failed to sign trailer: {e}");
1164 pingora::Error::new_str("Failed to sign trailer")
1165 })?);
1166 ctx.stream_state = None; }
1168
1169 *body = Some(out.freeze());
1171 Ok(())
1172 }
1173}
1174
1175pub fn init_tracing() {
1184 tracing_subscriber::fmt()
1185 .with_timer(ChronoLocal::rfc_3339())
1186 .with_env_filter(EnvFilter::from_default_env())
1187 .init();
1188}
1189
1190pub fn run_server(py: Python, run_args: &ProxyServerConfig) {
1203 print_banner();
1204 init_tracing();
1205
1206 if run_args.http_port.is_none() && run_args.https_port.is_none() {
1207 error!("At least one of http_port or https_port must be specified!");
1208 return;
1209 }
1210
1211 if let Some(http_port) = run_args.http_port {
1212 info!("starting HTTP server on port {}", http_port);
1213 }
1214
1215 if let Some(https_port) = run_args.https_port {
1216 info!("starting HTTPS server on port {}", https_port);
1217 }
1218
1219 let local_hmac_map = if Python::with_gil(|py| run_args.hmac_keystore.is_none(py)) {
1220 HashMap::new()
1221 } else {
1222 parse_hmac_list(py, &run_args.hmac_keystore).unwrap_or_default()
1223 };
1224
1225 debug!("HMAC keys: {:#?}", &local_hmac_map);
1226
1227 let cosmap = Arc::new(RwLock::new(
1228 parse_cos_map(py, &run_args.cos_map).expect("failed to parse cos_map"),
1229 ));
1230 let hmac_keystore = Arc::new(RwLock::new(local_hmac_map));
1231
1232 let mut my_server = Server::new(None).expect("failed to create pingora server");
1233 my_server.bootstrap();
1234
1235 let validator = run_args.validator.as_ref().map(|v| v.clone_ref(py));
1236 let hmac_fetcher = run_args.hmac_fetcher.as_ref().map(|v| v.clone_ref(py));
1237
1238 let mut my_proxy = pingora::proxy::http_proxy_service(
1239 &my_server.configuration,
1240 MyProxy {
1241 cos_endpoint: "s3.eu-de.cloud-object-storage.appdomain.cloud".to_string(),
1242 cos_mapping: Arc::clone(&cosmap),
1243 hmac_keystore: Arc::clone(&hmac_keystore),
1244 secrets_cache: SecretsCache::new(),
1245 auth_cache: AuthCache::new(),
1246 validator,
1247 bucket_creds_fetcher: run_args
1248 .bucket_creds_fetcher
1249 .as_ref()
1250 .map(|v| v.clone_ref(py)),
1251 verify: run_args.verify,
1252 skip_signature_validation: run_args.skip_signature_validation,
1253 hmac_fetcher,
1254 tracker: UrlTracker::new(),
1255 max_presign_url_usage_attempts: run_args.max_presign_url_usage_attempts,
1256 server_name: "<osp⚡>".to_string(),
1257 },
1258 );
1259
1260 if run_args.threads.is_some() {
1261 my_proxy.threads = run_args.threads;
1262 }
1263
1264 debug!("Proxy service threads: {:?}", &my_proxy.threads);
1265
1266 if let Some(http_port) = run_args.http_port {
1267 info!("starting HTTP server on port {}", &http_port);
1268 let addr = format!("0.0.0.0:{}", http_port);
1269 my_proxy.add_tcp(addr.as_str());
1270 }
1271
1272 if let Some(https_port) = run_args.https_port {
1273 let cert_path =
1274 std::env::var("TLS_CERT_PATH").expect("Set TLS_CERT_PATH to the PEM certificate file");
1275 let key_path =
1276 std::env::var("TLS_KEY_PATH").expect("Set TLS_KEY_PATH to the PEM private-key file");
1277
1278 let mut tls = pingora::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path)
1279 .expect("failed to build TLS settings");
1280
1281 tls.enable_h2();
1282 let https_addr = format!("0.0.0.0:{}", https_port);
1283 my_proxy.add_tls_with_settings(https_addr.as_str(), None, tls);
1284 }
1285
1286 my_server.add_service(my_proxy);
1287
1288 debug!("{:?}", &my_server.configuration);
1289
1290 py.allow_threads(|| my_server.run_forever());
1291
1292 info!("server running ...");
1293}
1294
1295#[pyfunction]
1322pub fn start_server(py: Python, run_args: &ProxyServerConfig) -> PyResult<()> {
1323 rustls::crypto::ring::default_provider()
1324 .install_default()
1325 .expect("Failed to install rustls crypto provider");
1326
1327 dotenv().ok();
1328
1329 run_server(py, run_args);
1330
1331 Ok(())
1332}
1333
1334#[pyfunction]
1339fn enable_request_counting() {
1340 REQ_COUNTER_ENABLED.store(true, Ordering::Relaxed);
1341}
1342
1343#[pyfunction]
1345fn disable_request_counting() {
1346 REQ_COUNTER_ENABLED.store(false, Ordering::Relaxed);
1347}
1348
1349#[pyfunction]
1351fn get_request_count() -> PyResult<usize> {
1352 Ok(REQ_COUNTER.load(Ordering::Relaxed))
1353}
1354
1355#[pymodule]
1356fn object_storage_proxy(m: &Bound<'_, PyModule>) -> PyResult<()> {
1357 m.add_function(wrap_pyfunction!(start_server, m)?)?;
1358 m.add_class::<ProxyServerConfig>()?;
1359 m.add_class::<CosMapItem>()?;
1360 m.add_function(wrap_pyfunction!(enable_request_counting, m)?)?;
1361 m.add_function(wrap_pyfunction!(disable_request_counting, m)?)?;
1362 m.add_function(wrap_pyfunction!(get_request_count, m)?)?;
1363 Ok(())
1364}
1365
#[cfg(test)]
mod tests {
    use super::*;

    const KEY_URL: &str = "http://example.com/key";
    const MISSING_URL: &str = "http://example.com/missing";
    const URL_A: &str = "http://example.com/a";
    const URL_B: &str = "http://example.com/b";

    #[test]
    fn url_tracker_new_is_empty() {
        assert!(UrlTracker::new().get_all().is_empty());
    }

    #[test]
    fn url_tracker_default_equals_new() {
        let (fresh, defaulted) = (UrlTracker::new(), UrlTracker::default());
        assert_eq!(fresh.get_all().len(), defaulted.get_all().len());
    }

    #[test]
    fn url_tracker_track_increments_count() {
        let tracker = UrlTracker::new();
        assert_eq!(tracker.get(KEY_URL), None);
        for expected in 1..=2 {
            tracker.track(KEY_URL);
            assert_eq!(tracker.get(KEY_URL), Some(expected));
        }
    }

    #[test]
    fn url_tracker_get_returns_none_for_unknown_url() {
        assert_eq!(UrlTracker::new().get(MISSING_URL), None);
    }

    #[test]
    fn url_tracker_get_all_returns_all_tracked_urls() {
        let tracker = UrlTracker::new();
        [URL_A, URL_B, URL_A].iter().for_each(|url| tracker.track(url));

        let mut all = tracker.get_all();
        all.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(all, vec![(URL_A.to_string(), 2), (URL_B.to_string(), 1)]);
    }
}