Generation of state space and computing the stationary distribution

This code can be used to compute the steady state distribution of a finite Markov chain. It has been written with two design goals in mind.

  • The user should not need to provide more than the absolute minimal information to generate the chain.
  • The code should be generic and fast, and relatively simple to use.

Below I first discuss the generic code and then I apply it to two examples.

If you need something more extensive, see Gerlach’s implementation at github.

State Space and States

First I build the Markov chain as a directed graph, i.e., as a DiGraph of the networkx package. Then I build the transition matrix based on this graph as a sparse matrix. Hence the following imports.

import networkx as nx
import numpy as np
import scipy.sparse as sp
from scipy.sparse.linalg import eigs

The states are nodes in the graph, transitions are directed edges between states with the transition rate as an attribute. In the building process I use the namedtuple Edge to store the transtions. Here fr is the from-state, to the to-state, and rate the transition rate (not the probability`).

from collections import namedtuple
Edge = namedtuple("Edge", ["fr", "to", "rate"])

Generation of the Reachability Graph

To generate the reachable space of the Markov chain I assume that the method transition is given, see below for an example. The method transition is given a state, and returns a set of Edges. The given state is added to the graph while the set of states in the returned set of Edges not yet in the graph are added to a frontier set. A new state is then popped from the frontier set, to which transitions is applied, and so on, until the frontier set is empty.

eps = 1e-5
maxIterations = int(1e5)

class MarkovChain(nx.DiGraph):
    def generateReachabilityGraph(self, frontier):
        while frontier:
            node = frontier.pop()
            self.add_node(node)
            for edge in self.transitions(node):
                if edge.to not in self:
                    frontier.add(edge.to)
                self.add_edge(node, edge.to, rate=edge.rate)

To make the generator matrix, it is necessary to give each state, i.e., each node in the graph, an index.

    def generateGeneratorMatrix(self):
        for i, n in enumerate(self.nodes()):
            self.nodes[n]['i'] = i
        size = self.number_of_nodes()
        Q = sp.dok_matrix((size, size))  # , dtype=np.float32)
        for (m, n, info) in self.edges(data=True):
            Q[self.nodes[m]['i'], self.nodes[n]['i']] = info["rate"]
        exit_rates = Q.dot(np.ones(size))
        Q.setdiag(-exit_rates)
        return Q

The steady state distribution is solved from the equation $\pi Q = 0$. In the last line I map $\pi_i$ to the state with index $i$.

    def computePi(self):
        Q = self.generateGeneratorMatrix()
        guess = np.ones(self.number_of_nodes(), dtype=np.float32)
        w, v = eigs(
            Q.T.tocsr(),
            k=1,
            v0=guess,
            sigma=1e-6,
            which='LM',
            tol=eps,
            maxiter=maxIterations,
        )
        pi = v[:, 0].real
        pi /= pi.sum()
        self.pi = {n: pi[self.nodes[n]['i']] for n in self.nodes()}

Examples

The M/M/1 queue

class MM1(MarkovChain):
    def __init__(self):
        super().__init__()
        self.N = 10

    def transitions(self, n):
        to = set()
        if n > 0:
            to.add(Edge(n, n - 1, 4))
        if n < self.N:
            to.add(Edge(n, n + 1, 3))
        return to
mm1 = MM1()
mm1.generateReachabilityGraph({0})
mm1.computePi()
print(mm1.pi)
{0: 0.26102440108763486, 1: 0.19576830081572622, 2:
0.14682622561179462, 3: 0.11011966920884596, 4: 0.08258975190663445,
5: 0.06194231392997585, 6: 0.046456735447481856, 7:
0.034842551585611406, 8: 0.02613191368920854, 9: 0.01959893526690641,
10: 0.014699201450179819}

A tandem of two M/M/1 queues

class Tandem(MarkovChain):
    def __init__(self):
        super().__init__()
        self.N1 = 50
        self.N2 = 50
        self.labda = 1.
        self.mu1= 1.05
        self.mu2 = 1.05

    def transitions(self, s):
        to = set()
        n1, n2 = s[:]
        if n1 < self.N1:
            to.add( Edge(s, (n1+1, n2), self.labda) )
        if 0 < n2:
            to.add( Edge(s, (n1, n2-1), self.mu2) )
        if 0 < n1 and n2 < self.N2:
            to.add( Edge(s, (n1-1, n2+1), self.mu1) )
        return to

    def plotPi(self):
        pi = np.zeros([self.N1+1, self.N2+1])
        for n in self.nodes():
            pi[n[0], n[1]] = self.pi[n]
        import matplotlib.pylab as plt
        plt.matshow(pi)
        plt.colorbar()
        plt.show()
tandem = Tandem()
tandem.generateReachabilityGraph({(0,0)})
tandem.computePi()
tandem.plotPi()