11

加权随机采样 (Weighted Random Sampling)

 3 years ago
source link: https://lotabout.me/2018/Weighted-Random-Sampling/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
Table of Contents

一个集合里有 n 个元素,每个元素有不同的权重,现在要不放回地随机抽取 m 个元素,每个元素被抽中的概率为元素的权重占总权重的比例。要怎么做呢?

简单的解法

现在考虑只抽取一个元素,假设权重之和为 1。我们可以从 [0, 1] 中随机得到一个权重,假设为 0.71,而后从第一个元素开始,不断累加它们的权重,直到有一个元素的累加权重包含 0.71,则选取该元素。下面是个示意图:

naive-weighted-sampling.svg

要选取 m 个元素,则可以按上面的方法先选取一个,将该元素从集合中去除,再反复按上面的方法抽取剩余的元素。这种方法的复杂度是 O(mn),并且将元素从集合中删除其实不太方便实现。

当然,最重要的是这个算法需要多次遍历数据,不适合用在流处理的场景中。

Algorithm A

Algorithm A 是论文 Weighted Random Sampling 中提出的,步骤如下:

  1. 对于集合 VV 中的元素 vi∈Vvi∈V,选取均匀分布的随机数 ui=rand(0,1)ui=rand(0,1) ,计算元素的特征 ki=u(1/wi)iki=ui(1/wi)
  2. 将集合按 kiki 排序,选取前 mm 大的元素。

算法的正确性在作者 2006 年的论文 Weighted random sampling with a reservoir 里给了详细的证明。论文中给出了算法的两个变种 A-Res 与 A-ExpJ,它们都能在一次扫描中得到 m 个样本。非常适合在流处理的场合中。

A-Res 算法

A-Res(Algorithm A With a Reservoir) 是 Algorithm 的“蓄水池”版本,即维护含有 m 个元素的结果集,对每个新元素尝试去替换结果集中权重最小的元素。步骤如下:

  1. 将集合 VV 的前 mm 个元素放入结果集合 RR。
  2. 对于结果集里的每个元素,计算特征值 ki=u(1/wi)iki=ui(1/wi),其中 ui=rand(0,1)ui=rand(0,1)
  3. 对 i=m+1,m+2,…,ni=m+1,m+2,…,n 重复步骤 4 ~ 6
    1. 将结果集中最小的特征 kk 作为当前的阈值 TT
    2. 对于元素 vivi,计算特征 ki=u(1/wi)iki=ui(1/wi),其中 ui=rand(0,1)ui=rand(0,1)
    3. 如果 ki>Tki>T 则将 RR 中拥有最小 kk 值的元素替换成 vivi。

论文证明了如果权重 wiwi 是一般连续分布上的随机变量,则上面的算法中插入 RR 的次数为 O(mlog(nm))O(mlog⁡(nm))。该算法用 Python 实现如下:

import heapq
import random

def a_res(samples, m):
"""
:samples: [(item, weight), ...]
:k: number of selected items
:returns: [(item, weight), ...]
"""

heap = [] # [(new_weight, item), ...]
for sample in samples:
wi = sample[1]
ui = random.uniform(0, 1)
ki = ui ** (1/wi)

if len(heap) < m:
heapq.heappush(heap, (ki, sample))
elif ki > heap[0][0]:
heapq.heappush(heap, (ki, sample))

if len(heap) > m:
heapq.heappop(heap)

return [item[1] for item in heap]

A-ExpJ 算法

A-Res 需要对每个元素产生一个随机数,而生成高质量的随机数有可能会有较大的性能开销,,所以论文中给出了 A-ExpJ 算法,能将随机数的生成量从 O(n)O(n) 减少到 O(mlog(nm)))O(mlog⁡(nm)))。从步骤上看,很像我们最开始提出的简单版本,设定一个阈值并跳过一些元素。具体步骤如下:

  1. 将集合 VV 的前 mm 个元素放入结果集合 RR。
  2. 对于结果集里的每个元素,计算特征值 ki=u(1/wi)iki=ui(1/wi),其中 ui=rand(0,1)ui=rand(0,1)
  3. 将 RR 中小最的特征值记为阈值 TwTw
  4. 对剩下的元素重复步骤 5 ~ 10
    1. 令 r=rand(0,1)r=rand(0,1) 且 Xw=log(r)/log(Tw)Xw=log⁡(r)/log⁡(Tw)
    2. 从当前元素 vcvc 开始跳过元素,直到遇到元素 vivi,满足
    3. wc+wc+1+⋯+wi−1<Xw≤wc+wc+1+⋯+wi−1+wiwc+wc+1+⋯+wi−1<Xw≤wc+wc+1+⋯+wi−1+wi
    4. 使用 vivi 替换 RR 中特征值最小的元素。
    5. 令 tw=Twiwtw=Twwi, r2=rand(tw,1)r2=rand(tw,1), vivi 的特征 ki=r(1/wi)2ki=r2(1/wi)
    6. 令新的阈值 TwTw 为此时 RR 中的最小特征值。

Python 实现如下:

def a_expj(samples, m):
"""
:samples: [(item, weight), ...]
:k: number of selected items
:returns: [(item, weight), ...]
"""

heap = [] # [(new_weight, item), ...]

Xw = None
Tw = 0
w_acc = 0
for sample in samples:
if len(heap) < m:
wi = sample[1]
ui = random.uniform(0, 1)
ki = ui ** (1/wi)
heapq.heappush(heap, (ki, sample))
continue

if w_acc == 0:
Tw = heap[0][0]
r = random.uniform(0, 1)
Xw = math.log(r)/math.log(Tw)

wi = sample[1]
if w_acc + wi < Xw:
w_acc += wi
continue
else:
w_acc = 0

tw = Tw ** wi
r2 = random.uniform(tw, 1)
ki = r2 ** (1/wi)
heapq.heappop(heap)
heapq.heappush(heap, (ki, sample))

return [item[1] for item in heap]

我们用多次采样的方式来尝试验证算法的正确性。下面代码1中为 abc 等元素赋予了不同的权重,采样 10 万次后计算被采样的次数与元素 a 被采样次数的比值。

overall = [('a', 10), ('b', 20), ('c', 50), ('d', 100), ('e', 200)]
def test_weighted_sampling(func, k):
stat = {}
for i in range(100000):
sampled = func(overall, k)
for item in sampled:
if item[0] not in stat:
stat[item[0]] = 0
stat[item[0]] += 1
total = stat['a']
for a in stat:
stat[a] = float(stat[a])/float(total)
print(stat)

首先验证 A-Res 算法:

test_weighted_sampling(a_res, 1)
test_weighted_sampling(a_res, 2)
test_weighted_sampling(a_res, 3)
test_weighted_sampling(a_res, 4)
test_weighted_sampling(a_res, 5)

# output
{'e': 19.54951600893522, 'd': 9.864110201042442, 'c': 4.842889054355919, 'a': 1.0, 'b': 1.973566641846612}
{'b': 2.0223285486443383, 'e': 12.17949833260838, 'd': 8.95287806292591, 'c': 4.843410178338408, 'a': 1.0}
{'a': 1.0, 'e': 6.166443722530097, 'd': 5.597171794381808, 'b': 1.9579591056755208, 'c': 4.387922797630423}
{'b': 1.8358898492044953, 'e': 2.5878688779880092, 'c': 2.4081341327311896, 'd': 2.549897479820395, 'a': 1.0}
{'a': 1.0, 'd': 1.0, 'c': 1.0, 'b': 1.0, 'e': 1.0}

看到,在采样一个元素时,b 被采样到的次数约为 a2 倍,而 e 则约为 20 倍,与overall 数组中指定的权重一致。而采样 5 个元素时,所有元素都会被选中。

同理验证 A-ExpJ 算法:

test_weighted_sampling(a_expj, 1)
test_weighted_sampling(a_expj, 2)
test_weighted_sampling(a_expj, 3)
test_weighted_sampling(a_expj, 4)
test_weighted_sampling(a_expj, 5)

# output
{'e': 19.78311444652908, 'c': 4.915572232645403, 'd': 9.840900562851782, 'a': 1.0, 'b': 1.9838649155722325}
{'e': 11.831543244771057, 'c': 4.709157716223856, 'b': 1.9720180893159978, 'd': 8.75183719615602, 'a': 1.0}
{'d': 5.496249062265567, 'c': 4.280007501875469, 'e': 6.046324081020255, 'b': 1.9321080270067517, 'a': 1.0}
{'a': 1.0, 'd': 2.5883654175335105, 'c': 2.440760540383957, 'e': 2.62591841571643, 'b': 1.8787559581808126}
{'a': 1.0, 'd': 1.0, 'c': 1.0, 'b': 1.0, 'e': 1.0}

与 A-Res 的结果类似。

文章中介绍了 A-Res 与 A-ExpJ 两种算法,按照步骤用 Python 实现了一个简单的版本,最后用采样的方式验证了算法的正确性。

加权随机采样本身不难,但如果需要在一次扫描中完成就不容易了。难以想像上面的算法直到 2006 年才提出。算法本身如此之简单,也让不感叹数学与概率的精妙。


  1. 1.修改自 https://blog.xingwudao.me/2017/09/26/sampling/

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK