0%

net/http 路由解析

在了解gin的路由解析过程之前,我们有必要先了解下go的net/http的工作流程

首先,http是如何建立起来的?
img.png

http.ListenAndServe的代码执行流程中,我们可以获取到上面的代码执行流程:

创建 socket

1
2
3
4
5
6
7
8
func (srv *Server) ListenAndServe() error {
// ... 省略代码
ln, err := net.Listen("tcp", addr) // <-----看这里listen
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

Accept 等待客户端链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// net/http/server.go:L2805-2853
func (srv *Server) Serve(l net.Listener) error {
// ... 省略代码
for {
rw, e := l.Accept() // <----- 看这里accept
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx) // <--- 看这里
}
}
3. 提供回调

read or write

c.serve中执行c.readRequest会调用read读取请求数据

1
2
3
4
5
6
7
8
9
10
11
12
func (c *conn) serve(ctx context.Context) {
// ...省略代码
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive, runHooks)
}
// ...省略代码
serverHandler{c.server}.ServeHTTP(w, w.req)
}
}

ServeHTTP中的ResponseWriter会调用write返回响应数据给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r) // <--- 看这里
h.ServeHTTP(w, r)
}

这基本是整个过程的代码了

  1. ln, err := net.Listen("tcp", addr)做了初试化了socket, bind, listen的操作.
  2. rw, e := l.Accept()进行accept, 等待客户端进行连接
  3. go c.serve(ctx) 启动新的goroutine来处理本次请求. 同时主goroutine继续等待客户端连接, 进行高并发操作
  4. h, _ := mux.Handler(r) 获取注册的路由, 然后拿到这个路由的handler, 然后将处理结果返回给客户端

路由注册

net/http是通过server mux来管理路由解析的

先从最基本的路由注册实例开始

1
2
3
http.HandleFunc("/api/hello/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello World"))
})
1
2
3
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()

// ...省略代码

if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
if pattern[len(pattern)-1] == '/' {
mux.es = appendSorted(mux.es, e)
}

if pattern[0] != '/' {
mux.hosts = true
}
}

通过上述追踪可以看到mux将api/hello注册到mux.m中,mux.m是一个加锁的map,也就是通过map的key存储具体的路由前缀(parttern)来匹配对应的路由

1
2
3
4
5
6
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
}

在下面这段代码中可以看到,mux会分析注册进来的新路由是否以’/‘结尾,如果是会加该parttern入到es中

1
2
3
4
mux.m[pattern] = e
if pattern[len(pattern)-1] == '/' {
mux.es = appendSorted(mux.es, e)
}

appendSorted详细逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
n := len(es)
i := sort.Search(n, func(i int) bool {
return len(es[i].pattern) < len(e.pattern)
})
if i == n {
return append(es, e)
}
// we now know that i points at where we want to insert
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
copy(es[i+1:], es[i:]) // Move shorter entries down
es[i] = e
return es
}

在es中,api前缀的加入是根据parttern的len递减顺序来进行存储的

1
2
3
4
5
6
7
8
9
10
11
12
http.HandleFunc("/api/hello/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello World"))
})
http.HandleFunc("/api/hello1/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello World"))
})
http.HandleFunc("/api/hello12/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello World"))
})
http.HandleFunc("/api/hello3/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello World"))
})

es append的输出过程是这样的

1
2
3
4
[{0x1315f00 /api/hello/}]
[{0x1315fa0 /api/hello1/} {0x1315f00 /api/hello/}]
[{0x1316040 /api/hello12/} {0x1315fa0 /api/hello1/} {0x1315f00 /api/hello/}]
[{0x1316040 /api/hello12/} {0x1315fa0 /api/hello1/} {0x13160e0 /api/hello3/} {0x1315f00 /api/hello/}]

为什么要按照这种顺序呢,当注册一个以/为结尾的路由,mux会尽可能去按照最大长度来匹配路由规则,如果匹配到了,即使后面的部分还有其他字符,也会按照当前匹配的handler去处理逻辑
所以,对于/api/hello//api/hello/1111拿到的handler是一样的,而不是以/为结尾注册后的路由,会按照全部parttern去匹配

自定义路由注册

通过net/http包的路由管理模式我们可以知道,net/http的路由匹配根本就不符合 RESTful 的规则,遇到稍微复杂一点的需求时,这个简单的路由匹配规则简直就是噩梦。
所以net/http提供了自定义路由管理模式

1
2
3
if err := http.ListenAndServe(":8000", nil); err != nil {
fmt.Println("start http server fail:", err)
}

ListenAndServe的第二个参数会传递用户自定义的handler,如果为nil,那么会使用DefaultServeMux也就是net/http的路由管理模式来进行管理

在下面的这段代码有具体逻辑展示

1
2
3
4
5
6
7
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
// ...省略
}

我们可以总结下net/http的请求处理流程
img_2.png

gin正是通过注册了自己的路由handler,即engine.Handler()来实现自定义管理

1
2
3
4
5
6
7
8
9
10
11
12
13
func (engine *Engine) Run(addr ...string) (err error) {
defer func() { debugPrintError(err) }()

if engine.isUnsafeTrustedProxies() {
debugPrint("[WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.\n" +
"Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.")
}

address := resolveAddress(addr)
debugPrint("Listening and serving HTTP on %s\n", address)
err = http.ListenAndServe(address, engine.Handler())
return
}

gin的请求处理流程
img_1.png

所以我们只需要关心engine的实现和engineServerHttp实现,数据请求流程还是跟net/http是一样的

1
2
3
4
5
6
7
8
9
10
11
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
c.reset()

engine.handleHTTPRequest(c)

engine.pool.Put(c)
}

ServeHTTP的实现流程可以总结为:

  1. 从 sync.pool 里面拿去一块内存
  2. 对这块内存做初始化工作,防止数据污染
  3. 处理请求 handleHTTPRequest
  4. 请求处理完成后,把这块内存归还到 sync.pool 中

RESTful

目前业界 Server 端 API 接口的设计方式一般是遵循 RESTful 风格的规范

1
2
3
4
GET    /user/{userID} HTTP/1.1
POST /user/{userID} HTTP/1.1
PUT /user/{userID} HTTP/1.1
DELETE /user/{userID} HTTP/1.1

这是比较规范的 RESTful API设计,分别代表:

  • 获取 userID 的用户信息
  • 更新 userID 的用户信息(当然还有其 json body,没有写出来)
  • 创建 userID 的用户(当然还有其 json body,没有写出来)
  • 删除 userID 的用户

gin 路由设计

前缀树和基数树(radix tree)

前缀树是一个多叉树,广泛应用于字符串搜索,每个树节点存储一个字符,从根节点到任意一个叶子结点串起来就是一个字符串。

radix tree是优化之后的前缀树,对空间进一步压缩,从上往下提取公共前缀,非公共部分存到子节点,这样既节省了空间,同时也提高了查询效率(左边字符串sleep查询需要5步, 右边只需要3步),Gin的路由树就是用radix tree实现的。
img.png

Gin为每一种请求都维护了一个radix tree,不同的请求会被解析并送到对应的radix tree进行处理。

img_1.png

gin的路由树的一些相关结构体介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type Engine struct {
RouterGroup
....
trees methodTrees //保存着每种请求的路有树
}
//RouterGroup可以被看作的内部的配置路由
type RouterGroup struct {
Handlers HandlersChain //处理函数链,用Default初始化时会加入Logger(), Recovery()
basePath string //基础路径,用Default和New初始化时都被赋值为"/"
engine *Engine // Gin引擎,在添加路由时会用到
root bool
}
//树节点
type node struct {
path string //保存节点的路径,像上面radix tree图中节点里面的值
indices string //子节点的首字符根据priority排列组成的字符串,为了方便遍历
wildChild bool //标识孩子节点是否有通配符
nType nodeType //节点类型
priority uint32 //优先级
children []*node
handlers HandlersChain //处理函数链
fullPath string //保存完整路径
}

// node结构中nodeType节点类型
const (
static nodeType = iota // default 普通节点,默认
root // 根节点
param // 参数路由,比如 /user/:id
catchAll // 匹配所有内容的路由,比如 /article/*key
)

gin 注册路由的时候,会根据不同的 Method 分别注册不同的路由树。

如上面的restful请求会注册四颗路由树出来。

1
2
3
4
5
6
7
8
9
10
11
func (engine *Engine) addRoute(method, path string, handlers HandlersChain) {
//....
root := engine.trees.get(method)
if root == nil {
root = new(node)
root.fullPath = "/"
engine.trees = append(engine.trees, methodTree{method: method, root: root})
}
root.addRoute(path, handlers)
// ...
}

流程

  • 拿到一个 method 方法时,去 trees slice 中遍历
  • 如果 trees slice 存在这个 method, 则这个URL对应的 handler 直接添加到找到的路由树上
  • 如果没有找到,则重新创建一颗新的方法树出来, 然后将 URL对应的 handler 添加到这个路由 树上

这里的重点是根节点root调用的addRoute,添加节点的逻辑都在这个函数里面,包括找到插入的位置,一些通配节点的特殊处理等。在这个函数里面会用到一些工具函数,这里一并介绍一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Increments priority of the given child and reorders if necessary
// 增加指定孩子节点的优先级,并更新节点的indices
// 这并不会影响路由功能,但是可以加快孩子节点的查找速度
func (n *node) incrementChildPrio(pos int) int {
cs := n.children
cs[pos].priority++
prio := cs[pos].priority

// Adjust position (move to front)
// 将更新后的priority向前移动,保持按优先级降序排列
newPos := pos
for ; newPos > 0 && cs[newPos-1].priority < prio; newPos-- {
// Swap node positions
cs[newPos-1], cs[newPos] = cs[newPos], cs[newPos-1]
}

// Build new index char string
// 根据优先级重新构建indices,indices保存着当前节点下的每个孩子节点的首字符
if newPos != pos {
n.indices = n.indices[:newPos] + // Unchanged prefix, might be empty
n.indices[pos:pos+1] + // The index char we move
n.indices[newPos:pos] + n.indices[pos+1:] // Rest without char at 'pos'
}

return newPos
}

// 获两个字符串的最长公共前缀
func longestCommonPrefix(a, b string) int {
i := 0
max := min(len(a), len(b))
for i < max && a[i] == b[i] {
i++
}
return i
}

// addRoute adds a node with the given handle to the path.
// Not concurrency-safe!
func (n *node) addRoute(path string, handlers HandlersChain) {
fullPath := path
n.priority++

// Empty tree
if len(n.path) == 0 && len(n.children) == 0 {
n.insertChild(path, fullPath, handlers)
n.nType = root
return
}

parentFullPathIndex := 0

walk:
for {
// Find the longest common prefix.
// This also implies that the common prefix contains no ':' or '*'
// since the existing key can't contain those chars.
// 获取最长公共前缀 path代表新加路由节点的path,n.path代表当前node的path
i := longestCommonPrefix(path, n.path)

// Split edge
// 如果n.path=/abc path=/a这种的,最长公共前缀小于n.path 则进入if语句,提取公共部分为父节点
// 非公共部分为子节点,即n.path=/a 子节点/bc
// /bc保存原来节点n的信息,/a为新节点的信息
if i < len(n.path) {
child := node{
path: n.path[i:],//非公共的部分
wildChild: n.wildChild,
indices: n.indices,
children: n.children,
handlers: n.handlers,
// 由于是n的孩子节点,故优先级减去n节点,即n-1
priority: n.priority - 1,
fullPath: n.fullPath,
}

n.children = []*node{&child}
// []byte for proper unicode char conversion, see #65
n.indices = bytesconv.BytesToString([]byte{n.path[i]})
n.path = path[:i]
n.handlers = nil // 目前先置为空,后面会再加上handlers
n.wildChild = false
n.fullPath = fullPath[:parentFullPathIndex+i]
}

// Make new node a child of this node
// 例如n.path=/a path=/abc 新加进来的path比当前n.path长,则进入if语句
// 在当前节点创建一个新的子节点
if i < len(path) {
path = path[i:]
// 如果n节点的孩子节点有通配符进入if,这里的逻辑会有点绕
if n.wildChild {
parentFullPathIndex += len(n.path)
//注意,此时n已经指向它的第一个孩子节点
n = n.children[0]
n.priority++

// Check if the wildcard matches
// 注意,此时的path已经取成了公共前缀后的部分
// 例如原来的路径是/usr/:name,假设当前n节点的父节点为nfather
// 而n在前面已经取成了nfather孩子节点
// 目前情况是nfather.path=/usr,由于其子节点是通配符节点
// 故nfather.wildChild=true,n.path=/:name
// 假设新加进来的节点path=/:nameserver
// 则符合这里的if条件,跳转到walk,以n为父节点继续匹配
if len(path) >= len(n.path) && n.path == path[:len(n.path)] &&
// Adding a child to a catchAll is not possible
// 不可能在全匹配节点(例如*name)后继续加子节点
n.nType != catchAll &&
// Check for longer wildcard, e.g. :name and :names
(len(n.path) >= len(path) || path[len(n.path)] == '/') {
continue walk
}

pathSeg := path
if n.nType != catchAll {
pathSeg = strings.SplitN(path, "/", 2)[0]
}
prefix := fullPath[:strings.Index(fullPath, pathSeg)] + n.path
panic("'" + pathSeg +
"' in new path '" + fullPath +
"' conflicts with existing wildcard '" + n.path +
"' in existing prefix '" + prefix +
"'")
}
// 注意,此时的path已经取成了公共前缀的后部分
c := path[0]

// slash after param
// 如果当前节点是参数节点类型,比如n.path= :name
// 且c= / ,且n节点仅有一个孩子节点
if n.nType == param && c == '/' && len(n.children) == 1 {
parentFullPathIndex += len(n.path)
//当前节点等于子节点
n = n.children[0]
//优先级加1
n.priority++
continue walk
}

// Check if a child with the next path byte exists
// 循环查找,n.indices记录着所有孩子节点的第一个字符
for i, max := 0, len(n.indices); i < max; i++ {
//如果找到和当前要插入节点的第一个字符相符,匹配成功
if c == n.indices[i] {
parentFullPathIndex += len(n.path)
// 对应子节点优先级加1,并对该子节点的indices重新排列
i = n.incrementChildPrio(i)
n = n.children[i]
continue walk
}
}

// Otherwise insert it
// 如果添加的节点既不是 * 也不是: 这样的通配节点,就执行插入
if c != ':' && c != '*' {
// []byte for proper unicode char conversion, see #65
n.indices += bytesconv.BytesToString([]byte{c})
child := &node{
fullPath: fullPath,
}
n.children = append(n.children, child)
n.incrementChildPrio(len(n.indices) - 1)
n = child
}
n.insertChild(path, fullPath, handlers)
return
}

// Otherwise and handle to current node
if n.handlers != nil {
panic("handlers are already registered for path '" + fullPath + "'")
}
n.handlers = handlers
n.fullPath = fullPath
return
}
}

Priority 优先级

为了能快速找到并组合完整的路由,GIN 在添加路由的同时,会在每个节点中添加 Priority 这个属性。在查找时根据 Priority 进行排序,常用节点(通过次数理论最多的节点) 在最前,并且同一层级里面 Priority 值越大,越优先进行匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Search for a wildcard segment and check the name for invalid characters.
// Returns -1 as index, if no wildcard was found.
// wildcard-通配符字符串(例如:name,wildcard就为name) i-通配符在path的索引 valid-是否有合法的通配符
func findWildcard(path string) (wildcard string, i int, valid bool) {
// Find start
for start, c := range []byte(path) {
// A wildcard starts with ':' (param) or '*' (catch-all)
if c != ':' && c != '*' {
continue
}

// Find end and check for invalid characters
valid = true
for end, c := range []byte(path[start+1:]) {
switch c {
case '/':
return path[start : start+1+end], start, valid
case ':', '*': //一个通配符后还有一个通配符,valid置为false
valid = false
}
}
return path[start:], start, valid
}
return "", -1, false
}
//插入子节点
func (n *node) insertChild(path string, fullPath string, handlers HandlersChain) {
for {
// Find prefix until first wildcard
wildcard, i, valid := findWildcard(path)
if i < 0 { // No wildcard found
break
}

// The wildcard name must not contain ':' and '*'
if !valid {
panic("only one wildcard per path segment is allowed, has: '" +
wildcard + "' in path '" + fullPath + "'")
}

// check if the wildcard has a name
if len(wildcard) < 2 {
panic("wildcards must be named with a non-empty name in path '" + fullPath + "'")
}

// Check if this node has existing children which would be
// unreachable if we insert the wildcard here
// 检查此节点是否有现有子节点,如果有子节点,我们在插入通配符,将没办法再访问这些子节点
if len(n.children) > 0 {
panic("wildcard segment '" + wildcard +
"' conflicts with existing children in path '" + fullPath + "'")
}

if wildcard[0] == ':' { // param
if i > 0 {
// Insert prefix before the current wildcard
// 当前节点n保存通配符的前面部分
n.path = path[:i]
path = path[i:]
}
//wildChild设为true表示子节点有通配符
n.wildChild = true
child := &node{
nType: param,
path: wildcard,
fullPath: fullPath,
}
n.children = []*node{child}
n = child
n.priority++

// if the path doesn't end with the wildcard, then there
// will be another non-wildcard subpath starting with '/'
// 如果通配符后面还有字符,则一定以/为开头
// 例如/:name/age 通配符后还有/age
// 这里的英文注释说“将会有另一个以'/'开头的非通配符子路径”
// 这不代表不能处理/:name/*hobby这种,上面已经展示了会将通配符的前面部分
// 设为父节点,也就是说通配符节点的父节点一定是一个非通配符节点,英文的注释应该这么理解的
if len(wildcard) < len(path) {
path = path[len(wildcard):]

child := &node{
priority: 1,
fullPath: fullPath,
}
n.children = []*node{child}
n = child
continue
}

// Otherwise we're done. Insert the handle in the new leaf
n.handlers = handlers
return
}

// catchAll
// 通配符不是:那么就是*,因为*是全匹配的通配符,那么这种情况是不允许的/*name/pwd
if i+len(wildcard) != len(path) {
panic("catch-all routes are only allowed at the end of the path in path '" + fullPath + "'")
}
// 如果当前节点的最后一个字符是/,则与全匹配通配符*冲突
if len(n.path) > 0 && n.path[len(n.path)-1] == '/' {
panic("catch-all conflicts with existing handle for the path segment root in path '" + fullPath + "'")
}

// currently fixed width 1 for '/'
i--
if path[i] != '/' {
panic("no / before catch-all in path '" + fullPath + "'")
}

n.path = path[:i]

// First node: catchAll node with empty path
// *可以匹配0个或多个字符,第一个节点保存为空,也就是*匹配0个字符的情况
child := &node{
wildChild: true,
nType: catchAll,
fullPath: fullPath,
}

n.children = []*node{child}
n.indices = string('/')
n = child
n.priority++

// second node: node holding the variable
// 匹配多个字符的情况
child = &node{
path: path[i:],
nType: catchAll,
handlers: handlers,
priority: 1,
fullPath: fullPath,
}
n.children = []*node{child}

return
}

// If no wildcard was found, simply insert the path and handle
n.path = path
n.handlers = handlers
n.fullPath = fullPath
}

摘自Gin源码解析(一)

Go语言Slice是否线程安全

Go语言实现线程安全常用的几种方式:1.互斥锁;2.读写锁;3.原子操作;4.sync.once;5. sync.atomic;6.channel
slice底层结构并没有使用加锁等方式,不支持并发读写,所以并不是线程安全的,使用多个goroutine对类型为slice的变量进行操作,每次输出的值大概率都不会一样,与预期值不一致; slice在并发执行中不会报错,但是数据会丢失。

slice内存泄漏分析

(1)发生场景:截取长slice中的一段导致长slice未释放

由于底层都是数组,如果截长slice的一段,其实相当于引用了底层数组中的一小段。只要还有引用,golang的gc就不能回收数组。这种情况导致未使用的数组空间,未及时回收。

解决方案:新建一个长度为0的slice,将需要的一小段slice使用append方法添加到新的slice。再将原来的slice置为nil。

2)发生场景:没有重置丢失的子切片元素中的指针

没有及时将不再使用的slice置为nil

解决方案:如果slice中包含很多元素,再只有一小部分元素需要使用的情况下。建议重新分配一个slice将需要保留的元素加入其中,将原来的长slice整个置为nil。

Golang Slice 的底层实现

切片是基于数组实现的,它的底层是数组,它自己本身非常小,可以理解为对 底层数组的抽象。因为基于数组实现,所以它的底层的内存是连续分配的,效 率非常高,还可以通过索引获得数据。

切片本身并不是动态数组或者数组指针。它内部实现的数据结构通过指针引用 底层数组,设定相关属性将数据读写操作限定在指定的区域内。切片本身是一 个只读对象,其工作机制类似数组指针的一种封装。

切片对象非常小,是因为它是只有 3 个字段的数据结构:

  • 指向底层数组的指针

  • 切片的长度

  • 切片的容量

golang里的数组和切片

数组长度是固定的,而切片是可变长的。可以把切片看作是对底层数组的封装,每个切片的底层数据结构中,一定会包含一个数组。数组可以被称为切片的底层数组,切片也可以被看作对数组某一连续片段的引用。因此,Go中切片属于引用类型,而数组属于值类型,通过内建函数len,可以取得数组和切片的长度。通过内建函数cap,可以得到数组和切片的容量。但是数组的长度和容量是相等的,并且都不可变,而且切片容量是有变化规律的。

对已经关闭的channel进行读写操作会发生什么?

  1. 读已关闭的channel
    读已经关闭的channel无影响。
    如果在关闭前,通道内部有元素,会正确读到元素的值;
    如果关闭前通道无元素,则会读取到通道内元素类型对应的零值。
    若遍历通道,如果通道未关闭,读完元素后,会报死锁的错误。
    fatal error: all goroutines are asleep - deadlock!

  2. 写已关闭的通道
    会引发panic: send on closed channel

  3. 关闭已关闭的通道
    会引发panic: close of closed channel

总结:对于一个已初始化,但并未关闭的通道来说,收发操作一定不会引发 panic。但是通道一旦关闭,再对它进行发送操作,就会引发 panic。如果我们试图关闭一个已经关闭了的通道,也会引发 panic。

go struct 能不能比较

需要具体情况具体分析,如果struct中含有不能被比较的字段类型,就不能被比较,如果struct中所有的字段类型都支持比较,那么就可以被比较。

不可被比较的类型:
① slice,因为slice是引用类型,除非是和nil比较
② map,和slice同理,如果要比较两个map只能通过循环遍历实现
③ 函数类型

其他的类型都可以比较。

还有两点值得注意:

结构体之间只能比较它们是否相等,而不能比较它们的大小。
只有所有属性都相等而属性顺序都一致的结构体才能进行比较。

数组是如何实现根据下标随机访问数组元素的吗?

例如: a := [10]int{0}

  • 计算机给数组a,分配了一组连续的内存空间。
  • 比如内存块的首地址为 base_address=1000。
  • 当计算给每个内存单元分配一个地址,计算机通过地址来访问数据。当计算机需要访问数组的某个元素的时候,会通过一个寻址公式来计算存储的内存地址。

GMP模型

  • G(Goroutine):G 就是我们所说的 Go 语言中的协程 Goroutine 的缩写,相当于操作系统中的进程控制块。其中存着 goroutine 的运行时栈信息,CPU 的一些寄存器的值以及执行的函数指令等。
  • M(Machine):代表一个操作系统的主线程,对内核级线程的封装,数量对应真实的 CPU 数。一个 M 直接关联一个 os 内核线程,用于执行 G。M 会优先从关联的 P 的本地队列中直接获取待执行的 G。M 保存了 M 自身使用的栈信息、当前正在 M上执行的 G 信息、与之绑定的 P 信息。
  • P(Processor):Processor 代表了 M 所需的上下文环境,代表 M 运行 G 所需要的资源。是处理用户级代码逻辑的处理器,可以将其看作一个局部调度器使 go 代码在一个线程上跑。当 P 有任务时,就需要创建或者唤醒一个系统线程来执行它队列里的任务,所以 P 和 M 是相互绑定的。总的来说,P 可以根据实际情况开启协程去工作,它包含了运行 goroutine 的资源,如果线程想运行 goroutine,必须先获取 P,P 中还包含了可运行的 G 队列。

go垃圾回收,什么时候触发

主动触发(手动触发),通过调用 runtime.GC 来触发GC,此调用阻塞式地等待当前GC运行完毕。
被动触发,分为两种方式:
1)使用步调(Pacing)算法,其核心思想是控制内存增长的比例,每次内存分配时检查当前内存分配量是否已达到阈值(环境变量GOGC):默认100%,即当内存扩大一倍时启用GC。
2)使用系统监控,当超过两分钟没有产生任何GC时,强制触发 GC。

gc算法有哪些?

常见的垃圾回收算法有以下几种:

引用计数 :对每个对象维护一个引用计数,当引用该对象的对象被销毁时,引用计数减1,当引用计数器为0时回收该对象。
优点:对象可以很快的被回收,不会出现内存耗尽或达到某个阀值时才回收。
缺点:不能很好的处理循环引用,而且实时维护引用计数,有也一定的代价。
代表语言:Python、PHP
标记-清除:从根变量开始遍历所有引用的对象,引用的对象标记为”被引用”,没有被标记的进行回收。
优点:解决了引用计数的缺点。
缺点:需要STW,即要暂时停掉程序运行。
代表语言:Golang(其采用三色标记法)
分代收集:按照对象生命周期长短划分不同的代空间,生命周期长的放入老年代,而短的放入新生代,不同代有不能的回收算法和回收频率。
优点:回收性能好
缺点:算法复杂
代表语言: JAVA
三色标记法
1)初始状态下所有对象都是白色的。
2)从根节点开始遍历所有对象,把遍历到的对象变成灰色对象
3)遍历灰色对象,将灰色对象引用的对象也变成灰色,然后将遍历过的灰色对象变成黑色对象。
4)循环步骤3,直到灰色对象全部变黑色。
5)回收所有白色对象(垃圾)。

make 与 new 的区别

引用类型与值类型

  • 引用类型 变量存储的是一个地址,这个地址存储最终的值。内存通常在堆上分配。通过 GC 回收。包括 指针、slice 切片、管道 channel、接口 interface、map、函数等。

  • 值类型是 基本数据类型,int,float,bool,string, 以及数组和 struct 特点:变量直接存储值,内存通常在栈中分配,栈在函数调用后会被释放

  • 对于引用类型的变量,我们不光要声明它,还要为它分配内容空间

  • 对于值类型的则不需要显示分配内存空间,是因为go会默认帮我们分配好

new()

1
func new(Type) *Type

new()对类型进行内存分配,入参为类型,返回为类型的指针,指向分配类型的内存地址

make()

1
func make(t Type, size ...IntegerType) Type

make()也是用于内存分配的,但是和new不同,它只用于channel、map以及切片的内存创建,而且它返回的类型就是这三个类型本身,而不是他们的指针类型,因为这三种类型就是引用类型,所以就没有必要返回他们的指针了。

注意,因为这三种类型是引用类型,所以必须得初始化,但是不是置为零值,这个和new是不一样的。

简而言之make()用于初始化slice, map, channel等内置数据结构

如何判断channel是否关闭?

  • 读channel的时候判断其是否已经关闭

    _,ok := <- jobs

    此时如果 channel 关闭,ok 值为 false

  • 写入channel的时候判断其是否已经关闭

  1. _,ok := <- jobs

    此时如果 channel 关闭,ok 值为 false,如果 channel 没有关闭,则会漏掉一个 jobs中的一个数据

  2. 使用 select 方式

    再创建一个 channel,叫做 timeout,如果超时往这个 channel 发送 true,在生产者发送数据给 jobs 的 channel,用 select 监听 timeout,如果超时则关闭 jobs 的 channel。

go 的锁是可重入的吗?

不是可重入锁。

讨论这个问题前,先解释一下“重入”这个概念。当一个线程获取到锁时,如果没有其他线程拥有这个锁,那么这个线程
就会成功获取到这个锁。线程持有这个锁后,其他线程再请求这个锁,其他线程就会进入阻塞等待的状态。但是如果游泳这个锁的
线程再请求这把锁的话,就不会阻塞,而是成功返回,这就是可重入锁。可重入锁也叫做递归锁。
为什么 go 的锁不是可重入锁,因为 Mutex 的实现中,没有记录哪个 goroutine 拥有这把锁。换句话说,我们可以通过
扩展来将 go 的锁变为可重入锁,这里就不展开了。下面是一个误用 Mutex 的重入例子:https://github.com/guowei-gong/go-demo/commit/a6fc236853f5cd0efd4e62269cfe60a19de7a96e

go 中用 for 遍历多次执行 goroutine会存在什么问题

  1. 假如在协程中打印for的下标i或当前下标的元素,会随机打印载体中的元素.

    原因有二:

    • golang是值拷贝传递 for循环很快就执行完了,但是创建的10个协程需要做初始化。上下文准备,堆栈,和内核态的线程映射关系的工作,是需要时间的,比for慢,等都准备好了的时候,会同时访问i。这个时候的i肯定是for执行完成后的下标。(也可能有个别的协程已经准备好了,取i的时候,正好是5,或者7,就输出了这些数字)。

      解决的方法就是闭包,给匿名函数增加入参,因为是值传递,所以每次for创建一个协程的时候,会拷贝一份i传到这个协程里面去。 或者在开启协程之前声明一个新的变量 = i。

  2. 假如当前for是并发读取文件

程序会panic:too many open files

解决的方法:通过带缓冲的channel和sync.waitgroup控制协程并发量。

空结构体占不占内存空间? 为什么使用空结构体?

准确的来说,空结构体有一个特殊起点: zerobase 变量。zerobase是一个占用 8 个字节的uintptr全局变量。每次定义 struct {} 类型的变量,编译器只是把zerobase变量的地址给出去。也就是说空结构体的变量的内存地址都是一样的。

进程、线程、协程的区别?

概念定义

进程: 进程是一个具有一定独立功能的程序关于某个数据集合上的一次运行活动,是系统资源分配和独立运行的最小单位。
线程: 线程是进程的一个执行单元,是任务调度和系统执行的最小单位。
协程: 协程是一种用户态的轻量级线程,协程的调度完全由用户控制。

进程与线程的区别

  1. 根本区别:进程是操作系统资源分配和独立运行的最小单位;线程是任务调度和系统执行的最小单位。
  2. 地址空间区别: 每个进程都有独立的地址空间,一个进程崩溃不影响其它进程;一个进程中的多个线程共享该进程的地址空间,一个线程的非法操作会使整个进程崩溃。
  3. 上下文切换开销区别: 每个进程有独立的代码和数据空间,进程之间上下文切换开销较大;线程组共享代码和数据空间,线程之间切换的开销较小。

线程和协程的区别

  1. 内存开销:创建一个协程需要2kb, 栈空间不够会自动扩容, 创建一个线程需要1M空间。
  2. 创建和销毁:创建线程是和操作系统打交道,内核态 开销更大, 协程是由runtime管理,属于用户态 开销小。
  3. 切换成本:线程切换 需要保存各种寄存器,切换时间大概在1500-2000us, 协程保存的寄存器比较少, 切换时间大概在200us, 它能执行更多的指令。

defer recover panic 执行顺序

执行顺序应该为panic、defer、recover

  • 发生panic的函数并不会立刻返回,而是先层层函数执行defer,再返回。如果有办法将panic捕获到panic,就正常处理(若是外部函数捕获到,则外部函数只执行defer),如果没有没有捕获,程序直接异常终止。
  • Go语言提供了recover内置函数。前面提到,一旦panic逻辑就会走到defer(defer必须在panic的前面!)。调用recover函数将会捕获到当前的panic,被捕获到的panic就不会向上传递了
  • 在panic发生时,在前面的defer中通过recover捕获这个panic,转化为错误通过返回值告诉方法调用者。

如何解决孤儿进程的出现

孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

解决方案

孤儿进程结束后会被 init 进程善后,并没有危害,而僵尸进程则会一直占着进程号,操作系统的进程数量有限则会受影响。

僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。

僵尸进程解决方案

进程等待—wait函数和waitpid函数
wait函数
创建一个子进程,子进程正常逻辑,父进程调用wait函数来进行等待,当子进程退出的时候,由于父进程在等待,所以子进程就不会变成僵尸进程
父进程一开始调用wait函数,就会阻塞在wait函数中,等待子进程
直到子进程退出,wait函数调用才返回,父进程接着执行wait函数之后的代码

说一下reflect

recflect是golang用来检测存储在接口变量内部(值value;类型concrete type) pair对的一种机制。它提供了两种类型(或者说两个方法)让我们可以很容易的访问接口变量内容,分别是reflect.ValueOf() 和 reflect.TypeOf()。

ValueOf用来获取输入参数接口中的数据的值,如果接口为空则返回0
TypeOf用来动态获取输入参数接口中的值的类型,如果接口为空则返回nil

Golang 里怎么避免内存逃逸?

  1. 不要盲目使用变量指针作为参数,虽然减少了复制,但变量逃逸的开销更大。
  2. 预先设定好slice长度,避免频繁超出容量,重新分配。
  3. 一个经验是,指针指向的数据大部分在堆上分配的,请注意。

出现内存逃逸的情况有:

  1. 发送指针或带有指针的值到channel,因为编译时候无法知道那个goroutine会在channel接受数据,编译器无法知道什么时候释放。

  2. 在一个切片上存储指针或带指针的值。比如[]*string,导致切片内容逃逸,其引用值一直在堆上。

  3. 切片的append导致超出容量,切片重新分配地址,切片背后的存储基于运行时的数据进行扩充,就会在堆上分配。

  4. 调用接口类型时,接口类型的方法调用是动态调度,实际使用的具体实现只能在运行时确定,如一个接口类型为io.Reader的变量r,对r.Read(b)的调用将导致r的值和字节片b的后续转义并因此分配到堆上。

  5. 在方法内把局部变量指针返回,被外部引用,其生命周期大于栈,导致内存溢出。

defer的执行顺序

  1. 一个函数中多个defer的执行顺序

defer的作用就是把defer关键字之后的函数压入一个栈中延迟执行,多个defer的执行顺序是后进先出

  1. defer、return、返回值的执行返回顺序

return最先执行,先将结果写入返回值中(即赋值);接着defer开始执行一些收尾工作;最后函数携带当前返回值退出(即返回值)

go init 的执行顺序,注意是不按导入规则的(这里是编译时按文件名的顺序执行的)

  1. init函数是用于程序执行前做包的初始化的函数,比如初始化包里的变量等
  2. 每个包可以拥有多个init函数
  3. 包的每个源文件也可以拥有多个init函数
  4. 同一个包中多个init函数的执行顺序go语言没有明确的定义(说明)
  5. 不同包的init函数按照包导入的依赖关系决定该初始化函数的执行顺序
  6. init函数不能被其他函数调用,而是在main函数执行之前,自动被调用

栈内存(协程栈、调用栈)

  • go的协程栈位于go堆内存
  • go堆内存位于操作系统虚拟内存

主要作用:

  • 协程的执行路径
  • 局部变量
  • 函数参数
  • 返回值

下面的go程序简述了协程栈的工作流程

1
2
3
4
5
6
7
8
9
10
11
func sum(a, b int) {
sum := 0
sum = a+b
return sum
}

func main() {
a:=3
b:=5
print(sum(a, b))
}

img.png

参数传递

  • Go使用参数拷贝(深拷贝)
  • 传递结构体会拷贝结构体全部内容
  • 传递结构体指针,会拷贝结构体指针

协程栈作用总结

  • 协程栈记录了协程的执行现场
  • 协程栈还记录局部变量,函数参数和返回值
  • Go的函数参数是值传递

协程栈不够大怎么办?

引起协程栈不够大的主要原因:

  • 本地变量太多
  • 栈帧太多

本地变量太多

当协程栈空间不够大会通过变量从栈逃逸到堆上来得到缓解,因而产生逃逸问题

  • 指针逃逸
    • 函数返回了对象的指针
      1
      2
      3
      4
      5
      6
      7
      8
      9
      func point() *int {
      a := 0
      return &a
      }

      func main() {
      i := point()
      fmt.Println(i)
      }
      go build -gcflags=-m escape.go
      1
      2
      3
      4
      5
      6
      7
      # command-line-arguments
      ./escape.go:5:6: can inline point
      ./escape.go:11:12: inlining call to point
      ./escape.go:12:13: inlining call to fmt.Println
      ./escape.go:6:2: moved to heap: a
      ./escape.go:11:12: moved to heap: a
      ./escape.go:12:13: ... argument does not escape
  • 空接口逃逸
    • 如果函数的参数是interface,函数的实参很可能会逃逸
      1
      2
      3
      4
      5
      6
      7
      8
      func intf() {
      b := 0
      fmt.Println(b)
      }

      func main() {
      intf()
      }
      1
      2
      3
      4
      5
      6
      go build -gcflags=-m escape.go
      # command-line-arguments
      ./escape.go:7:13: inlining call to fmt.Println
      ./escape.go:10:6: can inline main
      ./escape.go:7:13: ... argument does not escape
      ./escape.go:7:13: b escapes to heap

      因为interface{}类型的函数往往使用反射,反射往往要求反射的对象在堆上

  • 大变量逃逸
    • 一般在64位机器,超过64k的变量会发生逃逸

逃逸原因

  • 不是所有的变量都能放在协程栈
  • 栈帧回收后,需要继续使用的变量
  • 变量太大

栈帧太多

go的栈初始大小为2k,必要时会对栈进行扩容,在1.13版本前使用分段栈,后期使用连续栈

分段栈

优点:没有空间浪费

缺点:伸缩时栈指针会在不连续的空间来回反复横跳

连续栈

优点:空间是连续的

缺点:伸缩时开销大,需要将旧空间拷贝过来,所以当空间不足发生扩容时,变为原来的2倍
为了减少伸缩时的开销,当空间使用率不足1/4时,变为原来的1/2

堆内存

在64位操作系统中

  • Go每次申请的虚拟内存单元位64MB
  • 最多有2^20虚拟内存单元
  • 内存单元也叫headArena
  • 所有的headArena组成了mheap(Go堆内存)
    img_7.png

    操作系统虚拟内存的最大容量是由计算机的地址结构(CPU寻址范围)确定的,虛拟内存的实际容量=min(内存和外存容量之和,CPU寻址范围)

如:某计算机地址结构为32位,按字节编址,内存大小为512MB,外存大小为2GB。 则虚拟内存的最大容量为2^32B= 4GB

虚拟内存的实际容量 =min (2^32B,512MB+2GB)=2GB+512MB

headArena分配方式

  • 线性分配(线性顺序分配)
    img_1.png
  • 链表分配
    img_2.png
  • 分级分配
    img_5.png
    分级分配可以看成将内存整理成不同规格按需进行分配

线性分配和链表分配都会产生外部内存碎片

内存管理单元mspan

  • mspan是内存最小使用单位
  • 每个mspan为N个相同大小的“格子
  • 一共有67中mspan
    mspan规格
    img_4.png

中心索引(mcentral)

go分配内存时为了达到按需分配,通过中心索引能实现快速查找到所需规格的内存,mcentral被放在mspan链表头

  • 共需要有134个mcentral
  • 67个组用来标记需要GC扫描的mspan,如堆中的对象
  • 67个组用来标记不需要GC扫描的mspan,如常量

img_3.png

mcentral性能问题

  • mcentral使用互斥锁保护
  • 高并发场景下存在锁冲突
  • 参考协程GMP模型,增加线程本地缓存

线程缓存mcache(本地缓存)

  • 每个P拥有一个mcache
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    type p struct {
    id int32
    status uint32 // one of pidle/prunning/...
    link puintptr
    schedtick uint32 // incremented on every scheduler call
    syscalltick uint32 // incremented on every system call
    sysmontick sysmontick // last tick observed by sysmon
    m muintptr // back-link to associated m (nil if idle)
    mcache *mcache
    ...
    }
  • 一个mcache拥有134种mspan,67个需要GC的和67个不需要GCmspan
    img_6.png

headArena结构总结

  • GO使用heapArena向操作系统申请内存
  • 使用heapArena时,以mspan为单位,防止碎片化
  • mcentralmspan们的中心索引
  • mcache记录了分配给各个P的本地mspan

对象分级

Go分配内存时变量对象分为3个级别

  • Tiny微对象无指针(<16B以内)
  • Small小对象(16k-32k之间)
  • Large大对象无指针(>32K以上)
  • 微小对象分配至普通的mspan(class1~class67)
    • mcache拿到class2级别mspan
    • 多个微对象合并成一个16byte存入到mspan(class2)的一个小单元
      img.png
  • 大对象分配至0级mspan(class0)

垃圾回收

什么样对象需要垃圾回收

垃圾回收思路

  • 标记-清除
    • 标记后直接清除
    • 优点:逻辑简单
    • 问题:会有内存碎片产生
  • 标记-整理
    • 将碎片化内存整理后,清除多余的碎片
    • 优点,没有内存碎片
    • 问题:整理过程cpu开销大
  • 标记-复制
    • 复制一块新的内存,然后将旧的内存块上标记的内存整理到新的内存块上
    • 优点,无内存碎片,内存复制快
    • 问题:浪费空间

Go因为有独特的内存结构规格管理优势,直接选择最简单的标记-清除

标记

把根数据段上的数据作为root,基于他们进行进一步的追踪,追踪到的数据就进行标记,最后把没有标记的对象当作垃圾进行释放,是Go的GC的核心原理

  • 被栈上的指针引用(逃逸到堆上的内存变量)
  • 被全局变量指针引用
  • 被寄存器指针引用
  • 上述变量被称为Root Set(GCROOT)
    img_1.png

    通过DFS搜索除了G和H剩下的都不能GC掉

GC方式

首先一个重要的概念:
STWstop the word,指程序执行过程中,中断暂停程序逻辑,专门去进行垃圾回收。

串行GC

  • 开启STW
  • 通过DFS找到无用内存
  • 释放堆内存
  • 停止STW

并行GC

Go采用三色标记法实现并行GC
黑色:有用已经分析扫描
黑色:有用还未分析扫描
白色:暂时无用

三色标记法

三色标记法过程

起初所有堆上的对象都是白色的
img_2.png
GC开始,遍历堆栈root,将直接可达的对象标记为灰色
img_4.png
遍历灰色结点,将直接可达的对象标记为灰色,自身标记为黑色
img_3.png
继续执行第三步同样的步骤,直到所有能够访问到的结点都被标记为黑色
img_5.png
回收所有白色标记的对象。
img_6.png
再次标记时,所有对象恢复为白色
img_7.png

三色标记法并行时的问题

三色标记法在并发标记过程中会出现误回收情况:

由于是并发执行过程,如果不开启STW,GC分析过的某个白色标记,此时被业务代码进行了新的引用,被引用到之前的一个灰色的标记对象,那么此时这个变量应该就不是白色了,但是由于GC已经分析过前面的变量引用关系了,就不会把这个变量标记成灰色和黑色,导致误回收
初始状态的对象结构
img_8.png
C在标记中间时中被E引用,由于E是一开始就被分析过了,所以不再会重新分析E,导致C被误回收
img_9.png

删除写屏障

原理:当一个白色对象被另外一个对象时解除引用时,将该被引用对象标记为灰色(白色对象被保护)

缺点:如上面的例子,如果一开是B指向了C,B和C在断开后才会将C标记为白色,由于上述示例C在一开始就没有被引用过,所以不会被标记为黑色

插入写屏障

原理:当一个对象引用另外一个对象时,将另外一个对象标记为灰色。

插入写屏障可以杜绝堆空间新增的被引用的指针误回收的情况;
但是由于栈容量小,反应速度要求高,不能用插入屏障的机制。因此,在堆对象扫描完之后,为了不引发误回收,会对栈对象STW,然后通过三色并发标记清扫,完成GC。

混合写屏障

可以看到之前的插入屏障和删除屏障有明显的自身缺陷:

插入屏障:需要对栈对象重新STW遍历
删除屏障:回收精度低

GO 1.8采用了混合写屏障,混合写屏障,就是结合两者优势,又中和两者的劣势。混合写屏障减少STW,并且减少了扫描栈对象的时间。混合写屏障会做如下操作:

  • GC开始时,将栈全部可达对象标记为黑色
  • GC期间,任何在栈上新创建的对象,均为黑色。
    将栈上的可达对象全部标黑,扫描过程中,如果某个groutine栈的对象出现引用关系变更,进行STW,但是不会对整个栈STW
  • 被删除的对象标记为灰色
  • 被添加的对象标记为灰色
总结

GoV1.8三色标记法加混合写屏障机制,栈空间不启动屏障机制,堆空间启动屏障机制。整个过程几乎不需要STW,效率较高。

GC优化

尽量减少堆上的垃圾

  • 内存池化
  • 减少逃逸
  • 尽量使用空结构体

观察GC

1
GODEBUG=gctrace=1 ./main

字段含义:
img_10.png

context包内部如何实现的?

context是 Go 语言在 1.7 版本中引入标准库的接口。context主要用于父子任务之间的同步取消信号,本质上是一种协程调度的方式。另外在使用context时有两点值得注意:上游任务仅仅使用context通知下游任务不再需要,但不会直接干涉和中断下游任务的执行,由下游任务自行决定后续的处理操作,也就是说context的取消操作是无侵入的;context是线程安全的,因为context本身是不可变的(immutable),因此可以放心地在多个协程中传递使用。

方法:

  • Deadline — 返回 context.Context 被取消的时间,也就是完成工作的截止日期;
  • Done — 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消后关闭,多次调用 Done 方法会返回同一个 Channel;
  • Err — 返回 context.Context 结束的原因,它只会在 Done 方法对应的 Channel 关闭时返回非空的值;
    • 如果 context.Context 被取消,会返回 Canceled 错误;
    • 如果 context.Context 超时,会返回 DeadlineExceeded 错误;
  • Value — 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据;

设计原理

在 Goroutine 构成的树形结构中对信号进行同步以减少计算资源的浪费是 context.Context 的最大作用。Go 服务的每一个请求都是通过单独的 Goroutine 处理的,HTTP/RPC 请求的处理器会启动新的
Goroutine 访问数据库和其他服务。

(context树结构)

img.png

每一个 context.Context 都会从最顶层的 Goroutine 一层一层传递到最下层。context.Context 可以在上层 Goroutine 执行出现错误时,将信号及时同步给下层。

如果不使用 context,当最上层的 Goroutine 因为某些原因执行失败时,下层的 Goroutine 由于没有接收到这个信号所以会继续工作;但是当我们正确地使用 context.Context 时,就可以在下层及时停掉无用的工作以减少额外资源的消耗。

多个 Goroutine 同时订阅 ctx.Done() 管道中的消息,一旦接收到取消信号就立刻停止当前正在执行的工作。

默认上下文

context 包中最常用的方法还是 context.Background、context.TODO,这两个方法都会返回预先初始化好的私有变量 background 和 todo,它们会在同一个 Go 程序中被复用。

  • context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来;
  • context.TODO 应该仅在不确定应该使用哪种上下文时使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Background() Context {
return background
}
func TODO() Context {
return todo
}
这两个私有变量都是通过 new(emptyCtx) 语句初始化的,它们是指向私有结构体
context.emptyCtx 的指针,这是最简单、最常用的上下文类型。

////////////////// emptyCtx ///////////////////
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

取消信号

context.WithCancel 函数能够从 context.Context 中衍生出一个新的子上下文并返回用于取消该上下文的函数。一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine
都会同步收到这一取消信号。

Context 子树的取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//  context.WithCancel 函数的实现
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // 父上下文不会触发取消信号
}
select {
case <-done:
child.cancel(false, parent.Err()) // 父上下文已经被取消
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
child.cancel(false, p.err)
} else {
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func () {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
  • context.newCancelCtx 将传入的上下文包装成私有结构体 context.cancelCtx;
  • context.propagateCancel 会构建父子上下文之间的关联,当父上下文被取消时,子上下文也会被取消:

propagateCancel 可能出现的情况:

  • 当 parent.Done() == nil,也就是 parent 不会触发取消事件时,当前函数会直接返回;
  • 当 child 的继承链包含可以取消的上下文时,会判断 parent 是否已经触发了取消信号;
    • 如果已经被取消,child 会立刻被取消;
    • 如果没有被取消,child 会被加入 parent 的 children 列表中,等待 parent 释放取消信号;
  • 当父上下文是开发者自定义的类型、实现了 context.Context 接口并在 Done() 方法中返回了非空的管道时;
    • 运行一个新的 Goroutine 同时监听 parent.Done() 和 child.Done() 两个 Channel;
    • 在 parent.Done() 关闭时调用 child.cancel 取消子上下文; context.propagateCancel 的作用是在 parent 和 child 之间同步取消和结束的信号,保证在 parent 被取消时,child 也会收到对应的信号,不会出现状态不一致的情况。

context.cancelCtx 实现的几个接口方法也没有太多值得分析的地方,该结构体最重要的方法是 context.cancelCtx.cancel,该方法会关闭上下文中的 Channel 并向所有的子上下文同步取消信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}

context 包中的另外两个函数 context.WithDeadline 和 context.WithTimeout 也都能创建可以被取消的计时器上下文 context.timerCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // 已经过了截止日期
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

context.WithDeadline 在创建 context.timerCtx 的过程中判断了父上下文的截止日期与当前日期,并通过 time.AfterFunc 创建定时器,当时间超过了截止日期后会调用 context.timerCtx.cancel 同步取消信号。
context.timerCtx 内部不仅通过嵌入 context.cancelCtx 结构体继承了相关的变量和方法,还通过持有的定时器 timer 和截止时间 deadline 实现了定时取消的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
c.mu.Unlock()
}

context.timerCtx.cancel 方法不仅调用了 context.cancelCtx.cancel,还会停止持有的定时器减少不必要的资源浪费。

传值方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
type valueCtx struct {
Context
key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

context.valueCtx 结构体会将除了 Value 之外的 Err、Deadline 等方法代理到父上下文中,它只会响应 context.valueCtx.Value 方法,该方法的实现也很简单 。

如果 context.valueCtx 中存储的键值对与 context.valueCtx.Value 方法中传入的参数不匹配,就会从父上下文中查找该键对应的值直到某个父上下文中返回 nil 或者查找到对应的值。

Array

数组(Array)是一个由固定长度的特定类型元素组成的序列,一个数组可以由零个或多个元素组成。因其长度的不可变动,数组在Go中很少直接使用。把一个大数组传递给函数会消耗很多内存。一般采用数组的切片

几种初始化方式

1
2
3
arr1 := [3]int{1, 2, 3}
arr2 := [...]int{1, 2, 3}
arr3 := [3]int{0:3,1:4}

Slice

Slice是一种数据结构,描述与Slice变量本身分开存储的Array的连续部分。 Slice不是Array。Slice描述了Array的一部分。

slice底层是一个struct

1
2
3
4
5
6
// runtime/slice.go
type slice struct {
array unsafe.Pointer// 指向数组的指针
len int
cap int
}

创建slice的几种方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// 直接通过make创建,可以指定len、cap
s4 := make([]int, 5, 10)

// 通过数组/slice 切片生成
var data [10]int
s2 := data[2:8]
s3 := s2[1:3]

// append()
s6 = append(s4,6)

// 直接创建
s1 := []int{1, 2}

append() 底层逻辑

  1. 计算追加后slice的总长度n
  2. 如果总长度n大于原cap,则调用growslice func进行扩容(cap最小为n,具体扩容规则见growslice)
  3. 对扩容后的slice进行切片,长度为n,获取slice s,用以存储所有的数据
  4. 根据不同的数据类型,调用对应的复制方法,将原slice及追加的slice的数据复制到新的slice

growslice 计算cap的逻辑

  1. 原cap扩容一倍,即doublecap
  2. 如果指定cap大于doublecap则使用cap,否则执行如下
  3. 如果原数据长度小于1024,则使用doublecap
  4. 否则在原cap的基础上每次扩容1/4,直至不小于cap

1.18更新

已经不是 doublecap

Go:v1.18对扩容策略进行了优化,主要是在1.17的基础上对第二点进行了优化

  1. 当期望容量 > 两倍的旧容量时,直接使用期望容量作为新切片的容量
  2. 增加一个阈值并固定为一个常量(threshold),如果旧容量小于这个阈值,则直接使用两倍的旧容量,如果大于等于阈值,那么会进入一个循环,每次增加大概1.3~2倍(取决于threshold),直到大于期望容量
    源码大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Go 1.18的扩容实现代码如下,et是切片里的元素类型,old是原切片,cap等于原切片的长度+append新增的元素个数。
func growslice(et *_type, old slice, cap int) slice {
// ...
newcap := old.cap
doublecap := newcap + newcap
if cap > doublecap {
newcap = cap
} else {
const threshold = 256
if old.cap < threshold {
newcap = doublecap
} else {
// Check 0 < newcap to detect overflow
// and prevent an infinite loop.
for 0 < newcap && newcap < cap {
// Transition from growing 2x for small slices
// to growing 1.25x for large slices. This formula
// gives a smooth-ish transition between the two.
newcap += (newcap + 3*threshold) / 4
}
// Set newcap to the requested cap when
// the newcap calculation overflowed.
if newcap <= 0 {
newcap = cap
}
}
}

newcap += (newcap + 3*threshold) / 4
newcap是扩容后的容量,先根据原切片的长度、容量和要添加的元素个数确定newcap大小,最后再对newcap做内存对齐得到最后的newcap。

扩容的整体逻辑(对应上述append()的2)

  1. 按照原slice的cap及指定cap计算扩容后的cap
  2. 根据计算出cap申请内存(创建新的数组)
  3. 将原slice的数据拷贝到新内存中(新数组)
  4. 返回新slice,新slilce指向新数组,len为原slice的len,cap为扩容后的cap

正常我们使用,因slice的长度相对较小,append是扩容使用的是doublecap。
使用append后会产生新的slice,必须重新赋值到原slice上,才能更新原slice的数据。

典型例题

1
2
3
4
data := [10]int{}
slice := data[5:8]
slice = append(slice,9)// slice=? data=?
slice = append(slice,10,11,12)// slice=? data=?
1
2
3
4
5
6
//第一次append后结果
slice=[0 0 0 9]
data=[0 0 0 0 0 0 0 0 9 0]
//第二次append后结果
[0 0 0 9 10 11 12]
[0 0 0 0 0 0 0 0 9 0]

可以看到第一次append的结果影响到了原data的数据,第二次append的结果并没有影响到了data的数据,这是为什么呢?

未append前,slice的cap是5。第一次append一个元素,未超出cap,因此直接存入数据到数组中。第二次append三个元素,append后的元素长度为7,已大于原slice的cap,因此slice需要扩容,扩容后创建了新的数组,复制了data的数据到新数组内,然后存入append的数据,变动的是新数组,原数组data自然不受影响。

append存在对原数据影响的情况,使用时还是需要注意,如有必要,先copy原数据后再进行slice的操作。

总结

  1. slice本身并非指针,append追加元素后,意味着底层数组数据(或数组)、len、cap会发生变化,因此append后需要返回新的slice。

  2. append在追加元素时,当前cap足够容纳元素,则直接存入数据,否则需要扩容后重新创建新的底层数组,拷贝原数组元素后,再存入追加元素。

  3. cap的扩容意味着内存的重新分配,数据的拷贝等操作,为了提高append的效率,若是能预估cap的大小的话,尽量提前声明cap,避免后期的扩容操作。

Semaphore

信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。

  • 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
  • 当遇到计数器大于信号量大小时,会进入休眠等待其他线程释放信号;

在go的锁中的底层结构体实现过程中可以看到sema这个关键词,如go的互斥锁或读写锁

1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32
sema uint32
}
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

信号量机制

回顾下操作系统中信号量编程的机制

用户进程可以通过使用操作系统提供的一对原语来对信号量进行操作,从而很方便的实现了进程互斥、进程同步

信号量其实就是一个变量(可以是一个整数,也可以是更复杂的记录型变量),可以用一个信号量来表示系统中某种资源的数量,比如:系统中只有一台打印机,就可以设置一个初值为1的信号量 。

原语是一种特殊的程序段,其执行只能一气呵成,不可被中断。 原语是由关中断/开中断指令实现 的。软件解决方案的主要问题是由“进入区的各种操作无法一气呵成”
因此如果能把进入区、退出区的操作都用, “原语”实现,使这些操作能 一气呵成”就能避免问题。

一对原语wait(s) 原语 和 signall(S)原语, 可以把原语理解为我们自己写的函数,函数名分别为 waitsignal,括号里的信号量S 其实就是两数调用时传入的一个参数。

waitsignal 原语常简称为PV操作(来自荷兰语 proberen 和 verhogen)

我们可以把信号量机制互斥用如下代码实现表述出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct {
int value
struct process *L
} semaphore

void wait (semaphore S) {
S.value--;
if (S.value < 0 ) {
block(S.L)
}
}
void signal (semaphore S) {
S.value++;
if (S.value <= 0 ) {
wakeup(S.L);
}
}

信号量机制通过休眠队列和wakeup(唤醒)block(挂起)机制实现

go的Semaphore的实现

go里面sema的实现主要在runtime/sema.go文件中

数据结构

go的runtime有一个全局变量semtable,它放置了所有的信号量。

1
2
3
4
5
6
7
8
9
var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

每个信号量使用semaRoot结构体来表示

1
2
3
4
5
type semaRoot struct {
lock mutex
treap *sudog // 平衡树的根节点
nwait uint32 // Number of waiters. Read w/o the lock.
}

原语P

原语P即是wait,在并发编程信号同步过程中用来进行阻塞等待,go的sema主要通过semacquire1来实现wait,通过sync.runtime_Semacquire来调用

semacquire1流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// Easy case.
if cansemacquire(addr) {
return
}

// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

分步骤分析

1
2
3
4
   gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

这一段主要是用来获取sema的当前协程栈,如果拿不到的话会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if cansemacquire(addr) {
return
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

atomic.Loadatomic.Cas是原语操作:这段代码可以理解为,判断addr==0,如果为0说明addr被获取过了,要去走下面的流程判断是否需要阻塞,不为0说明addr拿到成功,对addr进行-1操作,此时函数直接return,不会发生阻塞

atomic.Cas的汇编实现

1
2
3
4
5
6
7
8
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET

基于汇编对cpu硬件加锁实现的原子操作

1
2
3
4
5
6
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0

这一部分就是拿到sudogroot队列

roots的作用:

  • semacquire1会在semtable数组中找一个元素和它对应上。每个元素都有一个root,这个rootTreap

  • 最后addr变成一个树节点,这个树节点,有自己的一个队列,专门放被阻塞的goroutine。叫它阻塞队列吧。 这个阻塞队列是个双端队列,头尾都可以进。

  • semacquire1把当前goroutine相关元数据放进阻塞队列之后,就挂起了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for {
lockWithRank(&root.lock, lockRankRoot)
// 记录root等待队列数量+1
atomic.Xadd(&root.nwait, 1)
// 检测addr如果不为0了,就进行唤醒
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 将sudog协程放入到root队列
root.queue(addr, s, lifo)
// 执行挂起
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}

上面这部分的死循环可以理解为阻塞的过程,在addr未唤醒之前,会将当前sudog假如阻塞队列,并挂起等待

原语V

原语V即是signal,在并发编程信号同步过程中用来进行唤醒,go的sema主要通过semrelease1来实现signal,通过sync.runtime_Semacquire来调用

1
2
3
4
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}

semrelease1流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)

// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if atomic.Load(&root.nwait) == 0 {
return
}

// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
// Direct G handoff
// readyWithTime has added the waiter G as runnext in the
// current P; we now call the scheduler so that we start running
// the waiter G immediately.
// Note that waiter inherits our time slice: this is desirable
// to avoid having a highly contended semaphore hog the P
// indefinitely. goyield is like Gosched, but it emits a
// "preempted" trace event instead and, more importantly, puts
// the current G on the local runq instead of the global one.
// We only do this in the starving regime (handoff=true), as in
// the non-starving case it is possible for a different waiter
// to acquire the semaphore while we are yielding/scheduling,
// and this would be wasteful. We wait instead to enter starving
// regime, and then we start to do direct handoffs of ticket and
// P.
// See issue 33747 for discussion.
goyield()
}
}
}

分步骤分析

1
2
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)

到阻塞队列中拿到根结点,并对当前addr进行加1释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if atomic.Load(&root.nwait) == 0 {
return
}
// 对当前代码区加锁
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
// 计数已经被其他goroutine消费,所以不需要唤醒其他goroutine
unlock(&root.lock)
return
}
// 从阻塞队列中找到一个addr
s, t0 := root.dequeue(addr)
if s != nil {
// 等待队列计数-1
atomic.Xadd(&root.nwait, -1)
}
// 当前代码区释放锁
unlock(&root.lock)

这段代码的含义可以理解为,对当前区域加临时锁,主要目的从阻塞队列获取一个addr,然后检测下当前根结点的队列等待数量如果为0,说明都释放过了,直接 return 即可,最后对当前代码执行区域释放锁

1
2
3
4
5
6
7
8
readyWithTime(s, 5+skipframes)
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}

到这里会将sudog的协程唤醒执行,基本释放操作到这里就结束了,后面的代码主要是针对饥饿状态下g的处理

go的Semaphore的应用

go中在互斥锁和读写锁中都用到了Semaphore,在sync.mutexsync.rwmutex都有调用,当然,在其他结构体中比如waitgroup也有用到,这里只列出使用场景比较高的结构体,通过runtime_SemacquireMutexruntime_Semrelease实现调用
那么为什么会采用Semaphore呢,主要目的还是提高高并发场景下锁的性能,正常情况下可以通过CAS中的自旋也可以实现协程中的通信,但是自旋操作在高并发场景下对cpu资源消耗大,并且由于协程都是自旋等待的,所以当一个协程拿到锁后,其它协程会发生阻塞,影响性能,通过Semaphore中维护休眠队列,对协程进行调度,防止全局阻塞
提高了协程间的调度效率,并且在信号同步的P中,go的阻塞是通过gopark实现的,gopark类似与time.sleep,是一种挂起机制,不会大量消耗cpu资源,所以说,go的锁中利用了Semaphore实现调度,也是它在高并发场景中一种优势的体现

并行和并发

在操作系统中,一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的,我们把它叫做并行;对比地,并发是指:在同一个时间段内,两个或多个程序执行,有时间上的重叠(宏观上是同时,微观上仍是顺序执行)。

并行的计算机操作系统中多进程和多线程都不具备真正并行能力,它们通过毫秒甚至微秒级的进程(线程)切换,给人们一种错觉,它们是并行执行的,实际上严格来说,在某一时刻,cpu只能运行一个进程(线程)。

我们知道go语言的Goroutine是具备多任务处理能力的,往往都能听人说到,go的协程可以轻松实现单机并发几千甚至几万,并且具备并行的能力…针对这些听到的内容,总会有一种朦胧感,也会抱着疑惑,比如说,在生产环境中,Goroutine我开几千几万个协程是否真的没问题?它是否真的具备让任务能够达到并行的能力?
产生这些疑惑的原因根本还是我们不知道Goroutine底层是怎么运作的,也没法印证这些言论是否是正确的

想解决这些疑惑,需要弄清楚Goroutine的底层是怎么运作的,也可以说是Goroutine的调度模型

数据结构

G

Goroutine 在Go语言运行时使用私有结构体runtime.g表示。这个私有结构体非常复杂,总共包含 40 多个用于表示各种状态的成员变量,这里也不会介绍所有的字段,仅会挑选其中的一部分,首先是与栈相关的两个字段:

1
2
3
4
type g struct {
stack stack
stackguard0 uintptr
}

stack记录了lowhigh,由于是栈区信息记录,可以理解为栈顶和栈底,用于表示栈区内存范围

1
2
3
4
type stack struct {
lo uintptr
hi uintptr
}

每一个Goroutine上都持有两个分别存储 deferpanic 对应结构体的链表:

1
2
3
4
type g struct {
_panic *_panic // 最内侧的 panic 结构体
_defer *_defer // 最内侧的延迟函数结构体
}
1
2
3
4
5
6
type g struct {
m *m
sched gobuf
atomicstatus uint32
goid int64
}
  • m — 当前 Goroutine 占用的线程,可能为空;
  • atomicstatus — Goroutine的状态;
  • sched — 存储Goroutine的调度相关的数据;
  • goid — GoroutineID

上述四个字段中,我们需要展开介绍sched字段的runtime.gobuf结构体中包含哪些内容:

1
2
3
4
5
6
7
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ret sys.Uintreg
...
}
  • sp — (stack point)栈指针;
  • pc — (program counter)程序计数器;
  • g — 持有runtime.gobufGoroutine
  • ret — 系统调用的返回值;
    这些内容会在调度器保存或者恢复上下文的时候用到,其中的栈指针和程序计数器会用来存储或者恢复寄存器中的值,改变程序即将执行的代码。
    结构体runtime.gatomicstatus字段存储了当前Goroutine的状态。除了几个已经不被使用的以及与GC相关的状态之外,Goroutine可能处于以下 9 种状态:
    状态描述
    _Gidle刚刚被分配并且还没有被初始化
    _Grunnable没有执行代码,没有栈的所有权,存储在运行队列中
    _Grunning可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
    _Gsyscall正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
    _Gwaiting由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
    _Gdead没有被使用,没有执行代码,可能有分配的栈
    _Gcopystack栈正在被拷贝,没有执行代码,不在运行队列上
    _Gpreempted由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
    _GscanGC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在

上述状态中比较常见是 _Grunnable_Grunning_Gsyscall_Gwaiting_Gpreempted五个状态,虽然Goroutine在运行时中定义的状态非常多而且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,运行期间会在这三种状态来回切换:

  • 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting_Gsyscall_Gpreempted几个状态;
  • 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即 _Grunnable
  • 运行中:Goroutine 正在某个线程上运行,即 _Grunning
    img_2.png

M

Go 语言并发模型中的 M 是操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有GOMAXPROCS个活跃线程能够正常运行。

在默认情况下,运行时会将GOMAXPROCS设置成当前机器的核数,我们也可以在程序中使用runtime.GOMAXPROCS来改变最大的活跃线程数。

Go 语言会使用私有结构体runtime.m表示操作系统线程,这个结构体也包含了几十个字段,这里先来了解几个与Goroutine相关的字段:

1
2
3
4
5
type m struct {
g0 *g
curg *g
...
}

g0是一个运行时中比较特殊的Goroutine,它会深度参与运行时的调度过程,包括Goroutine的创建、大内存分配和CGO函数的执行
在后面Goroutine的调度过程中,g0是负责调度工作的核心worker

P

调度器中的处理器 P 是线程和 Goroutine 的中间层,它能提供线程需要的上下文环境,也会负责调度线程上的等待队列,通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时让出计算资源,提高线程的利用率。

因为调度器在启动时就会创建GOMAXPROCS个处理器,所以 Go 语言程序的处理器数量一定会等于GOMAXPROCS,这些处理器会绑定到不同的内核线程上。

runtime.p是处理器的运行时表示,作为调度器的内部实现,它包含的字段也非常多,其中包括与性能追踪、垃圾回收和计时器相关的字段,这些字段也非常重要,但是在这里就不展示了,我们主要关注处理器中的线程和运行队列:

1
2
3
4
5
6
7
8
type p struct {
m muintptr
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
...
}

归纳总结

  • G — 表示 Goroutine,它是一个待执行的任务;
  • M — 表示操作系统的线程,它由操作系统的调度器调度和管理;
  • P — 表示处理器,它可以被看做运行在线程上的本地调度器;

调度

Goroutine的调度流出比较复杂,这里只列举它的调度流程,不影响流程分析的地方直截图调度的关键函数,一些重要的过程分析会贴代码分析

Go语言运行时会调用runtime.mstart以及runtime.mstart1,并调用 runtime.schedule 进入调度循环:
img_3.png
其中在mstart1中会启动g0 Goroutine
img_4.png
mstart1最后调用schedule开始进行调度工作
img_5.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func schedule() {
_g_ := getg()

top:
var gp *g
var inheritTime bool

if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable()
}

execute(gp, inheritTime)
}

runtime.schedule 函数会从下面几个地方查找待执行的 Goroutine

  • 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过 schedtick 保证有一定几率会从全局的运行队列中查找对应的 Goroutine
  • 从处理器本地的运行队列中查找待执行的 Goroutine
  • 如果前两种方法都没有找到 Goroutine,会通过 runtime.findrunnable 进行阻塞地查找 Goroutine

通过schedule获取的Goroutine会调用execute,通过runtime.gogo将 Goroutine 调度到当前线程上。
img_6.png

runtime·gogo通过汇编插入指令在新的协程栈中插入goexit栈帧,目的是为了在后面执行完业务函数后能够回调回来,
同时MOVL gobuf_pc(BX)指令还会记录执行函数的程序计数器,最后执行JMP跳转到程序计数器代码位置执行代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
TEXT runtime·gogo(SB), NOSPLIT, $8-4
MOVL buf+0(FP), BX // 获取调度信息
MOVL gobuf_g(BX), DX
MOVL 0(DX), CX // 保证 Goroutine 不为空
get_tls(CX)
MOVL DX, g(CX)
MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中
MOVL gobuf_ret(BX), AX
MOVL gobuf_ctxt(BX), DX
MOVL $0, gobuf_sp(BX)
MOVL $0, gobuf_ret(BX)
MOVL $0, gobuf_ctxt(BX)
MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器
JMP BX

Goroutine中运行的函数返回时,程序会跳转到runtime.goexit所在位置执行该函数 :

1
2
3
4
5
6
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)

func goexit1() {
mcall(goexit0)
}

注意goexit0,该函数在g0栈上,可在runtime.goexit0找到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func goexit0(gp *g) {
_g_ := getg()

casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
...
gp.param = nil
gp.labels = nil
gp.timer = nil

dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
}

在最后runtime.goexit0会重新调用runtime.schedule触发新一轮的Goroutine调度,Go语言中的运行时调度循环会从runtime.schedule开始,最终又回到runtime.schedule,我们可以认为调度循环永远都不会终止。

调度流程

整个调度链路比较长,我们可以通过一张图来整理下它的工作流程,其中业务代码逻辑为

1
2
3
4
5
6
7
8
go do1()
func do1() {
...
return do2()
}
func do2() {
....
}

img_8.png

GMP模型

通过上述流程我们可以归纳出gmp模型

img_1.png

总结

再回到一开始的问题,

  • 再生产环境中,goroutine我开几千几万个协程是否真的没问题?
    • 针对问题1,我们已经有了很明确的答案,go协程和g1协程的并发执行是在M(线程)中执行的,由于不需要保存线程切换时的cpu上下文信息,是很轻量的,它是通过在用户空间中维护了一套Goroutine的调度管理池子,操作系统中并不知道Goroutine存在,不涉及到cpu上下文切换,所以可以很明确的回答是可以很轻松实现单机创建上千甚至上万个协程的
      当然也不是任何时候都可以这样创建,当发生系统调用时,需要m线程发生系统调用,所以会产生两次用户态内核态的切换,这时会涉及到cpu上下文切换,这一点还是需要注意的
      至于为什么不在协程里去做系统调用,这一点我们在进程与线程-线程实现方式小节中阐述过用户空间实现的线程的缺点,同时,goroutine的这种协程实现方式也是属于进程与线程-线程实现方式-混合实现方式
  • 它是否真的具备让任务能够达到并行的能力?
    • 第二个问题,通过gmp模型的归纳可以知道,当操作系统核心数>1时,M是多个,是可以实现真正意义上的并行的

参考文献:

GO MAP原理

hmap结构

1
2
3
4
5
6
7
8
9
10
11
type hmap struct {
count int // map的元素数量
flags uint8 //状态标识,用于控制goroutine写入和扩容的状态
B uint8 // 用于计算buckets数量,计算公式2^B
noverflow uint16 // 溢出桶数量
hash0 uint32 // hash 种子
buckets unsafe.Pointer // 2^B Buckets count ==0时可能为nil(此处是万能指针类型,实际是对应下面的bmap)
oldbuckets unsafe.Pointer // 扩容后的旧bucket数组
nevacuate uintptr // 迁移计数器,此指针之前的所有桶已被迁移,即nevacuate指向桶数组已迁移桶的最高下标
extra *mapextra //溢出桶结构
}
1
2
3
type bmap struct {
tophash [bucketCnt]uint8 // bucketCnt == 8每个桶固定8个元素
}

初始化

make

通过make函数和hint指定元素数量来初始化map

1
hash := make(map[string]int, hint)

字面量

目前的现代编程语言基本都支持使用字面量的方式初始化哈希,一般都会使用 key: value 的语法来表示键值对,Go 语言中也不例外:

1
2
3
4
5
hash := map[string]int{
"1": 2,
"3": 4,
"5": 6,
}

当哈希表中的元素数量少于或者等于 25 个时,编译器会将字面量初始化的结构体转换成以下的代码,将所有的键值对一次加入到哈希表中:

1
2
3
4
hash := make(map[string]int, 3)
hash["1"] = 2
hash["3"] = 4
hash["5"] = 6

一旦哈希表中元素的数量超过了 25 个,编译器会创建两个数组分别存储键和值,这些键值对会通过如下所示的 for 循环加入哈希:

1
2
3
4
5
6
hash := make(map[string]int, 26)
vstatk := []string{"1", "2", "3", ... , "26"}
vstatv := []int{1, 2, 3, ... , 26}
for i := 0; i < len(vstak); i++ {
hash[vstatk[i]] = vstatv[i]
}

无论是字面量或是make方式map底层都是通过makemap函数来创建

makemap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}

// initialize Hmap
if h == nil {
h = new(hmap)
}
h.hash0 = fastrand()

// Find the size parameter B which will hold the requested # of elements.
// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B

// allocate initial hash table
// if B == 0, the buckets field is allocated lazily later (in mapassign)
// If hint is large zeroing this memory could take a while.
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}

return h
}

makemap 函数通过指定hint来通过B计算bucket数量
计算公式

1
hint ≤ 2^B * 6.5

通过B数量指定hmap会创建2^B个桶和一些溢出桶

1
2
3
4
5
6
7
8
9
10
11
if b >= 4 {
// Add on the estimated number of overflow buckets
// required to insert the median number of elements
// used with this value of b.
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
  • 当桶的数量小于 2^4 时,由于数据较少、使用溢出桶的可能性较低,会省略创建的过程以减少额外开销;
  • 当桶的数量多于 2^4 时,会额外创建 2^𝐵−4 个溢出桶;

扩容

为什么需要扩容,当溢出桶承装元素超过8个时,溢出桶的mapextra指向新的溢出桶,直到能够满足承装元素数目,此时,hash查找会退化成链表查找,时间复杂度为O(n)
扩容判断条件:

  • 当前未在扩容状态并且负载因子>=6.5
  • 有太多的溢出桶
    1
    2
    3
    4
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
    hashGrow(t, h)
    goto again // Growing the table invalidates everything, so try again
    }
    map扩容为渐进式扩容,只在map操作当前桶时才对当前桶进行扩容
    扩容步骤分为hashGrowgrowWork
  • hashGrow不做桶元素迁移,只是将当前桶指向oldbuckets,然后创建等量的buckets和溢出桶作为newbuckets指向hmap.buckets
    1
    2
    3
    4
    5
    6
    h.B += bigger #更新B
    h.flags = flags #更新扩容状态
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    h.nevacuate = 0 #扩容计数器
    h.noverflow = 0 #溢出桶数量
  • growWork会在元素赋值是触发当前桶操作,然后对当前桶进行扩容,将将旧桶数据驱逐到新桶,操作步骤在growWork.evacuate

evacuate里的扩容方式分为等量扩容和非等量扩容

1
2
3
4
5
6
7
8
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}

等量扩容

对hash后的key后B位计算桶号,然后将元素等量分配

例如:hash(a)的二进制为0101110001010110,扩容前B=2,桶号为10=2,扩容后桶号为110,桶号为110=6,将元素平均分配到这两个桶

非等量扩容

非等量扩容不扩容桶的数量,由于之前产生过很多溢出桶,但是溢出桶的元素很稀疏,所以只是将桶序号重新整理,清到多余的溢出桶,然后整理到新的buckets,

##查找和写入

查找

  • 查找主要分为mapaccess1mapaccess2两个函数,区别就是mapaccess2在找到目标值时会多返回一个true,在未找到时会返回false
  • 查找流程hmap里通过tophash找到对应buckets桶号,再到bmap里区寻找tophash高8位对应的key,如果不在当前桶就去溢出桶里找,如果溢出桶找不到说明元素不在map,返回false
  • 如果查找时正在扩容,会判断oldbucktes是否为nil,不为nil找旧桶,缩减m找到旧桶编号去找旧桶元素位置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
     //hash&m找到桶编号作为偏移值找到对应的bmap结构体
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    if c := h.oldbuckets; c != nil { //存在旧桶,说明正在扩容状态中
    if !h.sameSizeGrow() { //判断是否翻倍扩容
    m >>= 1 //翻倍扩容时,新的桶数是旧的2倍,m需要减半才能找到旧桶编号
    }
    oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) //找到对应的旧桶位置
    if !evacuated(oldb) {
    b = oldb //如果旧桶没有完成数据迁移,那么更新b指向旧桶bmap
    }
    }
    top := tophash(hash) //取hash值高八位,因为bmp.tophash中0-4是标志位,所以hash值小于5的自动加5
    bucketloop:
    //遍历当前bmp和溢出桶
    ...

写入

  • 函数首先会检查 map 的标志位 flags。如果 flags 的写标志位此时被置 1 了,说明有其他协程在执行“写”操作,进而导致程序 panic。这也说明了 map 对协程是不安全的。
  • 扩容是渐进式的,如果 map 处在扩容的过程中,那么当 key 定位到了某个 bucket 后,需要确保这个 bucket 对应的老 bucket 完成了迁移过程。即老 bucket 里的 key 都要迁移到新的 bucket 中来(分裂到 2 个新 bucket),才能在新的 bucket 中进行插入或者更新的操作。
    上面说的操作是在函数靠前的位置进行的,只有进行完了这个搬迁操作后,我们才能放心地在新 bucket 里定位 key 要安置的地址,再进行之后的操作。
  • 如果这个 bucket8key 都已经放置满了,那在跳出循环后,发现 insertiinsertk 都是空,这时候需要在 bucket 后面挂上 overflow bucket。当然,也有可能是在 overflow bucket 后面再挂上一个 overflow bucket。这就说明,太多 key hash 到了此 bucket
    在正式安置 key 之前,还要检查 map 的状态,看它是否需要进行扩容。如果满足扩容的条件,就主动触发一次扩容操作。

删除

  • 首先会检查 h.flags 标志,如果发现写标位是 1,直接 panic,因为这表明有其他协程同时在进行写操作。
  • 计算 key 的哈希,找到落入的 bucket。检查此 map 如果正在扩容的过程中,直接触发一次搬迁操作。
  • 删除操作同样是两层循环,核心还是找到 key 的具体位置。寻找过程都是类似的,在 bucket 中挨个 cell 寻找。
  • 找到对应位置后,对 key 或者 value 进行“清零”操作: 最后,将 count 值减 1,将对应位置的 tophash 值置成 Empty

SYNC MAP

map不支持并发读写,原因是:

  • map底层的hmapflags做了状态标志,并发读写会panic
  • 底层控制的本质是防止扩容时,读map操作读到旧桶,写map正在做扩容迁移,将旧桶数据迁移到新桶,从而造成数据读取不正确

解决map并发问题

加锁(mutex)

加锁会导致同一时刻只能一个协程就操作map,性能比较差

sync map

map结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}

type readOnly struct {
m map[interface{}]*entry
amended bool
}

type entry struct {
p unsafe.Pointer // *interface{}
}

map struct

正常读写

正常读写操作read map,对map进行读取添加或修改操作

追加

比如追加的d=>D的kv:

先去read map查找有没有d,需要对dirty mapmutex锁,防止其他协程操作dirty map,然后在dirty map追加d,并将d.entry指向万能指针,由万能指针指向对应的值
同时将readmapamended赋值为true
append map

append map success

追加后的读

先去read map去看有没有该k,没有检查amended,如果为true则去查dirty map,并将misses++,当misses加到和dirty map kv数量相等时,提升dirty mapread map

sync map dirty提升流程:
remove read map
dirty map up

当再一次追加新元素时会重建dirty map
rebuild dirty map

追加后再删除
  • 正常删除d:
    正常删除主要操作read map,删除流程参考下图
    delete
  • 追加d后再次删除d:
    先去read map去看有没有该k,没有检查amended,如果为true加锁,去dirty map查找,找到后删除k,并将pointer指向nil
    append delete
    然后将dirty map提升至read map,amended改为false
    append delete dirty up
    最后下次追加时重建dirty map
    append delete rebuild dirty
    重建dirty map时,由于read map此时d指向nil,所以重建dirty map不会重建d
    之后操作read map,并将d标记为expunged,提醒后面操作read map的d时,不用改为nil,直接从map当前buckets删除

由于sync map只有在追加时才会操作dirty map,所以可理解追加、读写分离

总结

  • map在扩容时存在并发问题
  • sync map使用dirty mapread map解决扩容问题
  • 不存在扩容操作时直接读写read map
  • 存在扩容操作时操作dirty map