2023年 Go 并发库的变化综述

2023年来, Go的并发库又有了一些变化,这篇文章是对这些变换的综述。小细节的变化,比如typo、文档变化等无关大局的变化就不介绍了。

sync.Map

Go 1.21.0 中增加了和Once相关的三个函数,便于Once的使用。

1
2
3
func OnceFunc(f func()) func()
func OnceValue[T any](f func() T) func() T
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

这三个函数的功能分别是:

  • OnceFunc:返回一个函数g,多次调用这个函数g,只会执行一次f。 如果f执行时panic, 则后续调用这个函数g不会再执行f,但是每次调用都会panic。
  • OnceValue:返回一个函数g,多次调用这个函数g,只会执行一次f,函数g返回值类型是T。比上一个g多了一个返回值。panic原理同上。
  • OnceValues:返回一个函数g,多次调用这个函数g,只会执行一次f,函数g返回值类型是(T1, T2)。比上一个g又多了一个返回值。panic原理同上。

当然理论上你还可以增加更多的函数,返回更多的返回值,因为Go没有Tuple类型,所以这里还不能简化函数g的返回值为Tuple类型。反正Go 1.21.0就只增加了这三个函数。

这个有什么好处呢?先前我们使用sync.Once的时候,比如初始化一个线程池,我们需要定义一个线程池的变量,每次访问线程池变量的时候,我需要调用一下sync.Once.Do:

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestOnce(t *testing.T) {
var pool any
var once sync.Once
var initFn = func() {
// init pool
pool = 1
}
for i := 0; i < 10; i++ {
once.Do(initFn)
t.Log(pool)
}
}

如果使用OnceValue,就可以简化代码:

1
2
3
4
5
6
7
8
9
10
func TestOnceValue(t *testing.T) {
var initPool = func() any {
return 1
}
var poolGenerator = sync.OnceValue(initPool)
for i := 0; i < 10; i++ {
t.Log(poolGenerator())
}
}

代码略微简化,获取单例的时候只需调用返回的函数g即可。

所以基本上,这三个函数只是对sync.Once做了封装,更方便使用。

理解copyChecker

我们知道, sync.Cond有两个字段noCopychecker, noCopy通过go vet工具能够静态编译时检查出来,但是checker是在运行时检查的:

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}

先前copyChecker的判断条件如下,虽然简单的三行,但是不容易理解:

1
2
3
4
5
6
7
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

现在加上了注释,解释了这三行的意义:

1
2
3
4
5
6
7
8
9
10
11
func (c *copyChecker) check() {
// Check if c has been copied in three steps:
// 1. The first comparison is the fast-path. If c has been initialized and not copied, this will return immediately. Otherwise, c is either not initialized, or has been copied.
// 2. Ensure c is initialized. If the CAS succeeds, we're done. If it fails, c was either initialized concurrently and we simply lost the race, or c has been copied.
// 3. Do step 1 again. Now that c is definitely initialized, if this fails, c was copied.
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

主要逻辑在以下3步:

  • 第一步是一个快速检查,直接比较 c 指针和 c 本身的指针,如果不相等则表示已被复制。这是最快的检查路径。
  • 第二步确保 c 已经被初始化。使用 CAS (CompareAndSwap)来初始化。如果CAS失败,说明c 已经在其他goroutine初始化,或者被复制了。
  • 第三步再次执行第一步的检查。因为这时我们清楚的知道 c 已经初始化了,所以如果检查失败,就可以确认 c 被复制了。

整个逻辑就是使用 CAS 配合两次指针检查,来确保判断的正确性。

总的来说,第一步快速检查是性能优化。第二步使用 CAS 确保初始化。第三步再次检查来确保判断。

sync.Map 的一处优化

先前, sync.MapRange 函数的实现如下:

1
2
3
4
5
6
7
8
9
10
func (m *Map) Range(f func(key, value any) bool) {
...
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(&read)
m.dirty = nil
m.misses = 0
}
...
}

其中有一段代码:m.read.Store(&read),会导致read逃逸到堆上,通过下面的一个小技巧,避免了read的逃逸(通过一个新的变量):

1
2
3
4
5
6
7
8
9
10
11
func (m *Map) Range(f func(key, value any) bool) {
...
if read.amended {
read = readOnly{m: m.dirty}
copyRead := read
m.read.Store(©Read)
m.dirty = nil
m.misses = 0
}
...
}

issue #62404对这个问题进行了分析。

sync.Once的实现中done使用atomic.Uint32替换

先前sync.Once的实现如下:

1
2
3
4
type Once struct {
done uint32
m Mutex
}

其中字段done是一个uint32类型,用来表示Once是否已经执行过了。这个字段的类型是uint32,而不是bool,是因为uint32类型可以使用atomic包的原子操作,而bool类型不能。

现在sync.Once的实现如下:

1
2
3
4
type Once struct {
done atomic.Uint32
m Mutex
}

自从go 1.19提供了对基本类型的原子封装,Go标准库大量代码都被atomic.XXX类型锁替换。

我个人认为,目前这个修改相对于先前的实现,性能上在某些情况下可能会有性能的下降,我会专门写一篇文章进行探讨。

除了sync.Once,还有一批类型使用了atomic.XXX类型替换原来的使用方法,有必要可以进行替换么?

sync.OnceFunc 初始实现的优化

初始的sync.OnceFunc的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func OnceFunc(f func()) func() {
var (
once Once
valid bool
p any
)
g := func() {
defer func() {
p = recover()
if !valid {
panic(p)
}
}()
f()
valid = true
}
return func() {
once.Do(g)
if !valid {
panic(p)
}
}
}

仔细看这段代码,你会发现,传递给OnceFunc/OnceValue/OnceValues的函数f,即使执行完一次,只要返回的g函数好活着没有被垃圾回收,这个f就一直存活。
这是没必要的,因为f只需要执行一次,执行完就可以被垃圾回收了。所以,这里可以对f进行一次优化,让f执行完就设置为nil,这样就可以被垃圾回收了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func OnceFunc(f func()) func() {
var (
once Once
valid bool
p any
)
// Construct the inner closure just once to reduce costs on the fast path.
g := func() {
defer func() {
p = recover()
if !valid {
// Re-panic immediately so on the first call the user gets a
// complete stack trace into f.
panic(p)
}
}()
f()
f = nil // Do not keep f alive after invoking it.
valid = true // Set only if f does not panic.
}
return func() {
once.Do(g)
if !valid {
panic(p)
}
}
}

context

我们知道,在 Go 1.20中, 新增加了一个WithCancelCause方法(func WithCancelCause(parent Context) (ctx Context, cancel CancelCauseFunc)),我们在cancel的时候可以把cancel的原因传递给WithCancelCause产生的Context,这样可以通过context.Cause方法获取到cancel的原因。

1
2
3
4
ctx, cancel := context.WithCancelCause(parent)
cancel(myError)
ctx.Err() // 返回 context.Canceled
context.Cause(ctx) // 返回 myError

当然这个实现只进行了一半,因为超时相关的Context也需要增加这个功能,所以在Go 1.21.0中又新增了两个相关的函数:

1
2
func WithDeadlineCause(parent Context, d time.Time, cause error) (Context, CancelFunc)
func WithTimeoutCause(parent Context, timeout time.Duration, cause error) (Context, CancelFunc)

这两个和WithCancelCause还不太一样,不是利用返回的cancel函数传递原因,而是直接在函数参数中传递原因。

Go 1.21.0 还增加了一个AfterFunc函数,这个函数和time.AfterFunc类似,但是返回的是一个Context,这个Context在超时后会自动取消,这个函数的实现如下:

1
func AfterFunc(ctx Context, f func()) (stop func() bool)

指定的Context在在done(超时或者取消),如果context已经done,那么f立即被调用。
返回的stop函数用来停止f的调用,如果stop被调用并且返回true,f不会被调用。

这是一个辅助函数,但是难以理解,估计这个函数不会被广泛的使用。

其他一些小性能的优化比如type emptyCtx int替换成type emptyCtx struct{}等等就不用提了。

增加了一个func WithoutCancel(parent Context) Context, 当parent被取消时,不会波及到这个函数返回的Context。

Coroutines for Go

在今年7月,Russ Coxx写了一篇巨论: Coroutines for Go

个人不看好在Go标准库实现这个东西,我感觉Rob Pike也不会同意,但是这个东西社区如果去实现一个库,我觉得还是有可能的,返回如果大家不看好,社区的库自然会消亡。

否则,渐渐的Go迷失了它的初心: 简单好用。

社区的一些协程库:

你在go.dev还能搜到一些,这里就不赘述了。

golang.org/x/sync 没有明显改动

errgroup支持使用withCancelCause设置cause。
singleflight的panicError增加Unwrap方法。