zl程序教程

您现在的位置是:首页 >  Java

当前栏目

Go---Go并发编程(详细)(一)

2023-03-14 22:59:19 时间

Goroutine(协程)

在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。

为此Go语言提供了 goroutine 这样的机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

使用goroutine

`单个 goroutine`
func main() {
   //hello()
   go hello()
   // 有时 hello() 并不执行,因为程序会为main函数创建一个默认的goroutine,
   // 当main里的语句执行完goroutine也就结束了,没有 go hello() 执行的时间
   fmt.Println("main hello")
   // 为了确保go hello() 的goroutine能够执行可以延缓程序结束时间
   time.Sleep(time.Second)
}
func hello() {
   fmt.Println("hello")
}

单个 goroutine 可以通过时间延后来使这个 goroutine 被完全执行,但是当 goroutine 多到上百上千或更多时在使用 time.Sleep() 显然就没办法确定给多少时间来让 goroutine 被完全执行了,给多了影响程序效率,给少了有的 goroutine 又不会执行影响程序结果,这时候我们就要用到另一个东西那就是 sync.WaitGroup。

WaitGroup对象内部有个计时器, 最初从0 开始, 他有3个方法 Add() , Done(), Wait()用来控制计数器的数量。 Add(n) 把计数器设置成n, Done() 每次把计数器-1, wait() 会阻塞代码的运行, 直到计数器的值减为0。将 goroutine 所剩数量与WaitGroup结合可以解决上述问题

var wg sync.WaitGroup
func main() {
   for i := 0; i < 10; i++ {
      // 每添加一个 goroutine wg + 1
      wg.Add(1)
      go sayNum(i)
   }
   // 等待 wg = 0 在执行后面的代码
   wg.Wait()
   fmt.Println("end")
}
func sayNum(i int) {
   // 当一个 goroutine 结束就 - 1
   defer wg.Done()
   fmt.Println(i)
}

当我们运行这个代码就会发现每次的输出都不同,这是因为这10个 goroutine 的执行是并发的,而调度却是随机的

goroutine与线程

goroutine的栈是可增长的

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • 1.G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • 2.P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  • 3.M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PKmOgZxa-1637983441094)(image-20211124133622706.png)]

Goroutine池使用实例

作用:Goroutine池可以有效控制goroutine数量,防止goroutine数量暴涨

需求:

  • 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
  • 随机生成数字进行计算,一直生成,一直计算
package main
import (
    "fmt"
    "math/rand"
)
type Job struct {
    // id
    Id int
    // 需要计算的随机数
    RandNum int
}
type Result struct {
    // 这里必须传对象实例
    job *Job
    // 求和
    sum int
}
func main() {
    // 需要2个管道
    // 1.job管道
    jobChan := make(chan *Job, 128)
    // 2.结果管道
    resultChan := make(chan *Result, 128)
    // 3.创建工作池
    createPool(64, jobChan, resultChan)
    // 4.开个打印的协程
    go func(resultChan chan *Result) {
        // 遍历结果管道打印
        for result := range resultChan {
            fmt.Printf("job id:%v randnum:%v result:%d
", result.job.Id,
                result.job.RandNum, result.sum)
        }
    }(resultChan)
    var id int
    // 循环创建job,输入到管道
    for {
        id++
        // 生成随机数
        r_num := rand.Int()
        job := &Job{
            Id:      id,
            RandNum: r_num,
        }
        jobChan <- job
    }
}
// 创建工作池
// 参数1:开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
    // 根据开协程个数,去跑运行
    for i := 0; i < num; i++ {
        go func(jobChan chan *Job, resultChan chan *Result) {
            // 执行运算
            // 遍历job管道所有数据,进行相加
            for job := range jobChan {
                // 随机数接过来
                r_num := job.RandNum
                // 随机数每一位相加
                // 定义返回值
                var sum int
                for r_num != 0 {
                    tmp := r_num % 10
                    sum += tmp
                    r_num /= 10
                }
                // 想要的结果是Result
                r := &Result{
                    job: job,
                    sum: sum,
                }
                //运算结果扔到管道
                resultChan <- r
            }
        }(jobChan, resultChan)
    }
}

部分结果(控制台打印):

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x1cOXqLo-1637983441095)(image-20211124211848787.png)]

runtime包

runtime.Gosched()

退出当前的 goroutine ,为其他 goroutine 腾出执行空间,最后再执行被退出的 goroutine

中文文档给了我们一个特别有意思的比喻:

(大概意思就是本来计划的好好的周末出去烧烤,但是你妈让你去相亲,两种情况第一就是你相亲速度非常快,见面就黄不耽误你继续烧烤,第二种情况就是你相亲速度特别慢,见面就是你侬我侬的,耽误了烧烤,但是还馋就是耽误了烧烤你还得去烧烤)

package main
import (
    "fmt"
    "runtime"
)
func main() {
    // 让所有协程在一个核上执行
    runtime.GOMAXPROCS(1)
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s,i)
        }
    }("协程运行中:")
    // 主协程
    for i := 0; i < 2; i++ {
        fmt.Println("hello")
        // 停一下,再次分配任务
        runtime.Gosched()
        fmt.Println("world",i)
    }
}

runtime.Goexit()

退出当前的 goroutine ,以后也不会执行

(一边烧烤一边相亲,突然发现相亲对象太丑影响烧烤,果断让她滚蛋,然后也就没有然后了)

package main
import (
    "fmt"
    "runtime"
)
func main() {
    go func() {
        defer fmt.Println("A.defer")
        func() {
            defer fmt.Println("B.defer")
            // 结束协程
            runtime.Goexit()
            defer fmt.Println("C.defer")
            fmt.Println("B")
        }()
        fmt.Println("A")
    }()
    for {
    }
}

runtime.GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

运行下面两个代码实例有助于理解核的数量对协程执行的影响

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}
func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}
func main() {
    // 设定为单核执行
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}  
func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}
func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}
func main() {
    // 设定为双核执行
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}  

操作系统线程和goroutine的关系

  • 1.一个操作系统线程对应用户态多个goroutine。
  • 2.go程序可以同时使用多个操作系统线程。
  • 3.goroutine和OS线程是多对多的关系,即m:n。

channel

我们设置函数的意义就是为了在特定的输入下获取到特定的输出,如果只是让函数一味的并发而不进行值的传递,那么这个并发就是没有意义的。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

channel是个特殊的类型,通道类似传送带或是队列,遵循先进先出(First In First Out)的原则。

通道的声明和初始化

`每个通道都是特定类型的,在声明时需要指明通道里传输的元素类型`
var 变量 chan 元素类型
`例`
var ch chan int
println(ch)         // 0x0   空值为 nil

声明的通道后需要使用make函数初始化之后才能使用。初始化后的通道空值为一个十六进制的地址

`在不进行初始化的情况下使用通道会报 deadlock`
`其中缓存大小是可选项`
make(chan 元素类型, [缓冲大小])
`例`
var ch1,ch2 chan int
ch1 = make(chan int)    // 无缓存的通道ch1
ch2 = make(chan int,20) // 缓存大小为20的通道ch2

也可以直接定义通道

ch3 := make(chan int)