Source code for epipack.numeric_epi_models

"""
Provides an API to define  epidemiological models.
"""

import numpy as np
import scipy.sparse as sprs

import warnings

from epipack.integrators import (
        IntegrationMixin,
        time_leap_newton,
        time_leap_ivp,
    )

from epipack.process_conversions import (
        processes_to_events,
        transition_processes_to_events,
        fission_processes_to_events,
        fusion_processes_to_events,
        transmission_processes_to_events,
    )

from scipy.optimize import newton
from scipy.integrate import quad

[docs]def custom_choice(p): """ Return an index of normalized probability vector ``p`` with probability equal to the corresponding entry in ``p``. """ return np.argmin(np.cumsum(p)<np.random.rand())
[docs]class ConstantBirthRate(): """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : float Constant rate value """ def __init__(self, rate): self.rate = rate def __call__(self, t, y): return self.rate
[docs]class DynamicBirthRate(): """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : function Function of time ``t`` and state ``y`` """ def __init__(self, rate): self.rate = rate def __call__(self, t, y): return self.rate(t,y)
[docs]class ConstantLinearRate: """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : float Constant rate value comp0 : int Index of the corresponding reacting component. The incidence of this component will be multiplied with the value of ``rate``. """ def __init__(self, rate, comp0): self.rate = rate self.comp0 = comp0 def __call__(self, t, y): return self.rate * y[self.comp0]
[docs]class DynamicLinearRate: """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : function Function of time ``t`` and state ``y`` comp0 : int Index of the corresponding reacting component. The incidence of this component will be multiplied with the value of ``rate``. """ def __init__(self, rate, comp0): self.rate = rate self.comp0 = comp0 def __call__(self, t, y): return self.rate(t,y) * y[self.comp0]
[docs]class ConstantQuadraticRate: """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : float Constant rate value comp0 : int Index of one of the reacting components. The incidence of this component will be multiplied with the value of ``rate``. comp1 : int Index of the other reacting component. The incidence of this component will be multiplied with the value of ``rate``. """ def __init__(self, rate, comp0, comp1): self.rate = rate self.comp0 = comp0 self.comp1 = comp1 def __call__(self, t, y): return self.rate * y[self.comp0] * y[self.comp1]
[docs]class DynamicQuadraticRate: """ Will be used as a function of time ``t`` and state ``y``, returning a rate value. Parameters ---------- rate : function Function of time ``t`` and state ``y`` comp0 : int Index of one of the reacting components. The incidence of this component will be multiplied with the value of ``rate``. comp1 : int Index of the other reacting component. The incidence of this component will be multiplied with the value of ``rate``. """ def __init__(self, rate, comp0, comp1): self.rate = rate self.comp0 = comp0 self.comp1 = comp1 def __call__(self, t, y): return self.rate(t,y) * y[self.comp0] * y[self.comp1]
[docs]class EpiModel(IntegrationMixin): """ A general class to define a standard mean-field compartmental epidemiological model, based on reaction events. Parameters ---------- compartments : :obj:`list` of :obj:`string` A list containing compartment strings. initial_population_size : float, default = 1.0 The population size at :math:`t = 0`. correct_for_dynamical_population_size : bool, default = False If ``True``, the quadratic coupling terms will be divided by the population size. integral_solver : str, default = 'solve_ivp' Whether or not to use the initial-value solver ``solve_ivp``. to determine a time leap for time-varying rates. If not ``'solve_ivp'``, a Newton-Raphson method will be used on the upper bound of a quad-integrator. Attributes ---------- compartments : list A list containing strings or hashable types that describe each compartment, (e.g. "S", "I", etc.). compartment_ids: dict Maps compartments to their indices in ``self.compartments``. N_comp : :obj:`int` Number of compartments (including population number) initial_population_size : float The population size at :math:`t = 0`. correct_for_dynamical_population_size : bool If ``True``, the quadratic coupling terms will be divided by the sum of all compartments, otherwise they will be divided by the initial population size. birth_rate_functions : list of ConstantBirthRate or DynamicBirthRate A list of functions that return rate values based on time ``t`` and state vector ``y``. Each entry corresponds to an event update in ``self.birth_event_updates``. birth_event_updates : list of numpy.ndarray A list of vectors. Each entry corresponds to a rate in ``birth_rate_functions`` and quantifies the change in individual counts in the compartments. linear_rate_functions : list of ConstantLinearRate or DynamicLinearRate A list of functions that return rate values based on time ``t`` and state vector ``y``. Each entry corresponds to an event update in ``self.linear_event_updates``. linear_event_updates : list of numpy.ndarray A list of vectors. Each entry corresponds to a rate in ``linear_rate_functions`` and quantifies the change in individual counts in the compartments. quadratic_rate_functions : list of ConstantQuadraticRate or DynamicQuadraticRate A list of functions that return rate values based on time ``t`` and state vector ``y``. Each entry corresponds to an event update in ``self.quadratic_event_updates``. quadratic_event_updates : list of numpy.ndarray A list of vectors. Each entry corresponds to a rate in ``quadratic_rate_functions`` and quantifies the change in individual counts in the compartments. y0 : numpy.ndarray The initial conditions. rates_have_explicit_time_dependence : bool Internal switch that's flipped when a non-constant rate is passed to the model. use_ivp_solver : bool Whether or not to use the initial-value solver to determine a time leap for time-varying rates. If ``False``, a Newton-Raphson method will be used on the upper bound of a quad-integrator. Example ------- .. code:: python >>> epi = EpiModel(["S","I","R"]) >>> print(epi.compartments) [ "S", "I", "R" ] """ def __init__(self, compartments, initial_population_size=1, correct_for_dynamical_population_size=False, integral_solver='solve_ivp', ): self.y0 = None self.compartments = list(compartments) self.compartment_ids = { C: iC for iC, C in enumerate(compartments) } self.N_comp = len(self.compartments) self.initial_population_size = initial_population_size self.correct_for_dynamical_population_size = correct_for_dynamical_population_size self.birth_rate_functions = [] self.birth_event_updates = [] self.linear_rate_functions = [] self.linear_event_updates = [] self.quadratic_rate_functions = [] self.quadratic_event_updates = [] self.birth_events = [] self.linear_events = [] self.quadratic_events = [] self.use_ivp_solver = integral_solver == 'solve_ivp' self.rates_have_explicit_time_dependence = False
[docs] def get_compartment_id(self,C): """Get the integer ID of a compartment ``C``""" return self.compartment_ids[C]
[docs] def get_compartment(self,iC): """Get the compartment, given an integer ID ``iC``""" return self.compartments[iC]
[docs] def set_processes(self, process_list, allow_nonzero_column_sums=False, reset_events=True, ignore_rate_position_checks=False): """ Converts a list of reaction process tuples to event tuples and sets the rates for this model. Parameters ---------- process_list : :obj:`list` of :obj:`tuple` A list containing reaction processes in terms of tuples. .. code:: python [ # transition process ( source_compartment, rate, target_compartment), # transmission process ( coupling_compartment_0, coupling_compartment_1, rate, target_compartment_0, target_ccompartment_1), # fission process ( source_compartment, rate, target_compartment_0, target_ccompartment_1), # fusion process ( source_compartment_0, source_compartment_1, rate, target_compartment), # death process ( source_compartment, rate, None), # birth process ( None, rate, target_compartment), ] allow_nonzero_column_sums : bool, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. reset_events : bool, default : True If this is `True`, reset all events to zero before setting the new ones. ignore_rate_position_checks : bool, default = False This function usually checks whether the rate of a reaction is positioned correctly. You can turn this behavior off for transition, birth, death, and transmission processes. (Useful if you want to define symbolic transmission processes that are compartment-dependent). """ quadratic_events, linear_events = processes_to_events(process_list, self.compartments, ignore_rate_position_checks) self.set_linear_events(linear_events, allow_nonzero_column_sums=allow_nonzero_column_sums, reset_events=reset_events, ) self.set_quadratic_events(quadratic_events, allow_nonzero_column_sums=allow_nonzero_column_sums, reset_events=reset_events, ) return self
def _rate_has_functional_dependency(self,rate): if callable(rate): t = [0,1,2,10000,-10000] y = np.ones(self.N_comp) test = np.array([ rate(_t, y) for _t in t ]) has_time_dependence = not np.all(test == test[0]) self.rates_have_explicit_time_dependence = \ self.rates_have_explicit_time_dependence or has_time_dependence return True else: return False
[docs] def set_linear_events(self, event_list, allow_nonzero_column_sums=False, reset_events=True): r""" Define the linear transition events between compartments. Parameters ========== event_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transition events in the following format: .. code:: python [ ( ("affected_compartment_0",), rate, [ ("affected_compartment_0", dN0), ("affected_compartment_1", dN1), ... ], ), ... ] allow_nonzero_column_sums : :obj:`bool`, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. reset_events : bool, default : True Whether to reset all linear events to zero before converting those. Example ------- For an SEIR model with infectious period ``tau`` and incubation period ``theta``. .. code:: python epi.set_linear_events([ ( ("E",), 1/theta, [ ("E", -1), ("I", +1) ] ), ( ("I",), 1/tau, [ ("I", -1), ("R", +1) ] ), ]) Read as "compartment E reacts with rate :math:`1/\theta` which leads to the decay of one E particle to one I particle." """ if reset_events: birth_rate_functions = [] birth_event_updates = [] linear_rate_functions = [] linear_event_updates = [] birth_events = [] linear_events = [] else: linear_event_updates = list(self.linear_event_updates) birth_event_updates = list(self.birth_event_updates) linear_rate_functions = list(self.linear_rate_functions) birth_rate_functions = list(self.birth_rate_functions) birth_events = list(self.birth_events) linear_events = list(self.linear_events) for acting_compartments, rate, affected_compartments in event_list: dy = np.zeros(self.N_comp) for trg, change in affected_compartments: _t = self.get_compartment_id(trg) dy[_t] += change if acting_compartments[0] is None: if self._rate_has_functional_dependency(rate): this_rate = DynamicBirthRate(rate) else: this_rate = ConstantBirthRate(rate) birth_event_updates.append( dy ) birth_rate_functions.append( this_rate ) birth_events.append((acting_compartments, rate, affected_compartments)) else: _s = self.get_compartment_id(acting_compartments[0]) if self._rate_has_functional_dependency(rate): this_rate = DynamicLinearRate(rate, _s) else: this_rate = ConstantLinearRate(rate, _s) linear_event_updates.append( dy ) linear_rate_functions.append( this_rate ) linear_events.append((acting_compartments, rate, affected_compartments)) if dy.sum() != 0 and not self.correct_for_dynamical_population_size: warnings.warn("This model has processes with a fluctuating "+\ "number of agents. Consider correcting the rates dynamically with "+\ "the attribute correct_for_dynamical_population_size = True") if not allow_nonzero_column_sums and len(linear_rate_functions)>0: _y = np.ones(self.N_comp) test = sum([r(0,_y) * dy for dy, r in zip (linear_event_updates, linear_rate_functions)]) test += sum([r(0,_y) * dy for dy, r in zip (birth_event_updates, birth_rate_functions)]) test_sum = test.sum() if np.abs(test_sum) > 1e-15: warnings.warn("events do not sum to zero for each column:" + str(test_sum)) self.linear_event_updates = linear_event_updates self.linear_rate_functions = linear_rate_functions self.birth_event_updates = birth_event_updates self.birth_rate_functions = birth_rate_functions self.linear_events = linear_events self.birth_events = birth_events return self
[docs] def add_transition_processes(self,process_list): """ Define the linear transition processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions events in the following format: .. code:: python [ ( source_compartment, rate, target_compartment ), ... ] Example ======= For an SEIR model. .. code:: python epi.add_transition_processes([ ("E", symptomatic_rate, "I" ), ("I", recovery_rate, "R" ), ]) """ linear_events = transition_processes_to_events(process_list) return self.set_linear_events(linear_events, reset_events=False, allow_nonzero_column_sums=True)
[docs] def add_fission_processes(self,process_list): """ Define linear fission processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains fission rates in the following format: .. code:: python [ ("source_compartment", rate, "target_compartment_0", "target_compartment_1" ), ... ] Example ------- For pure exponential growth of compartment `B`. .. code:: python epi.add_fission_processes([ ("B", growth_event, "B", "B" ), ]) """ linear_events = fission_processes_to_events(process_list) return self.set_linear_events(linear_events, reset_events=False, allow_nonzero_column_sums=True)
[docs] def add_fusion_processes(self,process_list): """ Define fusion processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains fission rates in the following format: .. code:: python [ ("coupling_compartment_0", "coupling_compartment_1", rate, "target_compartment_0" ), ... ] Example ------- Fusion of reactants "A", and "B" to form "C". .. code:: python epi.add_fusion_processes([ ("A", "B", reaction_rate, "C" ), ]) """ quadratic_events = fusion_processes_to_events(process_list) return self.set_quadratic_events(quadratic_events, reset_events=False, allow_nonzero_column_sums=True)
[docs] def add_transmission_processes(self,process_list): r""" A wrapper to define quadratic process rates through transmission reaction equations. Note that in stochastic network/agent simulations, the transmission rate is equal to a rate per link. For the mean-field ODEs, the rates provided to this function will just be equal to the prefactor of the respective quadratic terms. For instance, if you analyze an SIR system and simulate on a network of mean degree :math:`k_0`, a basic reproduction number :math:`R_0`, and a recovery rate :math:`\mu`, you would define the single link transmission process as .. code:: python ("I", "S", R_0/k_0 * mu, "I", "I") For the mean-field system here, the corresponding reaction equation would read .. code:: python ("I", "S", R_0 * mu, "I", "I") Parameters ---------- process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions rates in the following format: .. code:: python [ ("source_compartment", "target_compartment_initial", rate "source_compartment", "target_compartment_final", ), ... ] Example ------- For an SEIR model. .. code:: python epi.add_transmission_processes([ ("I", "S", +1, "I", "E" ), ]) """ quadratic_events = transmission_processes_to_events(process_list) return self.set_quadratic_events(quadratic_events, reset_events=False, allow_nonzero_column_sums=True)
[docs] def add_quadratic_events(self, event_list, allow_nonzero_column_sums=False): """ Add quadratic events without resetting the existing event terms. See :func:`epipack.numeric_epi_models.EpiModel.set_quadratic_events` for docstring. """ return self.set_quadratic_events(event_list, reset_events=False, allow_nonzero_column_sums=allow_nonzero_column_sums, )
[docs] def add_linear_events(self, event_list, allow_nonzero_column_sums=False): """ Add linear events without resetting the existing event terms. See :func:`epipack.numeric_epi_models.EpiModel.set_linear_events` for docstring. """ return self.set_linear_events(event_list, reset_events=False, allow_nonzero_column_sums=allow_nonzero_column_sums )
[docs] def set_quadratic_events(self, event_list, allow_nonzero_column_sums=False, reset_events=True, ): r""" Define quadratic transition events between compartments. Parameters ---------- event_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transmission events in the following format: .. code:: python [ ( ("coupling_compartment_0", "coupling_compartment_1"), rate, [ ("affected_compartment_0", dN0), ("affected_compartment_1", dN1), ... ], ), ... ] allow_nonzero_column_sums : :obj:`bool`, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. reset_events : bool, default : True Whether to reset all linear events to zero before converting those. Example ------- For an SEIR model with infection rate ``eta``. .. code:: python epi.set_quadratic_events([ ( ("S", "I"), eta, [ ("S", -1), ("E", +1) ] ), ]) Read as "Coupling of *S* and *I* leads to the decay of one *S* particle to one *E* particle with rate :math:`\eta`.". """ if reset_events: quadratic_event_updates = [] quadratic_rate_functions = [] quadratic_events = [] else: quadratic_event_updates = list(self.quadratic_event_updates) quadratic_rate_functions = list(self.quadratic_rate_functions) quadratic_events = list(self.quadratic_events) for coupling_compartments, rate, affected_compartments in event_list: _s0 = self.get_compartment_id(coupling_compartments[0]) _s1 = self.get_compartment_id(coupling_compartments[1]) dy = np.zeros(self.N_comp) for trg, change in affected_compartments: _t = self.get_compartment_id(trg) dy[_t] += change if self._rate_has_functional_dependency(rate): this_rate = DynamicQuadraticRate(rate, _s0, _s1) else: this_rate = ConstantQuadraticRate(rate, _s0, _s1) quadratic_event_updates.append( dy ) quadratic_rate_functions.append( this_rate ) quadratic_events.append( (coupling_compartments, rate, affected_compartments) ) if not allow_nonzero_column_sums and len(quadratic_rate_functions)>0: _y = np.ones(self.N_comp) test = sum([r(0,_y) * dy for dy, r in zip (quadratic_event_updates, quadratic_rate_functions)]) test_sum = test.sum() if np.abs(test_sum) > 1e-15: warnings.warn("events do not sum to zero for each column:" + str(test_sum)) self.quadratic_event_updates = quadratic_event_updates self.quadratic_rate_functions = quadratic_rate_functions self.quadratic_events = quadratic_events return self
[docs] def dydt(self,t,y): """ Compute the current momenta of the epidemiological model. Parameters ---------- t : :obj:`float` Current time y : numpy.ndarray The entries correspond to the compartment frequencies (or counts, depending on population size). """ ynew = sum([r(t,y) * dy for dy, r in zip(self.linear_event_updates, self.linear_rate_functions)]) ynew += sum([r(t,y) * dy for dy, r in zip(self.birth_event_updates, self.birth_rate_functions)]) if self.correct_for_dynamical_population_size: population_size = y.sum() else: population_size = self.initial_population_size ynew += sum([r(t,y)/population_size * dy for dy, r in zip(self.quadratic_event_updates, self.quadratic_rate_functions)]) return ynew
[docs] def get_numerical_dydt(self): """ Return a function that obtains ``t`` and ``y`` as an input and returns ``dydt`` of this system """ return self.dydt
[docs] def get_time_leap_and_proposed_compartment_changes(self, t, current_event_rates = None, get_event_rates = None, get_compartment_changes = None, ): """ For the current event rates, obtain a proposed time leap and concurrent state change vector. This method is needed for stochastic simulations. Parameters ---------- t : float current time current_event_rates : list, default = None A list of constant rate values. Will be ignored if ``self.rates_have_explicit_time_dependence`` is ``True``, which is why ``None`` is a valid value. get_event_rates : function, default = None A function that takes time ``t`` and current state ``y`` as input and computes the rates of all possible events. If ``None``, will attempt to set this to self.get_event_rates(). get_compartment_changes : function, default = None A function that takes computed event rates and returns a random state change with probability proportional to its rate. If ``None``, will attempt to set this to self.get_compartment_changes(). Returns ------- tau : float A time leap. dy : numpy.ndarray A state change vector. """ if get_event_rates is None: get_event_rates = self.get_event_rates if get_compartment_changes is None: get_compartment_changes = self.get_compartment_changes if self.rates_have_explicit_time_dependence: # solve the integral numerically if self.use_ivp_solver: new_t = time_leap_ivp(t, self.y0, get_event_rates) else: new_t = time_leap_newton(t, self.y0, get_event_rates) tau = new_t - t proposed_event_rates = get_event_rates(new_t, self.y0) dy = get_compartment_changes(proposed_event_rates) else: total_event_rate = current_event_rates.sum() tau = np.random.exponential(1/total_event_rate) dy = get_compartment_changes(current_event_rates) return tau, dy
[docs] def get_compartment_changes(self, rates): """ Sample a state change vector with probability proportional to its rate in ``rates``. Needed for stochastic simulations. Parameters ========== rates : numpy.ndarray A non-zero list of rates. Expects ``rates`` to be sorted according to ``self.birth_event_updates + self.linear_event_updates + self.quadratic_event_updates``. Returns ======= dy : numpy.ndarray A state change vector. """ idy = custom_choice(rates/rates.sum()) if idy < len(self.birth_event_updates): return self.birth_event_updates[idy] elif idy < len(self.birth_event_updates) + len(self.linear_event_updates): idy -= len(self.birth_event_updates) return self.linear_event_updates[idy] else: idy -= (len(self.birth_event_updates) + len(self.linear_event_updates)) return self.quadratic_event_updates[idy]
[docs] def get_event_rates(self, t, y): """ Get a list of rate values corresponding to the previously set events. Parameters ---------- t : float Current time y : numpy.ndarray Current state vector Returns ------- rates : list A list of rate values corresponding to rates. Ordered as ``birth_rate_functions + linear_rate_functions + quadratic_rate_functions``. """ rates = [r(t,y) for r in self.birth_rate_functions] rates += [r(t,y) for r in self.linear_rate_functions] if self.correct_for_dynamical_population_size: population_size = self.y0.sum() else: population_size = self.initial_population_size rates += [ r(t,self.y0)/population_size for r in self.quadratic_rate_functions ] rates = np.array(rates) return rates
[docs] def get_numerical_event_and_rate_functions(self): """ This function is needed to generalize stochastic simulations for child classes. Returns ------- get_event_rates : func A function that takes the current time ``t`` and state vector ``y`` and returns numerical event rate lists. get_compartment_changes : funx A function that takes a numerical list of event ``rates`` and returns a random event state change vector with probability proportional to its entry in ``rates``. """ return self.get_event_rates, self.get_compartment_changes
[docs] def simulate(self, tmax, return_compartments=None, sampling_dt=None, sampling_callback=None, adopt_final_state=False, ): """ Returns values of the given compartments at the demanded time points (as a numpy.ndarray of shape ``(return_compartments), len(time_points)``. If ``return_compartments`` is None, all compartments will be returned. Parameters ---------- tmax : float maximum length of the simulation return_compartments : list of compartments, default = None: The compartments for which to return time series. If ``None``, all compartments will be returned. sampling_dt : float, default = None Temporal distance between samples of the compartment counts. If ``None``, every change will be returned. sampling_callback : funtion, default = None A function that's called when a sample is taken Returns ------- t : numpy.ndarray times at which compartment counts have been sampled result : dict Dictionary mapping a compartment to a time series of its count. """ if return_compartments is None: return_compartments = self.compartments if sampling_callback is not None and sampling_dt is None: raise ValueError('A sampling callback function can only be set if sampling_dt is set, as well.') ndx = [self.get_compartment_id(C) for C in return_compartments] current_state = self.y0.copy() compartments = [ current_state.copy() ] if not adopt_final_state: initial_state = current_state.copy() initial_time = self.t0 t = self.t0 time = [self.t0] get_event_rates, get_compartment_changes = self.get_numerical_event_and_rate_functions() current_event_rates = get_event_rates(t, self.y0) total_event_rate = current_event_rates.sum() if sampling_callback is not None: sampling_callback() # Check for a) zero event rate and b) zero possibility for any nodes being changed still. # This is important because it might happen that nodes # have a non-zero reaction rate but no targets left # at which point the simulation will never halt. while t < tmax and \ total_event_rate > 0: # sample and advance time according to current total rate tau, dy = self.get_time_leap_and_proposed_compartment_changes(t, current_event_rates=current_event_rates, get_event_rates=get_event_rates, get_compartment_changes=get_compartment_changes, ) new_t = t + tau # break if simulation time is reached if new_t >= tmax: break # sampling if sampling_dt is not None: # sample all the time steps that were demanded in between the two events last_sample_dt = time[-1] for idt in range(1,int(np.ceil((new_t-last_sample_dt)/sampling_dt))): time.append(last_sample_dt+idt*sampling_dt) compartments.append(current_state.copy()) if sampling_callback is not None: sampling_callback() # write losses and gains into the current state vector current_state += dy # save the current state if sampling_dt wasn't specified if sampling_dt is None: time.append(new_t) compartments.append(current_state.copy()) # save current state self.t0 = new_t self.y0 = current_state.copy() current_event_rates = get_event_rates(new_t, self.y0) total_event_rate = current_event_rates.sum() # advance time t = new_t if sampling_dt is not None: next_sample = time[-1] + sampling_dt if next_sample <= tmax: time.append(next_sample) compartments.append(current_state) if sampling_callback is not None: sampling_callback() # convert to result dictionary time = np.array(time) result = np.array(compartments) if not adopt_final_state: self.y0 = initial_state self.t0 = initial_time else: self.t0 = tmax return time, { compartment: result[:,c_ndx] for c_ndx, compartment in zip(ndx, return_compartments) }
[docs]class SIModel(EpiModel): """ An SI model derived from :class:`epipack.numeric_epi_models.EpiModel`. """ def __init__(self, infection_rate, initial_population_size=1.0): EpiModel.__init__(self, list("SI"), initial_population_size) self.set_processes([ ("S", "I", infection_rate, "I", "I"), ])
[docs]class SISModel(EpiModel): """ An SIS model derived from :class:`epipack.numeric_epi_models.EpiModel`. Parameters ---------- R0 : float The basic reproduction number. From this, the infection rate is computed as ``infection_rate = R0 * recovery_rate`` recovery_rate : float Recovery rate population_size : float, default = 1.0 Number of people in the population. """ def __init__(self, infection_rate, recovery_rate, initial_population_size=1.0): EpiModel.__init__(self, list("SI"), initial_population_size) self.set_processes([ ("S", "I", infection_rate, "I", "I"), ("I", recovery_rate, "S" ), ])
[docs]class SIRModel(EpiModel): """ An SIR model derived from :class:`epipack.numeric_epi_models.EpiModel`. """ def __init__(self, infection_rate, recovery_rate, initial_population_size=1.0): EpiModel.__init__(self, list("SIR"), initial_population_size) self.set_processes([ ("S", "I", infection_rate, "I", "I"), ("I", recovery_rate, "R"), ])
[docs]class SIRSModel(EpiModel): """ An SIRS model derived from :class:`epipack.numeric_epi_models.EpiModel`. """ def __init__(self, infection_rate, recovery_rate, waning_immunity_rate, initial_population_size=1.0): EpiModel.__init__(self, list("SIR"), initial_population_size) self.set_processes([ ("S", "I", infection_rate, "I", "I"), ("I", recovery_rate, "R"), ("R", waning_immunity_rate, "S"), ])
[docs]class SEIRModel(EpiModel): """ An SEIR model derived from :class:`epipack.numeric_epi_models.EpiModel`. """ def __init__(self, infection_rate, recovery_rate, symptomatic_rate, initial_population_size=1.0): EpiModel.__init__(self, list("SEIR"), initial_population_size) self.set_processes([ ("S", "I", infection_rate, "E", "I"), ("E", symptomatic_rate, "I"), ("I", recovery_rate, "R"), ])
if __name__=="__main__": # pragma: no cover N = 100 epi = EpiModel(list("SEIR"),100) #print(epi.compartments) #print() epi.set_processes([ ("S", "I", 2.0, "E", "I"), ("E", 1.0, "I"), ("I", 1.0, "R"), ]) print("#printing updates") #print([dy.toarray() for dy in epi.linear_event_updates]) #print([dy.toarray() for dy in epi.quadratic_event_updates]) import matplotlib.pyplot as pl from time import time N_meas = 5 tt = np.linspace(0,20,100) start = time() epi = EpiModel(list("SEIR"),100) epi.set_processes([ ("S", "I", 2.0, "E", "I"), ("E", 1.0, "I"), ("I", 1.0, "R"), ]) for meas in range(N_meas): epi.set_initial_conditions({'S':0.99*N,'I':0.01*N}) result = epi.integrate(tt) end = time() print("arrays needed", end-start,"s") pl.plot(tt, result['S'],label='S') pl.plot(tt, result['E'],label='E') pl.plot(tt, result['I'],label='I') pl.plot(tt, result['R'],label='R') pl.legend() from epipack import MatrixSEIRModel tt = np.linspace(0,20,50) SEIR = MatrixSEIRModel(2.0,1.0,1.0,initial_population_size=N) SEIR.set_initial_conditions({'S':0.99*N,'I':0.01*N}) result = SEIR.integrate(tt) pl.plot(tt, result['S'],'s',label='S',mfc='None') pl.plot(tt, result['E'],'s',label='E',mfc='None') pl.plot(tt, result['I'],'s',label='I',mfc='None') pl.plot(tt, result['R'],'s',label='R',mfc='None') ########## epi = EpiModel(list("SEIR"),100) epi.set_processes([ ("S", "I", 2.0, "E", "I"), ("E", 1.0, "I"), ("I", 1.0, "R"), ]) epi.set_initial_conditions({'S':99,'I':1}) t, result = epi.simulate(tt[-1]) pl.plot(t, result['S'],label='S') pl.plot(t, result['E'],label='E') pl.plot(t, result['I'],label='I') pl.plot(t, result['R'],label='R') pl.figure() S, I, R = list("SIR") N = 200 model = EpiModel([S,I],N,correct_for_dynamical_population_size=True) def temporalR0(t,y): return 4 + np.cos(t/100*2*np.pi) model.set_processes([ (S, I, temporalR0, I, I), (None, N, S), (I, 1, S), (S, 1, None), (I, 1, None), ]) model.set_initial_conditions({ S: 190, I: 10, }) def print_status(): print(model.t0/150*100) t, result = model.simulate(150,sampling_dt=0.5,sampling_callback=print_status) pl.plot(t, result['S'],label='S') pl.plot(t, result['I'],label='I') model.set_initial_conditions({ S: 190, I: 10, }) tt = np.linspace(0,100,1000) result = model.integrate(tt) pl.plot(tt, result['S'],label='S') pl.plot(tt, result['I'],label='I') pl.show()