zl程序教程

您现在的位置是:首页 >  工具

当前栏目

GoLang使用Goroutine+Channel实现流水线处理,扇入扇出思想解决流水线上下游供需不平衡

Golang 实现 处理 解决 思想 平衡 channel 流水线
2023-09-27 14:25:41 时间

码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071

目的

在一些业务逻辑场景中, 我们要针对同一批数据依次进行不同的处理,并且它们之间是有先后顺序的。比如我们制造一个手机要经历三个阶段:buy(采购配件) - build(组装) - pack(打包),最终得到可以出售的手机。在这个需求场景中,就可以通过goroutine+无缓冲channel实现。

 

处理逻辑

我们把整个处理路程想象成消息队列,生产者buy生产,buy的下游build进行消费并生产,pack下游进行消费。逻辑图如下:

代码实现:

package main

import (
	"fmt"
	"sync"
	"time"
)

func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			fmt.Println("proc:buy", i)
			out <- fmt.Sprintf("配件%d", i)
		}
	}()
	return out
}
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			fmt.Println("proc:build", v)
			out <- fmt.Sprintf("组装(%s)", v)
		}
	}()
	return out
}
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			fmt.Println("proc:pack", v)
			out <- fmt.Sprintf("打包(%s)", v)
		}
	}()
	return out
}

func main() {
	coms := buy(10)

	phones := build(coms)

	packs := pack(phones)

	for v := range packs {
		fmt.Println("result:", v)
	}
}

打印结果:

[why@whydeMacBook-Pro] ~/Desktop/go/test$go run main.go 
proc:buy 1
proc:buy 2
proc:build 配件1
proc:build 配件2
proc:buy 3
proc:pack 组装(配件1)
proc:pack 组装(配件2)
proc:build 配件3
result: 打包(组装(配件1))
result: 打包(组装(配件2))
proc:pack 组装(配件3)
result: 打包(组装(配件3))
proc:buy 4
proc:buy 5
proc:build 配件4
proc:build 配件5
proc:buy 6
proc:pack 组装(配件4)
proc:pack 组装(配件5)
proc:build 配件6
proc:buy 7
result: 打包(组装(配件4))
result: 打包(组装(配件5))
proc:pack 组装(配件6)
result: 打包(组装(配件6))
proc:build 配件7
proc:pack 组装(配件7)
result: 打包(组装(配件7))
proc:buy 8
proc:buy 9
proc:build 配件8
proc:build 配件9
proc:buy 10
proc:pack 组装(配件8)
proc:pack 组装(配件9)
proc:build 配件10
result: 打包(组装(配件8))
result: 打包(组装(配件9))
proc:pack 组装(配件10)
result: 打包(组装(配件10))

可以看到不同的处理流程是并行处理的,单个处理流程是顺序处理的。

 

供需不平衡

当三个流程处理效率相同时,上面当实现没有什么问题,但是假设运行了一段时间只会,build 处理能力下降,就会由于中间一个环节阻塞,托满整个执行效率,此时该如何处理呢?

可能大部分人都会想到,增加 build 流水线的工人啊!没错,就是这个思路,所以演变后的逻辑变成下面这样:

我们用 sleep 模拟 build 处理能力下降,演变后的代码如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			fmt.Println("proc:buy", i)
			out <- fmt.Sprintf("配件%d", i)
		}
	}()
	return out
}
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			fmt.Println("proc:build", v)
			time.Sleep(time.Duration(time.Second))
			out <- fmt.Sprintf("组装(%s)", v)
		}
	}()
	return out
}
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			fmt.Println("proc:pack", v)
			out <- fmt.Sprintf("打包(%s)", v)
		}
	}()
	return out
}

//扇入,汇聚3个channel成一个
func merge(ins ...<-chan string) <-chan string {
	wg := sync.WaitGroup{}
	wg.Add(len(ins))

	out := make(chan string)

	//定义channel数据传递函数
	f := func(in <-chan string) {
		defer wg.Done()
		for v := range in {
			out <- v
		}
	}

	//按照传入channel个数并行处理
	for _, v := range ins {
		go f(v)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	coms := buy(10)

	//phones := build(coms)

	//扇入增加build效率
	phones1 := build(coms)
	phones2 := build(coms)
	phones3 := build(coms)
	phones := merge(phones1, phones2, phones3)

	packs := pack(phones)

	for v := range packs {
		fmt.Println("result:", v)
	}
}

打印结果:

[why@whydeMacBook-Pro] ~/Desktop/go/test$go run main.go 
proc:buy 1
proc:build 配件1
proc:buy 2
proc:buy 3
proc:buy 4
proc:build 配件3
proc:build 配件2
proc:build 配件4
proc:pack 组装(配件3)
proc:buy 5
proc:buy 6
proc:pack 组装(配件2)
proc:build 配件5
proc:build 配件6
result: 打包(组装(配件3))
result: 打包(组装(配件2))
proc:pack 组装(配件1)
result: 打包(组装(配件1))
proc:buy 7
proc:build 配件7
proc:pack 组装(配件6)
proc:buy 8
result: 打包(组装(配件6))
proc:build 配件8
proc:buy 9
proc:buy 10
proc:build 配件9
proc:pack 组装(配件5)
proc:pack 组装(配件4)
result: 打包(组装(配件5))
result: 打包(组装(配件4))
proc:build 配件10
proc:pack 组装(配件9)
proc:pack 组装(配件8)
result: 打包(组装(配件9))
result: 打包(组装(配件8))
proc:pack 组装(配件7)
result: 打包(组装(配件7))
proc:pack 组装(配件10)
result: 打包(组装(配件10))
[why@whydeMacBook-Pro] ~/Desktop/go/test$

通过结果我们可以看到,buy 和 pack 的处理仍是顺序的,而 build 变成了并行处理,解决了我们供需不平衡的问题。

ps:为了减少代码量,提高阅读体验,这里没有贴前后对比图,你可以自己运行对比一下二者的效率,可以很直观地感受到。