Simulating Arrival Times for a Non-homogeneous Poisson Process

1. Intro

We provide an algorithm to generate arrival times of a non-homogeneous Poisson process (nhpp) when the rate function \(\lambda(s)\) is piecewise constant. Specifically, we assume given two sequences \(\{\lambda_{j}\}_{j=0}^{\infty}\) and \(\{s_{j}\}_{j=0}^{\infty}\), with \(s_{0}=0\), the \(s_{j}\) strictly increasing, and \(s_{k}\to \infty\) as \(k\to\infty\), and then we take \(\lambda(s) = \lambda_{j}\) for \(s\in [s_{j}, s_{j+1}]\). An example would be a daily scheme of arrival rates of patients per hour at a hospital, where \(\lambda_{0} = 1\) from \([0, 8)\), \(\lambda_{1} = 10\) from \([8, 12)\), \(\lambda_{2} = 8\) from \([12, 18)\) and \(\lambda_{4}=3\) from \([18, 24)]\), update \(\{s_{j}\}\) accordingly).

2. The Maths

One way to simulate a non-homogenous Poisson process with piece-wise constant rates is as follows.

  1. For each interval \([s_j, s_{j+1}]\) compute the duration \(t_j = s_{j+1}-s_j\).
  2. Generate a random deviate \(R_j \sim \Pois{\lambda_j t_j}\).
  3. Generate \(R_j\) uniform deviates \(U_i \sim \Unif{[s_j, s_{j+1}]}\).
  4. The uniform deviates form the arrivals times on \([s_j, s_{j+1}]\).

This procedure is easy, but the computation of a Poisson distributed random deviate is somewhat costly.

Perhaps the next procedure is more efficient–I have not tested it, so I don't know. The paper Generating Nonhomogeneous Poisson Processes by Raghu Pasupathy explains that we need to find the smallest \(t\) such that \(\Lambda (s, t) = \int_{s}^{t} \lambda(y)\d y =x \), where \(x = -\log u\) and \(u\sim\Unif{[0.1]}\).

2.1. A constant rate throughout

If the rate \(\lambda(x) \equiv \lambda\) is contant throughout, finding such \(t\) is simple because \(\Lambda(s, t) = \lambda (t-s)\) so that \(t= s + s/\lambda\). This gives the following algorithm:

  1. \(s = 0\).
  2. \(\mineq \log(u)/\lambda, \quad u \sim \Unif{[0,1]}\).
  3. Yield \(s\) and return to step 2.

2.2. Piecewise Constant

Let us generalize this proceduce so that we can deal with piecewise constant \(\lambda(\cdot)\). Noting that the sequence \(\{s_{j}\}\) places a grid on \(\R^{+}\), we define

\begin{align*} j &= \max\{i: s_{i} \leq s\}, & k &=\min\left\{i : \Lambda(s, s_{i}) >x\right\}, \end{align*}

i.e., \(s_{j}\) is the point on the grid just below \(s\), and \(s_{k}\) is the smallest point on the grid such that the integral \(\Lambda(s, s_{k})\) exceeds \(x\). For the algorithm, if \(s+x/\lambda_{j}< s_{j+1}\), then we can set \(s += x/\lambda_{j}\), and return \(s\) as the next arrival time. However, if if \(s+x/\lambda_{j}\geq s_{j+1}\), we use the following idea:

\begin{equation*} x = \Lambda(s, t) = \Lambda(s_{j}, s_{k}) - \Lambda(s_{j}, s) - \Lambda(t, s_{k}). \end{equation*}

Since \(\lambda(\cdot)\) is \(\lambda_{j}\) on \([s_{j}, s]\) and \(\lambda(\cdot) = \lambda_{k-1}\) on \([t, s_{k})\),

\begin{align*} \Lambda(s_{j}, s) &= \lambda_{j}(s-s_{k}), & \Lambda(t, s_{k}) = \lambda_{k-1} (s_{k}-t). \end{align*}

Thus, if we have \(s_{j}\) and \(s_{k}\), the next arrival time \(t\) after \(s\) is easily solved from the equation

\begin{equation*} \lambda_{k-1}(s_{k} -t ) = x + \Lambda(s_{j}, s) - \Lambda(s_{j}, s_{k}). \end{equation*}

That is,

\begin{equation}\label{eq:nh8} t = s_{k } - (\Lambda(s_{j}, s_{k}) - x - \Lambda(s_{j}, s) )/\lambda_{k-1}. \end{equation}

3. The Algorithm

With this idea, we obtain the following algorithm. Assume that \(\lambda_{0} > 0\) (Otherwise search for the first \(\lambda_{j}>0\), and start from there).

  1. \(s=0\), \(j=0\)
  2. \(s \mineq \log(u)/\lambda_{j}\)
  3. If \(s < s_{j+1}\), yield \(s\) and return to step 2, else
  4. \(T = \lambda_{j}(s_{j+1}-s)\).
  5. While \(T<0\): \(j \pluseq 1\), \(T \pluseq \lambda_{j}(s_{j+1}-s_{j})\).
  6. \(s = s_{j+1} - T/\lambda_{j}\).
  7. Yield \(s\) and return to step 2.

4. The Python <Code

from typing import Iterator

import numpy as np


def nhpp_range(
    start: float,
    end: float,
    times: Iterator[float],
    rates: Iterator[float],
    gen: np.random.Generator = np.random.default_rng(3),
) -> Iterator[float]:
    rate = next(rates)
    down, up = next(times), next(times)
    while np.isclose(rate, 0):
        rate = next(rates)
        down, up = up, next(times)
    now = max(start, down)
    while True:
        now += gen.exponential() / rate
        T = rate * (up - now)
        while T < 0:
            rate = next(rates)
            down, up = up, next(times)
            T += rate * (up - down)
        now = up - T / rate
        if now > end:
            break
        yield now

5. The Tests

from datetime import datetime, timedelta
from itertools import cycle, accumulate
from collections import Counter
import numpy as np
from nhpp import nhpp_range


def test_floats():
    gen = np.random.default_rng(3)

    start = 0
    end = 40
    periods = [1, 1, 3]
    rates = [0, 1, 10]

    times = []
    counter = Counter()
    counter[0] = 0
    for time in nhpp_range(
        start, end, accumulate(cycle(periods), initial=start), cycle(rates)
    ):
        times.append(time)
        counter[int(time) % 5] += 1
    print(counter)

    A = np.array(times)
    X = A[1:] - A[:-1]
    print(X.mean(), X.std())
    theoretical_mean_rate = sum(p * r for r, p in zip(rates, periods)) / sum(
        periods
    )
    print(1 / theoretical_mean_rate)


def test_datetimes():
    start = datetime(2023, 8, 9, 10, 0)
    end = datetime(2023, 8, 10, 18, 0)
    rates = list(range(13))
    periods = [2] * len(rates)

    time = start
    for delta in nhpp_range(
        0,
        (end - start).total_seconds() / 3600,
        accumulate(cycle(periods), initial=0),
        cycle(rates),
    ):
        time += timedelta(hours=delta)
        print(time)
    print(start, time)


if __name__ == "__main__":
    test_floats()
    test_datetimes()