zl程序教程

您现在的位置是:首页 >  Javascript

当前栏目

一个 Demo 学会 WorkerPool

2023-03-07 09:49:23 时间

本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。

四哥水平有限,如有翻译或理解错误,烦请帮忙指出,感谢!

今天给大家分享一篇关于 workPool 的文章,这个平时大家应该用的比较多,一起来看下。

原文如下:

工作池是这样一个池子,会创建指定数量的 worker,这些 worker 能获取任务并处理。允许多个任务同时处理,但是需要维持固定数量的 worker 避免系统资源被过度使用。

通常有两种方式创建任务池:

  • 一种是预先创建固定数量的 worker;
  • 另外一种是当有需要的时候才会创建 worker,当然也会有数量限制;

本文将与大家一起讨论第一种方式。当我们预先知道有许多任务需要同时运行,并且很大概率会用上最大数量的 worker,通常会采用这种方式。

为了演示,我们先创建 Worker 结构体,它获取任务并执行。

  1. import ( 
  2.  "fmt" 
  3.  
  4. // Worker ... 
  5. type Worker struct { 
  6.  ID       int 
  7.  Name     string 
  8.  StopChan chan bool 
  9.  
  10. // Start ... 
  11. func (w *Worker) Start(jobQueue chan Job) { 
  12.  w.StopChan = make(chan bool) 
  13.  successChan := make(chan bool) 
  14.  
  15.  go func() { 
  16.   successChan <- true 
  17.   for { 
  18.    // take job 
  19.    job := <-jobQueue 
  20.    if job != nil { 
  21.     job.Start(w) 
  22.    } else { 
  23.     fmt.Printf("worker %s to be stopped\n", w.Name
  24.     w.StopChan <- true 
  25.     break 
  26.    } 
  27.   } 
  28.  }() 
  29.  
  30.  // wait for the worker to start 
  31.  <-successChan 
  32.  
  33. // Stop ... 
  34. func (w *Worker) Stop() { 
  35.  // wait for the worker to stop, blocking 
  36.  _ = <-w.StopChan 
  37.  fmt.Printf("worker %s stopped\n", w.Name

Worker 有一些属性保存当前的状态,另外还声明了两个方法分别用于启动、停止 worker。

在 Start() 方法里,创建了两个 channel 分别用于 worker 的启动和停止。最重要的是 for 循环里面,worker 会一直等待获取 job 并可执行的直到任务队列关闭。

Job 是包含单个方法 Start() 的接口,所以只要实现 Start() 方法就可以有不同类型的 job。

  1. // Job ... 
  2. type Job interface { 
  3.  Start(worker *Worker) error 

一旦 Worker 确定之后,接下来就是创建 pool 来管理 workers。

  1. import ( 
  2.  "fmt" 
  3.  "sync" 
  4.  
  5. // Pool ... 
  6. type Pool struct { 
  7.  Name string 
  8.  
  9.  Size    int 
  10.  Workers []*Worker 
  11.  
  12.  QueueSize int 
  13.  Queue     chan Job 
  14.  
  15. // Initiualize ... 
  16. func (p *Pool) Initialize() { 
  17.  // maintain minimum 1 worker 
  18.  if p.Size < 1 { 
  19.   p.Size = 1 
  20.  } 
  21.  p.Workers = []*Worker{} 
  22.  for i := 1; i <= p.Size; i++ { 
  23.   worker := &Worker{ 
  24.    ID:   i - 1, 
  25.    Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1), 
  26.   } 
  27.   p.Workers = append(p.Workers, worker) 
  28.  } 
  29.  
  30.  // maintain min queue size as 1 
  31.  if p.QueueSize < 1 { 
  32.   p.QueueSize = 1 
  33.  } 
  34.  p.Queue = make(chan Job, p.QueueSize) 
  35.  
  36. // Start ... 
  37. func (p *Pool) Start() { 
  38.  for _, worker := range p.Workers { 
  39.   worker.Start(p.Queue) 
  40.  } 
  41.  fmt.Println("all workers started"
  42.  
  43. // Stop ... 
  44. func (p *Pool) Stop() { 
  45.  close(p.Queue) // close the queue channel 
  46.  
  47.  var wg sync.WaitGroup 
  48.  for _, worker := range p.Workers { 
  49.   wg.Add(1) 
  50.   go func(w *Worker) { 
  51.    defer wg.Done() 
  52.  
  53.    w.Stop() 
  54.   }(worker) 
  55.  } 
  56.  wg.Wait() 
  57.  fmt.Println("all workers stopped"

Pool 包含 worker 切片和用于保存 job 的队列。worker 的数量在初始化的时候是可以自定义。

关键点在 Stop() 的逻辑,当它被调用时,会先关闭 job 队列,worker 便会从 job 队列读到 nil,接着就会关闭对应的 worker。接着在 for 循环里,等待 worker 并发地停止直到最后一个 worker 停止。

为了演示整体逻辑,下面的例子展示了一个仅仅输出值的 job。

  1. import "fmt" 
  2.  
  3. func main() { 
  4.  pool := &Pool{ 
  5.   Name:      "test"
  6.   Size:      5, 
  7.   QueueSize: 20, 
  8.  } 
  9.  pool.Initialize() 
  10.  pool.Start() 
  11.         defer pool.Stop() 
  12.  
  13.  for i := 1; i <= 100; i++ { 
  14.   job := &PrintJob{ 
  15.    Index: i, 
  16.   } 
  17.   pool.Queue <- job 
  18.  } 
  19.  
  20. // PrintJob ... 
  21. type PrintJob struct { 
  22.  Index int 
  23.  
  24. func (pj *PrintJob) Start(worker *Worker) error { 
  25.  fmt.Printf("job %s - %d\n", worker.Name, pj.Index
  26.  return nil 

如果你看了上面的代码逻辑,就会发现很简单,创建了有 5 个 worker 的工作池并且 job 队列的大小是 20。

接着,模拟 job 创建和处理过程:一旦 job 被创建就会 push 到任务队列里,等待着的 worker 便会从队列里取出 job 并处理。

类似下面这样的输出:

  1. all workers started 
  2. job test-worker-3 - 4 
  3. job test-worker-3 - 6 
  4. job test-worker-3 - 7 
  5. job test-worker-3 - 8 
  6. job test-worker-3 - 9 
  7. job test-worker-3 - 10 
  8. job test-worker-3 - 11 
  9. job test-worker-3 - 12 
  10. job test-worker-3 - 13 
  11. job test-worker-3 - 14 
  12. job test-worker-3 - 15 
  13. job test-worker-3 - 16 
  14. job test-worker-3 - 17 
  15. job test-worker-3 - 18 
  16. job test-worker-3 - 19 
  17. job test-worker-3 - 20 
  18. worker test-worker-3 to be stopped 
  19. job test-worker-4 - 5 
  20. job test-worker-0 - 1 
  21. worker test-worker-3 stopped 
  22. job test-worker-2 - 3 
  23. worker test-worker-2 to be stopped 
  24. worker test-worker-2 stopped 
  25. worker test-worker-4 to be stopped 
  26. worker test-worker-4 stopped 
  27. worker test-worker-0 to be stopped 
  28. worker test-worker-0 stopped 
  29. job test-worker-1 - 2 
  30. worker test-worker-1 to be stopped 
  31. worker test-worker-1 stopped 
  32. all workers stopped 

via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

作者:sonic0002