面向并发的内存模型
Goroutine和系统线程
每个系统级线程都会有一个固定大小的栈(一般默认可能是2MB),这个栈主要用来保存函数递归调用时参数和局部变量。固定了栈的大小导致了两个问题:一是对于很多只需要很小的栈空间的线程来说是一个巨大的浪费,二是对于少数需要巨大栈空间的线程来说又面临栈溢出的风险。针对这两个问题的解决方案是:要么降低固定的栈大小,提升空间的利用率;要么增大栈的大小以允许更深的函数递归调用,但这两者是没法同时兼得的。相反,一个Goroutine会以一个很小的栈启动(可能是2KB或4KB),当遇到深度递归导致当前栈空间不足时,Goroutine会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到1GB)。因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。
Go的运行时还包含了其自己的调度器,这个调度器使用了一些技术手段,可以在n个操作系统线程上多工调度m个Goroutine。Go调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的Go程序中的Goroutine。Goroutine采用的是半抢占式的协作调度,只有在当前Goroutine发生阻塞时才会导致调度;同时发生在用户态,调度器会根据具体函数只保存必要的寄存器,切换的代价要比系统 线程低得多。运行时有一个runtime.GOMAXPROCS变量,用于控制当前运行正常非阻塞Goroutine的系统线程数目。
在Go语言中启动一个Goroutine不仅和调用函数一样简单,而且Goroutine之间调度代价也很低,这些因素极大地促进了并发编程的流行和发展。
原子操作
原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。
原子操作都是通过“互斥”访问来保证的,通常由特殊的CPU指令提供保护。当然,如果仅仅是想模拟下粗粒度的原子操作,我们可以借助于sync.Mutex来实现:
import (
"sync"
)
var total struct {
sync.Mutex
value int
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i <= 100; i++ {
total.Lock()
total.value += i
total.Unlock()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
fmt.Println(total.value)
}
用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的sync/atomic包对原子操作提供了丰富的支持。我们可以重新实现上面的例子
import (
"sync"
"sync/atomic"
)
var total uint64
func worker(wg *sync.WaitGroup) {
defer wg.Done()
var i uint64
for i = 0; i <= 100; i++ {
atomic.AddUint64(&total, i)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
atomic.AddUint64函数调用保证了total的读取、更新和保存是一个原子操作,因此在多线程中访问也是安全的。
原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。
type singleton struct {}
var (
instance *singleton
initialized uint32
mu sync.Mutex
)
func Instance() *singleton {
if atomic.LoadUint32(&initialized) == 1 {
return instance
}
mu.Lock()
defer mu.Unlock()
if instance == nil {
defer atomic.StoreUint32(&initialized, 1)
instance = &singleton{}
}
return instance
}
我们可以 将通用的代码提取出来,就成了标准库中sync.Once的实现:
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
基于sync.Once重新实现单件模式:
var (
instance *singleton
once sync.Once
)
func Instance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync/atomic包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value原子对象提供了Load和Store两个原子方法,分别用于加载和保存数据,返回值和参数都是interface类型,因此可以用于任意的自定义复杂类型。
var config atomic.Value // 保存当前配置信息
// 初始化配置信息
config.Store(loadConfig())
// 启动一个后台线程, 加载更新后的配置信息
go func() {
for {
time.Sleep(time.Second)
config.Store(loadConfig())
}
}()
// 用于处理请求的工作者线程始终采用最新的配置信息
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// ...
}
}()
}
顺序一致性内存模型
如果只是想简单地在线程之间进行数据同步的话,原子操作已经为编程人员提供了一些同步保障。不过这种保障有一个前提:顺序一致性的内存模型。要了解顺序一致性,我们先看看一个简单的例子:
var a string
var done bool
func setup() {
a = "hello, world"
done = true
}
func main() {
go setup()
for !done {}
print(a)
}
我们创建了setup线程,用于对字符串a的初始化工作,初始化完成之后设置done标志为true。main函数所在的主线程中,通过for !done 检测done变为true时,认为字符串初始化工作完成,然后进行字符串的打印工作。
但是Go语言并不保证在main函数中观测到的对done的写入操作发 生在对字符串a的写入的操作之后,因此程序很可能打印一个空字符串。更糟糕的是,因为两个线程之间没有同步事件,setup线程对done的写入操作甚至无法被main线程看到,main函数有可能陷入死循环中。
在Go语言中,同一个Goroutine线程内部,顺序一致性内存模型是得到保证的。但是不同的Goroutine之间,并不满足顺序一致性内存模型,需要通过明确定义的同步事件来作为同步的参考。如果两个事件不可排序,那么就说这两个事件是并发的。为了最大化并行,Go语言的编译器和处理器在不影响上述规定的前提下可能会对执行语句重新排序(CPU也会对一些指令进行乱序执行)
因此,如果在一个Goroutine中顺序执行a = 1; b = 2;两个语句,虽然在当前的Goroutine中可以认为a = 1;语句先于b = 2;语句执行,但是在另一个Goroutine中b = 2;语句可能会先于a = 1;语句执行,甚至在另一个Goroutine中无法看到它们的变化(可能始终在寄存器中)。也就是说在另一个Goroutine看来, a = 1; b = 2;两个语句的执行顺序是不确定的。如果一个并发程序无法确定事件的顺序关系,那么程序的运行结果往往会有不确定的结果。比如下面这个程序:
func main() {
go println("你好, 世界")
}
根据Go语言规范,main函数退出时程序结束,不会等待任何后台线程。因为Goroutine的执行和main函数的返回事件是并发的,谁都有可能先发生,所以什么时候打印,能否打印都是未知的。
用前面的原子操作并不能解决问题,因为我们无法确定两个原子操作之间的顺序。解决问题的办法就是通过同步原语来给两个事件明确排序:
func main() {
done := make(chan int)
go func(){
println("你好, 世界")
done <- 1
}()
<-done
}
当<- done
执行时,必然要求done <- 1
也已经执行。根据同一个Gorouine依然满足顺序一致性规则,我们可以判断当done <- 1
执行时,println("你好, 世界")
语句必然已经执行完成了。因此,现在的程序确保可以正常打印结果。
当然,通过sync.Mutex互斥量也是可以实现同步的:
func main() {
var mu sync.Mutex
mu.Lock()
go func(){
println("你好, 世界")
mu.Unlock()
}()
mu.Lock()
}
可以确定后台线程的mu.Unlock()必然在println("你好, 世界")完成后发生(同一个线程满足顺序一致性),main函数的第二个mu.Lock()必然在后台线程的mu.Unlock()之后发生(sync.Mutex保证),此时后台线程的打印工作已经顺利完成了。
初始化顺序
Go程序的初始化和执行总是从main.main函数开始的。但是如果main包里导入了其它的包,则会按照顺序将它们包含进main包里(这里的导入顺序依赖具体实现,一般可能是以文件名或包路径名的字符串顺序导入)。如果某个包被多次导入的话,在执行的时候只会导入一次。当一个包被导入时,如果它还导入了其它的包,则先将其它的包包含进来,然后创建和初始化这个包的常量和变量。然后就是调用包里的init函数,如果一个包有多个init函数的话,实现可能是以文件名的顺序调用,同一个文件内的多个init则是以出现的顺序依次调用(init不是普通函数,可以定义有多个,所以不能被其它函数调用)。最终,在main包的所有包常量、包变量被创建和初始化,并且init函数被执行后,才会进入main.main函数,程序开始正常执行。
要注意的是,在main.main函数执行之前所有代码都运行在同一个Goroutine中,也是运行在程序的主系统线程中。如果某个init函数内部用go关键字启动了新的Goroutine的话,新的Goroutine和main.main函数是并发执行的。
因为所有的init函数和main函数都是在主线程完成,它们也是满足顺序一致性模型的。
Goroutine的创建
go语句会在当前Goroutine对应函数返回前创建新的Goroutine. 例如:
var a string
func f() {
print(a)
}
func hello() {
a = "hello, world"
go f()
}
执行go f()语句创建Goroutine和hello函数是在同一个Goroutine中执行, 根据语句的书写顺序可以确定Goroutine的创建发生在hello函数返回之前, 但是新创建Goroutine对应的f()的执行事件和hello函数返回的事件则是不可排序的,也就是并发的。调用hello可能会在将来的某一时刻打印"hello, world",也很可能是在hello函数执行完成后才打印。
基于Channel的通信
Channel通信是在Goroutine之间进行同步的主要方法。在无缓 存的Channel上的每一次发送操作都有与其对应的接收操作相配对,发送和接收操作通常发生在不同的Goroutine上(在同一个Goroutine上执行2个操作很容易导致死锁)。无缓存的Channel上的发送操作总在对应的接收操作完成前发生.
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "你好, 世界"
done <- true
}
func main() {
go aGoroutine()
<-done
println(msg)
}
可保证打印出“hello, world”。该程序首先对msg进行写入,然后在done管道上发送同步信号,随后从done接收对应的同步信号,最后执行println函数。
若在关闭Channel后继续从中接收数据,接收者就会收到该Channel返回的零值。因此在这个例子中,用close(c)
关闭管道代替done <- false
依然能保证该程序产生相同的行为。
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "你好, 世界"
close(done)
}
func main() {
go aGoroutine()
<-done
println(msg)
}
对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前。
基于上面这个规则可知,交换两个Goroutine中的接收和发送操作也是可以的(但是很危险):
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "hello, world"
<-done
}
func main() {
go aGoroutine()
done <- true
println(msg)
}