je_load_density.utils.reliability

class je_load_density.utils.reliability.AdaptiveRetryPolicy(transient_budget: int = 5, flaky_budget: int = 2, permanent_budget: int = 0, base_delay: float = 0.1, max_delay: float = 5.0, backoff_factor: float = 2.0, jitter: float = 0.25, classifier: Callable[[BaseException], str] = classify_error, _counts: Dict[str, int] = <factory>)

Bases: object

backoff_factor: float = 2.0
base_delay: float = 0.1
budget_for(classification: str) int
classifier(error: BaseException) str
decide(error: BaseException, attempt: int) RetryDecision
flaky_budget: int = 2
jitter: float = 0.25
max_delay: float = 5.0
permanent_budget: int = 0
reset() None
transient_budget: int = 5
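
The defaults above (base_delay, backoff_factor, max_delay, jitter) suggest capped exponential backoff with proportional jitter. The following is a minimal sketch of how such a delay could be computed; the helper name backoff_delay, the 1-based attempt numbering, and the exact formula are assumptions made for illustration and are not taken from the library.

```python
import random


def backoff_delay(attempt, base_delay=0.1, max_delay=5.0,
                  backoff_factor=2.0, jitter=0.25, rng=random.random):
    """Hypothetical helper: capped exponential backoff with +/- jitter.

    `attempt` is 1-based. The formula is an assumption, not the library's.
    """
    # Exponential growth, capped at max_delay.
    raw = min(max_delay, base_delay * backoff_factor ** (attempt - 1))
    # Spread the delay uniformly within +/- (jitter * raw).
    spread = raw * jitter * (2.0 * rng() - 1.0)
    # Keep the jittered value inside [0, max_delay].
    return max(0.0, min(max_delay, raw + spread))
```

Passing a deterministic rng (for example lambda: 0.5, which yields zero spread) makes the schedule reproducible in tests.
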
exception je_load_density.utils.reliability.CircuitOpenError

Bases: RuntimeError

Raised when the failure budget trips.

class je_load_density.utils.reliability.FailureBudget(threshold: float = 0.05, window_seconds: float = 30.0, min_samples: int = 50, _events: Deque[tuple] = <factory>, _lock: threading.Lock = <factory>, _tripped: bool = False)

Bases: object

failure_rate(now: float | None = None) float
is_breached(now: float | None = None) bool
min_samples: int = 50
record(failed: bool, now: float | None = None) None
sample_count() int
threshold: float = 0.05
trip() None
property tripped: bool
window_seconds: float = 30.0
class je_load_density.utils.reliability.NetworkConditioner(latency_ms: float = 0.0, jitter_ms: float = 0.0, loss_rate: float = 0.0, name_filter: str | None = None)

Bases: object

applies_to(task: Dict[str, Any]) bool
apply(task: Dict[str, Any], sleeper: Callable[[float], None] = time.sleep) None
jitter_ms: float = 0.0
latency_ms: float = 0.0
loss_rate: float = 0.0
name_filter: str | None = None
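
The parameters above describe injected latency, jitter, and loss for selected tasks. The sketch below shows one way such conditioning could work. It is not the library's implementation: the function names, the substring match on a "name" key, and the use of ConnectionError to signal simulated loss are all assumptions.

```python
import random
import time
from typing import Any, Callable, Dict, Optional


def applies_to(task: Dict[str, Any], name_filter: Optional[str]) -> bool:
    # Assumption: a task matches when the filter is unset or is a
    # substring of the task's "name" entry.
    if name_filter is None:
        return True
    return name_filter in str(task.get("name", ""))


def apply_conditions(task: Dict[str, Any], latency_ms: float = 0.0,
                     jitter_ms: float = 0.0, loss_rate: float = 0.0,
                     name_filter: Optional[str] = None,
                     sleeper: Callable[[float], None] = time.sleep,
                     rng: Any = random) -> None:
    """Hypothetical stand-in for NetworkConditioner.apply."""
    if not applies_to(task, name_filter):
        return
    # Assumption: simulated packet loss surfaces as ConnectionError.
    if loss_rate > 0.0 and rng.random() < loss_rate:
        raise ConnectionError("simulated packet loss")
    # Fixed latency plus uniform jitter; skip the sleep if not positive.
    delay_ms = latency_ms + rng.uniform(-jitter_ms, jitter_ms)
    if delay_ms > 0.0:
        sleeper(delay_ms / 1000.0)
```

Injecting sleeper, as the documented apply signature does, lets tests record the requested delays instead of actually sleeping.
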
class je_load_density.utils.reliability.ProcessSupervisor(name_substrings: tuple = ('locust', 'gevent'), grace_seconds: float = 2.0, killed: List[int] = <factory>)

Bases: object

grace_seconds: float = 2.0
kill_orphans() List[int]
killed: List[int]
name_substrings: tuple = ('locust', 'gevent')
class je_load_density.utils.reliability.RetryDecision(attempt: int, classification: str, delay_seconds: float, will_retry: bool)

Bases: object

attempt: int
classification: str
delay_seconds: float
will_retry: bool
je_load_density.utils.reliability.classify_error(error: BaseException) str
je_load_density.utils.reliability.install_failure_budget(threshold: float = 0.05, window_seconds: float = 30.0, min_samples: int = 50, runner_quit_callback: Callable[[], None] | None = None) FailureBudget

Subscribe to Locust request events; trip the breaker when the sliding-window failure rate exceeds threshold. The supplied runner_quit_callback is invoked once on breach.
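
The sliding-window check described above can be sketched independently of Locust. The class below is a hypothetical stand-in, not the library's FailureBudget: the class name, the on_breach parameter, and the eviction rule are assumptions, and it omits the locking that the real class appears to use.

```python
import time
from collections import deque


class SlidingFailureWindow:
    """Hypothetical stand-in illustrating a sliding-window failure budget."""

    def __init__(self, threshold=0.05, window_seconds=30.0, min_samples=50,
                 on_breach=None):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.min_samples = min_samples
        self.on_breach = on_breach
        self.tripped = False
        self._events = deque()  # (timestamp, failed) pairs, oldest first

    def _evict(self, now):
        # Drop events that have fallen out of the window.
        while self._events and now - self._events[0][0] > self.window_seconds:
            self._events.popleft()

    def failure_rate(self, now=None):
        now = time.monotonic() if now is None else now
        self._evict(now)
        if not self._events:
            return 0.0
        failures = sum(1 for _, failed in self._events if failed)
        return failures / len(self._events)

    def record(self, failed, now=None):
        now = time.monotonic() if now is None else now
        self._events.append((now, bool(failed)))
        rate = self.failure_rate(now)
        enough = len(self._events) >= self.min_samples
        if enough and rate > self.threshold and not self.tripped:
            self.tripped = True  # latch so the callback fires only once
            if self.on_breach is not None:
                self.on_breach()
```

Requiring min_samples before evaluating the rate keeps a single early failure from tripping the breaker at the start of a run.
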

je_load_density.utils.reliability.install_network_conditioner(latency_ms: float = 0.0, jitter_ms: float = 0.0, loss_rate: float = 0.0, name_filter: str | None = None) NetworkConditioner
je_load_density.utils.reliability.run_with_retry(fn: Callable[[], object], policy: AdaptiveRetryPolicy | None = None, sleeper: Callable[[float], None] = time.sleep) object

Run fn under policy. Returns fn’s result on success; re-raises the last exception once the budget is exhausted.
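
The contract above (return on success, re-raise the last exception when retries run out) can be illustrated with a simplified loop. This sketch uses a single retry budget and a hypothetical function name, retry_call; the real function delegates the decision to an AdaptiveRetryPolicy with per-class budgets, which is not reproduced here.

```python
import time


def retry_call(fn, max_retries=3, base_delay=0.1, backoff_factor=2.0,
               max_delay=5.0, sleeper=time.sleep):
    """Hypothetical stand-in: retry fn, re-raising the last exception."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn()
        except Exception:
            if attempt > max_retries:
                raise  # budget exhausted: propagate the last error
            # Capped exponential backoff between attempts (no jitter here).
            delay = min(max_delay, base_delay * backoff_factor ** (attempt - 1))
            sleeper(delay)
```

As with the documented signature, the sleeper argument can be swapped for a recording function so that tests do not wait on real delays.
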

je_load_density.utils.reliability.uninstall_failure_budget() None
je_load_density.utils.reliability.uninstall_network_conditioner() None
je_load_density.utils.reliability.with_watchdog(callable_: Callable[[...], Any], *args: Any, timeout_seconds: float = 300.0, on_timeout: Callable[[], None] | None = None, **kwargs: Any) Any

Run callable_ and raise TimeoutError if it exceeds timeout_seconds. On timeout the callable is not cancelled: it keeps running in its daemon thread until it returns or the process exits.
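
A thread-based watchdog of this kind can be sketched as follows. The function name run_with_timeout is hypothetical, and both the point at which on_timeout is called and the forwarding of the callable's own exceptions are assumptions, not documented behaviour.

```python
import threading


def run_with_timeout(func, *args, timeout_seconds=300.0, on_timeout=None,
                     **kwargs):
    """Hypothetical stand-in for a thread-based watchdog."""
    outcome = {}

    def target():
        try:
            outcome["value"] = func(*args, **kwargs)
        except BaseException as exc:  # forwarded to the caller below
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_seconds)
    if worker.is_alive():
        # The worker cannot be cancelled; it keeps running in the background.
        if on_timeout is not None:
            on_timeout()
        raise TimeoutError(f"call exceeded {timeout_seconds} seconds")
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]
```

Because Python threads cannot be forcibly stopped, a design like this only bounds how long the caller waits; any side effects of the abandoned call may still occur later.
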

Modules

adaptive_retry

Adaptive retry with exponential backoff, jitter, and per-error-class budgets.

failure_budget

Sliding-window failure budget / circuit breaker.

network_conditioner

Network conditioner.

process_supervisor

Process supervisor.