Go编程基础-8. goroutine和通道
goroutine和通道
并发编程指的是一个程序由若干个自主执行的活动单元组成。Web服务器可以同时处理数千个请求。平板电脑和手机应用在渲染用户界面的同时,后端也在同步进行计算和处理网络请求。甚至传统的批处理任务——如读取数据、进行计算并输出结果——也使用并发技术来隐藏I/O操作的延迟,从而充分利用现代多核计算机的优势。尽管处理器内核数量每年都在增加,但单核速度变化不大。
Go 提供了两种并发编程的风格。本章将展示 goroutine 和通道(channel),它们支持通信顺序进程(Communicating Sequential Process,CSP)。CSP 是一种并发编程模式,通过在不同的执行体(goroutine)之间传递值来实现,但变量本身限制在单一的执行体中。
1. goroutine
在Golang中,每一个并发执行的活动称为goroutine。考虑一个程序,它包含两个函数:一个用于执行一些计算工作,另一个用于输出结果,假设它们彼此独立,不相互调用。在顺序执行的程序中,可以先调用一个函数,然后再调用另一个函数。但是,在并发程序中,两个函数可以通过多个goroutine同时执行。如果你使用过操作系统或其他编程语言中的线程,可以将goroutine类比为线程。
当一个程序启动时,只有一个goroutine来调用main
函数,这个goroutine称为主goroutine。新的goroutine通过go
语句创建。语法上,go
语句是在普通的函数或方法调用前加上go
关键字前缀。go
语句会使函数在一个新创建的goroutine中调用,而go
语句本身的执行会立即完成:
f() // 调用 f() 并等待它返回
go f() // 创建一个调用 f() 的新goroutine,不等待它返回
示例1:Fabonacci
在下面的例子中,主goroutine计算第45个斐波那契数。由于使用了非常低效的递归算法,这个计算需要大量时间。为了在此期间提供一个可见的提示,显示一个字符串“spinner”来指示程序依然在运行。
package main
import (
"fmt"
"time"
)
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // 慢速计算
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}
若干秒后,fib(45)
返回,main
函数输出结果:
Fibonacci(45) = 1134903170
然后main
函数返回,当这一事件发生时,所有的goroutine都会被直接终止,程序退出。除了从main
函数返回或退出程序外,没有编程方法可以让一个goroutine停止另一个goroutine。但是,如我们将看到的,有办法通过与goroutine通信来请求它自行停止。
注意这个程序如何通过两个独立的活动(指示器和斐波那契数计算)来表达并发性。它们被写成独立的函数,但同时运行。
示例2:并发时钟服务器
服务端:
package main
import (
"io"
"log"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,连接中止
continue
}
go handleConn(conn) // 一次处理一个连接
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return // 例如,连接断开
}
time.Sleep(1 * time.Second)
}
}
这里有两个点需要注意:
- handleConn封装了连接的所有处理操作,handleConn处理完成后,连接就可以关闭了。因此defer c.Close()被封装在了handleConn的开头。
- 如果和以往的操作一样,获取到连接后,立即defer c.Close(),那么连接就会在main函数退出后才会执行,这样导致连接资源被长时间占用。
- 将连接的处理逻辑封装在一个独立的函数中(即 handleConn),使得代码更清晰,更容易维护。main 函数的主要职责是监听和接受连接,然后将连接交给处理函数,而不是直接处理和管理连接的生命周期。
客户端:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
2. 通道
如果说 goroutine 是 Go 程序并发的执行体,那么通道就是它们之间的连接。通道是一个通信机制,允许一个 goroutine 发送特定值到另一个 goroutine。每个通道都有一个具体类型,称为通道的元素类型。一个包含 int
类型元素的通道写作 chan int
。
使用内置的 make
函数来创建一个通道:
ch := make(chan int) // ch 的类型是 'chan int'
像 map 一样,通道是一个使用 make
创建的数据结构的引用。当通道被复制或作为参数传递到函数时,传递的是引用,因此调用者和被调用者都引用同一个数据结构。和其他引用类型一样,通道的零值是 nil
。
相同类型的通道可以使用 ==
运算符进行比较。如果两个通道引用同一个数据结构,比较结果为 true
。通道也可以与 nil
进行比较。
通道有两个主要操作:发送(send)和接收(receive),统称为通信。send
语句将一个值从一个 goroutine 传输到另一个正在执行接收表达式的 goroutine。两个操作都使用 <-
运算符表示。发送操作等价于将数据放入通道中,因此 ch <- x
;接收操作等价于从通道中获取数据,因此 x := <- ch
或 <- ch
(丢弃结果)。
ch <- x // 发送语句
x = <-ch // 赋值语句中的接收表达式
<-ch // 接收语句,丢弃结果
通道还支持第三个操作:关闭(close)。关闭操作设置一个标志,指示当前通道已经发送完所有值,通道中不再有新的值可发送;关闭后的发送操作将导致宕机。在一个已经关闭的通道上进行接收操作,将获取所有已经发送的值,直到通道为空;此时任何接收操作会立即完成,并返回通道元素类型对应的零值。
使用内置的 close
函数来关闭通道:
close(ch)
使用简单的 make
调用创建的通道称为无缓冲(unbuffered)通道,但 make
还可以接受第二个可选参数,一个表示通道容量的整数。如果容量是 0,make
创建一个无缓冲通道:
ch = make(chan int) // 无缓冲通道
ch = make(chan int, 0) // 无缓冲通道
ch = make(chan int, 3) // 容量为 3 的缓冲通道
2.1 无缓冲通道
在无缓冲通道上进行发送操作时,发送方会阻塞,直到另一个 goroutine 在对应的通道上执行接收操作。当值被成功传递之后,两个 goroutine 都可以继续执行。相对地,如果接收操作先执行,接收方 goroutine 会阻塞,直到另一个 goroutine 在同一个通道上发送一个值。
由于无缓冲通道使发送和接收 goroutine 同步,因此它们也被称为同步通道。只有在值被接收后,发送方 goroutine 才会被唤醒,继续执行。
这种机制使得我们在并发编程中可以控制函数的执行顺序或者变量的更新顺序,从而实现一些可预期的操作(例如,确保 x 发生在 y 之前)。
举个例子:假设我们启动一个 goroutine 来发送数据,而主线程继续执行其他任务。我们希望在 goroutine 完成数据发送后,主线程再退出 (否则数据发送到一半,goroutine就被强行关闭了,导致出现异常)。但由于两边的执行速度可能不同,结果难以预测。这时,我们可以使用同步通道让主线程阻塞,从而确保执行顺序符合预期。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意: 忽略错误
log.Println("done")
done <- struct{}{} // 通知主 goroutine
}()
/**
其它操作
*/
<-done // 等待后台 goroutine 完成
conn.Close()
}
在这个例子中,我们通过 done
通道来同步主 goroutine 和后台 goroutine 的执行顺序,从而确保主 goroutine 在后台 goroutine 完成数据处理后再退出。
2.2 管道
由通道连接而成的多个goroutine称为管道。下为一个三级管道的示例:
第一个 goroutine 是 counter,生成一个 0, 1, 2, … 的整数序列,然后通过一个通道发送给第二个 goroutine(称为 square),计算这些数值的平方,并通过另一个通道将结果发送给第三个 goroutine(称为 printer),接收并打印这些结果。为了简化示例,我们特意选择了非常简单的函数,尽管它们在现实程序中不太可能各自拥有独立的 goroutine。
func main() {
naturals := make(chan int)
squares := make(chan int)
// counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// printer (在主 goroutine 中)
for {
fmt.Println(<-squares)
}
}
正如预期的那样,程序将输出无限的平方序列 0, 1, 4, 9, …。像这样的管道常用于长期运行的服务器程序中,其中通道用于在包含无限循环的 goroutine 之间进行通信。如果需要通过管道发送有限的数据该怎么办?
当发送方知道没有更多的数据要发送时,通知接收方所在的 goroutine 停止等待是非常有用的。这可以通过调用内置的 close
函数来关闭通道:
close(naturals)
在通道关闭后,任何后续的发送操作将导致应用崩溃。当关闭的通道被读完(即最后一个发送的值被接收)后,所有后续的接收操作将顺利进行,只是获取到的是零值。关闭 naturals
通道会导致计算平方的循环快速执行,并将结果 0 传递给 printer
goroutine。
没有直接的方法判断通道是否已关闭,但有一种接收操作变体,它产生两个结果:接收到的通道元素,以及一个布尔值(通常称为 ok
),当其为 true
时表示接收成功,false
表示当前的接收操作在一个已关闭且读完的通道上。利用这个特性,可以修改 squarer
的循环,当 naturals
通道读完后,关闭 squares
通道。
// squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // 通道关闭并且读完
}
squares <- x * x
}
close(squares)
}()
由于上面的语法比较笨拙,而模式又较为通用,该语言提供了 range
循环语法以便在通道上迭代。这种语法更方便接收通道上所有发送的值,并在接收完最后一个值后关闭循环。
在下面的示例中,当 counter
goroutine 在生成 100 个元素后结束循环时,它关闭 naturals
通道,这导致 squarer
结束循环并关闭 squares
通道。在更复杂的程序中,可以将 counter
和 squarer
的 goroutine 的 close
调用延迟到外层。最终,主 goroutine 结束,程序退出。
func main() {
naturals := make(chan int)
squares := make(chan int)
// counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// printer (在主 goroutine 中)
for x := range squares {
fmt.Println(x)
}
}
结束时,关闭每一个通道不是必需的。只有在通知接收方 goroutine 所有的数据都发送完毕时才需要关闭通道。通道也可以通过垃圾回收器根据其是否被访问来决定是否回收,而不是根据其是否关闭。(不要将通道的 close
操作与文件的 close
操作混淆。在结束时对每一个文件调用 Close
方法是非常重要的。)
试图关闭一个已经关闭的通道会导致程序宕机,就像关闭一个空通道一样。关闭通道还可以作为一种广播机制。
通道和管道:在 Go 语言中,通道的英文名是 channel,而管道的英文名是 pipeline。通道是 Go 语言中的一种用于 goroutine 之间通信的类型,而管道通常是指一系列处理步骤,通过多个 goroutine 和 channel 进行数据传递和处理。
2.3 单向通道
当一个通道用作函数的形参时,它几乎总是被有意地限制为不能发送或不能接收。将这种意图文档化可以避免误用。Go语言的类型系统提供了单向通道类型,仅允许发送或接收操作。类型 chan<- int
是一个只能发送的通道,允许发送但不允许接收。相反,类型 <-chan int
是一个只能接收的 int
类型通道,允许接收但不允许发送。(运算符相对于 chan
关键字的位置是一个帮助记忆的点)。违反这个原则会在编译时被检查出来。
因为 close
操作表示通道上不再有数据发送,所以只能在发送方的 goroutine 上调用它。试图关闭一个只能接收的通道在编译时会报错。
这里我们再次看到平方管道,这次我们使用单向通道类型:
func counter(limit int, naturals chan<- int) {
for x := range limit {
naturals <- x
}
close(naturals)
}
func squarer(naturals <-chan int, squares chan<- int) {
for x := range naturals {
squares <- x * x
}
close(squares)
}
func printer(w io.Writer, squares <-chan int, endflag chan<- bool) {
// 设置输出前缀和标志
log.SetOutput(w)
log.SetPrefix("LOG: ")
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
for x := range squares {
log.Println(x)
}
endflag <- true
}
调用 counter(naturals)
隐式地将 chan int
类型转换为参数要求的 chan<- int
类型。调用 printer(squares)
做了类似的 <-chan int
转换。在任何赋值操作中,将双向通道转换为单向通道都是允许的,但反过来则不行。一旦有了一个像 chan<- int
这样的单向通道,就无法通过它获取到引用同一个数据结构的 chan int
类型的通道。
2.4 缓冲通道
缓冲通道有一个元素队列,其最大长度在创建时通过 make
的容量参数进行设置。下面的语句创建了一个可以容纳三个字符串的缓冲通道:
ch = make(chan string, 3)
在缓冲通道上,发送操作会在队列的尾部插入一个元素,而接收操作则从队列的头部移除一个元素。如果通道已满,发送操作会阻塞所在的 goroutine,直到另一个 goroutine 进行接收操作,腾出可用空间。反之,如果通道为空,执行接收操作的 goroutine 会阻塞,直到另一个 goroutine 在通道上发送数据。
可以在当前通道上无阻塞地发送三个值:
ch <- "A"
ch <- "B"
ch <- "C"
此时,通道已满,如图 8-3 所示。第四个发送语句将会阻塞。若接收一个值:
fmt.Println(<-ch) // "A"
通道既不满也不空,如图 8-4 所示,因此此时的接收或发送操作都不会阻塞。通过这种方式,通道的缓冲区将发送和接收 goroutine 解耦。
在某些情况下,程序可能需要知道通道缓冲区的容量,可以通过调用内置的 cap
函数获取:
fmt.Println(cap(ch)) // "3"
使用内置的 len
函数,可以获取当前通道内的元素个数。由于在并发程序中这个信息会迅速过时,它的价值较低,但在错误诊断和性能优化时非常有用:
fmt.Println(len(ch)) // "2"
通过接下去的两次接收操作,通道将再次变空,第四次接收会被阻塞:
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
在这个例子中,发送和接收操作都由同一个 goroutine 执行,但在真实的程序中通常由不同的 goroutine 执行。由于语法简单,新手有时会粗暴地在单个 goroutine 中将缓冲通道用作队列,这是错误的用法。
通道和 goroutine 的调度深度关联,如果没有另一个 goroutine 从通道进行接收,发送者(甚至可能是整个程序)有被永久阻塞的风险。如果只需要一个简单的队列,使用 slice
创建一个即可。
并发请求,获取最快响应
下面的例子展示了一个使用缓冲通道的应用。它并发地向三个镜像地址发请求,这些镜像指的是相同但分布在不同地理区域的服务器。它将这些服务器的响应通过一个缓冲通道进行发送,然后只接收第一个返回的响应,因为它是最早到达的。因此,mirroredQuery
函数甚至能在两个较慢的服务器还没响应之前就返回结果。有时会出现多个 goroutine 同时在一个通道上并发发送或接收的情况,如本例所示。
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // 返回最早的响应
}
func request(hostname string) (response string) {
// 模拟请求的逻辑
return "response from " + hostname
}
如果使用一个无缓冲通道,两个较慢的 goroutine 将被卡住,因为当它们尝试发送响应结果到通道时,没有其他 goroutine 来接收。这种情况称为 goroutine 泄漏,是一种 bug。与变量回收不同,泄漏的 goroutine 不会自动回收,因此需要确保 goroutine 在不再需要时可以自动结束。
缓冲通道使用的注意事项
无缓冲和缓冲通道的选择以及缓冲通道容量大小的选择,都会对程序的正确性产生影响。无缓冲通道提供强同步保障,因为每一次发送都需要和一次对应的接收同步;对于缓冲通道,这些操作则是解耦的。如果我们知道要发送的值数量上限,通常会创建一个容量等于该上限的缓冲通道,在接收第一个值前就完成所有的发送。但在内存无法提供足够缓冲容量的情况下,可能会导致程序死锁。
缓冲通道的容量选择也可能影响程序的性能。想象一个蛋糕店里的三个厨师在生产线上工作,一个烤蛋糕,一个加糖衣,一个雕刻。如果每个厨师在完成一个蛋糕流程后必须等待下一个厨师准备好接受,这就类似于使用无缓冲通道来通信。
如果在厨师之间有一个可以放一个蛋糕的位置,一个厨师可以将制作好的蛋糕放在这里,然后立即开始制作下一个蛋糕,这类似于使用一个容量为 1 的缓冲通道。只要厨师们的工作速度相同,大多数工作就可以快速处理,从而消除各自之间的速率差异。如果在厨师之间有更多的空间——更长的缓冲区——就可以消除更大的暂时性速率波动,而不影响组装流水线,比如当一个厨师稍作休息时,后面再加紧跟上进度。
另一方面,如果生产线的上游持续比下游快,缓冲区会经常满。如果后续的流程更快,缓冲区通常是空的。这时缓冲区的存在是没有价值的。
PS: 通道只能消除速率波动带来的差异,例如:发送方平均速率为5,但是瞬时速率最高100;接收方平均速率为5,但是瞬时速率最高为10;这种情况下可以使用缓存通道来暂存待处理的任务,平衡双方的瞬时速率差异。反之如果双方平均速度存在差异,缓冲区会经常满或者经常空,这时缓冲区的存在是没有价值的。
2.5 并行循环
探讨一些通用的并行模式,来并行执行所有的循环迭代。
- 通过goroutine并行生成缩略图
package __goroutine
import (
"github.com/nfnt/resize"
"image"
"image/png"
"log"
"os"
"path/filepath"
)
func makeThumbnails(filenames []string) {
finished := make(chan bool)
for _, f := range filenames {
//go func() {
// thumbnail(f) // 错误示范:直接在go routine中使用了外部循环变量,可能由于变化而导致不可知的结果
// finished <- true
//}()
go func(f string) {
thumbnail(f) // 正确示范:将外部循环变量作为参数传递,中间经过一次值拷贝,和外部变量实现了隔离
finished <- true
}(f)
}
for range filenames {
<-finished
}
}
func thumbnail(f string) {
// 1. open file
log.Println(f)
file, err := os.Open(f)
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 2. decode image
img, _, err := image.Decode(file)
if err != nil {
log.Fatal(err)
}
// 3. resize image
thumbnail := resize.Thumbnail(100, 100, img, resize.Lanczos3)
// 4. create a empty file
dir := filepath.Dir(f)
fileName := filepath.Base(f)
out, err := os.Create(dir + `\thumbnails_2\` + fileName)
if err != nil {
log.Fatal(err)
}
defer out.Close()
// 5. save the thumbnail as png into the empty file
err = png.Encode(out, thumbnail)
if err != nil {
log.Fatal(err)
}
log.Println("Successfully created thumbnail for " + f)
}
注意:如果使用channel通信(非缓冲)来确定goroutine执行状态,那么就需要确保1. 发送和接收操作的原子性;2.发送和接受操作次数对等。避免在发送或接受操作上增加if语句,从而导致在某些情况下退出,不再发送/接受信息,造成goroutine阻塞、泄露、永不终止。一个简单的解决方案是,根据需求创建一个有足够容量的缓冲通道,来避免因为channel通道而导致泄露发生。
- 比起通过channel通信来确定并统计子goroutine的状态,使用sync.WaitGroup计数器是一个更好的选择
func makeThumbnails(filenames []string) {
var wg sync.WaitGroup
for _, filename := range filenames {
wg.Add(1)
go func(filename string) {
defer wg.Done()
thumbnail(filename)
}(filename)
}
wg.Wait()
}
- 考虑一种特殊情况:每次创建缩略图时会返回文件的大小,然后在main goroutine中对进行累计后输出。示例如下:
func makeThumbnails(filenames []string) {
var wg sync.WaitGroup
sizeChannel := make(chan int64)
for _, filename := range filenames {
wg.Add(1)
go func(filename string) {
defer wg.Done()
size := thumbnail(filename)
sizeChannel <- size
}(filename)
}
// 1. 在此处等待,会导致所有goroutine卡在发送消息的环节,出现阻塞泄露
// wg.Wait()
// close(sizeChannel)
var total int64
for size := range sizeChannel {
total += size
}
// 2. 在此处等待,会导致主goroutine由于通道一直没关闭而无限循环等待
// wg.Wait()
// close(sizeChannel)
return total
}
解决方案1:另起一个goroutine来监督缩略图的生成任务,完成后关闭通道,主goroutine负责计算累计大小。
解决方案2:使用和任务数相等长度的缓冲通道。
func makeThumbnails(filenames []string) {
var wg sync.WaitGroup
sizeChannel := make(chan int64)
for _, filename := range filenames {
wg.Add(1)
go func(filename string) {
defer wg.Done()
size := thumbnail(filename)
sizeChannel <- size
}(filename)
}
go func() {
wg.Wait()
close(sizeChannel)
}
var total int64
for size := range sizeChannel {
total += size
}
return total
}
2.6 控制并发
无限制的并行通常不是一个好的主意,因为系统中总有瓶颈。例如,对于计算密集型应用,CPU的核数是限制因素;对于磁盘I/O操作,磁头和磁盘的数量是限制因素;对于下载流,网络带宽是限制因素;对于Web服务,本身的容量、网络的连接数是限制因素。解决方法是根据资源的可用情况限制并发的数量,以匹配适当的并行度。
我们可以使用容量为 h
的缓冲通道来建立一种并发原语,称为计数信号量。概念上,对于缓冲通道中的每一个空闲槽,都代表一个令牌,持有令牌的操作可以执行。通过发送一个值到通道中来获取令牌,从通道中接收一个值来释放令牌,从而创建一个新的空闲槽。这保证了在没有接收操作的时候,最多同时有 h
个发送操作。(尽管使用已填充槽比令牌更直观,但使用空闲槽在创建通道缓冲区之后可以省掉填充的过程。)因为通道的元素类型在这里不重要,所以我们使用 struct{}
,它所占用的空间大小为0。
var tokens = make(chan struct{}, 10)
func makeThumbnails(filenames []string) {
var wg sync.WaitGroup
for _, filename := range filenames {
wg.Add(1)
tokens <- struct{}{} // 获取令牌
go func(filename string) {
defer func() {
wg.Done()
<-tokens // 释放令牌
}()
thumbnail(filename)
}(filename)
}
wg.Wait()
}
**注意,**并发控制应该紧贴受限资源的操作,例如在I/O操作前后获取和释放令牌,以更好、更精确地控制并发。
2.7 使用select多路复用
试想一个简单的例子:我们希望一个函数定时执行,但是又希望在接收到退出指令时结束,该怎么通过通道实现?
- 通过time.Tick() 获取一个定时发送事件的通道,通过定时消息+自动阻塞获取操作实现定时执行的效果:
func countDown() {
fmt.Println("countDown")
tick := time.Tick(time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
func launch() {
fmt.Println("launch")
}
// PS:Tick 函数很方便使用,但是它仅仅在应用的整个生命周期中都需要时才合适。否则建议使用 time.NewTicker() + <- ticker.C的形式,因为ticker可以Stop()。
- 通过select多路复用同时监听多个通道,当检测到第一个可以执行的操作时(发送or接收),执行并结束:
package main
import (
"fmt"
"os"
"time"
)
func main() {
countDown()
}
func countDown() {
// 1. 创建abort通道
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
// 2. 创建定时任务通道
fmt.Println("countDown")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for countdown := 10; countdown > 0; countdown-- {
// 多路复用,同时监听两路信号
select {
case <-ticker.C:
fmt.Println(countdown)
case <-abort:
fmt.Println("launch abort")
return
}
}
launch()
}
func launch() {
fmt.Println("launch success")
}
3. 取消
有时候,我们需要让一个 goroutine 停止它当前的任务。例如,当一个 Web 服务器在处理客户请求的过程中,客户端突然断开连接。一个 goroutine 无法直接终止另一个 goroutine,因为这会导致共享变量的状态变得不确定。在上述示例中,我们通过向 abort 通道发送一个值来通知倒计时 goroutine 终止其任务。然而,如何取消多个或特定数量的 goroutine 呢?
一种可能的方法是向 abort 通道发送与要取消的 goroutine 数量相同的事件。但如果一些 goroutine 已经自行终止,那么发送过程会因为多余的计数而阻塞。而如果这些 goroutine 能够自行繁殖,发送的事件数量可能不足,从而导致有些 goroutine 无法收到取消通知。通常情况下,我们很难准确知道有多少 goroutine 正在工作。此外,当一个 goroutine 从 abort 通道接收到值后,它会利用这个值,导致其他 goroutine 无法再接收到该值。因此,为了实现取消操作,我们需要一个可靠的机制,在一个通道上广播事件,让多个 goroutine 可以感知到这个事件的发生。
回忆一下,当一个通道关闭且所有已发送的值都被接收后,随后的接收操作会立即返回零值。我们可以利用这一特性来创建一个广播机制:不在通道上发送值,而是关闭它。这样,所有正在监听该通道的 goroutine 都会立即收到通知,从而终止其当前任务。
package main
import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
)
var done = make(chan struct{})
var counter int32
/*
封装goroutine的关闭操作,利用关闭通道时的广播机制,批量关闭goroutine
*/
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
// CheckAndShutdownGoroutines :开启一个goroutine,根据条件判断是否关闭通道
func CheckAndShutdownGoroutines() {
go func() {
os.Stdin.Read(make([]byte, 1)) // 读一个字节
close(done)
}()
}
func DeepIterSleep(wg *sync.WaitGroup) {
for !cancelled() {
// 1. add & print counter
wg.Add(1)
atomic.AddInt32(&counter, 1)
fmt.Println(counter)
wg.Done()
// 2. sleep and generate goroutines
time.Sleep(2 * time.Second)
go DeepIterSleep(wg)
}
// 后处理,释放文件句柄、关闭通道、清空事件等操作
}
// main :启用线程
func main() {
CheckAndShutdownGoroutines()
var wg sync.WaitGroup
go DeepIterSleep(&wg)
ticker := time.NewTicker(20 * time.Second)
// 利用定时or广播机制来结束main goroutine,避免过早关闭
select {
case <-ticker.C:
fmt.Println("timeout")
case <-done:
fmt.Println("done")
}
}
4. 示例:聊天服务器
- chatServer
package __goroutine
import (
"bufio"
"fmt"
"log"
"net"
)
/*
聊天服务器架构:
1. 一个主函数,负责接收和处理连接
2. 一个广播函数,负责进行客户端的注册/管理和消息的群发,实现聊天室的效果
每个连接:
1. 一个单独的goroutine, 处理连接,创建对应的消息通道进行注册。监听客户连接中传来的消息,并通过消息通道发送给广播器
2. 一个单独的goroutine,将接收到的广播消息回写给客户端
*/
func chatServer() {
// 1. 监听端口,创建服务
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
// 2. 注册广播服务
go broadcaster()
// 3. 接收连接,并注册对应的处理goroutine
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConn(conn)
}
}
// 1. 定义广播服务的变量
// (1)每个连接对应一个单独的通道,进行消息的发送
type client chan<- string
// (2)所有连接共享messages\entering\leaving通道,用于向广播器发送消息,通知客户的到来和离开
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string)
)
// 2. 定义广播服务
func broadcaster() {
// 1. 局部变量,保存所有的客户连接
clients := make(map[client]bool)
// 2. 不断地监听事件
for {
select {
// 客户注册
case cli := <-entering:
clients[cli] = true
// 客户注销
case cli := <-leaving:
delete(clients, cli)
close(cli)
// 客户注销
case msg := <-messages:
for cli := range clients {
cli <- msg
}
}
}
}
// 3. 每个客户单独地goroutine
func handleConn(conn net.Conn) {
defer conn.Close()
// 1. 创建客户独有地消息通道
ch := make(chan string)
// 2. 监听接收到消息并发送给客户
go clientWriter(conn, ch)
// 3. 向广播器注册客户
who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
// 4. 监听客户连接发送地消息,并发送到公告消息发送通道messages中
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// 5. 当客户断开后,注销客户
leaving <- ch
messages <- who + " has left"
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // 忽略网络层面的错误
}
}
- chatClient
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 1. 创建一个结束通道
done := make(chan struct{})
// 2. 创建一个goroutine,监听服务端的返回消息
go func() {
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "Error reading input from server:", err)
}
done <- struct{}{}
}()
// 3. 主main goroutine监听标准输入,发送给服务器
input := bufio.NewScanner(os.Stdin)
for input.Scan() {
fmt.Fprintln(conn, input.Text())
}
if err := input.Err(); err != nil {
log.Println("Error reading input:", err)
}
// 4. 等待子goroutine退出后结束
<-done
}