zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Go 同步原语:sync 包让你对并发控制得心应手

2023-09-14 09:15:15 时间

channel 为什么是并发安全的呢?

是因为 channel 内部使用了互斥锁来保证并发的安全。

在 Go 语言中,不仅有 channel 这类比较易用且高级的同步机制,还有 sync.Mutex、sync.WaitGroup 等比较原始的同步机制。通过它们,我们可以更加灵活地控制数据的同步和多协程的并发。

理解线程安全


为什么线程会有不安全的情况?这个就跟cpu多核架构是非常相关的。

一个程序跑了多个线程,共享内存是java多线程的通信机制。上图有两个线程去同时访问一块内存,在多核体系架构里面 ,这两个线程从内核层面会被调度到多个cpu上面去,在计算机里面访问内存虽然比较快,内存访问是相对比较快的一个速度,但是针对cpu的速度来说还是远远不及的,如果每次内存的读取都去物理内存里面读取,整个程序的效率就不会高。

为了解决这个问题,在计算机里面每一个cpu内部都有一段缓存,当线程1,2同时去访问这一组变量,这组变量在被第一次读取的时候都是去物理内存中读区,读取完之后这个变量会被缓存到每一个cpu cache里面,这个时候就会出现问题。假设当前打线程对键值对做了一个修改,因为修改会先修改到本地缓存,并不会立即将修改后的结果写入到内存里面,在线程2里面看到的依然是原值,这个时候两个线程对同一个变量读取到的值就不一样了。

在做多线程代码编写,比如第一个线程已经set了一个值第二个线程里面去读的时候它就应该是新值,但是事实上不是这样的,因为缓存并没有同步,这样就容易导致你的程序出现bug。

为了解决这些问题,就是通过锁来解决。


锁:在修改内存的时候,当多个线程要访问同一块共享内存的时候,需要加一个锁,如果需要去写这份数据,这份数据会被同步回主内存里面,其他线程要访问这块内存的时候也需要加锁,而且是一个互斥锁,所以当线程1对这块内存加锁的时候,线程2是没有办法访问这块内存的。只有线程1将锁释放之后,这样线程2才能获取到锁,访问这块内存。通过这种方式实现了多线程的一个安全访问。 这样就能够确保一个线程修改的数据,另外一个线程看到的也是最新的。
go语言多线程之间的通信,我们鼓励使用channel,一边发,一边接。这样就将锁的原有的复杂性给隐藏起来了,对开发人员更加友好。
但是你一定想通过共享内存的方式去访问,去做多线程的协调,可以吗?可以的,go语言也是支持的,支持的还蛮强大的,比如sync包。
Go 语言不仅仅提供基于 CSP 的通讯模型,也支持基于共享内存的多线程数据访问
Sync 包提供了锁的基本原语
sync.Mutex 互斥锁
    • Lock()加锁,Unlock()解锁
    • sync.RWMutex 读写分离锁    不限制并发读,只限制并发写和并发读写
sync.WaitGroup (你启动啦很多的协程,你希望这些协程全部执行完以后,下面的逻辑才会运行,那么这个场景就使用waitgroup)
             等待一组 goroutine 返回
sync.Once
            保证某段代码只执行一次
            sync.Cond
            让一组 goroutine 在满足特定条件时被唤醒

Mutex 示例


在Kubernetes也经常会使用锁,比如Kubernetes里面有个著名的设计模式,或者叫做informer。

informer的主要目的是用来监听Kubernetes的对象,informer本身会有一个start,start里面有一个锁,这个地方加锁是因为要去遍历这个informer,这个informer是一个map。map本身是不能并发读写的,如果针对map,一个线程里面修改里面的值,另外一个map在读,或者说两个线程同时修改map里面的同一个key,这个时候程序就会发生冲突,整个程序就会panic。

这个时候程序就会认为有多个人修改map,它不知道怎么处理了,因为map的原语是没有线程安全保护的,如果你的程序有这个场景,就需要通过加锁来实现。(通过lock加锁之后,要记得unlock掉,否则会出现死锁)

https://github.com/lovekeepcoding/golang/blob/master/examples/module2/mutex/main.gohttps://github.com/lovekeepcoding/golang/blob/master/examples/module2/mutex/main.go

func rLock() {
	lock := sync.RWMutex{}
	for i := 0; i < 3; i++ {
		lock.RLock()
		defer lock.RUnlock()
		fmt.Println("rLock:", i)
	}
}

循环3次,每次获取R锁,这里有个defer lock.RUnlock(),这个不是for循环完成之后运行的,也就是整个程序完成以后运行的。

对于读锁来说,读锁是不互斥的,所以可以循环三次。

func wLock() {
	lock := sync.RWMutex{}
	for i := 0; i < 3; i++ {
		lock.Lock()
		defer lock.Unlock()
		fmt.Println("wLock:", i)
	}

这个lock默认就是写锁,写锁是互斥的,获取了锁,但是在for循环里面是释放不掉锁的,第二次进来之后就走不下去了。因为第二个循环获取不到锁,这个锁是互斥的。

那么mutex逻辑也是一模一样的,行为和上面的wlock是一样的。

WaitGroup 示例


waitgroup是什么样的场景下使用呢?

waitgroup主要是用来协调主线程和子线程之间关系的

一个主的线程,要启动多个子线程,但是我希望多个子线程处理完之后我的主程序才能够继续,在Kubernetes代码里面就有这样例子。

这里有个函数叫做createBatch,然后传入了一堆的pod,之后定义了一个waitgroup,然后去遍历这个pod数组,每次拿到一个pod就加1,然后去执行自己的逻辑。

如果有3个pod,这个循环会运行3次,那么就add三次,那么wait会等待三个线程都结束了再继续往下走。

golang/main.go at master · lovekeepcoding/golang · GitHubhttps://github.com/lovekeepcoding/golang/blob/master/examples/module2/waitgroup/main.go等的方式有几种,一种是sleep。

主线程启动了多个子线程,如何确保子线程都能够被正确的执行完呢?在主线程里面sleep一会,那么子线程都可以被执行。

如果里面是具体的业务逻辑,你就不能够正确的评估你的义务逻辑要多久才可以执行完,这样你是不知道的,估计的多了,影响程序的效率,估计少了,子线程还没有执行完。

其次就是使用管道,channel本身是父子线程通信的逻辑,第一个是可以传数据,其次可以协调多线程的执行顺序,子线程往里面去写数据,主线程读数据,这样逻辑确保主线程里面要读取到数据才不会阻塞,否则会一直阻塞住。

最后一种方法就是waitgroup

资源竞争


在一个 goroutine 中,如果分配的内存没有被其他 goroutine 访问,只在该 goroutine 中被使用,那么不存在资源竞争的问题。

但如果同一块内存被多个 goroutine 同时访问,就会产生不知道谁先访问也无法预料最后结果的情况。这就是资源竞争,这块内存可以称为共享的资源。

我们通过下面的示例来进一步地了解:

//共享的资源

var sum = 0

func main() {
  //开启100个协程让sum+10
   for i := 0; i < 100; i++ {
      go add(10)
   }

   //防止提前退出
   time.Sleep(2 * time.Second)
   fmt.Println("和为:",sum)
}

func add(i int) {
   sum += i
}

示例中,你期待的结果可能是“和为 1000”,但当运行程序后,可能如预期所示,但也可能是 990 或者 980。导致这种情况的核心原因是资源 sum 不是并发安全的,因为同时会有多个协程交叉执行 sum+=i,产生不可预料的结果。

既然已经知道了原因,解决的办法也就有了,只需要确保同时只有一个协程执行 sum+=i 操作即可。要达到该目的,可以使用 sync.Mutex 互斥锁。

小技巧:使用 go build、go run、go test 这些 Go 语言工具链提供的命令时,添加 -race 标识可以帮你检查 Go 语言代码是否存在资源竞争。

同步原语


sync.Mutex

互斥锁,顾名思义,指的是在同一时刻只有一个协程执行某段代码,其他协程都要等待该协程执行完毕后才能继续执行。

在下面的示例中,我声明了一个互斥锁 mutex,然后修改 add 函数,对 sum+=i 这段代码加锁保护。这样这段访问共享资源的代码片段就并发安全了,可以得到正确的结果。 

var(
   sum int
   mutex sync.Mutex
)


func add(i int) {
   mutex.Lock()
   sum += i
   mutex.Unlock()
}

互斥锁的使用非常简单,它只有两个方法 Lock 和 Unlock,代表加锁和解锁。当一个协程获得 Mutex 锁后,其他协程只能等到 Mutex 锁释放后才能再次获得锁。

Mutex 的 Lock 和 Unlock 方法总是成对出现,而且要确保 Lock 获得锁后,一定执行 UnLock 释放锁,所以在函数或者方法中会采用 defer 语句释放锁,如下面的代码所示:

func add(i int) {
   mutex.Lock()
   defer mutex.Unlock()
   sum += i
}

这样可以确保锁一定会被释放,不会被遗忘。

sync.RWMutex

在 sync.Mutex 小节中,我对共享资源 sum 的加法操作进行了加锁,这样可以保证在修改 sum 值的时候是并发安全的。如果读取操作也采用多个协程呢?如下面的代码所示:

func main() {

   for i := 0; i < 100; i++ {
      go add(10)
   }

   for i:=0; i<10;i++ {
      go fmt.Println("和为:",readSum())
   }

   time.Sleep(2 * time.Second)
}

//增加了一个读取sum的函数,便于演示并发
func readSum() int {
   b:=sum
   return b
}

这个示例开启了 10 个协程,它们同时读取 sum 的值。因为 readSum 函数并没有任何加锁控制,所以它不是并发安全的,即一个 goroutine 正在执行 sum+=i 操作的时候,另一个 goroutine 可能正在执行 b:=sum 操作,这就会导致读取的 num 值是一个过期的值,结果不可预期

如果要解决以上资源竞争的问题,可以使用互斥锁 sync.Mutex,如下面的代码所示:

func readSum() int {
   mutex.Lock()
   defer mutex.Unlock()
   b:=sum
   return b
}

因为 add 和 readSum 函数使用的是同一个 sync.Mutex,所以它们的操作是互斥的,也就是一个 goroutine 进行修改操作 sum+=i 的时候,另一个 gouroutine 读取 sum 的操作 b:=sum 会等待,直到修改操作执行完毕。

现在我们解决了多个 goroutine 同时读写的资源竞争问题,但是又遇到另外一个问题——性能。因为每次读写共享资源都要加锁,所以性能低下,这该怎么解决呢?

现在我们分析读写这个特殊场景,有以下几种情况:

  1. 写的时候不能同时读,因为这个时候读取的话可能读到脏数据(不正确的数据);

  2. 读的时候不能同时写,因为也可能产生不可预料的结果;

  3. 读的时候可以同时读,因为数据不会改变,所以不管多少个 goroutine 读都是并发安全的。

所以就可以通过读写锁 sync.RWMutex 来优化这段代码,提升性能。现在我将以上示例改为读写锁,来实现我们想要的结果,如下所示:

var mutex sync.RWMutex

func readSum() int {
   //只获取读锁
   mutex.RLock()
   defer mutex.RUnlock()
   b:=sum
   return b
}

对比互斥锁的示例,读写锁的改动有两处:

  1. 把锁的声明换成读写锁 sync.RWMutex。

  2. 把函数 readSum 读取数据的代码换成读锁,也就是 RLock 和 RUnlock。

这样性能就会有很大的提升,因为多个 goroutine 可以同时读数据,不再相互等待

为了强行不让关键代码并行,强行串行执行,这就需要锁的机制。

上面在执行sum+的时候,还是串行执行

锁的互斥性

func main()  {
var lock sync.RWMutex

go func() {
	lock.Lock()
	fmt.Println("a lock success")
}()

go func() {
	lock.Lock()
	fmt.Println("b lock success")
}()

go func() {
	lock.Lock()
	fmt.Println("c lock success")
}()

time.Sleep(time.Second * 2)
}


a lock success
func main()  {
var lock sync.RWMutex

go func() {
	lock.RLock()
	fmt.Println("a lock success")
}()

go func() {
	lock.RLock()
	fmt.Println("b lock success")
}()

go func() {
	lock.RLock()
	fmt.Println("c lock success")
}()

time.Sleep(time.Second * 2)

}

a lock success
c lock success
b lock success

sync.WaitGroup


在以上示例中,相信你注意到了这段 time.Sleep(2 * time.Second) 代码,这是为了防止主函数 main 返回使用,一旦 main 函数返回了,程序也就退出了。

因为我们不知道 100 个执行 add 的协程和 10 个执行 readSum 的协程什么时候完全执行完毕,所以设置了一个比较长的等待时间,也就是两秒。

小提示:一个函数或者方法的返回 (return) 也就意味着当前函数执行完毕。 

所以存在一个问题,如果这 110 个协程在两秒内执行完毕,main 函数本该提前返回,但是偏偏要等两秒才能返回,会产生性能问题。

如果这 110 个协程执行的时间超过两秒,因为设置的等待时间只有两秒,程序就会提前返回,导致有协程没有执行完毕,产生不可预知的结果。

那么有没有办法解决这个问题呢?也就是说有没有办法监听所有协程的执行,一旦全部执行完毕,程序马上退出,这样既可保证所有协程执行完毕,又可以及时退出节省时间,提升性能。你第一时间应该会想到channel。没错,channel 的确可以解决这个问题,不过非常复杂,Go 语言为我们提供了更简洁的解决办法,它就是 sync.WaitGroup。

func main() {
   run()
}

func run(){

   for i := 0; i < 100; i++ {
      go add(10)
   }

   for i:=0; i<10;i++ {
      go fmt.Println("和为:",readSum())
   }

   time.Sleep(2 * time.Second)
}

这样执行读写的 110 个协程代码逻辑就都放在了 run 函数中,在 main 函数中直接调用 run 函数即可。现在只需通过 sync.WaitGroup 对 run 函数进行改造,让其恰好执行完毕,如下所示:

func run(){
   var wg sync.WaitGroup
   //因为要监控110个协程,所以设置计数器为110
   wg.Add(110)

   for i := 0; i < 100; i++ {
      go func() {
         //计数器值减1
         defer wg.Done()
         add(10)
      }()
   }

   for i:=0; i<10;i++ {
      go func() {
         //计数器值减1
         defer wg.Done()
         fmt.Println("和为:",readSum())
      }()
   }

   //一直等待,只要计数器值为0
   wg.Wait()
}

 sync.WaitGroup 的使用比较简单,一共分为三步:

  1. 声明一个 sync.WaitGroup,然后通过 Add 方法设置计数器的值,需要跟踪多少个协程就设置多少,这里是 110;

  2. 在每个协程执行完毕时调用 Done 方法,让计数器减 1,告诉 sync.WaitGroup 该协程已经执行完毕;

  3. 最后调用 Wait 方法一直等待,直到计数器值为 0,也就是所有跟踪的协程都执行完毕。

通过 sync.WaitGroup 可以很好地跟踪协程。在协程执行完毕后,整个 run 函数才能执行完毕,时间不多不少,正好是协程执行的时间。

sync.WaitGroup 适合协调多个协程共同做一件事情的场景比如下载一个文件,假设使用 10 个协程,每个协程下载文件的 1/10 大小,只有 10 个协程都下载好了整个文件才算是下载好了。这就是我们经常听到的多线程下载,通过多个线程共同做一件事情,显著提高效率。

小提示:其实你也可以把 Go 语言中的协程理解为平常说的线程,从用户体验上也并无不可,但是从技术实现上,你知道他们是不一样的就可以了。

sync.Cond


在 Go 语言中,sync.WaitGroup 用于最终完成的场景,关键点在于一定要等待所有协程都执行完毕。

而 sync.Cond 可以用于发号施令,一声令下所有协程都可以开始执行,关键点在于协程开始的时候是等待的,要等待 sync.Cond 唤醒才能执行。

sync.Cond 从字面意思看是条件变量,它具有阻塞协程和唤醒协程的功能,所以可以在满足一定条件的情况下唤醒协程,但条件变量只是它的一种使用场景。

下面我以 10 个人赛跑为例来演示 sync.Cond 的用法。在这个示例中有一个裁判,裁判要先等这 10 个人准备就绪,然后一声发令枪响,这 10 个人就可以开始跑了,如下所示:

//10个人赛跑,1个裁判发号施令

func race(){

   cond :=sync.NewCond(&sync.Mutex{})
   var wg sync.WaitGroup
   wg.Add(11)

   for i:=0;i<10; i++ {

      go func(num int) {
         defer  wg.Done()
         fmt.Println(num,"号已经就位")
         cond.L.Lock()
         cond.Wait()//等待发令枪响
         fmt.Println(num,"号开始跑……")
         cond.L.Unlock()
      }(i)
   }

   //等待所有goroutine都进入wait状态
   time.Sleep(2*time.Second)

   go func() {
      defer  wg.Done()
      fmt.Println("裁判已经就位,准备发令枪")
      fmt.Println("比赛开始,大家准备跑")
      cond.Broadcast()//发令枪响
   }()

   //防止函数提前返回退出
   wg.Wait()
}

 以上示例中有注释说明,已经很好理解,我这里再大概讲解一下步骤:

  1. 通过 sync.NewCond 函数生成一个 *sync.Cond,用于阻塞和唤醒协程

  2. 然后启动 10 个协程模拟 10 个人,准备就位后调用 cond.Wait() 方法阻塞当前协程等待发令枪响,这里需要注意的是调用 cond.Wait() 方法时要加锁

  3. time.Sleep 用于等待所有人都进入 wait 阻塞状态,这样裁判才能调用 cond.Broadcast() 发号施令

  4. 裁判准备完毕后,就可以调用 cond.Broadcast() 通知所有人开始跑了

sync.Cond 有三个方法,它们分别是:

  1. Wait,阻塞当前协程,直到被其他协程调用 Broadcast 或者 Signal 方法唤醒,使用的时候需要加锁,使用 sync.Cond 中的锁即可,也就是 L 字段。

  2. Signal,唤醒一个等待时间最长的协程。

  3. Broadcast,唤醒所有等待的协程。

注意:在调用 Signal 或者 Broadcast 之前,要确保目标协程处于 Wait 阻塞状态,不然会出现死锁问题。

如果你以前学过 Java,会发现 sync.Cond 和 Java 的等待唤醒机制很像,它的三个方法 Wait、Signal、Broadcast 就分别对应 Java 中的 wait、notify、notifyAll。

 容器的并发安全性


  • 数组、slice、struct允许并发修改(可能会脏写),并发修改map有时会发生panic
  • 如果需要并发修改map请使用sync.Map

数组、slice、struct允许并发,go程序不会报错,但是可能会脏读脏写,结果不符合预期

map不一样,当你多个协程,会发生Panic,这是不允许的,如果非要这样,使用sync.map

总结


这节课主要讲解 Go 语言的同步原语使用,通过它们可以更灵活地控制多协程的并发。从使用上讲,Go 语言还是更推荐 channel 这种更高级别的并发控制方式,因为它更简洁,也更容易理解和使用。

当然本节课讲的这些比较基础的同步原语也很有用。同步原语通常用于更复杂的并发控制,如果追求更灵活的控制方式和性能,你可以使用它们。

sync 包里还有一个同步原语我没有讲,它就是 sync.Map。sync.Map 的使用和内置的 map 类型一样,只不过它是并发安全的。