Go编程基础-9. 使用共享变量实现并发

48

使用共享变量实现并发

1. 竞态

当一个程序有两个或多个 goroutine 时,每个 goroutine 内部的各个步骤虽然是顺序执行的,但我们无法确定一个 goroutine 中的事件 x 和另一个 goroutine 中的事件 y 的先后顺序。如果我们不能自信地说一个事件肯定早于另一个事件,那么这两个事件就是并发的。

考虑一个在串行程序中能够正确工作的函数。如果这个函数在并发调用时仍然能正确工作,那么这个函数是并发安全(concurrency-safe)的。这里的并发调用是指在没有额外同步机制的情况下,从两个或多个 goroutine 同时调用这个函数。这个概念也可以推广到其他函数,比如方法或者作用于特定类型的一些操作。如果一个类型的所有可访问方法和操作都是并发安全的,那么这个类型可以被称为并发安全的类型。

让一个程序并发安全并不需要其中的每一个具体类型都是并发安全的。实际上,并发安全的类型是特例而不是普遍存在的。因此,只有在文档明确指出类型是安全的情况下,才可以并发地访问一个变量。对于绝大部分变量,为避免并发访问,要么限制变量只存在于一个 goroutine 内,要么维护一个更高层的互斥不变量。

函数在并发调用时不工作的原因有很多,包括死锁、活锁(livelock)及资源耗尽。由于时间有限,我们无法讨论所有的情形,因此接下来会重点讨论最重要的一种情形,即竞态。竞态是指在多个 goroutine 按某些交错顺序执行时程序无法给出正确的结果。竞态对程序是致命的,因为它们可能潜伏在程序中,出现频率也很低,有可能仅在高负载环境或者在使用特定的编译器、平台和架构时才出现。这些都让竞态问题变得难以再现和分析。

示例1:

var balance int

func Deposit(amount int) {
    balance = balance + amount
}

func Balance() int {
    return balance
}

// Alice:
go func() {
    bank.Deposit(200) // A1
    fmt.Println("=", bank.Balance()) // A2
}()

// Bob:
go bank.Deposit(100) // B

表面上看起来很简单,但 balance = balance + amount 事实上分为两个操作:balance + amountbalance = result。因此,在并发时可能会出现下述特殊执行顺序,导致 Bob 的执行结果被吞。

数据竞态
A1: 0     ... = balance + amount
B: 100    balance = 0 + 100
A1: 200   balance = ... (200)
A2: "= 200"

示例2:

当发生数据竞态的变量类型是大于一个机器字长的类型(比如接口、字符串或 slice)时,事情就更加复杂了。下面的代码并发地把 x 更新为两个不同长度的 slice。

var x []int

go func() {
    x = make([]int, 10)
}()

go func() {
    x = make([]int, 1000000)
}()

x[999999] = 1 // 注意:未定义行为,可能造成内存异常

在最后一个表达式中,x 的值是未定义的,它可能是 nil、一个长度为 10 的 slice,或者一个长度为 1,000,000 的 slice。回想一下 slice 的三个部分:指针、长度和容量。如果指针来自第一个 make 调用而长度来自第二个 make 调用,那么 x 会变成一个嵌合体,名义上长度为 1,000,000,但底层的数组只有 10 个元素。在这种情况下,尝试存储到第 999,999 个元素会伤及很远的一段内存,其后果无法预测,问题也很难调试和定位。这种语义上的雷区称为未定义行为,C 程序员对此应当很熟悉。

竞态解决方案:

数据竞态发生于两个 goroutine 并发读写同一个变量并且至少其中一个是写入时. 从定义中不难看出,至少有三种方法可以避免数据竞态:

  1. 避免在并发程序中修改变量: 在程序开始前统一对数据进行初始化,goroutine只进行读操作。

考虑如下的 map,它进行了延迟初始化,对于每个键,在第一次访问时才触发加载。如果 icon 的调用是串行的,那么程序能正常工作;但如果 icon 的调用是并发的,在访问 map 时就可能存在数据竞态问题。

var icons = make(map[string]image.Image)

func loadIcon(name string) image.Image {
    // 假设这是一个加载图标的函数
}

// 注意:并发不安全
func Icon(name string) image.Image {
    icon, ok := icons[name]
    if !ok {
        icon = loadIcon(name)
        icons[name] = icon
    }
    return icon
}

如果在创建其他 goroutine 之前就用完整的数据来初始化 map,并且不再修改它,那么无论有多少 goroutine,也可以安全地并发调用 icon,因为每个 goroutine 都只读取这个 map

var icons = map[string]image.Image{
    "spades.png": loadIcon("spades.png"),
    "hearts.png": loadIcon("hearts.png"),
    "diamonds.png": loadIcon("diamonds.png"),
    "clubs.png": loadIcon("clubs.png"),
}

// 并发安全
func Icon(name string) image.Image {
    return icons[name]
}

在上面的例子中,icons 变量的赋值发生在包初始化时,也就是在程序的 main 函数开始运行之前。一旦初始化完成后,icons 就不再修改。那些从不修改的数据结构以及不可变数据结构本质上是并发安全的,也不需要做任何同步。但显然,我们不能把这个方法用在必然会有更新的场景,比如一个银行账号。

  1. 避免从多个goroutine访问同一个变量:所有需要进行的数据修改操作,通过消息队列/通道等方式,提交给统一的写线程进行执行,1写n读 (可能会存在更新的,需要注意)。

例如,在并发的 Web 爬虫(见 8.6 节)中,主 goroutine 是唯一一个能访问 seen map 的 goroutine;在聊天服务器(见 8.10 节)中,broadcaster goroutine 是唯一一个能访问 clients map 的 goroutine。这些变量都被限制在单个 goroutine 内部。

由于其他 goroutine 无法直接访问这些变量,因此它们必须使用通道来向受限 goroutine 发送查询请求或更新变量。这正是 Go 的箴言的含义:“不要通过共享内存来通信,而应该通过通信来共享内存”。使用通道请求来代理受限变量的所有访问的 goroutine 被称为该变量的监控 goroutine。例如,broadcaster goroutine 监控了对 clients map 的访问。

下面是重写的银行案例,用一个叫 teller 的监控 goroutine 限制 balance 变量:

package bank

var deposits = make(chan int)  // 发送存款额
var balances = make(chan int)  // 接收余额

func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }

func teller() {
    var balance int  // balance 被限制在 teller goroutine 中
    for {
        select {
        case amount := <-deposits:
            balance += amount
        case balances <- balance:
        }
    }
}

func init() {
    go teller()  // 启动监控 goroutine
}

即使一个变量无法在整个生命周期内受限于单个 goroutine,限制它仍然可以是解决并发访问的好方法。例如,一个常见的场景是通过借助通道将共享变量的地址从上一步传到下一步,从而在流水线上的多个 goroutine 之间共享该变量。在流水线中的每一步,在将变量地址传给下一步后,就不再访问该变量了,这样所有对这个变量的访问都是串行的。换句话说,这个变量先受限于流水线的一步,再受限于下一步,以此类推。这种限制有时也称为串行限制。

在下面的例子中,Cakes 是串行限制的,首先受限于 baker goroutine,然后受限于 icer goroutine:

type Cake struct{ state string }

func baker(cooked chan<- *Cake) {
    for {
        cake := new(Cake)
        cake.state = "cooked"
        cooked <- cake  // baker 不再访问 cake 变量
    }
}

func icer(iced chan<- *Cake, cooked <-chan *Cake) {
    for cake := range cooked {
        cake.state = "iced"
        iced <- cake  // icer 不再访问 cake 变量
    }
}
  1. 允许多个 goroutine 访问同一个变量,但在同一时间只有
    一个 goroutine 可以访问。这种方法称为互斥机制

2. 互斥锁: sync.Mutex

在之前的代码中,我们曾使用一个缓冲通道作为令牌池/计数信号量,用来限制并发的 goroutine 数量。使用同样的理念,也可以用一个容量为 1 的通道来保证同一时间最多有一个 goroutine 能访问共享变量。一个计数上限为 1 的信号量称为二进制信号量(binary semaphore)。

var (
	sema    = make(chan struct{}, 1) // 用来保护 balance 的二进制信号量
	balance int
)

func Deposit(amount int) {
	sema <- struct{}{} // 获取令牌
	balance = balance + amount
	<-sema // 释放令牌
}

func Balance() int {
	sema <- struct{}{} // 获取令牌
	b := balance
	<-sema // 释放令牌
	return b
}

互斥锁模式应用非常广泛,所以 sync 包有一个单独的 Mutex 类型来支持这种模式。它的 Lock 方法用于获取令牌(token,此过程也称为上锁),Unlock 方法用于释放令牌:

import "sync"

var (
	mu      sync.Mutex
	balance int
)

func Deposit(amount int) {
	mu.Lock()
	defer mu.Unlock() // 注意:尽量使用 defer 进行释放。
	balance = balance + amount
}

func Balance() int {
	mu.Lock()
	defer mu.Unlock()
	return balance
}

一个 goroutine 在每次访问银行的变量(此处仅有 balance)之前,都必须先调用互斥量的 Lock 方法来获取一个互斥锁。如果其他 goroutine 已经取走了互斥锁,那么操作会一直阻塞到其他 goroutine 调用 Unlock 之后(此时互斥锁再度可用)。互斥量保护共享变量。按照惯例(规范),被互斥量保护的变量声明应当紧接在互斥量的声明之后。如果实际情况不是如此,请确认已加了注释来说明此事。

LockUnlock 之间的代码,可以自由地读取和修改共享变量,这一部分称为临界区域。在锁的持有人调用 Unlock 之前,其他 goroutine 不能获取锁。所以很重要的一点是,goroutine 在使用完成后就应当释放锁,另外,需要注意在函数的所有分支,特别是错误分支中释放锁,避免因为异常导致锁无法释放,从而导致程序陷入死锁而卡死。因此尽量使用 defer 进行锁的释放操作

几个导出函数封装了一个或多个变量,于是只能通过这些函数来访问这些变量(对于一个对象的变量,则用方法来封装)。每个函数在开始时申请一个互斥锁,在结束时再释放掉,通过这种方式来确保共享变量不会被并发访问。这种函数、互斥锁、变量的组合方式称为监控(monitor)模式。

监控模式(Monitor Pattern)是一种经典的并发编程模式,用于在多线程环境中安全地访问和修改共享资源。监控模式通过将共享资源的访问封装在一个监控对象(Monitor)中,并使用同步机制(如互斥锁)来控制对这些资源的并发访问,从而避免数据竞争和不一致性。

临界区域的设计准则

考虑如下的 Withdraw 函数。当成功时,余额减少了指定的金额,并返回 true;但如果余额不足,无法完成交易,Withdraw 会恢复余额并返回 false

// 注意:不是原子操作
func Withdraw(amount int) bool {
    Deposit(-amount)
    if Balance() < 0 {
        Deposit(amount)
        return false // 余额不足
    }
    return true
}

这个函数最终能给出正确的结果,但它有一个不良的副作用:在尝试进行超额提款时,在某个瞬间余额会降到 0 以下。这可能会导致一个小额的取款被不合逻辑地拒绝。所以当 Bob 尝试购买一辆跑车时,可能会导致 Alice 无法支付早上的咖啡。Withdraw 的问题在于它不是原子操作:它包含三个串行的操作,每个操作都申请并释放了互斥锁,但对整个序列没有上锁。

理想情况下,Withdraw 应当为整个操作申请一次互斥锁。但如下的尝试是错误的:

// 注意:不正确的实现
func Withdraw(amount int) bool {
    mu.Lock()
    defer mu.Unlock()
    Deposit(-amount)
    if Balance() < 0 {
        Deposit(amount)
        return false // 余额不足
    }
    return true
}

Deposit 会通过调用 mu.Lock() 来尝试再次获取互斥锁,但由于互斥锁是不可重入的(无法对一个已经上锁的互斥量再上锁),这会导致死锁。Withdraw 会一直被卡住。

Go 语言的互斥量是不可重入的,其目的是在程序执行过程中维持基于共享变量的特定不变量(invariant)。其中一个不变量是“没有 goroutine 正在访问这个共享变量”,但有可能互斥量也保护针对数据结构的其他不变量。当 goroutine 获取一个互斥锁时,它可能会假定这些不变量是满足的。当它获取到互斥锁之后,它可能会更新共享变量的值,这样可能会临时不满足之前的不变量。当它释放互斥锁时,它必须保证之前的不变量已经还原且重新满足。尽管一个可重入的互斥量可以确保没有其他 goroutine 可以访问共享变量,但是无法保护这些变量的其他不变量。

注意:此处的不变量指的是程序设计的准则,即变量操作过程中的原子性和一致性。对于不可重入锁而言,能够很好地保证变量操作的原子性和一致性;但如果是可重入锁,就可能一层嵌套一层,从而无法确定最终的结果是否符合预期。

一个常见的解决方案是把 Deposit 这样的函数拆分为两部分:一个不导出的函数 deposit,它假定已经获得互斥锁并完成实际的基本业务逻辑;以及两个导出的函数 DepositWithdraw,它用来获取锁并调用 deposit,执行进一步的复合操作,并对外导出(需要明确,只能对导出函数并行操作;非导出函数顺序操作):

func Withdraw(amount int) bool {
    mu.Lock()
    defer mu.Unlock()
    deposit(-amount)
    if balance < 0 {
        deposit(amount)
        return false // 余额不足
    }
    return true
}

func Deposit(amount int) {
    mu.Lock()
    defer mu.Unlock()
    deposit(amount)
}

func Balance() int {
    mu.Lock()
    defer mu.Unlock()
    return balance
}

// 这个函数要求已获取互斥锁
func deposit(amount int) {
    balance += amount
}

当然,这里的 deposit 函数代码很少,所以实际上 Withdraw 函数可以不用调用这个函数,但无论如何,通过这个例子我们很好地演示了这个规则。封装(参考 6.6 节)即通过在程序中减少对数据结构的非预期交互,来帮助我们保证数据结构中的不变量。因为类似的原因,封装也可以用来保持并发中的不变性。所以无论是为了保护包级别的变量,还是结构中的字段,当你使用一个互斥量时,都请确保互斥量本身以及被保护的变量都没有导出。

3. 读写互斥锁: sync.RWMutex

在实际的应用场景中,往往会存在大量的读和相对较少的写操作。如果对所有读操作都加上独享锁,那么程序性能很快就会陷入瓶颈。因为 Balance 函数只需读取变量的状态,所以多个 Balance 请求可以安全地并发运行,只要 DepositWithdraw 请求没有同时运行即可。在这种场景下,我们需要一种特殊类型的锁,它允许只读操作可以并发执行,但写操作需要获得完全独享的访问权限。这种锁称为多读单写锁,Go 语言中的 sync.RWMutex 可以提供这种功能:

var mu sync.RWMutex
var balance int

func Balance() int {
    mu.RLock() // 读锁
    defer mu.RUnlock()
    return balance
}

现在,Balance 函数可以调用 RLockRUnlock 方法来分别获取和释放一个读锁(也称为共享锁)。Deposit 函数无须更改,它通过调用 mu.Lockmu.Unlock 来分别获取和释放一个写锁(也称为互斥锁)。

经过上述修改之后,Bob 的绝大部分 Balance 请求可以并行运行且能更快完成。因此,锁可用的时间比例会更大,Deposit 请求也能得到更及时的响应。

RLock 仅可用于在临界区域内对共享变量无写操作的情形。一般来讲,我们不应假定那些逻辑上只读的函数和方法不会更新一些变量。比如,一个看起来只是简单访问器的方法可能会递增内部使用的计数器,或者更新一个缓存来让重复的调用更快。如果你有疑问,那么就应当使用独享版本的 Lock

逻辑只读 ≠ 实际只读

仅在绝大部分 goroutine 都在获取读锁并且锁竞争比较激烈时(即,goroutine 一般都需要等待才能获得锁),RWMutex 才有优势。因为 RWMutex 需要更复杂的内部簿记工作,所以在竞争不激烈时它比普通的互斥锁慢。

示例:

  1. 功能代码:
package __mutex

import (
	"sync"
)

//var (
//	mu      sync.Mutex
//	balance int
//)

var (
	mu      sync.RWMutex
	balance int
)

func Deposit(amount int) {
	mu.Lock()
	defer mu.Unlock()
	balance = balance + amount
}

func Balance() int {
	mu.RLock()
	defer mu.RUnlock()
	//mu.Lock()
	//defer mu.Unlock()
	return balance
}
  1. 测试代码:
import (
	"log"
	"testing"
	"time"
)

func TestMutex(t *testing.T) {

	// 1. 记录执行耗时
	defer func() func() {
		start := time.Now()
		return func() {
			duration := time.Since(start)
			log.Println(duration)
		}
	}()()

	// 2. 并发执行Read/Write操作,测试耗时
	done := make(chan struct{})
	go func() {
		for i := 0; i < 1000000; i++ {
			Deposit(i)
		}
		done <- struct{}{}
	}()
	// 读操作的并发量是写的100倍
	for i := 0; i < 100; i++ {
		go func() {
			for j := 0; j < 1000000; j++ {
				Balance()
			}
		}()
	}

	// 读写均独享锁的情况下:10.0879871s
	// 读共享,写独享情况下:5.4425918s
	<-done
	time.Sleep(1 * time.Second)
}

可以看到,读写锁确实显著快于独享锁。尤其是读多、写少的情形下,读:写操作的比例越高,那么性能提升越显著。需要注意的是,由于读写之间任然是互斥的,因此性能的提升依然是有限的。每当一个写操作到达后,就需要等待当前所有读操作完成后再执行,因此读操作的性能消耗依然远大于未加锁的情况。

4. 内存同步

你可能会对 Balance 方法也需要互斥锁(不管是基于通道的锁还是基于互斥量的锁)感到奇怪。毕竟,与 Deposit 不一样,它只包含单个操作,所以并不存在另外一个 goroutine 插在中间执行的风险。其实需要互斥锁的原因有两个。

首先,防止 Balance 插到其他操作中间是很重要的。例如,在 Withdraw 函数执行过程中,会存在一个临时的账户余额。如果不加锁,Balance 操作可能会读取到这个临时余额,产生潜在的错误。

其次,原因更为微妙,因为同步不仅涉及多个 goroutine 的执行顺序问题,还会影响到内存。

现代计算机一般都有多个处理器,每个处理器都有其本地缓存。为了提高效率,对内存的写入通常会先缓存在处理器中,只在必要时才刷回内存。甚至刷回内存的顺序都可能与 goroutine 的写入顺序不一致。像通道通信或者互斥锁操作这样的同步原语,会导致处理器把累积的写操作刷回内存并提交,所以在这个时刻之前 goroutine 的执行结果才会保证对运行在其他处理器的 goroutine 可见。

考虑如下代码片段的可能输出:

var x, y int
go func() {
    x = 1 // A1
    fmt.Print("y:", y, " ") // A2
}()
go func() {
    y = 1 // B1
    fmt.Print("x:", x, " ") // B2
}()

由于这两个 goroutine 并发运行且在没有使用互斥锁的情况下访问共享变量,所以会产生数据竞态。因此,我们不应对程序每次输出不一致感到奇怪。根据程序中标注语句的不同交错模式,我们可能会看到如下四个结果中的一个:

  • y:0 x:1
  • x:0 y:1
  • x:1 y:1
  • y:1 x:1

第四种情况可以由 A1, B1, A2, B2 或 B1, A1, A2, B2 这样的执行顺序产生。但是,程序也可能产生如下两个意料之外的输出:

  • x:0 y:0
  • y:0 x:0

在某些特定的编译器、CPU 或其他情况下,这些确实可能发生。上面四个语句以什么样的顺序交错执行才能解释这个结果呢?

在单个 goroutine 内,每个语句的效果保证按照执行的顺序发生,也就是说,goroutine 是串行一致的(sequentially consistent)。但在缺乏使用通道或者互斥量来显式同步的情况下,并不能保证所有的 goroutine 看到的事件顺序都是一致的。尽管 goroutine A 肯定能在读取 y 之前观察到 x=1 的效果,但它不一定能观察到 goroutine B 对 y 写入的效果,所以可能会输出 y 的一个过期值(即两者对 xy 的修改存在于两个独立 CPU 的本地缓存中,还未同步到内存,各自对变量的修改对其他 goroutine 是不可见的)。

因为赋值和 Print 对应不同的变量,所以编译器可能会认为两个语句的执行顺序不会影响结果,然后就交换了这两个语句的执行顺序。CPU 也有类似的问题,如果两个 goroutine 在不同的 CPU 上执行,每个 CPU 都有自己的缓存,那么一个 goroutine 的写入操作在同步到内存之前对另一个 goroutine 的 Print 语句是不可见的。

这些并发问题都可以通过采用简单、成熟的模式来避免,即在可能的情况下,把变量限制到单个 goroutine 中,如果需要访问其他外部变量,使用互斥锁。

5. 延迟初始化: sync.Once

延迟一个昂贵的初始化步骤到有实际需求的时刻是一种很好的实践。预先初始化一个变量会增加程序的启动时间,并且如果实际执行时可能根本用不上这个变量,那么初始化也是不必要的。回到本章之前提到的 icons 变量:

var icons map[string]image.Image

这个版本的 Icon 使用了延迟初始化:

func loadIcons() {
    icons = map[string]image.Image{
        "spades.png": loadIcon("spades.png"),
        "hearts.png": loadIcon("hearts.png"),
        "diamonds.png": loadIcon("diamonds.png"),
        "clubs.png": loadIcon("clubs.png"),
    }
}

// 注意:并发不安全
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons() // 一次性初始化
    }
    return icons[name]
}

对于那些只被一个 goroutine 访问的变量,上面的模式是没有问题的,但在上述例子中,若在并发调用 Icon 时,这个模式就是不安全的。类似于银行例子中最早版本的 Deposit 函数,Icon 也包含多个步骤:检测 icons 是否为空,再加载图标,最后更新 icons 为一个非 nil 值。直觉可能会告诉你,竞态带来的最严重问题可能就是 loadIcons 函数会被调用多遍。当第一个 goroutine 正忙于加载图标时,其他 goroutine 进入 Icon 函数,会发现 icons 仍然是 nil,所以仍然会调用 loadIcons

但这个直觉是错误的。在缺乏显式同步的情况下,编译器和 CPU 在能保证每个 goroutine 都满足串行一致性的基础上可以自由地重排访问内存的顺序。loadIcons 一个可能的语句重排结果如下所示。它在填充数据之前把一个空 map 赋给 icons

func loadIcons() {
    icons = make(map[string]image.Image)
    icons["spades.png"] = loadIcon("spades.png")
    icons["hearts.png"] = loadIcon("hearts.png")
    icons["diamonds.png"] = loadIcon("diamonds.png")
    icons["clubs.png"] = loadIcon("clubs.png")
}

因此,一个 goroutine 发现 icons 不是 nil 并不意味着变量的初始化肯定已经完成。这个时候访问未被初始化的对象,就有可能返回 nil;但实际上这个对象应该存在于 icons 中。

保证所有 goroutine 都能观察到 loadIcons 效果最简单的正确方法就是用一个互斥锁来做同步:

var mu sync.Mutex // 保护 icons
var icons map[string]image.Image

// 并发安全
func Icon(name string) image.Image {
    mu.Lock()
    defer mu.Unlock()
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

采用互斥锁访问 icons 的额外代价是两个 goroutine 不能并发访问这个变量,即使在变量已经安全完成初始化且不再更改的情况下,也会造成这个后果。使用一个可以并发读的锁可以改善这个问题:

var mu sync.RWMutex // 保护 icons
var icons map[string]image.Image

// 并发安全
func Icon(name string) image.Image {
    mu.RLock()
    if icons != nil {
        icon := icons[name]
        mu.RUnlock()
        return icon
    }
    mu.RUnlock()

    // 获取互斥锁
    mu.Lock()
    if icons == nil { // 注意:必须重新检查 nil 值
        loadIcons()
    }
    icon := icons[name]
    mu.Unlock()
    return icon
}

这里有两个临界区域。goroutine 首先获取一个读锁,查阅 map,然后释放这个读锁。如果条目能找到(常见情况),就返回它。如果条目没找到,goroutine 再获取一个写锁。由于不先释放一个共享锁就无法直接把它升级到互斥锁,为了避免在过渡期其他 goroutine 已经初始化了 icons,所以我们必须重新检查 nil 值。

上面的模式具有更好的并发性,但它更复杂并且更容易出错。幸运的是,sync 包提供了针对一次性初始化问题的特化解决方案:sync.Once。从概念上来讲,Once 包含一个布尔变量和一个互斥量,布尔变量记录初始化是否已经完成,互斥量则负责保护这个布尔变量和客户端的数据结构。Once 的唯一方法 Do 以初始化函数作为它的参数。让我们看一下 Once 简化后的 Icon 函数:

var loadIconsOnce sync.Once
var icons map[string]image.Image

// 并发安全
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

每次调用 Do(loadIcons) 时会先锁定互斥量并检查里边的布尔变量。在第一次调用时,这个布尔变量为假,Do 会调用 loadIcons 然后把变量设置为真。后续的调用相当于空操作,只是通过互斥量的同步来保证 loadIcons 对内存产生的效果(在这里就是 icons 变量)对所有的 goroutine 可见。以这种方式使用 sync.Once,可以避免变量在正确构造之前就被其他 goroutine 共享。

查看sync.Once源码,可以看到其本质上就是double check实现懒加载的操作封装。

6. 竞态检测器

即使付出最大的努力,小心谨慎地处理并发,仍然很容易出错。幸运的是,Go 语言的运行时和工具链配备了一个精致且易于使用的动态分析工具:竞态检测器(racedetector)。

要使用此功能,只需在 go buildgo rungo test 等命令中添加 -race 参数。该参数会让编译器为你的应用或测试构建一个修改后的版本,该版本包含额外的机制,用于高效地记录执行时对共享变量的所有访问,以及读写这些变量的 goroutine 标识。此外,修改后的版本还会记录所有的同步事件,包括 go 语句、通道操作、(*sync.Mutex).Lock 调用、(*sync.WaitGroup).Wait 调用等(完整的同步事件集合可以在 Go 语言规范中的 “The Go Memory Model” 文档中找到)。

竞态检测器会分析事件流,找出有问题的案例,即一个 goroutine 写入一个变量后,中间没有任何同步操作,就有另外一个 goroutine 读写了该变量。这种情况表明存在对共享变量的并发访问,即数据竞态。该工具会生成一份报告,其中包括变量的标识以及读写 goroutine 当时的调用栈。通常情况下,这些信息足以定位问题。在第 9.7 节中有一个竞态检测器的示例。

竞态检测器报告所有实际运行时发生的数据竞态。然而,它只能检测到运行时发生的竞态,无法保证肯定不会发生竞态。为了获得最佳效果,请确保你的测试包含并发使用包的场景。

示例:

 $env:CGO_ENABLED=1; go test -run=TestMemo -race -v .\9_5_funcMemory_test.go .\9_4_funcMemory.go

其中:
1. $env:CGO_ENABLED=1; windows环境下临时启用CGO
2. -run=TestMemo       待测试的函数
3. -race               竞态检测

示例:并发非阻塞缓存

在本节中,我们将创建一个并发非阻塞的缓存系统,这个系统能够解决并发编程中常见的一个问题:函数记忆(memoizing)。函数记忆指的是缓存函数的结果,从而在多次调用时只需计算一次结果。我们的解决方案不仅是并发安全的,而且避免了使用单个锁来保护整个缓存所带来的锁争夺问题。

我们将使用下面的 httpGetBody 函数作为示例来演示函数记忆。该函数发起一个 HTTP GET 请求并读取响应体。由于调用这个函数的代价较高,我们希望避免不必要的重复调用。

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

函数 httpGetBody 的最后一行稍微有些微妙,ReadAll 返回两个结果:一个 []byte 和一个 error。因为这些结果可以直接赋给 httpGetBody 声明的返回类型 interface{}error,所以我们可以直接返回它们而不需要额外处理。选择这样的返回类型是为了满足我们缓存系统的设计需求。

缓存系统初始版本

下面是缓存系统的初始版本:

// memo 包提供了一个对类型 Func 并发不安全的函数记忆功能
package memo

// Memo 缓存了调用 Func 的结果
type Memo struct {
    f     Func
    cache map[string]result
}

// Func 是用于记忆的函数类型
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// 注意:非并发安全
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

一个 Memo 实例包含了被记忆的函数 f(类型为 Func)以及缓存(类型为从字符串到 result 的映射表)。每个 result 都是调用 f 产生的结果对:一个值和一个错误。在设计的推进过程中,我们会展示 Memo 的几种变体,但所有变体都会遵守这些基本概念。

下面的例子展示了如何使用 Memo。对于请求 URL 列表中的每个元素,首先调用 Get,记录延时并输出返回的数据长度:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}

我们可以使用 testing 包(这是第 11 章的主题)来系统地调查一下记忆的效果。从测试结果来看,我们可以看到 URL 流中有重复项。尽管每个 URL 第一次调用 (*Memo).Get 都会消耗数百毫秒的时间,但第二次请求在 1 微秒内就返回了同样的结果。

由于 HTTP 请求在并发情况下有很大的优化空间,我们修改测试来让所有请求并发进行。这个测试使用 sync.WaitGroup 来确保所有请求完成后再返回:

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
        n.Done()
    }(url)
}
n.Wait()

这次测试运行得更快,但并不是每次都能正常运行。我们可能会注意到意料之外的缓存失效、缓存命中后返回错误结果,甚至程序崩溃。更糟糕的是,有时它能正常运行,所以我们可能甚至没有注意到它会有问题。但如果我们加上 -race 标志后再运行,竞态检测器(参考第 9.6 节)经常会报告两个 goroutine 在未使用同步的情况下更新了 cache。整个 Get 函数其实不是并发安全的:它存在数据竞态。

并发安全的缓存系统 - 基于锁的同步机制

让缓存并发安全最简单的方法是使用一个基于锁的同步机制。我们需要为 Memo 添加一个互斥量,并在 Get 函数的开头获取锁,返回前释放锁,这样所有与缓存相关的操作都发生在临界区域内:

type Memo struct {
    f     Func
    mu    sync.Mutex // 保护 cache
    cache map[string]result
}

// Get 是并发安全的
func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

现在即使并发运行测试,竞态检测器也不会报警。但是,这次对 Memo 的修改使我们之前对性能的优化失效了。由于每次调用 f 时都上锁,Get 将我们希望并行的 I/O 操作串行化了。我们需要的是一个非阻塞的缓存系统,一个不会将需要记忆的函数串行运行的缓存。

在下面一个版本的 Get 实现中,主调 goroutine 会分两次获取锁:第一次用于查询,第二次用于在查询无返回结果时进行更新。在两次之间,其他 goroutine 也可以使用缓存。

func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)
        // 在两个临界区域之间,可能会有多个 goroutine 来计算 f(key) 并且更新 map
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

性能再次得到提升,但我们注意到某些 URL 被获取了两次。当两个或多个 goroutine 几乎同时调用 Get 来获取同一个 URL 时,就会出现这个问题。两个 goroutine 都首先查询缓存,发现缓存中没有需要的数据,然后调用慢函数 f,最后都用获得的结果更新 map,其中一个结果会被另一个覆盖。

理想情况下,我们应该避免这种额外的处理。这种功能有时称为重复抑制(duplicate suppression)。在下面的 Memo 版本中,map 的每个元素是一个指向 entry 结构的指针。除了包含一个已经记住的函数 f 调用结果之外,每个 entry 还新加了一个通道 ready。在设置 entryresult 字段后,通道会关闭,正在等待的 goroutine 会收到广播(参考第 8.9 节),然后可以从 entry 读取结果。

type entry struct {
    res   result
    ready chan struct{} // res 准备好后会被关闭
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // 保护 cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // 对 key 的第一次访问,这个 goroutine 负责计算数据和广播数据已准备完毕的消息
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()
        e.res.value, e.res.err = memo.f(key)
        close(e.ready) // 广播数据已准备完毕的消息
    } else {
        // 对这个 key 的重复访问
        memo.mu.Unlock()
        <-e.ready // 等待数据准备完毕
    }
    return e.res.value, e.res.err
}

现在,调用 Get 时会先获取保护 cache map 的互斥锁,再从 map 中查询一个指向已有 entry 的指针。如果没有查找到,就分配并插入一个新的 entry,最后释放锁。如果要查询的 entry 存在,那么它的值可能还没准备好(另一个 goroutine 可能还在调用慢函数 f),所以主调 goroutine 需要等待 entry 准备好才能读取 entry 中的 result 数据,具体的实现方法是从 ready 通道读取数据,这个操作会一直阻塞到通道关闭。

如果要查询的 entry 不存在,那么当前的 goroutine 就需要新插入一个未准备好的 entrymap 里,并负责调用慢函数 f,更新 entry,最后向其他正在等待的 goroutine 广播数据已准备完毕的消息。

注意,entry 中的变量 e.res.valuee.res.err 被多个 goroutine 共享。创建 entry 的 goroutine 设置了这两个变量的值,其他 goroutine 在收到数据准备完毕的广播后开始读取这两个变量。尽管变量被多个 goroutine 访问,但此处不需要加上互斥锁。ready 通道的关闭先于其他 goroutine 收到广播事件,所以第一个 goroutine 的变量写入事件也先于后续多个 goroutine 的读取事件。在这种情况下不存在数据竞态。现在,我们的并发、重复抑制、非阻塞缓存系统就完成了。

重实现重复抑制的方式类似于 Java 中的 Future,即通过占位和阻塞机制。其中,通道(channel)负责实现阻塞和广播的效果,而调用 close() 则表示对应的结果已经准备完成。

如果存在大量重复url,可以尝试下述改进版本,使用读写锁,方便大量读取而只有少量写入的情况:

// Get :并发安全 + 重复抑制的缓存读取操作
func (memo *Memo) Get(key string) (interface{}, error) {
	var res result

	memo.mu.RLock()
	cache := memo.cache[key]
	memo.mu.RUnlock()

	if cache == nil {
		// double check:涉及读写锁分离的情况,就要double-check
		memo.mu.Lock()
		cache = memo.cache[key]
		if cache == nil {
			// 第一次访问, 生成一个future对象
			cache = NewFuture()
			memo.cache[key] = cache
			memo.mu.Unlock() // 占位完成后即可释放锁

			// 对url进行访问,结果写入future对象
			content, err := memo.f(key)
			res = result{content, err}
			cache.Set(res)
		} else {
			memo.mu.Unlock()
		}
	}
	res = cache.Get()
	return res.value, res.err
}

只要是非并发安全的对象,且同时存在读、写操作时就需要避免data race。这不仅可能导致读取到滞后的数据,还可能导致数据和程序损坏,进一步程序崩溃。为了保证程序的稳定性和可维护性,建议不要忽视 data race 问题。如果性能是一个关键考虑因素,可以使用 sync.Map 或者 RWMutex 来实现高效的并发读写。

并发安全的缓存系统 - 基于通道的同步机制

上面的 Memo 代码使用一个互斥量来保护被多个调用 Get 的 goroutine 访问的 map 变量。接下来会对比另外一种设计,在新的设计中,map 变量的定义和操作被限制在一个监控 goroutine 中,由一个顺序进程来处理所有缓存的查询和更新请求,其它 goroutine 通过向监控通道发送请求和事件来获取缓存信息,从而避免并发过程中存在的data race问题。

Funcresultentry 的声明与之前一致:

  • Func 是用于记忆的函数类型:
type Func func(key string) (interface{}, error)
  • result 是调用 Func 的返回结果:
type result struct {
    value interface{}
    err   error
}
  • entry
type entry struct {
    res   result
    ready chan struct{} // 当 res 准备好后关闭该通道
}

尽管 Get 的调用者通过这个通道来与监控 goroutine 通信,但是 Memo 类型现在包含一个通道 requests。该通道的元素类型是 request。通过这种数据结构,Get 的调用者向监控 goroutine 发送被记忆函数的参数(key),以及一个通道 response,结果在准备好后就通过 response 通道发回。这个通道仅会传输一个值。

type request struct {
    key      string
    response chan<- result // 客户端需要单个 result
}

type Memo struct {
    requests chan request
}

func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) Close() {
    close(memo.requests)
}

上面的 Get 方法创建了一个响应(response)通道,放在了请求里边,然后把它发送给监控 goroutine,再从响应通道中读取。

如下所示,cache 变量被限制在监控 goroutine(即 (*Memo).server)中。监控 goroutine 从 request 通道中循环读取,直到该通道被 Close 方法关闭。对于每个请求,它先查询缓存,如果没找到则创建并插入一个新的 entry

func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // 对这个 key 的第一次请求
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // 调用 f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // 执行函数
    e.res.value, e.res.err = f(key)
    // 通知数据已准备完毕
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // 等待数据准备完毕
    <-e.ready
    // 向客户端发送结果
    response <- e.res
}

与基于互斥锁的版本类似,对于指定键的一次请求负责在该键上调用函数并保存结果到 entry 中,最后通过关闭 ready 通道来广播准备完毕状态。这个流程通过 (*entry).call 来实现。

对同一个键的后续请求会在 map 中找到已有的 entry,然后等待结果准备好,最后通过响应通道把结果发回给调用 Get 的客户端 goroutine。其中 calldeliver 方法都需要在独立的 goroutine 中运行,以确保监控 goroutine 能持续处理新请求。

上面的例子展示了可以使用两种方案来构建并发结构:共享变量并上锁,或者通信顺序进程(Communicating Sequential Processes,CSP),这两者都不复杂

在给定的情况下也许很难判定哪种方案更好,但了解这两种方案的对照关系是很有价值的。有时候从一种方案切换到另一种能够让代码更简单。

7. goroutine与线程的差异

可增长的栈

每个 OS 线程都有一个固定大小的栈内存(通常为 2MB),用于存储在函数调用期间正在执行或临时暂停的函数中的局部变量。然而,这个固定栈大小既可能显得过大,也可能显得过小。对于一些小的 goroutine,例如那些仅等待一个 WaitGroup 或关闭一个通道的 goroutine,2MB 的栈显然是巨大的浪费。在 Go 程序中,创建十万左右的 goroutine 也不罕见,这种情况下,栈空间就显得过大了。另一方面,对于最复杂和深度递归的函数,固定大小的栈可能始终不够大。虽然调整这个固定大小可以提高空间效率并允许创建更多的线程,或支持更深的递归函数,但无法同时做到这两点。

作为对比,一个 goroutine 在其生命周期开始时只有一个很小的栈,典型情况下为 2KB。与 OS 线程类似,goroutine 的栈也用于存放正在执行或临时暂停的函数中的局部变量。然而,与 OS 线程不同的是,goroutine 的栈不是固定大小的,它可以根据需要增大和缩小。goroutine 的栈大小限制可以达到 1GB,比线程典型的固定大小栈高出几个数量级。当然,只有极少数的 goroutine 会使用这么大的栈。

goroutine 调度

OS 线程由 OS 内核来调度。每隔几毫秒,一个硬件时钟中断会发到 CPU,CPU 调用一个称为调度器的内核函数。这个函数暂停当前正在运行的线程,把它的寄存器信息保存到内存,然后查看线程列表并决定接下来运行哪个线程,再从内存恢复线程的寄存器信息,最后继续执行选中的线程。由于 OS 线程是由内核调度的,所以控制权限从一个线程到另一个线程需要一个完整的上下文切换(context switch),即保存一个线程的状态到内存,再恢复另一个线程的状态,最终更新调度器的数据结构。考虑到这个操作涉及的内存局域性以及内存访问数量,还有访问内存所需的 CPU 周期数量的增加,这个操作实际上是很慢的。

Go 运行时包含一个自己的调度器,它使用一种称为 M:N 调度的技术(因为它可以复用/调度 M 个 goroutine 到 N 个 OS 线程)。Go 调度器与内核调度器的工作类似,但 Go 调度器只需关心单个 Go 程序的 goroutine 调度问题。不同于操作系统的线程调度器,Go 调度器不是由硬件时钟定期触发的,而是由特定的 Go 语言结构触发的。例如,当一个 goroutine 调用 time.Sleep 或被通道阻塞或进行互斥量操作时,调度器会将这个 goroutine 设为休眠模式,并运行其他 goroutine 直到前一个 goroutine 可以重新唤醒为止。由于不需要切换到内核语境,调用一个 goroutine 比调度一个线程的成本低很多。

GOMAXPROCS

Go 调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 数量,所以在一个有 8 个 CPU 的机器上,调度器会将 Go 代码同时调度到 8 个 OS 线程上(GOMAXPROCS 是 M:N 调度中的 “N”)。正在休眠或者被通道通信阻塞的 goroutine 不需要占用线程。阻塞在 I/O 和其他系统调用中或调用非 Go 语言编写的函数的 goroutine 需要一个独立的 OS 线程,但这个线程不计算在 GOMAXPROCS 内。

可以通过 GOMAXPROCS 环境变量或者 runtime.GOMAXPROCS 函数显式控制这个参数。可以用一个小程序来看看 GOMAXPROCS 的效果,这个程序无止境地输出 0 和 1:

for {
    go fmt.Print(0)
    fmt.Print(1)
}
$ GOMAXPROCS=1 go run hacker-cliche.go
111111111111111111110000000000000000000011111...
$ GOMAXPROCS=2 go run hacker-cliche.go
010101010101010101011001100101011010010100110...

windows下执行:

$env:GOMAXPROCS=2; go run hacker-cliche.go

在第一次运行时,每次最多只能有一个 goroutine 运行。最开始是主 goroutine,它输出 1,一段时间以后,Go 调度器让主 goroutine 休眠,并唤醒另一个输出 0 的 goroutine,让它有机会执行。在第二次运行时,有两个可用的操作系统线程,所以两个 goroutine 可以同时运行,以一个差不多的速率输出两个数字。需要注意的是,影响 goroutine 调度的因素很多,运行时也在不断演化,所以你的结果可能与上面展示的结果有所不同。

goroutine 没有标识

在大多数支持多线程的操作系统和编程语言中,当前线程都有一个独特的标识,通常可以是一个整数或者指针。这个特性使得我们可以轻松构建一个线程局部存储,它本质上是一个全局的 map,以线程的标识作为键,这样每个线程都可以独立地用这个 map 存储和获取值,而不受其他线程的干扰。

goroutine 没有可供程序员访问的标识。这是设计决定的,因为线程局部存储有被滥用的倾向。比如,当一个 Web 服务器用一个支持线程局部存储的语言实现时,很多函数都会通过访问这个存储来查找关于 HTTP 请求的信息。但就像那些过度依赖于全局变量的程序一样,这也会导致一种不健康的“超距作用”,即函数的行为不仅取决于它的参数,还取决于运行它的线程标识。因此,在线程标识需要改变的场景(比如需要使用工作线程时),这些函数的行为就会变得诡异莫测。

Go 语言鼓励一种更简单的编程风格,其中能影响一个函数行为的参数应当是显式指定的。这不仅让程序更易阅读,还让我们能自由地把一个函数的子任务分发到多个不同的 goroutine,而无需担心这些 goroutine 的标识。