zl程序教程

您现在的位置是:首页 >  其他

当前栏目

EasyCVR使用NSQ处理消息时topic和channel的理解

2023-03-14 22:33:57 时间

EasyCVR 使用 NSQ 进行消息的处理和推送,目前发现对 topic 和 channel 很难理解其使用,官网的解释也是复杂难懂,因此直接写代码进行确认。

首先编写两个消费者,topic 相同,channel 不同,一个是 channel 1 一个是 channel 2,代码如下:

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

type myMessageHandler struct{}

// HandleMessage 为接口,如果返回 nil, nsq 收到 nil 就会标记消息已经被成功处理
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
   if len(m.Body) == 0 {
      // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
      // In this case, a message with an empty body is simply ignored/discarded.
      return nil
   }

   // do whatever actual message processing is desired
   err := h.processMessage(m.Body)

   // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
   return err
}

// 自定义的处理消息函数
func (h *myMessageHandler) processMessage(m []byte) error {
   fmt.Println("接收消息时间为", time.Now().Format(consts.TimeFormat))
   fmt.Println("收到消息为", string(m))
   return nil
}

func main() {
   // 1. 初始化配置
   config := nsq.NewConfig()
   consumer, err := nsq.NewConsumer("topic1", "channel2", config)
   if err != nil {
      log.Fatal(err)
   }

   // 2. 消息处理, AddHandler 内部默认采用 1 个协程处理返回的消息
   // AddConcurrentHandlers 可以自定义多少个协程处理返回的消息
   consumer.AddHandler(&myMessageHandler{})

   // 3. 连接 nsqlookupd 用于搜索对应的 nsqd, 连接 nsqlookupd 对应的 http 地址
   // 可以直接发送信息 ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
   // 如果不需要分布式,只需要发送消息,暂时不需要分布式,可以直接连接 nsqd 的 tcp 地址
   // 实测使用 ConnectToNSQLookupd 的过程中,如果是新的 topic 和 channel,需要等待大约40s的时间才能收到第一次消息,后面立刻能收到消息
   // 不使用分布式,直接使用 ConnectToNSQD,基本立刻能收到消息
   //err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
   err = consumer.ConnectToNSQLookupd("127.0.0.1:4154")
   if err != nil {
      log.Fatal(err)
   }

   sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan

   // 程序退出,关闭所有的消费者
   consumer.Stop()
}

只需要修改对应的msg.NewCustomer() 中的 channel 为 channel1,即可确定对应的消息。

消费者代码如下:

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

func main() {
   config := nsq.NewConfig()
   // 1. 向 nsqd 的 tcp 端口发送消息,因此进行对应的配置
   producer, err := nsq.NewProducer("127.0.0.1:4154", config)
   if err != nil {
      log.Fatal(err)
   }

   messageBody := []byte("hello world")
   topicName := "topic2"

   // 2. 同步推流到 nspd, 同步推流代表等待 nspd 的响应,如果发送失败返回错误。
   // PublishAsync 代表是异步推送消息给 nspd,发送完消息后立刻返回
   err = producer.Publish(topicName, messageBody)
   fmt.Println("发送消息时间为", time.Now().Format(consts.TimeFormat))
   if err != nil {
      log.Fatal(err)
   }


   sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan

   // 3. 停止生产者,一般在停止服务,停止进程的时候需要调用
   producer.Stop()
}

经过代码测试总结,对 topic 和 channel 的理解如下:

1. 测试一发多收的情况,收端 topic 和 channel 完全相同的情况下:

  • 如果两个收端A、B,topic=topic1 channel=channel1
  • 发送端发送消息,topic=topic1 body=“hello world”
  • A B 只会有一个收端收到消息,可以用作负载均衡

2. 测试收端 topic 相同,channel 不同的情况下:

  • 收端 A,topic=topic1 channel=channel1
  • 收端 B,topic=topic1 channel=channel2
  • 发端 C,topic=topic1 body=“hello world”
  • A 和 B 均可以收到信息

因此可以根据使用场景,来进行对应的 channel 的设置。