```go
type Mutex struct {
	state int32
	sema  uint32
}

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}
```
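To see where these fields come into play at the API level, here is a small self-contained example (my own, not from the runtime source): a contended `Lock` or `RLock` ultimately parks the goroutine on one of these semaphore fields via the runtime calls analyzed below.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.RWMutex
	counter := 0
	var wg sync.WaitGroup

	// A writer: if readers are active, Lock waits on writerSem
	// until the departing readers (readerWait) drain.
	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		counter++
		mu.Unlock()
	}()

	// Readers: an RLock blocked by a pending writer parks on readerSem.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.RLock()
			_ = counter
			mu.RUnlock()
		}()
	}

	wg.Wait()
	fmt.Println("counter:", counter)
}
```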
```go
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	s := acquireSudog()
	root := semtable.rootFor(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lockWithRank(&root.lock, lockRankRoot)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		root.queue(addr, s, lifo)
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	releaseSudog(s)
}
```
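To make the "increment `nwait`, re-check, then park" protocol concrete, here is a toy user-space analogue. This is a sketch under loose assumptions, not the runtime's implementation: `sync.Cond.Wait` stands in for `goparkunlock`, a plain `uint32` field for `*addr`, and all names (`sema`, `value`, `nwait`, `tryAcquire`) are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sema is a toy analogue of the runtime semaphore. value plays the
// role of *addr, nwait the role of root.nwait, mu the role of
// root.lock, and cond.Wait stands in for goparkunlock.
type sema struct {
	value uint32
	nwait uint32
	mu    sync.Mutex
	cond  *sync.Cond
}

func newSema(n uint32) *sema {
	s := &sema{value: n}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// tryAcquire mirrors cansemacquire: CAS-decrement, never below zero.
func (s *sema) tryAcquire() bool {
	for {
		v := atomic.LoadUint32(&s.value)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(&s.value, v, v-1) {
			return true
		}
	}
}

func (s *sema) acquire() {
	if s.tryAcquire() { // easy case
		return
	}
	for {
		s.mu.Lock()
		atomic.AddUint32(&s.nwait, 1) // disable release's easy case
		if s.tryAcquire() {           // re-check to avoid a missed wakeup
			atomic.AddUint32(&s.nwait, ^uint32(0)) // undo the increment
			s.mu.Unlock()
			return
		}
		s.cond.Wait() // park; any release after our re-check sees nwait > 0
		s.mu.Unlock()
		if s.tryAcquire() { // like the post-gopark cansemacquire retry
			return
		}
	}
}

func (s *sema) release() {
	atomic.AddUint32(&s.value, 1)
	if atomic.LoadUint32(&s.nwait) == 0 { // easy case: nobody waiting
		return
	}
	s.mu.Lock()
	if atomic.LoadUint32(&s.nwait) != 0 { // double-check, like semrelease1
		atomic.AddUint32(&s.nwait, ^uint32(0)) // like dequeue
		s.cond.Signal()
	}
	s.mu.Unlock()
}

func main() {
	s := newSema(2) // at most 2 goroutines in the critical section
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			s.acquire()
			fmt.Println("goroutine", id, "in critical section")
			s.release()
		}(i)
	}
	wg.Wait()
}
```

The key ordering is the same as in the runtime: the waiter bumps `nwait` before re-checking the count, and the releaser bumps the count before checking `nwait`, so at least one side always observes the other and no wakeup is lost.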
Step-by-step analysis
```go
gp := getg()
if gp != gp.m.curg {
	throw("semacquire not on the G stack")
}
```
This snippet fetches the current goroutine with getg() and checks that it is the user goroutine of the current M (gp.m.curg), i.e. that semacquire is running on a regular G stack; if it is called from the system stack instead, the runtime throws.
```go
if cansemacquire(addr) {
	return
}

func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}
```
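Note why this is a CAS loop rather than a single atomic decrement: on a zero count, an unconditional decrement of a uint32 would wrap around and fabricate permits out of thin air. A small standalone demo of the difference (my own names, not runtime code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// brokenAcquire decrements unconditionally: on a zero count the
// uint32 wraps to 4294967295, i.e. permits appear from nowhere.
func brokenAcquire(addr *uint32) {
	atomic.AddUint32(addr, ^uint32(0))
}

// casAcquire is the cansemacquire pattern: fail fast at zero,
// otherwise retry the v -> v-1 transition until the CAS wins.
func casAcquire(addr *uint32) bool {
	for {
		v := atomic.LoadUint32(addr)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(addr, v, v-1) {
			return true
		}
	}
}

func main() {
	var a, b uint32 // both start with 0 permits
	brokenAcquire(&a)
	fmt.Println(a)              // 4294967295: underflow
	fmt.Println(casAcquire(&b)) // false: correctly refuses
}
```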
```go
	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lockWithRank(&root.lock, lockRankRoot)
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow or even yield, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		readyWithTime(s, 5+skipframes)
		if s.ticket == 1 && getg().m.locks == 0 {
			// Direct G handoff
			// readyWithTime has added the waiter G as runnext in the
			// current P; we now call the scheduler so that we start running
			// the waiter G immediately.
			// Note that waiter inherits our time slice: this is desirable
			// to avoid having a highly contended semaphore hog the P
			// indefinitely. goyield is like Gosched, but it emits a
			// "preempted" trace event instead and, more importantly, puts
			// the current G on the local runq instead of the global one.
			// We only do this in the starving regime (handoff=true), as in
			// the non-starving case it is possible for a different waiter
			// to acquire the semaphore while we are yielding/scheduling,
			// and this would be wasteful. We wait instead to enter starving
			// regime, and then we start to do direct handoffs of ticket and P.
			// See issue 33747 for discussion.
			goyield()
		}
	}
}
```
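The `s.ticket = 1` branch deserves a sketch of its own. In the starving regime (`handoff == true`) the releaser consumes the count itself and hands it directly to the dequeued waiter, so goroutines arriving at the fast path cannot barge in front of it. Below is a toy FIFO version of that idea; the names are hypothetical and a per-waiter channel stands in for the sudog queue and readyWithTime.

```go
package main

import (
	"fmt"
	"sync"
)

// handoffSema hands permits directly to queued waiters in FIFO order,
// mimicking the ticket=1 path: the releaser consumes the count on the
// waiter's behalf instead of letting woken and arriving goroutines race.
type handoffSema struct {
	mu    sync.Mutex
	value uint32
	queue []chan struct{} // one channel per parked waiter, like the sudog list
}

func (s *handoffSema) acquire() {
	s.mu.Lock()
	if s.value > 0 && len(s.queue) == 0 { // fast path only when nobody queues
		s.value--
		s.mu.Unlock()
		return
	}
	ticket := make(chan struct{})
	s.queue = append(s.queue, ticket)
	s.mu.Unlock()
	<-ticket // woken with the permit already consumed for us (s.ticket == 1)
}

func (s *handoffSema) release() {
	s.mu.Lock()
	if len(s.queue) > 0 {
		ticket := s.queue[0]
		s.queue = s.queue[1:]
		s.mu.Unlock()
		close(ticket) // direct handoff: the permit never becomes visible to others
		return
	}
	s.value++
	s.mu.Unlock()
}

func main() {
	s := &handoffSema{value: 1}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			s.acquire()
			fmt.Println("goroutine", id, "got the permit")
			s.release()
		}(i)
	}
	wg.Wait()
}
```

The runtime goes one step further than this sketch: after `readyWithTime` puts the waiter in `runnext`, `goyield()` donates the rest of the releaser's time slice so the handed-off waiter starts running immediately instead of the releaser re-entering the lock.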