GoLang使用Goroutine+Channel实现流水线处理,扇入扇出思想解决流水线上下游供需不平衡
Golang 实现 处理 解决 思想 平衡 channel 流水线
2023-09-27 14:25:41 时间
码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071
目的
在一些业务逻辑场景中, 我们要针对同一批数据依次进行不同的处理,并且它们之间是有先后顺序的。比如我们制造一个手机要经历三个阶段:buy(采购配件) - build(组装) - pack(打包),最终得到可以出售的手机。在这个需求场景中,就可以通过goroutine+无缓冲channel实现。
处理逻辑
我们把整个处理路程想象成消息队列,生产者buy生产,buy的下游build进行消费并生产,pack下游进行消费。逻辑图如下:
代码实现:
package main
import (
"fmt"
"sync"
"time"
)
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
fmt.Println("proc:buy", i)
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}
func build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for v := range in {
fmt.Println("proc:build", v)
out <- fmt.Sprintf("组装(%s)", v)
}
}()
return out
}
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for v := range in {
fmt.Println("proc:pack", v)
out <- fmt.Sprintf("打包(%s)", v)
}
}()
return out
}
func main() {
coms := buy(10)
phones := build(coms)
packs := pack(phones)
for v := range packs {
fmt.Println("result:", v)
}
}
打印结果:
[why@whydeMacBook-Pro] ~/Desktop/go/test$go run main.go
proc:buy 1
proc:buy 2
proc:build 配件1
proc:build 配件2
proc:buy 3
proc:pack 组装(配件1)
proc:pack 组装(配件2)
proc:build 配件3
result: 打包(组装(配件1))
result: 打包(组装(配件2))
proc:pack 组装(配件3)
result: 打包(组装(配件3))
proc:buy 4
proc:buy 5
proc:build 配件4
proc:build 配件5
proc:buy 6
proc:pack 组装(配件4)
proc:pack 组装(配件5)
proc:build 配件6
proc:buy 7
result: 打包(组装(配件4))
result: 打包(组装(配件5))
proc:pack 组装(配件6)
result: 打包(组装(配件6))
proc:build 配件7
proc:pack 组装(配件7)
result: 打包(组装(配件7))
proc:buy 8
proc:buy 9
proc:build 配件8
proc:build 配件9
proc:buy 10
proc:pack 组装(配件8)
proc:pack 组装(配件9)
proc:build 配件10
result: 打包(组装(配件8))
result: 打包(组装(配件9))
proc:pack 组装(配件10)
result: 打包(组装(配件10))
可以看到不同的处理流程是并行处理的,单个处理流程是顺序处理的。
供需不平衡
当三个流程处理效率相同时,上面当实现没有什么问题,但是假设运行了一段时间只会,build 处理能力下降,就会由于中间一个环节阻塞,托满整个执行效率,此时该如何处理呢?
可能大部分人都会想到,增加 build 流水线的工人啊!没错,就是这个思路,所以演变后的逻辑变成下面这样:
我们用 sleep 模拟 build 处理能力下降,演变后的代码如下:
package main
import (
"fmt"
"sync"
"time"
)
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 1; i <= n; i++ {
fmt.Println("proc:buy", i)
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}
func build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for v := range in {
fmt.Println("proc:build", v)
time.Sleep(time.Duration(time.Second))
out <- fmt.Sprintf("组装(%s)", v)
}
}()
return out
}
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for v := range in {
fmt.Println("proc:pack", v)
out <- fmt.Sprintf("打包(%s)", v)
}
}()
return out
}
//扇入,汇聚3个channel成一个
func merge(ins ...<-chan string) <-chan string {
wg := sync.WaitGroup{}
wg.Add(len(ins))
out := make(chan string)
//定义channel数据传递函数
f := func(in <-chan string) {
defer wg.Done()
for v := range in {
out <- v
}
}
//按照传入channel个数并行处理
for _, v := range ins {
go f(v)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
coms := buy(10)
//phones := build(coms)
//扇入增加build效率
phones1 := build(coms)
phones2 := build(coms)
phones3 := build(coms)
phones := merge(phones1, phones2, phones3)
packs := pack(phones)
for v := range packs {
fmt.Println("result:", v)
}
}
打印结果:
[why@whydeMacBook-Pro] ~/Desktop/go/test$go run main.go
proc:buy 1
proc:build 配件1
proc:buy 2
proc:buy 3
proc:buy 4
proc:build 配件3
proc:build 配件2
proc:build 配件4
proc:pack 组装(配件3)
proc:buy 5
proc:buy 6
proc:pack 组装(配件2)
proc:build 配件5
proc:build 配件6
result: 打包(组装(配件3))
result: 打包(组装(配件2))
proc:pack 组装(配件1)
result: 打包(组装(配件1))
proc:buy 7
proc:build 配件7
proc:pack 组装(配件6)
proc:buy 8
result: 打包(组装(配件6))
proc:build 配件8
proc:buy 9
proc:buy 10
proc:build 配件9
proc:pack 组装(配件5)
proc:pack 组装(配件4)
result: 打包(组装(配件5))
result: 打包(组装(配件4))
proc:build 配件10
proc:pack 组装(配件9)
proc:pack 组装(配件8)
result: 打包(组装(配件9))
result: 打包(组装(配件8))
proc:pack 组装(配件7)
result: 打包(组装(配件7))
proc:pack 组装(配件10)
result: 打包(组装(配件10))
[why@whydeMacBook-Pro] ~/Desktop/go/test$
通过结果我们可以看到,buy 和 pack 的处理仍是顺序的,而 build 变成了并行处理,解决了我们供需不平衡的问题。
ps:为了减少代码量,提高阅读体验,这里没有贴前后对比图,你可以自己运行对比一下二者的效率,可以很直观地感受到。
相关文章
- 聊一聊 golang 的测试与性能调优
- Golang 测试工具 go test
- Golang M 2023 3 topic
- Golang 【basic_leaming】文件、目录操作
- Golang及python实现Unix Socket
- qqwry - 纯真ip库的golang服务
- Top15的golang学习资源网站
- 组织Golang代码
- golang 面试
- 使用Golang利用ectd实现一个分布式锁
- 浅析Golang map的实现原理
- golang 查找素数
- Golang 中的反向代理(ReverseProxy) 介绍与使用
- Golang 实现 Redis(5): 使用跳表实现 SortedSet
- Golang 实现 Redis(4): AOF 持久化与AOF重写
- Golang 实现 Redis(2): 实现 Redis 协议解析器
- Golang 实现 Redis(1): Golang 编写 Tcp 服务器
- 实习第一周(Golang)
- Golang 基于excelize实现Excel表格的解析、导出
- golang 三目运算的实现
- centos6使用 swig3.0.6 编译c,golang 1.4.2调用
- Golang 1.0 windows 64 bit 配置环境。
- 深入解析golang几种非常主流的依赖注入框架,附实现案例及原理解析
- golang 实现程序运行时函数动态替换,举例说明动态注入和替换本地方法、系统库方法、第三方库方法,附完整源码实现