Magicode logo
Magicode
6
11 min read

新型コロナの国内感染者数を数理モデルで予測してみた!

概要

感染者数を数理モデルで予測しているThe Lancetの論文[^1]を見つけたので、日本国内の感染者数を予測するために数理モデルを実装し他の感染症と比較してみた。

まとめ

(あくまで今回の分析の仮定のもとで得られた内容です。)

  • SEIRモデルで感染者数を予測した。
  • 新型コロナの基本再生産数は約2.9でインフルエンザより少し高い。
  • 感染対策の効果が無い場合は国内感染者数のピークはGW前後。
  • 今後1年の国内死亡者数は1万人強(致死率0.01%の場合)。
  • 外国人の入国制限は無意味。

目次

1. 今回の目的

  1. 新型コロナウイルス(COVID-19, 以下新型コロナ)の今後の国内感染者数を数理モデルで予測する。

  2. 新型コロナの基本再生産数を他の感染症と比較する。

  3. Scipyを用いた科学計算の実装。

2. 感染症数理モデル

「感染症がどのように伝播し、感染した人がどの程度の期間で発症し、重症化するのかといったプロセスを数式で記述する。」[^2]

感染症数理モデルでは、人口を感染の段階などで区分しそれぞれの集団の増減を微分方程式で表します。 単純なSEIRモデルと今回使う拡張版を説明します。

2.1. SEIRモデル

  • SS: 免疫を持っていない人数(Susceptible)
  • EE: 暴露後期間の人数(Exposed)
  • II: 感染性期間の人数(Infectious)
  • RR: 感染症から回復し免疫を獲得した人数(Recovered)
  • β\beta: 感染率, γ\gamma: 回復(隔離)率, σ\sigma: 感染性を得る確率, NN: 総人口

とすると、以下の連立微分方程式が得られます。

dS(t)dt=βI(t)NS(t)dE(t)dt=βI(t)NS(t)σE(t)dI(t)dt=σE(t)γI(t)dR(t)dt=γI(t)N=S(t)+E(t)+I(t)+R(t)\begin{align} \frac{dS(t)}{dt} &= -\beta \frac{I(t)}{N}S(t) \\ \frac{dE(t)}{dt} &= \beta \frac{I(t)}{N}S(t) - \sigma E(t) \\ \frac{dI(t)}{dt} &= \sigma E(t) - \gamma I(t) \\ \frac{dR(t)}{dt} &= \gamma I(t) \\ N &= S(t) + E(t) + I(t) + R(t) \end{align}

ただし、出生率と死亡率は無視し、発症後は必ず回復し免疫を獲得するため二度と感染しないとしています。

適当な初期値を与えて解けば、将来の感染者数が予想できます。(参考に図[^3]を載せておきます)

詳しい説明や実装はqiitaの記事[^4]が参考になります。

ここで、R0=β/γR_0=\beta / \gammaは基本再生産数と呼ばれ、「(全ての個体が初期に感受性を 有する状態で)1 人の感染者当たりが生産する 2 次感染者数」を意味します。[^5] 要はR0R_0が大きいほど感染力が大きいということ。

R0R_0を用いれば、予防接種をどれくら普及させれば感染症の流行が防げるか計算することもできます。

SEIR以外にも、全人口を居住地や年齢で細分化したり、感染症の特徴を微分方程式に反映させることで様々なモデルを作成できます。例えば、Eを考慮しないSIRや免疫獲得を考慮しないSEIS、受動免疫を考慮したMSEIRなどがあります[^6]。

2.2. 拡張版SEIRDモデル

上記のSEIRは単純すぎるので、今回は論文[^1]を参考に外国との人口の流出入や死亡率を考慮したモデルに拡張します。

  • R0R_0: 基本再生産数(R0=β/γR_0=\beta / \gamma)
  • TET_E: 平均暴露後期間(日)(γ1=TI\gamma^{-1} = T_I)
  • TIT_I: 平均感染性期間(日)(σ1=TE\sigma^{-1} = T_E)
  • NJ,IN_{J,I}: 日本から海外への渡航者数(人/日)
  • NI,JN_{I,J}: 海外から日本への渡航者数(人/日)
  • ff: 致死率
  • D(t)D(t): 死亡者数(人)

とすると、同じく以下の連立微分方程式が得られます。

dS(t)dt=R0DII(t)N(t)S(t)NJ,I+(1E(t)+I(t)N(t))NI,JdE(t)dt=R0DII(t)N(t)S(t)E(t)TE+E(t)+I(t)N(t)NI,JdI(t)dt=E(t)TEI(t)TIdR(t)dt=(1f)I(t)TIdD(t)dt=fI(t)TIN(t)=S(t)+E(t)+I(t)+R(t)\begin{align} \frac{dS(t)}{dt} &= - \frac{R_0}{D_I} \frac{I(t)}{N(t)} S(t) - N_{J,I} + \left(1 - \frac{E(t) + I(t)}{N(t)} \right)N_{I,J} \\ \frac{dE(t)}{dt} &= \frac{R_0}{D_I} \frac{I(t)}{N(t)} S(t) - \frac{E(t)}{T_E} + \frac{E(t) + I(t)}{N(t)} N_{I,J}\\ \frac{dI(t)}{dt} &= \frac{E(t)}{T_E} - \frac{I(t)}{T_I}\\ \frac{dR(t)}{dt} &= (1-f) \frac{I(t)}{T_I}\\ \frac{dD(t)}{dt} &= f \frac{I(t)}{T_I} \\ N(t) &= S(t) + E(t) + I(t) + R(t) \end{align}

ただし、NJ,IN_{J,I}NI,JN_{I,J}について、日本から海外に行く人は全員SS、海外から日本に来る人はS,ES, Eに属しているとします。

論文[^1]と同じようにR0R_0以外のパラメータは適当な値で代用します。TI,TET_I, T_EはSARSコロナを参考に、TE=6.0,TI=2.4T_E=6.0, T_I=2.4としました。NJ,I,NI,JN_{J, I}, N_{I, J}は2019年2月の旅行者数を参考にNJ,I=40000,NI,J=70000N_{J, I}=40000, N_{I, J}=70000としました。

3. パラメータ推定

モデルの未知パラメータはR0R_0です。推定方法は最小二乗法、最尤推定、MAP推定、ベイズ推定など色々あります[^7]が、今回は非定常ポアソン過程[^8]として尤度を求め最尤推定でR0R_0を求めます。

3.1. 最尤推定

みんな大好き最尤推定。まずは観測したデータが発生する確率(尤度)をR0R_0の関数として表現し、それが最大となるようなR0R_0を求めます。論文[^1]を参考に感染者数を非定常ポアソン過程としてモデル化して尤度を求めると、

L(R0)=d=DsDeeλdλdxdxd!L(R_0) = \prod_{d=D_s}^{D_e} \frac{e^{-\lambda_d}\lambda_d^{x_d}}{x_d!}

ただし、DsD_s: 観測初日、DsD_s: 観測最終日、xdx_d: ddでの感染者数で、

λ(t,R0)=E(t)+I(t)λd(R0)=d1dλ(u,R0)du\lambda (t, R_0) = E(t) + I(t) \\ \lambda _d (R_0) = \int_{d-1}^{d} \lambda (u, R_0) du

とします。λ\lambdaは感染者数がポアソン分布に従うとした時の平均を表します。

対数尤度からR0R_0に無関係な項を除いて、R0R_0は以下で推定できます。

arg minR0>0logL(R0)=arg minR0>0d=DsDe(λd(R0)xdlogλd(R0))\argmin_{R_0 > 0} - \log L(R_0) = \argmin_{R_0 > 0} \sum_{d=D_s}^{D_e} (\lambda_d(R_0) - x_d \log \lambda_d(R_0))

4. pythonで実装と図示

import scipy
from scipy.integrate import odeint
from scipy.optimize import minimize
from sklearn.metrics import mean_absolute_error, mean_squared_error


class SIR(object):

    def __init__(self, r_0, t_i, dt, init_S, init_I, init_R):
        self.r_0 = r_0
        self.t_i = t_i
        self.dt = dt
        self.init_state_list = [init_S, init_I, init_R]

    def _get_param(self, params, key, default, t=None):
        """パラメータの時間変化に対応する
        """
        if isinstance(params, dict):
            if key in list(params.keys()):
                param = params[key]
                if isinstance(param, list):
                    return param[np.clip(int(t / self.dt), 0, len(param)-1)]
                elif isinstance(param, np.ndarray):
                    return param
                else:
                    return default
            else:
                return default
        else:
            return default

    def _ode(self, state_list, t=None, params=None):
        """連立微分方程式を定義する
        """
        r_0 = self._get_param(params, 'r_0', self.r_0, t=t)
        t_i = self._get_param(params, 't_i', self.t_i, t=t)
        S, I, R = state_list
        N = S + I + R
        dstate_dt = list()
        dstate_dt[0] = - (r_0 / t_i) * (I / N) * S
        dstate_dt[1] = (r_0 / t_i) * (I / N) * S - I / t_i
        dstate_dt[2] = I / t_i
        return dstate_dt

    def solve_ode(self, len_days=365, params=None):
        """微分方程式を解く
        """
        t = np.linspace(0, len_days, int(len_days / self.dt), endpoint=False)
        args = (params,) if params else ()
        return odeint(self._ode, self.init_state_list, t, args=args)


class CustomizedSEIRD(SIR):

    def __init__(self, r_0=None, t_e=6.0, t_i=2.4, n_i_j=70000, n_j_i=40000, f=0.0001, dt=1,
            init_S=126800000, init_E=0, init_I=1, init_R=0, init_D=0):
        self.r_0 = r_0
        self.t_e = t_e
        self.t_i = t_i
        self.n_i_j = n_i_j
        self.n_j_i = n_j_i
        self.f = f
        self.dt = dt
        self.init_state_list = [init_S, init_E, init_I, init_R, init_D]

    def _ode(self, state_list, t=None, params=None):
        """連立微分方程式を定義する
        """
        r_0 = self._get_param(params, 'r_0', self.r_0, t=t)
        t_e = self._get_param(params, 't_e', self.t_e, t=t)
        t_i = self._get_param(params, 't_i', self.t_i, t=t)
        n_i_j = self._get_param(params, 'n_i_j', self.n_i_j, t=t)
        n_j_i = self._get_param(params, 'n_j_i', self.n_j_i, t=t)
        f = self._get_param(params, 'f', self.f)
        S, E, I, R, D = state_list
        N = S + E + I + R
        dstate_dt = list()
        dstate_dt.append(- (r_0 / t_i) * (I / N) * S - n_j_i + (1 - ((E + I) / N)) * n_i_j)
        dstate_dt.append((r_0 / t_i) * (I / N) * S - E / t_e + ((E + I) / N) * n_i_j)
        dstate_dt.append(E / t_e - I / t_i)
        dstate_dt.append((1 - f) * I / t_i)
        dstate_dt.append(f * I / t_i)
        return dstate_dt

    def _calc_neg_log_likelihood_r0(self, r_0, X):
        """対数尤度(R_0に関係ある部分のみ)を計算する
        """
        solution = self.solve_ode(len_days=len(X), params=dict(r_0=r_0))
        lambda_arr = solution[int(1/self.dt)-1::int(1/self.dt), 2] # I
        return - np.sum(- lambda_arr + X * np.log(lambda_arr))
    
    def _calc_error(self, r_0, X, metric=mean_absolute_error):
        """平均絶対誤差、平均二乗誤差を計算する
        """
        solution = self.solve_ode(len_days=len(X), params=dict(r_0=r_0))
        e_arr = solution[int(1/self.dt)-1::int(1/self.dt), 2]  # I
        return metric(X, e_arr)

    def exec_point_estimation(self, init_r_0, X, project='mle'):
        """パラメータを点推定する
        """
        if project == 'mle':
            result = minimize(self._calc_neg_log_likelihood_r0, init_r_0, args=(X,), method='Nelder-Mead')
        elif project == 'lad':
            result = minimize(self._calc_error, init_r_0, args=(X, mean_absolute_error), method='Nelder-Mead')
        elif project == 'ls':
            result = minimize(self._calc_error, init_r_0, args=(X, mean_squared_error), method='Nelder-Mead')
        else:
            print(f'Invalid project: {project}')
            return None
        if self.r_0 is None:
            self.r_0 = result.x[0]
        return result

    def exec_map(self):
        """MAP推定
        """
        pass

    def exec_mcmc(self):
        """MCMC法
        """
        pass
if __name__ == '__main__':
    # 1/16 から 3/8 までの感染者数
    X = np.array([
        1, 1, 1, 1, 1, 1, 1, 1, 2, 3,  # 1/25
        4, 4, 7, 8, 14, 17, 20, 20, 20, 23,  # 2/4
        35, 45, 86, 90, 96, 161, 163, 203, 251, 259,  # 2/14
        338, 414, 520, 616, 705, 728, 743, 743, 838, 851, # 2/24
        862, 891, 919, 938, 947, 961, 980, 999, 1035, 1056, # 3/5
        1113, 1157, 1190 # 3/8
    ])
    model = CustomizedSEIRD()
    result = model.exec_point_estimation(init_r_0=1, X=X, project='mle')
    print(result.x[0])

Discussion

コメントにはログインが必要です。