Go中多协程协作之sync.Cond

1. 程序中的通信方式

GO语言中有句名言:“不要用共享内存来通信,而是使用通信来共享内存”。
编程语言中,通信方式分为进程间通信、线程间通信。

  1. 进程间通信,常用方式:
  • 有名管道
  • 无名管道
  • 信号
  • 共享内存
  • 消息队列
  • 信号灯集
  • socket
  1. 线程间通信,常用方式:
  • 信号量
  • 互斥锁
  • 条件变量

对于Go语言来说,Go程序启动之后对外是一个进程,内部包含若干协程,协程相当于用户态轻量级线程,所以协程的通信方式大多可以使用线程间通信方式来完成。

协程间通信方式,官方推荐使用channel,channel在一对一的协程之间进行数据交换与通信十分便捷。但是,一对多的广播场景中,则显得有点无力,此时就需要sync.Cond来辅助。

2. 什么是广播?

举个例子,上高中时,宿管老师每天早晨需要叫醒学生们去上课。有两种方法:①一个寝室一个寝室把学生叫醒 ②在宿舍楼安装广播,到起床时间,在广播上叫醒学生。显然,使用广播的方式效率更高。

编程中的广播可以理解为:多个操作流程依赖于一个操作流程完成后才能进行某种动作,这个被依赖的操作流程在唤醒所有依赖者时使用的一种通知方式。

在Go语言中,则可以使用sync.Cond来实现多个协程之间的广播通知功能。

3. sync.Cond

cond是sync包下面的一种数据类型,相当于线程间通信的条件变量方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy // 在第一次使用后不可复制,使用go vet作为检测使用

// L is held while observing or changing the condition
// 根据需求初始化不同的锁,如*Mutex 和 *RWMutex。注意是 指针类型
L Locker

// 具有头尾指针的链表。存储被阻塞的协程,通知时操作该链表中的协程
notify notifyList
checker copyChecker // 复制检查,检查cond实例是否被复制
}

该数据类型提供的方法有:

1
2
3
4
5
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast() // 通知所有协程,广播
func (c *Cond) Signal() // 通知一个协程
func (c *Cond) Wait() // 阻塞等待,直到被唤醒

对应源码追溯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// 注意下面的写法是官方推荐的
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
// 检查c是否是被复制的,如果是就panic
c.checker.check()
// 获取等待队列的一个ticket数值,作为唤醒时的一个令牌凭证
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()

// 注意,上面的ticket数值会作为阻塞携程的一个标识
// 加入通知队列里面
// 到这里执行gopark(),当前协程挂起,直到signal或broadcast发起通知
runtime_notifyListWait(&c.notify, t)

// 被唤醒之后,先获取锁
c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify) // 随机挑选一个进行通知,wait阻塞解除
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check()
// 通知所有阻塞等待的协程
// 主要是唤醒 cond.notify 链表上的各个协程
runtime_notifyListNotifyAll(&c.notify)
}

使用方法,代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
var locker sync.Mutex
var cond = sync.NewCond(&locker)

// NewCond(l Locker)里面定义的是一个接口,拥有lock和unlock方法。
// 看到sync.Mutex的方法,func (m *Mutex) Lock(),可以看到是指针有这两个方法,所以应该传递的是指针
func main() {
// 启动多个协程
for i := 0; i < 10; i++ {
gofunc(x int) {
cond.L.Lock() // 获取锁
defer cond.L.Unlock() // 释放锁

cond.Wait() // 等待通知,阻塞当前 goroutine

// 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
fmt.Println(x)
}(i)
}

time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 进入 Wait 阻塞状态
fmt.Println("Signal...")
cond.Signal() // 1 秒后下发一个通知给已经获取锁的 goroutine

time.Sleep(time.Second * 1)
fmt.Println("Signal...")
cond.Signal() // 1 秒后下发下一个通知给已经获取锁的 goroutine

time.Sleep(time.Second * 1)
cond.Broadcast() // 1 秒后下发广播给所有等待的goroutine
fmt.Println("Broadcast...")
time.Sleep(time.Second * 1) // 等待所有 goroutine 执行完毕
}

总结

在go中协程间通信的方式有多种,最常用的是channel。如果牵扯多个协程的通知,可以使用sync.Cond。
查看channel、sync.Cond源码会发现,它们有相似之处

  1. 阻塞协程统一被封装在 sudog 结构里面
  2. channel 阻塞读/写时,用双向链表存储被阻塞导致等待唤醒的协程
  3. sync.Cond 使用带有头尾指针的单向链表存储被阻塞导致等待唤醒的协程
  4. 阻塞时都是使用gopark()进行协程的挂起操作

虽说有相似之处,但却有本质区别

  1. channel 可用来在协程间传递数据
  2. sync.Cond 不可用来在协程间传递数据,主要用来进行协程的阻塞唤醒操作。如需要传递数据,则需要使用全局变量进行传递