# Source code for epipack.integrators

"""
Contains integrator methods that will be used
to integrate ODEs or SDEs.
"""

import warnings

import numpy as np
from scipy.integrate import ode, solve_ivp, quad
from scipy.optimize import newton

[docs]def integrate_ivp_until(dydt, t0, y0, stop_condition, solve_ivp_kwargs={}):
"""
Integrate an ODE system with the scipy.integrate.solve_ivp method.

Parameters
----------
dydt : function
A function returning the momenta of the ODE system.
t0 : float
Initial time
y0 : numpy.ndarray
Initial conditions
stop_condition : function
A function stop_condition(t,y) that flips from
positive to negative values when a certain
condition is reached. For instance, when
an element reaches a certain threshold value:

.. code:: python

stop_condition = lambda t, y: 0.3 - y[2]

solve_ivp_kwargs : dict
Keyword arguments that will be passed to
scipy.integrate.solve_ivp.

Returns
-------
t : float
Times when the system state was recorded
The final time corresponds to the time when
the stop condition was reached.
y : numpy.ndarray
First axis accesses system states, second
axis refers to the system state corresponding
to the time points in t.
"""

y0 = np.array(y0,dtype=float)
stop_condition.terminal = True

result = solve_ivp(
fun=dydt,
y0=y0,
t_span=(t0,np.inf),
events=stop_condition,
**solve_ivp_kwargs,
)

y = result.y
t = result.t

return t, y

[docs]def integrate_dopri5(dydt, t, y0, *args):
"""
Integrate an ODE system with the Runge-Kutte Dormand Prince
method (with step-size control).

Parameters
----------
dydt : function
A function returning the momenta of the ODE system.
t : numpy.ndarray of float
Array of time points at which the functions should
be evaluated.
y0 : numpy.ndarray
Initial conditions
*args : :obj:list
List of parameters that will be passed to the
momenta function.
"""

# get copy
y0 = np.array(y0,dtype=float)
t = np.array(t,dtype=float)

t0 = t[0]
t = t[1:]

# initiate integrator
r = ode(dydt)
r.set_integrator('dopri5')
r.set_initial_value(y0,t0)
r.set_f_params(*args)
result = np.zeros((len(y0),len(t)+1))
result[:,0] = y0

# loop through all demanded time points
for it, t_ in enumerate(t):

# get result of ODE integration
y = r.integrate(t_)

# write result to result vector
result[:,it+1] = y

return result

[docs]def integrate_euler(dydt, t, y0, *args):
"""
Integrate an ODE system with Euler's method.

Parameters
----------
dydt : function
A function returning the momenta of the ODE system.
t : numpy.ndarray of float
Array of time points at which the functions should
be evaluated.
y0 : numpy.ndarray
Initial conditions
*args : :obj:list
List of parameters that will be passed to the
momenta function.
"""

# get copy
y0 = np.array(y0,dtype=float)
t = np.array(t,dtype=float)

t0 = t[0]

# initiate integrator
result = np.zeros((len(y0),len(t)))
result[:,0] = y0
old_t = t0

# loop through all demanded time points
for it, t_ in enumerate(t[1:]):

# get result of ODE integration
dt = t_ - old_t
y = result[:,it] + dt*dydt(old_t, result[:,it], *args)

# write result to result vector
result[:,it+1] = y

old_t = t_

return result

[docs]def integrate_SDE(dydt, t, y0, diffusion_constants, *args):
"""
Integrate an SDE system with Euler's method.

Parameters
----------
dydt : function
A function returning the momenta of the deterministic ODE system.
t : numpy.ndarray of float
Array of time points at which the functions should
be evaluated. Time steps must be equally spaced.
y0 : numpy.ndarray
Initial conditions
diffusion_constants : numpy.ndarray
Scalar and constant diffusion coefficients as prefactors
for each compartment's Wiener process (has to be of same
length as y0)

corresponds to :math:D_i in

.. math::

dY_i = f_i(\mathbf Y,t) dt + D_i dW_i
*args : :obj:list
List of parameters that will be passed to the
momenta function.
"""

# get copy
y0 = np.array(y0,dtype=float)
t = np.array(t,dtype=float)
D = np.array(diffusion_constants,dtype=float)

t0 = t[0]

dt = t[1]-t[0]
sqrt_dt = np.sqrt(dt)

assert(np.all(np.isclose(dt,t[1:]-t[:-1])))
assert(np.all(D>=0.))
ndx = np.where(D>0)[0]

# initiate integrator
result = np.zeros((len(y0),len(t)))
result[:,0] = y0

dW = np.zeros((len(y0),len(t)-1))
dW[ndx,:] = np.sqrt(dt) * np.random.randn(len(ndx),len(t)-1)

# loop through all demanded time points
for it, t_ in enumerate(t[1:]):

# get result of ODE integration
y = result[:,it] + dt * dydt(t[it], result[:,it], *args) + D * dW[:,it]

# write result to result vector
result[:,it+1] = y

return result

[docs]class IntegrationMixin():
"""
A helper MixIn that enables the base
class to set initial conditions
and integrate numerical ODEs.

Expects the base class to have the following methods
and attributes:

- get_numerical_dydt()
- get_compartment_id()
- compartments
- N_comp
"""

[docs]    def integrate_and_return_by_index(self,
time_points,
return_compartments=None,
integrator='dopri5',
diffusion_constants=None,
):
r"""
Returns values of the given compartments at the demanded
time points (as a numpy.ndarray of shape
(return_compartments), len(time_points).

Parameters
==========
time_points : np.ndarray
An array of time points at which the compartment values
should be evaluated and returned.
return_compartments : list, default = None
A list of compartments for which the result should be returned.
If return_compartments is None, all compartments will
be returned.
integrator : str, default = 'dopri5'
Which method to use for integration. Currently supported are
'euler' and 'dopri5'. If 'euler' is chosen,
:math:\delta t will be determined by the difference
of consecutive entries in time_points.
adopt_final_state : bool, default = False
Whether or not to adopt the final state of the integration
diffusion_constants : numpy.ndarray
Scalar and constant diffusion coefficients as prefactors
for each compartment's Wiener process (has to be of same
length as y0)

Returns
=======
result : numpy.ndarray
First axis accesses system states, second
axis refers to the system state corresponding
to the time points in time_points.
"""

dydt = self.get_numerical_dydt()
self.t0 = time_points[0]

if integrator.lower() == 'dopri5':
result = integrate_dopri5(dydt, time_points, self.y0)
elif integrator.lower() == 'euler':
result = integrate_euler(dydt, time_points, self.y0)
elif integrator.lower() == 'sde':
if diffusion_constants is None:
raise ValueError("'diffusion_constants' undefined but necessary for SDE integration.")
result = integrate_euler(dydt, time_points, self.y0, diffusion_constants)
else:
raise ValueError(f"Unknown integrator {integrator}")

self.t0 = time_points[-1]
self.y0 = result[:,-1]

if return_compartments is not None:
ndx = [self.get_compartment_id(C) for C in return_compartments]
result = result[ndx,:]

return result

[docs]    def integrate(self,
time_points,
return_compartments=None,
*args,
**kwargs,
):
r"""
Returns values of the given compartments at the demanded
time points (as a dictionary).

Parameters
==========
time_points : np.ndarray
An array of time points at which the compartment values
should be evaluated and returned.
return_compartments : list, default = None
A list of compartments for which the result should be returned.
If return_compartments is None, all compartments will
be returned.
integrator : str, default = 'dopri5'
Which method to use for integration. Currently supported are
'euler' and 'dopri5'. If 'euler' is chosen,
:math:\delta t will be determined by the difference
of consecutive entries in time_points.
adopt_final_state : bool, default = False
Whether or not to adopt the final state of the integration
as new initial conditions.
diffusion_constants : numpy.ndarray
Scalar and constant diffusion coefficients as prefactors
for each compartment's Wiener process (has to be of same
length as y0)

Returns
=======
result : dict
Dictionary mapping compartments to time series.
"""
if return_compartments is None:
return_compartments = self.compartments

result = self.integrate_and_return_by_index(time_points, return_compartments,*args,**kwargs)

result_dict = {}
for icomp, compartment in enumerate(return_compartments):
result_dict[compartment] = result[icomp,:]

return result_dict

[docs]    def set_initial_conditions(self, initial_conditions, initial_time=0.0, allow_nonzero_column_sums=False):
"""
Set the initial conditions for integration

Parameters
----------
initial_conditions : dict
A dictionary that maps compartments to a fraction
of the population. Compartments that are not
set in this dictionary are assumed to have an initial condition
of zero.
allow_nonzero_column_sums : bool, default = False
If True, an error is raised when the initial conditions do not
sum to the population size.
"""

if type(initial_conditions) == dict:
initial_conditions = list(initial_conditions.items())

self.y0 = np.zeros(self.N_comp)
self.t0 = initial_time
total = 0
for compartment, amount in initial_conditions:
total += amount
if self.y0[self.get_compartment_id(compartment)] != 0:
warnings.warn('Double entry in initial conditions for compartment '+str(compartment))
else:
self.y0[self.get_compartment_id(compartment)] = amount

if np.abs(total-self.initial_population_size)/self.initial_population_size > 1e-14 and not allow_nonzero_column_sums:
warnings.warn('Sum of initial conditions does not equal unity.')

return self

[docs]    def integrate_and_return_by_index_until(
self,
t0,
stop_condition,
return_compartments=None,
solve_ivp_kwargs={},
):
r"""
Integrate the system until a stop condition is reached,
using scipy.integrate.solve_ivp.
Returns values of the given compartments at the demanded
time points (as a numpy.ndarray of shape
(return_compartments), len(time_points).

Parameters
==========
t0 : float
Initial time
stop_condition : function
A function stop_condition(t,y) that flips from
positive to negative values when a certain
condition is reached. For instance, when
an element reaches a certain threshold value:

.. code:: python

stop_condition = lambda t, y: 0.3 - y[2]

return_compartments : list, default = None
A list of compartments for which the result should be returned.
If return_compartments is None, all compartments will
be returned.
adopt_final_state : bool, default = False
Whether or not to adopt the final state of the integration
solve_ivp_kwargs : dict
Keyword arguments that will be passed to
scipy.integrate.solve_ivp.

Returns
=======
t : float
Times when the system state was recorded
The final time corresponds to the time when
the stop condition was reached.
y : numpy.ndarray
First axis accesses system states, second
axis refers to the system state corresponding
to the time points in t.
"""

dydt = self.get_numerical_dydt()
self.t0 = t0

time_points, result = integrate_ivp_until(dydt, self.t0, self.y0, stop_condition, solve_ivp_kwargs)

self.t0 = time_points[-1]
self.y0 = result[:,-1]

if return_compartments is not None:
ndx = [self.get_compartment_id(C) for C in return_compartments]
result = result[ndx,:]

return time_points, result

[docs]    def integrate_until(self,
t0,
stop_condition,
return_compartments=None,
solve_ivp_kwargs={},
):
r"""
Integrate the system until a stop condition is reached,
using scipy.integrate.solve_ivp.

Parameters
==========
t0 : float
Initial time
stop_condition : function
A function stop_condition(t,y) that flips from
positive to negative values when a certain
condition is reached. For instance, when
an element reaches a certain threshold value:

.. code:: python

stop_condition = lambda t, y: 0.3 - y[2]

return_compartments : list, default = None
A list of compartments for which the result should be returned.
If return_compartments is None, all compartments will
be returned.
adopt_final_state : bool, default = False
Whether or not to adopt the final state of the integration
solve_ivp_kwargs : dict
Keyword arguments that will be passed to
scipy.integrate.solve_ivp.

Returns
=======
t : float
Times when the system state was recorded
The final time corresponds to the time when
the stop condition was reached.
result : dict
Dictionary mapping compartments to time series.
"""
if return_compartments is None:
return_compartments = self.compartments

t, result = self.integrate_and_return_by_index_until(
t0,
stop_condition,
return_compartments,
solve_ivp_kwargs,
)

result_dict = {}
for icomp, compartment in enumerate(return_compartments):
result_dict[compartment] = result[icomp,:]

return t, result_dict

[docs]def time_leap_newton(t0, y0, get_event_rates, rand=None):
"""
Compute a time leap for time-varying rate functions
based on Gillespie's SSA using Newton's method

Parameters
==========
t0 : float
The current time
y0 : numpy.ndarray
Current state of the system
get_event_rates : function
A function that takes time t and state y
as input and returns an array of corresponding
event rates
rand : float, default = None
A random number from the unit interval.

Returns
=======
new_t : float
A new time.
"""
integrand = lambda _t : get_event_rates(_t, y0).sum()
integral = lambda _t : quad(integrand, t0, _t)[0]

if rand is None:
rand = np.random.rand()
else:
assert(rand >= 0)
assert(rand < 1)
_1_minus_r = 1 - rand

rootfunction = lambda _t: - np.log(_1_minus_r) - integral(_t)
new_t = newton(rootfunction, t0, fprime=lambda _t: -integrand(_t))

return new_t

[docs]def time_leap_ivp(t0, y0, get_event_rates, rand=None):
"""
Compute a time leap for time-varying rate functions
based on Gillespie's SSA using Newton's method

Parameters
==========
t0 : float
The current time
y0 : numpy.ndarray
Current state of the system
get_event_rates : function
A function that takes time t and state y
as input and returns an array of corresponding
event rates
rand : float, default = None
A random number from the unit interval.

Returns
=======
new_t : float
A new time.
"""
integrand = lambda _t, _y: [get_event_rates(_t, y0).sum()]
initial = integrand(t0,None)

if rand is None:
rand = np.random.rand()
else:
assert(rand >= 0)
assert(rand < 1)
_1_minus_r = 1 - rand

rootfunction = lambda _t, _y: - np.log(_1_minus_r) - _y[0]
rootfunction.terminal = True
result = solve_ivp(integrand,[t0,np.inf],y0=[0],method='RK23',events=rootfunction)

return result.t_events[0][0]

if __name__ == "__main__":     # pragma: no cover

from epipack import SISModel

SIS = SISModel(infection_rate=2.0,recovery_rate=1)
SIS.set_initial_conditions({
'S': 0.99,
'I': 0.01,
})

tt = np.linspace(0,10,100)
result = integrate_dopri5(SIS.dydt, tt, SIS.y0)

import matplotlib.pyplot as pl

pl.plot(tt, result[0,:])
pl.plot(tt, result[1,:])

pl.show()