Redis+DB实现基于号段的发号器原理

什么是发号器

在互联网场景中,很多业务要求生成唯一的ID号,以用于区分某些资源。常见例子:电商系统中的订单ID号、聊天群组中的消息ID号、上传文件到存储系统中之后生成的文件ID号、用户注册系统中的用户ID号、商户系统中的商户ID号、开放平台中的开发者账号ID、餐饮店的排队进餐号、影剧院票据单号、医院/银行排队号等等,这些基本都是基于先来后到的规则生成,以期达到唯一性或稍显公平的享受某些资源。
你是否想过使用技术应该如何实现呢? 下面引出本文主角:发号器(ticket dispenser),也可称之为ID生成器。

生成的ID号可以是字符串也可以是整数,本文仅探讨生成整数id的发号器实现原理

在互联网行业中,为了保证服务的稳定性、可用性、并发性等指标,服务一般是采用集群多节点部署,如何保证在这些不同的节点生成符合业务要求的ID,又引出另一个概念:分布式ID生成器(实现方案有多种)。关于分布式ID的常见实现方式参考笔者文章:分布式ID的5种生成方式以及Go源码中的一种应用,文章中列举了常见的5种实现方式以及原理。本文,则重点讲解使用Redis+DB基于号段的发号器实现原理。

实现发号器需要的关注点

需要关注的点大致有以下几个:

  1. 有序性
    正序或倒序,发号器基本都是基于某种纬度的正序排列。还有一些不需要有序性,只要保证唯一性即可。

  2. 递增性
    随着时间的流逝,号码的值只能增大不能变小,即:后面生成的一定大于前面生成的。

  3. 唯一性
    在整个生成的号码值域中,同一个号码有且仅出现一次。

  4. 先到先得
    先申请号码的先获取到,后申请号码的后获取到。

基于号段的发号器实现原理

由上图可知,实现基于号段的发号器逻辑有2个角色:

  1. 发号生成器
  2. 集中式号段管理器

对于基于号段的发号器来说,发号生成器本身存储一段可用的值域空间,其数据结构一般为:[currId, maxId],currId为下一个可用id,maxId为当前号段最大id。 每当有号码申请到达后,发号器先判断是否有可用号码。
若currId<=maxId则存在可用id,把currId返回给申请方,然后currId+=1。
若currId>maxId,说明该号段被消耗殆尽。此时,发号器需要申请新的号段值域空间。即:申请新的maxId,一般使用步长的方式,发号器每次申请新的值域空间,会得到长度固定的新值域空间。

判断发号生成器是否存在可用号码,一般有2种写法(只是处理边界条件不一样,这里以currId<=maxId视为有可用号码)

通常情况下,发号生成器分为进程内发号生成器与进程外发号生成器。即:在进程内部维护[currId, maxId]值域空间还是在进程外部维护[currId, maxId]值域空间。 进程内部可以通过全局变量+进程内部锁(或原子操作)的方式实现,进程外部通过其他中间件(Redis or DB)或服务来实现。
进程内发号生成器与进程外发号生成器的使用场景也有很大不同之处,服务一般存在多个节点,每个节点都存在一个业务进程。若对全部请求都要保证先来后到严格的时序性,则需要使用唯一的进程外发号生成器。若不用保证先来后到严格的时序性,则进程内发号生成器与进程外发号生成器都可以使用,考虑性能优先选择进程内发号生成器。

发号器类型 使用场景 优点 缺点
进程内发号生成器 不用保证先来后到时序性 高效、不用引入额外的进程外服务或者中间件 局部有序性
进程外发号生成器 保证先来后到严格的时序性 引入额外的服务或中间件,但整体有序 额外开销,复杂

集中式号段管理器(通常使用Redis或DB)则存储当前发号器派发的数值空间域最大数值,待发号生成器号码耗尽或需要一段数值空间域时,来集中式号段管理器申请一段可用的数值空间。

Redis+DB实现基于号段的发号器

通过上面可知,实现发号器功能需要实现2个角色:发号生成器与集中式号段管理器,本文着重讲解进程外发号生成器的实现原理。这里使用Redis作为发号生成器,DB作为集中式号段管理器。

Redis发号生成器仅仅是一个hash类型的数值结构,包含2个field:v_l/v_h。
DB集中式号段管理器一般是一张表结构,核心的2个字段:server_name/max_id。

通过上图可知,业务在通过发号器获取号码时需要经历以下几个步骤:

  1. 业务请求Redis发号生成器获取号码
  2. 发号生成器判断是否存在可用号码
  3. 有几种情况
    3.1 正常获取号码返回给业务(到这里结束)
    3.2 发号生成器数据结构不存在
    3.3 发号生成器数值空间耗尽
  4. 对于3.2/3.3这两种情况,业务会加分布式锁并请求DB集中式号段管理器获取新的号段
  5. DB集中式号段管理器返回新的号段
  6. 业务更新发号生成器号段
  7. 回到第1步

业务在调用Redis获取可用号码时,一般使用lua脚本来实现,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- KEYS[1] 发号器结构的key
-- ARGV[1] 表示filed v_l
-- ARGV[2] 表示filed v_h

local v_l = redis.call('hget', KEYS[1], tostring(ARGV[1])) --号段的当前最大值
local v_h = redis.call('hget', KEYS[1], tostring(ARGV[2])) --号段的最大值

if v_h == false then --号段最大值为空,即:数据结构在Redis中不存在
return -1
elseif v_l > v_h then --当前号段消耗完毕,需要再申请一段
return -2
else
return redis.call('hincrby', KEYS[1], tostring(ARGV[1]), 1) --通过incr获取新版本号
end

基于号段的发号器伪代码

这里使用go语言演示整个操作过程,伪代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package generateDistributedIdDemo

const (
TicketStep = 512

Empty = iota
Exhaust
)

func getDistributedId() int64{
// 待实现 getDistributedLocker 检查Redis中是否能够获取号码
id, err := callRedisGetDistributedId(serverName)
if err != nil {
return 0
}

switch id {
case -1: // 发号生成器数据结构不存在
return rebuildTicketRange(serverName, Empty)
case -2: // 发号生成器数值空间耗尽
return rebuildTicketRange(serverName, Exhaust)
default:
}

if id>0 {
return id
}
return 0
}

func rebuildTicketRange(serverName string, whichType int) int64{
// 待实现 getDistributedLocker 获取分布式锁
locker, err:=getDistributedLocker(serverName)
if err!= nil {
return 0
}
defer locker.Relase()

// double check
// 待实现 getDistributedLocker 再次检查Redis中是否能够获取号码
id, err := callRedisGetDistributedId(serverName)
if err != nil {
return 0
}

switch id {
case -1,-2:
default:
if id > 0 {
return id
}
return 0
}

// 从DB申请一段新的号段
maxId, err:= getTicketMaxIdFromDB(serverName, whichType)
if err!= nil {
return 0
}

newTicketRangeLow:= maxId-TicketStep
newTicketRangeHigh:= maxId

// 待实现 setNewTicketRangeToRedis 设置新的数值空间域到Redis
if err:= setNewTicketRangeToRedis(serverName, newTicketRangeLow+1, newTicketRangeHigh); err!=nil {
return 0
}
return newTicketRangeLow
}

// 更新DB分布式号段管理器获取新的max_id
func getTicketMaxIdFromDB(serverName string, whichType int) (int64, error) {
currStep := TicketStep
switch whichType {
case Empty:
currStep*=2 // 发号生成器不存在时,步长为2倍TicketStep
case Exhaust:
}

// DB中旧的 max_id
// oldMaxId := someNumber

// 返回DB中旧的 max_id+=currStep 之后新的 max_id
// 待实现 getNewTicketMaxIdFromDB
newMaxId, err := getNewTicketMaxIdFromDB(serverName, currStep)

// 最终发号生成器新的号段
// Empty情况的新的号段范围:[newMaxId-TicketStep, oldMaxId+=2*TicketStep]
// 其中newMaxId=(oldMaxId+=2*TicketStep),会跳过一段长度为 TicketStep 的空间
//
// Exhaust情况的新的号段范围:[newMaxId-TicketStep, oldMaxId+=TicketStep]
return newMaxId, err
}

通过上述伪代码可知,在获取新的号段时需要先加分布式锁以保证同时只有一个进程刷新号段。上述伪代码仅用于说明通过发号器获取号码的过程,部分代码与细节没有实现。