etcd与分布式锁的实现及原理

1.关于etcd

官方文档永远是最好的学习资料,官方介绍etcd如是说。

分布式系统使用etcd作为配置管理、服务发现和协调分布式工作的一致键值存储。许多组织使用etcd来实现生产系统,如容器调度器、服务发现服务和分布式数据存储。使用etcd的常见分布式模式包括leader选举、分布式锁和监视机器活动。

Distributed systems use etcd as a consistent key-value store for configuration management, service discovery, and coordinating distributed work. Many organizations use etcd to implement production systems such as container schedulers, service discovery services, and distributed data storage. Common distributed patterns using etcd include leader election, distributed locks, and monitoring machine liveness.

https://etcd.io/docs/v3.4/learning/why/

实现分布式锁仅是 etcd 众多功能中的一项,服务注册与发现在 etcd 中用的则会更多。

官方也对众多组件进行了对比,如下所示:

2.实现分布式锁的组件们

在分布式系统中,常用于实现分布式锁的组件有:Redis、zookeeper、etcd,下面针对各自的特性进行对比:

备注:N+1可用 中的 N 含义为 集群节点数的一半
例如,etcd集群节点数为5,则 N 为 5/2=2,则 N+1 为3,也就是说保证集群中超过半数节点可用。

由上图可以看出组件各自的特点,对于分布式锁来说在某些场景下可能至关重要的一点是要求CP。但是,一般情况下Redis集群不支持CP,而是支持AP。虽然,官方也给出了redlock的方案,但由于需要部署多个实例(超过一半实例成功才视为成功),部署、维护比较复杂、可能还会存在一些问题。所以在对一致性要求很高的业务场景下(电商、银行支付),一般选择使用zookeeper或者etcd。对比zookeeper与etcd,如果考虑性能、并发量、维护成本来看。由于etcd是用Go语言开发,直接编译为二进制可执行文件,并不依赖其他任何东西,则更具有优势。

3.etcd分布式锁实现原理

对于分布式锁来说,操作的动作包含:
1.获取锁

处理过程中,需要另起线程/协程对锁进行续约,防止锁被释放时业务还未处理完成。

2.释放锁

4.etcd分布式锁示例代码

官方已经对 etcd 分布式锁进行了封装,这里使用官方示例来讲解。

https://github.com/etcd-io/etcd/blob/main/tests/integration/clientv3/concurrency/example_mutex_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package example

import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"

"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

func TestMutex_TryLock(t *testing.T) {
// 创建 etcd client 对象
// Endpoints 为 etcd server 端的地址,这里连接到本地的 etcd server
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
if err != nil {
log.Fatal(err)
}
// 用完关闭连接
defer cli.Close()

// create two separate sessions for lock competition
// 通过 etcd client 创建 session 会话
// 注意:session 会话中包含一个租约,在后面创建 etcd key 时,与 key 进行绑定,从而进行保活

// 写法1
//s1, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
//
// 写法2
//leaseId:=100 // 这里只是随便写一个数值,具体的还需要创建
//s1, err :=concurrency.NewSession(cli,concurrency.WithLease(clientv3.LeaseID(leaseId)))

s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
// 用完关闭
defer s1.Close()

// 创建 锁1,锁的 key 前缀为 /my-lock
m1 := concurrency.NewMutex(s1, "/my-lock")

// 创建会话2
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
// 锁2
m2 := concurrency.NewMutex(s2, "/my-lock")

// acquire lock for s1
// 通过 m1 取获取锁
// err 等于 nil 说明获得到了 锁
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")

if err = m2.Lock(context.TODO()); err == nil {
// 因为,锁被 m1 持有了,此时 m2 不应该获得到锁,走到这里说明除了问题
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}

// m1 释放锁
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")

// m2 试图去获取锁
// 因为 m1 已经释放了锁,所以 m2 会成功获取到锁
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")

// Output:
// acquired lock for s1
// cannot acquire lock for s2, as already locked in another session
// released lock for s1
// acquired lock for s2
}

5.源码解读

通过示例可以看出,使用 etcd 实现分布式锁非常方便,调用的方法主要有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

// NewSession gets the leased session for a client.
// 创建一个 带有租约 的会话
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error)

// 创建 实现锁的 对象 Mutex
// Mutex implements the sync Locker interface with etcd
func NewMutex(s *Session, pfx string) *Mutex

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
//
// 如果锁被其他 session 持有,则会一直阻塞到能获取到锁
func (m *Mutex) Lock(ctx context.Context) error

// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
//
// 如果锁被其他 session 持有,则不会阻塞,而是直接返回
func (m *Mutex) TryLock(ctx context.Context) error

// 释放锁
func (m *Mutex) Unlock(ctx context.Context) error

对会话 Session 解读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

// session ttl 默认 60秒
const defaultSessionTTL = 60

// session 会话选项
type sessionOptions struct {
ttl int // time to live
leaseID v3.LeaseID // 租约id

ctx context.Context
}

// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
client *v3.Client // etcd 客户端对象
opts *sessionOptions // session 选项
id v3.LeaseID // 租约id

cancel context.CancelFunc
donec <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
// 这里先设置 session ttl 为默认值
// 如果 opts 包含用于设置 ttl 的方法 `func WithTTL(ttl int) SessionOption` 就会使用配置的值覆盖默认值
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}

for _, opt := range opts {
opt(ops)
}

// 租约id
// 如果 opts 包含用于设置 ttl 的方法 `func WithLease(leaseID v3.LeaseID) SessionOption` 则 ops.leaseID 大于 0
id := ops.leaseID
// v3.NoLease 为 常量0。也就是说未设置 租约,则会根据 ttl 创建租约
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = resp.ID
}

// 走到这里时,session 中已经包含了一个可用的租约

ctx, cancel := context.WithCancel(ops.ctx)
// 对 leaseID 保活
// lease 相当于 redis 中设置的过期时间。一旦到达过期时间,key就会被删除
// KeepAlive() 是为了 不停的给 key 续约,以防止 业务还没处理完成,key就被释放了
//
// java 封装的 redis 库中有 watchdog,这里的 KeepAlive() 就相当于 watchdog
//
// keepAlive 是一个 chan
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}

donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

// keep the lease alive until client error or cancelled context
// 从 keepAlive 读取值,却不用是因为 keepAlive 的缓冲区大小固定,否则被填满之后可能会造成阻塞,从而出现问题
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()

return s, nil
}

对锁对象 Mutex 解读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Mutex struct {
s *Session // session 对象,包含租约id,需要跟 mykey 进行绑定

pfx string // 锁前缀
myKey string // 最终锁的 key = pfx+s.id,即:锁前缀+租约id
myRev int64 // key 的 Revision

hdr *pb.ResponseHeader // 请求 etcd server 获取锁时,返回的 header 信息
}

// 创建锁对象
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}

对加锁方法 Lock 的解读 (很关键)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
// 尝试加锁(核心)
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}

// if no key on prefix / the minimum rev is key, already hold the lock
// 持有锁 session 中 pfx 的 kvs,即:当前是谁持有的这把锁
ownerKey := resp.Responses[1].GetResponseRange().Kvs

// ① 如果还没有持有这把锁,则 len(ownerKey) == 0
// ② 如果持有锁的人是自己,则 ownerKey[0].CreateRevision == m.myRev
//
// 只要符合上面2个条件,就说明是自己获得了锁
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
// 获得了锁,直接返回
return nil
}

client := m.s.Client()
// wait for deletion revisions prior to myKey
// TODO: early termination if the session key is deleted before other session keys with smaller revisions.

// 没有获得到锁,需要则阻塞等待锁的释放
// 为了避免惊群效应,只监听比自己小的前一个 key
// m.myRev-1 表示自己只关心比自己小的 Revision,而这个 Revision 最大是 m.myRev-1
//
// TryLock() 唯一不同的是这里调用的是 client.Delete() 方法。不会阻塞
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
return werr
}

// make sure the session is not expired, and the owner key still exists.
// 再次确认一下 session 还未过期
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}

if len(gresp.Kvs) == 0 { // is the session key lost?
return ErrSessionExpired
}
m.hdr = gresp.Header

return nil
}

// 尝试去加锁
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
// session
s := m.s
// etcd 客户端对象
client := m.s.Client()

// 最终的key 为 key前缀+租约id
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())

// 判断 m.myKey 的 create_vison是否为0
// 为 0, 则表示 m.myKey 还不存在
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)

// put self in lock waiters via myKey; oldest waiter holds lock
// (重要)在这里可以看到 租约id 与 m.myKey 进行绑定
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))

// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)

// fetch current holder to complete uncontended path with only one RPC
// 看到这里是 m.pfx,即:key前缀,而不是完整的key
// v3.WithFirstCreate() 表示获取最先创建的,即:Revision最小的
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)

// cmp 为判断条件
// put 为创建 key
// get 为查询 key
// getOwner 为查询持有key前缀 m.pfx 的最小的 Revision 信息

// 注意:这里使用了 etcd 的 事务写法
// If(cmp).Then().Else().Commit() 语义类似于编程语言中的 if ... else ...,伪代码如下
//
// if (cmp) {
// //执行 then 中包含的动作(动作可以是多个)
// Then()
// }else{
// //执行 else 中执行的动作(动作可以是多个)
// Else()
// }
//
// 一句话概括:判断 m.myKey 是否存在,不存在则执行 Then(),存在则执行 Else()。

// 由于需要请求远端的 etcd server,所以只有在 调用 Commit() 时,才会发起对远端的调用执行上面的逻辑
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}

m.myRev = resp.Header.Revision

// 如果 cmp 为 true,则 resp.Succeeded 为true
// 否则 cmp 为 false,则 resp.Succeeded 为false
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}

return resp, nil
}

为了更好地说明获取锁的过程,假设存在3个client(client1、client2、client3)分别创建属于自己的key /my-lock/uuid1/my-lock/uuid2/my-lock/uuid3,由于client1最先创建,所以只有 client1获得了锁。
此时,client2 只监听比自己Revision小并且距离自己Revision最近的key /my-lock/uuid1 的删除事件,一旦 client1 删除了 /my-lock/uuid1,则 client2获得锁。
同理,client3 只监听比自己Revision小并且距离自己Revision最近的key /my-lock/uuid2 的删除事件,client2 删除了 /my-lock/uuid2,则 client3获得锁。

对监听key删除事件的 waitDeletes() 方法的解读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
// 注意:这里比较核心,比自己Revision小并且距离自己Revision最近的key
// 通过 v3.WithLastCreate()、v3.WithMaxCreateRev(maxCreateRev) 着两个条件来限制自己关注的key长什么样子
//
// v3.WithLastCreate() 关注的key的 按照 CreateRevision 倒序排列,只取一个
// v3.WithMaxCreateRev(maxCreateRev) 关注的key的Revision最大为 maxCreateRev
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))

for {
// 先获取关注并且将要 watch 的 key 的信息
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}

// 监听的 key,即:比自己Revision小并且距离自己Revision最近的key
lastKey := string(resp.Kvs[0].Key)

// waitDelete 中 监听key
// 关注的 key 无事件发生,则一直阻塞
// resp.Header.Revision 为 对应的key的 Revision
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

var wr v3.WatchResponse
// 对 key 进行监听
// rev 为 key 的 Revision
wch := client.Watch(cctx, key, v3.WithRev(rev))

// key有任何事件发生之后,都会从 wch 中获取的到事件信息
for wr = range wch {
for _, ev := range wr.Events {
// 只关注 删除事件
// 正常情况下,只有 删除事件发生,才会退出
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}

对删除key Unlock() 方法的解读。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()

// 请求 etcd server 对key进行删除操作
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}

// 把 m.myKey 重置为 ASCII 码值为 0 的字符,即:
m.myKey = "\x00"
m.myRev = -1
return nil
}

6.小结

好的分布式锁应该具备一下几点:

  • 互斥性:任意时刻,同一个锁,只有一个操作对象能持有

    etcd 通过对key的相同前缀加锁,Revision 最小者获得锁实现

  • 安全性:避免死锁,当进程没有主动释放锁(进程崩溃退出),保证其他进程能够加锁

    etcd 通过对key的相同前缀加锁,Revision 非最小者创建key并阻塞等待

  • 可用性:当提供锁的服务节点故障(宕机)时,热备节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致

    etcd server 通过raft一致性算法来保证
    redis 的主从集群 或者 Redlock算法 在这里可能会存在一些问题

  • 对称性:对同一个锁,加锁和解锁必须是同一个进程,即某进程不能把其他进程持有的锁给释放了

    每一个加锁对象都会有一个 mutex 对象,解锁时删除的是 完整的 key 名称,而非前缀。etcd 使用 key前缀+uuid 保证了key的唯一性

通过源码解读,可知 etcd 具备上述特性,并且是一把合格的分布式锁。

7.扩展阅读