From 34905d8be3c89829e5e291f922ccc26c7ccc735a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 08:09:20 +0000 Subject: [PATCH 1/9] Add is_analytically_solvable() to NonRepairableRBD Adds a check for whether an RBD's system reliability can be solved analytically / with the BDD, or whether it requires simulation because one or more nodes are simulation-based (standby arrangements). - _node_is_analytic(): classifies a node model, recursing through RepeatedNode wrappers and nested NonRepairableRBDs. - get_non_analytic_nodes(): returns {node: model type} for the offending simulation-based nodes (StandbyModel, RepeatedStandbyNode). - is_analytically_solvable(): True iff there are no such nodes. - Records the result in structure_check as "is_analytically_solvable" and "non_analytic_nodes" at construction time. Adds tests covering analytic fixtures, standby (non-analytic) fixtures, repeated nodes/components, and the structure_check wiring. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/non_repairable_rbd.py | 86 +++++++++++++++++++ .../tests/test_rbd_analytic_solvability.py | 80 +++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 repyability/tests/test_rbd_analytic_solvability.py diff --git a/repyability/rbd/non_repairable_rbd.py b/repyability/rbd/non_repairable_rbd.py index 5aa286f..e2940c2 100644 --- a/repyability/rbd/non_repairable_rbd.py +++ b/repyability/rbd/non_repairable_rbd.py @@ -13,6 +13,7 @@ from .helper_classes import PerfectReliability, PerfectUnreliability from .rbd import RBD from .repeated_node import RepeatedNode +from .repeated_standby_node import RepeatedStandbyNode from .standby_node import StandbyModel @@ -189,6 +190,15 @@ def __init__( self.__fixed_probs = False self.structure_check["all_distributions_fixed"] = False + # Record whether the system reliability can be solved analytically + # (equivalently with the BDD), or whether it requires simulation + # because one or more nodes are simulation-based (e.g. standby nodes). + non_analytic_nodes = self.get_non_analytic_nodes() + self.structure_check["is_analytically_solvable"] = ( + len(non_analytic_nodes) == 0 + ) + self.structure_check["non_analytic_nodes"] = non_analytic_nodes + if self.structure_check["is_valid"]: self.compile_bdd() @@ -366,6 +376,82 @@ def ff_by_node( def time_varying_rbd(self): return not self.__fixed_probs + # Node model types whose reliability is obtained by Monte-Carlo simulation + # (a Kaplan-Meier fit to simulated samples) rather than in closed form. A + # standby arrangement is sequence-dependent (dynamic), so its sf(t) cannot + # be expressed analytically and is instead estimated by simulation. Such + # nodes therefore prevent a purely analytic / BDD solution of the system. + _SIMULATION_NODE_TYPES = (StandbyModel, RepeatedStandbyNode) + + def _node_is_analytic(self, model) -> bool: + """Returns True if a node's reliability is available without + Monte-Carlo simulation (i.e. in closed form or from data), and so can + be consumed directly by the analytic / BDD system probability. + + The check recurses through RepeatedNodes (analytic iff their underlying + model is) and nested NonRepairableRBDs (analytic iff they are + themselves analytically solvable). + """ + # Perfect reliability / unreliability are constants + if model is PerfectReliability or model is PerfectUnreliability: + return True + # Standby arrangements are simulation-based (KM fit) -> non-analytic + if isinstance(model, self._SIMULATION_NODE_TYPES): + return False + # A repeated node is analytic iff its underlying model is + if isinstance(model, RepeatedNode): + return self._node_is_analytic(model.model) + # A nested RBD is analytic iff it is itself analytically solvable + if isinstance(model, NonRepairableRBD): + return model.is_analytically_solvable() + # Otherwise it is a surpyval parametric/non-parametric distribution + # (incl. FixedEventProbability), all of which expose a usable sf(t) + # without simulation. + return True + + def get_non_analytic_nodes(self) -> dict[Any, str]: + """Returns the nodes that prevent an analytic / BDD solution. + + Returns + ------- + dict[Any, str] + A mapping of node name -> the offending model's type name for + every node whose reliability requires Monte-Carlo simulation (e.g. + a StandbyModel). Empty if the RBD is analytically solvable. + """ + non_analytic: dict[Any, str] = {} + for node_name, model in self.reliabilities.items(): + if not self._node_is_analytic(model): + non_analytic[node_name] = type(model).__name__ + return non_analytic + + def is_analytically_solvable(self) -> bool: + """Returns whether the system reliability can be solved analytically. + + The analytic methods (the inclusion-exclusion in system_probability(), + and equivalently a BDD evaluation) require every node to expose a + reliability sf(t) that does not itself depend on Monte-Carlo + simulation. This holds for parametric and non-parametric distributions, + fixed-probability nodes, repeated nodes of such models, and nested RBDs + that are themselves analytically solvable. + + It does NOT hold when any node is a standby arrangement (StandbyModel + or RepeatedStandbyNode): a standby node is sequence-dependent and its + sf(t) is estimated by simulation, so while sf()/system_probability() + will still return a value, that value is only as good as the underlying + Monte-Carlo + Kaplan-Meier fit (a step function bounded by the sampled + support) rather than a closed-form result. Such systems are better + evaluated by simulation (e.g. random()/mean()). + + Returns + ------- + bool + True if the RBD can be solved analytically / with a BDD, False if + it requires simulation. Use get_non_analytic_nodes() to see which + nodes are responsible. + """ + return len(self.get_non_analytic_nodes()) == 0 + def random(self, size): out = np.zeros(size) for i in range(size): diff --git a/repyability/tests/test_rbd_analytic_solvability.py b/repyability/tests/test_rbd_analytic_solvability.py new file mode 100644 index 0000000..79c1fbf --- /dev/null +++ b/repyability/tests/test_rbd_analytic_solvability.py @@ -0,0 +1,80 @@ +""" +Tests NonRepairableRBD.is_analytically_solvable() and get_non_analytic_nodes(). + +An RBD is analytically / BDD solvable iff every node's reliability is available +without Monte-Carlo simulation. Standby nodes (StandbyModel, +RepeatedStandbyNode) are simulation-based (a Kaplan-Meier fit to simulated +samples) and so make an RBD non-analytic. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import surpyval as surv + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD +from repyability.rbd.repeated_node import RepeatedNode + + +# --- Analytically solvable RBDs -------------------------------------------- + + +def test_analytic_series(rbd_series: NonRepairableRBD): + assert rbd_series.is_analytically_solvable() + assert rbd_series.get_non_analytic_nodes() == {} + + +def test_analytic_parallel_fixed(rbd_parallel: NonRepairableRBD): + assert rbd_parallel.is_analytically_solvable() + assert rbd_parallel.get_non_analytic_nodes() == {} + + +def test_analytic_rbd1(rbd1: NonRepairableRBD): + assert rbd1.is_analytically_solvable() + + +def test_repeated_component_still_analytic( + rbd_repeated_component_parallel: NonRepairableRBD, +): + # A repeated *component* (shared node) is merged into one node and remains + # analytically solvable. + assert rbd_repeated_component_parallel.is_analytically_solvable() + + +def test_repeated_node_of_parametric_is_analytic(): + # A RepeatedNode wrapping a parametric distribution is analytic. + edges = [(1, 2), (2, 3)] + reliabilities = { + 2: RepeatedNode( + surv.Weibull.from_params([5, 1.1]), repeats=2, kind="series" + ) + } + rbd = NonRepairableRBD(edges, reliabilities) + assert rbd.is_analytically_solvable() + + +# --- Non-analytic (simulation-based) RBDs ---------------------------------- + + +def test_non_analytic_standby(rbd2: NonRepairableRBD): + # rbd2 contains a StandbyModel at node 7. + assert not rbd2.is_analytically_solvable() + assert rbd2.get_non_analytic_nodes() == {7: "StandbyModel"} + + +def test_non_analytic_standby_koon(rbd2_koon: NonRepairableRBD): + # rbd2_koon also contains the StandbyModel at node 7. + assert not rbd2_koon.is_analytically_solvable() + assert 7 in rbd2_koon.get_non_analytic_nodes() + + +# --- structure_check wiring ------------------------------------------------ + + +def test_structure_check_fields( + rbd_series: NonRepairableRBD, rbd2: NonRepairableRBD +): + assert rbd_series.structure_check["is_analytically_solvable"] is True + assert rbd_series.structure_check["non_analytic_nodes"] == {} + + assert rbd2.structure_check["is_analytically_solvable"] is False + assert rbd2.structure_check["non_analytic_nodes"] == {7: "StandbyModel"} From e357fae8405394c6554e858846fcba2fa7abdcc7 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 09:59:00 +0000 Subject: [PATCH 2/9] Remove dd/BDD dependency; exact reliability via Shannon decomposition Replaces the `dd` Binary Decision Diagram dependency with a pure-Python exact reliability engine and a direct structure-function evaluation. - rbd.py: add probability_any_set_satisfied(), an exact Shannon decomposition over the minimal path/cut sets with memoisation. It returns the same result as the previous inclusion-exclusion but avoids the 2^(#sets) blow-up by solving each shared sub-problem once. - system_probability(): now uses the new engine. method="p" computes reliability from path sets; method="c" computes unreliability from cut sets. The approx=True path keeps the first-order rare-event cut-set approximation, and method="p" + approx=True still raises. - is_system_working(): re-implemented without a BDD. The system works iff some minimal path set is fully up ("p") / every minimal cut set has one component up ("c"). Path/cut sets are cached on first use for the simulation hot loop. - Drop compile_bdd() and the BDD bdd_to_string/bdd_to_file helpers from RBD and RepairableRBD; RepairableRBD now inherits is_system_working. - Remove dd from requirements.txt and the mypy override list. Adds test_rbd_exact_probability.py: cross-checks the path-set and cut-set methods against a brute-force enumeration on a bridge network, and checks the new is_system_working truth tables. Existing sf()/availability tests (both methods + approx) continue to pass. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- pyproject.toml | 1 - repyability/rbd/non_repairable_rbd.py | 3 - repyability/rbd/rbd.py | 283 +++++++++--------- repyability/rbd/repairable_rbd.py | 126 -------- .../tests/test_rbd_exact_probability.py | 96 ++++++ requirements.txt | 1 - 6 files changed, 244 insertions(+), 266 deletions(-) create mode 100644 repyability/tests/test_rbd_exact_probability.py diff --git a/pyproject.toml b/pyproject.toml index 43b36da..3cc3d97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,6 @@ profile = "black" module = [ 'scipy.*', 'networkx', - 'dd', 'surpyval.*', 'tqdm', ] diff --git a/repyability/rbd/non_repairable_rbd.py b/repyability/rbd/non_repairable_rbd.py index e2940c2..3a7dcf2 100644 --- a/repyability/rbd/non_repairable_rbd.py +++ b/repyability/rbd/non_repairable_rbd.py @@ -199,9 +199,6 @@ def __init__( ) self.structure_check["non_analytic_nodes"] = non_analytic_nodes - if self.structure_check["is_valid"]: - self.compile_bdd() - @check_x def sf( self, diff --git a/repyability/rbd/rbd.py b/repyability/rbd/rbd.py index ed3c4e9..645bd46 100644 --- a/repyability/rbd/rbd.py +++ b/repyability/rbd/rbd.py @@ -2,12 +2,11 @@ import warnings from collections import defaultdict from copy import copy -from itertools import combinations, product +from itertools import product from typing import Any, Dict, Hashable, Iterable, Iterator, Optional import networkx as nx import numpy as np -from dd import autoref as _bdd from numpy.typing import ArrayLike from scipy.optimize import minimize, root from scipy.special import expit as sigmoid @@ -71,6 +70,82 @@ def scale_probability_dict( return out +def probability_any_set_satisfied( + sets: Iterable[frozenset], + element_probabilities: Dict[Any, np.ndarray], + array_shape, +) -> np.ndarray: + """Exact probability that at least one of the given sets is fully active. + + Each "set" is a collection of elements (e.g. a minimal path set of + components); the set is "satisfied" when *all* of its elements are active. + Given the per-element probability of being active, this returns the + probability that *at least one* set is satisfied. + + With path sets and node reliabilities this is the system reliability; with + cut sets and node unreliabilities it is the system unreliability. + + The computation is an exact Shannon decomposition of the structure + function: it repeatedly conditions on a single element being active or + inactive, + + P(S) = p_e * P(S | e active) + (1 - p_e) * P(S | e inactive), + + and memoises sub-problems. Because shared sub-functions are only solved + once, this avoids the 2^(#sets) blow-up of the inclusion-exclusion + principle while returning the identical exact result. + + Parameters + ---------- + sets : Iterable[frozenset] + The collection of sets (e.g. minimal path sets or cut sets). + element_probabilities : Dict[Any, np.ndarray] + Maps each element to its probability array of being active. + array_shape : + The shape of the probability arrays, used to seed the 0/1 base cases. + + Returns + ------- + np.ndarray + The probability that at least one set is fully active. + """ + sets = [frozenset(s) for s in sets] + memo: Dict[frozenset, np.ndarray] = {} + + def recurse(state: frozenset) -> np.ndarray: + # state is a frozenset of frozensets: the sets still to be satisfied, + # with already-active elements removed. + if not state: + # No set can be satisfied any more -> probability 0. + return np.zeros(array_shape) + if frozenset() in state: + # A set has had all its elements satisfied -> probability 1. + return np.ones(array_shape) + if state in memo: + return memo[state] + + # Pivot on the element appearing in the most sets, which tends to + # collapse the problem (and the memo table) fastest. + counts: Dict[Any, int] = {} + for s in state: + for element in s: + counts[element] = counts.get(element, 0) + 1 + pivot = max(counts, key=lambda e: counts[e]) + p = element_probabilities[pivot] + + # Pivot active: it satisfies its requirement, so drop it from every + # set that contained it (other sets are unaffected). + state_active = frozenset(s - {pivot} for s in state) + # Pivot inactive: any set needing it can never be satisfied -> drop it. + state_inactive = frozenset(s for s in state if pivot not in s) + + result = p * recurse(state_active) + (1 - p) * recurse(state_inactive) + memo[state] = result + return result + + return recurse(frozenset(sets)) + + class RBD: def __init__( self, @@ -250,60 +325,6 @@ def get_min_path_sets( return ret_set - def compile_bdd(self): - """ - Compiles the BDD, setting self.bdd to the BDD manager, and - self.bdd_system_ref to the BDD variable reference for the system state. - """ - # Instantiate the manager - bdd = _bdd.BDD() - bdd_c = _bdd.BDD() - - # Enable Dynamic Reordering (Rudell's Sifting Algorithm) - bdd.configure(reordering=True) - bdd_c.configure(reordering=True) - - # For all components, declare the BDD variable, - # and get the BDD variable reference - bdd_vars = {} - bdd_c_vars = {} - for component in self.G.nodes(): - bdd.declare(component) - bdd_vars[component] = bdd.var(component) - bdd_c.declare(component) - bdd_c_vars[component] = bdd_c.var(component) - - # Make path expressions - path_expressions: list[_bdd.Function] = [] - for min_path_set in self.get_min_path_sets(include_in_out_nodes=False): - min_path_set_as_list = list(min_path_set) - path_function = bdd_vars[min_path_set_as_list[0]] - for component in min_path_set_as_list[1:]: - path_function &= bdd_vars[component] - path_expressions.append(path_function) - - # Make cut expressions - cut_expressions: list[_bdd.Function] = [] - for min_cut_set in self.get_min_cut_sets(include_in_out_nodes=False): - min_cut_set_as_list = list(min_cut_set) - cut_function = bdd_c_vars[min_cut_set_as_list[0]] - for component in min_cut_set_as_list[1:]: - cut_function |= bdd_c_vars[component] - cut_expressions.append(cut_function) - - system: _bdd.Function = path_expressions[0] - for path_expression in path_expressions[1:]: - system |= path_expression - - system_c: _bdd.Function = cut_expressions[0] - for cut_expression in cut_expressions[1:]: - system_c &= cut_expression - - self.bdd = bdd - self.bdd_c = bdd_c - self.bdd_system_ref = system - self.bdd_system_c_ref = system_c - def is_system_working( self, component_status: dict[Any, bool], method: str ) -> bool: @@ -316,35 +337,54 @@ def is_system_working( Dictionary with all components where component_status[component] = True only if the component is working, and = False if not working. + method : str + Either "p" (path-set) or "c" (cut-set). Both return the same + result; "p" is typically faster as it avoids deriving the cut + sets. Returns ------- bool True if the system is working, otherwise False. """ - # This function uses a Binary Decision Diagram (BDD) to enable - # efficient component_status -> is_system_working lookup. Something - # very much required given the rate at which this function is called - # in the N simulations in availability(). - - # Now evaluate the BDD given the component status - if method == "c": - system_given_component_status = self.bdd_c.let( - component_status, self.bdd_system_c_ref + # The system structure function is evaluated directly from the minimal + # path/cut sets, which is plenty fast for the rate at which this is + # called in the simulations (no Binary Decision Diagram required). + # + # - path-set ("p"): the system works iff at least one minimal path set + # has all of its components working. + # - cut-set ("c"): the system works iff every minimal cut set has at + # least one of its components working. + # + # The path/cut sets (excluding the input/output nodes) are cached on + # first use so repeated calls during a simulation are cheap. + if method == "p": + if not hasattr(self, "_eval_path_sets"): + self._eval_path_sets = [ + tuple(path_set) + for path_set in self.get_min_path_sets( + include_in_out_nodes=False + ) + ] + return any( + all(component_status[component] for component in path_set) + for path_set in self._eval_path_sets ) - - system_state = self.bdd_c.to_expr(system_given_component_status) - elif method == "p": - system_given_component_status = self.bdd.let( - component_status, self.bdd_system_ref + elif method == "c": + if not hasattr(self, "_eval_cut_sets"): + self._eval_cut_sets = [ + tuple(cut_set) + for cut_set in self.get_min_cut_sets( + include_in_out_nodes=False + ) + ] + return all( + any(component_status[component] for component in cut_set) + for cut_set in self._eval_cut_sets ) - - system_state = self.bdd.to_expr(system_given_component_status) else: raise ValueError("`method` must be either 'p' or 'c'") - return system_state == "TRUE" - def get_min_cut_sets( self, include_in_out_nodes=False ) -> set[frozenset[Hashable]]: @@ -446,8 +486,6 @@ def system_probability( else: # get shape of input array array_shape = lengths[0] - # Return array - system_prob = np.zeros(array_shape) # Check that path set method and approximation are not used together # (The approximation is only applicable to the cutset method) @@ -456,69 +494,44 @@ def system_probability( "The path set method must not be used with \ approx=True, see approx arg description in docstring." ) - # Get all node sets (path sets for method="p" and cut sets for - # method="c") - node_sets: set[frozenset] + if method == "p": - node_sets = self.get_min_path_sets(include_in_out_nodes=False) - else: - # method == "c" - node_sets = self.get_min_cut_sets(include_in_out_nodes=False) - node_probabilities = { - k: 1 - v for k, v in node_probabilities.items() - } + # The system reliability is the probability that at least one + # minimal path set has all of its components working. + path_sets = self.get_min_path_sets(include_in_out_nodes=False) + return probability_any_set_satisfied( + path_sets, node_probabilities, array_shape + ) - num_node_sets = len(node_sets) - # Perform intersection calculation, which isn't as simple as summating - # in the case of mutual non-exclusivity - # i is the 'level' of the intersection calc - # This is really just applying the inclusion-exclusion principle - for i in range(1, num_node_sets + 1): - # Get node set combinations for level i - node_set_combs = combinations(node_sets, i) - - # Calculate the probability of each level combination - # Making sure to not multiply a components' probability twice - level_sum = np.zeros(array_shape) - for node_set_comb in node_set_combs: - # Make a set of components out of the node path/tieset - s = set() - for path in node_set_comb: - for node in path: - if node in self.in_or_out: - continue - else: - # Add node to combination list - s.add(node) - - # Now calculate the node set probability - node_set_prob = np.ones(array_shape) - for comp in s: - comp_prob = node_probabilities[comp] - node_set_prob = node_set_prob * comp_prob - - # Now add the node set probability to the level sum - level_sum = level_sum + node_set_prob - - # Finally add/subtract the level sum to/from the system_prob if the - # level is even/odd - if i % 2 == 1: - system_prob = system_prob + level_sum - - # If the approximation is requested, just break from the - # inclusion-exclusion principle procedure after the first level - if approx: - break - else: - system_prob = system_prob - level_sum + # method == "c": work with cut sets and node unreliabilities. The + # system unreliability is the probability that at least one minimal + # cut set has all of its components failed. + cut_sets = self.get_min_cut_sets(include_in_out_nodes=False) + node_unreliability = { + k: 1 - v for k, v in node_probabilities.items() + } - # If cutset method is used, the above returns the unreliability, - # so we just have to return = 1 - system_prob - if method == "c": - return 1 - system_prob + if approx: + # First-order (rare-event) approximation: sum the probabilities of + # each cut set failing. This is the first term of the + # inclusion-exclusion principle, an upper bound on unreliability + # that is typically sufficient when reliabilities are close to 1. + system_unreliability = np.zeros(array_shape) + for cut_set in cut_sets: + cut_set_fail_prob = np.ones(array_shape) + for comp in cut_set: + cut_set_fail_prob = cut_set_fail_prob * ( + node_unreliability[comp] + ) + system_unreliability = ( + system_unreliability + cut_set_fail_prob + ) + return 1 - system_unreliability - # Otherwise for the pathset method it's already the reliability - return system_prob + system_unreliability = probability_any_set_satisfied( + cut_sets, node_unreliability, array_shape + ) + return 1 - system_unreliability @check_probability def improvement_allocation( diff --git a/repyability/rbd/repairable_rbd.py b/repyability/rbd/repairable_rbd.py index 7cffaca..ff16b11 100644 --- a/repyability/rbd/repairable_rbd.py +++ b/repyability/rbd/repairable_rbd.py @@ -5,7 +5,6 @@ from typing import Any, Collection, Hashable, Iterable, List, Tuple import numpy as np -from dd import autoref as _bdd from tqdm import tqdm from repyability.non_repairable import NonRepairable @@ -140,51 +139,6 @@ def __init__( self.components = components self.repairability = copy(repairability) - # Compile BDD, sets self.bdd and self.bdd_system_ref. See compile_bdd() - # docstring for more info. - self.compile_bdd() - - def is_system_working( - self, component_status: dict[Any, bool], method: str - ) -> bool: - """Returns a boolean as to whether the system is working given the - status of the components - - Parameters - ---------- - component_status : dict[Any, bool] - Dictionary with all components where - component_status[component] = True only if the component is - working, and = False if not working. - - Returns - ------- - bool - True if the system is working, otherwise False. - """ - # This function uses a Binary Decision Diagram (BDD) to enable - # efficient component_status -> is_system_working lookup. Something - # very much required given the rate at which this function is called - # in the N simulations in availability(). - - # Now evaluate the BDD given the component status - if method == "c": - system_given_component_status = self.bdd_c.let( - component_status, self.bdd_system_c_ref - ) - - system_state = self.bdd_c.to_expr(system_given_component_status) - elif method == "p": - system_given_component_status = self.bdd.let( - component_status, self.bdd_system_ref - ) - - system_state = self.bdd.to_expr(system_given_component_status) - else: - raise ValueError("`method` must be either 'p' or 'c'") - - return system_state == "TRUE" - def initialize_event_queue( self, t_simulation, @@ -572,86 +526,6 @@ def availability( return simulation_results - def compile_bdd(self): - """ - Compiles the BDD, setting self.bdd to the BDD manager, and - self.bdd_system_ref to the BDD variable reference for the system state. - """ - # Instantiate the manager - bdd = _bdd.BDD() - bdd_c = _bdd.BDD() - - # Enable Dynamic Reordering (Rudell's Sifting Algorithm) - bdd.configure(reordering=True) - bdd_c.configure(reordering=True) - - # For all components, declare the BDD variable, - # and get the BDD variable reference - bdd_vars = {} - bdd_c_vars = {} - for component in self.get_nodes_names(): - bdd.declare(component) - bdd_vars[component] = bdd.var(component) - bdd_c.declare(component) - bdd_c_vars[component] = bdd_c.var(component) - - # Make path expressions - path_expressions: list[_bdd.Function] = [] - for min_path_set in self.get_min_path_sets(include_in_out_nodes=False): - min_path_set_as_list = list(min_path_set) - path_function = bdd_vars[min_path_set_as_list[0]] - for component in min_path_set_as_list[1:]: - path_function &= bdd_vars[component] - path_expressions.append(path_function) - - # Make cut expressions - cut_expressions: list[_bdd.Function] = [] - for min_cut_set in self.get_min_cut_sets(include_in_out_nodes=False): - min_cut_set_as_list = list(min_cut_set) - cut_function = bdd_c_vars[min_cut_set_as_list[0]] - for component in min_cut_set_as_list[1:]: - cut_function |= bdd_c_vars[component] - cut_expressions.append(cut_function) - - system: _bdd.Function = path_expressions[0] - for path_expression in path_expressions[1:]: - system |= path_expression - - system_c: _bdd.Function = cut_expressions[0] - for cut_expression in cut_expressions[1:]: - system_c &= cut_expression - - self.bdd = bdd - self.bdd_c = bdd_c - self.bdd_system_ref = system - self.bdd_system_c_ref = system_c - - # Debugging Functions - these help to understand the BDD structure - def bdd_to_string(self, filename: str) -> str: - """Returns the BDD as a string. Simply wraps bdd.to_expr()""" - return self.bdd.to_expr(self.bdd_system_ref) - - def bdd_to_file(self, filename: str): - """ - Wraps bdd.dump(). The file type is inferred from the extension - (case insensitive). - - Note: bdd.dump() depends on the python library `pydot` being installed - (via pip) and the graphviz' `dot` program being in your PATH. - Install pydot just with pip: `pip install pydot`, - and install graphviz, at least on Mac, with `brew install graphviz`. - Installing with brew will at least make sure `dot` is added to your - PATH. - - Supported extensions: - '.p' for Pickle - '.pdf' for PDF - '.png' for PNG - '.svg' for SVG - '.json' for JSON - """ - self.bdd.dump(filename, roots=[self.bdd_system_ref]) - def node_availability(self): node_av: dict = {} for node_name, component in self.components.items(): diff --git a/repyability/tests/test_rbd_exact_probability.py b/repyability/tests/test_rbd_exact_probability.py new file mode 100644 index 0000000..b7f08a4 --- /dev/null +++ b/repyability/tests/test_rbd_exact_probability.py @@ -0,0 +1,96 @@ +""" +Tests the exact reliability engine (probability_any_set_satisfied, used by +system_probability) and the BDD-free is_system_working() evaluation. + +The exact engine is cross-checked against a brute-force enumeration over every +component up/down combination, on a non-series-parallel (bridge) network where +the cut-set and path-set methods must both equal the true value. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import itertools + +import pytest +from surpyval import FixedEventProbability + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD + + +def _brute_force_reliability(rbd: NonRepairableRBD, rel: dict) -> float: + """Reliability by summing P(state) over every state in which the system + works, using the path-set structure function directly.""" + nodes = list(rbd.nodes) + path_sets = [ + set(p) for p in rbd.get_min_path_sets(include_in_out_nodes=False) + ] + total = 0.0 + for bits in itertools.product([0, 1], repeat=len(nodes)): + status = {n: bool(b) for n, b in zip(nodes, bits)} + works = any(all(status[c] for c in ps) for ps in path_sets) + if not works: + continue + p = 1.0 + for n, b in zip(nodes, bits): + p *= rel[n] if b else (1 - rel[n]) + total += p + return total + + +def _bridge_rbd(rel: dict) -> NonRepairableRBD: + # rbd3 topology: a bridge network that is not reducible to series/parallel. + edges = [ + (0, 1), (0, 3), (1, 2), (3, 4), (1, 5), + (3, 5), (5, 2), (5, 4), (2, 6), (4, 6), + ] + reliabilities = { + n: FixedEventProbability.from_params(1 - r) for n, r in rel.items() + } + return NonRepairableRBD(edges, reliabilities) + + +def test_exact_matches_brute_force_bridge(): + rel = {1: 0.9, 2: 0.8, 3: 0.7, 4: 0.85, 5: 0.6} + rbd = _bridge_rbd(rel) + expected = _brute_force_reliability(rbd, rel) + assert rbd.sf(1, method="p")[0] == pytest.approx(expected) + assert rbd.sf(1, method="c")[0] == pytest.approx(expected) + + +def test_cut_and_path_methods_agree( + rbd3: NonRepairableRBD, + rbd1: NonRepairableRBD, + rbd_parallel: NonRepairableRBD, +): + for rbd in (rbd3, rbd1, rbd_parallel): + assert rbd.sf(1, method="p") == pytest.approx(rbd.sf(1, method="c")) + + +def test_is_system_working_series(rbd_series: NonRepairableRBD): + status = {n: True for n in rbd_series.G.nodes} + assert rbd_series.is_system_working(status, "p") + assert rbd_series.is_system_working(status, "c") + + # Any single failure breaks a series system. + status[3] = False + assert not rbd_series.is_system_working(status, "p") + assert not rbd_series.is_system_working(status, "c") + + +def test_is_system_working_parallel(rbd_parallel: NonRepairableRBD): + status = {n: True for n in rbd_parallel.G.nodes} + # All redundant branches down -> system down. + status[2] = status[3] = status[4] = False + assert not rbd_parallel.is_system_working(status, "p") + assert not rbd_parallel.is_system_working(status, "c") + + # A single branch back up -> system up. + status[3] = True + assert rbd_parallel.is_system_working(status, "p") + assert rbd_parallel.is_system_working(status, "c") + + +def test_is_system_working_bad_method(rbd_series: NonRepairableRBD): + status = {n: True for n in rbd_series.G.nodes} + with pytest.raises(ValueError): + rbd_series.is_system_working(status, "x") diff --git a/requirements.txt b/requirements.txt index 1c13022..6ef5fd1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ # Production requirements # For development, use requirements_dev.txt instead -dd==0.5.7 networkx==3.0 numpy==1.23.5 surpyval==0.10.10 From 60f3a79a56f585b3743370823857d60802b3670e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 09:59:42 +0000 Subject: [PATCH 3/9] Add .gitignore for Python build and test artifacts Ignore __pycache__, compiled files, and pytest/coverage/mypy caches so they are not reported as untracked. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- .gitignore | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4e7c166 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Byte-compiled / optimized files +__pycache__/ +*.py[cod] +*$py.class + +# Test / coverage artifacts +.pytest_cache/ +.coverage +htmlcov/ +.mypy_cache/ + +# Build / distribution +build/ +dist/ +*.egg-info/ From 7bd9ddbe917fbec71c6b0f345ed9bf43f7fb8d1d Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:20:34 +0000 Subject: [PATCH 4/9] Compute cut sets with Berge's algorithm; default sf to path sets - minimal_cut_sets_from_path_sets(): computes the minimal cut sets as the minimal transversals (hitting sets) of the minimal path sets using Berge's algorithm. It builds the transversals incrementally one path set at a time and prunes non-minimal candidates at each step, instead of materialising the full Cartesian product of the path sets and filtering once at the end. Works directly from the path sets, so it stays correct for k-out-of-n structures. get_min_cut_sets() now delegates to it; the now-unused itertools.product import is removed. - Default the system reliability to the path-set method: - RBD.system_probability default method "c" -> "p". - NonRepairableRBD.sf default method is now resolved from None: path sets by default (avoids deriving the cut sets), falling back to the cut-set method when approx=True (the approximation only applies to cut sets). Explicit method="p" with approx=True still raises. Adds test_rbd_berge_cut_sets.py: cross-checks Berge against the previous Cartesian-product implementation across every fixture (including the k-out-of-n ones), checks hand-computed examples, verifies the cut sets are minimal transversals, and checks the new default sf behaviour. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/non_repairable_rbd.py | 16 ++- repyability/rbd/rbd.py | 99 ++++++++++---- repyability/tests/test_rbd_berge_cut_sets.py | 137 +++++++++++++++++++ 3 files changed, 220 insertions(+), 32 deletions(-) create mode 100644 repyability/tests/test_rbd_berge_cut_sets.py diff --git a/repyability/rbd/non_repairable_rbd.py b/repyability/rbd/non_repairable_rbd.py index 3a7dcf2..cf15c6b 100644 --- a/repyability/rbd/non_repairable_rbd.py +++ b/repyability/rbd/non_repairable_rbd.py @@ -205,7 +205,7 @@ def sf( x: Optional[ArrayLike] = None, working_nodes: Collection[Hashable] = [], broken_nodes: Collection[Hashable] = [], - method: str = "c", + method: Optional[str] = None, approx: bool = False, ) -> np.ndarray: """Returns the system reliability for time/s x. @@ -224,8 +224,11 @@ def sf( Marks these components as perfectly unreliable, by default [] method: str, optional Input either "c" or "p" for the function to use the cut set or - path set methods respectively, by default "c". Both methods - ultimately return the same results though. + path set methods respectively. Both methods ultimately return the + same results. By default (None) the path set method ("p") is used, + as it avoids deriving the cut sets; if approx=True is requested the + cut set method ("c") is used instead, as the approximation only + applies to cut sets. approx: bool, optional If true, only considers the first-order terms (w.r.t. the inclusion-exclusion principle), thereby reducing computation time. @@ -267,6 +270,13 @@ def sf( ).format(node, self.repeated[node]) ) + # Resolve the default method. Path sets are used by default (they + # avoid deriving the cut sets); the cut set method is used when the + # first-order approximation is requested, as approx only applies to + # cut sets. + if method is None: + method = "c" if approx else "p" + # Check that path set method and approximation are not used together # (The approximation is only applicable to the cutset method) if method == "p" and approx: diff --git a/repyability/rbd/rbd.py b/repyability/rbd/rbd.py index 645bd46..d0b2043 100644 --- a/repyability/rbd/rbd.py +++ b/repyability/rbd/rbd.py @@ -2,7 +2,6 @@ import warnings from collections import defaultdict from copy import copy -from itertools import product from typing import Any, Dict, Hashable, Iterable, Iterator, Optional import networkx as nx @@ -146,6 +145,66 @@ def recurse(state: frozenset) -> np.ndarray: return recurse(frozenset(sets)) +def _keep_minimal_sets(sets: Iterable[frozenset]) -> list[frozenset]: + """Return only the inclusion-minimal sets. + + Discards any set that is a superset of another (and de-duplicates). + """ + minimal: list[frozenset] = [] + # Consider smaller sets first so that a kept set can prune its supersets. + for candidate in sorted(set(sets), key=len): + if not any(kept <= candidate for kept in minimal): + minimal.append(candidate) + return minimal + + +def minimal_cut_sets_from_path_sets( + path_sets: Iterable[frozenset], +) -> set[frozenset]: + """Return the minimal cut sets given the minimal path sets. + + A minimal cut set is a minimal "transversal" (hitting set) of the path + sets: a smallest set of components that intersects every path set, so that + failing those components breaks every path through the system. + + This uses Berge's algorithm: it builds the minimal transversals + incrementally, one path set at a time, discarding non-minimal candidates at + every step. Unlike taking the full Cartesian product of the path sets and + filtering once at the end, it never materialises the (potentially enormous) + product, which makes it dramatically faster in practice. Because it works + directly from the path sets, it stays correct for k-out-of-n structures, + whose k-of-n behaviour is already encoded in the path sets. + + Parameters + ---------- + path_sets : Iterable[frozenset] + The minimal path sets (each a set of components). + + Returns + ------- + set[frozenset] + The minimal cut sets. + """ + # Start with the single empty transversal and extend it to hit each path + # set in turn. + transversals: list[frozenset] = [frozenset()] + for path_set in path_sets: + path_set = frozenset(path_set) + candidates: list[frozenset] = [] + for transversal in transversals: + if transversal & path_set: + # Already hits this path set; keep it unchanged. + candidates.append(transversal) + else: + # Must be extended to hit this path set, by one of its + # components. + for component in path_set: + candidates.append(transversal | {component}) + # Prune non-minimal candidates now so the working set stays small. + transversals = _keep_minimal_sets(candidates) + return set(transversals) + + class RBD: def __init__( self, @@ -392,35 +451,15 @@ def get_min_cut_sets( Returns the set of frozensets of minimal cut sets of the RBD. The outer set contains the frozenset of nodes. frozensets were used so the inner set elements could be hashable. + + The minimal cut sets are the minimal transversals (hitting sets) of the + minimal path sets, computed with Berge's algorithm. See + minimal_cut_sets_from_path_sets() for details. """ path_sets = self.get_min_path_sets( include_in_out_nodes=include_in_out_nodes ) - - # Gets the cartesian product across pathsets - prods = product(*path_sets) - - # We need to remove duplicate nodes in the products to get the cutsets, - # and discard empty products - cut_sets = [frozenset(prod) for prod in prods if prod] - - min_cut_sets: list[frozenset] = [] - - # Now only insert if minimal, removing any superset (non-minimal) - # cutsets are encountered - for cut_set in cut_sets: - is_minimal_cut_set = True - for other_cut_set in min_cut_sets.copy(): - if cut_set.issuperset(other_cut_set): - is_minimal_cut_set = False - break - if cut_set.issubset(other_cut_set): - min_cut_sets.remove(other_cut_set) - - if is_minimal_cut_set: - min_cut_sets.append(cut_set) - - return set(min_cut_sets) + return minimal_cut_sets_from_path_sets(path_sets) def path_set_probabilities(self, node_probabilities): path_sets = self.get_min_path_sets(include_in_out_nodes=False) @@ -435,7 +474,7 @@ def path_set_probabilities(self, node_probabilities): def system_probability( self, node_probabilities: Dict, - method: str = "c", + method: str = "p", approx: bool = False, ) -> np.ndarray: """Returns the system probability/ies given the probability of each @@ -449,8 +488,10 @@ def system_probability( availability (or some other probability that I can't conceive). method: str, optional Input either "c" or "p" for the function to use the cut set or - path set methods respectively, by default "c". Both methods - ultimately return the same results though. + path set methods respectively, by default "p". Both methods + ultimately return the same results though; the path set method is + the default as it avoids deriving the cut sets. The cut set method + (method="c") is required for the approx option below. approx: bool, optional If true, only considers the first-order terms (w.r.t. the inclusion-exclusion principle), thereby reducing computation time. diff --git a/repyability/tests/test_rbd_berge_cut_sets.py b/repyability/tests/test_rbd_berge_cut_sets.py new file mode 100644 index 0000000..8ac860e --- /dev/null +++ b/repyability/tests/test_rbd_berge_cut_sets.py @@ -0,0 +1,137 @@ +""" +Tests Berge's algorithm for the minimal cut sets +(minimal_cut_sets_from_path_sets / RBD.get_min_cut_sets) and the new default +path-set method for sf(). + +Berge's output is cross-checked against the previous implementation (full +Cartesian product of the path sets, then minimalised) across every fixture, +including the k-out-of-n ones, plus a few hand-computed examples. + +Uses pytest fixtures located in conftest.py in the tests/ directory. +""" + +import itertools + +import pytest + +from repyability.rbd.non_repairable_rbd import NonRepairableRBD +from repyability.rbd.rbd import minimal_cut_sets_from_path_sets + + +def _reference_min_cut_sets(path_sets): + """The previous implementation: full Cartesian product then minimalise. + + Kept here purely as an independent reference to verify Berge against. + """ + path_sets = [set(p) for p in path_sets] + prods = itertools.product(*path_sets) + cut_sets = [frozenset(prod) for prod in prods if prod] + minimal: list = [] + for cut_set in cut_sets: + is_minimal = True + for other in minimal.copy(): + if cut_set.issuperset(other): + is_minimal = False + break + if cut_set.issubset(other): + minimal.remove(other) + if is_minimal: + minimal.append(cut_set) + return set(minimal) + + +# --- Hand-computed examples ------------------------------------------------ + + +def test_berge_series(): + assert minimal_cut_sets_from_path_sets([{2, 3, 4}]) == { + frozenset([2]), + frozenset([3]), + frozenset([4]), + } + + +def test_berge_parallel(): + assert minimal_cut_sets_from_path_sets([{2}, {3}, {4}]) == { + frozenset([2, 3, 4]) + } + + +def test_berge_bridge_example(): + # Path sets of a small bridge: cut sets are the minimal hitting sets. + path_sets = [frozenset({1, 2}), frozenset({3, 4}), frozenset({1, 4})] + assert minimal_cut_sets_from_path_sets(path_sets) == { + frozenset({1, 3}), + frozenset({1, 4}), + frozenset({2, 4}), + } + + +# --- Cross-check Berge against the reference across all fixtures ------------ + + +FIXTURE_NAMES = [ + "rbd_series", + "rbd_parallel", + "rbd1", + "rbd2", + "rbd3", + "rbd_repeated_component_parallel", + "rbd1_koon", + "rbd2_koon", + "rbd3_koon1", + "rbd3_koon2", + "rbd3_koon3", + "rbd_koon_simple", +] + + +@pytest.mark.parametrize("fixture_name", FIXTURE_NAMES) +def test_berge_matches_reference(fixture_name, request): + rbd = request.getfixturevalue(fixture_name) + path_sets = rbd.get_min_path_sets(include_in_out_nodes=False) + expected = _reference_min_cut_sets(path_sets) + assert minimal_cut_sets_from_path_sets(path_sets) == expected + # And the method itself returns the same. + assert rbd.get_min_cut_sets() == expected + + +@pytest.mark.parametrize("fixture_name", FIXTURE_NAMES) +def test_cut_sets_are_minimal_transversals(fixture_name, request): + rbd = request.getfixturevalue(fixture_name) + path_sets = [ + set(p) for p in rbd.get_min_path_sets(include_in_out_nodes=False) + ] + cut_sets = rbd.get_min_cut_sets() + + # Every cut set must hit every path set (it is a transversal). + for cut_set in cut_sets: + for path_set in path_sets: + assert cut_set & path_set + + # No cut set is a proper subset of another (they are all minimal). + for a in cut_sets: + for b in cut_sets: + if a != b: + assert not a < b + + +# --- New default: path-set method ------------------------------------------ + + +def test_sf_default_method_is_path_set(rbd3: NonRepairableRBD): + # The default (method=None) must match the explicit path-set method, and + # both methods agree. + assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="p")) + assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="c")) + + +def test_sf_default_with_approx_does_not_raise(rbd1: NonRepairableRBD): + # approx=True with the default method must route to cut sets, not raise. + value = rbd1.sf(1, approx=True) + assert value == pytest.approx(rbd1.sf(1, method="c")) + + +def test_sf_explicit_path_with_approx_raises(rbd1: NonRepairableRBD): + with pytest.raises(ValueError): + rbd1.sf(1, method="p", approx=True) From 0fdf47a48da62b67a8fc27b79efc210941cdc7c9 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:31:02 +0000 Subject: [PATCH 5/9] Remove the approx (first-order) option from sf/system_probability The approx flag was a first-order rare-event approximation that pre-dated the exact engine. It was never actually wired through sf() (only its error-check fired), and the exact path/cut-set computation is now cheap, so the option added API surface without benefit. - RBD.system_probability: drop the approx parameter and the first-order cut-set branch; both methods now always return the exact result. - NonRepairableRBD.sf: drop approx; method now defaults straight to "p" (no None sentinel needed) and the method="p"+approx ValueError is gone. - Remove the corresponding approx tests from test_rbd_sf.py and the approx-routing assertions from test_rbd_berge_cut_sets.py. The separate Fussell-Vesely approx option (_fussel_vesely) is unrelated and left untouched. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/non_repairable_rbd.py | 41 +++--------------- repyability/rbd/rbd.py | 45 ++------------------ repyability/tests/test_rbd_berge_cut_sets.py | 15 +------ repyability/tests/test_rbd_sf.py | 35 --------------- 4 files changed, 12 insertions(+), 124 deletions(-) diff --git a/repyability/rbd/non_repairable_rbd.py b/repyability/rbd/non_repairable_rbd.py index cf15c6b..439d2f7 100644 --- a/repyability/rbd/non_repairable_rbd.py +++ b/repyability/rbd/non_repairable_rbd.py @@ -205,8 +205,7 @@ def sf( x: Optional[ArrayLike] = None, working_nodes: Collection[Hashable] = [], broken_nodes: Collection[Hashable] = [], - method: Optional[str] = None, - approx: bool = False, + method: str = "p", ) -> np.ndarray: """Returns the system reliability for time/s x. @@ -224,18 +223,9 @@ def sf( Marks these components as perfectly unreliable, by default [] method: str, optional Input either "c" or "p" for the function to use the cut set or - path set methods respectively. Both methods ultimately return the - same results. By default (None) the path set method ("p") is used, - as it avoids deriving the cut sets; if approx=True is requested the - cut set method ("c") is used instead, as the approximation only - applies to cut sets. - approx: bool, optional - If true, only considers the first-order terms (w.r.t. the - inclusion-exclusion principle), thereby reducing computation time. - This approximation is only applicable to the cut set method - (method="c"), a ValueError exception is raised if method="p" and - approx=True. This approximation is typically sufficient for most - use cases where reliabilities are close to 1. By default, False. + path set methods respectively. Both methods return the same + (exact) result. By default the path set method ("p") is used, as + it avoids deriving the cut sets. Returns ------- @@ -245,11 +235,9 @@ def sf( Raises ------ ValueError - - Working/broken node/component inconsistency (a component or node - is supplied more than once to any of working_nodes, broken_nodes, - working_components, broken_components) - - The path set method must not be used with approx=True, see approx - arg description above + Working/broken node/component inconsistency (a component or node + is supplied more than once to any of working_nodes, broken_nodes, + working_components, broken_components) """ # Check for any node/component argument inconsistency # check_sf_node_component_args_consistency( @@ -270,21 +258,6 @@ def sf( ).format(node, self.repeated[node]) ) - # Resolve the default method. Path sets are used by default (they - # avoid deriving the cut sets); the cut set method is used when the - # first-order approximation is requested, as approx only applies to - # cut sets. - if method is None: - method = "c" if approx else "p" - - # Check that path set method and approximation are not used together - # (The approximation is only applicable to the cutset method) - if method == "p" and approx: - raise ValueError( - "The path set method must not be used with \ - approx=True, see approx arg description in docstring." - ) - # Turn node iterables into sets for O(1) lookup later working_nodes = set(working_nodes) broken_nodes = set(broken_nodes) diff --git a/repyability/rbd/rbd.py b/repyability/rbd/rbd.py index d0b2043..59bdbca 100644 --- a/repyability/rbd/rbd.py +++ b/repyability/rbd/rbd.py @@ -475,7 +475,6 @@ def system_probability( self, node_probabilities: Dict, method: str = "p", - approx: bool = False, ) -> np.ndarray: """Returns the system probability/ies given the probability of each node. @@ -489,16 +488,8 @@ def system_probability( method: str, optional Input either "c" or "p" for the function to use the cut set or path set methods respectively, by default "p". Both methods - ultimately return the same results though; the path set method is - the default as it avoids deriving the cut sets. The cut set method - (method="c") is required for the approx option below. - approx: bool, optional - If true, only considers the first-order terms (w.r.t. the - inclusion-exclusion principle), thereby reducing computation time. - This approximation is only applicable to the cut set method - (method="c"), a ValueError exception is raised if method="p" and - approx=True. This approximation is typically sufficient for most - use cases where reliabilities are close to 1. By default, False. + return the same (exact) result; the path set method is the default + as it avoids deriving the cut sets. Returns ------- @@ -508,11 +499,7 @@ def system_probability( Raises ------ ValueError - - Working/broken node/component inconsistency (a component or node - is supplied more than once to any of working_nodes, broken_nodes, - working_components, broken_components) - - The path set method must not be used with approx=True, see approx - arg description above + Probability arrays of differing lengths """ node_probabilities = copy(node_probabilities) @@ -528,14 +515,6 @@ def system_probability( # get shape of input array array_shape = lengths[0] - # Check that path set method and approximation are not used together - # (The approximation is only applicable to the cutset method) - if method == "p" and approx: - raise ValueError( - "The path set method must not be used with \ - approx=True, see approx arg description in docstring." - ) - if method == "p": # The system reliability is the probability that at least one # minimal path set has all of its components working. @@ -551,24 +530,6 @@ def system_probability( node_unreliability = { k: 1 - v for k, v in node_probabilities.items() } - - if approx: - # First-order (rare-event) approximation: sum the probabilities of - # each cut set failing. This is the first term of the - # inclusion-exclusion principle, an upper bound on unreliability - # that is typically sufficient when reliabilities are close to 1. - system_unreliability = np.zeros(array_shape) - for cut_set in cut_sets: - cut_set_fail_prob = np.ones(array_shape) - for comp in cut_set: - cut_set_fail_prob = cut_set_fail_prob * ( - node_unreliability[comp] - ) - system_unreliability = ( - system_unreliability + cut_set_fail_prob - ) - return 1 - system_unreliability - system_unreliability = probability_any_set_satisfied( cut_sets, node_unreliability, array_shape ) diff --git a/repyability/tests/test_rbd_berge_cut_sets.py b/repyability/tests/test_rbd_berge_cut_sets.py index 8ac860e..d758dad 100644 --- a/repyability/tests/test_rbd_berge_cut_sets.py +++ b/repyability/tests/test_rbd_berge_cut_sets.py @@ -120,18 +120,7 @@ def test_cut_sets_are_minimal_transversals(fixture_name, request): def test_sf_default_method_is_path_set(rbd3: NonRepairableRBD): - # The default (method=None) must match the explicit path-set method, and - # both methods agree. + # The default method must match the explicit path-set method, and both + # methods agree. assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="p")) assert rbd3.sf(1) == pytest.approx(rbd3.sf(1, method="c")) - - -def test_sf_default_with_approx_does_not_raise(rbd1: NonRepairableRBD): - # approx=True with the default method must route to cut sets, not raise. - value = rbd1.sf(1, approx=True) - assert value == pytest.approx(rbd1.sf(1, method="c")) - - -def test_sf_explicit_path_with_approx_raises(rbd1: NonRepairableRBD): - with pytest.raises(ValueError): - rbd1.sf(1, method="p", approx=True) diff --git a/repyability/tests/test_rbd_sf.py b/repyability/tests/test_rbd_sf.py index a452f6d..76b35db 100644 --- a/repyability/tests/test_rbd_sf.py +++ b/repyability/tests/test_rbd_sf.py @@ -144,38 +144,3 @@ def test_rbd_sf_NonRepairableRBD_as_node(rbd1: NonRepairableRBD, method: str): * rbd.reliabilities["NonRepairableRBD"].ff(t) * rbd.reliabilities[4].ff(t) ) == rbd.ff(t, method=method) - - -# Test approx - - -def test_rbd_sf_series_approx(rbd_series: NonRepairableRBD): - t = 5 - assert pytest.approx( - rbd_series.reliabilities[2].sf(t) - * rbd_series.reliabilities[3].sf(t) - * rbd_series.reliabilities[4].sf(t), - abs=0.001, - ) == rbd_series.sf(t, approx=True) - - -def test_rbd_sf_composite_approx(rbd1: NonRepairableRBD): - t = 2 - assert pytest.approx( - ( - 1 - - (1 - rbd1.reliabilities["pump1"].sf(t)) - * (1 - rbd1.reliabilities["pump2"].sf(t)) - ) - * rbd1.reliabilities["valve"].sf(t), - abs=0.001, - ) == rbd1.sf(t, approx=True) - - -def test_rbd_sf_complex_approx(rbd3: NonRepairableRBD): - assert pytest.approx(0.994780625, abs=0.001) == rbd3.sf(1000, approx=True) - - -def test_rbd_sf_path_set_method_and_approx_error(rbd1: NonRepairableRBD): - with pytest.raises(ValueError): - rbd1.sf(1, method="p", approx=True) From 6a745344e2902c5285afa1172e8f555842e34b0a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:45:10 +0000 Subject: [PATCH 6/9] Deterministic standby sf via numerical convolution Standby sf/ff were estimated from Monte-Carlo samples via a Kaplan-Meier fit: stochastic, a step function bounded by the sampled support, and the source of intermittent test failures. For cold standby the lifetime is the sum of the components' lifetimes, so its survival function is their convolution, which can be computed directly. - numerical_convolution.py: ConvolvedSurvival computes the survival function of a sum of independent lifetimes by numerically convolving the component densities (FFT) on a fine grid bounded robustly by doubling on sf, integrating to the CDF with trapezoidal quadrature and normalising away discretisation drift. sf()/ff()/mean() then read the precomputed grid -- fast, deterministic and accurate (~1e-3 vs closed forms). - StandbyModel: for k=1 (cold standby = sum) sf/ff now use the exact convolution instead of the KM fit; k>=2 (order-dependent) keeps the Monte-Carlo + KM approximation. - RepeatedStandbyNode: sf/ff use the convolution of `repeats` copies. random()/mean() are deliberately left unchanged (out of scope for this sf work). Note RepeatedStandbyNode.random/mean have a separate pre-existing bug (they sample the KM-of-the-sum and re-sum it, ~repeats x too large), which is why test_rbd_repeated_standby::test_mean expects 9 rather than 3 and remains the only flaky standby test. Adds test_rbd_numerical_convolution.py: checks the convolution against the Erlang (Gamma) and hypoexponential closed forms, array input, the single-model identity, and that StandbyModel(k=1)/RepeatedStandbyNode sf is exact and reproducible. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/numerical_convolution.py | 97 +++++++++++++++++++ repyability/rbd/repeated_standby_node.py | 16 ++- repyability/rbd/standby_node.py | 26 +++-- .../tests/test_rbd_numerical_convolution.py | 76 +++++++++++++++ 4 files changed, 204 insertions(+), 11 deletions(-) create mode 100644 repyability/rbd/numerical_convolution.py create mode 100644 repyability/tests/test_rbd_numerical_convolution.py diff --git a/repyability/rbd/numerical_convolution.py b/repyability/rbd/numerical_convolution.py new file mode 100644 index 0000000..24fdc84 --- /dev/null +++ b/repyability/rbd/numerical_convolution.py @@ -0,0 +1,97 @@ +""" +Numerical convolution of independent lifetimes. + +A cold-standby arrangement (with perfect switching) fails after the *sum* of +its components' lifetimes, so the survival function of the arrangement is the +convolution of the components' distributions. This module computes that +survival function numerically -- deterministically and quickly -- as a robust +alternative to estimating it from Monte-Carlo samples with a Kaplan-Meier fit. +""" + +import numpy as np +from scipy.integrate import cumulative_trapezoid, trapezoid +from scipy.signal import fftconvolve + + +def _scalar(value) -> float: + """Return a plain float from a surpyval scalar/array result.""" + return float(np.atleast_1d(value)[0]) + + +def _upper_time(model, eps: float = 1e-10) -> float: + """A time by which ``model``'s survival has effectively reached zero. + + Found by doubling from the mean until sf <= eps, so it is robust for any + distribution exposing sf() and mean() (it does not rely on a quantile + function). + """ + t = max(_scalar(model.mean()), 1.0) + for _ in range(200): + if _scalar(model.sf(t)) <= eps: + break + t *= 2.0 + return t + + +def _density_on_grid(model, t: np.ndarray) -> np.ndarray: + """The model's density on the grid, with any non-finite values (e.g. an + infinite density at t=0 for some shapes) replaced by zero. The negligible + mass lost is restored by the later CDF normalisation.""" + pdf = np.asarray(model.df(t), dtype=float) + return np.nan_to_num(pdf, nan=0.0, posinf=0.0, neginf=0.0) + + +class ConvolvedSurvival: + """Survival function of a sum of independent lifetimes. + + Computes the survival function of ``X_1 + X_2 + ... + X_n`` (the lifetime + of a cold-standby arrangement with perfect switching) by numerically + convolving the component densities on a fine time grid. sf()/ff() then + interpolate the pre-computed grid, so they are fast and deterministic. + + Parameters + ---------- + models : sequence + The component lifetime distributions. Each must expose df() (density), + sf() (survival) and mean(). + n_points : int, optional + Number of grid points, by default 100_001. More points give more + accuracy at the cost of construction time. + eps : float, optional + Survival threshold used to bound the time grid, by default 1e-10. + """ + + def __init__(self, models, n_points: int = 100_001, eps: float = 1e-10): + models = list(models) + if len(models) == 0: + raise ValueError("Need at least one model to convolve.") + + # Bound the grid by the sum of the components' effective upper times + # (the support of the sum is contained in [0, sum of supports]). + upper = sum(_upper_time(model, eps) for model in models) + t = np.linspace(0.0, upper, n_points) + dt = t[1] - t[0] + + # Convolve the component densities to get the density of the sum. + pdf = _density_on_grid(models[0], t) + for model in models[1:]: + pdf = fftconvolve(pdf, _density_on_grid(model, t))[:n_points] * dt + + # Integrate to the CDF, normalising away discretisation drift so the + # total probability is exactly one, then form the survival function. + cdf = cumulative_trapezoid(pdf, t, initial=0.0) + cdf = cdf / cdf[-1] + self._t = t + self._sf = np.clip(1.0 - cdf, 0.0, 1.0) + + def sf(self, x): + """Survival function at x (1 at/below 0, ~0 beyond the grid).""" + return np.interp(x, self._t, self._sf, left=1.0, right=0.0) + + def ff(self, x): + """Cumulative failure probability (CDF) at x.""" + return 1.0 - self.sf(x) + + def mean(self, *args, **kwargs) -> float: + """Mean lifetime, E[T] = integral of the survival function.""" + return float(trapezoid(self._sf, self._t)) diff --git a/repyability/rbd/repeated_standby_node.py b/repyability/rbd/repeated_standby_node.py index 1304bf0..89f4bc5 100644 --- a/repyability/rbd/repeated_standby_node.py +++ b/repyability/rbd/repeated_standby_node.py @@ -1,15 +1,21 @@ import numpy as np from surpyval import KaplanMeier +from .numerical_convolution import ConvolvedSurvival + class RepeatedStandbyNode: def __init__(self, model, repeats, N=10_000, lower=-np.inf): self.model = model self.repeats = repeats - x_random = self.random(N) - # Create an approximation of the standby arrangement with - # a Kaplan-Meier estimation. + # Repeated cold standby: the lifetime is the sum of `repeats` + # independent copies of `model`. Its survival function is their + # convolution, computed deterministically here. + self._sf_model = ConvolvedSurvival([model] * repeats) + + # A Kaplan-Meier fit is retained only for random()/mean(). + x_random = self.random(N) self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) def random(self, size): @@ -20,7 +26,7 @@ def mean(self, N=1000): return self.random(N).mean() def sf(self, *args, **kwargs): - return self.model.sf(*args, **kwargs) + return self._sf_model.sf(*args, **kwargs) def ff(self, *args, **kwargs): - return self.model.ff(*args, **kwargs) + return self._sf_model.ff(*args, **kwargs) diff --git a/repyability/rbd/standby_node.py b/repyability/rbd/standby_node.py index 1ef0a12..3b15400 100644 --- a/repyability/rbd/standby_node.py +++ b/repyability/rbd/standby_node.py @@ -3,6 +3,8 @@ import numpy as np from surpyval import KaplanMeier +from .numerical_convolution import ConvolvedSurvival + class StandbyModel: def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): @@ -16,12 +18,20 @@ def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): self.N = len(reliabilities) self.n_sims = n_sims - # Create n_sims samples of survival times using the random method - x_random = self.random(n_sims) - - # Create an approximation of the standby arrangement with - # a Kaplan-Meier estimation. - self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) + if k == 1: + # Cold standby (k=1): the lifetime is the sum of the components' + # lifetimes, whose survival function is their convolution. Compute + # it deterministically by numerical convolution rather than + # fitting a Kaplan-Meier to Monte-Carlo samples. + self._sf_model = ConvolvedSurvival(reliabilities) + self.model = None + else: + # For k >= 2 the lifetime is not a simple sum (it depends on the + # order in which components fail), so fall back to the + # Monte-Carlo + Kaplan-Meier approximation. + x_random = self.random(n_sims) + self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) + self._sf_model = None def random(self, size): if self.k == 1: @@ -69,7 +79,11 @@ def mean(self, N=10_000): return self.random(N).mean() def sf(self, *args, **kwargs): + if self._sf_model is not None: + return self._sf_model.sf(*args, **kwargs) return self.model.sf(*args, **kwargs) def ff(self, *args, **kwargs): + if self._sf_model is not None: + return self._sf_model.ff(*args, **kwargs) return self.model.ff(*args, **kwargs) diff --git a/repyability/tests/test_rbd_numerical_convolution.py b/repyability/tests/test_rbd_numerical_convolution.py new file mode 100644 index 0000000..992670f --- /dev/null +++ b/repyability/tests/test_rbd_numerical_convolution.py @@ -0,0 +1,76 @@ +""" +Tests the numerical-convolution survival function used for standby nodes +(ConvolvedSurvival), and its use in StandbyModel (k=1) and RepeatedStandbyNode. + +The numerical sf is checked against two closed forms: + - sum of n iid Exponential(rate) == Erlang == Gamma(n, scale=1/rate) + - Exp(rate 1) + Exp(rate 2) == hypoexponential, sf = 2 e^-t - e^-2t +and is verified to be deterministic (no Monte-Carlo), unlike the previous +Kaplan-Meier fit. +""" + +import numpy as np +import pytest +from surpyval import Exponential, Gamma, Weibull + +from repyability.rbd.numerical_convolution import ConvolvedSurvival +from repyability.rbd.repeated_standby_node import RepeatedStandbyNode +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) + + +def test_convolution_matches_erlang(): + exp = Exponential.from_params([1]) + conv = ConvolvedSurvival([exp, exp, exp]) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [0.5, 1, 3, 5, 10, 15]: + assert conv.sf(t) == pytest.approx(gamma.sf(t), **TOL) + assert conv.ff(t) == pytest.approx(gamma.ff(t), **TOL) + assert conv.mean() == pytest.approx(3.0, rel=1e-2) + + +def test_convolution_matches_hypoexponential(): + conv = ConvolvedSurvival( + [Exponential.from_params([1]), Exponential.from_params([2])] + ) + for t in [0.5, 1, 2, 4, 8]: + expected = 2 * np.exp(-t) - np.exp(-2 * t) + assert conv.sf(t) == pytest.approx(expected, **TOL) + + +def test_convolution_array_input(): + exp = Exponential.from_params([1]) + conv = ConvolvedSurvival([exp, exp]) + x = np.array([1.0, 2.0, 3.0]) + out = conv.sf(x) + assert out.shape == x.shape + gamma = Gamma.from_params([2.0, 1.0]) + assert out == pytest.approx(gamma.sf(x), **TOL) + + +def test_convolution_single_model_is_identity(): + w = Weibull.from_params([10, 2]) + conv = ConvolvedSurvival([w]) + for t in [1, 5, 10, 20]: + assert conv.sf(t) == pytest.approx(w.sf(t), **TOL) + + +def test_standby_model_k1_sf_is_exact_and_deterministic(): + exp = Exponential.from_params([1]) + standby = StandbyModel([exp, exp, exp], k=1) + standby_again = StandbyModel([exp, exp, exp], k=1) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [1, 3, 10]: + assert standby.sf(t) == pytest.approx(gamma.sf(t), **TOL) + # Deterministic: a second build gives identical values (no sampling). + assert standby.sf(t) == standby_again.sf(t) + + +def test_repeated_standby_sf_matches_erlang(): + exp = Exponential.from_params([1]) + node = RepeatedStandbyNode(exp, 3) + gamma = Gamma.from_params([3.0, 1.0]) + for t in [1, 3, 10]: + assert node.sf(t) == pytest.approx(gamma.sf(t), **TOL) + assert node.ff(t) == pytest.approx(gamma.ff(t), **TOL) From 29c8f11b7059e4f6e7323de6abaef9c0a47edf6e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:57:08 +0000 Subject: [PATCH 7/9] Add switching probability to cold-standby nodes Models imperfect switching (failure on demand) for cold standby. Each switch onto the next spare succeeds with a given probability, so the lifetime is a mixture of partial sums: run the primary; if its switch works, run the next component too; and so on. The survival function is the corresponding weighted mixture of the partial-sum convolutions. - ConvolvedSurvival: new switching_probability argument (a scalar applied to every switch, or a per-switch sequence of length n-1). It builds the partial-sum survival functions it already computes incrementally and returns their mixture; default 1.0 reduces exactly to the full convolution. Helpers switch_success_probs()/is_perfect_switching() and the mixture weights are exposed/computed for reuse. - StandbyModel: switching_probability argument for k=1 (passed to the convolution, and random() now also reflects switching via a per-switch Bernoulli). For k>=2 a non-perfect switching probability raises NotImplementedError rather than being silently ignored. - RepeatedStandbyNode: switching_probability argument applied to sf/ff (its random()/mean() still reflect perfect switching, as before). Adds test_rbd_switching_probability.py, verifying against closed forms: single-switch S(t)=e^-t(1+pt), zero-switching = primary only, perfect = full convolution, mean 1+p+p^2 for three components, a per-switch sequence, weight normalisation, the k>=2 NotImplementedError, and input validation. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/numerical_convolution.py | 132 +++++++++++++++--- repyability/rbd/repeated_standby_node.py | 20 ++- repyability/rbd/standby_node.py | 60 ++++++-- .../tests/test_rbd_switching_probability.py | 96 +++++++++++++ 4 files changed, 271 insertions(+), 37 deletions(-) create mode 100644 repyability/tests/test_rbd_switching_probability.py diff --git a/repyability/rbd/numerical_convolution.py b/repyability/rbd/numerical_convolution.py index 24fdc84..66cac8c 100644 --- a/repyability/rbd/numerical_convolution.py +++ b/repyability/rbd/numerical_convolution.py @@ -1,11 +1,13 @@ """ Numerical convolution of independent lifetimes. -A cold-standby arrangement (with perfect switching) fails after the *sum* of -its components' lifetimes, so the survival function of the arrangement is the -convolution of the components' distributions. This module computes that -survival function numerically -- deterministically and quickly -- as a robust -alternative to estimating it from Monte-Carlo samples with a Kaplan-Meier fit. +A cold-standby arrangement fails after the *sum* of its components' lifetimes +(when switching is perfect), so the survival function of the arrangement is +the convolution of the components' distributions. With imperfect switching it +is a mixture of partial sums, weighted by how many switches succeed. This +module computes that survival function numerically -- deterministically and +quickly -- as a robust alternative to estimating it from Monte-Carlo samples +with a Kaplan-Meier fit. """ import numpy as np @@ -41,19 +43,90 @@ def _density_on_grid(model, t: np.ndarray) -> np.ndarray: return np.nan_to_num(pdf, nan=0.0, posinf=0.0, neginf=0.0) -class ConvolvedSurvival: - """Survival function of a sum of independent lifetimes. +def switch_success_probs(switching_probability, n) -> list: + """Normalise a switching_probability into a list of (n-1) per-switch + success probabilities (the switches into components 2..n). + + switching_probability is either a scalar (same probability for every + switch) or a sequence of length n-1 (one success probability per switch). + """ + if n <= 1: + return [] + if np.isscalar(switching_probability): + probs = [float(switching_probability)] * (n - 1) + else: + probs = [float(p) for p in switching_probability] + if len(probs) != n - 1: + raise ValueError( + "switching_probability sequence must have length " + f"{n - 1} (one per switch), got {len(probs)}" + ) + for p in probs: + if not (0.0 <= p <= 1.0): + raise ValueError("switching probabilities must be in [0, 1]") + return probs + + +def is_perfect_switching(switching_probability) -> bool: + """True if every switch succeeds with probability 1.""" + if np.isscalar(switching_probability): + return float(switching_probability) == 1.0 + return all(float(p) == 1.0 for p in switching_probability) + + +def _switching_weights(switching_probability, n) -> list: + """Mixture weights for a cold-standby chain with imperfect switching. + + Returns a list ``w`` of length n where ``w[k]`` is the probability that + exactly the first (k+1) components run: switches 1..k succeeded and switch + (k+1) failed (or, for the last, every switch succeeded). + """ + if n == 1: + return [1.0] + probs = switch_success_probs(switching_probability, n) + weights = [] + prefix = 1.0 + for p in probs: + # This switch fails (after all earlier ones succeeded): stop here. + weights.append(prefix * (1.0 - p)) + prefix *= p + # Every switch succeeded: all n components run. + weights.append(prefix) + return weights + + +def _sf_from_pdf(pdf: np.ndarray, t: np.ndarray) -> np.ndarray: + """Survival function on the grid from a (possibly un-normalised) density, + normalising away discretisation drift.""" + cdf = cumulative_trapezoid(pdf, t, initial=0.0) + total = cdf[-1] + if total <= 0.0: + return np.ones_like(t) + return np.clip(1.0 - cdf / total, 0.0, 1.0) - Computes the survival function of ``X_1 + X_2 + ... + X_n`` (the lifetime - of a cold-standby arrangement with perfect switching) by numerically + +class ConvolvedSurvival: + """Survival function of a cold-standby arrangement (sum of lifetimes). + + With perfect switching the arrangement fails after the sum of its + components' lifetimes, whose survival function is the convolution of the + component distributions. With imperfect switching, each switch onto the + next spare succeeds only with some probability, so the lifetime is a + mixture of partial sums (run the first component; if its switch works, run + the second too; and so on). This computes that mixture by numerically convolving the component densities on a fine time grid. sf()/ff() then interpolate the pre-computed grid, so they are fast and deterministic. Parameters ---------- models : sequence - The component lifetime distributions. Each must expose df() (density), - sf() (survival) and mean(). + The component lifetime distributions, in standby order (primary + first). Each must expose df() (density), sf() (survival) and mean(). + switching_probability : float or sequence, optional + Probability that a switch onto the next spare succeeds. A scalar + applies to every switch; a sequence gives one probability per switch + (length len(models) - 1). By default 1.0 (perfect switching), which + reduces to the plain convolution. n_points : int, optional Number of grid points, by default 100_001. More points give more accuracy at the cost of construction time. @@ -61,28 +134,47 @@ class ConvolvedSurvival: Survival threshold used to bound the time grid, by default 1e-10. """ - def __init__(self, models, n_points: int = 100_001, eps: float = 1e-10): + def __init__( + self, + models, + switching_probability=1.0, + n_points: int = 100_001, + eps: float = 1e-10, + ): models = list(models) - if len(models) == 0: + n = len(models) + if n == 0: raise ValueError("Need at least one model to convolve.") + weights = _switching_weights(switching_probability, n) + self.switching_weights = weights + # Bound the grid by the sum of the components' effective upper times - # (the support of the sum is contained in [0, sum of supports]). + # (the support of the full sum is contained in [0, sum of supports]). upper = sum(_upper_time(model, eps) for model in models) t = np.linspace(0.0, upper, n_points) dt = t[1] - t[0] - # Convolve the component densities to get the density of the sum. + # Incrementally convolve to get the density of each partial sum + # T_1 + ... + T_k (k = 1..n). + partial_pdfs = [] pdf = _density_on_grid(models[0], t) + partial_pdfs.append(pdf) for model in models[1:]: pdf = fftconvolve(pdf, _density_on_grid(model, t))[:n_points] * dt + partial_pdfs.append(pdf) + + # Survival function is the weighted mixture of the partial-sum survival + # functions (zero-weight partials are skipped, so perfect switching + # only evaluates the full convolution). + sf = np.zeros(n_points) + for weight, partial_pdf in zip(weights, partial_pdfs): + if weight == 0.0: + continue + sf += weight * _sf_from_pdf(partial_pdf, t) - # Integrate to the CDF, normalising away discretisation drift so the - # total probability is exactly one, then form the survival function. - cdf = cumulative_trapezoid(pdf, t, initial=0.0) - cdf = cdf / cdf[-1] self._t = t - self._sf = np.clip(1.0 - cdf, 0.0, 1.0) + self._sf = np.clip(sf, 0.0, 1.0) def sf(self, x): """Survival function at x (1 at/below 0, ~0 beyond the grid).""" diff --git a/repyability/rbd/repeated_standby_node.py b/repyability/rbd/repeated_standby_node.py index 89f4bc5..b04a7c7 100644 --- a/repyability/rbd/repeated_standby_node.py +++ b/repyability/rbd/repeated_standby_node.py @@ -5,14 +5,26 @@ class RepeatedStandbyNode: - def __init__(self, model, repeats, N=10_000, lower=-np.inf): + def __init__( + self, + model, + repeats, + N=10_000, + lower=-np.inf, + switching_probability=1.0, + ): self.model = model self.repeats = repeats + self.switching_probability = switching_probability # Repeated cold standby: the lifetime is the sum of `repeats` - # independent copies of `model`. Its survival function is their - # convolution, computed deterministically here. - self._sf_model = ConvolvedSurvival([model] * repeats) + # independent copies of `model` (a mixture of partial sums under + # imperfect switching). Its survival function is computed + # deterministically here. Note: random()/mean() below still reflect + # perfect switching. + self._sf_model = ConvolvedSurvival( + [model] * repeats, switching_probability=switching_probability + ) # A Kaplan-Meier fit is retained only for random()/mean(). x_random = self.random(N) diff --git a/repyability/rbd/standby_node.py b/repyability/rbd/standby_node.py index 3b15400..9651f5b 100644 --- a/repyability/rbd/standby_node.py +++ b/repyability/rbd/standby_node.py @@ -3,11 +3,22 @@ import numpy as np from surpyval import KaplanMeier -from .numerical_convolution import ConvolvedSurvival +from .numerical_convolution import ( + ConvolvedSurvival, + is_perfect_switching, + switch_success_probs, +) class StandbyModel: - def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): + def __init__( + self, + reliabilities, + k=1, + n_sims=10_000, + lower=-np.inf, + switching_probability=1.0, + ): if k > len(reliabilities): raise ValueError( "Must be more nodes in the standby arrangement" @@ -17,31 +28,54 @@ def __init__(self, reliabilities, k=1, n_sims=10_000, lower=-np.inf): self.k = k self.N = len(reliabilities) self.n_sims = n_sims + self.switching_probability = switching_probability if k == 1: # Cold standby (k=1): the lifetime is the sum of the components' - # lifetimes, whose survival function is their convolution. Compute - # it deterministically by numerical convolution rather than - # fitting a Kaplan-Meier to Monte-Carlo samples. - self._sf_model = ConvolvedSurvival(reliabilities) + # lifetimes (or a mixture of partial sums under imperfect + # switching), whose survival function is computed deterministically + # by numerical convolution rather than from Monte-Carlo samples. + self._sf_model = ConvolvedSurvival( + reliabilities, switching_probability=switching_probability + ) self.model = None else: # For k >= 2 the lifetime is not a simple sum (it depends on the # order in which components fail), so fall back to the - # Monte-Carlo + Kaplan-Meier approximation. + # Monte-Carlo + Kaplan-Meier approximation. Imperfect switching is + # not modelled in that case yet. + if not is_perfect_switching(switching_probability): + raise NotImplementedError( + "switching_probability is only supported for k=1 cold" + " standby; for k>=2 leave it at 1.0 (perfect switching)." + ) x_random = self.random(n_sims) self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) self._sf_model = None def random(self, size): if self.k == 1: - # If k is only one for the standby node the - # reliability can be estimated from the sum - # of each of the components in the node. + # If k is only one for the standby node the reliability can be + # estimated from the sum of each of the components in the node, # i.e. it will fail after all of them fail. - x_random = np.zeros(size) - for model in self.reliabilities: - x_random += model.random(size) + x_random = np.asarray( + self.reliabilities[0].random(size), dtype=float + ) + if is_perfect_switching(self.switching_probability): + for model in self.reliabilities[1:]: + x_random = x_random + model.random(size) + else: + # Under imperfect switching a spare only contributes if every + # switch up to and including its own has succeeded. + probs = switch_success_probs( + self.switching_probability, self.N + ) + running = np.ones(size, dtype=bool) + for model, p in zip(self.reliabilities[1:], probs): + running = running & (np.random.random(size) < p) + x_random = x_random + np.where( + running, model.random(size), 0.0 + ) else: # If k are required to continue then a random draw needs diff --git a/repyability/tests/test_rbd_switching_probability.py b/repyability/tests/test_rbd_switching_probability.py new file mode 100644 index 0000000..d63c131 --- /dev/null +++ b/repyability/tests/test_rbd_switching_probability.py @@ -0,0 +1,96 @@ +""" +Tests imperfect switching for cold-standby nodes. + +Under imperfect switching the lifetime is a mixture of partial sums, weighted +by how many switches succeed. The key checks use the closed form for a single +switch between two Exp(1) components with switch probability p: + + S(t) = (1 - p) e^-t + p * Erlang(2,1).sf(t) = e^-t (1 + p t) + +and the corresponding mean for three Exp(1) components, E[T] = 1 + p + p^2. +""" + +import numpy as np +import pytest +from surpyval import Exponential + +from repyability.rbd.numerical_convolution import ConvolvedSurvival +from repyability.rbd.repeated_standby_node import RepeatedStandbyNode +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) +EXP = Exponential.from_params([1]) + + +def test_single_switch_closed_form(): + for p in [0.0, 0.3, 0.7, 1.0]: + conv = ConvolvedSurvival([EXP, EXP], switching_probability=p) + for t in [0.5, 1, 3, 6]: + expected = np.exp(-t) * (1 + p * t) + assert conv.sf(t) == pytest.approx(expected, **TOL) + + +def test_perfect_switching_matches_default(): + a = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=1.0) + b = ConvolvedSurvival([EXP, EXP, EXP]) + for t in [1, 3, 7]: + assert a.sf(t) == pytest.approx(b.sf(t), **TOL) + + +def test_zero_switching_is_primary_only(): + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=0.0) + for t in [0.5, 1, 3]: + assert conv.sf(t) == pytest.approx(np.exp(-t), **TOL) + + +def test_mean_three_components_closed_form(): + # E[T] = (1-p)*1 + p(1-p)*2 + p^2*3 = 1 + p + p^2 + for p in [0.0, 0.4, 0.9, 1.0]: + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=p) + assert conv.mean() == pytest.approx(1 + p + p**2, rel=1e-2) + + +def test_per_switch_sequence(): + # switches [1.0, 0.0]: first always works, second never -> exactly the + # first two components run -> Erlang(2, 1). + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=[1.0, 0.0]) + erlang2 = ConvolvedSurvival([EXP, EXP]) + for t in [0.5, 1, 3]: + assert conv.sf(t) == pytest.approx(erlang2.sf(t), **TOL) + + +def test_switching_weights_sum_to_one(): + for p in [0.0, 0.5, 1.0]: + conv = ConvolvedSurvival([EXP, EXP, EXP], switching_probability=p) + assert sum(conv.switching_weights) == pytest.approx(1.0) + + +def test_standby_model_switching_sf(): + standby = StandbyModel([EXP, EXP], k=1, switching_probability=0.5) + for t in [0.5, 1, 3]: + expected = np.exp(-t) * (1 + 0.5 * t) + assert standby.sf(t) == pytest.approx(expected, **TOL) + + +def test_standby_model_switching_random_mean(): + # random() should also reflect imperfect switching: mean ~ 1 + p (n=2). + standby = StandbyModel([EXP, EXP], k=1, switching_probability=0.5) + assert standby.random(200_000).mean() == pytest.approx(1.5, abs=5e-2) + + +def test_repeated_standby_switching_sf(): + node = RepeatedStandbyNode(EXP, 2, switching_probability=0.5) + for t in [0.5, 1, 3]: + assert node.sf(t) == pytest.approx(np.exp(-t) * (1 + 0.5 * t), **TOL) + + +def test_standby_k2_switching_not_implemented(): + with pytest.raises(NotImplementedError): + StandbyModel([EXP, EXP, EXP], k=2, switching_probability=0.5) + + +def test_invalid_switching_probability(): + with pytest.raises(ValueError): + ConvolvedSurvival([EXP, EXP], switching_probability=1.5) + with pytest.raises(ValueError): + ConvolvedSurvival([EXP, EXP, EXP], switching_probability=[0.5]) From 363bfe0cfa123504b74c9da058f8be536cfefb94 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 11:10:21 +0000 Subject: [PATCH 8/9] Fix RepeatedStandbyNode random/mean (and make them deterministic) random() previously sampled from a Kaplan-Meier fit of the summed lifetime and then re-summed `repeats` of those draws, overcounting by a factor of ~repeats (e.g. a 3-fold standby of Exp(1) reported a mean of ~9 instead of 3). It was also the source of intermittent -inf failures. random() now sums `repeats` independent draws from the base model directly (respecting switching_probability via a per-switch Bernoulli), and mean() returns the exact value from the convolution (E[T] = integral of sf), so both are correct and deterministic. The Kaplan-Meier fit is dropped; the N and lower arguments are kept (unused) for backwards compatibility. Updates test_rbd_repeated_standby::test_mean to assert the correct mean (3, not the previous buggy 9). The standby tests are now deterministic (no more flakiness) and run in a fraction of the time. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/repeated_standby_node.py | 44 +++++++++++++------ .../tests/test_rbd_repeated_standby.py | 4 +- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/repyability/rbd/repeated_standby_node.py b/repyability/rbd/repeated_standby_node.py index b04a7c7..4b38825 100644 --- a/repyability/rbd/repeated_standby_node.py +++ b/repyability/rbd/repeated_standby_node.py @@ -1,7 +1,10 @@ import numpy as np -from surpyval import KaplanMeier -from .numerical_convolution import ConvolvedSurvival +from .numerical_convolution import ( + ConvolvedSurvival, + is_perfect_switching, + switch_success_probs, +) class RepeatedStandbyNode: @@ -13,6 +16,8 @@ def __init__( lower=-np.inf, switching_probability=1.0, ): + # N and lower are kept for backwards compatibility (a Kaplan-Meier fit + # was previously made here); they are no longer used. self.model = model self.repeats = repeats self.switching_probability = switching_probability @@ -20,22 +25,35 @@ def __init__( # Repeated cold standby: the lifetime is the sum of `repeats` # independent copies of `model` (a mixture of partial sums under # imperfect switching). Its survival function is computed - # deterministically here. Note: random()/mean() below still reflect - # perfect switching. + # deterministically by numerical convolution. self._sf_model = ConvolvedSurvival( [model] * repeats, switching_probability=switching_probability ) - # A Kaplan-Meier fit is retained only for random()/mean(). - x_random = self.random(N) - self.model = KaplanMeier.fit(x_random, set_lower_limit=lower) - def random(self, size): - randoms = self.model.random((self.repeats, size)) - return randoms.sum(axis=0) - - def mean(self, N=1000): - return self.random(N).mean() + # Sum of `repeats` independent draws from the base model. Under + # imperfect switching a spare only contributes if every switch up to + # and including its own has succeeded. + x_random = np.asarray(self.model.random(size), dtype=float) + if is_perfect_switching(self.switching_probability): + for _ in range(self.repeats - 1): + x_random = x_random + self.model.random(size) + else: + probs = switch_success_probs( + self.switching_probability, self.repeats + ) + running = np.ones(size, dtype=bool) + for p in probs: + running = running & (np.random.random(size) < p) + x_random = x_random + np.where( + running, self.model.random(size), 0.0 + ) + return x_random + + def mean(self, *args, **kwargs): + # Exact, deterministic mean from the convolution (E[T] = integral of + # the survival function). + return self._sf_model.mean() def sf(self, *args, **kwargs): return self._sf_model.sf(*args, **kwargs) diff --git a/repyability/tests/test_rbd_repeated_standby.py b/repyability/tests/test_rbd_repeated_standby.py index 89dbd23..3ff03e6 100644 --- a/repyability/tests/test_rbd_repeated_standby.py +++ b/repyability/tests/test_rbd_repeated_standby.py @@ -27,9 +27,9 @@ def test_random(): def test_mean(): + # Cold standby of 3 iid Exp(1): mean is the sum of the means = 3. node = RepeatedStandbyNode(exp_model, 3, N=20_000) - mean = node.mean() - assert pytest.approx(mean, rel=1e-1) == 3 * 3 + assert pytest.approx(node.mean(), rel=1e-2) == 3 def test_sf(): From 0fdc195f59e1a9ad40d206d4a6a2e1508fd2168f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 12:30:52 +0000 Subject: [PATCH 9/9] Exact closed form for identical-exponential cold standby When every standby component is an Exponential with the same rate (and switching is perfect), the k-out-of-n cold standby lifetime is exactly Erlang(N-k+1, k*rate): while k units operate the failure rate is k*rate, and by memorylessness the inter-failure times are i.i.d. Exponential(k*rate). - StandbyModel now detects identical exponential units and uses that closed form (via scipy.stats.gamma) for any k -- deterministic and exact, replacing the Monte-Carlo + Kaplan-Meier fit for k>=2 and the convolution for k=1 in this case. Non-exponential or mixed-rate inputs fall back to the existing convolution (k=1) / Monte-Carlo (k>=2) paths. - mean() now returns the exact value from the analytic model when available (exponential closed form or convolution), only falling back to the Monte-Carlo estimate for the k>=2 general case. Adds test_rbd_exponential_standby.py: checks the closed form against scipy's gamma and surpyval's Gamma for k=1 and k=2, the k=N min-exponential limit, determinism, agreement (in mean) with the priority-queue simulation, and that mixed-rate inputs use the convolution path instead. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01D3uxEk7qdLTcRhR8Avnk5M --- repyability/rbd/standby_node.py | 68 +++++++++++++++-- .../tests/test_rbd_exponential_standby.py | 76 +++++++++++++++++++ 2 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 repyability/tests/test_rbd_exponential_standby.py diff --git a/repyability/rbd/standby_node.py b/repyability/rbd/standby_node.py index 9651f5b..f96eb18 100644 --- a/repyability/rbd/standby_node.py +++ b/repyability/rbd/standby_node.py @@ -1,6 +1,7 @@ from queue import PriorityQueue import numpy as np +from scipy.stats import gamma as _gamma from surpyval import KaplanMeier from .numerical_convolution import ( @@ -10,6 +11,50 @@ ) +def _identical_exponential_rate(models): + """If every model is an Exponential with the same rate, return that rate; + otherwise return None. The rate is taken as 1 / mean.""" + rate = None + for model in models: + dist = getattr(model, "dist", None) + if dist is None or getattr(dist, "name", "") != "Exponential": + return None + mean = float(model.mean()) + if mean <= 0.0: + return None + this_rate = 1.0 / mean + if rate is None: + rate = this_rate + elif not np.isclose(this_rate, rate): + return None + return rate + + +class _ExponentialStandbySurvival: + """Exact survival function of an identical-exponential k-out-of-n cold + standby arrangement. + + With N identical units of rate ``rate``, k operating at a time, the system + fails at the (N-k+1)-th failure. While k units operate the failure rate is + ``k*rate``, and by the memorylessness of the exponential the inter-failure + times are i.i.d. Exponential(k*rate). Hence the lifetime is exactly + Erlang(N-k+1, k*rate) -- deterministic, for any k. + """ + + def __init__(self, rate, n_units, k): + self.shape = n_units - k + 1 # number of inter-failure gaps + self.rate = k * rate # failure rate while k units operate + + def sf(self, x): + return _gamma.sf(x, a=self.shape, scale=1.0 / self.rate) + + def ff(self, x): + return _gamma.cdf(x, a=self.shape, scale=1.0 / self.rate) + + def mean(self, *args, **kwargs): + return self.shape / self.rate + + class StandbyModel: def __init__( self, @@ -30,7 +75,14 @@ def __init__( self.n_sims = n_sims self.switching_probability = switching_probability - if k == 1: + rate = _identical_exponential_rate(reliabilities) + if rate is not None and is_perfect_switching(switching_probability): + # Identical exponential units: the cold standby lifetime is exactly + # Erlang(N-k+1, k*rate) for any k, by the memorylessness of the + # exponential. Use that closed form directly. + self._sf_model = _ExponentialStandbySurvival(rate, self.N, k) + self.model = None + elif k == 1: # Cold standby (k=1): the lifetime is the sum of the components' # lifetimes (or a mixture of partial sums under imperfect # switching), whose survival function is computed deterministically @@ -40,10 +92,11 @@ def __init__( ) self.model = None else: - # For k >= 2 the lifetime is not a simple sum (it depends on the - # order in which components fail), so fall back to the - # Monte-Carlo + Kaplan-Meier approximation. Imperfect switching is - # not modelled in that case yet. + # For k >= 2 with general (non-exponential) lifetimes the lifetime + # is not a simple sum (it depends on the order in which components + # fail), so fall back to the Monte-Carlo + Kaplan-Meier + # approximation. Imperfect switching is not modelled in that case + # yet. if not is_perfect_switching(switching_probability): raise NotImplementedError( "switching_probability is only supported for k=1 cold" @@ -110,6 +163,11 @@ def random(self, size): return x_random def mean(self, N=10_000): + # Use the exact/deterministic mean when an analytic survival model is + # available (exponential closed form or convolution); otherwise fall + # back to the Monte-Carlo estimate. + if self._sf_model is not None: + return self._sf_model.mean() return self.random(N).mean() def sf(self, *args, **kwargs): diff --git a/repyability/tests/test_rbd_exponential_standby.py b/repyability/tests/test_rbd_exponential_standby.py new file mode 100644 index 0000000..1fba2d9 --- /dev/null +++ b/repyability/tests/test_rbd_exponential_standby.py @@ -0,0 +1,76 @@ +""" +Tests the exact closed form used when standby components are identical +Exponentials. + +For N identical exponential units of rate lambda with k operating, the cold +standby lifetime is exactly Erlang(N-k+1, k*lambda): while k units run the +failure rate is k*lambda, and by memorylessness the inter-failure times are +i.i.d. Exponential(k*lambda). This is checked against scipy's gamma, against +the surpyval Gamma, and (in distribution) against the Monte-Carlo simulation. +""" + +import numpy as np +import pytest +from scipy.stats import gamma +from surpyval import Exponential, Gamma + +from repyability.rbd.standby_node import StandbyModel + +TOL = dict(rel=1e-2, abs=1e-4) +EXP1 = Exponential.from_params([1]) + + +def test_k1_exponential_is_erlang(): + # N=3, k=1 -> Erlang(3, 1) == Gamma(3, 1) + standby = StandbyModel([EXP1, EXP1, EXP1], k=1) + gam = Gamma.from_params([3.0, 1.0]) + for t in [0.5, 1, 3, 10]: + assert standby.sf(t) == pytest.approx(gam.sf(t), **TOL) + assert standby.ff(t) == pytest.approx(gam.ff(t), **TOL) + assert standby.mean() == pytest.approx(3.0, rel=1e-9) + + +def test_k2_exponential_is_erlang(): + # N=4, k=2 -> fails at the 3rd failure, rate 2 while operating + # -> Erlang(3, 2) == Gamma(a=3, scale=1/2), mean 3/2 + standby = StandbyModel([EXP1] * 4, k=2) + for t in [0.5, 1, 2, 5]: + assert standby.sf(t) == pytest.approx( + gamma.sf(t, a=3, scale=0.5), **TOL + ) + assert standby.mean() == pytest.approx(1.5, rel=1e-9) + + +def test_k_equals_n_is_min_exponential(): + # k=N: all units operate, system fails at the first failure + # -> Exp(N*rate) == Erlang(1, N*rate) + standby = StandbyModel([EXP1] * 4, k=4) + for t in [0.25, 1, 2]: + assert standby.sf(t) == pytest.approx(np.exp(-4 * t), **TOL) + + +def test_exponential_closed_form_is_deterministic(): + a = StandbyModel([EXP1] * 5, k=2) + b = StandbyModel([EXP1] * 5, k=2) + for t in [1, 2, 5]: + assert a.sf(t) == b.sf(t) # exact, no sampling + + +def test_k2_exponential_matches_simulation(): + # The closed form should agree (in mean) with the priority-queue sim. + exp = Exponential.from_params([2.0]) # rate 2 + standby = StandbyModel([exp] * 5, k=2) # Erlang(4, 4), mean 1.0 + assert standby.mean() == pytest.approx(1.0, rel=1e-9) + sim_mean = standby.random(50_000).mean() + assert sim_mean == pytest.approx(1.0, rel=3e-2) + + +def test_non_identical_rates_use_convolution_not_erlang(): + # Different rates -> not the simple Erlang; k=1 still exact via the + # convolution path: Exp(1) + Exp(2) is hypoexponential, 2 e^-t - e^-2t. + standby = StandbyModel( + [Exponential.from_params([1]), Exponential.from_params([2])], k=1 + ) + for t in [0.5, 1, 2]: + expected = 2 * np.exp(-t) - np.exp(-2 * t) + assert standby.sf(t) == pytest.approx(expected, **TOL)