zl程序教程

您现在的位置是:首页 >  其他

当前栏目

怎么使用Spring integration在Springboot中集成Mqtt

2023-04-18 12:55:55 时间

今天小编给大家分享一下怎么使用Spring integration在Springboot中集成Mqtt的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

关于Spring Intergration

介绍关于Spring Integration 的一些概念性的东西,如果只想知道怎么集成可跳过该部分

Spring Integration provides an extension of the Spring programming model to support the well known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems through declarative adapters. Those adapters provide a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling.

大致意思是

  • Spring Integration 为许多著名的企业级集成方案提供了扩展接口

  • 它能够在基于Sping的应用中进行轻量级消息传递

  • 支持声明式适配器与发布系统进行集成(见Mqtt send消息的实现)

  • 这些适配器在 Spring对远程调用,消息传递和系统调度提供了更高层次的抽象

Spring Intergration核心组件

Message(消息)

Message是对消息的包装,在Spring 系统中传递的任何消息都会被包装为Message。可以理解为是Spring Integration消息传递的基本单位

Message Channel(消息管道)

消息管道:Message 在Message Channel中进行传递,生产者向管道中投递消息,消费者从管道中取出消息。Spring Integration 支持两种消息传递模型,point-to-point(点对点模型),Publish-subscribe(发布订阅模型)有多种管道类型。

本次使用点对点模型,消息管道类型DirectChannel

Message Endpoint(消息切入点)

消息切入点:消息在管道中流动那必定会有某些流入或流出的点亦或是在某个位置(即特定函数)需要对消息进行处理,过滤,格式转换等。这些点即为Message Endpoint(实际为某些处理函数),例如消息发送,消息接收都是Message Endpoint。

Message Transformer(消息转化器)

消息转化器:是将消息进行特定转换例如将一个 Object 序列化为 Json 字符串

Message Filter

消息过滤器,过滤掉特定消息。例如在管道中发送的含username 和 age 属性的 User 对象,如果当前消息(一个User实例的包装)的age < 18则将其过滤掉,那么处在过滤器之后的消费者将无法接收到 age < 18的User对象。当然过滤条件不仅是消息负载的属性,也可以是消息本身的属性。

Message Router(消息路由)

消息路由:向管道投递消息时可由消息路由根据路由规则选择投递给那个管道

Splitter(分割器)

分割器:它从一个输入管道接收一条消息并将其分割为多条消息,再将每条消息发送到不同的输出管道上

Aggregator(聚合器)

聚合器:与分割器功能刚好相反

Service Activator

Service Activator: 它是一个用于将系统服务实例接入到消息系统的泛型切入点,该切入点必须配置输入管道。其返回值可是消息类型也可以是一个消息处理器,当返回值为消息类型时需要指定输出管道,即在该切入点对消息加工处理后再发送到指定的输出管道,如果返回值为消息处理器。那么消息交由消息处理器进行处理。下文中会为Mqtt消息出站配置Service Activator并且 返回值为消息处理器

Channel Adapter(管道适配器)

管道适配器:因为外部协议有无数种,消息适配器则用于连接不同协议的外部系统。从外部系统读入数据并对数据进行处理最终与Spring Integration 内部的消息系统适配。例如将要进行Mqtt集成,那么就需要一个Mqtt的管道适配器,事实上也确实有一个,下文中将会看到。

开始集成

依赖管理工具使用Gardle

引入spring-integration-mqtt依赖

implementation "org.springframework.integration:spring-integration-mqtt:5.4.6"

创建Mqtt配置类

@Configuration
public class MqttConfig {
    /**
     *  以下属性将在配置文件中读取
     **/
    //mqtt Broker 地址
    private String[] uris;
    //连接用户名
    private String username;
    //连接密码
    private String password;
    //入站Client ID
    private String inClientId;
    //出站Client ID
    private String outClientId;
    //默认订阅主题
    private String defaultTopic;

    public void setUris(String[] uris) {
        this.uris = uris;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setInClientId(String inClientId) {
        this.inClientId = inClientId;
    }

    public void setOutClientId(String outClientId) {
        this.outClientId = outClientId;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }
}

怎么使用Spring integration在Springboot中集成Mqtt