zl程序教程

您现在的位置是:首页 >  Java

当前栏目

Go---Go并发编程(详细)(二)

2023-03-14 22:59:20 时间

channel操作

  • 发送
ch <- 10 // 把10发送到ch中   
  • 接收
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果   
  • 关闭
close(ch)   

关闭后的通道有以下特点:

只有当发送信息的 goroutine 将所有信息发送完成才能关闭通道,通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭的通道就想一头封闭住的管子,管出不管进。

1.对一个关闭的通道再发送值就会导致panic。

2.对一个关闭的通道进行接收会一直获取值直到通道为空。

3.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。

4.关闭一个已经关闭的通道会导致panic。

通道的缓存大小

无缓存

无缓冲的通道又称为阻塞的通道,发送给通道的值必须被接收,不然会 panic

func main() {
   ch1 := make(chan int)
   ch1 <- 10           // deadlock!
}

正常使用应该为

func main() {
    ch := make(chan int)
    go func(ch1 chan int) {
        res := <- ch1
        fmt.Println(res)
    }(ch)
    ch <- 10
    close(ch)
    time.Sleep(time.Second)     // 延缓结束时间使 goroutine 能够执行完
}

如果接收和发送在连个 goroutine 上,无法判断哪一个 goroutine 会先执行,这是如果先执行的是接收方,那么接收方会先阻塞一段时间等待有 goroutine 往该通道发送值。这个过程完成了两个 goroutine 的同步,因此无缓存通道也被成为同步通道。

有缓冲
func main() {
   ch1 := make(chan int,10)
   ch1 <- 10           // 不会报 deadlock!
   for i := 0; i < 10; i++ {
      ch1 <- i
      fmt.Println(i+1)         // 超出缓存范围会报 deadlock!
   }
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

单向通道

在很多时候函数只会向通道单方向存值或取值,Go语言中提供了单向通道来处理这种情况。

func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}
func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}  

1.chan <- int是一个只能发送的通道,可以发送但是不能接收;

2.<- chan int是一个只能接收的通道,可以接收但是不能发送。

单向通道只是对函数来说的,在定义通道是不存在单向的通道,因为没有意义。

定时器

Timer

时间到了只执行一次

简单使用:

package main
import (
   "fmt"
   "time"
)
func main() {
   // timer 简单使用
   timer := time.NewTimer(2 * time.Second)
   t1 := time.Now()
   fmt.Println(t1)
   t2 := <- timer.C   // 当打印完 t1 会停留2秒钟
   fmt.Println(t2)
   //t3 := <- timer.C // 再次接收会 deadlock!
   //fmt.Println(t3)
}

延时功能:

package main
import (
   "fmt"
   "time"
)
func main() {
   //实现延时功能
   // 1.time.Sleep()
   time.Sleep(2 * time.Second)
   fmt.Println(time.Now())
   // 2.<- time.NewTimer().C
   <- time.NewTimer(2 * time.Second).C
   fmt.Println(time.Now())
   // 3.<- time.After()
   <- time.After(2 * time.Second)
   fmt.Println(time.Now())
}

关闭定时器:

package main
import (
   "fmt"
   "time"
)
func main() {
   timer := time.NewTimer(2 * time.Second)
   go func() {
      fmt.Println("定时器准备工作")
      // 如果将定时器停止则,协程也在这一步终止,不会继续执行该协程后续代码,但不会影响别的协程进行
      <- timer.C                   
      fmt.Println("定时器开始工作")
   }()
   go func() {
      fmt.Println("另一个协程正在运行")
   }()
   stop := timer.Stop()
   if stop {
      fmt.Println("停止计时器成功")
   }
   time.Sleep(5 * time.Second)
}

重置定时器时间:

package main
import (
   "fmt"
   "time"
)
func main() {
   timer := time.NewTimer(3 * time.Second)
   // 将定时器时间改为 1 秒
   timer.Reset(1 * time.Second)
   fmt.Println(time.Now())
   // 一秒后执行
   fmt.Println(<-timer.C)
   timer.Reset(2 * time.Second)
   // 重置后的 timer 可以再次使用,不会出现 deadlock!
   fmt.Println(<-timer.C)
}

Ticker

时间到了,多次执行

使用:

package main
import (
   "fmt"
   "time"
)
func main() {
   ticker := time.NewTicker(time.Second)
   go func() {
      i := 0
      for i != 5 {
         i ++
         fmt.Println(<- ticker.C)
      }
      fmt.Println("协程结束")
   }()
   time.Sleep(6 * time.Second)
}

select

有些需求会让我们从多个通道接收数据,若是没有数据可以被接收就会产生阻塞的情况,影响程序效率。

我们可以使用:

for{
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
}

解决阻塞问题,但是运行性能会差很多。

这时候我们就可以使用 select 关键字

使用格式

select {
    case <-chan1:
       // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
       // 如果成功向chan2写入数据,则进行该case处理语句
    default:
       // 如果上面都没有成功,则进入default处理流程
    }

监听通道,直到有一个 channel 开始执行

package main
import (
   "fmt"
)
func main() {
   // 定义两个没有缓存的通道
   ch1 := make(chan string)
   ch2 := make(chan string)
   // 执行两个子协程来向通道发送数据
   go func(chan string) {
      ch1 <- "ch1"
   }(ch1)
   go func(chan string) {
      ch2 <- "ch2"
   }(ch2)
   // 使用select对管道进行监控,只会执行第一个被发送数据的通道
   select {
   case s1 := <- ch1:
      fmt.Println("s1 =",s1)
   case s2 := <- ch2:
      fmt.Println("s2 =",s2)
   }
}

判断管道是否存满

package main
import (
   "fmt"
   "time"
)
func main() {
   // 创建通道
   ch := make(chan string,10)
   // 子协程发送数据
   go func(ch1 chan string) {
      for {
         select {
         case ch1 <- "hello":
            fmt.Println("发送数据")
         default:
            fmt.Println("管道满了")
         }
         time.Sleep(time.Microsecond * 500000)
      }
   }(ch)
   // 接收数据
   for s := range  ch {
      fmt.Println("res:",s)
      time.Sleep(time.Second)
   }
}

并发安全和锁

竞态问题

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

例如以下的代码

package main
import (
   "fmt"
   "sync"
)
var x int64
var wg sync.WaitGroup
func add() {
   defer wg.Done()
   for i := 0; i < 5000; i++ {
      x++
   }
}
// 原本预期为10000,但是输出却是不确定的
func main() {
   wg.Add(2)
   go add()
   go add()
   wg.Wait()
   fmt.Println(x)
}

上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

使用方法

package main
import (
   "fmt"
   "sync"
)
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
   for i := 0; i < 5000; i++ {
      lock.Lock() // 加锁
      x ++
      lock.Unlock() // 解锁
   }
   wg.Done()
}
// 结果与预期一致
func main() {
   wg.Add(2)
   go add()
   go add()
   wg.Wait()
   fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。