layout | title | subtitle | date | categories | cover | tags |
---|---|---|---|---|---|---|
post |
探索m的一生 |
如何创建m,m的3种状态切换 |
2019-08-13 |
技术 |
Golang |
环境:
$ go version
go version go1.12.7 linux/amd64
$ uname -a
Linux wu-insparition 4.18.0-25-generic #26~18.04.1-Ubuntu SMP Thu Jun 27 07:28:31 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
go调度的实现还是挺复杂的,从最简单的m切入再好不过了。
m是一个结构体,在runtime中分配一个m通常使用new关键字new(m)
。整个runtime中唯一调用new(m)
的地方,就是allocm函数:
// Allocate a new m unassociated with any thread.
// Can use p for allocation context if needed.
// fn is recorded as the new m's m.mstartfn.
//
// This function is allowed to have write barriers even if the caller
// isn't because it borrows _p_.
//
//go:yeswritebarrierrec
func allocm(_p_ *p, fn func()) *m {
// 通过tls得到当前线程正在运行的g
// g还是g0,还不确定TODO
// 但是看到获取_g_的目的都是在使用_g_.m(当前线程),所以无论是g还是g0,_g_.m结果都一样
_g_ := getg()
_g_.m.locks++ // disable GC because it can be called from sysmon
if _g_.m.p == 0 {
// 如果当前线程没有p,就把为allocm函数准备的_p_
// 暂时借给当前线程_g_.m
acquirep(_p_) // temporarily borrow p for mallocs in this function
}
// Release the free M list. We need to do this somewhere and
// this may free up a stack we can use.
if sched.freem != nil {
lock(&sched.lock)
var newList *m
for freem := sched.freem; freem != nil; {
if freem.freeWait != 0 {
next := freem.freelink
freem.freelink = newList
newList = freem
freem = next
continue
}
// 正如freeWait的注释所说
// if == 0, safe to free g0 and delete m (atomic)
// 释放sched.freem中sched.freem.freeWait=0的m的系统栈(g0)
stackfree(freem.g0.stack)
freem = freem.freelink
}
sched.freem = newList
unlock(&sched.lock)
}
mp := new(m) // 在堆上分配一个m
mp.mstartfn = fn
// (1)给mp分配信号栈gsignal
// (2)将mp链入全局allm
mcommoninit(mp)
// In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
// 正如注释所说,如果是下面几种系统,g0栈通过创建线程分配
if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier) // 分配g0的栈,大小为8K
}
mp.g0.m = mp
if _p_ == _g_.m.p.ptr() {
releasep() // 这里与前面的acquirep(_p_)相呼应。
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt // TODO
}
return mp
}
注意这个函数的注释:创建一个没有与thread相关联的m,为什么?因为在allocm函数里,能做的仅仅是在用户空间分配一块m的堆内存。在这个函数里没有涉及到线程相关操作。那为什么通常说m对应一个线程呢?原因在后面解释。
在runtime里,调用allocm函数的地方有两处:
+--------+ +----------------+
| newm() | | oneNewExtraM() |
+-----+--+ +--+-------------+
| |
+----+ +---+
| |
+--v----v--+
| allocm() |
+-----+----+
|
v
+---+----+
| new(m) |
+--------+
下面先看newm函数:
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn) // 详细过程见上面
mp.nextp.set(_p_) //
mp.sigmask = initSigmask
...
newm1(mp)
}
func newm1(mp *m) {
...
execLock.rlock() // Prevent process clone.
newosproc(mp) // 开始创建os线程
execLock.runlock()
}
在此之前,我们在堆上分配的m还不能称作一个线程。因为还没有进行线程的创建,系统中线程数依旧是原来的数量。
下面,就要开始创建系统线程了。观察我们创建的m如何与系统线程相联系:
// located at runtime/os_linux.go
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi) // 得到g0栈起始位置,说明新线程的用户栈就是g0的栈
...
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
// 这里就是创建线程的代码了:)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
...
}
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
MOVL flags+0(FP), DI
MOVQ stk+8(FP), SI
MOVQ $0, DX
MOVQ $0, R10
// Copy mp, gp, fn off parent stack for use by child.
// Careful: Linux system call clobbers CX and R11.
MOVQ mp+16(FP), R8
MOVQ gp+24(FP), R9
MOVQ fn+32(FP), R12
MOVL $SYS_clone, AX // 系统调用号
SYSCALL
// In parent, return.
CMPQ AX, $0 // 如果返回值为0,是子进程返回了;如果非0,则是父进程返回了
JEQ 3(PC)
MOVL AX, ret+40(FP) // 如果是父进程返回,直接将rax中的返回值(子进程的pid)放进ret变量中
RET
// 注意下面的指令,只有子进程才会执行
// In child, on new stack.
MOVQ SI, SP // 切换到子进程的栈上
// If g or m are nil, skip Go-related setup.
// 通过前面的分析可以知道可以知道,m和m.g0都有值
CMPQ R8, $0 // m
JEQ nog
CMPQ R9, $0 // g
JEQ nog
// Initialize m->procid to Linux tid
// 因为现在已经在子进程中了,所以只有通过系统调用的方式获得子进程的pid
MOVL $SYS_gettid, AX
SYSCALL
MOVQ AX, m_procid(R8)
// Set FS to point at m->tls.
// 通过SYS_arch_prctl系统调用来设置m.tls为当前进程(线程)的tls
LEAQ m_tls(R8), DI
CALL runtime·settls(SB)
// In child, set up new stack
get_tls(CX)
MOVQ R8, g_m(R9) // g0.m = m
MOVQ R9, g(CX) // 将g0设置为当前m正在运行的g
CALL runtime·stackcheck(SB)
nog:
// Call fn
CALL R12 // 调用mstart函数,never return
...
在上面代码的分析中,在不同的语境下,我将线程与进程混用,这是可以的(参考内核书籍进程相关章节,如「ULK」)。另外,对plan9不熟悉的话,可以使用objdump -d xxx | grep 'runtime.clone'
查看x86汇编,对照着看。关于settls函数的内容,参考我上一篇文章Go程序启动源码分析。
如果从内核的角度来看,父进程创建子进程后就回到了自己的control flow中。至于子进程什么时候被调度(mstart函数什么时候执行),那就要看内核的调度了,但是子进程迟早是会执行的。如果子进程被内核调度,回到用户空间执行的第一条语句就是CMPQ AX, $0
,由于rax(AX)= 0,所以下一条指令是MOVQ SI, SP
。
需要注意的是,runtime/proc.go里面有两个函数名字挺相近的函数,一个是mstart函数;另一个是startm函数。不要将二者弄混了。
现在,我们已经分析完newm函数了。再完善一下调用链:
+-----------------------+ +----------+ +--------+ +-------------------------+
| startTemplateThread() | | startm() | | main() | | startTheWorldWithSema() |
+----------------+------+ ++---------+ +---+----+ +-------+-----------------+
+---------+ | +---------------+ |
| | | +-----------------------------+
v v v v
++-+-+--++ +----------------+
| newm() | | oneNewExtraM() |
+-----+--+ +--+-------------+
| |
+----+ +---+
| |
+--v----v--+
| allocm() |
+-----+----+
v
+---+----+
| new(m) |
+--------+
在继续讲解之前,先介绍一下关于m的几个状态。与p和g不同的是,m的状态没有在其结构体中与成员的形式体现出来。但是只要稍加仔细阅读源码,会发现m也是有状态的。
在源码文档中将m分为两个状态:(1)spinning(2)non-spinning
其实non-spinning状态还可以细分为两个状态:(1)parking(2)executing。ps:这两个状态的名词是我自己造的,不具有权威性^_^。在下面的讲解中,将会看到线程m如何在这几个状态间来回的切换。
一个新建的线程,回到用户空间,执行的第一个函数就是mstart函数。注意,这里特别强调第一个函数,第一条指令在前面已经讲过。
// Called to start an M.
//
// This must not split the stack because we may not even have stack
// bounds set up yet.
//
// May run during STW (because it doesn't have a P yet), so write
// barriers are not allowed.
//
//go:nosplit
//go:nowritebarrierrec
func mstart() {
_g_ := getg()
...
// Initialize stack guards so that we can start calling
// both Go and C functions with stack growth prologues.
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
...
}
这个函数会在两个地方调用(它们都是在g0上开始运行):
- (1)在newosproc函数里面,调用clone(sys_linux_amd64.s),创建系统线程,传入的参数有mstart
- (2)是在程序启动的时候,在asm_amd64.s里面
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {
/*更加说明了,mstart函数是运行在g0上的,但是执行当前代码的线程可能是m0,也可能是其他的*/
throw("bad runtime·mstart")
}
// Record the caller for use as the top of stack in mcall and
// for terminating the thread.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
save(getcallerpc(), getcallersp()) /*更新当前g0的sched.pc和sched.sp*/
asminit() /*在asm.amd64.s中,没有函数体*/
minit() /*在os_linux.go中,*/
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
if _g_.m == &m0 {
mstartm0() /*只有在程序启动时,才会执行到这里*/
}
if fn := _g_.m.mstartfn; fn != nil {
/*
执行创建m时传入的参数fn(想要执行的函数),如果mstartfn != nil,只有下面3种情况:
(1)mspinning(将m的状态设置为spinning)
(2)templateThread(一直循环)
(3)sysmon(一直循环)
这三种情况都不需要p
*/
fn()
}
/*
从这里开始,说明m执行完了自己的任务函数m.mstartfn
下面应该开始去执行任务g了,前提是拿到一个p
*/
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr()) /*给m绑定一个p,在创建线程的时候,newm中设置了m.nextp*/
_g_.m.nextp = 0
}
/*执行前,m的状态为spinning*/
schedule()
}
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg()
...
top:
...
var gp *g
var inheritTime bool
...
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
/*
从全局队列sched.runq中获取一个,注意如果globrunqget第二个参数为0的话
globrunqget函数的效果有一点不同
*/
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
/*
当前线程的本地队列_g_.m.p.runq中获取一个q
*/
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
/*
在findrunnable函数里,就是处于spinning状态
*/
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
/*
留意上面的注释⬆⬆⬆
*/
if _g_.m.spinning {
/*
因为已经找到可运行的goroutine了,将本线程m设置为non-spinning状态
说明一个处于spinning状态,说明它正在寻找goroutine
*/
resetspinning()
}
/*
至此m开始处于non-spinning状态,也就是executing状态
这个线程m通过gogo函数,开始执行gp这个goroutine
*/
...
execute(gp, inheritTime)
}
就这样,线程在schedule函数中不断的找可运行的g,并来回的在g与g0中切换。schedule()
里面值得注意的是resetspinning函数:
func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
throw("resetspinning: not a spinning m")
}
_g_.m.spinning = false
nmspinning := atomic.Xadd(&sched.nmspinning, -1)
if int32(nmspinning) < 0 {
throw("findrunnable: negative nmspinning")
}
// M wakeup policy is deliberately somewhat conservative, so check if we
// need to wakeup another P here. See "Worker thread parking/unparking"
// comment at the top of the file for details.
/*
见文件开始的注释:
We unpark an additional thread when we ready a goroutine if
(1) there is an idle P and there are no "spinning" worker threads.
*/
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
}
在runtime中所有调用wakep()
的地方均要满足两个条件:
- (1)atomic.Load(&sched.npidle) != 0
- (2)atomic.Load(&sched.nmspinning) == 0
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
/*
(1)第一个参数为nil,说明在startm函数内部会去找空闲的p
(2)第二个参数为true,说明startm()的caller已经将sched.nmspinning加1了
*/
startm(nil, true)
}
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget() /*如果没有指定p,就从全局空闲p队列中取一个出来*/
...
}
mp := mget() /*Note:从全局sched中获取一个m,有可能为nil,也即是unpark m*/
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_) //= 通过newm新建的线程m,从内核回来后执行mstart函数,然后执行fn,将自己设置为spinning状态
return
}
if mp.spinning {
/*通过mget函数获取的m,是从sched.midle中获得的m,应该处于non-spinning(parking)状态*/
throw("startm: m is spinning")
}
if mp.nextp != 0 {
/*处于parking状态的m不应该有p*/
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
}
凡是调用了startm函数,说明有p了(要么指定p;要么全局sched.pidle中有空闲p),为了不浪费cpu资源,应该尽快找一个m来。
与startm函数对应的是stopm函数:
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning") /*说明在调用stopm前,必须处于no-spinning状态*/
}
lock(&sched.lock)
mput(_g_.m)
unlock(&sched.lock)
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
这个函数通过mput()
将当前线程m放进全局队列sched.midle中。由此线程m变成了non-spinning(parking)状态。