Source code for epipack.integrators

"""
Contains integrator methods that will be used
to integrate ODEs or SDEs.
"""

import warnings

import numpy as np
from scipy.integrate import ode, solve_ivp, quad
from scipy.optimize import newton

[docs]def integrate_ivp_until(dydt, t0, y0, stop_condition, solve_ivp_kwargs={}): """ Integrate an ODE system with the ``scipy.integrate.solve_ivp`` method. Parameters ---------- dydt : function A function returning the momenta of the ODE system. t0 : float Initial time y0 : numpy.ndarray Initial conditions stop_condition : function A function ``stop_condition(t,y)`` that flips from positive to negative values when a certain condition is reached. For instance, when an element reaches a certain threshold value: .. code:: python stop_condition = lambda t, y: 0.3 - y[2] solve_ivp_kwargs : dict Keyword arguments that will be passed to ``scipy.integrate.solve_ivp``. Returns ------- t : float Times when the system state was recorded The final time corresponds to the time when the stop condition was reached. y : numpy.ndarray First axis accesses system states, second axis refers to the system state corresponding to the time points in ``t``. """ y0 = np.array(y0,dtype=float) stop_condition.terminal = True result = solve_ivp( fun=dydt, y0=y0, t_span=(t0,np.inf), events=stop_condition, **solve_ivp_kwargs, ) y = result.y t = result.t return t, y
[docs]def integrate_dopri5(dydt, t, y0, *args): """ Integrate an ODE system with the Runge-Kutte Dormand Prince method (with step-size control). Parameters ---------- dydt : function A function returning the momenta of the ODE system. t : numpy.ndarray of float Array of time points at which the functions should be evaluated. y0 : numpy.ndarray Initial conditions *args : :obj:`list` List of parameters that will be passed to the momenta function. """ # get copy y0 = np.array(y0,dtype=float) t = np.array(t,dtype=float) t0 = t[0] t = t[1:] # initiate integrator r = ode(dydt) r.set_integrator('dopri5') r.set_initial_value(y0,t0) r.set_f_params(*args) result = np.zeros((len(y0),len(t)+1)) result[:,0] = y0 # loop through all demanded time points for it, t_ in enumerate(t): # get result of ODE integration y = r.integrate(t_) # write result to result vector result[:,it+1] = y return result
[docs]def integrate_euler(dydt, t, y0, *args): """ Integrate an ODE system with Euler's method. Parameters ---------- dydt : function A function returning the momenta of the ODE system. t : numpy.ndarray of float Array of time points at which the functions should be evaluated. y0 : numpy.ndarray Initial conditions *args : :obj:`list` List of parameters that will be passed to the momenta function. """ # get copy y0 = np.array(y0,dtype=float) t = np.array(t,dtype=float) t0 = t[0] # initiate integrator result = np.zeros((len(y0),len(t))) result[:,0] = y0 old_t = t0 # loop through all demanded time points for it, t_ in enumerate(t[1:]): # get result of ODE integration dt = t_ - old_t y = result[:,it] + dt*dydt(old_t, result[:,it], *args) # write result to result vector result[:,it+1] = y old_t = t_ return result
[docs]def integrate_SDE(dydt, t, y0, diffusion_constants, *args): """ Integrate an SDE system with Euler's method. Parameters ---------- dydt : function A function returning the momenta of the deterministic ODE system. t : numpy.ndarray of float Array of time points at which the functions should be evaluated. Time steps must be equally spaced. y0 : numpy.ndarray Initial conditions diffusion_constants : numpy.ndarray Scalar and constant diffusion coefficients as prefactors for each compartment's Wiener process (has to be of same length as y0) corresponds to :math:`D_i` in .. math:: dY_i = f_i(\mathbf Y,t) dt + D_i dW_i *args : :obj:`list` List of parameters that will be passed to the momenta function. """ # get copy y0 = np.array(y0,dtype=float) t = np.array(t,dtype=float) D = np.array(diffusion_constants,dtype=float) t0 = t[0] dt = t[1]-t[0] sqrt_dt = np.sqrt(dt) assert(np.all(np.isclose(dt,t[1:]-t[:-1]))) assert(np.all(D>=0.)) ndx = np.where(D>0)[0] # initiate integrator result = np.zeros((len(y0),len(t))) result[:,0] = y0 dW = np.zeros((len(y0),len(t)-1)) dW[ndx,:] = np.sqrt(dt) * np.random.randn(len(ndx),len(t)-1) # loop through all demanded time points for it, t_ in enumerate(t[1:]): # get result of ODE integration y = result[:,it] + dt * dydt(t[it], result[:,it], *args) + D * dW[:,it] # write result to result vector result[:,it+1] = y return result
[docs]class IntegrationMixin(): """ A helper MixIn that enables the base class to set initial conditions and integrate numerical ODEs. Expects the base class to have the following methods and attributes: - get_numerical_dydt() - get_compartment_id() - compartments - N_comp """
[docs] def integrate_and_return_by_index(self, time_points, return_compartments=None, integrator='dopri5', adopt_final_state=False, diffusion_constants=None, ): r""" Returns values of the given compartments at the demanded time points (as a numpy.ndarray of shape ``(return_compartments), len(time_points)``. Parameters ========== time_points : np.ndarray An array of time points at which the compartment values should be evaluated and returned. return_compartments : list, default = None A list of compartments for which the result should be returned. If ``return_compartments`` is None, all compartments will be returned. integrator : str, default = 'dopri5' Which method to use for integration. Currently supported are ``'euler'`` and ``'dopri5'``. If ``'euler'`` is chosen, :math:`\delta t` will be determined by the difference of consecutive entries in ``time_points``. adopt_final_state : bool, default = False Whether or not to adopt the final state of the integration diffusion_constants : numpy.ndarray Scalar and constant diffusion coefficients as prefactors for each compartment's Wiener process (has to be of same length as y0) Returns ======= result : numpy.ndarray First axis accesses system states, second axis refers to the system state corresponding to the time points in ``time_points``. """ dydt = self.get_numerical_dydt() self.t0 = time_points[0] if integrator.lower() == 'dopri5': result = integrate_dopri5(dydt, time_points, self.y0) elif integrator.lower() == 'euler': result = integrate_euler(dydt, time_points, self.y0) elif integrator.lower() == 'sde': if diffusion_constants is None: raise ValueError("'diffusion_constants' undefined but necessary for SDE integration.") result = integrate_euler(dydt, time_points, self.y0, diffusion_constants) else: raise ValueError(f"Unknown integrator {integrator}") if adopt_final_state: self.t0 = time_points[-1] self.y0 = result[:,-1] if return_compartments is not None: ndx = [self.get_compartment_id(C) for C in return_compartments] result = result[ndx,:] return result
[docs] def integrate(self, time_points, return_compartments=None, *args, **kwargs, ): r""" Returns values of the given compartments at the demanded time points (as a dictionary). Parameters ========== time_points : np.ndarray An array of time points at which the compartment values should be evaluated and returned. return_compartments : list, default = None A list of compartments for which the result should be returned. If ``return_compartments`` is None, all compartments will be returned. integrator : str, default = 'dopri5' Which method to use for integration. Currently supported are ``'euler'`` and ``'dopri5'``. If ``'euler'`` is chosen, :math:`\delta t` will be determined by the difference of consecutive entries in ``time_points``. adopt_final_state : bool, default = False Whether or not to adopt the final state of the integration as new initial conditions. diffusion_constants : numpy.ndarray Scalar and constant diffusion coefficients as prefactors for each compartment's Wiener process (has to be of same length as y0) Returns ======= result : dict Dictionary mapping compartments to time series. """ if return_compartments is None: return_compartments = self.compartments result = self.integrate_and_return_by_index(time_points, return_compartments,*args,**kwargs) result_dict = {} for icomp, compartment in enumerate(return_compartments): result_dict[compartment] = result[icomp,:] return result_dict
[docs] def set_initial_conditions(self, initial_conditions, initial_time=0.0, allow_nonzero_column_sums=False): """ Set the initial conditions for integration Parameters ---------- initial_conditions : dict A dictionary that maps compartments to a fraction of the population. Compartments that are not set in this dictionary are assumed to have an initial condition of zero. allow_nonzero_column_sums : bool, default = False If True, an error is raised when the initial conditions do not sum to the population size. """ if type(initial_conditions) == dict: initial_conditions = list(initial_conditions.items()) self.y0 = np.zeros(self.N_comp) self.t0 = initial_time total = 0 for compartment, amount in initial_conditions: total += amount if self.y0[self.get_compartment_id(compartment)] != 0: warnings.warn('Double entry in initial conditions for compartment '+str(compartment)) else: self.y0[self.get_compartment_id(compartment)] = amount if np.abs(total-self.initial_population_size)/self.initial_population_size > 1e-14 and not allow_nonzero_column_sums: warnings.warn('Sum of initial conditions does not equal unity.') return self
[docs] def integrate_and_return_by_index_until( self, t0, stop_condition, return_compartments=None, adopt_final_state=False, solve_ivp_kwargs={}, ): r""" Integrate the system until a stop condition is reached, using ``scipy.integrate.solve_ivp``. Returns values of the given compartments at the demanded time points (as a numpy.ndarray of shape ``(return_compartments), len(time_points)``. Parameters ========== t0 : float Initial time stop_condition : function A function ``stop_condition(t,y)`` that flips from positive to negative values when a certain condition is reached. For instance, when an element reaches a certain threshold value: .. code:: python stop_condition = lambda t, y: 0.3 - y[2] return_compartments : list, default = None A list of compartments for which the result should be returned. If ``return_compartments`` is None, all compartments will be returned. adopt_final_state : bool, default = False Whether or not to adopt the final state of the integration solve_ivp_kwargs : dict Keyword arguments that will be passed to ``scipy.integrate.solve_ivp``. Returns ======= t : float Times when the system state was recorded The final time corresponds to the time when the stop condition was reached. y : numpy.ndarray First axis accesses system states, second axis refers to the system state corresponding to the time points in `t`. """ dydt = self.get_numerical_dydt() self.t0 = t0 time_points, result = integrate_ivp_until(dydt, self.t0, self.y0, stop_condition, solve_ivp_kwargs) if adopt_final_state: self.t0 = time_points[-1] self.y0 = result[:,-1] if return_compartments is not None: ndx = [self.get_compartment_id(C) for C in return_compartments] result = result[ndx,:] return time_points, result
[docs] def integrate_until(self, t0, stop_condition, return_compartments=None, adopt_final_state=False, solve_ivp_kwargs={}, ): r""" Integrate the system until a stop condition is reached, using ``scipy.integrate.solve_ivp``. Parameters ========== t0 : float Initial time stop_condition : function A function ``stop_condition(t,y)`` that flips from positive to negative values when a certain condition is reached. For instance, when an element reaches a certain threshold value: .. code:: python stop_condition = lambda t, y: 0.3 - y[2] return_compartments : list, default = None A list of compartments for which the result should be returned. If ``return_compartments`` is None, all compartments will be returned. adopt_final_state : bool, default = False Whether or not to adopt the final state of the integration solve_ivp_kwargs : dict Keyword arguments that will be passed to ``scipy.integrate.solve_ivp``. Returns ======= t : float Times when the system state was recorded The final time corresponds to the time when the stop condition was reached. result : dict Dictionary mapping compartments to time series. """ if return_compartments is None: return_compartments = self.compartments t, result = self.integrate_and_return_by_index_until( t0, stop_condition, return_compartments, adopt_final_state, solve_ivp_kwargs, ) result_dict = {} for icomp, compartment in enumerate(return_compartments): result_dict[compartment] = result[icomp,:] return t, result_dict
[docs]def time_leap_newton(t0, y0, get_event_rates, rand=None): """ Compute a time leap for time-varying rate functions based on Gillespie's SSA using Newton's method on a quad-integrator. Parameters ========== t0 : float The current time y0 : numpy.ndarray Current state of the system get_event_rates : function A function that takes time t and state y as input and returns an array of corresponding event rates rand : float, default = None A random number from the unit interval. Returns ======= new_t : float A new time. """ integrand = lambda _t : get_event_rates(_t, y0).sum() integral = lambda _t : quad(integrand, t0, _t)[0] if rand is None: rand = np.random.rand() else: assert(rand >= 0) assert(rand < 1) _1_minus_r = 1 - rand rootfunction = lambda _t: - np.log(_1_minus_r) - integral(_t) new_t = newton(rootfunction, t0, fprime=lambda _t: -integrand(_t)) return new_t
[docs]def time_leap_ivp(t0, y0, get_event_rates, rand=None): """ Compute a time leap for time-varying rate functions based on Gillespie's SSA using Newton's method on a quad-integrator. Parameters ========== t0 : float The current time y0 : numpy.ndarray Current state of the system get_event_rates : function A function that takes time t and state y as input and returns an array of corresponding event rates rand : float, default = None A random number from the unit interval. Returns ======= new_t : float A new time. """ integrand = lambda _t, _y: [get_event_rates(_t, y0).sum()] initial = integrand(t0,None) if rand is None: rand = np.random.rand() else: assert(rand >= 0) assert(rand < 1) _1_minus_r = 1 - rand rootfunction = lambda _t, _y: - np.log(_1_minus_r) - _y[0] rootfunction.terminal = True result = solve_ivp(integrand,[t0,np.inf],y0=[0],method='RK23',events=rootfunction) return result.t_events[0][0]
if __name__ == "__main__": # pragma: no cover from epipack import SISModel SIS = SISModel(infection_rate=2.0,recovery_rate=1) SIS.set_initial_conditions({ 'S': 0.99, 'I': 0.01, }) tt = np.linspace(0,10,100) result = integrate_dopri5(SIS.dydt, tt, SIS.y0) import matplotlib.pyplot as pl pl.plot(tt, result[0,:]) pl.plot(tt, result[1,:]) pl.show()