funcmain() { ... if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon // For runtime_syscall_doAllThreadsSyscall, we // register sysmon is not ready for the world to be // stopped. // !!! 找到了 启动sysmon的代码 // 在系统栈内生成一个新的M来启动sysmon atomic.Store(&sched.sysmonStarting, 1) systemstack(func() { newm(sysmon, nil, -1) }) } ... }
// 创建一个新的系统线程 // Create a new m. It will start off with a call to fn, or else the scheduler. // fn needs to be static and not a heap allocated closure. // May run with m.p==nil, so write barriers are not allowed. // // id is optional pre-allocated m ID. Omit by passing -1. //go:nowritebarrierrec funcnewm(fn func(), _p_ *p, id int64) { // 获取GPM中M结构体,并进行部分字段的初始化 // allocm方法非常重要!!! // 该方法获取并初始化M的结构体,还在M里面设置了系统线程将要执行的方法fn,这里是sysmon mp := allocm(_p_, fn, id) ... // M在Go中属于用户态代码中的一个结构体,跟系统线程是一对一的关系 // 每个系统线程怎么执行代码,从哪里开始执行,则是由M的结构体中参数来指明 // 创建GPM中结构体M结构体之后,开始创建对应的底层系统线程 newm1(mp) }
// 给M分配一个系统线程 // Allocate a new m unassociated with any thread. // Can use p for allocation context if needed. // fn is recorded as the new m's m.mstartfn. // id is optional pre-allocated m ID. Omit by passing -1. // // This function is allowed to have write barriers even if the caller // isn't because it borrows _p_. // //go:yeswritebarrierrec funcallocm(_p_ *p, fn func(), id int64) *m { ... // 创建新的M,并且进行一些初始化操作 mp := new(m) // M 的执行方法, 在runtime.mstart()方法中最终调用fn mp.mstartfn = fn ... }
// 通过clone创建系统线程 // May run with m.p==nil, so write barriers are not allowed. //go:nowritebarrier funcnewosproc(mp *m) { ... // Disable signals during clone, so that the new thread starts // with signals disabled. It will enable them in minit. // // 注意: // 第5个参数 mstart 是 runtime.mstart ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) ... }
// mstart是一个M的执行入口 // mstart is the entry-point for new Ms. // // This must not split the stack because we may not even have stack // bounds set up yet. // // May run during STW (because it doesn't have a P yet), so write // barriers are not allowed. // //go:nosplit //go:nowritebarrierrec funcmstart() { ... mstart1() ... }
// Always runs without a P, so write barriers are not allowed. // //go:nowritebarrierrec funcsysmon() { ... for { ... // 获取超过10ms的netpoll结果 // // poll network if not polled for more than 10ms lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1) injectglist(&list) incidlelocked(1) } }
...
// 抢夺syscall长时间阻塞的P,向长时间阻塞的P发起抢占调度 // // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } // 检查是否需要强制执行垃圾回收 // check if we need to force a GC if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 { lock(&forcegc.lock) forcegc.idle = 0 var list gList list.push(forcegc.g) injectglist(&list) unlock(&forcegc.lock) } ... } ... }