zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Go select语句及其相关实例 【Go语言圣经笔记】

2023-04-18 12:30:32 时间

基于select的多路复用

下面的程序会进行类似于火箭发射的倒计时的工作。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件。每一个事件的值是一个时间戳,不过更有意思的是值的传送方式。

// gopl.io/ch8/countdown1
func main() {
    fmt.Println("Commencing countdown.")
    tick := time.Tick(1*time.Second)  // channel
    for countdown:=10; countdown>0; countdown-- {
        fmt.Println(countdown)
        <-tick
    }
    launch()
}

现在我们让这个程序支持在倒计时中,用户按下return键时直接中断发射流程。首先,我们启动一个goroutine,这个goroutine会尝试从标准输入中读入一个单独的byte并且,如果成功了,会向名为abort的channel发送一个值。

// gopl.io/ch8/countdown2
abort := make(chan struct{})
go func() {
    os.Stdin.Read(make([]byte, 1))
    abort <- struct{}{}
}()

现在每一次计数循环的迭代都需要等待两个channel中的其中一个返回事件了:当一切正常时的ticker channel(就像NASA jorgon的"nominal",译注:这梗估计我们是不懂了)或者异常时返回的abort事件。我们无法做到从每一个channel中接收信息,如果我们这么做的话,如果第一个channel中没有事件发过来那么程序就会立刻被阻塞,这样我们就无法收到第二个channel中发过来的事件。这时候我们需要多路复用(multiplex)这些操作了,为了能够多路复用,我们使用了select语句

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

上面是select语句的一般形式。和switch语句稍微有点相似,也会有几个case和最后的default选择分支。每一个case代表一个通信操作在某个channel上进行发送或者接收),并且会包含一些语句组成的一个语句块。一个接收表达式可以只包含接收表达式自身(笔者注:这种情况下channel发送的值没有赋值给某个变量,自然也无法引用),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够引用接收到的值。

select会等待case中有能够执行的case时去执行当条件满足时,select才会去通信并执行case之后的语句并且此时其它通信是不会执行的一个没有任何case的select语句写作select{},会永远地等待下去

让我们回到我们的火箭发射程序。time.After函数会立即返回一个channel,并起一个新的goroutine在经过特定的时间后向该channel发送一个独立的值。下面的select语句会一直等待直到两个事件中的一个到达,无论是abort事件或者一个10秒经过的事件。如果10秒经过了还没有abort事件进入,那么火箭就会发射。

func main() {
    // ...create abort channel...
    
    fmt.Println("Commencing countdonw. Press return to abort.")
    select {
    case <- time.After(10 * time.Second)
        // do nothing time.After本身即是计时器 这里直接被调用
    case <- abort:
        fmt.Println("lanuch abort") 
        return  // return 实现终止
    }
    launch()
}

下面这个例子更微妙。ch这个channel的buffer大小是1,所以会交替的为空或为满,所以只有一个case可以进行下去,无论i是奇数或者偶数,它都会打印0 2 4 6 8。

ch := make(chan int, 1)
for i:=0; i<10; i++ {
    select {
    case x := <-ch:
        fmt.Println(x)  // 0 2 4 6 8
    case ch <- i:
    }
}

笔者注:为什么?注意select语句下的case对应的是通信行为,也就是说,如果没有发送个channel这一行为,第一个case永远不会执行(第一次输出零值是由于缓冲大小为1),因此i为单数的时候只能运行第二个case向channel发送,i为偶数是由于存在发送行为才能运行第一个case。

如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加前一个例子的buffer大小(笔者注:>1)会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。

下面让我们的发射程序打印倒计时。这里的select语句会使每次循环迭代等待一秒来执行退出操作。

// gopl.io/ch8/countdown3

func main() {
    // ...create abort channel...

    fmt.Println("Commecing countdown. Press return to abort.")
    tick := time.Tick(1*time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        select {
        case <- tick:
            // do noting count down
        case <- abort:
            fmt.Println("Launch aborted")
            return
        }
    }
    launch()
}

time.Tick函数表现得好像它创建了一个在循环中调用time.Sleep的goroutine,每次被唤醒时发送一个事件。当countdown函数返回时,它会停止从tick中接收事件,但是ticker这个goroutine还依然存活,继续徒劳地尝试向channel中发送值,然而这时候已经没有其它的goroutine会从该channel中接收值了–这被称为goroutine泄露(§8.4.4)。

Tick函数挺方便,但是只有当程序整个生命周期都需要这个时间时我们使用它才比较合适。否则的话,我们应该使用下面的这种模式:

ticker := time.NewTicker(1 * time.Second)
<- ticker.C    // receive from the ticker's channel
ticker.Stop()  // cause the ticker's goroutine to terminate

有时候我们希望能够从channel中发送或者接收值,并避免因为发送或者接收导致的阻塞,尤其是当channel没有准备好写或者读时,select语句就可以实现这样的功能select可以有一个default来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑

下面的select语句会在abort channel中有值时,从其中接收值,无值时什么都不做。这是一个非阻塞的接收操作。反复地做这样的操作叫做“轮询channel”。

select {
case <- abort:
    fmt.Printf("Launch aborted
")
    return
default:
    // do nothing      
}

channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞在select语句中操作nil的channel永远都不会被select到

这使得我们可以用nil来激活或者禁用case来达成处理其它输入或输出事件时超时和取消的逻辑。我们会在下一节中看到一个例子。

示例:并发的目录遍历

du = disk usage

在本小节中,我们会创建一个程序来生成指定目录的硬盘使用情况报告,这个程序和Unix里的du工具比较相似。大多数工作用下面这个walkDir函数来完成,这个函数使用dirents函数来枚举一个目录下的所有入口。

// gopl.io/ch8/du1
// walkDir recursively walks the file tree rooted a dir
// and sends the size of each found file on fileSizes

func walkDir(dir string, fileSize chan<- int64) {
    for _, entry := range dirents(dir) { // 这个函数声明在下面
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()    
        }
    }
}


// dirents returns the entires of directory dir
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %v
", err)
        return nil
    }
    return entries
}

ioutil.ReadDir函数会返回一个os.FileInfo类型的slice,os.FileInfo类型也是os.Stat这个函数的返回值。对每一个子目录而言,walkDir会递归地调用其自身,同时也在递归里获取每一个文件的信息。walkDir函数会向fileSizes这个channel发送一条消息。这条消息包含了文件的字节大小。

下面的主函数,用了两个goroutine。后台的goroutine调用walkDir来遍历命令行给出的每一个路径并最终关闭fileSizes这个channel。主goroutine会对其从channel中接收到的文件大小进行累加,并输出最终的和。

package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

func main() {
    // Determine the initial directories
    flag.Parse()  // flag用于获取命令行参数 前面章节的例子中出现过
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }
    
    // Traverse the file tree
    fileSiezes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()
    
    
    // Print the results
    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    printDiskUsage(nfiles, nbytes)
}


func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files %.1f GB
", nfiles, float64(nbytes)/1e9)
} 

这个程序会在打印其结果之前卡住很长时间。

$ go build gopl.io/ch8/du1
$ ./du1 $HOME /usr /bin /etc
213201 files  62.7 GB

下面这个du的变种会间歇打印内容,不过只有在调用时提供了-v的flag才会显示程序进度信息。在roots目录上循环的后台goroutine在这里保持不变。主goroutine现在使用了计时器来每500ms生成事件,然后用select语句来等待文件大小的消息来更新总大小数据,或者一个计时器的事件来打印当前的总大小数据。如果-v的flag在运行时没有传入的话,tick这个channel会保持为nil,这样在select里的case也就相当于被禁用了。

// // gopl.io/ch8/du2
var verbose = flag.Bool("v", false, "show verbose progress messages")

func main() {
    // ...start background goroutine...
    
    // Print the results periodically
    var tick <-chan time.Time  // 单向channel
    if *verbose {
        tick = time.Tick(500*time.Millisecond)
    }
    var nfiles, nbytes int64
loop:  // loop关键字 本书第一次出现 
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop  // fileSizes was closed
            }
            nfiles++
            nbytes += 1
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfile, nbytes)  // final totals
}

由于我们的程序不再使用range循环,第一个select的case必须显式地判断fileSizes的channel是不是已经被关闭了,这里可以用到channel接收的二值形式。如果channel已经被关闭了的话,程序会直接退出循环。这里的break语句用到了标签break,这样可以同时终结select和for两个循环如果没有用标签就break的话只会退出内层的select循环而外层的for循环会使之进入下一轮select循环

现在程序会悠闲地为我们打印更新流:

$ go build gopl.io/ch8/du2
$ ./du2 -v $HOME /usr /bin /etc
28608 files  8.3 GB
54147 files  10.3 GB
93591 files  15.1 GB
127169 files  52.9 GB
175931 files  62.2 GB
213201 files  62.7 GB

然而这个程序还是会花上很长时间才会结束。完全可以并发调用walkDir,从而发挥磁盘系统的并行性能。下面这个第三个版本的du,会对每一个walkDir的调用创建一个新的goroutine。它使用sync.WaitGroup (§8.5)(笔者注:一个同步信号量,前面出现过)来对仍旧活跃的walkDir调用进行计数,另一个goroutine会在计数器减为零的时候将fileSizes这个channel关闭。

// gopl.io/ch8/du3

func main() {
    // deterimine roots, traverse each root of the file tree in parallel.
    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)  // 初始化先给1 否则下面的goroutine将直接关闭channel
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()
// select loop 同上
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop  // fileSizes was closed
            }
            nfiles++
            nbytes += 1
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfile, nbytes)  // final totals
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)  // 注意,原来的递归 这里变成了 新建一个goroutine
        } else {
            fileSizes <- entry.Size() 
        }
    }
}

由于这个程序在高峰期会创建成百上千的goroutine,我们需要修改dirents函数,用计数信号量来阻止他同时打开太多的文件,就像我们在8.7节中的并发爬虫一样:

// sema is a counting semaphore for limiting concurrency in dirents
var sema = make(chan struct{}, 20)

// dirents returns the entries of diertory dir
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}        // acquire token
    defer func() { <-sema }()  // release token 括号表示匿名函数声明后直接被调用 勿忘
    // ...
}

这个版本比之前那个快了好几倍,尽管其具体效率还是和你的运行环境,机器配置相关。

并发的退出

有时候我们需要通知goroutine停止它正在干的事情,比如一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的连接。

Go语言并没有提供在一个goroutine中终止另一个goroutine的方法因为这样会导致goroutine之间的共享变量落在未定义的状态上。在8.7节中的rocket launch程序中,我们往名字叫abort的channel里发送了一个简单的值,在countdown的goroutine中会把这个值理解为自己的退出信号。但是如果我们想要退出两个或者任意多个goroutine怎么办呢?

一种可能的手段是向abort的channel里发送和goroutine数目一样多的事件来退出它们。如果这些goroutine中已经有一些自己退出了,那么会导致我们的channel里的事件数比goroutine还多,这样导致我们的发送直接被阻塞另一方面,如果这些goroutine又生成了其它的goroutine,我们的channel里的数目又太少了,所以有些goroutine可能会无法接收到退出消息。一般情况下我们是很难知道在某一个时刻具体有多少个goroutine在运行着的。另外,当一个goroutine从abort channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就没法看到这条信息。为了能够达到我们退出goroutine的目的,我们需要更靠谱的策略,来通过一个channel把消息广播出去,这样goroutine们能够看到这条事件消息,并且在事件完成之后,可以知道这件事已经发生过了。

回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值,操作channel之后的代码可以立即被执行,并且会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制不要向channel发送值,而是用关闭一个channel来进行广播

只要一些小修改,我们就可以把退出逻辑加入到前一节的du程序。首先,我们创建一个退出的channel,不需要向这个channel发送任何值,但其所在的闭包内要写明程序需要退出。我们同时还定义了一个工具函数,cancelled,这个函数在被调用的时候会轮询退出状态。

// gopl.io/ch8/du4
var done = make(chan struct{})

func cancelled() bool {
    select {
    case <-done:
        return true
    default:
        return false    
    }
}

下面我们创建一个从标准输入流中读取内容的goroutine,这是一个比较典型的连接到终端的程序。每当有输入被读到(比如用户按了回车键),这个goroutine就会把取消消息通过关闭done的channel广播出去。

// Cancel traversal when input is detected
go func() {
    os.Stdin.Read(make([]byte, 1))  // read a single byte
    close(done)
}()

现在我们需要使我们的goroutine来对取消进行响应。在main goroutine中,我们添加了select的第三个case语句,尝试从done channel中接收内容。如果这个case被满足的话,在select到的时候即会返回,但在结束之前我们需要把fileSizes channel中的内容“排”空,在channel被关闭之前,舍弃掉所有值。这样可以保证对walkDir的调用不要被向fileSizes发送信息阻塞住,可以正确地完成。

for {
    select{
    case <- done:
        // drain fileSizes to allow existing goroutines to finish
        for range fileSizes {
            // finish the job
        }
        return
    case size, ok := fileSizes:
        // ...    
    }
}

walkDir这个goroutine一启动就会轮询取消状态,如果取消状态被设置的话会直接返回,并且不做额外的事情。这样我们将所有在取消事件之后创建的goroutine改变为无操作。

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()  // 无论是接收到
    if cancelled() {
        return
    }
    for _, entry := range dirents(dir) {
        // ...
    }
}

在walkDir函数的循环中我们对取消状态进行轮询可以带来明显的益处,可以避免在取消事件发生时还去创建goroutine。取消本身是有一些代价的;想要快速的响应需要对程序逻辑进行侵入式的修改。确保在取消发生之后不要有代价太大的操作可能会需要修改你代码里的很多地方,但是在一些重要的地方去检查取消事件也确实能带来很大的好处

对这个程序做一个简单的性能分析能揭示出瓶颈卡在dirents函数中获取一个信号量。下面的select可以让这种操作可以被取消,并且可以将取消时的延迟从几百毫秒降低到几十毫秒。

func dirents(dir string) []os.fileInfo {
    select {
    case sema <- struct{}{}  // acquire token
    case <-done:
        return nil          // cancelled
    }
    defer func() { <-sema }() // release token
    // ...read directory...
}

现在当取消发生时,所有后台的goroutine都会迅速停止并且主函数会返回。当然,当主函数返回时,一个程序会退出,而我们又无法在主函数退出的时候确认其已经释放了所有的资源。这里有一个方便的窍门我们可以一用:取代掉直接从主函数返回,我们调用一个panic,然后runtime会把每一个goroutine的栈dump下来如果main goroutine是唯一一个剩下的goroutine的话,他会清理掉自己的一切资源。但是如果还有其它的goroutine没有退出,他们可能没办法被正确地取消掉,也有可能被取消但是取消操作会很花时间;所以这里的一个调研还是很有必要的。我们用panic来获取到足够的信息来验证我们上面的判断,看看最终到底是什么样的情况

示例: 聊天服务

我们用一个聊天服务器来终结本章节的内容,这个程序可以让一些用户通过服务器向其它所有用户广播文本消息。这个程序中有四种goroutine。main和broadcaster各自是一个goroutine实例,每一个客户端的连接都会有一个handleConn和clientWriter的goroutine。broadcaster是select用法的不错的样例,因为它需要处理三种不同类型的消息。

下面演示的main goroutine的工作内容,可以归结为listen和accept从客户端过来的连接。对每一个连接,程序都会建立一个新的handleConn的goroutine,就像我们在本章开头的并发的echo服务器里所做的那样。

// gopl.io/ch8/chat

func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    go broadcaster()
    for {  // 轮询
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        go handleConn(conn)
    }
}

然后是broadcaster的goroutine。他的内部变量clients会记录当前建立连接的客户端集合。其记录的内容是每一个客户端的消息发出channel的"资格"信息。

type client chan<- string  // an outgoing message channel 单向字符串通道

var (
    entering = make(chan client)  // 注意 client本身是通道类型 这样entering是接收client通道的通道 这种用法在本书中首次出现
    leaving = make(chan client)
    messages = make(chan string)  // all incoming client messages
)

func broadcaster() {
    clients := make(map[client] bool)  // all connected clients 
                                    // 使用通道作为mao的key 所有可比较类型都能作为map的key
    for {
        select {
        case msg := <-messages:
            // broadcast incoming message to all clients' outgoing message channels
            for cli := range clients {
                cli <- msg
            }
        case cli := <-entering:
            clients[cli] = true
        case cli := <-leaving:
            delete(clients, cli)
            close(cli)
        }
    }

}

broadcaster监听来自全局的entering和leaving的channel来获知客户端的到来和离开事件。当其接收到其中的一个事件时,会更新clients集合,当该事件是离开行为时,它会关闭客户端的消息发送channel。broadcaster也会监听全局的消息channel,所有的客户端都会向这个channel中发送消息。当broadcaster接收到什么消息时,就会将其广播至所有连接到服务端的客户端。

现在让我们看看每一个客户端的goroutine。handleConn函数会为它的客户端创建一个消息发送channel并通过entering channel来通知客户端的到来。然后它会读取客户端发来的每一行文本,并通过全局的消息channel来将这些文本发送出去,并为每条消息带上发送者的前缀来标明消息身份。当客户端发送完毕后,handleConn会通过leaving这个channel来通知客户端的离开并关闭连接。

func handleConn(conn net.Conn) {
    ch := make(chan string)    // outgoing client messages
    go clientWriter(conn, ch)
    
    who := conn.RemoteAddr().String()
    ch <- "You are " + who
    messages <- who + "has arrived"
    entering <- ch
    
    input := bufio.NewScanner(conn)
    for input.Scan() {
        messages <- who + ": " + input.Text()
    }
    // note: ignoring pontential errors from input.Err()
    
    leaving <- ch
    messages <- who + " has left"
    conn.Close()
}

func clientWriter(conn net.Conn, ch <-chan string) {
    for msg := range ch {
        fmt.Fprintln(conn, msg)  // NOTE: ignoring network errors
    }
}

另外,handleConn为每一个客户端创建了一个clientWriter的goroutine,用来接收向客户端发送消息的channel中的广播消息,并将它们写入到客户端的网络连接。客户端的读取循环会在broadcaster接收到leaving通知并关闭了channel后终止。

笔者注:本例中使用client(单向string通道)作为同步多个goroutine的信息,而不是string。因此 需要关闭通道的操作close(cli)

下面演示的是当服务器有两个活动的客户端连接,并且在两个窗口中运行的情况,使用netcat来聊天:

$ go build gopl.io/ch8/chat
$ go build gopl.io/ch8/netcat3
$ ./chat &
$ ./netcat3
You are 127.0.0.1:64208               $ ./netcat3
127.0.0.1:64211 has arrived           You are 127.0.0.1:64211
Hi!
127.0.0.1:64208: Hi!                  127.0.0.1:64208: Hi!
                                      Hi yourself.
127.0.0.1:64211: Hi yourself.         127.0.0.1:64211: Hi yourself.
^C
                                      127.0.0.1:64208 has left
$ ./netcat3
You are 127.0.0.1:64216               127.0.0.1:64216 has arrived
                                      Welcome.
127.0.0.1:64211: Welcome.             127.0.0.1:64211: Welcome.
                                      ^C
127.0.0.1:64211 has left”

当与n个客户端保持聊天session时,这个程序会有2n+2个并发的goroutine,然而这个程序却并不需要显式的锁(§9.2)。clients这个map被限制在了一个独立的goroutine中,broadcaster,所以它不能被并发地访问。多个goroutine共享的变量只有这些channelnet.Conn的实例,两个东西都是并发安全的。我们会在下一章中更多地讲解约束,并发安全以及goroutine中共享变量的含义。