je_load_density.utils.auth.oauth2
OAuth2 token helpers with a tiny in-memory cache.
Three flows supported:
client_credentials
password (Resource Owner Password Credentials)
refresh_token
Each helper returns the raw token dict; OAuth2Client wraps them with
a cache keyed on token-endpoint + scope, so subsequent calls reuse a
still-valid token instead of hitting the IdP on every task.
Network I/O is pluggable via the poster argument — tests inject a
fake poster.
Functions
|
|
|
|
|
Classes
|
- class je_load_density.utils.auth.oauth2.OAuth2Client(token_url: str, client_id: str, client_secret: str, scope: Optional[str] = None, timeout: float = 5.0, safety_window: float = 30.0, poster: Callable[[str, bytes, Dict[str, str], float], Dict[str, Any]] = <function _default_poster at 0x711191e8bf60>, _cache: Dict[Tuple[str, str], je_load_density.utils.auth.oauth2._CacheEntry] = <factory>, _lock: <built-in function allocate_lock> = <factory>)
Bases:
object- clear() None
- client_id: str
- client_secret: str
- get_client_credentials() Dict[str, Any]
- get_password(username: str, password: str) Dict[str, Any]
- poster(body: bytes, headers: Dict[str, str], timeout: float) Dict[str, Any]
- refresh(refresh: str) Dict[str, Any]
- safety_window: float = 30.0
- scope: str | None = None
- timeout: float = 5.0
- token_url: str
- je_load_density.utils.auth.oauth2.fetch_client_credentials_token(token_url: str, client_id: str, client_secret: str, scope: str | None = None, timeout: float = 5.0, poster: ~typing.Callable[[str, bytes, ~typing.Dict[str, str], float], ~typing.Dict[str, ~typing.Any]] = <function _default_poster>) Dict[str, Any]
- je_load_density.utils.auth.oauth2.fetch_password_token(token_url: str, client_id: str, client_secret: str, username: str, password: str, scope: str | None = None, timeout: float = 5.0, poster: ~typing.Callable[[str, bytes, ~typing.Dict[str, str], float], ~typing.Dict[str, ~typing.Any]] = <function _default_poster>) Dict[str, Any]
- je_load_density.utils.auth.oauth2.refresh_token(token_url: str, client_id: str, client_secret: str, refresh: str, timeout: float = 5.0, poster: ~typing.Callable[[str, bytes, ~typing.Dict[str, str], float], ~typing.Dict[str, ~typing.Any]] = <function _default_poster>) Dict[str, Any]