Source code for branchpro.models
#
# ForwardModel Class
#
# This file is part of BRANCHPRO
# (https://github.com/SABS-R3-Epidemiology/branchpro.git) which is released
# under the BSD 3-clause license. See accompanying LICENSE.md for copyright
# notice and full license details.
#
import numpy as np
[docs]
class ForwardModel(object):
"""ForwardModel Class:
Base class for the model classes included in the branchpro package.
Classes inheriting from ``ForwardModel`` class can implement the methods
directly in Python.
Methods
-------
simulate: return model output for specified parameters and times.
"""
def __init__(self):
super(ForwardModel, self).__init__()
[docs]
def simulate(self, parameters, times):
"""
Runs a forward simulation with the given ``parameters`` and returns a
time-series with data points corresponding to the given ``times``.
Returns a sequence of length ``n_times`` (for single output problems)
or a NumPy array of shape ``(n_times, n_outputs)`` (for multi-output
problems), representing the values of the model at the given ``times``.
Parameters
----------
parameters
An ordered sequence of parameter values.
times
The times at which to evaluate. Must be an ordered sequence,
without duplicates, and without negative values.
All simulations are started at time 0, regardless of whether this
value appears in ``times``.
"""
raise NotImplementedError
#
# BranchProModel Class
#
[docs]
class BranchProModel(ForwardModel):
r"""BranchProModel Class:
Class for the models following a Branching Processes behaviour.
It inherits from the ``ForwardModel`` class.
In the branching process model, we track the number of cases
registered each day, I_t, also known as the "incidence" at time t.
The incidence at time t is modelled by a random variable distributed
according to a Poisson distribution with a mean that depends on previous
number of cases, according to the following formula:
.. math::
E(I_{t}^{\text(local)}|I_0, I_1, \dots I_{t-1}, w_{s}, R_{t}) =
R_{t}\sum_{s=1}^{t}I_{t-s}w_{s}
Always apply method :meth:`set_r_profile` before calling
:meth:`BranchProModel.simulate` for a change of R_t profile!
Parameters
----------
initial_r
(numeric) Value of the reproduction number at the beginning
of the epidemic
serial_interval
(list) Unnormalised probability distribution of that the recipient
first displays symptoms s days after the infector first displays
symptoms.
"""
def __init__(self, initial_r, serial_interval):
super(BranchProModel, self).__init__()
if np.asarray(serial_interval).ndim != 1:
raise ValueError(
'Serial interval values storage format must be 1-dimensional')
if np.sum(serial_interval) <= 0:
raise ValueError('Sum of serial interval values must be > 0.')
if not isinstance(initial_r, (int, float)):
raise TypeError('Value of R must be integer or float.')
# Invert order of serial intervals for ease in _normalised_daily_mean
self._serial_interval = np.asarray(serial_interval)[::-1]
self._r_profile = np.array([initial_r])
self._normalizing_const = np.sum(self._serial_interval)
[docs]
def set_r_profile(self, new_rs, start_times, last_time=None):
"""
Creates a new R_t profile for the model.
Parameters
----------
new_rs
sequence of new time-dependent values of the reproduction
numbers.
start_times
sequence of the first time unit when the corresponding
indexed value of R_t in new_rs is used. Must be an ordered sequence
and without duplicates or negative values.
last_time
total evaluation time; optional.
"""
# Raise error if not correct dimensionality of inputs
if np.asarray(new_rs).ndim != 1:
raise ValueError(
'New reproduction numbers storage format must be 1-dimensional'
)
if np.asarray(start_times).ndim != 1:
raise ValueError(
'Starting times values storage format must be 1-dimensional')
# Raise error if inputs do not have same shape
if np.asarray(new_rs).shape != np.asarray(start_times).shape:
raise ValueError('Both inputs should have same number of elements')
# Raise error if start times are not non-negative and increasing
if np.any(np.asarray(start_times) < 0):
raise ValueError('Times can not be negative.')
if np.any(np.asarray(start_times)[:-1] >= np.asarray(start_times)[1:]):
raise ValueError('Times must be increasing.')
# Ceil times to integer numbers a
times = np.ceil(start_times).astype(int)
# Create r profile
r_profile = []
# Fill in initial r from day 1 up to start time
initial_r = self._r_profile[0]
r_profile += [initial_r] * (times[0] - 1)
# Fill in later r's
time_intervals = times[1:] - times[:-1]
for time_id, time_interval in enumerate(time_intervals):
# Append r for each time unit
r_profile += [new_rs[time_id]] * time_interval
# Add final r
if last_time:
final_interval = last_time - start_times[-1] + 1
r_profile += [new_rs[-1]] * final_interval
else:
r_profile += [new_rs[-1]]
self._r_profile = np.asarray(r_profile)
[docs]
def get_serial_intervals(self):
"""
Returns serial intervals for the model.
"""
# Reverse inverting of order of serial intervals
return self._serial_interval[::-1]
[docs]
def set_serial_intervals(self, serial_intervals):
"""
Updates serial intervals for the model.
Parameters
----------
serial_intervals
New unnormalised probability distribution of that the recipient
first displays symptoms s days after the infector first displays
symptoms.
"""
if np.asarray(serial_intervals).ndim != 1:
raise ValueError(
'Chosen times storage format must be 1-dimensional')
# Invert order of serial intervals for ease in _effective_no_infectives
self._serial_interval = np.asarray(serial_intervals)[::-1]
self._normalizing_const = np.sum(self._serial_interval)
def _effective_no_infectives(self, t, incidences):
"""
Computes expected number of new cases at time t, using previous
incidences and serial intervals at a rate of 1:1 reproduction.
Parameters
----------
t
evaluation time
incidences
sequence of incidence numbers
"""
if t > len(self._serial_interval):
start_date = t - len(self._serial_interval)
mean = (
np.sum(incidences[start_date:t] * self._serial_interval) /
self._normalizing_const)
return mean
mean = (
np.sum(incidences[:t] * self._serial_interval[-t:]) /
self._normalizing_const)
return mean
[docs]
def simulate(self, parameters, times):
"""
Runs a forward simulation with the given ``parameters`` and returns a
time-series with incidence numbers corresponding to the given ``times``
.
Parameters
----------
parameters
Initial number of cases.
times
The times at which to evaluate. Must be an ordered sequence,
without duplicates, and without negative values.
All simulations are started at time 0, regardless of whether this
value appears in ``times``.
"""
initial_cond = parameters
last_time_point = np.max(times)
# Repeat final r if necessary
# (r_1, r_2, ..., r_t)
if len(self._r_profile) < last_time_point:
missing_days = last_time_point - len(self._r_profile)
last_r = self._r_profile[-1]
repeated_r = np.full(shape=missing_days, fill_value=last_r)
self._r_profile = np.append(self._r_profile, repeated_r)
incidences = np.empty(shape=last_time_point + 1)
incidences[0] = initial_cond
# Construct simulation times in steps of 1 unit time each
simulation_times = np.arange(start=1, stop=last_time_point+1, step=1)
# Compute normalised daily means for full timespan
# and draw samples for the incidences
for t in simulation_times:
norm_daily_mean = self._r_profile[t-1] * (
self._effective_no_infectives(t, incidences))
incidences[t] = np.random.poisson(lam=norm_daily_mean, size=1)
mask = np.in1d(np.append(np.asarray(0), simulation_times), times)
return incidences[mask]
[docs]
class LocImpBranchProModel(BranchProModel):
r"""LocImpBranchProModel Class:
Class for the models following a Branching Processes behaviour with
local and imported cases. It inherits from the ``BranchProModel`` class.
In the branching process model, we track the number of cases
registered each day, I_t, also known as the "incidence" at time t.
For the local & imported cases scenario, to the local incidences we add
migration of cases from an external source. The conditions of this external
environment may differ from the ones we are currently in through a change
in the value of the R number.
To account for this difference, we assume that at all times the R number of
the imported cases is proportional to the R number of the local incidences:
.. math::
R_{t}^{\text(imported)} = \epsilon R_{t}^{\text(local)}
The local incidence at time t is modelled by a random variable distributed
according to a Poisson distribution with a mean that depends on previous
number of cases both local and imported, according to the following
formula:
.. math::
E(I_{t}^{\text(local)}|I_0, I_1, \dots I_{t-1}, w_{s}, R_{t}) =
R_{t}^{\text(local)}\sum_{s=1}^{t}I_{t-s}^{\text(local)}w_{s} +
R_{t}^{\text(imported)}\sum_{s=1}^{t}I_{t-s}^{\text(imported)}w_{s}
Always apply methods :meth:`set_r_profile` and :meth:`set_imported_cases`
before calling :meth:`LocImpBranchProModel.simulate` for a change of R_t
profile and for loading the imported cases data!
Parameters
----------
initial_r
(numeric) Value of the reproduction number at the beginning
of the epidemic
serial_interval
(list) Unnormalised probability distribution of that the recipient
first displays symptoms s days after the infector first displays
symptoms
epsilon
(numeric) Proportionality constant of the R number for imported cases
with respect to its analog for local ones.
"""
def __init__(self, initial_r, serial_interval, epsilon):
super().__init__(initial_r, serial_interval)
self.set_epsilon(epsilon)
[docs]
def set_epsilon(self, new_epsilon):
"""
Updates proportionality constant of the R number for imported cases
with respect to its analog for local ones.
Parameters
----------
new_epsilon
new value of constant of proportionality.
"""
if not isinstance(new_epsilon, (int, float)):
raise TypeError('Value of epsilon must be integer or float.')
if new_epsilon < 0:
raise ValueError('Epsilon needs to be greater or equal to 0.')
self.epsilon = new_epsilon
[docs]
def set_imported_cases(self, times, cases):
"""
Sets number of imported cases and when they occur.
Parameters
----------
times
times at which imported cases occur. Must be an ordered sequence,
without duplicates, and without negative values.
cases
number of imported cases at that specified point.
"""
# Raise error if not correct dimensionality of inputs
if np.asarray(times).ndim != 1:
raise ValueError(
'Times of arising imported cases storage format must \
be 1-dimensional'
)
if np.asarray(cases).ndim != 1:
raise ValueError(
'Number of imported cases storage format must be \
1-dimensional')
# Raise error if inputs do not have same shape
if np.asarray(times).shape != np.asarray(cases).shape:
raise ValueError('Both inputs should have same number of elements')
self._imported_times = np.asarray(times, dtype=int)
self._imported_cases = np.asarray(cases)
[docs]
def simulate(self, parameters, times):
"""
Runs a forward simulation with the given ``parameters`` and returns a
time-series with incidence numbers corresponding to the given ``times``
.
Parameters
----------
parameters
Initial number of cases.
times
The times at which to evaluate. Must be an ordered sequence,
without duplicates, and without negative values.
All simulations are started at time 0, regardless of whether this
value appears in ``times``.
"""
initial_cond = parameters
last_time_point = np.max(times)
# Repeat final r if necessary
# (r_1, r_2, ..., r_t)
if len(self._r_profile) < last_time_point:
missing_days = last_time_point - len(self._r_profile)
last_r = self._r_profile[-1]
repeated_r = np.full(shape=missing_days, fill_value=last_r)
self._r_profile = np.append(self._r_profile, repeated_r)
incidences = np.empty(shape=last_time_point + 1)
incidences[0] = initial_cond
# Create vector of imported cases
imported_incidences = np.zeros(last_time_point + 1, dtype=int)
imports_times = self._imported_times[
self._imported_times <= last_time_point]
imports_cases = self._imported_cases[
self._imported_times <= last_time_point]
np.put(
imported_incidences, ind=imports_times, v=imports_cases)
# Construct simulation times in steps of 1 unit time each
simulation_times = np.arange(start=1, stop=last_time_point+1, step=1)
# Compute normalised daily means for full timespan
# and draw samples for the incidences
for t in simulation_times:
norm_daily_mean = self._r_profile[t-1] * (
self._effective_no_infectives(
t, incidences) + self.epsilon * (
self._effective_no_infectives(
t, imported_incidences)))
incidences[t] = np.random.poisson(lam=norm_daily_mean, size=1)
mask = np.in1d(np.append(np.asarray(0), simulation_times), times)
return incidences[mask]