利用质数随机遍历集合的一种实现

有个随机遍历数据集合的需求,描述如下:
① 从集合的某个随机位置开始遍历
② 随机获取集合中剩余数据
③ 重复步骤②,直到遍历完整个集合

当看到这个需求时,不知道你是否会想到什么实现方式?

最近在追溯go底层源码时发现,在调度器从其他 P 中 steal 可用 goroutine 时利用质数的特性,采用了一种特殊算法随机遍历 allp 数组。觉得很是经典,特此记录一下。

例如:通过下标随机遍历数组中的 8 个元素,则最终遍历数组的下标可能是 [0,1,2,3,4,5,6,7] 中的任意一种随机排列。

遍历步骤:

① 计算小于等于 count (这里是8) 的所有质数,得到数组 [1,3,5,7]
② 从质数数组随机获取一个质数 inc,例如:5
③ 获取一个初识随机数 R,例如:16
④ (R+inc) % count = pos,pos 就是数组下标
⑤ 把 第④步 计算结果 pos 作为第三步中的 R,继续下一次循环,直到遍历完整个数组

按照遍历步骤,进行一轮计算,过程如下:

1
2
3
4
5
6
7
8
① R:16 inc:5 count:8    (16+5)%8 = pos:5 // 第一个R是单独获取的随机数,计算结果 pos 作为下一次 R 的输入
② R:5 inc:5 count:8 (5+5)%8 = pos:2
③ R:2 inc:5 count:8 (2+5)%8 = pos:7
④ R:7 inc:5 count:8 (7+5)%8 = pos:4
⑤ R:4 inc:5 count:8 (4+5)%8 = pos:1
⑥ R:1 inc:5 count:8 (1+5)%8 = pos:6
⑦ R:6 inc:5 count:8 (6+5)%8 = pos:3
⑧ R:3 inc:5 count:8 (3+5)%8 = pos:0

通过上面计算,最终获得的遍历下标为 [5,2,7,4,1,6,3,0]。需要注意的是:每一轮遍历,第一次传入的随机数 R 与选取的质数 inc 不同,会得到不同的遍历次序。

在 go 底层源码的 func findrunnable() (gp *g, inheritTime bool) 方法中可以看到实现逻辑,如下所示:

代码位置:src/runtime/proc.go

注意:>=go1.16 findrunnable()代码有所变动,可在 go1.12 ~ go1.15 版本中看到相关逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
func findrunnable() (gp *g, inheritTime bool) {
// ...
// omit some code

// 没有获取到可用 goroutine 之前,整个过程会执行4次
for i := 0; i < 4; i++ {
// 开始遍历整个数组,这一层的 for 循环就是一次数组遍历
// stealOrder 是提前准备好的一个数据结构
// 其包含两个字段:1.待遍历的数组个数count 2.小于等于count的所有质数组成的数组
//
// fastrand() 就是开始遍历数组时第一次需要的随机数 R
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
// enum.position() 就是遍历数组时获得的数组下标
// allp[enum.position()]则是数组的具体值
//
// runqsteal()含义:从 allp[enum.position()]找到的 P 的本地队列 steal 一批 goroutine 存放到 _p_
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}

// ...
// omit some code
}


var stealOrder randomOrder

//主要用来 存储所有小于等于数组个数 count 的质数数组 coprimes
type randomOrder struct {
count uint32 //将要遍历的数组的个数
coprimes []uint32 //所有小于等于 count 的质数数组
}

//获取小于等于 count 的所有质数组成的数组
func (ord *randomOrder) reset(count uint32) {
ord.count = count
ord.coprimes = ord.coprimes[:0]

for i := uint32(1); i <= count; i++ {
//判断i是否是质数
if gcd(i, count) == 1 {
ord.coprimes = append(ord.coprimes, i)
}
}
}

// 判断 a 是否是质数
func gcd(a, b uint32) uint32 {
for b != 0 {
a, b = b, a%b
}
return a
}

//用于记录数组遍历时的一些信息
type randomEnum struct {
i uint32 // 已经遍历的数组个数
count uint32 // 数组总个数
pos uint32 // 当前计算得到的数组下标
inc uint32 // 质数数组中选取的一个质数
}

//生成遍历数组时需要的信息
//i就是第一次需要的随机数
func (ord *randomOrder) start(i uint32) randomEnum {
return randomEnum{
count: ord.count, //待遍历数组个数
pos: i % ord.count,//通过随机数i,第一次获取的数组下标 pos
inc: ord.coprimes[i%uint32(len(ord.coprimes))], //选取一个质数
}
}

//判断数组是否遍历完毕
func (enum *randomEnum) done() bool {
//已经遍历的数组元素个数 等于 数组个数,则本轮遍历完毕
return enum.i == enum.count
}

//每经过一次计算,会对已经遍历的数组元素个数i加一
func (enum *randomEnum) next() {
enum.i++ //已经遍历的数组元素个数i加一
enum.pos = (enum.pos + enum.inc) % enum.count //计算下一个遍历到的数组下标
}

//获取计算得到的数组下标
func (enum *randomEnum) position() uint32 {
return enum.pos
}

这种实现方式被视为是一种公平的数据集合遍历实现方式,想必在平时的业务开发过程中有很多地方会用的到。