Commit 2193ad7f by Ian Lance Taylor

runtime: copy more of scheduler from Go 1.7 runtime

    
    This started by moving procresize from C to Go so that we can pass the
    right type to the memory allocator when allocating a p, which forced
    the gomaxprocs variable to move from C to Go, and everything else
    followed from that.
    
    Reviewed-on: https://go-review.googlesource.com/34916

From-SVN: r244236
parent d1261ac6
eef0fb3b092dc22d9830cac15a536760da5d033a
189ea81cc758e000325fd6cca7882c252d33f8f0
The first line of this file holds the git revision number of the last
merge done from the gofrontend repository.
@@ -14,7 +14,25 @@ import (
// change the current setting.
// The number of logical CPUs on the local machine can be queried with NumCPU.
// This call will go away when the scheduler improves.
func GOMAXPROCS(n int) int
func GOMAXPROCS(n int) int {
if n > _MaxGomaxprocs {
n = _MaxGomaxprocs
}
lock(&sched.lock)
ret := int(gomaxprocs)
unlock(&sched.lock)
if n <= 0 || n == ret {
return ret
}
stopTheWorld("GOMAXPROCS")
// newprocs will be processed by startTheWorld
newprocs = int32(n)
startTheWorld()
return ret
}
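For context, here is a small standalone program (not part of this commit) showing the user-facing behavior the function above implements: a zero or negative argument only reports the current setting, while a positive argument goes through the stop-the-world path and returns the previous value.

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// A zero argument only queries the current setting.
	old := runtime.GOMAXPROCS(0)
	fmt.Println("current GOMAXPROCS:", old)

	// A positive argument stops the world, records newprocs, and
	// restarts the world, which calls procresize. The previous
	// value is returned.
	prev := runtime.GOMAXPROCS(2)
	fmt.Println("previous:", prev, "now:", runtime.GOMAXPROCS(0))
}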
// NumCPU returns the number of logical CPUs usable by the current process.
//
@@ -7,6 +7,7 @@
package runtime
import (
"runtime/internal/atomic"
"unsafe"
)
@@ -47,39 +48,6 @@ func GCMask(x interface{}) (ret []byte) {
return nil
}
//func testSchedLocalQueue()
//func testSchedLocalQueueSteal()
//
//func RunSchedLocalQueueTest() {
// testSchedLocalQueue()
//}
//
//func RunSchedLocalQueueStealTest() {
// testSchedLocalQueueSteal()
//}
//var StringHash = stringHash
//var BytesHash = bytesHash
//var Int32Hash = int32Hash
//var Int64Hash = int64Hash
//var EfaceHash = efaceHash
//var IfaceHash = ifaceHash
//var MemclrBytes = memclrBytes
var HashLoad = &hashLoad
// entry point for testing
//func GostringW(w []uint16) (s string) {
// s = gostringw(&w[0])
// return
//}
//var Gostringnocopy = gostringnocopy
//var Maxstring = &maxstring
//type Uintreg uintreg
/*
func RunSchedLocalQueueTest() {
_p_ := new(p)
gs := make([]g, len(_p_.runq))
@@ -177,14 +145,26 @@ func RunSchedLocalQueueEmptyTest(iters int) {
}
}
var StringHash = stringHash
var BytesHash = bytesHash
var Int32Hash = int32Hash
var Int64Hash = int64Hash
var EfaceHash = efaceHash
var IfaceHash = ifaceHash
var MemclrBytes = memclrBytes
*/
//var StringHash = stringHash
//var BytesHash = bytesHash
//var Int32Hash = int32Hash
//var Int64Hash = int64Hash
//var EfaceHash = efaceHash
//var IfaceHash = ifaceHash
//var MemclrBytes = memclrBytes
var HashLoad = &hashLoad
// entry point for testing
//func GostringW(w []uint16) (s string) {
// s = gostringw(&w[0])
// return
//}
//var Gostringnocopy = gostringnocopy
//var Maxstring = &maxstring
//type Uintreg uintreg
var Open = open
var Close = closefd
@@ -149,13 +149,9 @@ func notewakeup(n *note) {
func notesleep(n *note) {
gp := getg()
// Currently OK to sleep in non-g0 for gccgo. It happens in
// stoptheworld because we have not implemented preemption.
// if gp != gp.m.g0 {
// throw("notesleep not on g0")
// }
if gp != gp.m.g0 {
throw("notesleep not on g0")
}
for atomic.Load(key32(&n.key)) == 0 {
gp.m.blocked = true
futexsleep(key32(&n.key), 0, -1)
@@ -202,10 +198,13 @@ func notetsleep_internal(n *note, ns int64) bool {
}
func notetsleep(n *note, ns int64) bool {
gp := getg()
if gp != gp.m.g0 && gp.m.preemptoff != "" {
throw("notetsleep not on g0")
}
// Currently OK to sleep in non-g0 for gccgo. It happens in
// stoptheworld because our version of systemstack does not
// change to g0.
// gp := getg()
// if gp != gp.m.g0 && gp.m.preemptoff != "" {
// throw("notetsleep not on g0")
// }
return notetsleep_internal(n, ns)
}
@@ -162,13 +162,9 @@ func notewakeup(n *note) {
func notesleep(n *note) {
gp := getg()
// Currently OK to sleep in non-g0 for gccgo. It happens in
// stoptheworld because we have not implemented preemption.
// if gp != gp.m.g0 {
// throw("notesleep not on g0")
// }
if gp != gp.m.g0 {
throw("notesleep not on g0")
}
semacreate(gp.m)
if !atomic.Casuintptr(&n.key, 0, uintptr(unsafe.Pointer(gp.m))) {
// Must be locked (got wakeup).
@@ -257,7 +253,8 @@ func notetsleep(n *note, ns int64) bool {
gp := getg()
// Currently OK to sleep in non-g0 for gccgo. It happens in
// stoptheworld because we have not implemented preemption.
// stoptheworld because our version of systemstack does not
// change to g0.
// if gp != gp.m.g0 && gp.m.preemptoff != "" {
// throw("notetsleep not on g0")
// }
@@ -11,15 +11,45 @@ import (
// Functions temporarily called by C code.
//go:linkname newextram runtime.newextram
//go:linkname acquirep runtime.acquirep
//go:linkname releasep runtime.releasep
//go:linkname incidlelocked runtime.incidlelocked
//go:linkname checkdead runtime.checkdead
//go:linkname sysmon runtime.sysmon
//go:linkname schedtrace runtime.schedtrace
//go:linkname allgadd runtime.allgadd
//go:linkname ready runtime.ready
//go:linkname gcprocs runtime.gcprocs
//go:linkname needaddgcproc runtime.needaddgcproc
//go:linkname stopm runtime.stopm
//go:linkname handoffp runtime.handoffp
//go:linkname wakep runtime.wakep
//go:linkname stoplockedm runtime.stoplockedm
//go:linkname schedule runtime.schedule
//go:linkname execute runtime.execute
//go:linkname procresize runtime.procresize
//go:linkname helpgc runtime.helpgc
//go:linkname stopTheWorldWithSema runtime.stopTheWorldWithSema
//go:linkname startTheWorldWithSema runtime.startTheWorldWithSema
//go:linkname mput runtime.mput
//go:linkname mget runtime.mget
//go:linkname globrunqput runtime.globrunqput
//go:linkname pidleget runtime.pidleget
//go:linkname runqempty runtime.runqempty
//go:linkname runqput runtime.runqput
// Functions temporarily in C that have not yet been ported.
func allocm(*p, bool, *unsafe.Pointer, *uintptr) *m
func malg(bool, bool, *unsafe.Pointer, *uintptr) *g
func startm(*p, bool)
func newm(unsafe.Pointer, *p)
func gchelper()
func getfingwait() bool
func getfingwake() bool
func wakefing() *g
// C functions for ucontext management.
func gogo(*g)
func setGContext()
func makeGContext(*g, unsafe.Pointer, uintptr)
func getTraceback(me, gp *g)
@@ -30,6 +60,12 @@ func getTraceback(me, gp *g)
// it is closed, meaning cgocallbackg can reliably receive from it.
var main_init_done chan bool
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
var (
allgs []*g
allglock mutex
@@ -56,6 +92,117 @@ func allgadd(gp *g) {
unlock(&allglock)
}
func dumpgstatus(gp *g) {
_g_ := getg()
print("runtime: gp: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", readgstatus(gp), "\n")
print("runtime: g: g=", _g_, ", goid=", _g_.goid, ", g->atomicstatus=", readgstatus(_g_), "\n")
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
_g_.m.locks++ // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { // TODO: fast atomic
wakep()
}
_g_.m.locks--
}
func gcprocs() int32 {
// Figure out how many CPUs to use during GC.
// Limited by gomaxprocs, number of actual CPUs, and MaxGcproc.
lock(&sched.lock)
n := gomaxprocs
if n > ncpu {
n = ncpu
}
if n > _MaxGcproc {
n = _MaxGcproc
}
if n > sched.nmidle+1 { // one M is currently running
n = sched.nmidle + 1
}
unlock(&sched.lock)
return n
}
func needaddgcproc() bool {
lock(&sched.lock)
n := gomaxprocs
if n > ncpu {
n = ncpu
}
if n > _MaxGcproc {
n = _MaxGcproc
}
n -= sched.nmidle + 1 // one M is currently running
unlock(&sched.lock)
return n > 0
}
func helpgc(nproc int32) {
_g_ := getg()
lock(&sched.lock)
pos := 0
for n := int32(1); n < nproc; n++ { // one M is currently running
if allp[pos].mcache == _g_.m.mcache {
pos++
}
mp := mget()
if mp == nil {
throw("gcprocs inconsistency")
}
mp.helpgc = n
mp.p.set(allp[pos])
mp.mcache = allp[pos].mcache
pos++
notewakeup(&mp.park)
}
unlock(&sched.lock)
}
// freezeStopWait is a large value that freezetheworld sets
// sched.stopwait to in order to request that all Gs permanently stop.
const freezeStopWait = 0x7fffffff
// Similar to stopTheWorld but best-effort and can be called several times.
// There is no reverse operation; it is used during crashing.
// This function must not lock any mutexes.
func freezetheworld() {
// stopwait and preemption requests can be lost
// due to races with concurrently executing threads,
// so try several times
for i := 0; i < 5; i++ {
// this should tell the scheduler to not start any new goroutines
sched.stopwait = freezeStopWait
atomic.Store(&sched.gcwaiting, 1)
// this should stop running goroutines
if !preemptall() {
break // no running goroutines
}
usleep(1000)
}
// to be sure
usleep(1000)
preemptall()
usleep(1000)
}
// All reads and writes of g's status go through readgstatus, casgstatus
// castogscanstatus, casfrom_Gscanstatus.
//go:nosplit
@@ -123,6 +270,217 @@ func casgstatus(gp *g, oldval, newval uint32) {
}
}
// stopTheWorld stops all P's from executing goroutines, interrupting
// all goroutines at GC safe points and records reason as the reason
// for the stop. On return, only the current goroutine's P is running.
// stopTheWorld must not be called from a system stack and the caller
// must not hold worldsema. The caller must call startTheWorld when
// other P's should resume execution.
//
// stopTheWorld is safe for multiple goroutines to call at the
// same time. Each will execute its own stop, and the stops will
// be serialized.
//
// This is also used by routines that do stack dumps. If the system is
// in panic or being exited, this may not reliably stop all
// goroutines.
func stopTheWorld(reason string) {
semacquire(&worldsema, false)
getg().m.preemptoff = reason
systemstack(stopTheWorldWithSema)
}
// startTheWorld undoes the effects of stopTheWorld.
func startTheWorld() {
systemstack(startTheWorldWithSema)
// worldsema must be held over startTheWorldWithSema to ensure
// gomaxprocs cannot change while worldsema is held.
semrelease(&worldsema)
getg().m.preemptoff = ""
}
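The stop/start pairing above is easier to see in miniature. The following toy program (not part of this commit; all names such as toyWorld are made up) mimics the worldsema/stopwait/stopnote protocol with channels and atomics: a stopper acquires the semaphore, asks workers to park at their next safe point, and sleeps until the last worker checks in.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyWorld mimics worldsema/stopwait/stopnote. Purely illustrative;
// the real runtime uses notes and per-P states, not channels.
type toyWorld struct {
	worldsema chan struct{} // capacity 1: the right to stop the world
	gcwaiting atomic.Bool   // set to ask workers to park
	stopwait  atomic.Int32  // workers that still have to park
	stopnote  chan struct{} // signaled when stopwait hits zero
	resume    chan struct{} // closed to restart parked workers
}

func (w *toyWorld) stopTheWorld(nworkers int32) {
	<-w.worldsema // serialize concurrent stoppers
	w.resume = make(chan struct{})
	w.stopwait.Store(nworkers)
	w.gcwaiting.Store(true)
	<-w.stopnote // wait for the last worker to park
}

func (w *toyWorld) startTheWorld() {
	w.gcwaiting.Store(false)
	close(w.resume) // unpark everyone
	w.worldsema <- struct{}{}
}

// workerStep is a worker's "safe point": it parks while a stop is
// requested and returns once the world is restarted.
func (w *toyWorld) workerStep() {
	if w.gcwaiting.Load() {
		resume := w.resume
		if w.stopwait.Add(-1) == 0 {
			w.stopnote <- struct{}{} // last one in wakes the stopper
		}
		<-resume
	}
}

func main() {
	w := &toyWorld{
		worldsema: make(chan struct{}, 1),
		stopnote:  make(chan struct{}),
	}
	w.worldsema <- struct{}{} // semaphore starts available

	var done atomic.Bool
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for !done.Load() {
				w.workerStep()
			}
		}()
	}
	w.stopTheWorld(4)
	fmt.Println("world stopped; no worker is between safe points")
	w.startTheWorld()
	done.Store(true)
	wg.Wait()
}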
// Holding worldsema grants an M the right to try to stop the world
// and prevents gomaxprocs from changing concurrently.
var worldsema uint32 = 1
// stopTheWorldWithSema is the core implementation of stopTheWorld.
// The caller is responsible for acquiring worldsema and disabling
// preemption first, and then should call stopTheWorldWithSema on the system
// stack:
//
// semacquire(&worldsema, false)
// m.preemptoff = "reason"
// systemstack(stopTheWorldWithSema)
//
// When finished, the caller must either call startTheWorld or undo
// these three operations separately:
//
// m.preemptoff = ""
// systemstack(startTheWorldWithSema)
// semrelease(&worldsema)
//
// It is allowed to acquire worldsema once and then execute multiple
// startTheWorldWithSema/stopTheWorldWithSema pairs.
// Other P's are able to execute between successive calls to
// startTheWorldWithSema and stopTheWorldWithSema.
// Holding worldsema causes any other goroutines invoking
// stopTheWorld to block.
func stopTheWorldWithSema() {
_g_ := getg()
// If we hold a lock, then we won't be able to stop another M
// that is blocked trying to acquire the lock.
if _g_.m.locks > 0 {
throw("stopTheWorld: holding locks")
}
lock(&sched.lock)
sched.stopwait = gomaxprocs
atomic.Store(&sched.gcwaiting, 1)
preemptall()
// stop current P
_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
sched.stopwait--
// try to retake all P's in Psyscall status
for i := 0; i < int(gomaxprocs); i++ {
p := allp[i]
s := p.status
if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
if trace.enabled {
traceGoSysBlock(p)
traceProcStop(p)
}
p.syscalltick++
sched.stopwait--
}
}
// stop idle P's
for {
p := pidleget()
if p == nil {
break
}
p.status = _Pgcstop
sched.stopwait--
}
wait := sched.stopwait > 0
unlock(&sched.lock)
// wait for remaining P's to stop voluntarily
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}
if sched.stopwait != 0 {
throw("stopTheWorld: not stopped")
}
for i := 0; i < int(gomaxprocs); i++ {
p := allp[i]
if p.status != _Pgcstop {
throw("stopTheWorld: not stopped")
}
}
}
func mhelpgc() {
_g_ := getg()
_g_.m.helpgc = -1
}
func startTheWorldWithSema() {
_g_ := getg()
_g_.m.locks++ // disable preemption because it can be holding p in a local var
gp := netpoll(false) // non-blocking
injectglist(gp)
add := needaddgcproc()
lock(&sched.lock)
procs := gomaxprocs
if newprocs != 0 {
procs = newprocs
newprocs = 0
}
p1 := procresize(procs)
sched.gcwaiting = 0
if sched.sysmonwait != 0 {
sched.sysmonwait = 0
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
for p1 != nil {
p := p1
p1 = p1.link.ptr()
if p.m != 0 {
mp := p.m.ptr()
p.m = 0
if mp.nextp != 0 {
throw("startTheWorld: inconsistent mp->nextp")
}
mp.nextp.set(p)
notewakeup(&mp.park)
} else {
// Start M to run P. Do not start another M below.
newm(nil, p)
add = false
}
}
// Wakeup an additional proc in case we have excessive runnable goroutines
// in local queues or in the global queue. If we don't, the proc will park itself.
// If we have lots of excessive work, resetspinning will unpark additional procs as necessary.
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
if add {
// If GC could have used another helper proc, start one now,
// in the hope that it will be available next time.
// It would have been even better to start it before the collection,
// but doing so requires allocating memory, so it's tricky to
// coordinate. This lazy approach works out in practice:
// we don't mind if the first couple gc rounds don't have quite
// the maximum number of procs.
newm(unsafe.Pointer(funcPC(mhelpgc)), nil)
}
_g_.m.locks--
}
// runSafePointFn runs the safe point function, if any, for this P.
// This should be called like
//
// if getg().m.p.runSafePointFn != 0 {
// runSafePointFn()
// }
//
// runSafePointFn must be checked on any transition in to _Pidle or
// _Psyscall to avoid a race where forEachP sees that the P is running
// just before the P goes into _Pidle/_Psyscall and neither forEachP
// nor the P run the safe-point function.
func runSafePointFn() {
p := getg().m.p.ptr()
// Resolve the race between forEachP running the safe-point
// function on this P's behalf and this P running the
// safe-point function directly.
if !atomic.Cas(&p.runSafePointFn, 1, 0) {
return
}
sched.safePointFn(p)
lock(&sched.lock)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
unlock(&sched.lock)
}
// needm is called when a cgo callback happens on a
// thread without an m (a thread not created by Go).
// In this case, needm is expected to find an m to use
@@ -245,9 +603,6 @@ func oneNewExtraM() {
mp.lockedg = gp
gp.lockedm = mp
gp.goid = int64(atomic.Xadd64(&sched.goidgen, 1))
if raceenabled {
gp.racectx = racegostart(funcPC(newextram))
}
// put on allg for garbage collector
allgadd(gp)
@@ -365,156 +720,1122 @@ func unlockextra(mp *m) {
atomic.Storeuintptr(&extram, uintptr(unsafe.Pointer(mp)))
}
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
retry:
lock(&sched.lock)
mput(_g_.m)
unlock(&sched.lock)
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
if _g_.m.helpgc != 0 {
gchelper()
_g_.m.helpgc = 0
_g_.m.mcache = nil
_g_.m.p = 0
goto retry
}
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrier
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// if it has local work, start it straight away
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// if it has GC work, start it straight away
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
startm(_p_, true)
return
}
lock(&sched.lock)
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
pidleput(_p_)
unlock(&sched.lock)
}
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
// Stops execution of the current m that is locked to a g until the g is runnable again.
// Returns with acquired P.
func stoplockedm() {
_g_ := getg()
if _g_.m.lockedg == nil || _g_.m.lockedg.lockedm != _g_.m {
throw("stoplockedm: inconsistent locking")
}
if _g_.m.p != 0 {
// Schedule another M to run this p.
_p_ := releasep()
handoffp(_p_)
}
incidlelocked(1)
// Wait until another thread schedules lockedg again.
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
status := readgstatus(_g_.m.lockedg)
if status&^_Gscan != _Grunnable {
print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n")
dumpgstatus(_g_)
throw("stoplockedm: not runnable")
}
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// Schedules the locked m to run the locked gp.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func startlockedm(gp *g) {
_g_ := getg()
mp := gp.lockedm
if mp == _g_.m {
throw("startlockedm: locked to me")
}
if mp.nextp != 0 {
throw("startlockedm: m has p")
}
// directly handoff current P to the locked m
incidlelocked(-1)
_p_ := releasep()
mp.nextp.set(_p_)
notewakeup(&mp.park)
stopm()
}
// Stops the current m for stopTheWorld.
// Returns when the world is restarted.
func gcstopm() {
_g_ := getg()
if sched.gcwaiting == 0 {
throw("gcstopm: not waiting for gc")
}
if _g_.m.spinning {
_g_.m.spinning = false
// OK to just drop nmspinning here,
// startTheWorld will unpark threads as necessary.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("gcstopm: negative nmspinning")
}
}
_p_ := releasep()
lock(&sched.lock)
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
stopm()
}
// Schedules gp to run on the current M.
// If inheritTime is true, gp inherits the remaining time in the
// current time slice. Otherwise, it starts a new time slice.
// Never returns.
func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
_g_.m.curg = gp
gp.m = _g_.m
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
resetcpuprofiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
gogo(gp)
}
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.
top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
if getfingwait() && getfingwake() {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there is a thread blocked in netpoll already.
// If there is any kind of logical race with that blocked thread
// (e.g. it has already returned from netpoll, but does not set lastpoll yet),
// this thread will do blocking netpoll below anyway.
if netpollinited() && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
// netpoll returns list of goroutines linked by schedlink.
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { // TODO: fast atomic
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand1()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// We have nothing to do. If we're in the GC mark phase, can
// safely scan and blacken objects, and have work to do, run
// idle-time marking rather than give up the P.
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
for i := 0; i < int(gomaxprocs); i++ {
_p_ := allp[i]
if _p_ != nil && !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// poll network
if netpollinited() && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
gp := netpoll(true) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if gp != nil {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(gp)
}
}
stopm()
goto top
}
func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
throw("resetspinning: not a spinning m")
}
_g_.m.spinning = false
nmspinning := atomic.Xadd(&sched.nmspinning, -1)
if int32(nmspinning) < 0 {
throw("findrunnable: negative nmspinning")
}
// M wakeup policy is deliberately somewhat conservative, so check if we
// need to wakeup another P here. See "Worker thread parking/unparking"
// comment at the top of the file for details.
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
}
// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
func injectglist(glist *g) {
if glist == nil {
return
}
if trace.enabled {
for gp := glist; gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
for n = 0; glist != nil; n++ {
gp := glist
glist = gp.schedlink.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
startm(nil, false)
}
}
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != nil {
stoplockedm()
execute(_g_.m.lockedg, false) // Never returns.
}
top:
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
// Because gccgo does not implement preemption as a stack check,
// we need to check for preemption here for fairness.
// Otherwise goroutines on the local queue may starve
// goroutines on the global queue.
// Since we preempt by storing the goroutine on the global
// queue, this is the only place we need to check preempt.
if gp != nil && gp.preempt {
gp.preempt = false
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
goto top
}
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}
if gp.lockedm != nil {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
// Purge all cached G's from gfree list to the global list.
func gfpurge(_p_ *p) {
lock(&sched.gflock)
for _p_.gfreecnt != 0 {
_p_.gfreecnt--
gp := _p_.gfree
_p_.gfree = gp.schedlink.ptr()
gp.schedlink.set(sched.gfree)
sched.gfree = gp
sched.ngfree++
}
unlock(&sched.gflock)
}
// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// initialize new P's
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
pp.id = i
pp.status = _Pgcstop
pp.sudogcache = pp.sudogbuf[:0]
pp.deferpool = pp.deferpoolbuf[:0]
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
if pp.mcache == nil {
if old == 0 && i == 0 {
if getg().m.mcache == nil {
throw("missing mcache?")
}
pp.mcache = getg().m.mcache // bootstrap
} else {
pp.mcache = allocmcache()
}
}
}
// free unused P's
for i := nprocs; i < old; i++ {
p := allp[i]
if trace.enabled {
if p == getg().m.p.ptr() {
// moving to p[0], pretend that we were descheduled
// and then scheduled again to keep the trace sane.
traceGoSched()
traceProcStop(p)
}
}
// move all runnable goroutines to the global queue
for p.runqhead != p.runqtail {
// pop from tail of local queue
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
// push onto head of global queue
globrunqputhead(gp)
}
if p.runnext != 0 {
globrunqputhead(p.runnext.ptr())
p.runnext = 0
}
// if there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up
if gp := p.gcBgMarkWorker.ptr(); gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
globrunqput(gp)
// This assignment doesn't race because the
// world is stopped.
p.gcBgMarkWorker.set(nil)
}
for i := range p.sudogbuf {
p.sudogbuf[i] = nil
}
p.sudogcache = p.sudogbuf[:0]
for i := range p.deferpoolbuf {
p.deferpoolbuf[i] = nil
}
p.deferpool = p.deferpoolbuf[:0]
freemcache(p.mcache)
p.mcache = nil
gfpurge(p)
traceProcFree(p)
p.status = _Pdead
// can't free P itself because it can be referenced by an M in syscall
}
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
} else {
// release the current P and acquire allp[0]
if _g_.m.p != 0 {
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p {
continue
}
p.status = _Pidle
if runqempty(p) {
pidleput(p)
} else {
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
// Associate p and the current m.
func acquirep(_p_ *p) {
acquirep1(_p_)
// have p; write barriers now allowed
_g_ := getg()
_g_.m.mcache = _p_.mcache
if trace.enabled {
traceProcStart()
}
}
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func acquirep1(_p_ *p) {
_g_ := getg()
if _g_.m.p != 0 || _g_.m.mcache != nil {
throw("acquirep: already in go")
}
if _p_.m != 0 || _p_.status != _Pidle {
id := int32(0)
if _p_.m != 0 {
id = _p_.m.ptr().id
}
print("acquirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
throw("acquirep: invalid p state")
}
_g_.m.p.set(_p_)
_p_.m.set(_g_.m)
_p_.status = _Prunning
}
// Disassociate p and the current m.
func releasep() *p {
_g_ := getg()
if _g_.m.p == 0 || _g_.m.mcache == nil {
throw("releasep: invalid arg")
}
_p_ := _g_.m.p.ptr()
if _p_.m.ptr() != _g_.m || _p_.mcache != _g_.m.mcache || _p_.status != _Prunning {
print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", _p_.m, " m->mcache=", _g_.m.mcache, " p->mcache=", _p_.mcache, " p->status=", _p_.status, "\n")
throw("releasep: invalid p state")
}
if trace.enabled {
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p = 0
_g_.m.mcache = nil
_p_.m = 0
_p_.status = _Pidle
return _p_
}
func incidlelocked(v int32) {
lock(&sched.lock)
sched.nmidlelocked += v
if v > 0 {
checkdead()
}
unlock(&sched.lock)
}
// Check for deadlock situation.
// The check is based on number of running M's, if 0 -> deadlock.
func checkdead() {
// For -buildmode=c-shared or -buildmode=c-archive it's OK if
// there are no running goroutines. The calling program is
// assumed to be running.
if islibrary || isarchive {
return
}
// If we are dying because of a signal caught on an already idle thread,
// freezetheworld will cause all running threads to block.
// And runtime will essentially enter into deadlock state,
// except that there is a thread that will call exit soon.
if panicking > 0 {
return
}
// -1 for sysmon
run := sched.mcount - sched.nmidle - sched.nmidlelocked - 1
if run > 0 {
return
}
if run < 0 {
print("runtime: checkdead: nmidle=", sched.nmidle, " nmidlelocked=", sched.nmidlelocked, " mcount=", sched.mcount, "\n")
throw("checkdead: inconsistent counts")
}
grunning := 0
lock(&allglock)
for i := 0; i < len(allgs); i++ {
gp := allgs[i]
if isSystemGoroutine(gp) {
continue
}
s := readgstatus(gp)
switch s &^ _Gscan {
case _Gwaiting:
grunning++
case _Grunnable,
_Grunning,
_Gsyscall:
unlock(&allglock)
print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
throw("checkdead: runnable g")
}
}
unlock(&allglock)
if grunning == 0 { // possible if main goroutine calls runtime·Goexit()
throw("no goroutines (main called runtime.Goexit) - deadlock!")
}
// Maybe jump time forward for playground.
gp := timejump()
if gp != nil {
// Temporarily commented out for gccgo.
// For gccgo this code will never run anyhow.
// casgstatus(gp, _Gwaiting, _Grunnable)
// globrunqput(gp)
// _p_ := pidleget()
// if _p_ == nil {
// throw("checkdead: no p for timer")
// }
// mp := mget()
// if mp == nil {
// // There should always be a free M since
// // nothing is running.
// throw("checkdead: no m for timer")
// }
// nmp.nextp.set(_p_)
// notewakeup(&mp.park)
// return
}
getg().m.throwing = -1 // do not dump full stacks
throw("all goroutines are asleep - deadlock!")
}
// forcegcperiod is the maximum time in nanoseconds between garbage
// collections. If we go this long without a garbage collection, one
// is forced to run.
//
// This is a variable for testing purposes. It normally doesn't change.
var forcegcperiod int64 = 2 * 60 * 1e9
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
// If a heap span goes unused for 5 minutes after a garbage collection,
// we hand it back to the operating system.
scavengelimit := int64(5 * 60 * 1e9)
if debug.scavenge > 0 {
// Scavenge-a-lot for testing.
forcegcperiod = 10 * 1e6
scavengelimit = 20 * 1e6
}
lastscavenge := nanotime()
nscavenge := 0
lasttrace := int64(0)
idle := 0 // how many cycles in succession we have not woken anyone up
delay := uint32(0)
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) { // TODO: fast atomic
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
atomic.Store(&sched.sysmonwait, 1)
unlock(&sched.lock)
// Make wake-up period small enough
// for the sampling to be correct.
maxsleep := forcegcperiod / 2
if scavengelimit < forcegcperiod {
maxsleep = scavengelimit / 2
}
notetsleep(&sched.sysmonnote, maxsleep)
lock(&sched.lock)
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
idle = 0
delay = 20
}
unlock(&sched.lock)
}
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
unixnow := unixnanotime()
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking - returns list of goroutines
if gp != nil {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(gp)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
lastgc := int64(atomic.Load64(&memstats.last_gc))
if gcphase == _GCoff && lastgc != 0 && unixnow-lastgc > forcegcperiod && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
forcegc.g.schedlink = 0
injectglist(forcegc.g)
unlock(&forcegc.lock)
}
// scavenge heap once in a while
if lastscavenge+scavengelimit/2 < now {
mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
lastscavenge = now
nscavenge++
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
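To make the backoff above concrete, this throwaway snippet (not part of this commit) replays the delay computation: 20us per cycle while retake keeps finding work, roughly 1ms of such naps before doubling starts, capped at 10ms.

package main

import "fmt"

func main() {
	// Replay sysmon's sleep backoff: idle resets to 0 whenever retake
	// finds work; after 50 consecutive idle cycles (~1ms of 20us naps)
	// the delay doubles, up to a 10ms ceiling.
	delay := uint32(0)
	for idle := 0; idle <= 60; idle++ {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		fmt.Printf("idle=%2d delay=%5dus\n", idle, delay)
	}
}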
var pdesc [_MaxGomaxprocs]struct {
schedtick uint32
schedwhen int64
syscalltick uint32
syscallwhen int64
}
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
n := 0
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &pdesc[i]
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
}
}
return uint32(n)
}
// Tell all goroutines that they have been preempted and they should stop.
// This function is purely best-effort. It can fail to inform a goroutine if a
// processor just started running it.
// No locks need to be held.
// Returns true if preemption request was issued to at least one goroutine.
func preemptall() bool {
res := false
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil || _p_.status != _Prunning {
continue
}
if preemptone(_p_) {
res = true
}
}
return res
}
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// At this point the gc implementation sets gp.stackguard0 to
// a value that causes the goroutine to suspend itself.
// gccgo has no support for this, and it's hard to support.
// The split stack code reads a value from its TCB.
// We have no way to set a value in the TCB of a different thread.
// And, of course, not all systems support split stack anyhow.
// Checking the field in the g is expensive, since it requires
// loading the g from TLS. The best mechanism is likely to be
// setting a global variable and figuring out a way to efficiently
// check that global variable.
//
// For now we check gp.preempt in schedule and mallocgc,
// which is at least better than doing nothing at all.
return true
}
var starttime int64
func schedtrace(detailed bool) {
now := nanotime()
if starttime == 0 {
starttime = now
}
lock(&sched.lock)
print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle, " threads=", sched.mcount, " spinningthreads=", sched.nmspinning, " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
if detailed {
print(" gcwaiting=", sched.gcwaiting, " nmidlelocked=", sched.nmidlelocked, " stopwait=", sched.stopwait, " sysmonwait=", sched.sysmonwait, "\n")
}
// We must be careful while reading data from P's, M's and G's.
// Even if we hold schedlock, most data can be changed concurrently.
// E.g. (p->m ? p->m->id : -1) can crash if p->m changes from non-nil to nil.
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
mp := _p_.m.ptr()
h := atomic.Load(&_p_.runqhead)
t := atomic.Load(&_p_.runqtail)
if detailed {
id := int32(-1)
if mp != nil {
id = mp.id
}
print(" P", i, ": status=", _p_.status, " schedtick=", _p_.schedtick, " syscalltick=", _p_.syscalltick, " m=", id, " runqsize=", t-h, " gfreecnt=", _p_.gfreecnt, "\n")
} else {
// In non-detailed mode format lengths of per-P run queues as:
// [len1 len2 len3 len4]
print(" ")
if i == 0 {
print("[")
}
print(t - h)
if i == gomaxprocs-1 {
print("]\n")
}
}
}
if !detailed {
unlock(&sched.lock)
return
}
for mp := allm(); mp != nil; mp = mp.alllink {
_p_ := mp.p.ptr()
gp := mp.curg
lockedg := mp.lockedg
id1 := int32(-1)
if _p_ != nil {
id1 = _p_.id
}
id2 := int64(-1)
if gp != nil {
id2 = gp.goid
}
id3 := int64(-1)
if lockedg != nil {
id3 = lockedg.goid
}
print(" M", mp.id, ": p=", id1, " curg=", id2, " mallocing=", mp.mallocing, " throwing=", mp.throwing, " preemptoff=", mp.preemptoff, ""+" locks=", mp.locks, " dying=", mp.dying, " helpgc=", mp.helpgc, " spinning=", mp.spinning, " blocked=", mp.blocked, " lockedg=", id3, "\n")
}
lock(&allglock)
for gi := 0; gi < len(allgs); gi++ {
gp := allgs[gi]
mp := gp.m
lockedm := gp.lockedm
@@ -531,3 +1852,416 @@ func schedtrace(detailed bool) {
unlock(&allglock)
unlock(&sched.lock)
}
// Put mp on midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func mput(mp *m) {
mp.schedlink = sched.midle
sched.midle.set(mp)
sched.nmidle++
checkdead()
}
// Try to get an m from midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func mget() *m {
mp := sched.midle.ptr()
if mp != nil {
sched.midle = mp.schedlink
sched.nmidle--
}
return mp
}
// Put gp on the global runnable queue.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func globrunqput(gp *g) {
gp.schedlink = 0
if sched.runqtail != 0 {
sched.runqtail.ptr().schedlink.set(gp)
} else {
sched.runqhead.set(gp)
}
sched.runqtail.set(gp)
sched.runqsize++
}
// Put gp at the head of the global runnable queue.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func globrunqputhead(gp *g) {
gp.schedlink = sched.runqhead
sched.runqhead.set(gp)
if sched.runqtail == 0 {
sched.runqtail.set(gp)
}
sched.runqsize++
}
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
gtail.schedlink = 0
if sched.runqtail != 0 {
sched.runqtail.ptr().schedlink.set(ghead)
} else {
sched.runqhead.set(ghead)
}
sched.runqtail.set(gtail)
sched.runqsize += n
}
// Try to get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
if sched.runqsize == 0 {
sched.runqtail = 0
}
gp := sched.runqhead.ptr()
sched.runqhead = gp.schedlink
n--
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
runqput(_p_, gp1, false)
}
return gp
}
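The batch-size arithmetic above spreads the global queue across Ps instead of letting one P drain it. A standalone copy of the computation (not part of this commit; the arguments stand in for runqsize, gomaxprocs, max, and len(_p_.runq)) makes the numbers concrete.

package main

import "fmt"

// globBatch mirrors globrunqget's batch-size computation, for
// illustration only.
func globBatch(runqsize, gomaxprocs, max, localQueueLen int32) int32 {
	n := runqsize/gomaxprocs + 1
	if n > runqsize {
		n = runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > localQueueLen/2 {
		n = localQueueLen / 2
	}
	return n
}

func main() {
	// 10 goroutines queued, 4 Ps: each P grabs 10/4+1 = 3 at a time,
	// so one pass over the Ps drains the queue fairly.
	fmt.Println(globBatch(10, 4, 0, 256)) // 3
	// schedule()'s fairness check passes max=1: take exactly one.
	fmt.Println(globBatch(10, 4, 1, 256)) // 1
}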
// Put p on the _Pidle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func pidleput(_p_ *p) {
if !runqempty(_p_) {
throw("pidleput: P has non-empty run queue")
}
_p_.link = sched.pidle
sched.pidle.set(_p_)
atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}
// Try to get a p from the _Pidle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrier
func pidleget() *p {
_p_ := sched.pidle.ptr()
if _p_ != nil {
sched.pidle = _p_.link
atomic.Xadd(&sched.npidle, -1) // TODO: fast atomic
}
return _p_
}
// runqempty returns true if _p_ has no Gs on its local run queue.
// It never returns true spuriously.
func runqempty(_p_ *p) bool {
// Defend against a race where 1) _p_ has G1 in runqnext but runqhead == runqtail,
// 2) runqput on _p_ kicks G1 to the runq, 3) runqget on _p_ empties runqnext.
// Simply observing that runqhead == runqtail and then observing that runqnext == nil
// does not mean the queue is empty.
for {
head := atomic.Load(&_p_.runqhead)
tail := atomic.Load(&_p_.runqtail)
runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&_p_.runnext)))
if tail == atomic.Load(&_p_.runqtail) {
return head == tail && runnext == 0
}
}
}
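The re-read of runqtail above is the whole trick: head, tail, and runnext can only be trusted together if no runqput completed between the reads. A standalone sketch of the same snapshot pattern (not part of this commit; toyQueue is made up):

package main

import (
	"fmt"
	"sync/atomic"
)

// toyQueue carries just the three fields runqempty consults.
type toyQueue struct {
	head, tail atomic.Uint32
	runnext    atomic.Uintptr
}

func (q *toyQueue) empty() bool {
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		runnext := q.runnext.Load()
		if tail == q.tail.Load() {
			// No producer completed a put between the reads, so
			// head/tail/runnext form a consistent view.
			return head == tail && runnext == 0
		}
		// A producer raced with us; retry.
	}
}

func main() {
	var q toyQueue
	fmt.Println(q.empty()) // true
	q.runnext.Store(1)
	fmt.Println(q.empty()) // false
}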
// To shake out latent assumptions about scheduling order,
// we introduce some randomness into scheduling decisions
// when running with the race detector.
// The need for this was made obvious by changing the
// (deterministic) scheduling order in Go 1.5 and breaking
// many poorly-written tests.
// With the randomness here, as long as the tests pass
// consistently with -race, they shouldn't have latent scheduling
// assumptions.
const randomizeScheduler = raceenabled
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runqput puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand1()%2 == 0 {
next = false
}
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := fastrand1() % (i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// Link the goroutines.
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
// Now put the batch on global queue.
lock(&sched.lock)
globrunqputbatch(batch[0], batch[n], int32(n+1))
unlock(&sched.lock)
return true
}
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
for {
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
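The ordering discipline shared by runqput and runqget, a store-release on tail by the single owner and a CAS on head by any consumer, can be exercised in isolation. A stripped-down sketch (not part of this commit; the ring type and its sizes are made up):

package main

import (
	"fmt"
	"sync/atomic"
)

// ring is a single-producer, multi-consumer queue using the same
// ordering as the per-P run queue.
type ring struct {
	head atomic.Uint32 // advanced by any thread via CAS
	tail atomic.Uint32 // advanced only by the owner
	buf  [8]int
}

// put is called only by the owner; returns false when full.
func (r *ring) put(v int) bool {
	h := r.head.Load() // load-acquire, synchronize with consumers
	t := r.tail.Load()
	if t-h >= uint32(len(r.buf)) {
		return false // full; the runtime spills half to the global queue
	}
	r.buf[t%uint32(len(r.buf))] = v
	r.tail.Store(t + 1) // store-release, item is now visible
	return true
}

// get may be called by any thread.
func (r *ring) get() (int, bool) {
	for {
		h := r.head.Load()
		t := r.tail.Load()
		if t == h {
			return 0, false
		}
		v := r.buf[h%uint32(len(r.buf))]
		if r.head.CompareAndSwap(h, h+1) { // cas-release, commits consume
			return v, true
		}
	}
}

func main() {
	var r ring
	for i := 1; i <= 3; i++ {
		r.put(i)
	}
	for {
		v, ok := r.get()
		if !ok {
			break
		}
		fmt.Println(v) // 1 2 3
	}
}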
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n == 0 {
if stealRunNextG {
// Try to steal from _p_.runnext.
if next := _p_.runnext; next != 0 {
// Sleep to ensure that _p_ isn't about to run the g we
// are about to steal.
// The important use case here is when the g running on _p_
// ready()s another g and then almost immediately blocks.
// Instead of stealing runnext in this window, back off
// to give _p_ a chance to schedule runnext. This will avoid
// thrashing gs between different Ps.
// A sync chan send/recv takes ~50ns as of time of writing,
// so 3us gives ~50x overshoot.
if GOOS != "windows" {
usleep(3)
} else {
// On windows system timer granularity is 1-15ms,
// which is way too much for this optimization.
// So just yield.
osyield()
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only a few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on the global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
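A toy spin-then-park lock (not part of this commit; activeSpin and the channel-based parking are made up for the sketch) shows the shape of what sync.Mutex does with these two hooks: spin briefly while the holder is likely running on another CPU, then give up and block.

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const activeSpin = 4 // mirrors the runtime's active_spin bound

func acquire(locked *atomic.Bool, wake chan struct{}) {
	for i := 0; ; i++ {
		if locked.CompareAndSwap(false, true) {
			return
		}
		if i < activeSpin && runtime.NumCPU() > 1 {
			// Active spinning: the holder is probably about to
			// release, so a short busy wait (a stand-in for
			// procyield) beats parking and re-waking.
			for j := 0; j < 64; j++ {
			}
			continue
		}
		<-wake // stop spinning and block until a release signal
	}
}

func release(locked *atomic.Bool, wake chan struct{}) {
	locked.Store(false)
	select {
	case wake <- struct{}{}: // wake one parked waiter, if any
	default:
	}
}

func main() {
	var locked atomic.Bool
	wake := make(chan struct{}, 1)
	acquire(&locked, wake)
	release(&locked, wake)
	fmt.Println("acquired and released")
}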
var stealOrder randomOrder
// randomOrder/randomEnum are helper types for randomized work stealing.
// They allow enumerating all Ps in different pseudo-random orders without repetitions.
// The algorithm is based on the fact that if we have X such that X and GOMAXPROCS
// are coprime, then the sequence (i + X) % GOMAXPROCS gives the required enumeration.
type randomOrder struct {
count uint32
coprimes []uint32
}
type randomEnum struct {
i uint32
count uint32
pos uint32
inc uint32
}
func (ord *randomOrder) reset(count uint32) {
ord.count = count
ord.coprimes = ord.coprimes[:0]
for i := uint32(1); i <= count; i++ {
if gcd(i, count) == 1 {
ord.coprimes = append(ord.coprimes, i)
}
}
}
func (ord *randomOrder) start(i uint32) randomEnum {
return randomEnum{
count: ord.count,
pos: i % ord.count,
inc: ord.coprimes[i%uint32(len(ord.coprimes))],
}
}
func (enum *randomEnum) done() bool {
return enum.i == enum.count
}
func (enum *randomEnum) next() {
enum.i++
enum.pos = (enum.pos + enum.inc) % enum.count
}
func (enum *randomEnum) position() uint32 {
return enum.pos
}
func gcd(a, b uint32) uint32 {
for b != 0 {
a, b = b, a%b
}
return a
}
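
To see why the coprime stride works, a runnable sketch that reproduces one enumeration for a hypothetical count of 6 Ps (seeded the way start(3) would be):

package main

import "fmt"

func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

func main() {
	const count = 6 // pretend GOMAXPROCS=6
	var coprimes []uint32
	for i := uint32(1); i <= count; i++ {
		if gcd(i, count) == 1 {
			coprimes = append(coprimes, i) // [1 5]
		}
	}
	pos := uint32(3) % count                 // as in start(3)
	inc := coprimes[3%uint32(len(coprimes))] // 5
	seen := make(map[uint32]bool)
	for i := uint32(0); i < count; i++ {
		fmt.Printf("%d ", pos) // 3 2 1 0 5 4
		seen[pos] = true
		pos = (pos + inc) % count
	}
	fmt.Println(len(seen) == count) // true: each P exactly once
}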
......@@ -556,19 +556,14 @@ func nonleaf(stop chan int) bool {
}
}
/*
func TestSchedLocalQueue(t *testing.T) {
runtime.TestSchedLocalQueue1()
runtime.RunSchedLocalQueueTest()
}
*/
/*
func TestSchedLocalQueueSteal(t *testing.T) {
runtime.TestSchedLocalQueueSteal1()
runtime.RunSchedLocalQueueStealTest()
}
*/
/*
func TestSchedLocalQueueEmpty(t *testing.T) {
if runtime.NumCPU() == 1 {
// Takes too long and does not trigger the race.
......@@ -586,7 +581,6 @@ func TestSchedLocalQueueEmpty(t *testing.T) {
}
runtime.RunSchedLocalQueueEmptyTest(iters)
}
*/
func benchmarkStackGrowth(b *testing.B, rec int) {
b.RunParallel(func(pb *testing.PB) {
......
......@@ -5,6 +5,7 @@
package runtime
import (
"runtime/internal/atomic"
"runtime/internal/sys"
"unsafe"
)
......@@ -203,12 +204,10 @@ func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
/*
//go:nosplit
func (gp *guintptr) cas(old, new guintptr) bool {
return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
}
*/
type puintptr uintptr
......@@ -358,7 +357,7 @@ type g struct {
sigpc uintptr
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
racectx uintptr
// Not for gccgo: racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
// Not for gccgo: cgoCtxt []uintptr // cgo traceback context
......@@ -521,16 +520,16 @@ type p struct {
gfreecnt int32
sudogcache []*sudog
// Not for gccgo for now: sudogbuf [128]*sudog
sudogbuf [128]*sudog
// Not for gccgo for now: tracebuf traceBufPtr
tracebuf traceBufPtr
// Not for gccgo for now: palloc persistentAlloc // per-P to avoid mutex
// Per-P GC state
// Not for gccgo for now: gcAssistTime int64 // Nanoseconds in assistAlloc
// Not for gccgo for now: gcBgMarkWorker guintptr
// Not for gccgo for now: gcMarkWorkerMode gcMarkWorkerMode
gcAssistTime int64 // Nanoseconds in assistAlloc
gcBgMarkWorker guintptr
gcMarkWorkerMode gcMarkWorkerMode
// gcw is this P's GC work buffer cache. The work buffer is
// filled by write barriers, drained by mutator assists, and
......@@ -761,17 +760,12 @@ var (
// allm *m
allp [_MaxGomaxprocs + 1]*p
// gomaxprocs int32
gomaxprocs int32
panicking uint32
ncpu int32
// forcegc forcegcstate
forcegc forcegcstate
sched schedt
// newprocs int32
newprocs int32
// Information about what cpu features are available.
// Set on startup.
......
......@@ -304,6 +304,7 @@ const (
_64bit = 1 << (^uintptr(0) >> 63) / 2
_MHeapMap_TotalBits = (_64bit*sys.GoosWindows)*35 + (_64bit*(1-sys.GoosWindows)*(1-sys.GoosDarwin*sys.GoarchArm64))*39 + sys.GoosDarwin*sys.GoarchArm64*31 + (1-_64bit)*32
_MaxMem = uintptr(1<<_MHeapMap_TotalBits - 1)
_MaxGcproc = 32
)
// Here for gccgo until we port malloc.go.
......@@ -350,7 +351,6 @@ func entersyscallblock(int32)
func exitsyscall(int32)
func gopark(func(*g, unsafe.Pointer) bool, unsafe.Pointer, string, byte, int)
func goparkunlock(*mutex, string, byte, int)
func goready(*g, int)
// Temporary hack for gccgo until we port proc.go.
//go:nosplit
......@@ -411,12 +411,6 @@ func roundupsize(uintptr) uintptr
// Here for gccgo until we port mgc.go.
func GC()
// Here for gccgo until we port proc.go.
var worldsema uint32 = 1
func stopTheWorldWithSema()
func startTheWorldWithSema()
// For gccgo to call from C code.
//go:linkname acquireWorldsema runtime.acquireWorldsema
func acquireWorldsema() {
......@@ -429,26 +423,6 @@ func releaseWorldsema() {
semrelease(&worldsema)
}
// Here for gccgo until we port proc.go.
func stopTheWorld(reason string) {
semacquire(&worldsema, false)
getg().m.preemptoff = reason
getg().m.gcing = 1
systemstack(stopTheWorldWithSema)
}
// Here for gccgo until we port proc.go.
func startTheWorld() {
getg().m.gcing = 0
getg().m.locks++
systemstack(startTheWorldWithSema)
// worldsema must be held over startTheWorldWithSema to ensure
// gomaxprocs cannot change while worldsema is held.
semrelease(&worldsema)
getg().m.preemptoff = ""
getg().m.locks--
}
// For gccgo to call from C code, so that the C code and the Go code
// can share the memstats variable for now.
//go:linkname getMstats runtime.getMstats
......@@ -461,6 +435,7 @@ func setcpuprofilerate_m(hz int32)
// Temporary for gccgo until we port mem_GOOS.go.
func sysAlloc(n uintptr, sysStat *uint64) unsafe.Pointer
func sysFree(v unsafe.Pointer, n uintptr, sysStat *uint64)
// Temporary for gccgo until we port proc.go, so that the C signal
// handler can call into cpuprof.
......@@ -522,7 +497,6 @@ func getZerobase() *uintptr {
func sigprof()
func mcount() int32
func goexit1()
func freezetheworld()
// Get signal trampoline, written in C.
func getSigtramp() uintptr
......@@ -592,6 +566,7 @@ func getPanicking() uint32 {
// Temporary for gccgo until we port mcache.go.
func allocmcache() *mcache
func freemcache(*mcache)
// Temporary for gccgo until we port mgc.go.
// This is just so that allgadd will compile.
......@@ -616,3 +591,60 @@ func gcount() int32 {
unlock(&allglock)
return n
}
// Temporary for gccgo until we port mgc.go.
var gcBlackenEnabled uint32
// Temporary for gccgo until we port mgc.go.
func gcMarkWorkAvailable(p *p) bool {
return false
}
// Temporary for gccgo until we port mgc.go.
var gcController gcControllerState
// Temporary for gccgo until we port mgc.go.
type gcControllerState struct {
}
// Temporary for gccgo until we port mgc.go.
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g {
return nil
}
// Temporary for gccgo until we port mgc.go.
var gcphase uint32
// Temporary for gccgo until we port mgc.go.
const (
_GCoff = iota
_GCmark
_GCmarktermination
)
// Temporary for gccgo until we port mgc.go.
type gcMarkWorkerMode int
// Temporary for gccgo until we port mgc.go.
const (
gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota
gcMarkWorkerFractionalMode
gcMarkWorkerIdleMode
)
// Temporary for gccgo until we port mheap.go.
type mheap struct {
}
// Temporary for gccgo until we port mheap.go.
var mheap_ mheap
// Temporary for gccgo until we port mheap.go.
func (h *mheap) scavenge(k int32, now, limit uint64) {
}
// Temporary for gccgo until we initialize ncpu in Go.
//go:linkname setncpu runtime.setncpu
func setncpu(n int32) {
ncpu = n
}
......@@ -130,7 +130,7 @@ type traceBufHeader struct {
link traceBufPtr // in trace.empty/full
lastTicks uint64 // when we wrote the last event
pos int // next write offset in arr
stk [traceStackSize]uintptr // scratch buffer for traceback
stk [traceStackSize]location // scratch buffer for traceback
}
// traceBuf is per-P tracing buffer.
......@@ -152,9 +152,6 @@ func traceBufPtrOf(b *traceBuf) traceBufPtr {
return traceBufPtr(unsafe.Pointer(b))
}
/*
Commented out for gccgo for now.
// StartTrace enables tracing for the current process.
// While tracing, the data will be buffered and available via ReadTrace.
// StartTrace returns an error if tracing is already enabled.
......@@ -522,13 +519,7 @@ func traceEvent(ev byte, skip int, args ...uint64) {
if gp == _g_ {
nstk = callers(skip, buf.stk[:])
} else if gp != nil {
gp = mp.curg
// This may happen when tracing a system call,
// so we must lock the stack.
if gcTryLockStackBarriers(gp) {
nstk = gcallers(gp, skip, buf.stk[:])
gcUnlockStackBarriers(gp)
}
// FIXME: get stack trace of different goroutine.
}
if nstk > 0 {
nstk-- // skip runtime.goexit
......@@ -647,8 +638,6 @@ func (buf *traceBuf) byte(v byte) {
buf.pos++
}
*/
// traceStackTable maps stack traces (arrays of PC's) to unique uint32 ids.
// It is lock-free for reading.
type traceStackTable struct {
......@@ -664,28 +653,30 @@ type traceStack struct {
hash uintptr
id uint32
n int
stk [0]uintptr // real type [n]uintptr
stk [0]location // real type [n]location
}
type traceStackPtr uintptr
/*
Commented out for gccgo for now.
func (tp traceStackPtr) ptr() *traceStack { return (*traceStack)(unsafe.Pointer(tp)) }
// stack returns slice of PCs.
func (ts *traceStack) stack() []uintptr {
return (*[traceStackSize]uintptr)(unsafe.Pointer(&ts.stk))[:ts.n]
func (ts *traceStack) stack() []location {
return (*[traceStackSize]location)(unsafe.Pointer(&ts.stk))[:ts.n]
}
// put returns a unique id for the stack trace pcs and caches it in the table,
// if it sees the trace for the first time.
func (tab *traceStackTable) put(pcs []uintptr) uint32 {
func (tab *traceStackTable) put(pcs []location) uint32 {
if len(pcs) == 0 {
return 0
}
hash := memhash(unsafe.Pointer(&pcs[0]), 0, uintptr(len(pcs))*unsafe.Sizeof(pcs[0]))
var hash uintptr
for _, loc := range pcs {
hash += loc.pc
hash += hash << 10
hash ^= hash >> 6
}
// First, search the hashtable w/o the mutex.
if id := tab.find(pcs, hash); id != 0 {
return id
......@@ -714,7 +705,7 @@ func (tab *traceStackTable) put(pcs []uintptr) uint32 {
}
// find checks if the stack trace pcs is already present in the table.
func (tab *traceStackTable) find(pcs []uintptr, hash uintptr) uint32 {
func (tab *traceStackTable) find(pcs []location, hash uintptr) uint32 {
part := int(hash % uintptr(len(tab.tab)))
Search:
for stk := tab.tab[part].ptr(); stk != nil; stk = stk.link.ptr() {
......@@ -732,13 +723,12 @@ Search:
// newStack allocates a new stack of size n.
func (tab *traceStackTable) newStack(n int) *traceStack {
return (*traceStack)(tab.mem.alloc(unsafe.Sizeof(traceStack{}) + uintptr(n)*sys.PtrSize))
return (*traceStack)(tab.mem.alloc(unsafe.Sizeof(traceStack{}) + uintptr(n)*unsafe.Sizeof(location{})))
}
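
The new hash above mixes the location PCs with a shift-and-xor scheme (one-at-a-time style) instead of calling memhash on raw bytes, since a location is no longer a bare PC. A standalone sketch, with a stripped-down location type assumed here:

package main

import "fmt"

type location struct{ pc uintptr } // stripped-down stand-in

func stackHash(pcs []location) uintptr {
	var hash uintptr
	for _, loc := range pcs {
		hash += loc.pc
		hash += hash << 10
		hash ^= hash >> 6
	}
	return hash
}

func main() {
	a := []location{{pc: 0x401000}, {pc: 0x402a40}}
	b := []location{{pc: 0x402a40}, {pc: 0x401000}}
	// The mix is order-sensitive, so reordered stacks hash differently.
	fmt.Printf("%#x\n%#x\n", stackHash(a), stackHash(b))
}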
// dump writes all previously cached stacks to trace buffers,
// releases all memory and resets state.
func (tab *traceStackTable) dump() {
frames := make(map[uintptr]traceFrame)
var tmp [(2 + 4*traceStackSize) * traceBytesPerNumber]byte
buf := traceFlush(0).ptr()
for _, stk := range tab.tab {
......@@ -749,8 +739,8 @@ func (tab *traceStackTable) dump() {
tmpbuf = traceAppend(tmpbuf, uint64(stk.n))
for _, pc := range stk.stack() {
var frame traceFrame
frame, buf = traceFrameForPC(buf, frames, pc)
tmpbuf = traceAppend(tmpbuf, uint64(pc))
frame, buf = traceFrameForPC(buf, pc)
tmpbuf = traceAppend(tmpbuf, uint64(pc.pc))
tmpbuf = traceAppend(tmpbuf, uint64(frame.funcID))
tmpbuf = traceAppend(tmpbuf, uint64(frame.fileID))
tmpbuf = traceAppend(tmpbuf, uint64(frame.line))
......@@ -780,25 +770,15 @@ type traceFrame struct {
line uint64
}
func traceFrameForPC(buf *traceBuf, frames map[uintptr]traceFrame, pc uintptr) (traceFrame, *traceBuf) {
if frame, ok := frames[pc]; ok {
return frame, buf
}
func traceFrameForPC(buf *traceBuf, loc location) (traceFrame, *traceBuf) {
var frame traceFrame
f := findfunc(pc)
if f == nil {
frames[pc] = frame
return frame, buf
}
fn := funcname(f)
fn := loc.function
const maxLen = 1 << 10
if len(fn) > maxLen {
fn = fn[len(fn)-maxLen:]
}
frame.funcID, buf = traceString(buf, fn)
file, line := funcline(f, pc-sys.PCQuantum)
file, line := loc.filename, loc.lineno
frame.line = uint64(line)
if len(file) > maxLen {
file = file[len(file)-maxLen:]
......@@ -807,8 +787,6 @@ func traceFrameForPC(buf *traceBuf, frames map[uintptr]traceFrame, pc uintptr) (
return frame, buf
}
*/
// traceAlloc is a non-thread-safe region allocator.
// It holds a linked list of traceAllocBlock.
type traceAlloc struct {
......@@ -831,9 +809,6 @@ type traceAllocBlockPtr uintptr
func (p traceAllocBlockPtr) ptr() *traceAllocBlock { return (*traceAllocBlock)(unsafe.Pointer(p)) }
func (p *traceAllocBlockPtr) set(x *traceAllocBlock) { *p = traceAllocBlockPtr(unsafe.Pointer(x)) }
/*
Commented out for gccgo for now.
// alloc allocates n-byte block.
func (a *traceAlloc) alloc(n uintptr) unsafe.Pointer {
n = round(n, sys.PtrSize)
......@@ -841,6 +816,8 @@ func (a *traceAlloc) alloc(n uintptr) unsafe.Pointer {
if n > uintptr(len(a.head.ptr().data)) {
throw("trace: alloc too large")
}
// This is only safe because the strings returned by callers
// are stored in a location that is not in the Go heap.
block := (*traceAllocBlock)(sysAlloc(unsafe.Sizeof(traceAllocBlock{}), &memstats.other_sys))
if block == nil {
throw("trace: out of memory")
......@@ -913,7 +890,7 @@ func traceGoCreate(newg *g, pc uintptr) {
newg.traceseq = 0
newg.tracelastp = getg().m.p
// +PCQuantum because traceFrameForPC expects return PCs and subtracts PCQuantum.
id := trace.stackTab.put([]uintptr{pc + sys.PCQuantum})
id := trace.stackTab.put([]location{location{pc: pc + sys.PCQuantum}})
traceEvent(traceEvGoCreate, 2, uint64(newg.goid), uint64(id))
}
......@@ -1004,5 +981,3 @@ func traceHeapAlloc() {
func traceNextGC() {
traceEvent(traceEvNextGC, -1, memstats.next_gc)
}
*/
......@@ -618,8 +618,7 @@ runtime_debug_WriteHeapDump(uintptr fd)
// Stop the world.
runtime_acquireWorldsema();
m = runtime_m();
m->gcing = 1;
m->locks++;
m->preemptoff = runtime_gostringnocopy((const byte*)"write heap dump");
runtime_stopTheWorldWithSema();
// Update stats so we can dump them.
......@@ -640,10 +639,9 @@ runtime_debug_WriteHeapDump(uintptr fd)
dumpfd = 0;
// Start up the world again.
m->gcing = 0;
runtime_releaseWorldsema();
runtime_startTheWorldWithSema();
m->locks--;
runtime_releaseWorldsema();
m->preemptoff = runtime_gostringnocopy(nil);
}
// Runs the specified gc program. Calls the callback for every
......
......@@ -99,7 +99,8 @@ runtime_mallocgc(uintptr size, uintptr typ, uint32 flag)
flag |= FlagNoInvokeGC;
}
if(runtime_gcwaiting() && g != m->g0 && m->locks == 0 && !(flag & FlagNoInvokeGC) && m->preemptoff.len == 0) {
if((g->preempt || runtime_gcwaiting()) && g != m->g0 && m->locks == 0 && !(flag & FlagNoInvokeGC) && m->preemptoff.len == 0) {
g->preempt = false;
runtime_gosched();
m = runtime_m();
}
......
......@@ -132,12 +132,6 @@ enum
#else
MHeapMap_Bits = 32 - PageShift,
#endif
// Max number of threads to run garbage collection.
// 2, 3, and 4 are all plausible maximums depending
// on the hardware details of the machine. The garbage
// collector scales well to 8 cpus.
MaxGcproc = 8,
};
// Maximum memory allocation size, a hint for callers.
......@@ -186,7 +180,8 @@ enum
void* runtime_SysAlloc(uintptr nbytes, uint64 *stat)
__asm__ (GOSYM_PREFIX "runtime.sysAlloc");
void runtime_SysFree(void *v, uintptr nbytes, uint64 *stat);
void runtime_SysFree(void *v, uintptr nbytes, uint64 *stat)
__asm__ (GOSYM_PREFIX "runtime.sysFree");
void runtime_SysUnused(void *v, uintptr nbytes);
void runtime_SysUsed(void *v, uintptr nbytes);
void runtime_SysMap(void *v, uintptr nbytes, bool reserved, uint64 *stat);
......@@ -467,11 +462,15 @@ void runtime_MProf_GC(void)
__asm__ (GOSYM_PREFIX "runtime.mProf_GC");
void runtime_iterate_memprof(FuncVal* callback)
__asm__ (GOSYM_PREFIX "runtime.iterate_memprof");
int32 runtime_gcprocs(void);
void runtime_helpgc(int32 nproc);
void runtime_gchelper(void);
int32 runtime_gcprocs(void)
__asm__ (GOSYM_PREFIX "runtime.gcprocs");
void runtime_helpgc(int32 nproc)
__asm__ (GOSYM_PREFIX "runtime.helpgc");
void runtime_gchelper(void)
__asm__ (GOSYM_PREFIX "runtime.gchelper");
void runtime_createfing(void);
G* runtime_wakefing(void);
G* runtime_wakefing(void)
__asm__ (GOSYM_PREFIX "runtime.wakefing");
extern bool runtime_fingwait;
extern bool runtime_fingwake;
......
......@@ -7,7 +7,7 @@
// GC is:
// - mark&sweep
// - mostly precise (with the exception of some C-allocated objects, assembly frames/arguments, etc)
// - parallel (up to MaxGcproc threads)
// - parallel (up to _MaxGcproc threads)
// - partially concurrent (mark is stop-the-world, while sweep is concurrent)
// - non-moving/non-compacting
// - full (non-partial)
......@@ -389,7 +389,7 @@ struct BufferList
uint32 busy;
byte pad[CacheLineSize];
};
static BufferList bufferList[MaxGcproc];
static BufferList bufferList[_MaxGcproc];
static void enqueue(Obj obj, Workbuf **_wbuf, Obj **_wp, uintptr *_nobj);
......@@ -2228,7 +2228,7 @@ gc(struct gc_args *args)
m->locks++; // disable gc during mallocs in parforalloc
if(work.markfor == nil)
work.markfor = runtime_parforalloc(MaxGcproc);
work.markfor = runtime_parforalloc(_MaxGcproc);
m->locks--;
tm1 = 0;
......@@ -2355,7 +2355,7 @@ gc(struct gc_args *args)
sweep.g = __go_go(bgsweep, nil);
else if(sweep.parked) {
sweep.parked = false;
runtime_ready(sweep.g);
runtime_ready(sweep.g, 0, true);
}
runtime_unlock(&gclock);
} else {
......@@ -2429,7 +2429,7 @@ gchelperstart(void)
M *m;
m = runtime_m();
if(m->helpgc < 0 || m->helpgc >= MaxGcproc)
if(m->helpgc < 0 || m->helpgc >= _MaxGcproc)
runtime_throw("gchelperstart: bad m->helpgc");
if(runtime_xchg(&bufferList[m->helpgc].busy, 1))
runtime_throw("gchelperstart: already busy");
......@@ -2541,6 +2541,20 @@ runtime_createfing(void)
runtime_unlock(&gclock);
}
bool getfingwait() __asm__(GOSYM_PREFIX "runtime.getfingwait");
bool
getfingwait()
{
return runtime_fingwait;
}
bool getfingwake() __asm__(GOSYM_PREFIX "runtime.getfingwake");
bool
getfingwake()
{
return runtime_fingwake;
}
G*
runtime_wakefing(void)
{
......
......@@ -365,9 +365,14 @@ extern P** runtime_getAllP()
__asm__ (GOSYM_PREFIX "runtime.getAllP");
extern G* allocg(void)
__asm__ (GOSYM_PREFIX "runtime.allocg");
extern bool needaddgcproc(void)
__asm__ (GOSYM_PREFIX "runtime.needaddgcproc");
extern void startm(P*, bool)
__asm__(GOSYM_PREFIX "runtime.startm");
extern void newm(void(*)(void), P*)
__asm__(GOSYM_PREFIX "runtime.newm");
Sched* runtime_sched;
int32 runtime_gomaxprocs;
M runtime_m0;
G runtime_g0; // idle goroutine for m0
G* runtime_lastg;
......@@ -376,51 +381,58 @@ P** runtime_allp;
int8* runtime_goos;
int32 runtime_ncpu;
bool runtime_precisestack;
static int32 newprocs;
bool runtime_isarchive;
void* runtime_mstart(void*);
static void runqput(P*, G*);
static G* runqget(P*);
static bool runqputslow(P*, G*, uint32, uint32);
static G* runqsteal(P*, P*);
static void mput(M*);
static M* mget(void);
static void mcommoninit(M*);
static void schedule(void);
static void procresize(int32);
static void acquirep(P*);
static P* releasep(void);
static void newm(void(*)(void), P*);
static void stopm(void);
static void startm(P*, bool);
static void handoffp(P*);
static void wakep(void);
static void stoplockedm(void);
static void startlockedm(G*);
static void sysmon(void);
static uint32 retake(int64);
static void incidlelocked(int32);
static void exitsyscall0(G*);
static void park0(G*);
static void goexit0(G*);
static void gfput(P*, G*);
static G* gfget(P*);
static void gfpurge(P*);
static void globrunqput(G*);
static void globrunqputbatch(G*, G*, int32);
static G* globrunqget(P*, int32);
static P* pidleget(void);
static void pidleput(P*);
static void injectglist(G*);
static bool preemptall(void);
static bool exitsyscallfast(void);
void allgadd(G*)
extern void setncpu(int32)
__asm__(GOSYM_PREFIX "runtime.setncpu");
extern void allgadd(G*)
__asm__(GOSYM_PREFIX "runtime.allgadd");
void checkdead(void)
extern void stopm(void)
__asm__(GOSYM_PREFIX "runtime.stopm");
extern void handoffp(P*)
__asm__(GOSYM_PREFIX "runtime.handoffp");
extern void wakep(void)
__asm__(GOSYM_PREFIX "runtime.wakep");
extern void stoplockedm(void)
__asm__(GOSYM_PREFIX "runtime.stoplockedm");
extern void schedule(void)
__asm__(GOSYM_PREFIX "runtime.schedule");
extern void execute(G*, bool)
__asm__(GOSYM_PREFIX "runtime.execute");
extern void procresize(int32)
__asm__(GOSYM_PREFIX "runtime.procresize");
extern void acquirep(P*)
__asm__(GOSYM_PREFIX "runtime.acquirep");
extern P* releasep(void)
__asm__(GOSYM_PREFIX "runtime.releasep");
extern void incidlelocked(int32)
__asm__(GOSYM_PREFIX "runtime.incidlelocked");
extern void checkdead(void)
__asm__(GOSYM_PREFIX "runtime.checkdead");
extern void sysmon(void)
__asm__(GOSYM_PREFIX "runtime.sysmon");
extern void mput(M*)
__asm__(GOSYM_PREFIX "runtime.mput");
extern M* mget(void)
__asm__(GOSYM_PREFIX "runtime.mget");
extern void globrunqput(G*)
__asm__(GOSYM_PREFIX "runtime.globrunqput");
extern P* pidleget(void)
__asm__(GOSYM_PREFIX "runtime.pidleget");
extern bool runqempty(P*)
__asm__(GOSYM_PREFIX "runtime.runqempty");
extern void runqput(P*, G*, bool)
__asm__(GOSYM_PREFIX "runtime.runqput");
bool runtime_isstarted;
......@@ -441,6 +453,7 @@ runtime_schedinit(void)
const byte *p;
Eface i;
setncpu(runtime_ncpu);
runtime_sched = runtime_getsched();
m = &runtime_m0;
......@@ -660,234 +673,6 @@ mcommoninit(M *mp)
runtime_unlock(&runtime_sched->lock);
}
// Mark gp ready to run.
void
runtime_ready(G *gp)
{
// Mark runnable.
g->m->locks++; // disable preemption because it can be holding p in a local var
if(gp->atomicstatus != _Gwaiting) {
runtime_printf("goroutine %D has status %d\n", gp->goid, gp->atomicstatus);
runtime_throw("bad g->atomicstatus in ready");
}
gp->atomicstatus = _Grunnable;
runqput((P*)g->m->p, gp);
if(runtime_atomicload(&runtime_sched->npidle) != 0 && runtime_atomicload(&runtime_sched->nmspinning) == 0) // TODO: fast atomic
wakep();
g->m->locks--;
}
void goready(G*, int) __asm__ (GOSYM_PREFIX "runtime.goready");
void
goready(G* gp, int traceskip __attribute__ ((unused)))
{
runtime_ready(gp);
}
int32
runtime_gcprocs(void)
{
int32 n;
// Figure out how many CPUs to use during GC.
// Limited by gomaxprocs, number of actual CPUs, and MaxGcproc.
runtime_lock(&runtime_sched->lock);
n = runtime_gomaxprocs;
if(n > runtime_ncpu)
n = runtime_ncpu > 0 ? runtime_ncpu : 1;
if(n > MaxGcproc)
n = MaxGcproc;
if(n > runtime_sched->nmidle+1) // one M is currently running
n = runtime_sched->nmidle+1;
runtime_unlock(&runtime_sched->lock);
return n;
}
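
The clamp chain above is: gomaxprocs, then ncpu, then the MaxGcproc cap, then the number of idle Ms plus the one running. A sketch of the same arithmetic with plain values (the ncpu==0 bootstrap case is elided):

package main

import "fmt"

func gcprocs(gomaxprocs, ncpu, nmidle, maxGcproc int32) int32 {
	n := gomaxprocs
	if n > ncpu {
		n = ncpu
	}
	if n > maxGcproc {
		n = maxGcproc
	}
	if n > nmidle+1 { // one M is currently running
		n = nmidle + 1
	}
	return n
}

func main() {
	// 8 Ps on a 4-CPU box with 1 idle M: GC gets 2 threads.
	fmt.Println(gcprocs(8, 4, 1, 32))
}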
static bool
needaddgcproc(void)
{
int32 n;
runtime_lock(&runtime_sched->lock);
n = runtime_gomaxprocs;
if(n > runtime_ncpu)
n = runtime_ncpu;
if(n > MaxGcproc)
n = MaxGcproc;
n -= runtime_sched->nmidle+1; // one M is currently running
runtime_unlock(&runtime_sched->lock);
return n > 0;
}
void
runtime_helpgc(int32 nproc)
{
M *mp;
int32 n, pos;
runtime_lock(&runtime_sched->lock);
pos = 0;
for(n = 1; n < nproc; n++) { // one M is currently running
if(runtime_allp[pos]->mcache == g->m->mcache)
pos++;
mp = mget();
if(mp == nil)
runtime_throw("runtime_gcprocs inconsistency");
mp->helpgc = n;
mp->mcache = runtime_allp[pos]->mcache;
pos++;
runtime_notewakeup(&mp->park);
}
runtime_unlock(&runtime_sched->lock);
}
// Similar to stoptheworld but best-effort and can be called several times.
// There is no reverse operation, used during crashing.
// This function must not lock any mutexes.
void
runtime_freezetheworld(void)
{
int32 i;
if(runtime_gomaxprocs == 1)
return;
// stopwait and preemption requests can be lost
// due to races with concurrently executing threads,
// so try several times
for(i = 0; i < 5; i++) {
// this should tell the scheduler to not start any new goroutines
runtime_sched->stopwait = 0x7fffffff;
runtime_atomicstore((uint32*)&runtime_sched->gcwaiting, 1);
// this should stop running goroutines
if(!preemptall())
break; // no running goroutines
runtime_usleep(1000);
}
// to be sure
runtime_usleep(1000);
preemptall();
runtime_usleep(1000);
}
void
runtime_stopTheWorldWithSema(void)
{
int32 i;
uint32 s;
P *p;
bool wait;
runtime_lock(&runtime_sched->lock);
runtime_sched->stopwait = runtime_gomaxprocs;
runtime_atomicstore((uint32*)&runtime_sched->gcwaiting, 1);
preemptall();
// stop current P
((P*)g->m->p)->status = _Pgcstop;
runtime_sched->stopwait--;
// try to retake all P's in _Psyscall status
for(i = 0; i < runtime_gomaxprocs; i++) {
p = runtime_allp[i];
s = p->status;
if(s == _Psyscall && runtime_cas(&p->status, s, _Pgcstop))
runtime_sched->stopwait--;
}
// stop idle P's
while((p = pidleget()) != nil) {
p->status = _Pgcstop;
runtime_sched->stopwait--;
}
wait = runtime_sched->stopwait > 0;
runtime_unlock(&runtime_sched->lock);
// wait for remaining P's to stop voluntarily
if(wait) {
runtime_notesleep(&runtime_sched->stopnote);
runtime_noteclear(&runtime_sched->stopnote);
}
if(runtime_sched->stopwait)
runtime_throw("stoptheworld: not stopped");
for(i = 0; i < runtime_gomaxprocs; i++) {
p = runtime_allp[i];
if(p->status != _Pgcstop)
runtime_throw("stoptheworld: not stopped");
}
}
static void
mhelpgc(void)
{
g->m->helpgc = -1;
}
void
runtime_startTheWorldWithSema(void)
{
P *p, *p1;
M *mp;
G *gp;
bool add;
g->m->locks++; // disable preemption because it can be holding p in a local var
gp = runtime_netpoll(false); // non-blocking
injectglist(gp);
add = needaddgcproc();
runtime_lock(&runtime_sched->lock);
if(newprocs) {
procresize(newprocs);
newprocs = 0;
} else
procresize(runtime_gomaxprocs);
runtime_sched->gcwaiting = 0;
p1 = nil;
while((p = pidleget()) != nil) {
// procresize() puts p's with work at the beginning of the list.
// Once we reach a p without a run queue, the rest don't have one either.
if(p->runqhead == p->runqtail) {
pidleput(p);
break;
}
p->m = (uintptr)mget();
p->link = (uintptr)p1;
p1 = p;
}
if(runtime_sched->sysmonwait) {
runtime_sched->sysmonwait = false;
runtime_notewakeup(&runtime_sched->sysmonnote);
}
runtime_unlock(&runtime_sched->lock);
while(p1) {
p = p1;
p1 = (P*)p1->link;
if(p->m) {
mp = (M*)p->m;
p->m = 0;
if(mp->nextp)
runtime_throw("startTheWorldWithSema: inconsistent mp->nextp");
mp->nextp = (uintptr)p;
runtime_notewakeup(&mp->park);
} else {
// Start M to run P. Do not start another M below.
newm(nil, p);
add = false;
}
}
if(add) {
// If GC could have used another helper proc, start one now,
// in the hope that it will be available next time.
// It would have been even better to start it before the collection,
// but doing so requires allocating memory, so it's tricky to
// coordinate. This lazy approach works out in practice:
// we don't mind if the first couple gc rounds don't have quite
// the maximum number of procs.
newm(mhelpgc, nil);
}
g->m->locks--;
}
// Called to start an M.
void*
runtime_mstart(void* mp)
......@@ -1055,7 +840,7 @@ makeGContext(G* gp, byte* sp, uintptr spsize) {
}
// Create a new m. It will start off with a call to fn, or else the scheduler.
static void
void
newm(void(*fn)(void), P *p)
{
M *mp;
......@@ -1067,40 +852,6 @@ newm(void(*fn)(void), P *p)
runtime_newosproc(mp);
}
// Stops execution of the current m until new work is available.
// Returns with acquired P.
static void
stopm(void)
{
M* m;
m = g->m;
if(m->locks)
runtime_throw("stopm holding locks");
if(m->p)
runtime_throw("stopm holding p");
if(m->spinning) {
m->spinning = false;
runtime_xadd(&runtime_sched->nmspinning, -1);
}
retry:
runtime_lock(&runtime_sched->lock);
mput(m);
runtime_unlock(&runtime_sched->lock);
runtime_notesleep(&m->park);
m = g->m;
runtime_noteclear(&m->park);
if(m->helpgc) {
runtime_gchelper();
m->helpgc = 0;
m->mcache = nil;
goto retry;
}
acquirep((P*)m->nextp);
m->nextp = 0;
}
static void
mspinning(void)
{
......@@ -1109,7 +860,7 @@ mspinning(void)
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
static void
void
startm(P *p, bool spinning)
{
M *mp;
......@@ -1118,381 +869,32 @@ startm(P *p, bool spinning)
runtime_lock(&runtime_sched->lock);
if(p == nil) {
p = pidleget();
if(p == nil) {
runtime_unlock(&runtime_sched->lock);
if(spinning)
runtime_xadd(&runtime_sched->nmspinning, -1);
return;
}
}
mp = mget();
runtime_unlock(&runtime_sched->lock);
if(mp == nil) {
fn = nil;
if(spinning)
fn = mspinning;
newm(fn, p);
return;
}
if(mp->spinning)
runtime_throw("startm: m is spinning");
if(mp->nextp)
runtime_throw("startm: m has p");
mp->spinning = spinning;
mp->nextp = (uintptr)p;
runtime_notewakeup(&mp->park);
}
// Hands off P from syscall or locked M.
static void
handoffp(P *p)
{
// if it has local work, start it straight away
if(p->runqhead != p->runqtail || runtime_sched->runqsize) {
startm(p, false);
return;
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if(runtime_atomicload(&runtime_sched->nmspinning) + runtime_atomicload(&runtime_sched->npidle) == 0 && // TODO: fast atomic
runtime_cas(&runtime_sched->nmspinning, 0, 1)) {
startm(p, true);
return;
}
runtime_lock(&runtime_sched->lock);
if(runtime_sched->gcwaiting) {
p->status = _Pgcstop;
if(--runtime_sched->stopwait == 0)
runtime_notewakeup(&runtime_sched->stopnote);
runtime_unlock(&runtime_sched->lock);
return;
}
if(runtime_sched->runqsize) {
runtime_unlock(&runtime_sched->lock);
startm(p, false);
return;
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if(runtime_sched->npidle == (uint32)runtime_gomaxprocs-1 && runtime_atomicload64(&runtime_sched->lastpoll) != 0) {
runtime_unlock(&runtime_sched->lock);
startm(p, false);
return;
}
pidleput(p);
runtime_unlock(&runtime_sched->lock);
}
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
static void
wakep(void)
{
// be conservative about spinning threads
if(!runtime_cas(&runtime_sched->nmspinning, 0, 1))
return;
startm(nil, true);
}
// Stops execution of the current m that is locked to a g until the g is runnable again.
// Returns with acquired P.
static void
stoplockedm(void)
{
M *m;
P *p;
m = g->m;
if(m->lockedg == nil || m->lockedg->lockedm != m)
runtime_throw("stoplockedm: inconsistent locking");
if(m->p) {
// Schedule another M to run this p.
p = releasep();
handoffp(p);
}
incidlelocked(1);
// Wait until another thread schedules lockedg again.
runtime_notesleep(&m->park);
m = g->m;
runtime_noteclear(&m->park);
if(m->lockedg->atomicstatus != _Grunnable)
runtime_throw("stoplockedm: not runnable");
acquirep((P*)m->nextp);
m->nextp = 0;
}
// Schedules the locked m to run the locked gp.
static void
startlockedm(G *gp)
{
M *mp;
P *p;
mp = gp->lockedm;
if(mp == g->m)
runtime_throw("startlockedm: locked to me");
if(mp->nextp)
runtime_throw("startlockedm: m has p");
// directly handoff current P to the locked m
incidlelocked(-1);
p = releasep();
mp->nextp = (uintptr)p;
runtime_notewakeup(&mp->park);
stopm();
}
// Stops the current m for stoptheworld.
// Returns when the world is restarted.
static void
gcstopm(void)
{
P *p;
if(!runtime_sched->gcwaiting)
runtime_throw("gcstopm: not waiting for gc");
if(g->m->spinning) {
g->m->spinning = false;
runtime_xadd(&runtime_sched->nmspinning, -1);
}
p = releasep();
runtime_lock(&runtime_sched->lock);
p->status = _Pgcstop;
if(--runtime_sched->stopwait == 0)
runtime_notewakeup(&runtime_sched->stopnote);
runtime_unlock(&runtime_sched->lock);
stopm();
}
// Schedules gp to run on the current M.
// Never returns.
static void
execute(G *gp)
{
int32 hz;
if(gp->atomicstatus != _Grunnable) {
runtime_printf("execute: bad g status %d\n", gp->atomicstatus);
runtime_throw("execute: bad g status");
}
gp->atomicstatus = _Grunning;
gp->waitsince = 0;
((P*)g->m->p)->schedtick++;
g->m->curg = gp;
gp->m = g->m;
// Check whether the profiler needs to be turned on or off.
hz = runtime_sched->profilehz;
if(g->m->profilehz != hz)
runtime_resetcpuprofiler(hz);
runtime_gogo(gp);
}
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
static G*
findrunnable(void)
{
G *gp;
P *p;
int32 i;
top:
if(runtime_sched->gcwaiting) {
gcstopm();
goto top;
}
if(runtime_fingwait && runtime_fingwake && (gp = runtime_wakefing()) != nil)
runtime_ready(gp);
// local runq
gp = runqget((P*)g->m->p);
if(gp)
return gp;
// global runq
if(runtime_sched->runqsize) {
runtime_lock(&runtime_sched->lock);
gp = globrunqget((P*)g->m->p, 0);
runtime_unlock(&runtime_sched->lock);
if(gp)
return gp;
}
// poll network
gp = runtime_netpoll(false); // non-blocking
if(gp) {
injectglist((G*)gp->schedlink);
gp->atomicstatus = _Grunnable;
return gp;
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if(!g->m->spinning && 2 * runtime_atomicload(&runtime_sched->nmspinning) >= runtime_gomaxprocs - runtime_atomicload(&runtime_sched->npidle)) // TODO: fast atomic
goto stop;
if(!g->m->spinning) {
g->m->spinning = true;
runtime_xadd(&runtime_sched->nmspinning, 1);
}
// random steal from other P's
for(i = 0; i < 2*runtime_gomaxprocs; i++) {
if(runtime_sched->gcwaiting)
goto top;
p = runtime_allp[runtime_fastrand1()%runtime_gomaxprocs];
if(p == (P*)g->m->p)
gp = runqget(p);
else
gp = runqsteal((P*)g->m->p, p);
if(gp)
return gp;
}
stop:
// return P and block
runtime_lock(&runtime_sched->lock);
if(runtime_sched->gcwaiting) {
runtime_unlock(&runtime_sched->lock);
goto top;
}
if(runtime_sched->runqsize) {
gp = globrunqget((P*)g->m->p, 0);
runtime_unlock(&runtime_sched->lock);
return gp;
}
p = releasep();
pidleput(p);
runtime_unlock(&runtime_sched->lock);
if(g->m->spinning) {
g->m->spinning = false;
runtime_xadd(&runtime_sched->nmspinning, -1);
}
// check all runqueues once again
for(i = 0; i < runtime_gomaxprocs; i++) {
p = runtime_allp[i];
if(p && p->runqhead != p->runqtail) {
runtime_lock(&runtime_sched->lock);
p = pidleget();
runtime_unlock(&runtime_sched->lock);
if(p) {
acquirep(p);
goto top;
}
break;
}
}
// poll network
if(runtime_xchg64(&runtime_sched->lastpoll, 0) != 0) {
if(g->m->p)
runtime_throw("findrunnable: netpoll with p");
if(g->m->spinning)
runtime_throw("findrunnable: netpoll with spinning");
gp = runtime_netpoll(true); // block until new work is available
runtime_atomicstore64(&runtime_sched->lastpoll, runtime_nanotime());
if(gp) {
runtime_lock(&runtime_sched->lock);
p = pidleget();
runtime_unlock(&runtime_sched->lock);
if(p) {
acquirep(p);
injectglist((G*)gp->schedlink);
gp->atomicstatus = _Grunnable;
return gp;
}
injectglist(gp);
}
}
stopm();
goto top;
}
static void
resetspinning(void)
{
int32 nmspinning;
if(g->m->spinning) {
g->m->spinning = false;
nmspinning = runtime_xadd(&runtime_sched->nmspinning, -1);
if(nmspinning < 0)
runtime_throw("findrunnable: negative nmspinning");
} else
nmspinning = runtime_atomicload(&runtime_sched->nmspinning);
// M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
// so see if we need to wakeup another P here.
if (nmspinning == 0 && runtime_atomicload(&runtime_sched->npidle) > 0)
wakep();
}
// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
static void
injectglist(G *glist)
{
int32 n;
G *gp;
if(glist == nil)
return;
runtime_lock(&runtime_sched->lock);
for(n = 0; glist; n++) {
gp = glist;
glist = (G*)gp->schedlink;
gp->atomicstatus = _Grunnable;
globrunqput(gp);
}
runtime_unlock(&runtime_sched->lock);
for(; n && runtime_sched->npidle; n--)
startm(nil, false);
}
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
schedule(void)
{
G *gp;
uint32 tick;
if(g->m->locks)
runtime_throw("schedule: holding locks");
top:
if(runtime_sched->gcwaiting) {
gcstopm();
goto top;
}
gp = nil;
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
tick = ((P*)g->m->p)->schedtick;
// This is a fancy way to say tick%61==0; it uses 2 MUL instructions
// instead of a single DIV and so is faster on modern processors.
if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime_sched->runqsize > 0) {
runtime_lock(&runtime_sched->lock);
gp = globrunqget((P*)g->m->p, 1);
if(p == nil) {
runtime_unlock(&runtime_sched->lock);
if(gp)
resetspinning();
if(spinning)
runtime_xadd(&runtime_sched->nmspinning, -1);
return;
}
if(gp == nil) {
gp = runqget((P*)g->m->p);
if(gp && g->m->spinning)
runtime_throw("schedule: spinning with local work");
}
if(gp == nil) {
gp = findrunnable(); // blocks until work is available
resetspinning();
mp = mget();
runtime_unlock(&runtime_sched->lock);
if(mp == nil) {
fn = nil;
if(spinning)
fn = mspinning;
newm(fn, p);
return;
}
if(gp->lockedm) {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp);
goto top;
if(mp->spinning)
runtime_throw("startm: m is spinning");
if(mp->nextp)
runtime_throw("startm: m has p");
if(spinning && !runqempty(p)) {
runtime_throw("startm: p has runnable gs");
}
execute(gp);
mp->spinning = spinning;
mp->nextp = (uintptr)p;
runtime_notewakeup(&mp->park);
}
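
About the tick%61 trick in schedule above: 0x4325c53f is ceil(2^36/61), and the multiply-plus-shift with that constant reproduces division by 61 exactly for all uint32 ticks. A quick brute-force check over sampled ranges (a sketch, not runtime code):

package main

import "fmt"

func main() {
	const m = 0x4325c53f // ceil(2^36 / 61)
	ok := true
	// A dense low range plus the top of the uint32 space.
	for _, base := range []uint32{0, 1<<32 - 1<<20 - 1} {
		for i := uint32(0); i < 1<<20; i++ {
			tick := base + i
			fast := tick - uint32((uint64(tick)*m)>>36)*61
			if (fast == 0) != (tick%61 == 0) {
				ok = false
			}
		}
	}
	fmt.Println("identity holds on sampled ticks:", ok) // true
}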
// Puts the current goroutine into a waiting state and calls unlockf.
......@@ -1572,12 +974,12 @@ park0(G *gp)
m->waitlock = nil;
if(!ok) {
gp->atomicstatus = _Grunnable;
execute(gp); // Schedule it back, never returns.
execute(gp, true); // Schedule it back, never returns.
}
}
if(m->lockedg) {
stoplockedm();
execute(gp); // Never returns.
execute(gp, true); // Never returns.
}
schedule();
}
......@@ -1606,7 +1008,7 @@ runtime_gosched0(G *gp)
runtime_unlock(&runtime_sched->lock);
if(m->lockedg) {
stoplockedm();
execute(gp); // Never returns.
execute(gp, true); // Never returns.
}
schedule();
}
......@@ -1643,6 +1045,7 @@ goexit0(G *gp)
gp->writebuf.__capacity = 0;
gp->waitreason = runtime_gostringnocopy(nil);
gp->param = nil;
m->curg->m = nil;
m->curg = nil;
m->lockedg = nil;
if(m->locked & ~_LockExternal) {
......@@ -1896,12 +1299,12 @@ exitsyscall0(G *gp)
runtime_unlock(&runtime_sched->lock);
if(p) {
acquirep(p);
execute(gp); // Never returns.
execute(gp, false); // Never returns.
}
if(m->lockedg) {
// Wait until another thread schedules gp and so m again.
stoplockedm();
execute(gp); // Never returns.
execute(gp, false); // Never returns.
}
stopm();
schedule(); // Never returns.
......@@ -2069,7 +1472,7 @@ __go_go(void (*fn)(void*), void* arg)
makeGContext(newg, sp, (uintptr)spsize);
runqput(p, newg);
runqput(p, newg, true);
if(runtime_atomicload(&runtime_sched->npidle) != 0 && runtime_atomicload(&runtime_sched->nmspinning) == 0 && fn != runtime_main) // TODO: fast atomic
wakep();
......@@ -2126,23 +1529,6 @@ retry:
return gp;
}
// Purge all cached G's from gfree list to the global list.
static void
gfpurge(P *p)
{
G *gp;
runtime_lock(&runtime_sched->gflock);
while(p->gfreecnt) {
p->gfreecnt--;
gp = p->gfree;
p->gfree = (G*)gp->schedlink;
gp->schedlink = (uintptr)runtime_sched->gfree;
runtime_sched->gfree = gp;
}
runtime_unlock(&runtime_sched->gflock);
}
void
runtime_Breakpoint(void)
{
......@@ -2157,38 +1543,6 @@ runtime_Gosched(void)
runtime_gosched();
}
// Implementation of runtime.GOMAXPROCS.
// delete when scheduler is even stronger
intgo runtime_GOMAXPROCS(intgo)
__asm__(GOSYM_PREFIX "runtime.GOMAXPROCS");
intgo
runtime_GOMAXPROCS(intgo n)
{
intgo ret;
if(n > _MaxGomaxprocs)
n = _MaxGomaxprocs;
runtime_lock(&runtime_sched->lock);
ret = (intgo)runtime_gomaxprocs;
if(n <= 0 || n == ret) {
runtime_unlock(&runtime_sched->lock);
return ret;
}
runtime_unlock(&runtime_sched->lock);
runtime_acquireWorldsema();
g->m->gcing = 1;
runtime_stopTheWorldWithSema();
newprocs = (int32)n;
g->m->gcing = 0;
runtime_releaseWorldsema();
runtime_startTheWorldWithSema();
return ret;
}
// lockOSThread is called by runtime.LockOSThread and runtime.lockOSThread below
// after they modify m->locked. Do not allow preemption during this call,
// or else the m might be different in this function than in the caller.
......@@ -2365,599 +1719,6 @@ runtime_setcpuprofilerate_m(int32 hz)
g->m->locks--;
}
// Change number of processors. The world is stopped, sched is locked.
static void
procresize(int32 new)
{
int32 i, old;
bool pempty;
G *gp;
P *p;
intgo j;
old = runtime_gomaxprocs;
if(old < 0 || old > _MaxGomaxprocs || new <= 0 || new >_MaxGomaxprocs)
runtime_throw("procresize: invalid arg");
// initialize new P's
for(i = 0; i < new; i++) {
p = runtime_allp[i];
if(p == nil) {
p = (P*)runtime_mallocgc(sizeof(*p), 0, FlagNoInvokeGC);
p->id = i;
p->status = _Pgcstop;
p->deferpool.__values = &p->deferpoolbuf[0];
p->deferpool.__count = 0;
p->deferpool.__capacity = nelem(p->deferpoolbuf);
runtime_atomicstorep(&runtime_allp[i], p);
}
if(p->mcache == nil) {
if(old==0 && i==0)
p->mcache = g->m->mcache; // bootstrap
else
p->mcache = runtime_allocmcache();
}
}
// redistribute runnable G's evenly
// collect all runnable goroutines in global queue preserving FIFO order
// FIFO order is required to ensure fairness even during frequent GCs
// see http://golang.org/issue/7126
pempty = false;
while(!pempty) {
pempty = true;
for(i = 0; i < old; i++) {
p = runtime_allp[i];
if(p->runqhead == p->runqtail)
continue;
pempty = false;
// pop from tail of local queue
p->runqtail--;
gp = (G*)p->runq[p->runqtail%nelem(p->runq)];
// push onto head of global queue
gp->schedlink = runtime_sched->runqhead;
runtime_sched->runqhead = (uintptr)gp;
if(runtime_sched->runqtail == 0)
runtime_sched->runqtail = (uintptr)gp;
runtime_sched->runqsize++;
}
}
// fill local queues with at most nelem(p->runq)/2 goroutines
// start at 1 because current M already executes some G and will acquire allp[0] below,
// so if we have a spare G we want to put it into allp[1].
for(i = 1; (uint32)i < (uint32)new * nelem(p->runq)/2 && runtime_sched->runqsize > 0; i++) {
gp = (G*)runtime_sched->runqhead;
runtime_sched->runqhead = gp->schedlink;
if(runtime_sched->runqhead == 0)
runtime_sched->runqtail = 0;
runtime_sched->runqsize--;
runqput(runtime_allp[i%new], gp);
}
// free unused P's
for(i = new; i < old; i++) {
p = runtime_allp[i];
for(j = 0; j < p->deferpool.__count; j++) {
((struct _defer**)p->deferpool.__values)[j] = nil;
}
p->deferpool.__count = 0;
runtime_freemcache(p->mcache);
p->mcache = nil;
gfpurge(p);
p->status = _Pdead;
// can't free P itself because it can be referenced by an M in syscall
}
if(g->m->p)
((P*)g->m->p)->m = 0;
g->m->p = 0;
g->m->mcache = nil;
p = runtime_allp[0];
p->m = 0;
p->status = _Pidle;
acquirep(p);
for(i = new-1; i > 0; i--) {
p = runtime_allp[i];
p->status = _Pidle;
pidleput(p);
}
runtime_atomicstore((uint32*)&runtime_gomaxprocs, new);
}
// Associate p and the current m.
static void
acquirep(P *p)
{
M *m;
m = g->m;
if(m->p || m->mcache)
runtime_throw("acquirep: already in go");
if(p->m || p->status != _Pidle) {
runtime_printf("acquirep: p->m=%p(%d) p->status=%d\n", p->m, p->m ? ((M*)p->m)->id : 0, p->status);
runtime_throw("acquirep: invalid p state");
}
m->mcache = p->mcache;
m->p = (uintptr)p;
p->m = (uintptr)m;
p->status = _Prunning;
}
// Disassociate p and the current m.
static P*
releasep(void)
{
M *m;
P *p;
m = g->m;
if(m->p == 0 || m->mcache == nil)
runtime_throw("releasep: invalid arg");
p = (P*)m->p;
if((M*)p->m != m || p->mcache != m->mcache || p->status != _Prunning) {
runtime_printf("releasep: m=%p m->p=%p p->m=%p m->mcache=%p p->mcache=%p p->status=%d\n",
m, m->p, p->m, m->mcache, p->mcache, p->status);
runtime_throw("releasep: invalid p state");
}
m->p = 0;
m->mcache = nil;
p->m = 0;
p->status = _Pidle;
return p;
}
static void
incidlelocked(int32 v)
{
runtime_lock(&runtime_sched->lock);
runtime_sched->nmidlelocked += v;
if(v > 0)
checkdead();
runtime_unlock(&runtime_sched->lock);
}
static void
sysmon(void)
{
uint32 idle, delay;
int64 now, lastpoll, lasttrace;
G *gp;
lasttrace = 0;
idle = 0; // how many cycles in succession we had not woken anybody up
delay = 0;
for(;;) {
if(idle == 0) // start with 20us sleep...
delay = 20;
else if(idle > 50) // start doubling the sleep after 1ms...
delay *= 2;
if(delay > 10*1000) // up to 10ms
delay = 10*1000;
runtime_usleep(delay);
if(runtime_debug.schedtrace <= 0 &&
(runtime_sched->gcwaiting || runtime_atomicload(&runtime_sched->npidle) == (uint32)runtime_gomaxprocs)) { // TODO: fast atomic
runtime_lock(&runtime_sched->lock);
if(runtime_atomicload(&runtime_sched->gcwaiting) || runtime_atomicload(&runtime_sched->npidle) == (uint32)runtime_gomaxprocs) {
runtime_atomicstore(&runtime_sched->sysmonwait, 1);
runtime_unlock(&runtime_sched->lock);
runtime_notesleep(&runtime_sched->sysmonnote);
runtime_noteclear(&runtime_sched->sysmonnote);
idle = 0;
delay = 20;
} else
runtime_unlock(&runtime_sched->lock);
}
// poll network if not polled for more than 10ms
lastpoll = runtime_atomicload64(&runtime_sched->lastpoll);
now = runtime_nanotime();
if(lastpoll != 0 && lastpoll + 10*1000*1000 < now) {
runtime_cas64(&runtime_sched->lastpoll, lastpoll, now);
gp = runtime_netpoll(false); // non-blocking
if(gp) {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1);
injectglist(gp);
incidlelocked(1);
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if(retake(now))
idle = 0;
else
idle++;
if(runtime_debug.schedtrace > 0 && lasttrace + runtime_debug.schedtrace*1000000ll <= now) {
lasttrace = now;
runtime_schedtrace(runtime_debug.scheddetail);
}
}
}
typedef struct Pdesc Pdesc;
struct Pdesc
{
uint32 schedtick;
int64 schedwhen;
uint32 syscalltick;
int64 syscallwhen;
};
static Pdesc pdesc[_MaxGomaxprocs];
static uint32
retake(int64 now)
{
uint32 i, s, n;
int64 t;
P *p;
Pdesc *pd;
n = 0;
for(i = 0; i < (uint32)runtime_gomaxprocs; i++) {
p = runtime_allp[i];
if(p==nil)
continue;
pd = &pdesc[i];
s = p->status;
if(s == _Psyscall) {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t = p->syscalltick;
if(pd->syscalltick != t) {
pd->syscalltick = t;
pd->syscallwhen = now;
continue;
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if(p->runqhead == p->runqtail &&
runtime_atomicload(&runtime_sched->nmspinning) + runtime_atomicload(&runtime_sched->npidle) > 0 &&
pd->syscallwhen + 10*1000*1000 > now)
continue;
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1);
if(runtime_cas(&p->status, s, _Pidle)) {
n++;
handoffp(p);
}
incidlelocked(1);
} else if(s == _Prunning) {
// Preempt G if it's running for more than 10ms.
t = p->schedtick;
if(pd->schedtick != t) {
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// preemptone(p);
}
}
return n;
}
// Tell all goroutines that they have been preempted and they should stop.
// This function is purely best-effort. It can fail to inform a goroutine if a
// processor just started running it.
// No locks need to be held.
// Returns true if preemption request was issued to at least one goroutine.
static bool
preemptall(void)
{
return false;
}
// Put mp on midle list.
// Sched must be locked.
static void
mput(M *mp)
{
mp->schedlink = runtime_sched->midle;
runtime_sched->midle = (uintptr)mp;
runtime_sched->nmidle++;
checkdead();
}
// Try to get an m from midle list.
// Sched must be locked.
static M*
mget(void)
{
M *mp;
if((mp = (M*)runtime_sched->midle) != nil){
runtime_sched->midle = mp->schedlink;
runtime_sched->nmidle--;
}
return mp;
}
// Put gp on the global runnable queue.
// Sched must be locked.
static void
globrunqput(G *gp)
{
gp->schedlink = 0;
if(runtime_sched->runqtail)
((G*)runtime_sched->runqtail)->schedlink = (uintptr)gp;
else
runtime_sched->runqhead = (uintptr)gp;
runtime_sched->runqtail = (uintptr)gp;
runtime_sched->runqsize++;
}
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
static void
globrunqputbatch(G *ghead, G *gtail, int32 n)
{
gtail->schedlink = 0;
if(runtime_sched->runqtail)
((G*)runtime_sched->runqtail)->schedlink = (uintptr)ghead;
else
runtime_sched->runqhead = (uintptr)ghead;
runtime_sched->runqtail = (uintptr)gtail;
runtime_sched->runqsize += n;
}
// Try to get a batch of G's from the global runnable queue.
// Sched must be locked.
static G*
globrunqget(P *p, int32 max)
{
G *gp, *gp1;
int32 n;
if(runtime_sched->runqsize == 0)
return nil;
n = runtime_sched->runqsize/runtime_gomaxprocs+1;
if(n > runtime_sched->runqsize)
n = runtime_sched->runqsize;
if(max > 0 && n > max)
n = max;
if((uint32)n > nelem(p->runq)/2)
n = nelem(p->runq)/2;
runtime_sched->runqsize -= n;
if(runtime_sched->runqsize == 0)
runtime_sched->runqtail = 0;
gp = (G*)runtime_sched->runqhead;
runtime_sched->runqhead = gp->schedlink;
n--;
while(n--) {
gp1 = (G*)runtime_sched->runqhead;
runtime_sched->runqhead = gp1->schedlink;
runqput(p, gp1);
}
return gp;
}
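
The batch size here is the even share of the global queue plus one, clamped so it never exceeds what is available, the caller's max, or half the local runq. The same arithmetic as standalone Go:

package main

import "fmt"

func share(runqsize, gomaxprocs, max, localCap int32) int32 {
	n := runqsize/gomaxprocs + 1
	if n > runqsize {
		n = runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > localCap/2 { // never fill more than half the local runq
		n = localCap / 2
	}
	return n
}

func main() {
	// 100 queued Gs across 4 Ps, no cap, 256-slot local queues.
	fmt.Println(share(100, 4, 0, 256)) // 26
}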
// Put p on pidle list.
// Sched must be locked.
static void
pidleput(P *p)
{
p->link = runtime_sched->pidle;
runtime_sched->pidle = (uintptr)p;
runtime_xadd(&runtime_sched->npidle, 1); // TODO: fast atomic
}
// Try to get a p from pidle list.
// Sched must be locked.
static P*
pidleget(void)
{
P *p;
p = (P*)runtime_sched->pidle;
if(p) {
runtime_sched->pidle = p->link;
runtime_xadd(&runtime_sched->npidle, -1); // TODO: fast atomic
}
return p;
}
// Try to put g on local runnable queue.
// If it's full, put onto global queue.
// Executed only by the owner P.
static void
runqput(P *p, G *gp)
{
uint32 h, t;
retry:
h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with consumers
t = p->runqtail;
if(t - h < nelem(p->runq)) {
p->runq[t%nelem(p->runq)] = (uintptr)gp;
runtime_atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption
return;
}
if(runqputslow(p, gp, h, t))
return;
// the queue is not full, so the put above must succeed
goto retry;
}
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
static bool
runqputslow(P *p, G *gp, uint32 h, uint32 t)
{
G *batch[nelem(p->runq)/2+1];
uint32 n, i;
// First, grab a batch from local queue.
n = t-h;
n = n/2;
if(n != nelem(p->runq)/2)
runtime_throw("runqputslow: queue is not full");
for(i=0; i<n; i++)
batch[i] = (G*)p->runq[(h+i)%nelem(p->runq)];
if(!runtime_cas(&p->runqhead, h, h+n)) // cas-release, commits consume
return false;
batch[n] = gp;
// Link the goroutines.
for(i=0; i<n; i++)
batch[i]->schedlink = (uintptr)batch[i+1];
// Now put the batch on global queue.
runtime_lock(&runtime_sched->lock);
globrunqputbatch(batch[0], batch[n], n+1);
runtime_unlock(&runtime_sched->lock);
return true;
}
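
Before taking the scheduler lock, runqputslow threads the overflow batch into an intrusive list via schedlink, so globrunqputbatch can splice all n+1 goroutines into the global queue in O(1). A minimal sketch of that linking with plain structs:

package main

import "fmt"

type g struct {
	id        int
	schedlink *g // intrusive link, as on the runtime's G
}

func main() {
	// Half the local queue plus the newly put g, as runqputslow collects.
	batch := []*g{{id: 1}, {id: 2}, {id: 3}}
	for i := 0; i < len(batch)-1; i++ {
		batch[i].schedlink = batch[i+1]
	}
	// globrunqputbatch(head, tail, n) would now splice this list whole.
	for gp := batch[0]; gp != nil; gp = gp.schedlink {
		fmt.Println(gp.id) // 1, 2, 3
	}
}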
// Get g from local runnable queue.
// Executed only by the owner P.
static G*
runqget(P *p)
{
G *gp;
uint32 t, h;
for(;;) {
h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
t = p->runqtail;
if(t == h)
return nil;
gp = (G*)p->runq[h%nelem(p->runq)];
if(runtime_cas(&p->runqhead, h, h+1)) // cas-release, commits consume
return gp;
}
}
// Grabs a batch of goroutines from local runnable queue.
// The batch array must be of size nelem(p->runq)/2. Returns number of grabbed goroutines.
// Can be executed by any P.
static uint32
runqgrab(P *p, G **batch)
{
uint32 t, h, n, i;
for(;;) {
h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
t = runtime_atomicload(&p->runqtail); // load-acquire, synchronize with the producer
n = t-h;
n = n - n/2;
if(n == 0)
break;
if(n > nelem(p->runq)/2) // read inconsistent h and t
continue;
for(i=0; i<n; i++)
batch[i] = (G*)p->runq[(h+i)%nelem(p->runq)];
if(runtime_cas(&p->runqhead, h, h+n)) // cas-release, commits consume
break;
}
return n;
}
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
static G*
runqsteal(P *p, P *p2)
{
G *gp;
G *batch[nelem(p->runq)/2];
uint32 t, h, n, i;
n = runqgrab(p2, batch);
if(n == 0)
return nil;
n--;
gp = batch[n];
if(n == 0)
return gp;
h = runtime_atomicload(&p->runqhead); // load-acquire, synchronize with consumers
t = p->runqtail;
if(t - h + n >= nelem(p->runq))
runtime_throw("runqsteal: runq overflow");
for(i=0; i<n; i++, t++)
p->runq[t%nelem(p->runq)] = (uintptr)batch[i];
runtime_atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption
return gp;
}
void runtime_testSchedLocalQueue(void)
__asm__("runtime.testSchedLocalQueue");
void
runtime_testSchedLocalQueue(void)
{
P p;
G gs[nelem(p.runq)];
int32 i, j;
runtime_memclr((byte*)&p, sizeof(p));
for(i = 0; i < (int32)nelem(gs); i++) {
if(runqget(&p) != nil)
runtime_throw("runq is not empty initially");
for(j = 0; j < i; j++)
runqput(&p, &gs[i]);
for(j = 0; j < i; j++) {
if(runqget(&p) != &gs[i]) {
runtime_printf("bad element at iter %d/%d\n", i, j);
runtime_throw("bad element");
}
}
if(runqget(&p) != nil)
runtime_throw("runq is not empty afterwards");
}
}
void runtime_testSchedLocalQueueSteal(void)
__asm__("runtime.testSchedLocalQueueSteal");
void
runtime_testSchedLocalQueueSteal(void)
{
P p1, p2;
G gs[nelem(p1.runq)], *gp;
int32 i, j, s;
runtime_memclr((byte*)&p1, sizeof(p1));
runtime_memclr((byte*)&p2, sizeof(p2));
for(i = 0; i < (int32)nelem(gs); i++) {
for(j = 0; j < i; j++) {
gs[j].sig = 0;
runqput(&p1, &gs[j]);
}
gp = runqsteal(&p2, &p1);
s = 0;
if(gp) {
s++;
gp->sig++;
}
while((gp = runqget(&p2)) != nil) {
s++;
gp->sig++;
}
while((gp = runqget(&p1)) != nil)
gp->sig++;
for(j = 0; j < i; j++) {
if(gs[j].sig != 1) {
runtime_printf("bad element %d(%d) at iter %d\n", j, gs[j].sig, i);
runtime_throw("bad element");
}
}
if(s != i/2 && s != i/2+1) {
runtime_printf("bad steal %d, want %d or %d, iter %d\n",
s, i/2, i/2+1, i);
runtime_throw("bad steal");
}
}
}
intgo
runtime_setmaxthreads(intgo in)
{
......@@ -3041,56 +1802,15 @@ os_beforeExit()
{
}
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
enum
{
ACTIVE_SPIN = 4,
ACTIVE_SPIN_CNT = 30,
};
extern _Bool sync_runtime_canSpin(intgo i)
__asm__ (GOSYM_PREFIX "sync.runtime_canSpin");
_Bool
sync_runtime_canSpin(intgo i)
{
P *p;
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only a few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on the global runq or on other Ps.
if (i >= ACTIVE_SPIN || runtime_ncpu <= 1 || runtime_gomaxprocs <= (int32)(runtime_sched->npidle+runtime_sched->nmspinning)+1) {
return false;
}
p = (P*)g->m->p;
return p != nil && p->runqhead == p->runqtail;
}
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
extern void sync_runtime_doSpin(void)
__asm__ (GOSYM_PREFIX "sync.runtime_doSpin");
void
sync_runtime_doSpin()
{
runtime_procyield(ACTIVE_SPIN_CNT);
}
// For Go code to look at variables, until we port proc.go.
extern M** runtime_go_allm(void)
extern M* runtime_go_allm(void)
__asm__ (GOSYM_PREFIX "runtime.allm");
M**
M*
runtime_go_allm()
{
return &runtime_allm;
return runtime_allm;
}
intgo NumCPU(void) __asm__ (GOSYM_PREFIX "runtime.NumCPU");
......
......@@ -240,7 +240,6 @@ extern G* runtime_lastg;
extern M* runtime_allm;
extern P** runtime_allp;
extern Sched* runtime_sched;
extern int32 runtime_gomaxprocs;
extern uint32 runtime_panicking(void)
__asm__ (GOSYM_PREFIX "runtime.getPanicking");
extern int8* runtime_goos;
......@@ -260,7 +259,8 @@ extern bool runtime_isarchive;
intgo runtime_findnull(const byte*)
__asm__ (GOSYM_PREFIX "runtime.findnull");
void runtime_gogo(G*);
void runtime_gogo(G*)
__asm__ (GOSYM_PREFIX "runtime.gogo");
struct __go_func_type;
void runtime_args(int32, byte**)
__asm__ (GOSYM_PREFIX "runtime.args");
......@@ -294,7 +294,8 @@ void runtime_printtrace(Slice, G*)
#define runtime_read(d, v, n) read((d), (v), (n))
#define runtime_write(d, v, n) write((d), (v), (n))
#define runtime_close(d) close(d)
void runtime_ready(G*);
void runtime_ready(G*, intgo, bool)
__asm__ (GOSYM_PREFIX "runtime.ready");
String runtime_getenv(const char*);
int32 runtime_atoi(const byte*, intgo);
void* runtime_mstart(void*);
......@@ -307,7 +308,8 @@ void runtime_signalstack(byte*, uintptr)
__asm__ (GOSYM_PREFIX "runtime.signalstack");
MCache* runtime_allocmcache(void)
__asm__ (GOSYM_PREFIX "runtime.allocmcache");
void runtime_freemcache(MCache*);
void runtime_freemcache(MCache*)
__asm__ (GOSYM_PREFIX "runtime.freemcache");
void runtime_mallocinit(void);
void runtime_mprofinit(void);
#define runtime_getcallersp(p) __builtin_frame_address(0)
......@@ -368,8 +370,6 @@ int64 runtime_unixnanotime(void) // real time, can skip
void runtime_dopanic(int32) __attribute__ ((noreturn));
void runtime_startpanic(void)
__asm__ (GOSYM_PREFIX "runtime.startpanic");
void runtime_freezetheworld(void)
__asm__ (GOSYM_PREFIX "runtime.freezetheworld");
void runtime_unwindstack(G*, byte*);
void runtime_sigprof()
__asm__ (GOSYM_PREFIX "runtime.sigprof");
......