object_storage_proxy/utils/
functions.rs1use pyo3::prelude::*;
2use tracing::debug;
3
4pub(crate) fn callable_accepts_request(py: Python<'_>, callable: &PyObject) -> PyResult<bool> {
12 let inspect = py.import("inspect")?;
13 let signature = inspect.call_method1("signature", (callable.to_owned(),))?;
14 let parameters = signature.getattr("parameters")?;
15 debug!(parameters = ?parameters, "inspecting callable signature");
16 let parameters = parameters.call_method0("items")?;
17
18 for p in parameters.try_iter()? {
19 let (name, param) = p?.extract::<(String, PyObject)>()?;
20 let annotation = param.getattr(py, "annotation")?;
21 debug!("Param: {}", name);
22 let arg_type = annotation.to_string();
23 debug!("Annotation: {}", &arg_type);
24 if name == "request" && arg_type.contains("dict") {
25 return Ok(true);
26 }
27 }
28
29 Ok(false)
30}