解读精益求精的 Sync.Mutex

原文地址 juejin.cn

“互斥锁” 对于并发编程是必不可少的,Go 语言虽然推崇使用 Channel 来解决对并发资源的访问,但同样实现了 Sync.Mutex 互斥锁供编程人员使用。有人做过专门的统计,在知名的开源软件 Docker、Kubernutes、etcd、gRPC 中,使用 Mutex 的频率是最高的。Go 语言随着版本的迭代,对 Sync.Mutex 的实现也愈发精细化,当前版本的 Sync.Mutex 核心实现代码已经有 100 多行,其中大量使用了复杂位运算和流程控制来解决各种问题,因此使得 Sync.Mutex 源码达到了不可读的状态,本文从历史发展的角度剖析 Sync.Mutex,希望能帮助读者读懂 Sync.Mutex 的源码并掌握其设计思想。

  1. 初版互斥锁——先到先得

github 地址:初级版本的互斥锁

代码量很少,我们来分析一下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package sync

import (
    "runtime"
    "sync/atomic"
)


//互斥锁结构
type Mutex struct {
    key  int32
    sema uint32
}

//请求锁
func (m *Mutex) Lock() {
    if atomic.AddInt32(&m.key, 1) == 1 {    //标识加1,如果等于1,成功获取到锁
        return
    }
    runtime.Semacquire(&m.sema)     //否则阻塞等待
}


//释放锁
func (m *Mutex) Unlock() {
    switch v := atomic.AddInt32(&m.key, -1); {  //标识减1
    case v == 0:    //如果等于0,则没有等待者
        return
    case v == -1:   //如果等于-1,这种是异常情况,或者超过了最大可等待goroutine的数量
        panic("sync: unlock of unlocked mutex")
    }
    runtime.Semrelease(&m.sema) //唤醒其他阻塞的goroutine
}

初级版本的 Mutex 包含两个字段:

当 goroutine 调用 Lock 方法请求锁的时候,通过 atomic.AddInt32 方法原子性的给 key 加 1,如果比较幸运,当前没有等待者,那么 key 的值就会等于 1,成功获取到锁;如果锁已经被别的 goroutine 持有了,当前的 goroutine 会在将 key 加 1 的同时,调用 runtime.Semacquire 方法,使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒。释放锁的操作也比较简单,就是原子性的给 key 减 1,不过当减 1 之后值变成 - 1 的时候,程序就会 panic,例如没有请求锁,直接对锁进行释放就会 panic 这个错误;此外还有一种情况,Mutex 结构中的 key 是一个 int32 类型,它能表达的最大整数是 40 多亿,当争抢锁的 goroutine 数目达到这个阈值的时候,也会 panic,但是这种情况在服务器资源有限的情况下,是不可能会发生的。

我们先来探讨两个问题:

  • 为什么请求锁时,需要使用原子操作?
  • 为什么需要信号量?它的实现机制是什么?

1.1 原子操作的必要性

我们先来看一个简单技术器程序 counter.go。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
   "fmt"
   "sync"
)

func main() {
   //runtime.GOMAXPROCS(1)
   var counter int64
   var wg sync.WaitGroup
   wg.Add(2)
   for i := 0; i < 2; i++ {
      go func() {
         defer wg.Done()
         for j := 0; j < 10000; j++ {
            counter++  //atomic.AddInt64(&counter, 1)
         }
      }()
   }
   wg.Wait()
   fmt.Println(counter)
}

我们启动 2 个 goroutine 并发的累加 counter,每个 goroutine 将 counter 累加 1 万,运行程序大概率打印出来的 counter 值不会是 2 万,而是一个小于 2 万的数,每次运行结果都不一样。熟悉并发编程的读者应该早都看出了端倪,这段程序中存在竞态条件。counter++ 不是一个原子操作,它包含三个步骤:1. 读取 counter 变量当前值;2. 对 counter 当前值加 1,保存到寄存器临时变量中;3. 将临时变量的结果再保存到 counter 中。其中的每一个步骤,都有对应的汇编实现。

当我们使用一个 CPU 核心的时候 (使用 runtime.GOMAXPROCS(1) 设置),多次运行程序,似乎我们总能得到 2 万的准确计数,但是这并不是一个强保证,当我们使用go run -race counter.go竞态检测就大概率能检测到这段程序对 counter 变量的内存有非同步的并发读写:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
==================
WARNING: DATA RACE
Read at 0x00c000136008 by goroutine 8:
  main.main.func1()
      /Users/guozhaoran/goCode/basic/goConcurrent/mutex/example.go:17 +0x78

Previous write at 0x00c000136008 by goroutine 7:
  main.main.func1()
      /Users/guozhaoran/goCode/basic/goConcurrent/mutex/example.go:17 +0x91

Goroutine 8 (running) created at:
  main.main()
      /Users/guozhaoran/goCode/basic/goConcurrent/mutex/example.go:14 +0xe4

Goroutine 7 (finished) created at:
  main.main()
      /Users/guozhaoran/goCode/basic/goConcurrent/mutex/example.go:14 +0xe4
==================
复制代码

所以解决方案就是使用 atomic 包,它非常适合于这种全局单体变量的原子性加减,而这种原子性的实现是不同的 CPU 架构硬件提供的能力,通过 LOCK 汇编指令锁定数据总线来完成。对于 Mutex 的应用场景,原子操作当然是必不可少的,它保证了多个 goroutine 对共享变量 key 累加的一致性

1.2 信号量

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

Dijkstra 在他的论文中为信号量定义了两个操作 P 和 V。P 操作(descrease、wait、acquire)是减少信号量的计数值,而 V 操作(increase、signal、release)是增加信号量的计数值。使用伪代码表示如下(中括号代表原子操作):

1
2
3
4
5
6
7
8
function V(semaphore S, integer I): 
    [S  S + I]

function P(semaphore S, integer I): 
    repeat: 
        [if S  I: 
        S  S  I 
        break]

可以看到,初始化信号量 S 有一个指定数量(n)的资源,它就像是一个有 n 个资源的池子。P 操作相当于请求资源,如果资源可用,就立即返回;如果没有资源或者不够,那么,它可以不断尝试或者阻塞等待。V 操作会释放自己持有的资源,把资源返还给信号量。信号量的值除了初始化的操作以外,只能由 P/V 操作改变。

现在,我们来总结下信号量的实现。初始化信号量:

  • 设定初始的资源的数量。
  • P 操作:将信号量的计数值减去 1,如果新值已经为负,那么调用者会被阻塞并加入到等待队列中。否则,调用者会继续执行,并且获得一个资源。
  • V 操作:将信号量的计数值加 1,如果先前的计数值为负,就说明有等待的 P 操作的调用者。它会从等待队列中取出一个等待的调用者,唤醒它,让它继续执行。

在运行时,Go 内部使用信号量来控制 goroutine 的阻塞和唤醒。比如互斥锁的第二个字段 sema,信号量的 P/V 操作是通过函数实现的 (Go 内部运行时的信号量也是通过 atomic 和 gopark 实现的,具体实现可以看 runtime/sema.go)。

1
2
3
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

值得一提的是,Go 运行时的信号量实现的是一个优先级等待队列,这也是当前阶段 Mutex 饥饿模式实现的基础。信号量的 P/V 操作函数,可以将 goroutine 休眠后添加到优先级队列的头部或尾部;也可以从优先级队列的头部或尾部将 goroutine 取出唤醒。

  1. 互斥锁初步优化——给要抢锁的 goroutine 一次机会

解读完初版互斥锁的实现,读者可能会发现一个问题,当锁被持有的情况下,新到来争抢锁的 goroutine 直接被运行时的信号量休眠并添加到了优先队列中,虽然这样严格保证了锁争抢的先来先得顺序,但是 goroutine 的休眠和唤醒非常影响性能,针对这一点,Go 开发者对 Mutex 做了一次大的调整。

github 地址:互斥锁进一步优化

此时的 Mutex 结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

Mutex 结构体的 key 被改成了 state,代表的含义也被拆分成了三个。

  • MutexLocked:state 的第一个位代表锁是否被持有
  • MutexWoken:state 的第二个位代表是否有唤醒的 goroutine
  • MutexWaiters:state 剩下的位代表的是等待此锁的 goroutine 数

在分析这个版本的代码之前,我们先来补充一个要用到的非常重要的知识点:CAS。

2.1 自旋锁 (CAS) 及其实现原理

CAS 指令的实现原理是将给定的值与内存中的值进行比较,如果是同一个值,就用新值替换掉内存中的值,然后返回。如果不是就返回第一步的比较,因此得名 “自旋锁”。下面画一个图来描述一下 CAS 算法,并和 Go 语言中的 atomic.CompareAndSwap 函数簇做一个比较:

有一点需要特别注意,CAS 中值的比较与交换过程是原子性的,这个过程中如果有其他的 goroutine 修改了内存中的值,那么 CAS 会返回 false

CAS 也是 Mutex 实现的基础,读者可能会有疑问:有了 CAS 为什么还要有信号量呢?其实无论是原子操作,还是自旋锁,都不适合长时间等待的情况,因为有很多资源(数据)它有一定的时间性,你想去获取它,CPU 并不能立即返回给你,而是要等待一段时间,才能把数据返回给你。这种情况,你用自旋锁来同步访问这种资源,你会发现这是对 CPU 时间的巨大浪费。当然 Mutex 的实现非常适合使用 CAS

2.2 互斥锁初步优化后的 Lock 实现

介绍完了 CAS,我们来看当前版本的 Lock 实现,直接上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (m *Mutex) Lock() {
    // Fast path: 幸运case,能够直接获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    for {
        old := m.state
        new := old | mutexLocked    //新状态加锁
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift     //等待者数量加一
        }
        if awoke {
            //goroutine是被唤醒的,新状态清除唤醒标记
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) { //设置新状态
            if old&mutexLocked == 0 {   //锁原状态未加锁
                break
            }
            runtime.Semacquire(&m.sema)  //请求信号量
            awoke = true //设置唤醒标记
        }
    }
}

我们重点看一下对 state 的操作。首先通过 CAS 检测 Mutex 是否没有被 goroutine 持有并且没有等待者,如果是这样,那么当前 goroutine 很幸运,可以直接获取到锁,这也就是代码中标注的 Fast path。

如果当前 goroutine 不够幸运,那么会走到下边的循环检查阶段,for 循环不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁。我们知道 state 有三个含义:

  • 通过new := old | mutexLocked设置 state 中的 mutexLocked,给 Mutex 加锁;
  • 通过new = old + 1<<mutexWaiterShift给 Mutex 设置 mutexWaiterShift,等待者加 1;
  • 通过new &^= mutexWoken给 Mutex 清除 Mutex 的唤醒标记。

那么接下来atomic.CompareAndSwapInt32(&m.state, old, new)执行成功说明给 state 设置了新值,就要区分两种情况考虑了,第一种情况是 state 新值中包含加锁成功了,那么直接 break,goroutine 抢到了锁,程序结束;否则只能说明 state 只是清除 mutexWoken 标志或者增加一个 waiter 而已。

这里的循环状态检查的代码有两种 goroutine 会同时执行:

  • 新来抢锁的 goroutine(可能有多个)
  • 从信号量优先队列中唤醒的 goroutine(最多只可能有一个)

上边的描述可能比较抽象,我们结合下边的流程图帮助理解一下吧:

2.3 互斥锁初步优化后的 Unlock 实现

Unlock 方法也变得复杂了,但是不像 Lock,仔细研究一下,还是能看得懂的,下边是代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)  //去掉锁状态
    if (new+mutexLocked)&mutexLocked == 0 {     //未被锁定的mutex释放锁会panic
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        //锁上没有goroutine等待或者有被唤醒的goroutine,或者又被别的goroutine加了锁,那么不需要做任何事情,返回即可
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        //将mutexWaiterShift数量减1并设置mutexWoken为true
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {     //CAS设置成功,唤醒一个新的goroutine争抢锁即可
            runtime.Semrelease(&m.sema)
            return
        }
        old = m.state   //记录当前mutex的状态,继续循环
    }
}

Unlock 方法先定义一个新变量将锁标志去掉,如果对一个未加锁的 Mutex 进行 Unlock 会 panic,然后程序还需要进行一些额外的判断,并不能直接返回。下面情况之一调用 Unlock 的 goroutine 可以直接返回:

  • Mutex 上没有 waiter
  • Mutex 又被别人上了锁
  • 有 goroutine 被唤醒了

否则,先设置 Mutex 中 mutexWaiterShift 减 1,并标记 mutexWoken 为 true,使用 CAS 方法如果设置成功,则从信号量的优先队列中唤醒一个 goroutine,程序返回。否则记录当前 mutex 的状态,继续循环判断。直到返回为止。

相比较最初版本的设计,这个版本的 Sync.Mutex 实现主要是给新来的 goroutine 一次获取到锁的机会,打破了原来先来先得的逻辑,代码的复杂度也增加了不少。

  1. 互斥锁进一步优化——给要抢锁的 goroutine 更多机会

我们前边对 Sync.Mutex 的优化是基于一种猜想:新来的争抢锁的 goroutine 很大概率上能够获取到锁!持有锁的 goroutine 在持有锁时间越短的情况下,这种概率越大,那么我们为什么不给争抢锁的 goroutine 更多机会呢?也就是让它们稍微等一会,如果等一会也获取不到,那么就乖乖的添加进信号量的优先队列就好了。实际上,Go 官方团队也是这么做的。

gihub 地址:给要抢锁的 goroutine 更多机会

我们来看一下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (m *Mutex) Lock() {
    // Fast path: 幸运case,能够直接获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    iter := 0
    for {   // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
        old := m.state
        new := old | mutexLocked    //新状态加锁
        if old&mutexLocked != 0 {   // 锁还没被释放
            if runtime_canSpin(iter) {  // 还可以自旋
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue    //自旋,再次尝试获取锁
            }
            new = old + 1<<mutexWaiterShift      //等待者数量加一
        }
        if awoke {  //唤醒状态,去掉标记
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) { //设置新状态
            if old&mutexLocked == 0 {   //锁原状态未加锁
                break
            }
            runtime_Semacquire(&m.sema) //请求信号量
            awoke = true    //设置信号量
            iter = 0    //重新设置自旋计数器
        }
    }
}

这次的优化很小,只增加了 runtime_canSpin 的检测,有一个自旋计数器 iter,如果抢锁的 goroutine 还能自旋的话,就自旋等待持有锁的 goroutine 释放锁,这样能够增大抢到锁的概率。

我想读者应该对代码中的这部分很迷惑:

1
2
3
4
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }

这四个 && 符号确实让人眼花撩乱,不过结合上一小节对 Sync.Mutex 的解读,我们能分析出这段代码的用途。首先 && 是短路运算符,有一个为 false 判断就不会进行下去了。!awoke表示程序逻辑是由新加入抢锁的 goroutine 进来的,而不是从 sema 优先级队列中唤醒的 goroutine 进来的;再然后如果 Mutex 的旧值是没有唤醒新的 goroutine(old&mutexWoken == 0) 的并且有等待者 (old>>mutexWaiterShift != 0) 的话,就尝试通过 CAS 给 Mutex 设置一个唤醒标记 (atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)),如果成功的话,将 awoke 设置为 true,这样做可以让调用 Unlock 的 goroutine 快速返回,而不用从优先队列中再唤醒 goroutine 来争抢锁。

经过这一版本的优化,我们可以看到,Sync.Mutex 对新加入抢锁的 goroutine 相当友好,表面上看这样似乎没有什么问题,能够让更多的 goroutine 在最短的时间内获取到锁。但是我们考虑一下那些一直在信号量优先队列中的等待者怎么办?锁有可能一直被新来的 goroutine 抢到,这就产生了 “饥饿问题”

  1. 终极版本的互斥锁——小康社会,不再饥饿

Sync.Mutex 的 “饥饿问题” 早在 Go 1.9 版本中就解决了,后续也进行了一些优化工作,到此 Sync.Mutex 才算是实现的比较完美。我们接下来会解读当前最新版本 Go1.17 的代码实现,读者要和我一起开启烧脑模式,细细的品一品了。

github 地址:终极版本的 Sync.Mutex

4.1 Mutex 结构体实现

为了解决饥饿问题,Mutex 结构体从 state 字段中又分出一个位标识当前 Mutex 是否饥饿,并定义了一个常量,将获取锁的 goroutine 等待时间设置了 1 毫秒阈值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving   // 从state字段中分出一个饥饿标记
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6   //1000000ns = 1ms
)

4.2 Sync.Mutex 终极版本的 Lock 函数实现

最新版本的 Sync.Mutex 的 Lock 方法和 Unlock 方法将 fast path 和 slow path 拆成独立的函数,以便内联,提高性能。本节我们先来看一下 Lock 方法的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (m *Mutex) Lock() {
	// Fast path: 顺利的获取到锁
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (缓慢之路,通过自旋、竞争或者饥饿状态下的锁竞争)
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false       //标识当前goroutine是否饥饿
	awoke := false  //唤醒标记
	iter := 0   //自旋次数
	old := m.state  //当前的锁状态
	for {
		//锁是非饥饿状态,并且未释放,尝试自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 主动自旋的场景
			// 尝试设置 mutexWoken 标志以通知 Unlock 不唤醒其他阻塞的 goroutine
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()    //自旋
			iter++
			old = m.state
			continue
		}
		new := old
		// 不要尝试获取饥饿的互斥锁,新到达的 goroutine 必须排队
		if old&mutexStarving == 0 {
			new |= mutexLocked  //非饥饿状态,加锁
		}
		if old&(mutexLocked|mutexStarving) != 0 {   //饥饿状态,或者锁被抢占,等待者 + 1
			new += 1 << mutexWaiterShift
		}
		// 当前 goroutine 将互斥锁切换到饥饿模式。
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			//清除awoke标识
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 上锁成功
			}
			// 第一次等待,添加到信号量队列的队首
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            //设置饥饿标记
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				//加锁并将waiter数量减1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					//非饥饿状态的goroutine,最后一个waiter已经不饥饿了,清除标记
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

代码结合注释也很难看懂,再结合流程图看一下。

获取锁的 goroutine 很幸运的话,可以通过 Fast path 很快获取锁,我们来一步步分析一下 lockSlow 函数的实现。 首先定义了当前 goroutine 用到的一些变量信息,然后使用 old 保存当前锁的状态。

1
2
3
4
5
var waitStartTime int64
	starving := false   //标识当前goroutine是否饥饿模式
	awoke := false  //唤醒标记,初次进入for循环为false,之后以从sema优先队列中唤醒的身份进入for循环
	iter := 0   //自旋次数,用来判断是否可以继续自旋获取锁
old := m.state  //当前的锁状态        

然后代码就进入到了 for 循环,lockSlow 的实现效果是尽可能少的循环,但是一定使获取锁的 goroutine 得到锁。for 循环中,当前抢锁的 goroutine 发现锁还没有被释放 (正常模式下),则调用 runtime_canSpin 自旋等待一会,期望锁能够被释放,这期间,程序还做了其他一些事情,比如通过设置 Mutex 的 awoke 状态,尽量使得 Unlock 程序不要再唤醒 sema 优先队列中的 goroutine 了,因为参与竞争锁的 goroutine 越多,得到锁的概率越小!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
	for {
		//锁是非饥饿状态,并且未释放,尝试自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 主动自旋的场景
			// 尝试设置 mutexWoken 标志以通知 Unlock 不唤醒其他阻塞的 goroutine
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()    //自旋
			iter++
			old = m.state
			continue
		}

自旋等待之后,获取锁的 goroutine 命运有两个:

  • 当前锁已经被释放了,那么它就可以参与竞争了
  • 当前锁没有被释放,又或者它参与锁竞争失败了,那么它就要进入 sema 的优先队列了

不过不管结果如何,它都要走下边的通用逻辑,就是给 Mutex 设置新状态。有下边几个步骤:

  • 判断锁的状态是否是正常模式,是的话就设置 mutexLocked 标志准备抢锁
  • 如果锁的状态是没有被释放,或者是饥饿模式的话,当前 goroutine 一定要进 sema 优先队列了,这时候设置锁的等待者 + 1
  • 如果锁已经被标记为饥饿模式了,并且原来锁并没有被释放,那么将锁给打上饥饿的标记。有的读者可能会问了,为什么只有锁没有被释放的时候才打上饥饿标记呢?这是因为 Unlock 方法是根据锁是否为饥饿模式来从 sema 优先队列中唤醒 goroutine 的,也就是锁为饥饿模式时,优先队列中必须还有等待的 goroutine
  • 如果有 awoke 标记的话,将锁上的 mutexWoken 标记给擦除掉,因为不管当前 goroutine 最终进入 sema 优先队列还是获取到锁,新状态都应该清楚 awoke 标记。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
                new := old
		// 不要尝试获取饥饿的互斥锁,新到达的 goroutine 必须排队
		if old&mutexStarving == 0 {
			new |= mutexLocked  //非饥饿状态,加锁
		}
		if old&(mutexLocked|mutexStarving) != 0 {   //饥饿状态,或者锁被抢占,等待者 + 1
			new += 1 << mutexWaiterShift
		}
		// 当前 goroutine 将互斥锁切换到饥饿模式。
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			//清除awoke标识
			new &^= mutexWoken
		}

设置状态的步骤走完了,接下来就是 CAS 了,有两个结局:CAS 成功 or CAS 失败。失败是成功之母,CAS 失败记录当前锁的状态,再次走 for 循环就好了,注意这时候不用更新 spin 计数,因为抢锁失败并没有进入 sema 优先队列!当然 CAS 成功了也并不是万事大吉了,如果锁是在正常情况下被上锁成功的话,那么恭喜当前 goroutine 成功获取到了锁;否则是一定要进入 sema 优先队列的,但是具体是被插入到头部还是尾部,则要分情况而定,这时候 waitStartTime 就发挥作用了,根据它不仅能判断出来抢锁的 goroutine 是否是第一次要被插入 sema 优先队列,还能判断当前锁状态是否已经达到饥饿阈值了。

我们先来说 goroutine 是否是第一次要被插入 sema 优先队列的情况,是插入到尾部,这样只能等到下一轮调度唤醒了;如果当前 goroutine 不是第一次插入到 sema 优先队列,程序会将它插入到头部,这样下一个唤醒的依然是它,增加了它获取到锁的概率!

计算当前锁状态模式的代码是 goroutine 从 sema 优先队列中被唤醒之后,根据 waitStartTime 的记录和上边 starving 的标识,如果当前 goroutine 等待锁时间超过了 1ms,那么当前 goroutine 就会将锁标记为饥饿 (注意这个时候并没有给 Mutex 打上饥饿的标识,只是标记为饥饿,如果接下来当前 goroutine 仍然获取不到锁,那么接下来的循环中会做这个事情)。这个时候再次判断锁的状态,如果锁为饥饿模式,那么就直接把锁交给当前请求锁的 goroutine 就好了,否则当前被唤醒的 goroutine 只能进行新一轮抢锁了 (重新设置自旋计数器)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 上锁成功
			}
			// 第一次等待,添加到信号量队列的队首
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            //设置饥饿标记
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				//加锁并将waiter数量减1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					//非饥饿状态的goroutine,最后一个waiter已经不饥饿了,清除标记
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}

以上就是终极版本 Mutex 的 Lock 函数实现,接下来回答几个常见问题:

    1. 如果 Mutex 已经被标记成为 “饥饿模式” 了,什么时候会变为 “正常模式” 呢?

拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式;第一此 waiter 已经是队列中的最后一个 waiter 了 (通过代码!starving || old>>mutexWaiterShift == 1判断),没有其它的等待锁的 goroutine 了;第二是此 waiter 的等待时间小于 1 毫秒 (通过代码starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs判断)。

    1. 为什么要有饥饿模式?

饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。

    1. 饥饿模式下 Mutex 是如何工作的?

饥饿模式下,会直接把锁交给队列第一个 goroutine。这块再代码中有体现。

本节内容建议读着多读几遍,最好是从文章开始顺着历史发展的脉络思考,如果还有什么问题,可以在留言区和我一块交流。

我们再接着看 Unlock 方法。

4.3 Sync.Mutex 终极版本的 Unlock 函数实现

相对于 Lock 函数,Unlock 函数的代码逻辑比较好理解,我们先看一下流程图:

代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (m *Mutex) Unlock() {
	// Fast path: 将锁标记去掉
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {   //还需要做其他的事
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
    //无锁的mutex释放锁会panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			//锁上没有goroutine等待或者有被唤醒的goroutine改变了锁的状态,直接return即可
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 减少等待者并设置Mutex唤醒标记,CAS释放锁
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)   //锁在正常模式下从sema优先队列尾部唤醒新的goroutine
				return
			}
			old = m.state
		}
	} else {
		//饥饿模式下,从优先队列的头部唤醒等待的goroutine,Lock方法会直接将锁给它
		runtime_Semrelease(&m.sema, true, 1)
	}
}

先将锁标记去掉,但是 state 其他字段并不为 0 的话,还需要做一些额外的工作,这就是unlockSlow的代码逻辑,如果是饥饿模式的话,从优先队列的头部唤醒一个 goroutine,分析 Lock 代码的时候我们知道,饥饿模式下会将锁直接给这个唤醒的 goroutine; 正常模式下进入 for 循环,如果锁的的其他状态已经被改变的情况下,unlockSlow 什么也不需要做,return 即可,否则将锁等待者数量 - 1,并设置唤醒标识,CAS 操作成功之后,从 sema 优先队列尾部唤醒一个 goroutine 参与到抢锁工作中。

以上就是 Sync.Mutex 终极版本的全部实现了。下边我们再来看一下 Sync.Mutex 使用中都有哪些坑。

  1. Sync.Mutex 使用时常见的坑

关于 Mutex 的使用,一不小心就会产生死锁或者 panic,使用不得当会有很大的性能开销,接下来我们结合上边的源码分析一下。

  • Mutex 对 goroutine 无状态性

看过源代码,我们不难看出不同的 goroutine 操作 Mutex 这一个全局变量是没有状态记录的,这样会出现两种情况:1. 一个 goroutine 可以释放掉另一个 goroutine 的锁;2.goroutine 一旦重入设置两次 Lock,就会死锁;建议使用 Mutex 时,Lock/Unlock 要成对出现,最好是封装到一个函数中,使用 defer 是一个好的方案。也可以对 Mutex 的无状态性做一个封装,例如实现锁的重入,添加一些锁的检测机制等等。

  • Mutex 千万不能被复制

这里所说的千万不能被复制并不是 Go 语言在语法上做了限制,而是在使用过程中,为了避免不必要的麻烦,不要拷贝 Mutex,从源码分析我们看到,Mutex 可以被千千万万个 goroutine 使用,拷贝 Mutex 时,只是拷贝的一个临时状态而已。复制之后,一个新 Mutex 可能莫名处于持有锁、唤醒或者饥饿状态,甚至等阻塞等待数量远远大于 0。而原锁 Unlock 的时候,却不会影响复制锁。

关于锁复制后产生的严重后果,推荐阅读:当 Go struct 遇上 Mutex

  • 产生强烈的锁竞争时怎么办

当线上 QPS 很高时,如果使用了 Mutex,可能会发现大量的 goroutine 阻塞在 Mutex 的 Lock 函数上,这个时候就要想办法优化程序,首先应该尽量少使用 Mutex,如果非要使用,可以采用分片管理数据的方式,Mutex 保护的程序逻辑不应该过于复杂,因为大量 goroutine 阻塞在 Mutex 的 Lock 函数上会产生饥饿,饥饿带来的问题是性能更差,不能更快的恢复到正常模式将是一场灾难。

关于锁竞争的优化,推荐阅读:一次错误使用 go-cache 导致出现的线上问题

  1. 小结

DDD 设计中有关战略模型的分析指出,每一个复杂系统都有一个演进的过程,我们不要期望一次就将它做的很好。我觉得这个观点应用在 Go 语言的 Sync.Mutex 设计上非常恰当,“罗马不是一天建成的”,我们不得不佩服 Sync.Mutex 开发者的匠心精神,一次次打磨,精益求精的实现了现在的 Sync.Mutex,虽然它可能还不够完美,还能够继续雕琢,但是这个发现问题并解决问题的思路和精神,是我们应该学习的。本文还分析了很多关于 Sync.Mutex 实现的基础知识:原子性、信号量、CAS;还指出了使用 Mutex 时应该注意的问题,希望感兴趣的读者在留言区继续和我交流!

Reference:

Licensed under CC BY-NC-SA 4.0
this is the way