var kaep = keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection PermitWithoutStream: true, // Allow pings even when there are no active streams }
var kasp = keepalive.ServerParameters{ MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead }
// server implements EchoServer. type server struct { pb.UnimplementedEchoServer }
const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. initialWindowSize = defaultWindowSize // for an RPC infinity = time.Duration(math.MaxInt64) defaultClientKeepaliveTime = infinity defaultClientKeepaliveTimeout = 20 * time.Second defaultMaxStreamsClient = 100 defaultMaxConnectionIdle = infinity defaultMaxConnectionAge = infinity defaultMaxConnectionAgeGrace = infinity defaultServerKeepaliveTime = 2 * time.Hour defaultServerKeepaliveTimeout = 20 * time.Second defaultKeepalivePolicyMinTime = 5 * time.Minute // max window limit set by HTTP2 Specs. maxWindowSize = math.MaxInt32 // defaultWriteQuota is the default value for number of data // bytes that each stream can schedule before some of it being // flushed out. defaultWriteQuota = 64 * 1024 defaultClientMaxHeaderListSize = uint32(16 << 20) defaultServerMaxHeaderListSize = uint32(16 << 20) )
// 最大空闲时间 if kp.MaxConnectionIdle == 0 { kp.MaxConnectionIdle = defaultMaxConnectionIdle } // 最大存活时间 if kp.MaxConnectionAge == 0 { kp.MaxConnectionAge = defaultMaxConnectionAge } // Add a jitter to MaxConnectionAge. // 添加一个随机值 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) // 超出MaxConnectionAge之后的宽限时长,默认无限制,最小为 1s if kp.MaxConnectionAgeGrace == 0 { kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace }
......
go t.keepalive()
return t, nil }
// keepalive running in a separate goroutine does the following: // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection after an additional duration of keepalive.Timeout. func(t *http2Server) keepalive() { p := &ping{}
// True iff a ping has been sent, and no data has been received since then. outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the // last sent ping. kpTimeoutLeft := time.Duration(0)
// Records the last value of t.lastRead before we go block on the timer. // This is required to check for read activity since then. prevNano := time.Now().UnixNano()
// Initialize the different timers to their default values. // 初始化不同的定时器 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle) ageTimer := time.NewTimer(t.kp.MaxConnectionAge) kpTimer := time.NewTimer(t.kp.Time)
// 在 defer 中关闭定时器 deferfunc() { // We need to drain the underlying channel in these timers after a call // to Stop(), only if we are interested in resetting them. Clearly we // are not interested in resetting them here. idleTimer.Stop() ageTimer.Stop() kpTimer.Stop() }()
for { select { case <-idleTimer.C: // 空闲定时器 t.mu.Lock() idle := t.idle if idle.IsZero() { // The connection is non-idle. t.mu.Unlock() idleTimer.Reset(t.kp.MaxConnectionIdle) continue }
val := t.kp.MaxConnectionIdle - time.Since(idle) t.mu.Unlock() if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. t.Drain() return }
idleTimer.Reset(val) case <-ageTimer.C: // 最大连接时间定时器 t.Drain()
select { case <-ageTimer.C: // Close the connection after grace period. if logger.V(logLevel) { logger.Infof("transport: closing server transport due to maximum connection age.") } t.Close() case <-t.done: } return
case <-kpTimer.C: // 在指定时间没看到客户端活跃,则发送ping数据帧 lastRead := atomic.LoadInt64(&t.lastRead) if lastRead > prevNano { // There has been read activity since the last time we were // here. Setup the timer to fire at kp.Time seconds from // lastRead time and continue. outstandingPing = false kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano())) prevNano = lastRead continue }
if outstandingPing && kpTimeoutLeft <= 0 { if logger.V(logLevel) { logger.Infof("transport: closing server transport due to idleness.") } t.Close() return }
if !outstandingPing { if channelz.IsOn() { atomic.AddInt64(&t.czData.kpCount, 1) } t.controlBuf.put(p) kpTimeoutLeft = t.kp.Timeout outstandingPing = true } // The amount of time to sleep here is the minimum of kp.Time and // timeoutLeft. This will ensure that we wait only for kp.Time // before sending out the next ping (for cases where the ping is // acked). sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) kpTimeoutLeft -= sleepDuration kpTimer.Reset(sleepDuration)
var addr = flag.String("addr", "localhost:50052", "the address to connect to")
var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams }
funcmain() { flag.Parse()
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close()
c := pb.NewEchoClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() fmt.Println("Performing unary request") res, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "keepalive demo"}) if err != nil { log.Fatalf("unexpected error from UnaryEcho: %v", err) } fmt.Println("RPC response:", res) select {} // Block forever; run with GODEBUG=http2debug=2 to observe ping frames and GOAWAYs due to idleness. }