Simulating a Non-homogeneous Poisson Process
1. Intro
We provide an algorithm to generate arrival times of a non-homogeneous Poisson process (nhpp) when the rate function \(\lambda(s)\) is piecewise constant. Specifically, we assume given two sequences \(\{\lambda_{j}\}_{j=0}^{\infty}\) and \(\{s_{j}\}_{j=0}^{\infty}\), with \(s_{0}=0\), the \(s_{j}\) strictly increasing, and \(s_{k}\to \infty\) as \(k\to\infty\), and then we take \(\lambda(s) = \lambda_{j}\) for \(s\in [s_{j}, s_{j+1}]\). An example would be a daily scheme of arrival rates of patients per hour at a hospital, where \(\lambda_{0} = 1\) from \([0, 8)\), \(\lambda_{1} = 10\) from \([8, 12)\), \(\lambda_{2} = 8\) from \([12, 18)\) and \(\lambda_{4}=3\) from \([18, 24)]\), update \(\{s_{j}\}\) accordingly).
2. The Maths
One way to simulate a non-homogenous Poisson process with piece-wise constant rates is as follows.
- For each interval \([s_j, s_{j+1}]\) compute the duration \(t_j = s_{j+1}-s_j\).
- Generate a random deviate \(R_j \sim \Pois{\lambda_j t_j}\).
- Generate \(R_j\) uniform deviates \(U_i \sim \Unif{[s_j, s_{j+1}]}\).
- The uniform deviates form the arrivals times on \([s_j, s_{j+1}]\).
This procedure is easy, but the computation of a Poisson distributed random deviate is somewhat costly.
Perhaps the next procedure is more efficient–I have not tested it, so I don’t know. The paper Generating Nonhomogeneous Poisson Processes by Raghu Pasupathy explains that we need to find the smallest \(t\) such that \(\Lambda (s, t) = \int_{s}^{t} \lambda(y)\d y =x \), where \(x = -\log u\) and \(u\sim\Unif{[0.1]}\).
2.1. A constant rate throughout
If the rate \(\lambda(x) \equiv \lambda\) is contant throughout, finding such \(t\) is simple because \(\Lambda(s, t) = \lambda (t-s)\) so that \(t= s + s/\lambda\). This gives the following algorithm:
- \(s = 0\).
- \(s \mineq \log(u)/\lambda, \quad u \sim \Unif{[0,1]}\).
- Yield \(s\) and return to step 2.
2.2. Piecewise Constant
Let us generalize this proceduce so that we can deal with piecewise constant \(\lambda(\cdot)\). Noting that the sequence \(\{s_{j}\}\) places a grid on \(\R^{+}\), we define
\begin{align*} j &= \max\{i: s_{i} \leq s\}, & k &=\min\left\{i : \Lambda(s, s_{i}) >x\right\}, \end{align*}i.e., \(s_{j}\) is the point on the grid just below \(s\), and \(s_{k}\) is the smallest point on the grid such that the integral \(\Lambda(s, s_{k})\) exceeds \(x\). For the algorithm, if \(s+x/\lambda_{j}< s_{j+1}\), then we can set \(s += x/\lambda_{j}\), and return \(s\) as the next arrival time. However, if if \(s+x/\lambda_{j}\geq s_{j+1}\), we use the following idea:
\begin{equation*} x = \Lambda(s, t) = \Lambda(s_{j}, s_{k}) - \Lambda(s_{j}, s) - \Lambda(t, s_{k}). \end{equation*}Since \(\lambda(\cdot)\) is \(\lambda_{j}\) on \([s_{j}, s]\) and \(\lambda(\cdot) = \lambda_{k-1}\) on \([t, s_{k})\),
\begin{align*} \Lambda(s_{j}, s) &= \lambda_{j}(s-s_{k}), & \Lambda(t, s_{k}) = \lambda_{k-1} (s_{k}-t). \end{align*}Thus, if we have \(s_{j}\) and \(s_{k}\), the next arrival time \(t\) after \(s\) is easily solved from the equation
\begin{equation*} \lambda_{k-1}(s_{k} -t ) = x + \Lambda(s_{j}, s) - \Lambda(s_{j}, s_{k}). \end{equation*}That is,
\begin{equation}\label{eq:nh8} t = s_{k } - (\Lambda(s_{j}, s_{k}) - x - \Lambda(s_{j}, s) )/\lambda_{k-1}. \end{equation}3. The Algorithm
With this idea, we obtain the following algorithm. Assume that \(\lambda_{0} > 0\) (Otherwise search for the first \(\lambda_{j}>0\), and start from there).
- \(s=0\), \(j=0\)
- \(s \mineq \log(u)/\lambda_{j}\)
- If \(s < s_{j+1}\), yield \(s\) and return to step 2, else
- \(T = \lambda_{j}(s_{j+1}-s)\).
- While \(T<0\): \(j \pluseq 1\), \(T \pluseq \lambda_{j}(s_{j+1}-s_{j})\).
- \(s = s_{j+1} - T/\lambda_{j}\).
- Yield \(s\) and return to step 2.
4. The Python <Code
from typing import Iterator
import numpy as np
def nhpp_range(
start: float,
end: float,
times: Iterator[float],
rates: Iterator[float],
gen: np.random.Generator = np.random.default_rng(3),
) -> Iterator[float]:
rate = next(rates)
down, up = next(times), next(times)
while np.isclose(rate, 0):
rate = next(rates)
down, up = up, next(times)
now = max(start, down)
while True:
now += gen.exponential() / rate
T = rate * (up - now)
while T < 0:
rate = next(rates)
down, up = up, next(times)
T += rate * (up - down)
now = up - T / rate
if now > end:
break
yield now
5. The Tests
from datetime import datetime, timedelta
from itertools import cycle, accumulate
from collections import Counter
import numpy as np
from nhpp import nhpp_range
def test_floats():
gen = np.random.default_rng(3)
start = 0
end = 40
periods = [1, 1, 3]
rates = [0, 1, 10]
times = []
counter = Counter()
counter[0] = 0
for time in nhpp_range(
start, end, accumulate(cycle(periods), initial=start), cycle(rates)
):
times.append(time)
counter[int(time) % 5] += 1
print(counter)
A = np.array(times)
X = A[1:] - A[:-1]
print(X.mean(), X.std())
theoretical_mean_rate = sum(p * r for r, p in zip(rates, periods)) / sum(
periods
)
print(1 / theoretical_mean_rate)
def test_datetimes():
start = datetime(2023, 8, 9, 10, 0)
end = datetime(2023, 8, 10, 18, 0)
rates = list(range(13))
periods = [2] * len(rates)
time = start
for delta in nhpp_range(
0,
(end - start).total_seconds() / 3600,
accumulate(cycle(periods), initial=0),
cycle(rates),
):
time += timedelta(hours=delta)
print(time)
print(start, time)
if __name__ == "__main__":
test_floats()
test_datetimes()