3

Python implementation of “An algorithm for the binomial distribution with depend...

 2 years ago
source link: https://numbersmithy.com/python-implementation-for-an-algorithm-for-the-binomial-distribution-with-dependent-trials/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

4. Python implementation

4.1. Recursive version

It is noticed that (5) is written in a recursive manner, therefore it is most natural to code it using also recursive functions. Below is the Python implementation.

Firstly, the pmf() function that computes the probability mass function:

import numpy as np

def pmf(j, k, p, phh=None, phm=None):
    '''Compute the probability mass function binomial distr with dependent
    trials, recursive version

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
    Keyword Args:
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
            <phh> and <phm> must be provided at least one. If both are given, <phm>
            is computed from <phh>.
    Returns:
        res (float): probability of j successes in k trials.
    '''
    assert j >= 0, "<j> must >= 0."
    assert k >= 0, "<k> must >= 0."
    assert p >= 0, "<p> must >= 0."

    if j == 0 and k == 1:
        return 1 - p
    if j == 1 and k == 1:
        return p

    if phh is None and phm is None:
        raise Exception("Either <phh> or <phm> must be provided.")

    if phh is None:
        phh = 1 - (1-p) / p * phm
    else:
        phm = (p - p*phh) / (1 - p)

    pmh = 1 - phh
    pmm = 1 - phm
    f_jk = compute_jkf(j, k, p, pmm, pmh)
    s_jk = compute_jks(j, k, p, phh, phm)
    res = f_jk + s_jk

    return res

The pmf() function calls 2 additional functions:

  • compute_jkf(): computes the ηj,K term.
  • compute_jks(): computes the ξj,K term.

These 2 are the recursive functions:

from functools import lru_cache

@lru_cache(maxsize=1024)
def compute_jkf(j, k, p, pmm, pmh):
    '''Compute the probablity of j successes in k trials, with the kth one fails

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
        pmm (float): probability of fail given its previous trial is fail.
        pmh (float): probability of fail given its previous trial is success.
    Returns:
        res (float): probability of j successes in k trials, with the kth one fails.
    '''

    if j == 0 and k == 1:
        return 1 - p
    if j == 1 and k == 1:
        return 0
    if j == 0:
        return (1 - p) * pmm**(k-1)
    if j == k:
        return 0

    phh = 1 - pmh
    phm = (p - p*phh) / (1 - p)
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm

@lru_cache(maxsize=1024)
def compute_jks(j, k, p, phh, phm):
    '''Compute the probablity of j successes in k trials, with the kth one succeeds

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
    Returns:
        res (float): probability of j successes in k trials, with the kth one succeeds.
    '''

    if j == 0 and k == 1:
        return 0
    if j == 1 and k == 1:
        return p
    if j == 0:
        return 0
    if j == k:
        return p * phh**(k-1)

    pmh = 1 - phh
    pmm = 1 - phm

    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh

Note that we are using the lru_cache decorators to cache intermediate results. I have another post that talks a bit more on caching and lazy evaluation.

Now a simple test run: when Phh=0.5, i.e. one successful trial infers no bias for the next one, it should fall back to a binomial distribution. So we compare the result with that from scipy.stats.binom:

K = 100
j = 50
p = 0.5
phh = 0.5

prob1 = pmf(j, K, p, phh)
print(prob1)

from scipy.stats import binom
prob2 = binom.pmf(j, K, p)
print(prob2)

The output:

0.0795892373871788
0.07958923738717875

The above recursive implementation works all fine for relatively small-sized problems, but will fail once the K number goes large.

This is because Python’s recursion depth is quite limited, to about 1000. Recursive functions (including tail-recursions) will fail if the stack is full. See the below example call:

# large scale, recursive version
K = 1000
j = 5
p = 0.5
phh = 0.5

prob = pmf(j, K, p, phh)
print(prob)

Running the above snippet will trigger the following error:

Traceback (most recent call last):
  File "dependent_binomial.py", line 318, in <module>
    prob4 = pmf(j, K, p, phh)
  File "dependent_binomial.py", line 52, in pmf
    f_jk = compute_jkf(j, k, p, pmm, pmh)
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 112, in compute_jks
    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 112, in compute_jks
    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 112, in compute_jks
    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 112, in compute_jks
    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  File "dependent_binomial.py", line 84, in compute_jkf
    return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm
  [Previous line repeated 486 more times]
  File "dependent_binomial.py", line 112, in compute_jks
    return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
  File "dependent_binomial.py", line 72, in compute_jkf
    if j == 0 and k == 1:
RecursionError: maximum recursion depth exceeded in comparison

It is possible to enlarge this maximum recursion depth. But I will illustrate another way, which is to change the recursive calls into for loops.

4.2. Non-recursive version

Referring back to the schematic in Figure 1, it is noticed that to get to a certain point Φj,K on the graph, it works from the left most end to the right.

We can re-draw the schematic graph in a slightly different way, as shown in Figure 2 below:

Figure 2: Similar solution schematic in Figure 1 drawn on a regular grid.

We put all the target nodes of Φj,K on a regular grid, and draw the ηj,K transitions as blue, horizontal edges, and ξj,K transitions as red, diagonal edges.

From Equations (5), it is further noticed that we only take a top-down step in the computation of ξj,K=ηj−1,K−1Phm+ξj−1,K−1Phh. The computation of ηj,K=ξj,KPmh+ηj,K−1Pmm then only takes a left-right step from K−1 to K.

So, we will be building 2 2D arrays, one for η and one for ξ terms, both with only the upper triangle filled. j will be the row index, and K the column index.

Here is the non-recursive version:

def compute_jksf(j, k, p, pmm, pmh, phh, phm):
    '''Compute the transition probabilities leading to the prob of j successes in k trials

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
        pmm (float): probability of fail given its previous trial is fail.
        pmh (float): probability of fail given its previous trial is success.
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
    Returns:
        farr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the
            probability of j successes in k trials and the kth one is fail.
        sarr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the
            probability of j successes in k trials and the kth one is success.
    '''

    farr = np.zeros([k+1, k+1])
    sarr = np.zeros([k+1, k+1])
    for kk in range(1, k+1):
        for jj in range(0, kk+1):
            if jj == 0:
                sjk = 0
                fjk = (1-p) * pmm**(kk-1)
            elif jj == kk:
                sjk = p * phh**(kk-1)
                fjk = 0
            else:
                sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh
                fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm
            farr[jj,kk] = fjk
            sarr[jj,kk] = sjk

    return farr, sarr

The ξj,K term is computed using:

sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh

And the ηj,K term is computed using

fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm

This compute_jksf() function will be called by a new pmf2() function to compute the probability mass function:

def pmf2(j, k, p, phh=None, phm=None):
    '''Compute the probability mass function binomial distr with dependent
    trials, non-recursive version

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
    Keyword Args:
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
            <phh> and <phm> must be provided at least one. If both are given, <phm>
            is computed from <phh>.
    Returns:
        res (float): probability of j successes in k trials.
    '''

    assert j >= 0, "<j> must >= 0."
    assert k >= 0, "<k> must >= 0."
    assert p >= 0, "<p> must >= 0."

    if j == 0 and k == 1:
        return 1 - p
    if j == 1 and k == 1:
        return p

    if phh is None and phm is None:
        raise Exception("Either <phh> or <phm> must be provided.")

    if phh is None:
        phh = 1 - (1-p) / p * phm
    else:
        phm = (p - p*phh) / (1 - p)

    pmh = 1 - phh
    pmm = 1 - phm
    f_jk, s_jk = compute_jksf(j, k, p, pmm, pmh, phh, phm)
    res = f_jk[j, k] + s_jk[j, k]

    return res

Now the new pmf2() function is capable of handling large scale problems:

# large scale, non-recursive version
K = 1000
j = 250
p = 0.6
phh = 0.5

prob3 = pmf2(j, K, p, phh)
print(prob3)

The output is:

7.6996966658670975e-289

4.3. Further acceleration by Cython

The above compute_jksf() function is traversing 2D arrays using pure Python, and this will be slow. As the computation has inherent sequential dependence (later terms depend on the earlier ones), this can’t be vectorized.

One easy way to speed up this computation is to write statically typed code using Cython, and import the compiled module.

Here is my Cython implementation, saved into a sub-folder: dependent_binom/dependent_binom.pyx:

cimport cython
import numpy as np
cimport numpy as np
np.import_array()

@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def compute_jksf(int j, int k, double p, double pmm, double pmh, double phh, double phm):
    '''Compute the transition probabilities leading to the prob of j successes in k trials

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
        pmm (float): probability of fail given its previous trial is fail.
        pmh (float): probability of fail given its previous trial is success.
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
    Returns:
        farr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the
            probability of j successes in k trials and the kth one is fail.
        sarr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the
            probability of j successes in k trials and the kth one is success.
    '''

    cdef int jj, kk
    cdef double sjk, fjk
    cdef np.ndarray[np.double_t, ndim=2] farr = np.zeros([k+1, k+1])
    cdef np.ndarray[np.double_t, ndim=2] sarr = np.zeros([k+1, k+1])

    for kk in range(1, k+1):
        for jj in range(0, kk+1):
            if jj == 0:
                sjk = 0.
                fjk = (1-p) * pmm**(kk-1)
            elif jj == kk:
                sjk = p * phh**(kk-1)
                fjk = 0.
            else:
                sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh
                fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm
            farr[jj,kk] = fjk
            sarr[jj,kk] = sjk

    return farr, sarr

Here, we statically type all index variables (e.g. jj, kk) to int, and all probability values/arrays to double. The rest of the code is identical to the pure Python version.

Then I prepare a simple setup.py script, saved into the same dependent_binom/setup.py folder:

from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy

extension = Extension('dependent_binom',
        ['./dependent_binom.pyx',],
        include_dirs=[numpy.get_include()],)

setup(name='dependent_binom', ext_modules=cythonize(extension, annotate=True),
        zip_safe=False)

To compile the code, execute the following command:

python setup.py build_ext --inplace

If everything goes well, this should create a dependent_binom.c and something like dependent_binom.cpython-38-x86_64-linux-gnu.so in the dependent_binom folder.

To use this Cython version:

from dependent_binom import dependent_binom
dependent_binom.compute_jksf(j, k, p, pmm, pmh, phh, phm)

A speed comparison is done:

# compute_jksf, python version
K = 1000
j = 250
p = 0.6
phm = (p - p*phh) / (1 - p)
pmh = 1 - phh
pmm = 1 - phm

t1 = time.time()
farr, sarr = compute_jksf(j, K, p, pmm, pmh, phh, phm)
t2 = time.time()
print('compute_jksf, Python version time:', t2-t1)

# compute_jksf, cython version
t3 = time.time()
farr2, sarr2 = dependent_binom.compute_jksf(j, K, p, pmm, pmh, phh, phm)
t4 = time.time()
print('compute_jksf, cython version time:', t4-t3)
print(np.allclose(farr, farr2))
print(np.allclose(sarr, sarr2))
print((t2-t1) / (t4-t3))

The output:

compute_jksf, Python version time: 0.5290002822875977
compute_jksf, cython version time: 0.0044057369232177734
True
True
120.0707830510309

So about 120x speed up!

4.4. Cumulative distribution function

Having got the probability mass function ready, the CDF function is just a summation of our previous defined farr and sarr arrays, up to the j th row and at the k th column:

def cdf(j, k, p, phh=None, phm=None):
    '''Compute the CDF of binomial distr with dependent trials, non-recursive cython version

    Args:
        j (int): number of successes.
        k (int): number of trials.
        p (float): probability of any trial being success.
    Keyword Args:
        phh (float): probability of success given its previous trial is success.
        phm (float): probability of success given its previous trial is fail.
            <phh> and <phm> must be provided at least one. If both are given, <phm>
            is computed from <phh>.
    Returns:
        res (float): probability of j or fewer successes in k trials.
    '''

    assert j >= 0, "<j> must >= 0."
    assert k >= 0, "<k> must >= 0."
    assert p >= 0, "<p> must >= 0."

    if phh is None:
        phh = 1 - (1-p) / p * phm
    else:
        phm = (p - p*phh) / (1 - p)

    pmh = 1 - phh
    pmm = 1 - phm
    farr, sarr = dependent_binom.compute_jksf(j, k, p, pmm, pmh, phh, phm)
    res = farr[:(j+1), k].sum() + sarr[:(j+1), k].sum()

    return res

Note that the cdf() function is calling our Cython implemented compute_jksf() function. An example:

K = 10
j = 5
p = 0.6
phh = 0.5
out = cdf(j, K, p, phh)
print('Cython version CDF:', out)

The output:

Cython version CDF: 0.3381835937500002

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK