深入Go语言 - 8

goroutine

目录 [−]

  1. go 语句
  2. 深入go语句
  3. goroutine是什么
  4. goroutine的调度

本章介绍 go语句、goroutine调度。

go 语句

go语句用来产生一个新的goroutine,并执行一个函数,它的使用非常简单,就是在函数调用或者方法调用的前面加上go关键字即可。

函数可以是已有函数、匿名函数、方法等,注意匿名方法(方法字面量)不要忘记调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func foo(i int) int {
return i * i
}
……
go foo(10)
go func() {
}()
go os.Open("./test.txt")
buf := bytes.NewBufferString("hello world")
go buf.ReadString(0)

深入go语句

看下面一段代码,你觉得会输出什么:

1
2
3
4
5
6
7
8
9
10
11
12
package main
import "fmt"
func main() {
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i)
}()
}
}

有的人说输出"0 1 2",有的人说输出"3 3 3"。

但实际上什么都没有输出。这是因为main goroutine马上就执行完了,它不会等待生成的goroutine的执行。

Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

你可以增加下面一行,等待所有的goroutine执行完:

1
select {}

因为select语句会被阻塞,所以前面生成的所有的goroutine会被执行。

你可能会发现程序最后会出下面一个错误信息:

1
fatal error: all goroutines are asleep - deadlock!

它的意思是所有的goroutine都已经执行完了,你的select还在那里阻塞着,不会有case等你执行的,所以有死锁的可能。Go强制杀死了这个等待,并抛出了一个错误。因此你可以忽略这个错误,它对我们前面的程序执行没有影响。

如果你不想看到这个错误,你可以使用sync.WaitGroup,或者像其它语言中的处理方法一样,从命令行读取一个值造成main goroutine阻塞,抑或加一行time.Sleep让main goroutine休眠较长的一个时间也可以:

1
os.Stdin.Read(make([]byte, 1))

那么,加上上面一行,会输出什么?

答案是 "3 3 3",为什么呢?

这是因为对于closure的情况(闭包closure的概念在很多语言中都有使用。在Go中,可以简单的认为匿名函数保持对外部变量的引用),for循环的每次迭代都会使用相同的变量i,这样每个goroutine都持有对相同的变量的引用,因为main gororutine 很快就执行了, 三个goroutine还没来得及执行,等它们执行的时候,i已经等于 2了,所以它们都打印出2来。

我们可以稍微修改一下,让main goroutine不要执行那么快,每次迭代暂停1秒:

1
2
3
4
5
6
7
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i)
}()
time.Sleep(1 * time.Second)
}

这段代码输出的结果为"0 1 2"。因为在main goroutine 暂停的时候, 生成的go routine有机会执行。

但是我们无法精确控制goroutine的执行,如果期望输出结果总是使用当前的迭代的值,可以改造成下面的样子:

1
2
3
4
5
for i := 0; i < 3; i++ {
go func(v int) {
fmt.Println(v)
}(i)
}

输出结果为"2 0 1" (goroutine的执行顺序有可能不同,但是如果你看到最后一节的分析,这个执行顺序也能讲得通,最后一个输出2的goroutine作为runnext优先级最高,输出1的goroutine本来在runnext的位置,不幸被挤掉了,放在了本地队列的队尾)。

如果你不想对匿名函数进行改造的话,也可以像下面的这样,生成一个局部变量:

1
2
3
4
5
6
for i := 0; i < 3; i++ {
i := i
go func() {
fmt.Println(i)
}()
}

输出 "2 0 1", 注意我们使用一个同名的局部变量shadow了迭代的变量i。

参考:

goroutine是什么

goroutine是Go语言独有的概念。

并发和多线程编程总是被认为很困难,多少是由于它们的实现,对于线程和并发访问的控制很复杂。 Go语言并发的基础是goroutine和channel。
这些概念来源于著名计算机科学家C.A.R.Hoare的Communication Sequential Process (简称CSP)。
在该语言中,一个并发系统由若干并行运行的顺序进程组成,每个进程不能对其他进程的变量赋值。进程之间只能通过 一对通信原语实现协作:Q->x表示从进程Q输入一个值到变量x中;P<-e表示把表达式e的值发送给进程P。当P进程执行Q->x, 同时Q进程执行P<-e时,发生通信,e的值从Q进程传送给P进程的变量x。
Occam和Erlang基于CSP的理论实现的并发模型。

Go也借鉴了CSP的理论,但又有所不同,最大的不同是Go显示地使用channel, channel在Go中是第一类的对象,goroutine通信完全通过通过channel实现的。
CSP模型中消息的分发是即时和同步的,Go的Channel则不同,消息会缓存在Channel中。

我看到的一个有趣的项目是使用Go语言实现Hoare论文中的例子,有兴趣的朋友可以仔细观看,csp

幸运地是,这些实现的细节对于Go语言的学习和应用来说不是必须的,对于语言的设计者来说,倒是值得比较和研究和出论文。

但是,对于开发者来说,至少应该明白goroutine和线程的不同,为什么一个Go应用可以存在成千上万个goroutine为线程确不行。

goroutine vs thread
对于线程来讲,Java的线程是最有名了。我们从三个方面进行比较:
1、内存占用
goroutine并不需要太多太多的内存占用,初始只需2kB的栈空间即可(自Go 1.4起),按照需要可以增长。

线程初始1MB,并且会分配一个防护页(guard page)。

在使用Java开发服务器的过程中经常会遇到request per thread的问题,如果为每个请求都分配一个线程的话,大并发的情况下服务器很快就死掉,因为内存不够了,所以很多Java框架比如Netty都会使用线程池来处理请求,而不会让线程任意增长。

而使用goroutine则没有这个问题,你页可以看到官方的net/http库就是使用request per goroutine这种模式进行处理的,内存占用不会是问题。

2、对象的创建和销毁
线程的创建和销毁肯定有花费,因为需要从OS中请求/返还资源。

而goroutine的创建和销毁花费很少,因为它是用户态的操作。并且Go语言也不提供goroutine的手工管理。

3、切换时间
当线程阻塞时,其它的线程进可能被执行,这叫做线程的切换。切换的时候,调度器需要保存当前阻塞的线程的状态,恢复要执行的线程状态,包括所有的寄存器,16个通用寄存器、程序计数器、栈指针、段寄存器、16个XMM寄存器、FP协处理器、16个 AVX寄存器、所有的MSR等等。

goroutine的保存和恢复只需要三个寄存器:程序计数器、栈指针和DX寄存器。因为goroutine之间共享堆空间,不共享栈空间,所以只需把goroutine的栈指针和程序执行到那里的信息保存和恢复即可,花费很低。

通过上面三个方面的分析,可以看到goroutine比线程有更多的优势。实际上Go使用少量线程来执行这些goroutine,通过GOMAXPROCS环境变量可以控制有多少线程可以并发执行用户态的代码。由于系统调用而被阻塞的线程不受这个变量的限制。以前版本的Go中这个变量为1,自Go 1.5后它的默认值为CPU的核数。

进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度。
线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的)。
协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

goroutine vs coroutine
两个类似,都是共享堆,不共享栈,切换的时候需要保存和恢复栈信息。

但是coroutine(协程)需要显示地控制coroutine的转换,程序员需要在切换的地方调用yield让度当前的coroutine的执行,这样其它coroutine才有可能在这个线程中执行,等暂停的coroutine恢复执行的时候,它会接着上次暂停的地方继续执行,而不像普通的函数从头开始执行。 看一段lua的coroutine代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function foo (a)
print("foo", a)
return coroutine.yield(2*a)
end
co = coroutine.create(function (a,b)
print("co-body", a, b)
local r = foo(a+1)
print("co-body", r)
local r, s = coroutine.yield(a+b, a-b)
print("co-body", r, s)
return b, "end"
end)
print("main", coroutine.resume(co, 1, 10))
print("main", coroutine.resume(co, "r"))
print("main", coroutine.resume(co, "x", "y"))
print("main", coroutine.resume(co, "x", "y"))

输出:

1
2
3
4
5
6
7
8
9
co-body 1 10
foo 2
main true 4
co-body r
main true 11 -9
co-body x y
main true 10 end
main false cannot resume dead coroutine

可以看到coroutine切换都是通过代码中的yield触发的。

goroutine也是由一组线程执行,也会暂停,也会继续执行,但是这个控制不是程序员实现安排好的,它是由go运行时后台控制的。goroutine的调度不能手工的执行,这是和coroutine最大的区别。当goroutine阻塞的时候,就有可能让度出线程以便其它goroutine执行,以下几种情况goroutine可能暂停自己的运行:

  • 调用runtime.Gosched()将当前goroutine放入到全局队列
  • 调用runtime.Goexit,终止G任务
  • 网络读取
  • sleep
  • channel操作
  • 调用sync包中的对象进行阻塞
  • 其它gouroutine被阻塞的情况,比如io读取,空无限循环,长时间占用线程执行的goroutine

利用goroutine和channel也可以实现cororutine,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
func f(yield chan string) {
yield <- "one"
yield <- "two"
yield <- "three"
}
func main() {
co := make(chan string)
go f(co)
log.Println(<-co) // one
log.Println(<-co) // two
log.Println(<-co) // three
}

参考

goroutine的调度

goroutine调度(Scheduling)的文章网上非常多了,而且分析的都很深入。本文重点的介绍其中的一些细节。

goroutine调度器有三个重要的数据结构,都是以单字母命名: G、P、M,因为Golang以及实现了自举,所以绝大部分的代码都是由Go本身实现的,少部分的以汇编实现,因为你已经由Go的基础知识了,所以你可以查看这些实现的代码不会感到特别困难。

  • M代表系统线程(Machine),由操作系统管理。
  • G代表goroutine,包括栈/指令指针以及其它对调度goroutine有用的信息。
  • P代表处理器(processor),注意不是CPU处理器,而是调度处理器,包含调度的上下文。

这三个个对象的数据结构定义在Go源代码的src/runtime/runtime2.go中定义,另外还包括一个很重要的数据结构schedt。

P必须和M组合起来执行G,但是两者也并不是完全1:1对应,通常情况下P的数量固定和CPU的核数一样(GOMAXPROCS参数),M则是按需创建,比如当M因为陷入系统调用而长时间阻塞的时候,P就会被监控线程抢回,去新建或者唤醒另一个M去执行,因此M的数量会增加,系统中可能存在一些阻塞的M。

当一个G被创建的时候,它可能被放入到一个P的本地队列或者全局队列中:

由于goroutine的执行的时间不会一样,goroutine不可能均匀地分布在所有的P的本地队列中,如果其中的一个P执行地很快,它的队列中没有其它的gouroutine需要执行了,它就会从全局队列中拿一批goroutine过来。

如果全局队列中也没有要执行的goroutine,那么这个P可能要从其它的P中“偷”一些goroutine过来。

这样设计的目的就是不要让一部分P忙的要死,另外一部分P确很清闲,这是一个balance的过程。

编译器会将"go func(……){}(……)"翻译成"newproc"调用,这个方法在runtime/proc.go中定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.
//go:nosplit
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
pc := getcallerpc(unsafe.Pointer(&siz))
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)
})
}

它的创建G的主要逻辑在newproc1中实现,并调用runqput将创建的G放入到队列中。注意G是可以重用的,如果有重用的G,则选择一个,否则创建一个新的,而且它也有本地复用链表和全局复用链表。

runqput首先尝试将G放入到P本地队列的本地队列中,而且在不设置"-race"的情况下,可能会尝试将这个G放在p.runnext中,作为下一个优先处理的G,而原先的runnext放回队尾。如果本地队列已满,则放入到全局队列中,而且还会将本地队列的一部分放入到全局队列中。

任务队列的优先级分三种:P.runnext、P.runq和全局的Schedt.runq。

schedule方法用来实现goroutine的调用,你可以在proc.go文件中搜索对它的调用。

如果你浏览schedule()方法的实现,可以看到每隔一定时间,会先尝试从全局队列中获取g去执行,这样就避免全局队列中的g没机会执行。

然后尝试本地队列中获取g, 依照优先级选择g,先是P.runnext,然后从队列的头部依次获取。

如果本地队列没有g,则调用findrunnable方法从其它地方获取,这是一个block方法,直到有g获取到。
findrunnable首先从本地队列获取(runqget方法),然后从全局队列获取(globrunqget),然后检查netpoll的goroutine,
如果还没有,随机选择一个P,偷一些任务过来(runqsteal方法,如果“饿”的厉害,连别人的runnext都偷过来)。
具体的获取过程你可以查看每个选择的方法。

你可以通过schedtrace调试参数查看Go调度的细节:

1
GOMAXPROCS=1 GODEBUG=schedtrace=1000 ./example

详细的文章可以查看我翻译的William Kennedy的Scheduler Tracing In Go

参考