0%

golang锁实现之Semaphore

Semaphore

信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。

  • 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
  • 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号;

在go的锁中的底层结构体实现过程中可以看到sema这个关键词,如go的互斥锁或读写锁

1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32
sema uint32
}
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

信号量机制

回顾下操作系统中信号量编程的机制

用户进程可以通过使用操作系统提供的一对原语来对信号量进行操作,从而很方便的实现了进程互斥、进程同步

信号量其实就是一个变量(可以是一个整数,也可以是更复杂的记录型变量),可以用一个信号量来表示系统中某种资源的数量,比如:系统中只有一台打印机,就可以设置一个初值为1的信号量 。

原语是一种特殊的程序段,其执行只能一气呵成,不可被中断。 原语是由关中断/开中断指令实现 的。软件解决方案的主要问题是由“进入区的各种操作无法一气呵成”
因此如果能把进入区、退出区的操作都用, “原语”实现,使这些操作能 一气呵成”就能避免问题。

一对原语wait(s) 原语 和 signall(S)原语, 可以把原语理解为我们自己写的函数,函数名分别为 waitsignal,括号里的信号量S 其实就是两数调用时传入的一个参数。

waitsignal 原语常简称为PV操作(来自荷兰语 proberen 和 verhogen)

我们可以把信号量机制互斥用如下代码实现表述出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct {
int value
struct process *L
} semaphore

void wait (semaphore S) {
S.value--;
if (S.value < 0 ) {
block(S.L)
}
}
void signal (semaphore S) {
S.value++;
if (S.value <= 0 ) {
wakeup(S.L);
}
}

信号量机制通过休眠队列和wakeup(唤醒)block(挂起)机制实现

go的Semaphore的实现

go里面sema的实现主要在runtime/sema.go文件中

数据结构

go的runtime有一个全局变量semtable,它放置了所有的信号量。

1
2
3
4
5
6
7
8
9
var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

每个信号量使用semaRoot结构体来表示

1
2
3
4
5
type semaRoot struct {
lock mutex
treap *sudog // 平衡树的根节点
nwait uint32 // Number of waiters. Read w/o the lock.
}

原语P

原语P即是wait,在并发编程信号同步过程中用来进行阻塞等待,go的sema主要通过semacquire1来实现wait,通过sync.runtime_Semacquire来调用

semacquire1流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// Easy case.
if cansemacquire(addr) {
return
}

// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

分步骤分析

1
2
3
4
   gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

这一段主要是用来获取sema的当前协程栈,如果拿不到的话会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if cansemacquire(addr) {
return
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

atomic.Loadatomic.Cas是原语操作:这段代码可以理解为,判断addr==0,如果为0说明addr被获取过了,要去走下面的流程判断是否需要阻塞,不为0说明addr拿到成功,对addr进行-1操作,此时函数直接return,不会发生阻塞

atomic.Cas的汇编实现

1
2
3
4
5
6
7
8
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET

基于汇编对cpu硬件加锁实现的原子操作

1
2
3
4
5
6
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0

这一部分就是拿到sudogroot队列

roots的作用:

  • semacquire1会在semtable数组中找一个元素和它对应上。每个元素都有一个root,这个rootTreap

  • 最后addr变成一个树节点,这个树节点,有自己的一个队列,专门放被阻塞的goroutine。叫它阻塞队列吧。 这个阻塞队列是个双端队列,头尾都可以进。

  • semacquire1把当前goroutine相关元数据放进阻塞队列之后,就挂起了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for {
lockWithRank(&root.lock, lockRankRoot)
// 记录root等待队列数量+1
atomic.Xadd(&root.nwait, 1)
// 检测addr如果不为0了,就进行唤醒
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 将sudog协程放入到root队列
root.queue(addr, s, lifo)
// 执行挂起
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}

上面这部分的死循环可以理解为阻塞的过程,在addr未唤醒之前,会将当前sudog假如阻塞队列,并挂起等待

原语V

原语V即是signal,在并发编程信号同步过程中用来进行唤醒,go的sema主要通过semrelease1来实现signal,通过sync.runtime_Semacquire来调用

1
2
3
4
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}

semrelease1流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)

// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if atomic.Load(&root.nwait) == 0 {
return
}

// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
// Direct G handoff
// readyWithTime has added the waiter G as runnext in the
// current P; we now call the scheduler so that we start running
// the waiter G immediately.
// Note that waiter inherits our time slice: this is desirable
// to avoid having a highly contended semaphore hog the P
// indefinitely. goyield is like Gosched, but it emits a
// "preempted" trace event instead and, more importantly, puts
// the current G on the local runq instead of the global one.
// We only do this in the starving regime (handoff=true), as in
// the non-starving case it is possible for a different waiter
// to acquire the semaphore while we are yielding/scheduling,
// and this would be wasteful. We wait instead to enter starving
// regime, and then we start to do direct handoffs of ticket and
// P.
// See issue 33747 for discussion.
goyield()
}
}
}

分步骤分析

1
2
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)

到阻塞队列中拿到根结点,并对当前addr进行加1释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if atomic.Load(&root.nwait) == 0 {
return
}
// 对当前代码区加锁
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
// 计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
unlock(&root.lock)
return
}
// 从阻塞队列中找到一个addr
s, t0 := root.dequeue(addr)
if s != nil {
// 等待队列计数-1
atomic.Xadd(&root.nwait, -1)
}
// 当前代码区释放锁
unlock(&root.lock)

这段代码的含义可以理解为,对当前区域加临时锁,主要目的从阻塞队列获取一个addr,然后检测下当前根结点的队列等待数量如果为0,说明都释放过了,直接 return 即可,最后对当前代码执行区域释放锁

1
2
3
4
5
6
7
8
readyWithTime(s, 5+skipframes)
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}

到这里会将sudog的协程唤醒执行,基本释放操作到这里就结束了,后面的代码主要是针对饥饿状态下g的处理

go的Semaphore的应用

go中在互斥锁和读写锁中都用到了Semaphore,在sync.mutexsync.rwmutex都有调用,当然,在其他结构体中比如waitgroup也有用到,这里只列出使用场景比较高的结构体,通过runtime_SemacquireMutexruntime_Semrelease实现调用
那么为什么会采用Semaphore呢,主要目的还是提高高并发场景下锁的性能,正常情况下可以通过CAS中的自旋也可以实现协程中的通信,但是自旋操作在高并发场景下对cpu资源消耗大,并且由于协程都是自旋等待的,所以当一个协程拿到锁后,其它协程会发生阻塞,影响性能,通过Semaphore中维护休眠队列,对协程进行调度,防止全局阻塞
提高了协程间的调度效率,并且在信号同步的P中,go的阻塞是通过gopark实现的,gopark类似与time.sleep,是一种挂起机制,不会大量消耗cpu资源,所以说,go的锁中利用了Semaphore实现调度,也是它在高并发场景中一种优势的体现