diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4e7c166 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Byte-compiled / optimized files +__pycache__/ +*.py[cod] +*$py.class + +# Test / coverage artifacts +.pytest_cache/ +.coverage +htmlcov/ +.mypy_cache/ + +# Build / distribution +build/ +dist/ +*.egg-info/ diff --git a/pyproject.toml b/pyproject.toml index 43b36da..3cc3d97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,6 @@ profile = "black" module = [ 'scipy.*', 'networkx', - 'dd', 'surpyval.*', 'tqdm', ] diff --git a/repyability/rbd/non_repairable_rbd.py b/repyability/rbd/non_repairable_rbd.py index 5aa286f..439d2f7 100644 --- a/repyability/rbd/non_repairable_rbd.py +++ b/repyability/rbd/non_repairable_rbd.py @@ -13,6 +13,7 @@ from .helper_classes import PerfectReliability, PerfectUnreliability from .rbd import RBD from .repeated_node import RepeatedNode +from .repeated_standby_node import RepeatedStandbyNode from .standby_node import StandbyModel @@ -189,8 +190,14 @@ def __init__( self.__fixed_probs = False self.structure_check["all_distributions_fixed"] = False - if self.structure_check["is_valid"]: - self.compile_bdd() + # Record whether the system reliability can be solved analytically + # (equivalently with the BDD), or whether it requires simulation + # because one or more nodes are simulation-based (e.g. standby nodes). + non_analytic_nodes = self.get_non_analytic_nodes() + self.structure_check["is_analytically_solvable"] = ( + len(non_analytic_nodes) == 0 + ) + self.structure_check["non_analytic_nodes"] = non_analytic_nodes @check_x def sf( @@ -198,8 +205,7 @@ def sf( x: Optional[ArrayLike] = None, working_nodes: Collection[Hashable] = [], broken_nodes: Collection[Hashable] = [], - method: str = "c", - approx: bool = False, + method: str = "p", ) -> np.ndarray: """Returns the system reliability for time/s x. @@ -217,15 +223,9 @@ def sf( Marks these components as perfectly unreliable, by default [] method: str, optional Input either "c" or "p" for the function to use the cut set or - path set methods respectively, by default "c". Both methods - ultimately return the same results though. - approx: bool, optional - If true, only considers the first-order terms (w.r.t. the - inclusion-exclusion principle), thereby reducing computation time. - This approximation is only applicable to the cut set method - (method="c"), a ValueError exception is raised if method="p" and - approx=True. This approximation is typically sufficient for most - use cases where reliabilities are close to 1. By default, False. + path set methods respectively. Both methods return the same + (exact) result. By default the path set method ("p") is used, as + it avoids deriving the cut sets. Returns ------- @@ -235,11 +235,9 @@ def sf( Raises ------ ValueError - - Working/broken node/component inconsistency (a component or node - is supplied more than once to any of working_nodes, broken_nodes, - working_components, broken_components) - - The path set method must not be used with approx=True, see approx - arg description above + Working/broken node/component inconsistency (a component or node + is supplied more than once to any of working_nodes, broken_nodes, + working_components, broken_components) """ # Check for any node/component argument inconsistency # check_sf_node_component_args_consistency( @@ -260,14 +258,6 @@ def sf( ).format(node, self.repeated[node]) ) - # Check that path set method and approximation are not used together - # (The approximation is only applicable to the cutset method) - if method == "p" and approx: - raise ValueError( - "The path set method must not be used with \ - approx=True, see approx arg description in docstring." - ) - # Turn node iterables into sets for O(1) lookup later working_nodes = set(working_nodes) broken_nodes = set(broken_nodes) @@ -366,6 +356,82 @@ def ff_by_node( def time_varying_rbd(self): return not self.__fixed_probs + # Node model types whose reliability is obtained by Monte-Carlo simulation + # (a Kaplan-Meier fit to simulated samples) rather than in closed form. A + # standby arrangement is sequence-dependent (dynamic), so its sf(t) cannot + # be expressed analytically and is instead estimated by simulation. Such + # nodes therefore prevent a purely analytic / BDD solution of the system. + _SIMULATION_NODE_TYPES = (StandbyModel, RepeatedStandbyNode) + + def _node_is_analytic(self, model) -> bool: + """Returns True if a node's reliability is available without + Monte-Carlo simulation (i.e. in closed form or from data), and so can + be consumed directly by the analytic / BDD system probability. + + The check recurses through RepeatedNodes (analytic iff their underlying + model is) and nested NonRepairableRBDs (analytic iff they are + themselves analytically solvable). + """ + # Perfect reliability / unreliability are constants + if model is PerfectReliability or model is PerfectUnreliability: + return True + # Standby arrangements are simulation-based (KM fit) -> non-analytic + if isinstance(model, self._SIMULATION_NODE_TYPES): + return False + # A repeated node is analytic iff its underlying model is + if isinstance(model, RepeatedNode): + return self._node_is_analytic(model.model) + # A nested RBD is analytic iff it is itself analytically solvable + if isinstance(model, NonRepairableRBD): + return model.is_analytically_solvable() + # Otherwise it is a surpyval parametric/non-parametric distribution + # (incl. FixedEventProbability), all of which expose a usable sf(t) + # without simulation. + return True + + def get_non_analytic_nodes(self) -> dict[Any, str]: + """Returns the nodes that prevent an analytic / BDD solution. + + Returns + ------- + dict[Any, str] + A mapping of node name -> the offending model's type name for + every node whose reliability requires Monte-Carlo simulation (e.g. + a StandbyModel). Empty if the RBD is analytically solvable. + """ + non_analytic: dict[Any, str] = {} + for node_name, model in self.reliabilities.items(): + if not self._node_is_analytic(model): + non_analytic[node_name] = type(model).__name__ + return non_analytic + + def is_analytically_solvable(self) -> bool: + """Returns whether the system reliability can be solved analytically. + + The analytic methods (the inclusion-exclusion in system_probability(), + and equivalently a BDD evaluation) require every node to expose a + reliability sf(t) that does not itself depend on Monte-Carlo + simulation. This holds for parametric and non-parametric distributions, + fixed-probability nodes, repeated nodes of such models, and nested RBDs + that are themselves analytically solvable. + + It does NOT hold when any node is a standby arrangement (StandbyModel + or RepeatedStandbyNode): a standby node is sequence-dependent and its + sf(t) is estimated by simulation, so while sf()/system_probability() + will still return a value, that value is only as good as the underlying + Monte-Carlo + Kaplan-Meier fit (a step function bounded by the sampled + support) rather than a closed-form result. Such systems are better + evaluated by simulation (e.g. random()/mean()). + + Returns + ------- + bool + True if the RBD can be solved analytically / with a BDD, False if + it requires simulation. Use get_non_analytic_nodes() to see which + nodes are responsible. + """ + return len(self.get_non_analytic_nodes()) == 0 + def random(self, size): out = np.zeros(size) for i in range(size): diff --git a/repyability/rbd/numerical_convolution.py b/repyability/rbd/numerical_convolution.py new file mode 100644 index 0000000..66cac8c --- /dev/null +++ b/repyability/rbd/numerical_convolution.py @@ -0,0 +1,189 @@ +""" +Numerical convolution of independent lifetimes. + +A cold-standby arrangement fails after the *sum* of its components' lifetimes +(when switching is perfect), so the survival function of the arrangement is +the convolution of the components' distributions. With imperfect switching it +is a mixture of partial sums, weighted by how many switches succeed. This +module computes that survival function numerically -- deterministically and +quickly -- as a robust alternative to estimating it from Monte-Carlo samples +with a Kaplan-Meier fit. +""" + +import numpy as np +from scipy.integrate import cumulative_trapezoid, trapezoid +from scipy.signal import fftconvolve + + +def _scalar(value) -> float: + """Return a plain float from a surpyval scalar/array result.""" + return float(np.atleast_1d(value)[0]) + + +def _upper_time(model, eps: float = 1e-10) -> float: + """A time by which ``model``'s survival has effectively reached zero. + + Found by doubling from the mean until sf <= eps, so it is robust for any + distribution exposing sf() and mean() (it does not rely on a quantile + function). + """ + t = max(_scalar(model.mean()), 1.0) + for _ in range(200): + if _scalar(model.sf(t)) <= eps: + break + t *= 2.0 + return t + + +def _density_on_grid(model, t: np.ndarray) -> np.ndarray: + """The model's density on the grid, with any non-finite values (e.g. an + infinite density at t=0 for some shapes) replaced by zero. The negligible + mass lost is restored by the later CDF normalisation.""" + pdf = np.asarray(model.df(t), dtype=float) + return np.nan_to_num(pdf, nan=0.0, posinf=0.0, neginf=0.0) + + +def switch_success_probs(switching_probability, n) -> list: + """Normalise a switching_probability into a list of (n-1) per-switch + success probabilities (the switches into components 2..n). + + switching_probability is either a scalar (same probability for every + switch) or a sequence of length n-1 (one success probability per switch). + """ + if n <= 1: + return [] + if np.isscalar(switching_probability): + probs = [float(switching_probability)] * (n - 1) + else: + probs = [float(p) for p in switching_probability] + if len(probs) != n - 1: + raise ValueError( + "switching_probability sequence must have length " + f"{n - 1} (one per switch), got {len(probs)}" + ) + for p in probs: + if not (0.0 <= p <= 1.0): + raise ValueError("switching probabilities must be in [0, 1]") + return probs + + +def is_perfect_switching(switching_probability) -> bool: + """True if every switch succeeds with probability 1.""" + if np.isscalar(switching_probability): + return float(switching_probability) == 1.0 + return all(float(p) == 1.0 for p in switching_probability) + + +def _switching_weights(switching_probability, n) -> list: + """Mixture weights for a cold-standby chain with imperfect switching. + + Returns a list ``w`` of length n where ``w[k]`` is the probability that + exactly the first (k+1) components run: switches 1..k succeeded and switch + (k+1) failed (or, for the last, every switch succeeded). + """ + if n == 1: + return [1.0] + probs = switch_success_probs(switching_probability, n) + weights = [] + prefix = 1.0 + for p in probs: + # This switch fails (after all earlier ones succeeded): stop here. + weights.append(prefix * (1.0 - p)) + prefix *= p + # Every switch succeeded: all n components run. + weights.append(prefix) + return weights + + +def _sf_from_pdf(pdf: np.ndarray, t: np.ndarray) -> np.ndarray: + """Survival function on the grid from a (possibly un-normalised) density, + normalising away discretisation drift.""" + cdf = cumulative_trapezoid(pdf, t, initial=0.0) + total = cdf[-1] + if total <= 0.0: + return np.ones_like(t) + return np.clip(1.0 - cdf / total, 0.0, 1.0) + + +class ConvolvedSurvival: + """Survival function of a cold-standby arrangement (sum of lifetimes). + + With perfect switching the arrangement fails after the sum of its + components' lifetimes, whose survival function is the convolution of the + component distributions. With imperfect switching, each switch onto the + next spare succeeds only with some probability, so the lifetime is a + mixture of partial sums (run the first component; if its switch works, run + the second too; and so on). This computes that mixture by numerically + convolving the component densities on a fine time grid. sf()/ff() then + interpolate the pre-computed grid, so they are fast and deterministic. + + Parameters + ---------- + models : sequence + The component lifetime distributions, in standby order (primary + first). Each must expose df() (density), sf() (survival) and mean(). + switching_probability : float or sequence, optional + Probability that a switch onto the next spare succeeds. A scalar + applies to every switch; a sequence gives one probability per switch + (length len(models) - 1). By default 1.0 (perfect switching), which + reduces to the plain convolution. + n_points : int, optional + Number of grid points, by default 100_001. More points give more + accuracy at the cost of construction time. + eps : float, optional + Survival threshold used to bound the time grid, by default 1e-10. + """ + + def __init__( + self, + models, + switching_probability=1.0, + n_points: int = 100_001, + eps: float = 1e-10, + ): + models = list(models) + n = len(models) + if n == 0: + raise ValueError("Need at least one model to convolve.") + + weights = _switching_weights(switching_probability, n) + self.switching_weights = weights + + # Bound the grid by the sum of the components' effective upper times + # (the support of the full sum is contained in [0, sum of supports]). + upper = sum(_upper_time(model, eps) for model in models) + t = np.linspace(0.0, upper, n_points) + dt = t[1] - t[0] + + # Incrementally convolve to get the density of each partial sum + # T_1 + ... + T_k (k = 1..n). + partial_pdfs = [] + pdf = _density_on_grid(models[0], t) + partial_pdfs.append(pdf) + for model in models[1:]: + pdf = fftconvolve(pdf, _density_on_grid(model, t))[:n_points] * dt + partial_pdfs.append(pdf) + + # Survival function is the weighted mixture of the partial-sum survival + # functions (zero-weight partials are skipped, so perfect switching + # only evaluates the full convolution). + sf = np.zeros(n_points) + for weight, partial_pdf in zip(weights, partial_pdfs): + if weight == 0.0: + continue + sf += weight * _sf_from_pdf(partial_pdf, t) + + self._t = t + self._sf = np.clip(sf, 0.0, 1.0) + + def sf(self, x): + """Survival function at x (1 at/below 0, ~0 beyond the grid).""" + return np.interp(x, self._t, self._sf, left=1.0, right=0.0) + + def ff(self, x): + """Cumulative failure probability (CDF) at x.""" + return 1.0 - self.sf(x) + + def mean(self, *args, **kwargs) -> float: + """Mean lifetime, E[T] = integral of the survival function.""" + return float(trapezoid(self._sf, self._t)) diff --git a/repyability/rbd/rbd.py b/repyability/rbd/rbd.py index ed3c4e9..59bdbca 100644 --- a/repyability/rbd/rbd.py +++ b/repyability/rbd/rbd.py @@ -2,12 +2,10 @@ import warnings from collections import defaultdict from copy import copy -from itertools import combinations, product from typing import Any, Dict, Hashable, Iterable, Iterator, Optional import networkx as nx import numpy as np -from dd import autoref as _bdd from numpy.typing import ArrayLike from scipy.optimize import minimize, root from scipy.special import expit as sigmoid @@ -71,6 +69,142 @@ def scale_probability_dict( return out +def probability_any_set_satisfied( + sets: Iterable[frozenset], + element_probabilities: Dict[Any, np.ndarray], + array_shape, +) -> np.ndarray: + """Exact probability that at least one of the given sets is fully active. + + Each "set" is a collection of elements (e.g. a minimal path set of + components); the set is "satisfied" when *all* of its elements are active. + Given the per-element probability of being active, this returns the + probability that *at least one* set is satisfied. + + With path sets and node reliabilities this is the system reliability; with + cut sets and node unreliabilities it is the system unreliability. + + The computation is an exact Shannon decomposition of the structure + function: it repeatedly conditions on a single element being active or + inactive, + + P(S) = p_e * P(S | e active) + (1 - p_e) * P(S | e inactive), + + and memoises sub-problems. Because shared sub-functions are only solved + once, this avoids the 2^(#sets) blow-up of the inclusion-exclusion + principle while returning the identical exact result. + + Parameters + ---------- + sets : Iterable[frozenset] + The collection of sets (e.g. minimal path sets or cut sets). + element_probabilities : Dict[Any, np.ndarray] + Maps each element to its probability array of being active. + array_shape : + The shape of the probability arrays, used to seed the 0/1 base cases. + + Returns + ------- + np.ndarray + The probability that at least one set is fully active. + """ + sets = [frozenset(s) for s in sets] + memo: Dict[frozenset, np.ndarray] = {} + + def recurse(state: frozenset) -> np.ndarray: + # state is a frozenset of frozensets: the sets still to be satisfied, + # with already-active elements removed. + if not state: + # No set can be satisfied any more -> probability 0. + return np.zeros(array_shape) + if frozenset() in state: + # A set has had all its elements satisfied -> probability 1. + return np.ones(array_shape) + if state in memo: + return memo[state] + + # Pivot on the element appearing in the most sets, which tends to + # collapse the problem (and the memo table) fastest. + counts: Dict[Any, int] = {} + for s in state: + for element in s: + counts[element] = counts.get(element, 0) + 1 + pivot = max(counts, key=lambda e: counts[e]) + p = element_probabilities[pivot] + + # Pivot active: it satisfies its requirement, so drop it from every + # set that contained it (other sets are unaffected). + state_active = frozenset(s - {pivot} for s in state) + # Pivot inactive: any set needing it can never be satisfied -> drop it. + state_inactive = frozenset(s for s in state if pivot not in s) + + result = p * recurse(state_active) + (1 - p) * recurse(state_inactive) + memo[state] = result + return result + + return recurse(frozenset(sets)) + + +def _keep_minimal_sets(sets: Iterable[frozenset]) -> list[frozenset]: + """Return only the inclusion-minimal sets. + + Discards any set that is a superset of another (and de-duplicates). + """ + minimal: list[frozenset] = [] + # Consider smaller sets first so that a kept set can prune its supersets. + for candidate in sorted(set(sets), key=len): + if not any(kept <= candidate for kept in minimal): + minimal.append(candidate) + return minimal + + +def minimal_cut_sets_from_path_sets( + path_sets: Iterable[frozenset], +) -> set[frozenset]: + """Return the minimal cut sets given the minimal path sets. + + A minimal cut set is a minimal "transversal" (hitting set) of the path + sets: a smallest set of components that intersects every path set, so that + failing those components breaks every path through the system. + + This uses Berge's algorithm: it builds the minimal transversals + incrementally, one path set at a time, discarding non-minimal candidates at + every step. Unlike taking the full Cartesian product of the path sets and + filtering once at the end, it never materialises the (potentially enormous) + product, which makes it dramatically faster in practice. Because it works + directly from the path sets, it stays correct for k-out-of-n structures, + whose k-of-n behaviour is already encoded in the path sets. + + Parameters + ---------- + path_sets : Iterable[frozenset] + The minimal path sets (each a set of components). + + Returns + ------- + set[frozenset] + The minimal cut sets. + """ + # Start with the single empty transversal and extend it to hit each path + # set in turn. + transversals: list[frozenset] = [frozenset()] + for path_set in path_sets: + path_set = frozenset(path_set) + candidates: list[frozenset] = [] + for transversal in transversals: + if transversal & path_set: + # Already hits this path set; keep it unchanged. + candidates.append(transversal) + else: + # Must be extended to hit this path set, by one of its + # components. + for component in path_set: + candidates.append(transversal | {component}) + # Prune non-minimal candidates now so the working set stays small. + transversals = _keep_minimal_sets(candidates) + return set(transversals) + + class RBD: def __init__( self, @@ -250,60 +384,6 @@ def get_min_path_sets( return ret_set - def compile_bdd(self): - """ - Compiles the BDD, setting self.bdd to the BDD manager, and - self.bdd_system_ref to the BDD variable reference for the system state. - """ - # Instantiate the manager - bdd = _bdd.BDD() - bdd_c = _bdd.BDD() - - # Enable Dynamic Reordering (Rudell's Sifting Algorithm) - bdd.configure(reordering=True) - bdd_c.configure(reordering=True) - - # For all components, declare the BDD variable, - # and get the BDD variable reference - bdd_vars = {} - bdd_c_vars = {} - for component in self.G.nodes(): - bdd.declare(component) - bdd_vars[component] = bdd.var(component) - bdd_c.declare(component) - bdd_c_vars[component] = bdd_c.var(component) - - # Make path expressions - path_expressions: list[_bdd.Function] = [] - for min_path_set in self.get_min_path_sets(include_in_out_nodes=False): - min_path_set_as_list = list(min_path_set) - path_function = bdd_vars[min_path_set_as_list[0]] - for component in min_path_set_as_list[1:]: - path_function &= bdd_vars[component] - path_expressions.append(path_function) - - # Make cut expressions - cut_expressions: list[_bdd.Function] = [] - for min_cut_set in self.get_min_cut_sets(include_in_out_nodes=False): - min_cut_set_as_list = list(min_cut_set) - cut_function = bdd_c_vars[min_cut_set_as_list[0]] - for component in min_cut_set_as_list[1:]: - cut_function |= bdd_c_vars[component] - cut_expressions.append(cut_function) - - system: _bdd.Function = path_expressions[0] - for path_expression in path_expressions[1:]: - system |= path_expression - - system_c: _bdd.Function = cut_expressions[0] - for cut_expression in cut_expressions[1:]: - system_c &= cut_expression - - self.bdd = bdd - self.bdd_c = bdd_c - self.bdd_system_ref = system - self.bdd_system_c_ref = system_c - def is_system_working( self, component_status: dict[Any, bool], method: str ) -> bool: @@ -316,35 +396,54 @@ def is_system_working( Dictionary with all components where component_status[component] = True only if the component is working, and = False if not working. + method : str + Either "p" (path-set) or "c" (cut-set). Both return the same + result; "p" is typically faster as it avoids deriving the cut + sets. Returns ------- bool True if the system is working, otherwise False. """ - # This function uses a Binary Decision Diagram (BDD) to enable - # efficient component_status -> is_system_working lookup. Something - # very much required given the rate at which this function is called - # in the N simulations in availability(). - - # Now evaluate the BDD given the component status - if method == "c": - system_given_component_status = self.bdd_c.let( - component_status, self.bdd_system_c_ref + # The system structure function is evaluated directly from the minimal + # path/cut sets, which is plenty fast for the rate at which this is + # called in the simulations (no Binary Decision Diagram required). + # + # - path-set ("p"): the system works iff at least one minimal path set + # has all of its components working. + # - cut-set ("c"): the system works iff every minimal cut set has at + # least one of its components working. + # + # The path/cut sets (excluding the input/output nodes) are cached on + # first use so repeated calls during a simulation are cheap. + if method == "p": + if not hasattr(self, "_eval_path_sets"): + self._eval_path_sets = [ + tuple(path_set) + for path_set in self.get_min_path_sets( + include_in_out_nodes=False + ) + ] + return any( + all(component_status[component] for component in path_set) + for path_set in self._eval_path_sets ) - - system_state = self.bdd_c.to_expr(system_given_component_status) - elif method == "p": - system_given_component_status = self.bdd.let( - component_status, self.bdd_system_ref + elif method == "c": + if not hasattr(self, "_eval_cut_sets"): + self._eval_cut_sets = [ + tuple(cut_set) + for cut_set in self.get_min_cut_sets( + include_in_out_nodes=False + ) + ] + return all( + any(component_status[component] for component in cut_set) + for cut_set in self._eval_cut_sets ) - - system_state = self.bdd.to_expr(system_given_component_status) else: raise ValueError("`method` must be either 'p' or 'c'") - return system_state == "TRUE" - def get_min_cut_sets( self, include_in_out_nodes=False ) -> set[frozenset[Hashable]]: @@ -352,35 +451,15 @@ def get_min_cut_sets( Returns the set of frozensets of minimal cut sets of the RBD. The outer set contains the frozenset of nodes. frozensets were used so the inner set elements could be hashable. + + The minimal cut sets are the minimal transversals (hitting sets) of the + minimal path sets, computed with Berge's algorithm. See + minimal_cut_sets_from_path_sets() for details. """ path_sets = self.get_min_path_sets( include_in_out_nodes=include_in_out_nodes ) - - # Gets the cartesian product across pathsets - prods = product(*path_sets) - - # We need to remove duplicate nodes in the products to get the cutsets, - # and discard empty products - cut_sets = [frozenset(prod) for prod in prods if prod] - - min_cut_sets: list[frozenset] = [] - - # Now only insert if minimal, removing any superset (non-minimal) - # cutsets are encountered - for cut_set in cut_sets: - is_minimal_cut_set = True - for other_cut_set in min_cut_sets.copy(): - if cut_set.issuperset(other_cut_set): - is_minimal_cut_set = False - break - if cut_set.issubset(other_cut_set): - min_cut_sets.remove(other_cut_set) - - if is_minimal_cut_set: - min_cut_sets.append(cut_set) - - return set(min_cut_sets) + return minimal_cut_sets_from_path_sets(path_sets) def path_set_probabilities(self, node_probabilities): path_sets = self.get_min_path_sets(include_in_out_nodes=False) @@ -395,8 +474,7 @@ def path_set_probabilities(self, node_probabilities): def system_probability( self, node_probabilities: Dict, - method: str = "c", - approx: bool = False, + method: str = "p", ) -> np.ndarray: """Returns the system probability/ies given the probability of each node. @@ -409,15 +487,9 @@ def system_probability( availability (or some other probability that I can't conceive). method: str, optional Input either "c" or "p" for the function to use the cut set or - path set methods respectively, by default "c". Both methods - ultimately return the same results though. - approx: bool, optional - If true, only considers the first-order terms (w.r.t. the - inclusion-exclusion principle), thereby reducing computation time. - This approximation is only applicable to the cut set method - (method="c"), a ValueError exception is raised if method="p" and - approx=True. This approximation is typically sufficient for most - use cases where reliabilities are close to 1. By default, False. + path set methods respectively, by default "p". Both methods + return the same (exact) result; the path set method is the default + as it avoids deriving the cut sets. Returns ------- @@ -427,11 +499,7 @@ def system_probability( Raises ------ ValueError - - Working/broken node/component inconsistency (a component or node - is supplied more than once to any of working_nodes, broken_nodes, - working_components, broken_components) - - The path set method must not be used with approx=True, see approx - arg description above + Probability arrays of differing lengths """ node_probabilities = copy(node_probabilities) @@ -446,79 +514,26 @@ def system_probability( else: # get shape of input array array_shape = lengths[0] - # Return array - system_prob = np.zeros(array_shape) - # Check that path set method and approximation are not used together - # (The approximation is only applicable to the cutset method) - if method == "p" and approx: - raise ValueError( - "The path set method must not be used with \ - approx=True, see approx arg description in docstring." - ) - # Get all node sets (path sets for method="p" and cut sets for - # method="c") - node_sets: set[frozenset] if method == "p": - node_sets = self.get_min_path_sets(include_in_out_nodes=False) - else: - # method == "c" - node_sets = self.get_min_cut_sets(include_in_out_nodes=False) - node_probabilities = { - k: 1 - v for k, v in node_probabilities.items() - } - - num_node_sets = len(node_sets) - # Perform intersection calculation, which isn't as simple as summating - # in the case of mutual non-exclusivity - # i is the 'level' of the intersection calc - # This is really just applying the inclusion-exclusion principle - for i in range(1, num_node_sets + 1): - # Get node set combinations for level i - node_set_combs = combinations(node_sets, i) - - # Calculate the probability of each level combination - # Making sure to not multiply a components' probability twice - level_sum = np.zeros(array_shape) - for node_set_comb in node_set_combs: - # Make a set of components out of the node path/tieset - s = set() - for path in node_set_comb: - for node in path: - if node in self.in_or_out: - continue - else: - # Add node to combination list - s.add(node) - - # Now calculate the node set probability - node_set_prob = np.ones(array_shape) - for comp in s: - comp_prob = node_probabilities[comp] - node_set_prob = node_set_prob * comp_prob - - # Now add the node set probability to the level sum - level_sum = level_sum + node_set_prob - - # Finally add/subtract the level sum to/from the system_prob if the - # level is even/odd - if i % 2 == 1: - system_prob = system_prob + level_sum - - # If the approximation is requested, just break from the - # inclusion-exclusion principle procedure after the first level - if approx: - break - else: - system_prob = system_prob - level_sum - - # If cutset method is used, the above returns the unreliability, - # so we just have to return = 1 - system_prob - if method == "c": - return 1 - system_prob + # The system reliability is the probability that at least one + # minimal path set has all of its components working. + path_sets = self.get_min_path_sets(include_in_out_nodes=False) + return probability_any_set_satisfied( + path_sets, node_probabilities, array_shape + ) - # Otherwise for the pathset method it's already the reliability - return system_prob + # method == "c": work with cut sets and node unreliabilities. The + # system unreliability is the probability that at least one minimal + # cut set has all of its components failed. + cut_sets = self.get_min_cut_sets(include_in_out_nodes=False) + node_unreliability = { + k: 1 - v for k, v in node_probabilities.items() + } + system_unreliability = probability_any_set_satisfied( + cut_sets, node_unreliability, array_shape + ) + return 1 - system_unreliability @check_probability def improvement_allocation( diff --git a/repyability/rbd/repairable_rbd.py b/repyability/rbd/repairable_rbd.py index 7cffaca..ff16b11 100644 --- a/repyability/rbd/repairable_rbd.py +++ b/repyability/rbd/repairable_rbd.py @@ -5,7 +5,6 @@ from typing import Any, Collection, Hashable, Iterable, List, Tuple import numpy as np -from dd import autoref as _bdd from tqdm import tqdm from repyability.non_repairable import NonRepairable @@ -140,51 +139,6 @@ def __init__( self.components = components self.repairability = copy(repairability) - # Compile BDD, sets self.bdd and self.bdd_system_ref. See compile_bdd() - # docstring for more info. - self.compile_bdd() - - def is_system_working( - self, component_status: dict[Any, bool], method: str - ) -> bool: - """Returns a boolean as to whether the system is working given the - status of the components - - Parameters - ---------- - component_status : dict[Any, bool] - Dictionary with all components where - component_status[component] = True only if the component is - working, and = False if not working. - - Returns - ------- - bool - True if the system is working, otherwise False. - """ - # This function uses a Binary Decision Diagram (BDD) to enable - # efficient component_status -> is_system_working lookup. Something - # very much required given the rate at which this function is called - # in the N simulations in availability(). - - # Now evaluate the BDD given the component status - if method == "c": - system_given_component_status = self.bdd_c.let( - component_status, self.bdd_system_c_ref - ) - - system_state = self.bdd_c.to_expr(system_given_component_status) - elif method == "p": - system_given_component_status = self.bdd.let( - component_status, self.bdd_system_ref - ) - - system_state = self.bdd.to_expr(system_given_component_status) - else: - raise ValueError("`method` must be either 'p' or 'c'") - - return system_state == "TRUE" - def initialize_event_queue( self, t_simulation, @@ -572,86 +526,6 @@ def availability( return simulation_results - def compile_bdd(self): - """ - Compiles the BDD, setting self.bdd to the BDD manager, and - self.bdd_system_ref to the BDD variable reference for the system state. - """ - # Instantiate the manager - bdd = _bdd.BDD() - bdd_c = _bdd.BDD() - - # Enable Dynamic Reordering (Rudell's Sifting Algorithm) - bdd.configure(reordering=True) - bdd_c.configure(reordering=True) - - # For all components, declare the BDD variable, - # and get the BDD variable reference - bdd_vars = {} - bdd_c_vars = {} - for component in self.get_nodes_names(): - bdd.declare(component) - bdd_vars[component] = bdd.var(component) - bdd_c.declare(component) - bdd_c_vars[component] = bdd_c.var(component) - - # Make path expressions - path_expressions: list[_bdd.Function] = [] - for min_path_set in self.get_min_path_sets(include_in_out_nodes=False): - min_path_set_as_list = list(min_path_set) - path_function = bdd_vars[min_path_set_as_list[0]] - for component in min_path_set_as_list[1:]: - path_function &= bdd_vars[component] - path_expressions.append(path_function) - - # Make cut expressions - cut_expressions: list[_bdd.Function] = [] - for min_cut_set in self.get_min_cut_sets(include_in_out_nodes=False): - min_cut_set_as_list = list(min_cut_set) - cut_function = bdd_c_vars[min_cut_set_as_list[0]] - for component in min_cut_set_as_list[1:]: - cut_function |= bdd_c_vars[component] - cut_expressions.append(cut_function) - - system: _bdd.Function = path_expressions[0] - for path_expression in path_expressions[1:]: - system |= path_expression - - system_c: _bdd.Function = cut_expressions[0] - for cut_expression in cut_expressions[1:]: - system_c &= cut_expression - - self.bdd = bdd - self.bdd_c = bdd_c - self.bdd_system_ref = system - self.bdd_system_c_ref = system_c - - # Debugging Functions - these help to understand the BDD structure - def bdd_to_string(self, filename: str) -> str: - """Returns the BDD as a string. Simply wraps bdd.to_expr()""" - return self.bdd.to_expr(self.bdd_system_ref) - - def bdd_to_file(self, filename: str): - """ - Wraps bdd.dump(). The file type is inferred from the extension - (case insensitive). - - Note: bdd.dump() depends on the python library `pydot` being installed - (via pip) and the graphviz' `dot` program being in your PATH. - Install pydot just with pip: `pip install pydot`, - and install graphviz, at least on Mac, with `brew install graphviz`. - Installing with brew will at least make sure `dot` is added to your - PATH. - - Supported extensions: - '.p' for Pickle - '.pdf' for PDF - '.png' for PNG - '.svg' for SVG - '.json' for JSON - """ - self.bdd.dump(filename, roots=[self.bdd_system_ref]) - def node_availability(self): node_av: dict = {} for node_name, component in self.components.items(): diff --git a/repyability/rbd/repeated_standby_node.py b/repyability/rbd/repeated_standby_node.py index 1304bf0..4b38825 100644 --- a/repyability/rbd/repeated_standby_node.py +++ b/repyability/rbd/repeated_standby_node.py @@ -1,26 +1,62 @@ import numpy as np -from surpyval import KaplanMeier + +from .numerical_convolution import ( + ConvolvedSurvival, + is_perfect_switching, + switch_success_probs, +) class RepeatedStandbyNode: - def __init__(self, model, repeats, N=10_000, lower=-np.inf): + def __init__( + self, + model, + repeats, + N=10_000, + lower=-np.inf, + switching_probability=1.0, + ): + # N and lower are kept for backwards compatibility (a Kaplan-Meier fit + # was previously made here); they are no longer used. self.model = model self.repeats = repeats - x_random = self.random(N) + self.switching_probability = switching_probability - # Create an approximation of the standby arrangement with - # a Kaplan-Meier estimation. - self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) + # Repeated cold standby: the lifetime is the sum of `repeats` + # independent copies of `model` (a mixture of partial sums under + # imperfect switching). Its survival function is computed + # deterministically by numerical convolution. + self._sf_model = ConvolvedSurvival( + [model] * repeats, switching_probability=switching_probability + ) def random(self, size): - randoms = self.model.random((self.repeats, size)) - return randoms.sum(axis=0) + # Sum of `repeats` independent draws from the base model. Under + # imperfect switching a spare only contributes if every switch up to + # and including its own has succeeded. + x_random = np.asarray(self.model.random(size), dtype=float) + if is_perfect_switching(self.switching_probability): + for _ in range(self.repeats - 1): + x_random = x_random + self.model.random(size) + else: + probs = switch_success_probs( + self.switching_probability, self.repeats + ) + running = np.ones(size, dtype=bool) + for p in probs: + running = running & (np.random.random(size) < p) + x_random = x_random + np.where( + running, self.model.random(size), 0.0 + ) + return x_random - def mean(self, N=1000): - return self.random(N).mean() + def mean(self, *args, **kwargs): + # Exact, deterministic mean from the convolution (E[T] = integral of + # the survival function). + return self._sf_model.mean() def sf(self, *args, **kwargs): - return self.model.sf(*args, **kwargs) + return self._sf_model.sf(*args, **kwargs) def ff(self, *args, **kwargs): - return self.model.ff(*args, **kwargs) + return self._sf_model.ff(*args, **kwargs) diff --git a/repyability/rbd/standby_node.py b/repyability/rbd/standby_node.py index 1ef0a12..f96eb18 100644 --- a/repyability/rbd/standby_node.py +++ b/repyability/rbd/standby_node.py @@ -1,11 +1,69 @@ from queue import PriorityQueue import numpy as np +from scipy.stats import gamma as _gamma from surpyval import KaplanMeier +from .numerical_convolution import ( + ConvolvedSurvival, + is_perfect_switching, + switch_success_probs, +) + + +def _identical_exponential_rate(models): + """If every model is an Exponential with the same rate, return that rate; + otherwise return None. The rate is taken as 1 / mean.""" + rate = None + for model in models: + dist = getattr(model, "dist", None) + if dist is None or getattr(dist, "name", "") != "Exponential": + return None + mean = float(model.mean()) + if mean <= 0.0: + return None + this_rate = 1.0 / mean + if rate is None: + rate = this_rate + elif not np.isclose(this_rate, rate): + return None + return rate + + +class _ExponentialStandbySurvival: + """Exact survival function of an identical-exponential k-out-of-n cold + standby arrangement. + + With N identical units of rate ``rate``, k operating at a time, the system + fails at the (N-k+1)-th failure. While k units operate the failure rate is + ``k*rate``, and by the memorylessness of the exponential the inter-failure + times are i.i.d. Exponential(k*rate). Hence the lifetime is exactly + Erlang(N-k+1, k*rate) -- deterministic, for any k. + """ + + def __init__(self, rate, n_units, k): + self.shape = n_units - k + 1 # number of inter-failure gaps + self.rate = k * rate # failure rate while k units operate + + def sf(self, x): + return _gamma.sf(x, a=self.shape, scale=1.0 / self.rate) + + def ff(self, x): + return _gamma.cdf(x, a=self.shape, scale=1.0 / self.rate) + + def mean(self, *args, **kwargs): + return self.shape / self.rate + class StandbyModel: - def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): + def __init__( + self, + reliabilities, + k=1, + n_sims=10_000, + lower=-np.inf, + switching_probability=1.0, + ): if k > len(reliabilities): raise ValueError( "Must be more nodes in the standby arrangement" @@ -15,23 +73,62 @@ def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): self.k = k self.N = len(reliabilities) self.n_sims = n_sims + self.switching_probability = switching_probability - # Create n_sims samples of survival times using the random method - x_random = self.random(n_sims) - - # Create an approximation of the standby arrangement with - # a Kaplan-Meier estimation. - self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) + rate = _identical_exponential_rate(reliabilities) + if rate is not None and is_perfect_switching(switching_probability): + # Identical exponential units: the cold standby lifetime is exactly + # Erlang(N-k+1, k*rate) for any k, by the memorylessness of the + # exponential. Use that closed form directly. + self._sf_model = _ExponentialStandbySurvival(rate, self.N, k) + self.model = None + elif k == 1: + # Cold standby (k=1): the lifetime is the sum of the components' + # lifetimes (or a mixture of partial sums under imperfect + # switching), whose survival function is computed deterministically + # by numerical convolution rather than from Monte-Carlo samples. + self._sf_model = ConvolvedSurvival( + reliabilities, switching_probability=switching_probability + ) + self.model = None + else: + # For k >= 2 with general (non-exponential) lifetimes the lifetime + # is not a simple sum (it depends on the order in which components + # fail), so fall back to the Monte-Carlo + Kaplan-Meier + # approximation. Imperfect switching is not modelled in that case + # yet. + if not is_perfect_switching(switching_probability): + raise NotImplementedError( + "switching_probability is only supported for k=1 cold" + " standby; for k>=2 leave it at 1.0 (perfect switching)." + ) + x_random = self.random(n_sims) + self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) + self._sf_model = None def random(self, size): if self.k == 1: - # If k is only one for the standby node the - # reliability can be estimated from the sum - # of each of the components in the node. + # If k is only one for the standby node the reliability can be + # estimated from the sum of each of the components in the node, # i.e. it will fail after all of them fail. - x_random = np.zeros(size) - for model in self.reliabilities: - x_random += model.random(size) + x_random = np.asarray( + self.reliabilities[0].random(size), dtype=float + ) + if is_perfect_switching(self.switching_probability): + for model in self.reliabilities[1:]: + x_random = x_random + model.random(size) + else: + # Under imperfect switching a spare only contributes if every + # switch up to and including its own has succeeded. + probs = switch_success_probs( + self.switching_probability, self.N + ) + running = np.ones(size, dtype=bool) + for model, p in zip(self.reliabilities[1:], probs): + running = running & (np.random.random(size) < p) + x_random = x_random + np.where( + running, model.random(size), 0.0 + ) else: # If k are required to continue then a random draw needs @@ -66,10 +163,19 @@ def random(self, size): return x_random def mean(self, N=10_000): + # Use the exact/deterministic mean when an analytic survival model is + # available (exponential closed form or convolution); otherwise fall + # back to the Monte-Carlo estimate. + if self._sf_model is not None: + return self._sf_model.mean() return self.random(N).mean() def sf(self, *args, **kwargs): + if self._sf_model is not None: + return self._sf_model.sf(*args, **kwargs) return self.model.sf(*args, **kwargs) def ff(self, *args, **kwargs): + if self._sf_model is not None: + return self._sf_model.ff(*args, **kwargs) return self.model.ff(*args, **kwargs) diff --git a/repyability/tests/test_rbd_analytic_solvability.py b/repyability/tests/test_rbd_analytic_solvability.py new file mode 100644 index 0000000..79c1fbf --- /dev/null +++ b/repyability/tests/test_rbd_analytic_solvability.py @@ -0,0 +1,80 @@ +""" +Tests NonRepairableRBD.is_analytically_solvable() and get_non_analytic_nodes(). + +An RBD is analytically / BDD solvable iff every node's reliability is available +without Monte-Carlo simulation. Standby nodes (StandbyModel, +RepeatedStandbyNode) are simulation-based (a Kaplan-Meier fit to simulated +samples) and so make an RBD non-analytic. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import surpyval as surv + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD +from repyability.rbd.repeated_node import RepeatedNode + + +# --- Analytically solvable RBDs -------------------------------------------- + + +def test_analytic_series(rbd_series: NonRepairableRBD): + assert rbd_series.is_analytically_solvable() + assert rbd_series.get_non_analytic_nodes() == {} + + +def test_analytic_parallel_fixed(rbd_parallel: NonRepairableRBD): + assert rbd_parallel.is_analytically_solvable() + assert rbd_parallel.get_non_analytic_nodes() == {} + + +def test_analytic_rbd1(rbd1: NonRepairableRBD): + assert rbd1.is_analytically_solvable() + + +def test_repeated_component_still_analytic( + rbd_repeated_component_parallel: NonRepairableRBD, +): + # A repeated *component* (shared node) is merged into one node and remains + # analytically solvable. + assert rbd_repeated_component_parallel.is_analytically_solvable() + + +def test_repeated_node_of_parametric_is_analytic(): + # A RepeatedNode wrapping a parametric distribution is analytic. + edges = [(1, 2), (2, 3)] + reliabilities = { + 2: RepeatedNode( + surv.Weibull.from_params([5, 1.1]), repeats=2, kind="series" + ) + } + rbd = NonRepairableRBD(edges, reliabilities) + assert rbd.is_analytically_solvable() + + +# --- Non-analytic (simulation-based) RBDs ---------------------------------- + + +def test_non_analytic_standby(rbd2: NonRepairableRBD): + # rbd2 contains a StandbyModel at node 7. + assert not rbd2.is_analytically_solvable() + assert rbd2.get_non_analytic_nodes() == {7: "StandbyModel"} + + +def test_non_analytic_standby_koon(rbd2_koon: NonRepairableRBD): + # rbd2_koon also contains the StandbyModel at node 7. + assert not rbd2_koon.is_analytically_solvable() + assert 7 in rbd2_koon.get_non_analytic_nodes() + + +# --- structure_check wiring ------------------------------------------------ + + +def test_structure_check_fields( + rbd_series: NonRepairableRBD, rbd2: NonRepairableRBD +): + assert rbd_series.structure_check["is_analytically_solvable"] is True + assert rbd_series.structure_check["non_analytic_nodes"] == {} + + assert rbd2.structure_check["is_analytically_solvable"] is False + assert rbd2.structure_check["non_analytic_nodes"] == {7: "StandbyModel"} diff --git a/repyability/tests/test_rbd_berge_cut_sets.py b/repyability/tests/test_rbd_berge_cut_sets.py new file mode 100644 index 0000000..d758dad --- /dev/null +++ b/repyability/tests/test_rbd_berge_cut_sets.py @@ -0,0 +1,126 @@ +""" +Tests Berge's algorithm for the minimal cut sets +(minimal_cut_sets_from_path_sets / RBD.get_min_cut_sets) and the new default +path-set method for sf(). + +Berge's output is cross-checked against the previous implementation (full +Cartesian product of the path sets, then minimalised) across every fixture, +including the k-out-of-n ones, plus a few hand-computed examples. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import itertools + +import pytest + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD +from repyability.rbd.rbd import minimal_cut_sets_from_path_sets + + +def _reference_min_cut_sets(path_sets): + """The previous implementation: full Cartesian product then minimalise. + + Kept here purely as an independent reference to verify Berge against. + """ + path_sets = [set(p) for p in path_sets] + prods = itertools.product(*path_sets) + cut_sets = [frozenset(prod) for prod in prods if prod] + minimal: list = [] + for cut_set in cut_sets: + is_minimal = True + for other in minimal.copy(): + if cut_set.issuperset(other): + is_minimal = False + break + if cut_set.issubset(other): + minimal.remove(other) + if is_minimal: + minimal.append(cut_set) + return set(minimal) + + +# --- Hand-computed examples ------------------------------------------------ + + +def test_berge_series(): + assert minimal_cut_sets_from_path_sets([{2, 3, 4}]) == { + frozenset([2]), + frozenset([3]), + frozenset([4]), + } + + +def test_berge_parallel(): + assert minimal_cut_sets_from_path_sets([{2}, {3}, {4}]) == { + frozenset([2, 3, 4]) + } + + +def test_berge_bridge_example(): + # Path sets of a small bridge: cut sets are the minimal hitting sets. + path_sets = [frozenset({1, 2}), frozenset({3, 4}), frozenset({1, 4})] + assert minimal_cut_sets_from_path_sets(path_sets) == { + frozenset({1, 3}), + frozenset({1, 4}), + frozenset({2, 4}), + } + + +# --- Cross-check Berge against the reference across all fixtures ------------ + + +FIXTURE_NAMES = [ + "rbd_series", + "rbd_parallel", + "rbd1", + "rbd2", + "rbd3", + "rbd_repeated_component_parallel", + "rbd1_koon", + "rbd2_koon", + "rbd3_koon1", + "rbd3_koon2", + "rbd3_koon3", + "rbd_koon_simple", +] + + +@pytest.mark.parametrize("fixture_name", FIXTURE_NAMES) +def test_berge_matches_reference(fixture_name, request): + rbd = request.getfixturevalue(fixture_name) + path_sets = rbd.get_min_path_sets(include_in_out_nodes=False) + expected = _reference_min_cut_sets(path_sets) + assert minimal_cut_sets_from_path_sets(path_sets) == expected + # And the method itself returns the same. + assert rbd.get_min_cut_sets() == expected + + +@pytest.mark.parametrize("fixture_name", FIXTURE_NAMES) +def test_cut_sets_are_minimal_transversals(fixture_name, request): + rbd = request.getfixturevalue(fixture_name) + path_sets = [ + set(p) for p in rbd.get_min_path_sets(include_in_out_nodes=False) + ] + cut_sets = rbd.get_min_cut_sets() + + # Every cut set must hit every path set (it is a transversal). + for cut_set in cut_sets: + for path_set in path_sets: + assert cut_set & path_set + + # No cut set is a proper subset of another (they are all minimal). + for a in cut_sets: + for b in cut_sets: + if a != b: + assert not a < b + + +# --- New default: path-set method ------------------------------------------ + + +def test_sf_default_method_is_path_set(rbd3: NonRepairableRBD): + # The default method must match the explicit path-set method, and both + # methods agree. + assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="p")) + assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="c")) diff --git a/repyability/tests/test_rbd_exact_probability.py b/repyability/tests/test_rbd_exact_probability.py new file mode 100644 index 0000000..b7f08a4 --- /dev/null +++ b/repyability/tests/test_rbd_exact_probability.py @@ -0,0 +1,96 @@ +""" +Tests the exact reliability engine (probability_any_set_satisfied, used by +system_probability) and the BDD-free is_system_working() evaluation. + +The exact engine is cross-checked against a brute-force enumeration over every +component up/down combination, on a non-series-parallel (bridge) network where +the cut-set and path-set methods must both equal the true value. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import itertools + +import pytest +from surpyval import FixedEventProbability + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD + + +def _brute_force_reliability(rbd: NonRepairableRBD, rel: dict) -> float: + """Reliability by summing P(state) over every state in which the system + works, using the path-set structure function directly.""" + nodes = list(rbd.nodes) + path_sets = [ + set(p) for p in rbd.get_min_path_sets(include_in_out_nodes=False) + ] + total = 0.0 + for bits in itertools.product([0, 1], repeat=len(nodes)): + status = {n: bool(b) for n, b in zip(nodes, bits)} + works = any(all(status[c] for c in ps) for ps in path_sets) + if not works: + continue + p = 1.0 + for n, b in zip(nodes, bits): + p *= rel[n] if b else (1 - rel[n]) + total += p + return total + + +def _bridge_rbd(rel: dict) -> NonRepairableRBD: + # rbd3 topology: a bridge network that is not reducible to series/parallel. + edges = [ + (0, 1), (0, 3), (1, 2), (3, 4), (1, 5), + (3, 5), (5, 2), (5, 4), (2, 6), (4, 6), + ] + reliabilities = { + n: FixedEventProbability.from_params(1 - r) for n, r in rel.items() + } + return NonRepairableRBD(edges, reliabilities) + + +def test_exact_matches_brute_force_bridge(): + rel = {1: 0.9, 2: 0.8, 3: 0.7, 4: 0.85, 5: 0.6} + rbd = _bridge_rbd(rel) + expected = _brute_force_reliability(rbd, rel) + assert rbd.sf(1, method="p")[0] == pytest.approx(expected) + assert rbd.sf(1, method="c")[0] == pytest.approx(expected) + + +def test_cut_and_path_methods_agree( + rbd3: NonRepairableRBD, + rbd1: NonRepairableRBD, + rbd_parallel: NonRepairableRBD, +): + for rbd in (rbd3, rbd1, rbd_parallel): + assert rbd.sf(1, method="p") == pytest.approx(rbd.sf(1, method="c")) + + +def test_is_system_working_series(rbd_series: NonRepairableRBD): + status = {n: True for n in rbd_series.G.nodes} + assert rbd_series.is_system_working(status, "p") + assert rbd_series.is_system_working(status, "c") + + # Any single failure breaks a series system. + status[3] = False + assert not rbd_series.is_system_working(status, "p") + assert not rbd_series.is_system_working(status, "c") + + +def test_is_system_working_parallel(rbd_parallel: NonRepairableRBD): + status = {n: True for n in rbd_parallel.G.nodes} + # All redundant branches down -> system down. + status[2] = status[3] = status[4] = False + assert not rbd_parallel.is_system_working(status, "p") + assert not rbd_parallel.is_system_working(status, "c") + + # A single branch back up -> system up. + status[3] = True + assert rbd_parallel.is_system_working(status, "p") + assert rbd_parallel.is_system_working(status, "c") + + +def test_is_system_working_bad_method(rbd_series: NonRepairableRBD): + status = {n: True for n in rbd_series.G.nodes} + with pytest.raises(ValueError): + rbd_series.is_system_working(status, "x") diff --git a/repyability/tests/test_rbd_exponential_standby.py b/repyability/tests/test_rbd_exponential_standby.py new file mode 100644 index 0000000..1fba2d9 --- /dev/null +++ b/repyability/tests/test_rbd_exponential_standby.py @@ -0,0 +1,76 @@ +""" +Tests the exact closed form used when standby components are identical +Exponentials. + +For N identical exponential units of rate lambda with k operating, the cold +standby lifetime is exactly Erlang(N-k+1, k*lambda): while k units run the +failure rate is k*lambda, and by memorylessness the inter-failure times are +i.i.d. Exponential(k*lambda). This is checked against scipy's gamma, against +the surpyval Gamma, and (in distribution) against the Monte-Carlo simulation. +""" + +import numpy as np +import pytest +from scipy.stats import gamma +from surpyval import Exponential, Gamma + +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) +EXP1 = Exponential.from_params([1]) + + +def test_k1_exponential_is_erlang(): + # N=3, k=1 -> Erlang(3, 1) == Gamma(3, 1) + standby = StandbyModel([EXP1, EXP1, EXP1], k=1) + gam = Gamma.from_params([3.0, 1.0]) + for t in [0.5, 1, 3, 10]: + assert standby.sf(t) == pytest.approx(gam.sf(t), **TOL) + assert standby.ff(t) == pytest.approx(gam.ff(t), **TOL) + assert standby.mean() == pytest.approx(3.0, rel=1e-9) + + +def test_k2_exponential_is_erlang(): + # N=4, k=2 -> fails at the 3rd failure, rate 2 while operating + # -> Erlang(3, 2) == Gamma(a=3, scale=1/2), mean 3/2 + standby = StandbyModel([EXP1] * 4, k=2) + for t in [0.5, 1, 2, 5]: + assert standby.sf(t) == pytest.approx( + gamma.sf(t, a=3, scale=0.5), **TOL + ) + assert standby.mean() == pytest.approx(1.5, rel=1e-9) + + +def test_k_equals_n_is_min_exponential(): + # k=N: all units operate, system fails at the first failure + # -> Exp(N*rate) == Erlang(1, N*rate) + standby = StandbyModel([EXP1] * 4, k=4) + for t in [0.25, 1, 2]: + assert standby.sf(t) == pytest.approx(np.exp(-4 * t), **TOL) + + +def test_exponential_closed_form_is_deterministic(): + a = StandbyModel([EXP1] * 5, k=2) + b = StandbyModel([EXP1] * 5, k=2) + for t in [1, 2, 5]: + assert a.sf(t) == b.sf(t) # exact, no sampling + + +def test_k2_exponential_matches_simulation(): + # The closed form should agree (in mean) with the priority-queue sim. + exp = Exponential.from_params([2.0]) # rate 2 + standby = StandbyModel([exp] * 5, k=2) # Erlang(4, 4), mean 1.0 + assert standby.mean() == pytest.approx(1.0, rel=1e-9) + sim_mean = standby.random(50_000).mean() + assert sim_mean == pytest.approx(1.0, rel=3e-2) + + +def test_non_identical_rates_use_convolution_not_erlang(): + # Different rates -> not the simple Erlang; k=1 still exact via the + # convolution path: Exp(1) + Exp(2) is hypoexponential, 2 e^-t - e^-2t. + standby = StandbyModel( + [Exponential.from_params([1]), Exponential.from_params([2])], k=1 + ) + for t in [0.5, 1, 2]: + expected = 2 * np.exp(-t) - np.exp(-2 * t) + assert standby.sf(t) == pytest.approx(expected, **TOL) diff --git a/repyability/tests/test_rbd_numerical_convolution.py b/repyability/tests/test_rbd_numerical_convolution.py new file mode 100644 index 0000000..992670f --- /dev/null +++ b/repyability/tests/test_rbd_numerical_convolution.py @@ -0,0 +1,76 @@ +""" +Tests the numerical-convolution survival function used for standby nodes +(ConvolvedSurvival), and its use in StandbyModel (k=1) and RepeatedStandbyNode. + +The numerical sf is checked against two closed forms: + - sum of n iid Exponential(rate) == Erlang == Gamma(n, scale=1/rate) + - Exp(rate 1) + Exp(rate 2) == hypoexponential, sf = 2 e^-t - e^-2t +and is verified to be deterministic (no Monte-Carlo), unlike the previous +Kaplan-Meier fit. +""" + +import numpy as np +import pytest +from surpyval import Exponential, Gamma, Weibull + +from repyability.rbd.numerical_convolution import ConvolvedSurvival +from repyability.rbd.repeated_standby_node import RepeatedStandbyNode +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) + + +def test_convolution_matches_erlang(): + exp = Exponential.from_params([1]) + conv = ConvolvedSurvival([exp, exp, exp]) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [0.5, 1, 3, 5, 10, 15]: + assert conv.sf(t) == pytest.approx(gamma.sf(t), **TOL) + assert conv.ff(t) == pytest.approx(gamma.ff(t), **TOL) + assert conv.mean() == pytest.approx(3.0, rel=1e-2) + + +def test_convolution_matches_hypoexponential(): + conv = ConvolvedSurvival( + [Exponential.from_params([1]), Exponential.from_params([2])] + ) + for t in [0.5, 1, 2, 4, 8]: + expected = 2 * np.exp(-t) - np.exp(-2 * t) + assert conv.sf(t) == pytest.approx(expected, **TOL) + + +def test_convolution_array_input(): + exp = Exponential.from_params([1]) + conv = ConvolvedSurvival([exp, exp]) + x = np.array([1.0, 2.0, 3.0]) + out = conv.sf(x) + assert out.shape == x.shape + gamma = Gamma.from_params([2.0, 1.0]) + assert out == pytest.approx(gamma.sf(x), **TOL) + + +def test_convolution_single_model_is_identity(): + w = Weibull.from_params([10, 2]) + conv = ConvolvedSurvival([w]) + for t in [1, 5, 10, 20]: + assert conv.sf(t) == pytest.approx(w.sf(t), **TOL) + + +def test_standby_model_k1_sf_is_exact_and_deterministic(): + exp = Exponential.from_params([1]) + standby = StandbyModel([exp, exp, exp], k=1) + standby_again = StandbyModel([exp, exp, exp], k=1) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [1, 3, 10]: + assert standby.sf(t) == pytest.approx(gamma.sf(t), **TOL) + # Deterministic: a second build gives identical values (no sampling). + assert standby.sf(t) == standby_again.sf(t) + + +def test_repeated_standby_sf_matches_erlang(): + exp = Exponential.from_params([1]) + node = RepeatedStandbyNode(exp, 3) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [1, 3, 10]: + assert node.sf(t) == pytest.approx(gamma.sf(t), **TOL) + assert node.ff(t) == pytest.approx(gamma.ff(t), **TOL) diff --git a/repyability/tests/test_rbd_repeated_standby.py b/repyability/tests/test_rbd_repeated_standby.py index 89dbd23..3ff03e6 100644 --- a/repyability/tests/test_rbd_repeated_standby.py +++ b/repyability/tests/test_rbd_repeated_standby.py @@ -27,9 +27,9 @@ def test_random(): def test_mean(): + # Cold standby of 3 iid Exp(1): mean is the sum of the means = 3. node = RepeatedStandbyNode(exp_model, 3, N=20_000) - mean = node.mean() - assert pytest.approx(mean, rel=1e-1) == 3 * 3 + assert pytest.approx(node.mean(), rel=1e-2) == 3 def test_sf(): diff --git a/repyability/tests/test_rbd_sf.py b/repyability/tests/test_rbd_sf.py index a452f6d..76b35db 100644 --- a/repyability/tests/test_rbd_sf.py +++ b/repyability/tests/test_rbd_sf.py @@ -144,38 +144,3 @@ def test_rbd_sf_NonRepairableRBD_as_node(rbd1: NonRepairableRBD, method: str): * rbd.reliabilities["NonRepairableRBD"].ff(t) * rbd.reliabilities[4].ff(t) ) == rbd.ff(t, method=method) - - -# Test approx - - -def test_rbd_sf_series_approx(rbd_series: NonRepairableRBD): - t = 5 - assert pytest.approx( - rbd_series.reliabilities[2].sf(t) - * rbd_series.reliabilities[3].sf(t) - * rbd_series.reliabilities[4].sf(t), - abs=0.001, - ) == rbd_series.sf(t, approx=True) - - -def test_rbd_sf_composite_approx(rbd1: NonRepairableRBD): - t = 2 - assert pytest.approx( - ( - 1 - - (1 - rbd1.reliabilities["pump1"].sf(t)) - * (1 - rbd1.reliabilities["pump2"].sf(t)) - ) - * rbd1.reliabilities["valve"].sf(t), - abs=0.001, - ) == rbd1.sf(t, approx=True) - - -def test_rbd_sf_complex_approx(rbd3: NonRepairableRBD): - assert pytest.approx(0.994780625, abs=0.001) == rbd3.sf(1000, approx=True) - - -def test_rbd_sf_path_set_method_and_approx_error(rbd1: NonRepairableRBD): - with pytest.raises(ValueError): - rbd1.sf(1, method="p", approx=True) diff --git a/repyability/tests/test_rbd_switching_probability.py b/repyability/tests/test_rbd_switching_probability.py new file mode 100644 index 0000000..d63c131 --- /dev/null +++ b/repyability/tests/test_rbd_switching_probability.py @@ -0,0 +1,96 @@ +""" +Tests imperfect switching for cold-standby nodes. + +Under imperfect switching the lifetime is a mixture of partial sums, weighted +by how many switches succeed. The key checks use the closed form for a single +switch between two Exp(1) components with switch probability p: + + S(t) = (1 - p) e^-t + p * Erlang(2,1).sf(t) = e^-t (1 + p t) + +and the corresponding mean for three Exp(1) components, E[T] = 1 + p + p^2. +""" + +import numpy as np +import pytest +from surpyval import Exponential + +from repyability.rbd.numerical_convolution import ConvolvedSurvival +from repyability.rbd.repeated_standby_node import RepeatedStandbyNode +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) +EXP = Exponential.from_params([1]) + + +def test_single_switch_closed_form(): + for p in [0.0, 0.3, 0.7, 1.0]: + conv = ConvolvedSurvival([EXP, EXP], switching_probability=p) + for t in [0.5, 1, 3, 6]: + expected = np.exp(-t) * (1 + p * t) + assert conv.sf(t) == pytest.approx(expected, **TOL) + + +def test_perfect_switching_matches_default(): + a = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=1.0) + b = ConvolvedSurvival([EXP, EXP, EXP]) + for t in [1, 3, 7]: + assert a.sf(t) == pytest.approx(b.sf(t), **TOL) + + +def test_zero_switching_is_primary_only(): + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=0.0) + for t in [0.5, 1, 3]: + assert conv.sf(t) == pytest.approx(np.exp(-t), **TOL) + + +def test_mean_three_components_closed_form(): + # E[T] = (1-p)*1 + p(1-p)*2 + p^2*3 = 1 + p + p^2 + for p in [0.0, 0.4, 0.9, 1.0]: + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=p) + assert conv.mean() == pytest.approx(1 + p + p**2, rel=1e-2) + + +def test_per_switch_sequence(): + # switches [1.0, 0.0]: first always works, second never -> exactly the + # first two components run -> Erlang(2, 1). + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=[1.0, 0.0]) + erlang2 = ConvolvedSurvival([EXP, EXP]) + for t in [0.5, 1, 3]: + assert conv.sf(t) == pytest.approx(erlang2.sf(t), **TOL) + + +def test_switching_weights_sum_to_one(): + for p in [0.0, 0.5, 1.0]: + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=p) + assert sum(conv.switching_weights) == pytest.approx(1.0) + + +def test_standby_model_switching_sf(): + standby = StandbyModel([EXP, EXP], k=1, switching_probability=0.5) + for t in [0.5, 1, 3]: + expected = np.exp(-t) * (1 + 0.5 * t) + assert standby.sf(t) == pytest.approx(expected, **TOL) + + +def test_standby_model_switching_random_mean(): + # random() should also reflect imperfect switching: mean ~ 1 + p (n=2). + standby = StandbyModel([EXP, EXP], k=1, switching_probability=0.5) + assert standby.random(200_000).mean() == pytest.approx(1.5, abs=5e-2) + + +def test_repeated_standby_switching_sf(): + node = RepeatedStandbyNode(EXP, 2, switching_probability=0.5) + for t in [0.5, 1, 3]: + assert node.sf(t) == pytest.approx(np.exp(-t) * (1 + 0.5 * t), **TOL) + + +def test_standby_k2_switching_not_implemented(): + with pytest.raises(NotImplementedError): + StandbyModel([EXP, EXP, EXP], k=2, switching_probability=0.5) + + +def test_invalid_switching_probability(): + with pytest.raises(ValueError): + ConvolvedSurvival([EXP, EXP], switching_probability=1.5) + with pytest.raises(ValueError): + ConvolvedSurvival([EXP, EXP, EXP], switching_probability=[0.5]) diff --git a/requirements.txt b/requirements.txt index 1c13022..6ef5fd1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ # Production requirements # For development, use requirements_dev.txt instead -dd==0.5.7 networkx==3.0 numpy==1.23.5 surpyval==0.10.10