Source code for epipack.numeric_matrix_epi_models

"""
Provides an API to define Matrix epidemiological models.
"""

import numpy as np
import scipy.sparse as sprs

import warnings

from epipack.integrators import (
            integrate_dopri5,
            integrate_euler,
            IntegrationMixin,
        )

from epipack.process_conversions import (
            processes_to_rates,
            transition_processes_to_rates,
            fission_processes_to_rates,
            fusion_processes_to_rates,
            transmission_processes_to_rates,
        )


[docs]class MatrixEpiModel(IntegrationMixin): """ A general class to define standard mean-field compartmental epidemiological model. Parameters ---------- compartments : :obj:`list` of :obj:`string` A list containing compartment strings. Attributes ---------- compartments : :obj:`list` of :obj:`string` A list containing strings that describe each compartment, (e.g. "S", "I", etc.). N_comp : :obj:`int` Number of compartments (including population number) linear_rates : scipy.sparse.csr_matrix Sparse matrix containing transition rates of the linear processes. quadratic_rates : scipy.sparse.csr_matrix List of sparse matrices that contain transition rates of the quadratic processes for each compartment. affected_by_quadratic_process : :obj:`list` of :obj:`int` List of integer compartment IDs, collecting compartments that are affected by the quadratic processes Example ------- .. code:: python >>> epi = MatrixEpiModel(["S","I","R"]) >>> print(epi.compartments) [ "S", "I", "R" ] """ def __init__(self,compartments,initial_population_size=1,correct_for_dynamical_population_size=False): """ """ self.t0 = None self.y0 = None self.affected_by_quadratic_process = [] self.compartments = list(compartments) self.compartment_ids = { C: iC for iC, C in enumerate(compartments) } self.initial_population_size = initial_population_size self.correct_for_dynamical_population_size = correct_for_dynamical_population_size self.N_comp = len(self.compartments) self.birth_rates = np.zeros((self.N_comp,),dtype=np.float64) self.linear_rates = sprs.csr_matrix((self.N_comp, self.N_comp),dtype=np.float64) self.quadratic_rates = [ sprs.csr_matrix((self.N_comp, self.N_comp),dtype=np.float64)\ for c in range(self.N_comp) ]
[docs] def get_compartment_id(self,C): """Get the integer ID of a compartment ``C``""" return self.compartment_ids[C]
[docs] def get_compartment(self,iC): """Get the compartment, given an integer ID ``iC``""" return self.compartments[iC]
[docs] def set_processes(self, process_list, allow_nonzero_column_sums=False, reset_rates=True, ignore_rate_position_checks=False, ): """ Converts a list of reaction process tuples to rate tuples and sets the rates for this model. Parameters ---------- process_list : :obj:`list` of :obj:`tuple` A list containing reaction processes in terms of tuples. .. code:: python [ # transition process ( source_compartment, rate, target_compartment), # transmission process ( coupling_compartment_0, coupling_compartment_1, rate, target_compartment_0, target_ccompartment_1), # fission process ( source_compartment, rate, target_compartment_0, target_ccompartment_1), # fusion process ( source_compartment_0, source_compartment_1, rate, target_compartment), # death process ( source_compartment, rate, None), # birth process ( None, rate, target_compartment), ] allow_nonzero_column_sums : bool, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. reset_rates : bool, default : True If this is `True`, reset all rates to zero before setting the new ones. ignore_rate_position_checks : bool, default = False This function usually checks whether the rate of a reaction is positioned correctly. You can turn this behavior off for transition, birth, death, and transmission processes. (Useful if you want to define symbolic transmission processes that are compartment-dependent). """ quadratic_rates, linear_rates = processes_to_rates(process_list, self.compartments, ignore_rate_position_checks, ) self.set_linear_rates(linear_rates,allow_nonzero_column_sums=allow_nonzero_column_sums) self.set_quadratic_rates(quadratic_rates,allow_nonzero_column_sums=allow_nonzero_column_sums) return self
[docs] def set_linear_rates(self,rate_list,allow_nonzero_column_sums=False,reset_rates=True): """ Define the linear transition rates between compartments. Parameters ========== rate_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions rates in the following format: .. code:: python [ ( acting_compartment, affected_compartment, rate ), ... ] allow_nonzero_column_sums : :obj:`bool`, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. Example reset_rates : bool, default : True Whether to reset all linear rates to zero before converting those. """ if reset_rates: row = [] col = [] data = [] birth_rates = np.zeros((self.N_comp,),dtype=np.float64) else: linear_rates = self.linear_rates.tocoo() row = linear_rates.row.tolist() col = linear_rates.col.tolist() data = linear_rates.data.tolist() birth_rates = self.birth_rates.copy() for acting_compartment, affected_compartment, rate in rate_list: _t = self.get_compartment_id(affected_compartment) if acting_compartment is None: birth_rates[_t] += rate else: _s = self.get_compartment_id(acting_compartment) row.append(_t) col.append(_s) data.append(rate) linear_rates = sprs.coo_matrix((data, (row, col)),shape=(self.N_comp, self.N_comp),dtype=np.float64) linear_rates = linear_rates.tocsr() if not allow_nonzero_column_sums: test = linear_rates.sum(axis=0) + birth_rates test_sum = test.sum() if np.abs(test_sum) > 1e-15: warnings.warn("Rates do not sum to zero for each column:" + str(test_sum)) self.linear_rates = linear_rates self.birth_rates = birth_rates return self
[docs] def add_transition_processes(self,process_list): """ Define the linear transition processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions rates in the following format: .. code:: python [ ( source_compartment, rate, target_compartment ), ... ] Example ------- For an SEIR model. .. code:: python epi.add_transition_processes([ ("E", symptomatic_rate, "I" ), ("I", recovery_rate, "R" ), ]) """ linear_rates = transition_processes_to_rates(process_list) return self.set_linear_rates(linear_rates, reset_rates=False, allow_nonzero_column_sums=True, )
[docs] def add_fission_processes(self,process_list): """ Define linear fission processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains fission rates in the following format: .. code:: python [ ("source_compartment", rate, "target_compartment_0", "target_compartment_1" ), ... ] Example ------- For pure exponential growth of compartment `B`. .. code:: python epi.add_fission_processes([ ("B", growth_rate, "B", "B" ), ]) """ linear_rates = fission_processes_to_rates(process_list) return self.set_linear_rates(linear_rates, reset_rates=False, allow_nonzero_column_sums=True, )
[docs] def add_fusion_processes(self,process_list): """ Define fusion processes between compartments. Parameters ========== process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains fission rates in the following format: .. code:: python [ ("coupling_compartment_0", "coupling_compartment_1", rate, "target_compartment_0" ), ... ] Example ------- Fusion of reactants "A", and "B" to form "C". .. code:: python epi.add_fusion_processes([ ("A", "B", reaction_rate, "C" ), ]) """ quadratic_rates = fusion_processes_to_rates(process_list) return self.set_quadratic_rates(quadratic_rates, reset_rates=False, allow_nonzero_column_sums=True )
[docs] def add_transmission_processes(self,process_list): r""" A wrapper to define quadratic process rates through transmission reaction equations. Note that in stochastic network/agent simulations, the transmission rate is equal to a rate per link. For the mean-field ODEs, the rates provided to this function will just be equal to the prefactor of the respective quadratic terms. For instance, if you analyze an SIR system and simulate on a network of mean degree :math:`k_0`, a basic reproduction number :math:`R_0`, and a recovery rate :math:`\mu`, you would define the single link transmission process as .. code:: python ("I", "S", R_0/k_0 * mu, "I", "I") For the mean-field system here, the corresponding reaction equation would read .. code:: python ("I", "S", R_0 * mu, "I", "I") Parameters ---------- process_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions rates in the following format: .. code:: python [ ("source_compartment", "target_compartment_initial", rate, "source_compartment", "target_compartment_final", ), ... ] Example ------- For an SEIR model. .. code:: python epi.add_transmission_processes([ ("I", "S", +1, "I", "E" ), ]) """ quadratic_rates = transmission_processes_to_rates(process_list) return self.set_quadratic_rates(quadratic_rates, reset_rates=False, allow_nonzero_column_sums=True, )
[docs] def add_quadratic_rates(self,rate_list,reset_rates=True,allow_nonzero_column_sums=False): """ Add quadratic rates without resetting the existing rate terms. See :func:`_tacoma.set_quadratic_rates` for docstring. """ return self.set_quadratic_rates(rate_list, reset_rates=False, allow_nonzero_column_sums=allow_nonzero_column_sums, )
[docs] def add_linear_rates(self,rate_list,reset_rates=True,allow_nonzero_column_sums=False): """ Add linear rates without resetting the existing rate terms. See :func:`_tacoma.set_linear_rates` for docstring. """ return self.set_linear_rates(rate_list, reset_rates=False, allow_nonzero_column_sums=allow_nonzero_column_sums, )
[docs] def set_quadratic_rates(self,rate_list,reset_rates=True,allow_nonzero_column_sums=False): r""" Define the quadratic transition processes between compartments. Parameters ---------- rate_list : :obj:`list` of :obj:`tuple` A list of tuples that contains transitions rates in the following format: .. code:: python [ ( "coupling_compartment_0", "coupling_compartment_1", "affected_compartment", rate ), ... ] allow_nonzero_column_sums : :obj:`bool`, default : False Traditionally, epidemiological models preserve the total population size. If that's not the case, switch off testing for this. Example ------- For an SEIR model. .. code:: python epi.set_quadratic_rates([ ("S", "I", "S", -1 ), ("S", "I", "E", +1 ) ]) Read as "Coupling of *S* and *I* leads to a reduction in "S" proportional to :math:`S\times I` and rate -1/time_unit. Furthermore, coupling of *S* and *I* leads to an increase in "E" proportional to :math:`S\times I` and rate +1/time_unit." """ matrices = [ [[], [], []] for c in self.compartments] if reset_rates: all_affected = [] else: for iC, _M in enumerate(self.quadratic_rates): if _M.count_nonzero() > 0: M = _M.tocoo() matrices[iC][0] = M.row.tolist() matrices[iC][1] = M.col.tolist() matrices[iC][2] = M.data.tolist() else: matrices[iC] = [ [], [], [] ] all_affected = self.affected_by_quadratic_process if len(self.affected_by_quadratic_process)>0 else [] for coupling0, coupling1, affected, rate in rate_list: c0, c1 = sorted([ self.get_compartment_id(c) for c in [coupling0, coupling1] ]) a = self.get_compartment_id(affected) matrices[a][0].append(c0) matrices[a][1].append(c1) matrices[a][2].append(rate) all_affected.append(a) total_sum = 0 for c in range(self.N_comp): row = matrices[c][0] col = matrices[c][1] data = matrices[c][2] if len(row) > 0: matrices[c] = sprs.coo_matrix((data,(row, col)), shape=(self.N_comp, self.N_comp), dtype=np.float64, ).tocsr() total_sum += matrices[c].sum(axis=0).sum() else: matrices[c] = sprs.csr_matrix((self.N_comp, self.N_comp),dtype=np.float64) if np.abs(total_sum) > 1e-14: if not allow_nonzero_column_sums: warnings.warn("Rates do not sum to zero. Sum = "+ str(total_sum)) self.affected_by_quadratic_process = sorted(list(set(all_affected))) self.quadratic_rates = matrices return self
[docs] def dydt(self,t,y): """ Compute the current momenta of the epidemiological model. Parameters ---------- t : :obj:`float` Current time y : numpy.ndarray The first entry is equal to the population size. The remaining entries are equal to the current fractions of the population of the respective compartments """ ynew = self.linear_rates.dot(y) + self.birth_rates if self.correct_for_dynamical_population_size: population_size = y.sum() else: population_size = self.initial_population_size for c in self.affected_by_quadratic_process: ynew[c] += y.T.dot(self.quadratic_rates[c].dot(y)) / population_size return ynew
[docs] def get_numerical_dydt(self): """ Return a function that obtains t and y as an input and returns dydt of this system """ return self.dydt
[docs] def jacobian(self,y0=None): """ Return Jacobian at point ``y0``. Will use ``self.y0`` if argument ``y0`` is ``None``. """ return self.get_transmission_matrix(y0=y0) + self.get_transition_matrix()
[docs] def get_jacobian_leading_eigenvalue(self,y0=None,returntype='complex'): """ Return leading eigenvalue of Jacobian at point ``y0``. Will use ``self.y0`` if argument ``y0`` is ``None``. Use argument ``returntype='real'`` to only obtain the real part of the eigenvalue. """ J = self.jacobian(y0=y0) return self._get_leading_eigenvalue(J,returntype,'LR')
[docs] def get_transmission_matrix(self,y0=None): """ Return transmission matrix at point ``y0``. Will use ``self.y0`` if argument ``y0`` is ``None``. """ if y0 is None: y0 = self.y0 if y0 is None: raise ValueError("No initial conditions have been given or set.") if self.correct_for_dynamical_population_size: raise NotImplementedError("transmission matrices for models with " "varying population size have not been implemented.") rows = [] for M in self.quadratic_rates: row = ((M+M.T).dot(y0)) / self.initial_population_size row = row.reshape(1,self.N_comp) row = sprs.csr_matrix(row) rows.append(row) T = sprs.vstack(rows,format='csr') return T
[docs] def get_transition_matrix(self): """ Return transition matrix. """ return self.linear_rates
[docs] def get_next_generation_matrix(self,y0=None): """ Return next generation matrix at point y0. Will use ``self.y0`` if argument ``y0`` is ``None``. """ T = self.get_transmission_matrix(y0=y0) Sigma = self.get_transition_matrix() # delete all compartments that contribute to the singularity # of the transition matrix del_cols = [] del_rows = [] n = Sigma.shape[0] for i in range(n): if np.all(Sigma[i,:].data==0): del_rows.append(i) for j in range(n): if np.all(Sigma[:,j].data==0): del_cols.append(j) del_comp = set(del_cols) | set(del_rows) use_comp = set(range(n)) - del_comp use_comp = sorted(list(use_comp)) # filter our compartments that do not make the matrix singular Sigma = (Sigma[use_comp,:])[:,use_comp] T = (T[use_comp,:])[:,use_comp] # convert Sigma to csc for more efficient inverse algo K = -T.dot(sprs.linalg.inv(Sigma.tocsc())) return K
[docs] def get_next_generation_matrix_leading_eigenvalue(self,y0=None,returntype='real'): """ Return the leading eigenvalue of the next generation matrix at point ``y0``. Will use ``self.y0`` if argument ``y0`` is ``None``. When ``y0`` is equal to the initial disease-free state, this function will return the basic reproduction number :math:`R_0`. The function will return only the real part by default. Use ``returntype='complex'`` to change this. """ K = self.get_next_generation_matrix(y0=y0) return self._get_leading_eigenvalue(K,returntype,method='LM')
def _get_leading_eigenvalue(self,M,returntype='complex',method='LR'): if M.shape == (1,1): _lambda = M[0,0] elif M.shape == (1,): _lambda = M[0] else: # I thought CSC format would be better for solver # but it's not, so I uncommented this # M_ = M.tocsc() M_ = M if M_.shape == (2,2): lambdas = np.linalg.eig(M_.toarray())[0] else: lambdas = sprs.linalg.eigs(M_,k=min(2,M_.shape[0]-2),which=method)[0] if method == 'LR': lambdas = sorted(lambdas, key=lambda x: -np.real(x)) _lambda = lambdas[0] if returntype == 'real': _lambda = np.real(_lambda) elif method == 'LM': lambdas = sorted(lambdas, key=lambda x: -np.abs(x)) _lambda = lambdas[0] if returntype == 'real': _lambda = np.abs(_lambda) return _lambda
[docs]class MatrixSIModel(MatrixEpiModel): """ An SI model derived from :class:`epipack.numeric_matrix_based_epi_models.MatrixEpiModel`. """ def __init__(self, infection_rate, initial_population_size=1.0): MatrixEpiModel.__init__(self, list("SI"), initial_population_size) self.set_quadratic_rates([ ("S", "I", "S", -infection_rate), ("S", "I", "I", +infection_rate), ])
[docs]class MatrixSISModel(MatrixEpiModel): """ An SIS model derived from :class:`epipack.numeric_matrix_based_epi_models.MatrixEpiModel`. Parameters ---------- R0 : float The basic reproduction number. From this, the infection rate is computed as ``infection_rate = R0 * recovery_rate`` recovery_rate : float Recovery rate population_size : float, default = 1.0 Number of people in the population. """ def __init__(self, R0, recovery_rate, initial_population_size=1.0): infection_rate = R0 * recovery_rate MatrixEpiModel.__init__(self, list("SI"), initial_population_size) self.set_quadratic_rates([ ("S", "I", "S", -infection_rate), ("S", "I", "I", +infection_rate), ]) self.add_transition_processes([ ("I", recovery_rate, "S" ), ])
[docs]class MatrixSIRModel(MatrixEpiModel): """ An SIR model derived from :class:`epipack.numeric_matrix_based_epi_models.MatrixEpiModel`. """ def __init__(self, R0, recovery_rate, initial_population_size=1.0): infection_rate = R0 * recovery_rate MatrixEpiModel.__init__(self, list("SIR"), initial_population_size) self.set_quadratic_rates([ ("S", "I", "S", -infection_rate), ("S", "I", "I", +infection_rate), ]) self.add_transition_processes([ ("I", recovery_rate, "R"), ])
[docs]class MatrixSIRSModel(MatrixEpiModel): """ An SIRS model derived from :class:`epipack.numeric_matrix_based_epi_models.MatrixEpiModel`. """ def __init__(self, R0, recovery_rate, waning_immunity_rate, initial_population_size=1.0): infection_rate = R0 * recovery_rate MatrixEpiModel.__init__(self, list("SIR"), initial_population_size) self.set_quadratic_rates([ ("S", "I", "S", -infection_rate), ("S", "I", "I", +infection_rate), ]) self.add_transition_processes([ ("I", recovery_rate, "R"), ("R", waning_immunity_rate, "S"), ])
[docs]class MatrixSEIRModel(MatrixEpiModel): """ An SEIR model derived from :class:`epipack.numeric_matrix_based_epi_models.MatrixEpiModel`. """ def __init__(self, R0, recovery_rate, symptomatic_rate, initial_population_size=1.0): infection_rate = R0 * recovery_rate MatrixEpiModel.__init__(self, list("SEIR"), initial_population_size) self.set_quadratic_rates([ ("S", "I", "S", -infection_rate), ("S", "I", "E", +infection_rate), ]) self.add_transition_processes([ ("E", symptomatic_rate, "I"), ("I", recovery_rate, "R"), ])
if __name__=="__main__": # pragma: no cover epi = MatrixEpiModel(list("SEIR")) print(epi.compartments) print() epi.add_transition_processes([ ("E", 1.0, "I"), ("I", 1.0, "R"), ]) print(epi.linear_rates) epi.set_quadratic_rates([ ("S", "I", "S", -1.0), ("S", "I", "E", +1.0), ]) print() for iM, M in enumerate(epi.quadratic_rates): print(epi.get_compartment(iM), M) import matplotlib.pyplot as pl N = 100 epi = MatrixSISModel(R0=2,recovery_rate=1,initial_population_size=N) print(epi.linear_rates) epi.set_initial_conditions({'S':0.99*N,'I':0.01*N}) tt = np.linspace(0,10,100) result = epi.integrate(tt,['S','I']) pl.plot(tt, result['S'],label='S') pl.plot(tt, result['I'],label='I') pl.legend() pl.show()