EasyCVR使用NSQ处理消息时topic和channel的理解
2023-03-14 22:33:57 时间
EasyCVR 使用 NSQ 进行消息的处理和推送,目前发现对 topic 和 channel 很难理解其使用,官网的解释也是复杂难懂,因此直接写代码进行确认。
首先编写两个消费者,topic 相同,channel 不同,一个是 channel 1 一个是 channel 2,代码如下:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
"zhangqiadams.com/gotools/model/consts"
)
type myMessageHandler struct{}
// HandleMessage 为接口,如果返回 nil, nsq 收到 nil 就会标记消息已经被成功处理
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
// do whatever actual message processing is desired
err := h.processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}
// 自定义的处理消息函数
func (h *myMessageHandler) processMessage(m []byte) error {
fmt.Println("接收消息时间为", time.Now().Format(consts.TimeFormat))
fmt.Println("收到消息为", string(m))
return nil
}
func main() {
// 1. 初始化配置
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic1", "channel2", config)
if err != nil {
log.Fatal(err)
}
// 2. 消息处理, AddHandler 内部默认采用 1 个协程处理返回的消息
// AddConcurrentHandlers 可以自定义多少个协程处理返回的消息
consumer.AddHandler(&myMessageHandler{})
// 3. 连接 nsqlookupd 用于搜索对应的 nsqd, 连接 nsqlookupd 对应的 http 地址
// 可以直接发送信息 ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
// 如果不需要分布式,只需要发送消息,暂时不需要分布式,可以直接连接 nsqd 的 tcp 地址
// 实测使用 ConnectToNSQLookupd 的过程中,如果是新的 topic 和 channel,需要等待大约40s的时间才能收到第一次消息,后面立刻能收到消息
// 不使用分布式,直接使用 ConnectToNSQD,基本立刻能收到消息
//err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
err = consumer.ConnectToNSQLookupd("127.0.0.1:4154")
if err != nil {
log.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// 程序退出,关闭所有的消费者
consumer.Stop()
}
只需要修改对应的msg.NewCustomer() 中的 channel 为 channel1,即可确定对应的消息。
消费者代码如下:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
"zhangqiadams.com/gotools/model/consts"
)
func main() {
config := nsq.NewConfig()
// 1. 向 nsqd 的 tcp 端口发送消息,因此进行对应的配置
producer, err := nsq.NewProducer("127.0.0.1:4154", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello world")
topicName := "topic2"
// 2. 同步推流到 nspd, 同步推流代表等待 nspd 的响应,如果发送失败返回错误。
// PublishAsync 代表是异步推送消息给 nspd,发送完消息后立刻返回
err = producer.Publish(topicName, messageBody)
fmt.Println("发送消息时间为", time.Now().Format(consts.TimeFormat))
if err != nil {
log.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// 3. 停止生产者,一般在停止服务,停止进程的时候需要调用
producer.Stop()
}
经过代码测试总结,对 topic 和 channel 的理解如下:
1. 测试一发多收的情况,收端 topic 和 channel 完全相同的情况下:
- 如果两个收端A、B,topic=topic1 channel=channel1
- 发送端发送消息,topic=topic1 body=“hello world”
- A B 只会有一个收端收到消息,可以用作负载均衡
2. 测试收端 topic 相同,channel 不同的情况下:
- 收端 A,topic=topic1 channel=channel1
- 收端 B,topic=topic1 channel=channel2
- 发端 C,topic=topic1 body=“hello world”
- A 和 B 均可以收到信息
因此可以根据使用场景,来进行对应的 channel 的设置。
相关文章
- 在 Go 里用 CGO?这 7 个问题你要关注!
- 9款优秀的去中心化通讯软件 Matrix 的客户端
- 求职数据分析,项目经验该怎么写
- 在OKR中,我看到了数据驱动业务的未来
- 火山引擎云原生大数据在金融行业的实践
- OpenHarmony富设备移植指南(二)—从postmarketOS获取移植资源
- 《数据成熟度指数》报告:64%的企业领袖认为大多数员工“不懂数据”
- OpenHarmony 小型系统兼容性测试指南
- 肯睿中国(Cloudera):2023年企业数字战略三大趋势预测
- 适用于 Linux 的十大命令行游戏
- GNOME 截图工具的新旧截图方式
- System76 即将推出的 COSMIC 桌面正在酝酿大变化
- 2GB 内存 8GB 存储即可流畅运行,Windows 11 极致精简版系统 Tiny11 发布
- 迎接 ecode:一个即将推出的具有全新图形用户界面框架的现代、轻量级代码编辑器
- loongarch架构介绍(三)—地址翻译
- Go 语言怎么解决编译器错误“err is shadowed during return”?
- 敏捷:可能被开发人员遗忘的部分
- Denodo预测2023年数据管理和分析的未来
- 利用数据推动可持续发展
- 在 Vue3 中实现 React 原生 Hooks(useState、useEffect),深入理解 React Hooks 的