感染者数を数理モデルで予測しているThe Lancetの論文[^1]を見つけたので、日本国内の感染者数を予測するために数理モデルを実装し他の感染症と比較してみた。
(あくまで今回の分析の仮定のもとで得られた内容です。)
新型コロナウイルス(COVID-19, 以下新型コロナ)の今後の国内感染者数を数理モデルで予測する。
新型コロナの基本再生産数を他の感染症と比較する。
Scipyを用いた科学計算の実装。
「感染症がどのように伝播し、感染した人がどの程度の期間で発症し、重症化するのかといったプロセスを数式で記述する。」[^2]
感染症数理モデルでは、人口を感染の段階などで区分しそれぞれの集団の増減を微分方程式で表します。 単純なSEIRモデルと今回使う拡張版を説明します。
とすると、以下の連立微分方程式が得られます。
ただし、出生率と死亡率は無視し、発症後は必ず回復し免疫を獲得するため二度と感染しないとしています。
適当な初期値を与えて解けば、将来の感染者数が予想できます。(参考に図[^3]を載せておきます)
詳しい説明や実装はqiitaの記事[^4]が参考になります。
ここで、は基本再生産数と呼ばれ、「(全ての個体が初期に感受性を 有する状態で)1 人の感染者当たりが生産する 2 次感染者数」を意味します。[^5] 要はが大きいほど感染力が大きいということ。
を用いれば、予防接種をどれくら普及させれば感染症の流行が防げるか計算することもできます。
SEIR以外にも、全人口を居住地や年齢で細分化したり、感染症の特徴を微分方程式に反映させることで様々なモデルを作成できます。例えば、Eを考慮しないSIRや免疫獲得を考慮しないSEIS、受動免疫を考慮したMSEIRなどがあります[^6]。
上記のSEIRは単純すぎるので、今回は論文[^1]を参考に外国との人口の流出入や死亡率を考慮したモデルに拡張します。
とすると、同じく以下の連立微分方程式が得られます。
ただし、とについて、日本から海外に行く人は全員、海外から日本に来る人はに属しているとします。
論文[^1]と同じように以外のパラメータは適当な値で代用します。はSARSコロナを参考に、としました。は2019年2月の旅行者数を参考にとしました。
モデルの未知パラメータはです。推定方法は最小二乗法、最尤推定、MAP推定、ベイズ推定など色々あります[^7]が、今回は非定常ポアソン過程[^8]として尤度を求め最尤推定でを求めます。
みんな大好き最尤推定。まずは観測したデータが発生する確率(尤度)をの関数として表現し、それが最大となるようなを求めます。論文[^1]を参考に感染者数を非定常ポアソン過程としてモデル化して尤度を求めると、
ただし、: 観測初日、: 観測最終日、: での感染者数で、
とします。は感染者数がポアソン分布に従うとした時の平均を表します。
対数尤度からに無関係な項を除いて、は以下で推定できます。
import scipy
from scipy.integrate import odeint
from scipy.optimize import minimize
from sklearn.metrics import mean_absolute_error, mean_squared_error
class SIR(object):
def __init__(self, r_0, t_i, dt, init_S, init_I, init_R):
self.r_0 = r_0
self.t_i = t_i
self.dt = dt
self.init_state_list = [init_S, init_I, init_R]
def _get_param(self, params, key, default, t=None):
"""パラメータの時間変化に対応する
"""
if isinstance(params, dict):
if key in list(params.keys()):
param = params[key]
if isinstance(param, list):
return param[np.clip(int(t / self.dt), 0, len(param)-1)]
elif isinstance(param, np.ndarray):
return param
else:
return default
else:
return default
else:
return default
def _ode(self, state_list, t=None, params=None):
"""連立微分方程式を定義する
"""
r_0 = self._get_param(params, 'r_0', self.r_0, t=t)
t_i = self._get_param(params, 't_i', self.t_i, t=t)
S, I, R = state_list
N = S + I + R
dstate_dt = list()
dstate_dt[0] = - (r_0 / t_i) * (I / N) * S
dstate_dt[1] = (r_0 / t_i) * (I / N) * S - I / t_i
dstate_dt[2] = I / t_i
return dstate_dt
def solve_ode(self, len_days=365, params=None):
"""微分方程式を解く
"""
t = np.linspace(0, len_days, int(len_days / self.dt), endpoint=False)
args = (params,) if params else ()
return odeint(self._ode, self.init_state_list, t, args=args)
class CustomizedSEIRD(SIR):
def __init__(self, r_0=None, t_e=6.0, t_i=2.4, n_i_j=70000, n_j_i=40000, f=0.0001, dt=1,
init_S=126800000, init_E=0, init_I=1, init_R=0, init_D=0):
self.r_0 = r_0
self.t_e = t_e
self.t_i = t_i
self.n_i_j = n_i_j
self.n_j_i = n_j_i
self.f = f
self.dt = dt
self.init_state_list = [init_S, init_E, init_I, init_R, init_D]
def _ode(self, state_list, t=None, params=None):
"""連立微分方程式を定義する
"""
r_0 = self._get_param(params, 'r_0', self.r_0, t=t)
t_e = self._get_param(params, 't_e', self.t_e, t=t)
t_i = self._get_param(params, 't_i', self.t_i, t=t)
n_i_j = self._get_param(params, 'n_i_j', self.n_i_j, t=t)
n_j_i = self._get_param(params, 'n_j_i', self.n_j_i, t=t)
f = self._get_param(params, 'f', self.f)
S, E, I, R, D = state_list
N = S + E + I + R
dstate_dt = list()
dstate_dt.append(- (r_0 / t_i) * (I / N) * S - n_j_i + (1 - ((E + I) / N)) * n_i_j)
dstate_dt.append((r_0 / t_i) * (I / N) * S - E / t_e + ((E + I) / N) * n_i_j)
dstate_dt.append(E / t_e - I / t_i)
dstate_dt.append((1 - f) * I / t_i)
dstate_dt.append(f * I / t_i)
return dstate_dt
def _calc_neg_log_likelihood_r0(self, r_0, X):
"""対数尤度(R_0に関係ある部分のみ)を計算する
"""
solution = self.solve_ode(len_days=len(X), params=dict(r_0=r_0))
lambda_arr = solution[int(1/self.dt)-1::int(1/self.dt), 2] # I
return - np.sum(- lambda_arr + X * np.log(lambda_arr))
def _calc_error(self, r_0, X, metric=mean_absolute_error):
"""平均絶対誤差、平均二乗誤差を計算する
"""
solution = self.solve_ode(len_days=len(X), params=dict(r_0=r_0))
e_arr = solution[int(1/self.dt)-1::int(1/self.dt), 2] # I
return metric(X, e_arr)
def exec_point_estimation(self, init_r_0, X, project='mle'):
"""パラメータを点推定する
"""
if project == 'mle':
result = minimize(self._calc_neg_log_likelihood_r0, init_r_0, args=(X,), method='Nelder-Mead')
elif project == 'lad':
result = minimize(self._calc_error, init_r_0, args=(X, mean_absolute_error), method='Nelder-Mead')
elif project == 'ls':
result = minimize(self._calc_error, init_r_0, args=(X, mean_squared_error), method='Nelder-Mead')
else:
print(f'Invalid project: {project}')
return None
if self.r_0 is None:
self.r_0 = result.x[0]
return result
def exec_map(self):
"""MAP推定
"""
pass
def exec_mcmc(self):
"""MCMC法
"""
pass
if __name__ == '__main__':
# 1/16 から 3/8 までの感染者数
X = np.array([
1, 1, 1, 1, 1, 1, 1, 1, 2, 3, # 1/25
4, 4, 7, 8, 14, 17, 20, 20, 20, 23, # 2/4
35, 45, 86, 90, 96, 161, 163, 203, 251, 259, # 2/14
338, 414, 520, 616, 705, 728, 743, 743, 838, 851, # 2/24
862, 891, 919, 938, 947, 961, 980, 999, 1035, 1056, # 3/5
1113, 1157, 1190 # 3/8
])
model = CustomizedSEIRD()
result = model.exec_point_estimation(init_r_0=1, X=X, project='mle')
print(result.x[0])