grpc-go设置keepalive

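每个 gRPC 请求都对应 HTTP/2 连接上的一个 stream。Keepalive 作用于承载这些 stream 的底层连接(而不是单个 stream):通过定期发送 HTTP/2 PING 帧检测连接是否仍然可用,并避免空闲连接被中间设备(如代理、NAT)悄悄断开,因此适合一些执行时间长的请求。Keepalive 支持在服务端和客户端分别配置。客户端发送 ping 的频率必须满足服务端的 EnforcementPolicy(默认要求两次 ping 间隔不小于 5 分钟),否则服务端会发送 GOAWAY 并关闭连接。所以通常需要服务端相应放宽策略,客户端的配置才能真正生效。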

server 端实现

https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go

keepalive配置参数是针对整个连接的
grpc-go基于http/2实现,可以多路复用。即:多个请求复用同一个连接,每个请求都是一个单独的stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

// Command-line flag and keepalive settings for the example server.
var (
	// port is the TCP port the server listens on.
	port = flag.Int("port", 50052, "port number")

	// kaep is the policy the server enforces on client-initiated pings.
	kaep = keepalive.EnforcementPolicy{
		// If a client pings more than once every 5 seconds, terminate the connection.
		MinTime: 5 * time.Second,
		// Allow pings even when there are no active streams.
		PermitWithoutStream: true,
	}

	// kasp controls connection lifetime limits and server-initiated pings.
	kasp = keepalive.ServerParameters{
		// If a client is idle for 15 seconds, send a GOAWAY.
		MaxConnectionIdle: 15 * time.Second,
		// If any connection is alive for more than 30 seconds, send a GOAWAY.
		MaxConnectionAge: 30 * time.Second,
		// Allow 5 seconds for pending RPCs to complete before forcibly closing connections.
		MaxConnectionAgeGrace: 5 * time.Second,
		// Ping the client if it is idle for 5 seconds to ensure the connection is still active.
		Time: 5 * time.Second,
		// Wait 1 second for the ping ack before assuming the connection is dead.
		Timeout: 1 * time.Second,
	}
)

// server is the example implementation of the Echo service (EchoServer).
// It embeds pb.UnimplementedEchoServer, as the generated gRPC code expects,
// so only UnaryEcho has to be written here.
type server struct {
	pb.UnimplementedEchoServer
}

// UnaryEcho answers a unary request by sending back the client's message unchanged.
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	reply := &pb.EchoResponse{Message: req.Message}
	return reply, nil
}

// main parses flags, listens on the configured TCP port, and serves the Echo
// service using the kaep enforcement policy and the kasp server parameters.
func main() {
	flag.Parse()

	lis, err := net.Listen("tcp", fmt.Sprintf(":%v", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	srv := grpc.NewServer(
		grpc.KeepaliveEnforcementPolicy(kaep),
		grpc.KeepaliveParams(kasp),
	)
	pb.RegisterEchoServer(srv, &server{})

	// Serve blocks until the listener fails or the server is stopped.
	if serveErr := srv.Serve(lis); serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}

执行

1
go run server/main.go

server 端 keepalive 的实现核心在于 keepalive.ServerParameters 和 keepalive.EnforcementPolicy 两个结构体

首先是 keepalive.ServerParameters,包含以下几个属性:

  • MaxConnectionIdle:最大空闲时间,默认无限制。连接上没有活跃 RPC(stream)的时间超过该值后,server 发送 GOAWAY 优雅关闭连接
  • MaxConnectionAge:连接的最大存活时间,默认无限制。连接存在的时间超过该值(会附加一个随机抖动)后,server 发送 GOAWAY
  • MaxConnectionAgeGrace:超出 MaxConnectionAge 之后的宽限时长,默认无限制。宽限期结束后,连接会被强制关闭
  • Time:如果 server 在这段时间内没有看到连接上有任何活动,就向 client 发送一次 ping。默认是 2 小时,设置小于 1s 时按 1s 处理
  • Timeout:server 发送 ping 后等待响应的超时时间,默认 20s

    即:在发送ping包之后,Timeout 时间内没有收到 ack 则视为超时

keepalive.EnforcementPolicy 为服务端强制执行策略,如果客户端违反则断开连接。它有两个属性:

  • MinTime:客户端两次 keepalive ping 之间允许的最小间隔,默认 5min。如果客户端 ping 得比这更频繁(即 MinTime 内收到多于一次 ping),服务端会发送 GOAWAY 并断开连接

    防止客户端在一段时间间隔内发送太频繁 ping 操作

  • PermitWithoutStream:为 true 时,即使没有活跃的 stream 也允许客户端发送 ping。为 false 时,客户端在没有活跃 stream 时发送 ping 会被视为违规,服务端会发送 GOAWAY 并关闭连接。默认关闭(false)

    是否允许没有流时发送 ping

注意:
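若只设置 MaxConnectionAge 而没设置 MaxConnectionAgeGrace,达到 MaxConnectionAge 时服务端只会发送 GOAWAY(不再接受新的 stream)。由于宽限时长是默认的无穷大,正在进行的 stream 不会被强制中断,因此看起来"没有效果"。如果希望长时间运行的请求在超时后被强制断开,需要同时设置 MaxConnectionAgeGrace。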
原因见下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Excerpt from grpc-go: internal/transport/defaults.go
//
// Transport-wide default values. NewServerTransport falls back to the
// defaultMaxConnection* values here when the corresponding keepalive field
// is left at zero.
const (
	// infinity is the largest representable time.Duration (about 292 years).
	// It is used as "no limit" for several of the defaults below.
	infinity = time.Duration(math.MaxInt64)

	// HTTP/2 flow control.
	// defaultWindowSize is the default flow-control window from the HTTP/2 spec.
	defaultWindowSize = 65535
	// initialWindowSize is the initial flow-control window for an RPC.
	initialWindowSize = defaultWindowSize
	// maxWindowSize is the maximum window allowed by the HTTP/2 spec.
	maxWindowSize = math.MaxInt32

	// Client-side defaults. A keepalive time of infinity means the client
	// effectively never sends keepalive pings unless configured to.
	defaultClientKeepaliveTime     = infinity
	defaultClientKeepaliveTimeout  = 20 * time.Second
	defaultMaxStreamsClient        = 100
	defaultClientMaxHeaderListSize = uint32(16 << 20)

	// Server-side defaults.
	defaultMaxConnectionIdle       = infinity
	defaultMaxConnectionAge        = infinity
	defaultMaxConnectionAgeGrace   = infinity
	defaultServerKeepaliveTime     = 2 * time.Hour
	defaultServerKeepaliveTimeout  = 20 * time.Second
	defaultKeepalivePolicyMinTime  = 5 * time.Minute
	defaultServerMaxHeaderListSize = uint32(16 << 20)

	// defaultWriteQuota is the number of data bytes each stream can schedule
	// before some of it must be flushed out.
	defaultWriteQuota = 64 * 1024
)


// Excerpt from grpc-go: internal/transport/http2_server.go
// ("......" marks code elided by this article; the excerpt does not compile as-is.)

// NewServerTransport builds the server side of an HTTP/2 transport on conn.
// Only the keepalive-related part is shown here: zero-valued
// MaxConnectionIdle / MaxConnectionAge / MaxConnectionAgeGrace are replaced by
// the package defaults (all infinity), and the keepalive goroutine is started.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
......

kp := config.KeepaliveParams

// MaxConnectionIdle: 0 means "not set", so fall back to defaultMaxConnectionIdle (infinity).
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
// MaxConnectionAge: 0 means "not set", so fall back to defaultMaxConnectionAge (infinity).
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge, presumably so that connections opened at
// the same time do not all reach their age limit at the same moment.
// NOTE(review): getJitter is not shown; it is expected to return 0 when the
// age is infinity, otherwise this addition would overflow — confirm upstream.
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)

// MaxConnectionAgeGrace: the extra time after MaxConnectionAge before the
// connection is forcibly closed. 0 means "not set" and falls back to
// defaultMaxConnectionAgeGrace (infinity), i.e. no forced close.
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}

......

// Run the idle / age / keepalive-ping timers for this connection in the background.
go t.keepalive()

return t, nil
}


// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection after an additional duration of keepalive.Timeout.
//
// Control-flow note: once ageTimer fires, the goroutine waits in the inner
// select (grace period elapsed, or t.done) and then returns. From that point
// on, the idle timer and the keepalive ping timer are no longer serviced.
func (t *http2Server) keepalive() {
// p is the PING frame queued on controlBuf for every keepalive ping.
p := &ping{}

// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false

// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
kpTimeoutLeft := time.Duration(0)

// Records the last value of t.lastRead before we go block on the timer.
// This is required to check for read activity since then.
prevNano := time.Now().UnixNano()

// Initialize the different timers to their default values.
// When MaxConnectionIdle / MaxConnectionAge were not configured they hold
// infinity (see NewServerTransport), so those timers effectively never fire.
idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
kpTimer := time.NewTimer(t.kp.Time)

// Stop all three timers when this goroutine exits.
defer func() {
// We need to drain the underlying channel in these timers after a call
// to Stop(), only if we are interested in resetting them. Clearly we
// are not interested in resetting them here.
idleTimer.Stop()
ageTimer.Stop()
kpTimer.Stop()
}()

for {
select {
case <-idleTimer.C: // MaxConnectionIdle timer fired.
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
idleTimer.Reset(t.kp.MaxConnectionIdle)
continue
}

// Time still left until the connection has been idle for MaxConnectionIdle.
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.Drain()
return
}

// Idle, but not for long enough yet: check again once the remainder elapses.
idleTimer.Reset(val)

case <-ageTimer.C: // MaxConnectionAge timer fired.
// Gracefully close (drain) the connection first.
t.Drain()

// Re-arm the same timer for the grace period.
// If MaxConnectionAge is set but MaxConnectionAgeGrace is not, the grace
// period is defaultMaxConnectionAgeGrace (infinity). The timer then
// effectively never fires, so the inner select below only returns once
// t.done is closed and the forced t.Close() never happens. This is why
// setting MaxConnectionAge alone appears to "have no effect".
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)

select {
case <-ageTimer.C:
// Close the connection after grace period.
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
case <-t.done:
}
return

case <-kpTimer.C: // Keepalive timer fired: check for read activity and ping if needed.
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano {
// There has been read activity since the last time we were
// here. Setup the timer to fire at kp.Time seconds from
// lastRead time and continue.
outstandingPing = false
kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}

// A ping is outstanding, nothing has been read since, and its Timeout
// has fully elapsed: treat the peer as unresponsive and close the transport.
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
return
}

// No ping in flight yet: queue one and start the Timeout countdown.
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
kpTimeoutLeft -= sleepDuration
kpTimer.Reset(sleepDuration)

case <-t.done: // Transport is closed; exit the goroutine.
return
}
}
}

client 端实现

https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/keepalive"
)

// Command-line flag and keepalive settings for the example client.
var (
	// addr is the server address to dial.
	addr = flag.String("addr", "localhost:50052", "the address to connect to")

	// kacp configures client-initiated keepalive pings. The server has to
	// permit this ping rate through its keepalive.EnforcementPolicy; the
	// example server uses MinTime 5s and PermitWithoutStream true.
	kacp = keepalive.ClientParameters{
		// Send pings every 10 seconds if there is no activity.
		Time: 10 * time.Second,
		// Wait 1 second for ping ack before considering the connection dead.
		Timeout: time.Second,
		// Send pings even without active streams.
		PermitWithoutStream: true,
	}
)

// main dials the server with the kacp keepalive parameters, performs one
// unary Echo RPC, prints the reply, and then blocks forever so keepalive
// pings and GOAWAY frames can be observed on the idle connection.
func main() {
	flag.Parse()

	dialOpts := []grpc.DialOption{
		// NOTE(review): grpc.WithInsecure is deprecated in newer grpc-go
		// releases; the replacement is
		// grpc.WithTransportCredentials(insecure.NewCredentials()) from
		// google.golang.org/grpc/credentials/insecure.
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(kacp),
	}
	conn, err := grpc.Dial(*addr, dialOpts...)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewEchoClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	fmt.Println("Performing unary request")
	reply, err := client.UnaryEcho(ctx, &pb.EchoRequest{Message: "keepalive demo"})
	if err != nil {
		log.Fatalf("unexpected error from UnaryEcho: %v", err)
	}
	fmt.Println("RPC response:", reply)

	// Block forever; run with GODEBUG=http2debug=2 to observe ping frames and GOAWAYs due to idleness.
	select {}
}

执行

1
GODEBUG=http2debug=2 go run client/main.go

keepalive.ClientParameters是在客户端使用的 keepalive 配置:

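  • Time:客户端在这段时间内没有看到连接上有任何活动时,向服务端发送 ping。默认无限制(即不发送 ping),设置小于 10s 时按 10s 处理。注意该值不能小于服务端 EnforcementPolicy 的 MinTime,否则连接会被服务端以 GOAWAY 关闭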
  • Timeout :ping 超时时间,默认是 20s

    即:在发送ping包之后,Timeout 时间内没有收到 ack 则视为超时

  • PermitWithoutStream:为 true 时,即使没有活跃的 RPC(stream)也发送 keepalive ping。为 false 时,没有活跃 RPC 时会忽略 Time 和 Timeout,不发送 ping。默认关闭(false)

更多细节&设置,详见 https://github.com/grpc/grpc-go/tree/master/Documentation/keepalive.md

其他参考