Go ---Go语言高级编程中订阅/发布模型例子解析
2023-03-14 22:47:33 时间
《Go语言高级编程》确实是本好书,我的反应是:很嫉妒,妈的!写的这么 牛逼!
func main() { // 一个过期时间为 0.1秒,缓冲区大小为10的发布者 // 发布者的缓冲区大小决定,订阅者的缓冲区大下 // 如果发布的主题订阅者没有接受将会阻塞这个订阅者 // 新发布的主题该订阅者无法在进行接收 p := NewPublisher(100*time.Millisecond, 10) defer p.Close() // 添加两个订阅者,一个订阅全部,一个订阅"golang" all := p.Subscribe() golang := p.SubscribeTopic( // 主题过滤规则 func(v interface{}) bool { // 是字符串类型吗? // 如果是,那么这个里面包含golang吗? // 满足上面的条件才是我这个订阅者想要的 // 不然返回 false // 如果是 nil,就代表只要发送我就要,相当于全部订阅 if s, ok := v.(string); ok { return strings.Contains(s, "golang") } return false }) p.Publish("hello, world!") p.Publish("hello, golang!") p.Publish("golang!") go func() { for msg := range all { fmt.Println("all:",msg) } }() go func() { for msg := range golang { fmt.Println("golang:",msg) } }() // 运行一段时间后退出 time.Sleep(3*time.Second) } // 订阅/发布模型 type ( // 订阅者为一个通道 subscriber chan interface{} // 主题为一个过滤器 topicFunc func(v interface{}) bool ) // Publisher 发布者对象 type Publisher struct { m sync.RWMutex buffer int timeout time.Duration subscribers map[subscriber] topicFunc } // 构建一个发布者对象,可以蛇者超时时间和缓存队列长度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 添加一个新的订阅者,订阅所有主题 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } // 添加一个新的订阅者,订阅过滤器筛选后的主题 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() // 给指定的订阅者,加上主题过滤器 p.subscribers[ch] = topic p.m.Unlock() return ch } // 退出订阅 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() // 将该订阅者从发布者的信息中删除 delete(p.subscribers, sub) close(sub) } // 发布一个主题 func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() var wg sync.WaitGroup for sub, topic := range p.subscribers { wg.Add(1) go p.sendTopic(sub, topic, v, &wg) } // 等待主题发送完成 wg.Wait() } // 发送主题,可以容忍一定的超时 func (p *Publisher) sendTopic( sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup, ) { defer wg.Done() // 如果该订阅者没有订阅全部,并且发布的主题又不符合主题过滤器 // 那么直接返回 if topic != nil && !topic(v) { return } // 一般 time.After 与 select case一同使用 // 如果在指定时间内我们定义的通道中没有接受到值, // 那么将会执行<-time.After(p.timeout) // 是用于判断超时的操作 select { case sub <- v: case <-time.After(p.timeout): } } func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() // 循环关闭所有的订阅者通道 for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } }
相关文章
- 在 Go 里用 CGO?这 7 个问题你要关注!
- 9款优秀的去中心化通讯软件 Matrix 的客户端
- 求职数据分析,项目经验该怎么写
- 在OKR中,我看到了数据驱动业务的未来
- 火山引擎云原生大数据在金融行业的实践
- OpenHarmony富设备移植指南(二)—从postmarketOS获取移植资源
- 《数据成熟度指数》报告:64%的企业领袖认为大多数员工“不懂数据”
- OpenHarmony 小型系统兼容性测试指南
- 肯睿中国(Cloudera):2023年企业数字战略三大趋势预测
- 适用于 Linux 的十大命令行游戏
- GNOME 截图工具的新旧截图方式
- System76 即将推出的 COSMIC 桌面正在酝酿大变化
- 2GB 内存 8GB 存储即可流畅运行,Windows 11 极致精简版系统 Tiny11 发布
- 迎接 ecode:一个即将推出的具有全新图形用户界面框架的现代、轻量级代码编辑器
- loongarch架构介绍(三)—地址翻译
- Go 语言怎么解决编译器错误“err is shadowed during return”?
- 敏捷:可能被开发人员遗忘的部分
- Denodo预测2023年数据管理和分析的未来
- 利用数据推动可持续发展
- 在 Vue3 中实现 React 原生 Hooks(useState、useEffect),深入理解 React Hooks 的