Python implementation of “An algorithm for the binomial distribution with depend...
source link: https://numbersmithy.com/python-implementation-for-an-algorithm-for-the-binomial-distribution-with-dependent-trials/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
4. Python implementation
4.1. Recursive version
It is noticed that (5) is written in a recursive manner, therefore it is most natural to code it using also recursive functions. Below is the Python implementation.
Firstly, the pmf()
function that computes the probability mass function:
import numpy as np def pmf(j, k, p, phh=None, phm=None): '''Compute the probability mass function binomial distr with dependent trials, recursive version Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. Keyword Args: phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. <phh> and <phm> must be provided at least one. If both are given, <phm> is computed from <phh>. Returns: res (float): probability of j successes in k trials. ''' assert j >= 0, "<j> must >= 0." assert k >= 0, "<k> must >= 0." assert p >= 0, "<p> must >= 0." if j == 0 and k == 1: return 1 - p if j == 1 and k == 1: return p if phh is None and phm is None: raise Exception("Either <phh> or <phm> must be provided.") if phh is None: phh = 1 - (1-p) / p * phm else: phm = (p - p*phh) / (1 - p) pmh = 1 - phh pmm = 1 - phm f_jk = compute_jkf(j, k, p, pmm, pmh) s_jk = compute_jks(j, k, p, phh, phm) res = f_jk + s_jk return res
The pmf()
function calls 2 additional functions:
compute_jkf()
: computes the ηj,K term.compute_jks()
: computes the ξj,K term.
These 2 are the recursive functions:
from functools import lru_cache @lru_cache(maxsize=1024) def compute_jkf(j, k, p, pmm, pmh): '''Compute the probablity of j successes in k trials, with the kth one fails Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. pmm (float): probability of fail given its previous trial is fail. pmh (float): probability of fail given its previous trial is success. Returns: res (float): probability of j successes in k trials, with the kth one fails. ''' if j == 0 and k == 1: return 1 - p if j == 1 and k == 1: return 0 if j == 0: return (1 - p) * pmm**(k-1) if j == k: return 0 phh = 1 - pmh phm = (p - p*phh) / (1 - p) return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm @lru_cache(maxsize=1024) def compute_jks(j, k, p, phh, phm): '''Compute the probablity of j successes in k trials, with the kth one succeeds Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. Returns: res (float): probability of j successes in k trials, with the kth one succeeds. ''' if j == 0 and k == 1: return 0 if j == 1 and k == 1: return p if j == 0: return 0 if j == k: return p * phh**(k-1) pmh = 1 - phh pmm = 1 - phm return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh
Note that we are using the lru_cache
decorators to cache
intermediate results. I have another post that talks a bit more on
caching and lazy evaluation.
Now a simple test run: when Phh=0.5, i.e. one successful trial infers
no bias for the next one, it should fall back to a binomial
distribution. So we compare the result with that from scipy.stats.binom
:
K = 100 j = 50 p = 0.5 phh = 0.5 prob1 = pmf(j, K, p, phh) print(prob1) from scipy.stats import binom prob2 = binom.pmf(j, K, p) print(prob2)
The output:
0.0795892373871788 0.07958923738717875
The above recursive implementation works all fine for relatively small-sized problems, but will fail once the K number goes large.
This is because Python’s recursion depth is quite limited, to about 1000. Recursive functions (including tail-recursions) will fail if the stack is full. See the below example call:
# large scale, recursive version K = 1000 j = 5 p = 0.5 phh = 0.5 prob = pmf(j, K, p, phh) print(prob)
Running the above snippet will trigger the following error:
Traceback (most recent call last): File "dependent_binomial.py", line 318, in <module> prob4 = pmf(j, K, p, phh) File "dependent_binomial.py", line 52, in pmf f_jk = compute_jkf(j, k, p, pmm, pmh) File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 112, in compute_jks return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 112, in compute_jks return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 112, in compute_jks return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 112, in compute_jks return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm File "dependent_binomial.py", line 84, in compute_jkf return compute_jks(j, k-1, p, phh, phm) * pmh + compute_jkf(j, k-1, p, pmm, pmh) * pmm [Previous line repeated 486 more times] File "dependent_binomial.py", line 112, in compute_jks return compute_jkf(j-1, k-1, p, pmm, pmh) * phm + compute_jks(j-1, k-1, p, phh, phm) * phh File "dependent_binomial.py", line 72, in compute_jkf if j == 0 and k == 1: RecursionError: maximum recursion depth exceeded in comparison
It is possible to enlarge this maximum recursion depth. But I will illustrate another way, which is to change the recursive calls into for loops.
4.2. Non-recursive version
Referring back to the schematic in Figure 1, it is noticed that to get to a certain point Φj,K on the graph, it works from the left most end to the right.
We can re-draw the schematic graph in a slightly different way, as shown in Figure 2 below:
Figure 2: Similar solution schematic in Figure 1 drawn on a regular grid.
We put all the target nodes of Φj,K on a regular grid, and draw the ηj,K transitions as blue, horizontal edges, and ξj,K transitions as red, diagonal edges.
From Equations (5), it is further noticed that we only take a top-down step in the computation of ξj,K=ηj−1,K−1Phm+ξj−1,K−1Phh. The computation of ηj,K=ξj,KPmh+ηj,K−1Pmm then only takes a left-right step from K−1 to K.
So, we will be building 2 2D arrays, one for η and one for ξ terms, both with only the upper triangle filled. j will be the row index, and K the column index.
Here is the non-recursive version:
def compute_jksf(j, k, p, pmm, pmh, phh, phm): '''Compute the transition probabilities leading to the prob of j successes in k trials Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. pmm (float): probability of fail given its previous trial is fail. pmh (float): probability of fail given its previous trial is success. phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. Returns: farr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the probability of j successes in k trials and the kth one is fail. sarr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the probability of j successes in k trials and the kth one is success. ''' farr = np.zeros([k+1, k+1]) sarr = np.zeros([k+1, k+1]) for kk in range(1, k+1): for jj in range(0, kk+1): if jj == 0: sjk = 0 fjk = (1-p) * pmm**(kk-1) elif jj == kk: sjk = p * phh**(kk-1) fjk = 0 else: sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm farr[jj,kk] = fjk sarr[jj,kk] = sjk return farr, sarr
The ξj,K term is computed using:
sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh
And the ηj,K term is computed using
fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm
This compute_jksf()
function will be called by a new pmf2()
function to compute the probability mass function:
def pmf2(j, k, p, phh=None, phm=None): '''Compute the probability mass function binomial distr with dependent trials, non-recursive version Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. Keyword Args: phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. <phh> and <phm> must be provided at least one. If both are given, <phm> is computed from <phh>. Returns: res (float): probability of j successes in k trials. ''' assert j >= 0, "<j> must >= 0." assert k >= 0, "<k> must >= 0." assert p >= 0, "<p> must >= 0." if j == 0 and k == 1: return 1 - p if j == 1 and k == 1: return p if phh is None and phm is None: raise Exception("Either <phh> or <phm> must be provided.") if phh is None: phh = 1 - (1-p) / p * phm else: phm = (p - p*phh) / (1 - p) pmh = 1 - phh pmm = 1 - phm f_jk, s_jk = compute_jksf(j, k, p, pmm, pmh, phh, phm) res = f_jk[j, k] + s_jk[j, k] return res
Now the new pmf2()
function is capable of handling large scale
problems:
# large scale, non-recursive version K = 1000 j = 250 p = 0.6 phh = 0.5 prob3 = pmf2(j, K, p, phh) print(prob3)
The output is:
7.6996966658670975e-289
4.3. Further acceleration by Cython
The above compute_jksf()
function is traversing 2D arrays using pure
Python, and this will be slow. As the computation has inherent
sequential dependence (later terms depend on the earlier ones), this
can’t be vectorized.
One easy way to speed up this computation is to write statically typed code using Cython, and import the compiled module.
Here is my Cython implementation, saved into a sub-folder:
dependent_binom/dependent_binom.pyx
:
cimport cython import numpy as np cimport numpy as np np.import_array() @cython.boundscheck(False) # turn off bounds-checking for entire function @cython.wraparound(False) # turn off negative index wrapping for entire function def compute_jksf(int j, int k, double p, double pmm, double pmh, double phh, double phm): '''Compute the transition probabilities leading to the prob of j successes in k trials Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. pmm (float): probability of fail given its previous trial is fail. pmh (float): probability of fail given its previous trial is success. phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. Returns: farr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the probability of j successes in k trials and the kth one is fail. sarr (ndarray): 2d array with shape (k+1, k+1), the (j,k) element is the probability of j successes in k trials and the kth one is success. ''' cdef int jj, kk cdef double sjk, fjk cdef np.ndarray[np.double_t, ndim=2] farr = np.zeros([k+1, k+1]) cdef np.ndarray[np.double_t, ndim=2] sarr = np.zeros([k+1, k+1]) for kk in range(1, k+1): for jj in range(0, kk+1): if jj == 0: sjk = 0. fjk = (1-p) * pmm**(kk-1) elif jj == kk: sjk = p * phh**(kk-1) fjk = 0. else: sjk = farr[jj-1, kk-1] * phm + sarr[jj-1, kk-1]*phh fjk = sarr[jj, kk-1] * pmh + farr[jj, kk-1] * pmm farr[jj,kk] = fjk sarr[jj,kk] = sjk return farr, sarr
Here, we statically type all index variables (e.g. jj
, kk
) to int
,
and all probability values/arrays to double
. The rest of the code is
identical to the pure Python version.
Then I prepare a simple setup.py
script, saved into the same
dependent_binom/setup.py
folder:
from setuptools import setup, Extension from Cython.Build import cythonize import numpy extension = Extension('dependent_binom', ['./dependent_binom.pyx',], include_dirs=[numpy.get_include()],) setup(name='dependent_binom', ext_modules=cythonize(extension, annotate=True), zip_safe=False)
To compile the code, execute the following command:
python setup.py build_ext --inplace
If everything goes well, this should create a
dependent_binom.c
and something like
dependent_binom.cpython-38-x86_64-linux-gnu.so
in the
dependent_binom
folder.
To use this Cython version:
from dependent_binom import dependent_binom dependent_binom.compute_jksf(j, k, p, pmm, pmh, phh, phm)
A speed comparison is done:
# compute_jksf, python version K = 1000 j = 250 p = 0.6 phm = (p - p*phh) / (1 - p) pmh = 1 - phh pmm = 1 - phm t1 = time.time() farr, sarr = compute_jksf(j, K, p, pmm, pmh, phh, phm) t2 = time.time() print('compute_jksf, Python version time:', t2-t1) # compute_jksf, cython version t3 = time.time() farr2, sarr2 = dependent_binom.compute_jksf(j, K, p, pmm, pmh, phh, phm) t4 = time.time() print('compute_jksf, cython version time:', t4-t3) print(np.allclose(farr, farr2)) print(np.allclose(sarr, sarr2)) print((t2-t1) / (t4-t3))
The output:
compute_jksf, Python version time: 0.5290002822875977 compute_jksf, cython version time: 0.0044057369232177734 True True 120.0707830510309
So about 120x speed up!
4.4. Cumulative distribution function
Having got the probability mass function ready, the CDF function is
just a summation of our previous defined farr
and sarr
arrays,
up to the j
th row and at the k
th column:
def cdf(j, k, p, phh=None, phm=None): '''Compute the CDF of binomial distr with dependent trials, non-recursive cython version Args: j (int): number of successes. k (int): number of trials. p (float): probability of any trial being success. Keyword Args: phh (float): probability of success given its previous trial is success. phm (float): probability of success given its previous trial is fail. <phh> and <phm> must be provided at least one. If both are given, <phm> is computed from <phh>. Returns: res (float): probability of j or fewer successes in k trials. ''' assert j >= 0, "<j> must >= 0." assert k >= 0, "<k> must >= 0." assert p >= 0, "<p> must >= 0." if phh is None: phh = 1 - (1-p) / p * phm else: phm = (p - p*phh) / (1 - p) pmh = 1 - phh pmm = 1 - phm farr, sarr = dependent_binom.compute_jksf(j, k, p, pmm, pmh, phh, phm) res = farr[:(j+1), k].sum() + sarr[:(j+1), k].sum() return res
Note that the cdf()
function is calling our Cython implemented
compute_jksf()
function. An example:
K = 10 j = 5 p = 0.6 phh = 0.5 out = cdf(j, K, p, phh) print('Cython version CDF:', out)
The output:
Cython version CDF: 0.3381835937500002
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK