Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Byte-compiled / optimized files
__pycache__/
*.py[cod]
*$py.class

# Test / coverage artifacts
.pytest_cache/
.coverage
htmlcov/
.mypy_cache/

# Build / distribution
build/
dist/
*.egg-info/
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ profile = "black"
module = [
'scipy.*',
'networkx',
'dd',
'surpyval.*',
'tqdm',
]
Expand Down
118 changes: 92 additions & 26 deletions repyability/rbd/non_repairable_rbd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .helper_classes import PerfectReliability, PerfectUnreliability
from .rbd import RBD
from .repeated_node import RepeatedNode
from .repeated_standby_node import RepeatedStandbyNode
from .standby_node import StandbyModel


Expand Down Expand Up @@ -189,17 +190,22 @@ def __init__(
self.__fixed_probs = False
self.structure_check["all_distributions_fixed"] = False

if self.structure_check["is_valid"]:
self.compile_bdd()
# Record whether the system reliability can be solved analytically
# (equivalently with the BDD), or whether it requires simulation
# because one or more nodes are simulation-based (e.g. standby nodes).
non_analytic_nodes = self.get_non_analytic_nodes()
self.structure_check["is_analytically_solvable"] = (
len(non_analytic_nodes) == 0
)
self.structure_check["non_analytic_nodes"] = non_analytic_nodes

@check_x
def sf(
self,
x: Optional[ArrayLike] = None,
working_nodes: Collection[Hashable] = [],
broken_nodes: Collection[Hashable] = [],
method: str = "c",
approx: bool = False,
method: str = "p",
) -> np.ndarray:
"""Returns the system reliability for time/s x.

Expand All @@ -217,15 +223,9 @@ def sf(
Marks these components as perfectly unreliable, by default []
method: str, optional
Input either "c" or "p" for the function to use the cut set or
path set methods respectively, by default "c". Both methods
ultimately return the same results though.
approx: bool, optional
If true, only considers the first-order terms (w.r.t. the
inclusion-exclusion principle), thereby reducing computation time.
This approximation is only applicable to the cut set method
(method="c"), a ValueError exception is raised if method="p" and
approx=True. This approximation is typically sufficient for most
use cases where reliabilities are close to 1. By default, False.
path set methods respectively. Both methods return the same
(exact) result. By default the path set method ("p") is used, as
it avoids deriving the cut sets.

Returns
-------
Expand All @@ -235,11 +235,9 @@ def sf(
Raises
------
ValueError
- Working/broken node/component inconsistency (a component or node
is supplied more than once to any of working_nodes, broken_nodes,
working_components, broken_components)
- The path set method must not be used with approx=True, see approx
arg description above
Working/broken node/component inconsistency (a component or node
is supplied more than once to any of working_nodes, broken_nodes,
working_components, broken_components)
"""
# Check for any node/component argument inconsistency
# check_sf_node_component_args_consistency(
Expand All @@ -260,14 +258,6 @@ def sf(
).format(node, self.repeated[node])
)

# Check that path set method and approximation are not used together
# (The approximation is only applicable to the cutset method)
if method == "p" and approx:
raise ValueError(
"The path set method must not be used with \
approx=True, see approx arg description in docstring."
)

# Turn node iterables into sets for O(1) lookup later
working_nodes = set(working_nodes)
broken_nodes = set(broken_nodes)
Expand Down Expand Up @@ -366,6 +356,82 @@ def ff_by_node(
def time_varying_rbd(self):
return not self.__fixed_probs

# Node model types whose reliability is obtained by Monte-Carlo simulation
# (a Kaplan-Meier fit to simulated samples) rather than in closed form. A
# standby arrangement is sequence-dependent (dynamic), so its sf(t) cannot
# be expressed analytically and is instead estimated by simulation. Such
# nodes therefore prevent a purely analytic / BDD solution of the system.
_SIMULATION_NODE_TYPES = (StandbyModel, RepeatedStandbyNode)

def _node_is_analytic(self, model) -> bool:
"""Returns True if a node's reliability is available without
Monte-Carlo simulation (i.e. in closed form or from data), and so can
be consumed directly by the analytic / BDD system probability.

The check recurses through RepeatedNodes (analytic iff their underlying
model is) and nested NonRepairableRBDs (analytic iff they are
themselves analytically solvable).
"""
# Perfect reliability / unreliability are constants
if model is PerfectReliability or model is PerfectUnreliability:
return True
# Standby arrangements are simulation-based (KM fit) -> non-analytic
if isinstance(model, self._SIMULATION_NODE_TYPES):
return False
# A repeated node is analytic iff its underlying model is
if isinstance(model, RepeatedNode):
return self._node_is_analytic(model.model)
# A nested RBD is analytic iff it is itself analytically solvable
if isinstance(model, NonRepairableRBD):
return model.is_analytically_solvable()
# Otherwise it is a surpyval parametric/non-parametric distribution
# (incl. FixedEventProbability), all of which expose a usable sf(t)
# without simulation.
return True

def get_non_analytic_nodes(self) -> dict[Any, str]:
"""Returns the nodes that prevent an analytic / BDD solution.

Returns
-------
dict[Any, str]
A mapping of node name -> the offending model's type name for
every node whose reliability requires Monte-Carlo simulation (e.g.
a StandbyModel). Empty if the RBD is analytically solvable.
"""
non_analytic: dict[Any, str] = {}
for node_name, model in self.reliabilities.items():
if not self._node_is_analytic(model):
non_analytic[node_name] = type(model).__name__
return non_analytic

def is_analytically_solvable(self) -> bool:
"""Returns whether the system reliability can be solved analytically.

The analytic methods (the inclusion-exclusion in system_probability(),
and equivalently a BDD evaluation) require every node to expose a
reliability sf(t) that does not itself depend on Monte-Carlo
simulation. This holds for parametric and non-parametric distributions,
fixed-probability nodes, repeated nodes of such models, and nested RBDs
that are themselves analytically solvable.

It does NOT hold when any node is a standby arrangement (StandbyModel
or RepeatedStandbyNode): a standby node is sequence-dependent and its
sf(t) is estimated by simulation, so while sf()/system_probability()
will still return a value, that value is only as good as the underlying
Monte-Carlo + Kaplan-Meier fit (a step function bounded by the sampled
support) rather than a closed-form result. Such systems are better
evaluated by simulation (e.g. random()/mean()).

Returns
-------
bool
True if the RBD can be solved analytically / with a BDD, False if
it requires simulation. Use get_non_analytic_nodes() to see which
nodes are responsible.
"""
return len(self.get_non_analytic_nodes()) == 0

def random(self, size):
out = np.zeros(size)
for i in range(size):
Expand Down
189 changes: 189 additions & 0 deletions repyability/rbd/numerical_convolution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""
Numerical convolution of independent lifetimes.

A cold-standby arrangement fails after the *sum* of its components' lifetimes
(when switching is perfect), so the survival function of the arrangement is
the convolution of the components' distributions. With imperfect switching it
is a mixture of partial sums, weighted by how many switches succeed. This
module computes that survival function numerically -- deterministically and
quickly -- as a robust alternative to estimating it from Monte-Carlo samples
with a Kaplan-Meier fit.
"""

import numpy as np
from scipy.integrate import cumulative_trapezoid, trapezoid
from scipy.signal import fftconvolve


def _scalar(value) -> float:
"""Return a plain float from a surpyval scalar/array result."""
return float(np.atleast_1d(value)[0])


def _upper_time(model, eps: float = 1e-10) -> float:
"""A time by which ``model``'s survival has effectively reached zero.

Found by doubling from the mean until sf <= eps, so it is robust for any
distribution exposing sf() and mean() (it does not rely on a quantile
function).
"""
t = max(_scalar(model.mean()), 1.0)
for _ in range(200):
if _scalar(model.sf(t)) <= eps:
break
t *= 2.0
return t


def _density_on_grid(model, t: np.ndarray) -> np.ndarray:
"""The model's density on the grid, with any non-finite values (e.g. an
infinite density at t=0 for some shapes) replaced by zero. The negligible
mass lost is restored by the later CDF normalisation."""
pdf = np.asarray(model.df(t), dtype=float)
return np.nan_to_num(pdf, nan=0.0, posinf=0.0, neginf=0.0)


def switch_success_probs(switching_probability, n) -> list:
"""Normalise a switching_probability into a list of (n-1) per-switch
success probabilities (the switches into components 2..n).

switching_probability is either a scalar (same probability for every
switch) or a sequence of length n-1 (one success probability per switch).
"""
if n <= 1:
return []
if np.isscalar(switching_probability):
probs = [float(switching_probability)] * (n - 1)
else:
probs = [float(p) for p in switching_probability]
if len(probs) != n - 1:
raise ValueError(
"switching_probability sequence must have length "
f"{n - 1} (one per switch), got {len(probs)}"
)
for p in probs:
if not (0.0 <= p <= 1.0):
raise ValueError("switching probabilities must be in [0, 1]")
return probs


def is_perfect_switching(switching_probability) -> bool:
"""True if every switch succeeds with probability 1."""
if np.isscalar(switching_probability):
return float(switching_probability) == 1.0
return all(float(p) == 1.0 for p in switching_probability)


def _switching_weights(switching_probability, n) -> list:
"""Mixture weights for a cold-standby chain with imperfect switching.

Returns a list ``w`` of length n where ``w[k]`` is the probability that
exactly the first (k+1) components run: switches 1..k succeeded and switch
(k+1) failed (or, for the last, every switch succeeded).
"""
if n == 1:
return [1.0]
probs = switch_success_probs(switching_probability, n)
weights = []
prefix = 1.0
for p in probs:
# This switch fails (after all earlier ones succeeded): stop here.
weights.append(prefix * (1.0 - p))
prefix *= p
# Every switch succeeded: all n components run.
weights.append(prefix)
return weights


def _sf_from_pdf(pdf: np.ndarray, t: np.ndarray) -> np.ndarray:
"""Survival function on the grid from a (possibly un-normalised) density,
normalising away discretisation drift."""
cdf = cumulative_trapezoid(pdf, t, initial=0.0)
total = cdf[-1]
if total <= 0.0:
return np.ones_like(t)
return np.clip(1.0 - cdf / total, 0.0, 1.0)


class ConvolvedSurvival:
"""Survival function of a cold-standby arrangement (sum of lifetimes).

With perfect switching the arrangement fails after the sum of its
components' lifetimes, whose survival function is the convolution of the
component distributions. With imperfect switching, each switch onto the
next spare succeeds only with some probability, so the lifetime is a
mixture of partial sums (run the first component; if its switch works, run
the second too; and so on). This computes that mixture by numerically
convolving the component densities on a fine time grid. sf()/ff() then
interpolate the pre-computed grid, so they are fast and deterministic.

Parameters
----------
models : sequence
The component lifetime distributions, in standby order (primary
first). Each must expose df() (density), sf() (survival) and mean().
switching_probability : float or sequence, optional
Probability that a switch onto the next spare succeeds. A scalar
applies to every switch; a sequence gives one probability per switch
(length len(models) - 1). By default 1.0 (perfect switching), which
reduces to the plain convolution.
n_points : int, optional
Number of grid points, by default 100_001. More points give more
accuracy at the cost of construction time.
eps : float, optional
Survival threshold used to bound the time grid, by default 1e-10.
"""

def __init__(
self,
models,
switching_probability=1.0,
n_points: int = 100_001,
eps: float = 1e-10,
):
models = list(models)
n = len(models)
if n == 0:
raise ValueError("Need at least one model to convolve.")

weights = _switching_weights(switching_probability, n)
self.switching_weights = weights

# Bound the grid by the sum of the components' effective upper times
# (the support of the full sum is contained in [0, sum of supports]).
upper = sum(_upper_time(model, eps) for model in models)
t = np.linspace(0.0, upper, n_points)
dt = t[1] - t[0]

# Incrementally convolve to get the density of each partial sum
# T_1 + ... + T_k (k = 1..n).
partial_pdfs = []
pdf = _density_on_grid(models[0], t)
partial_pdfs.append(pdf)
for model in models[1:]:
pdf = fftconvolve(pdf, _density_on_grid(model, t))[:n_points] * dt
partial_pdfs.append(pdf)

# Survival function is the weighted mixture of the partial-sum survival
# functions (zero-weight partials are skipped, so perfect switching
# only evaluates the full convolution).
sf = np.zeros(n_points)
for weight, partial_pdf in zip(weights, partial_pdfs):
if weight == 0.0:
continue
sf += weight * _sf_from_pdf(partial_pdf, t)

self._t = t
self._sf = np.clip(sf, 0.0, 1.0)

def sf(self, x):
"""Survival function at x (1 at/below 0, ~0 beyond the grid)."""
return np.interp(x, self._t, self._sf, left=1.0, right=0.0)

def ff(self, x):
"""Cumulative failure probability (CDF) at x."""
return 1.0 - self.sf(x)

def mean(self, *args, **kwargs) -> float:
"""Mean lifetime, E[T] = integral of the survival function."""
return float(trapezoid(self._sf, self._t))
Loading
Loading