funcsendTime(c interface{}, seq uintptr) { // Non-blocking send of time on c. // Used in NewTimer, it cannot block anyway (buffer). // Used in NewTicker, dropping sends on the floor is // the desired behavior when the reader gets behind, // because the sends are periodic. select { case c.(chan Time) <- Now(): default: } }
//type Timer struct { // C <-chan Time // r runtimeTimer //}
// 创建一次性定时器 // 源码位置 time/sleep.go funcNewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), // 定时器指定的触发时间点 f: sendTime, // 到达了指定时间,会向通道c中发送数据 arg: c, }, } startTimer(&t.r) // 添加到定时器堆 return t }
//type Ticker struct { // C <-chan Time // r runtimeTimer //}
// 创建周期性定时器 // 源码位置 time/tick.go funcNewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), // 定时器指定的触发时间点 period: int64(d), // 通过该字段表明这是个周期性的定时器 f: sendTime, // 到达了指定时间,会向通道c中发送数据 arg: c, }, } startTimer(&t.r) // 添加到定时器堆 return t }
// 添加定时器到timer堆 // startTimer adds t to the timer heap. //go:linkname startTimer time.startTimer funcstartTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t) // 添加定时器 }
// addtimer adds a timer to the current P. // This should only be called with a newly created timer. // That avoids the risk of changing the when field of a timer in some P's heap, // which could cause the heap to become unsorted. funcaddtimer(t *timer) { // when must never be negative; otherwise runtimer will overflow // during its delta calculation and never expire other runtime timers. if t.when < 0 { t.when = maxWhen } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaiting
// doaddtimer adds t to the current P's heap. // The caller must have locked the timers for pp. funcdoaddtimer(pp *p, t *timer) { // Timers rely on the network poller, so make sure the poller // has started. if netpollInited == 0 { netpollGenericInit() }
if t.pp != 0 { throw("doaddtimer: P already set in timer") } t.pp.set(pp) i := len(pp.timers) pp.timers = append(pp.timers, t) // pp.timers 为具体的定时器切片,追加当前定时器t siftupTimer(pp.timers, i) if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) }
/* //源码位置 runtime/runtime2.go type p struct { ...... // The when field of the first entry on the timer heap. // This is updated using atomic functions. // This is 0 if the timer heap is empty. timer0When uint64 ...... // Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. timers []*timer ...... } */
// 执行一个定时器 // runOneTimer runs a single timer. // The caller must have locked the timers for pp. // This will temporarily unlock the timers while running the timer function. //go:systemstack funcrunOneTimer(pp *p, t *timer, now int64) { if raceenabled { ppcur := getg().m.p.ptr() if ppcur.timerRaceCtx == 0 { ppcur.timerRaceCtx = racegostart(funcPC(runtimer) + sys.PCQuantum) } raceacquirectx(ppcur.timerRaceCtx, unsafe.Pointer(t)) }
if t.period > 0 { // 发现是周期性定时器 // Leave in heap but adjust next time to fire. delta := t.when - now t.when += t.period * (1 + -delta/t.period) // 下次定时器触发的时间 siftdownTimer(pp.timers, 0) if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) // 调整timer堆 } else { // 发现是一次性定时器,则从timer堆中移除 // Remove from heap. dodeltimer0(pp) // 从timer堆中移除 if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } }
if raceenabled { // Temporarily use the current P's racectx for g0. gp := getg() if gp.racectx != 0 { throw("runOneTimer: unexpected racectx") } gp.racectx = gp.m.p.ptr().timerRaceCtx }