zl程序教程

您现在的位置是:首页 >  后端

当前栏目

SpringBoot整合Kafka消息组件

2023-09-14 09:08:18 时间

1、Kafka是新一代的消息系统,也是目前性能最好的消息组件,在数据采集业务中被广泛应用。需要注意的是,本文示例中的Kafka以PLAINTEXT方式对外提供服务,并未配置Kerberos等认证;如需启用Kerberos认证,还要另外追加SASL相关配置。

修改pom.xml配置文件,追加依赖库配置,如下所示:

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0"
  3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
  5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
  6     <modelVersion>4.0.0</modelVersion>
  7     <parent>
  8         <groupId>org.springframework.boot</groupId>
  9         <artifactId>spring-boot-starter-parent</artifactId>
 10         <version>2.3.5.RELEASE</version>
 11         <relativePath /> <!-- lookup parent from repository -->
 12     </parent>
 13     <groupId>com.example</groupId>
 14     <artifactId>demo</artifactId>
 15     <version>0.0.1-SNAPSHOT</version>
 16     <name>demo</name>
 17     <description>Demo project for Spring Boot</description>
 18 
 19     <properties>
 20         <java.version>1.8</java.version>
 21         <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
 22     </properties>
 23 
 24     <dependencies>
 25         <dependency>
 26             <groupId>org.springframework.boot</groupId>
 27             <artifactId>spring-boot-starter-web</artifactId>
 28         </dependency>
 29 
 30         <dependency>
 31             <groupId>org.springframework.boot</groupId>
 32             <artifactId>spring-boot-starter-test</artifactId>
 33             <scope>test</scope>
 34             <exclusions>
 35                 <exclusion>
 36                     <groupId>org.junit.vintage</groupId>
 37                     <artifactId>junit-vintage-engine</artifactId>
 38                 </exclusion>
 39             </exclusions>
 40         </dependency>
 41 
 42         <!-- mysql驱动包 -->
 43         <dependency>
 44             <groupId>mysql</groupId>
 45             <artifactId>mysql-connector-java</artifactId>
 46         </dependency>
 47 
 48         <!-- druid连接池 -->
 49         <dependency>
 50             <groupId>com.alibaba</groupId>
 51             <artifactId>druid</artifactId>
 52             <version>1.1.10</version>
 53         </dependency>
 54 
 55         <dependency>
 56             <groupId>org.springframework.boot</groupId>
 57             <artifactId>spring-boot-starter-data-jpa</artifactId>
 58         </dependency>
 59         <dependency>
 60             <groupId>org.springframework.boot</groupId>
 61             <artifactId>spring-boot-starter-cache</artifactId>
 62         </dependency>
 63         <dependency>
 64             <groupId>org.hibernate</groupId>
 65             <artifactId>hibernate-ehcache</artifactId>
 66         </dependency>
 67 
 68         <!-- activeMQ -->
 69         <dependency>
 70             <groupId>org.springframework.boot</groupId>
 71             <artifactId>spring-boot-starter-activemq</artifactId>
 72         </dependency>
 73         
 74         <!-- rabbitMQ -->
 75         <dependency>
 76             <groupId>org.springframework.boot</groupId>
 77             <artifactId>spring-boot-starter-amqp</artifactId>
 78         </dependency>
 79         
 80         <!-- kafka -->
 81         <dependency>
 82             <groupId>org.springframework.kafka</groupId>
 83             <artifactId>spring-kafka</artifactId>
 84         </dependency>
 85     </dependencies>
 86 
 87     <build>
 88         <plugins>
 89             <plugin>
 90                 <groupId>org.springframework.boot</groupId>
 91                 <artifactId>spring-boot-maven-plugin</artifactId>
 92             </plugin>
 93         </plugins>
 94         <resources>
 95             <resource>
 96                 <directory>src/main/resources</directory>
 97                 <includes>
 98                     <include>**/*.properties</include>
 99                     <include>**/*.yml</include>
100                     <include>**/*.xml</include>
101                     <include>**/*.p12</include>
102                     <include>**/*.html</include>
103                     <include>**/*.jpg</include>
104                     <include>**/*.png</include>
105                 </includes>
106             </resource>
107         </resources>
108     </build>
109 
110 </project>

修改application.properties配置文件,追加Kafka相关配置,如下所示:

 1 # 定义主机列表
 2 spring.kafka.bootstrap-servers=192.168.110.142:9092
 3 # 定义主题名称
 4 spring.kafka.template.default-topic=test
 5 # 定义生产者配置
 6 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 7 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 8 # 定义消费者配置
 9 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
10 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
11 # 消费者组ID(同一组内的消费者共同分摊主题中的消息)
12 spring.kafka.consumer.group-id=group-1

使用KafkaTemplate实现消息发送的服务类,如下所示:

 1 package com.demo.producer;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.kafka.core.KafkaTemplate;
 5 import org.springframework.stereotype.Service;
 6 
 7 /**
 8  * Service that publishes text messages to Kafka through the injected {@link KafkaTemplate}.
 9  */
10 @Service
11 public class KafkaMessageProducer {
12 
13     /** Fixed record key attached to every message sent by this service. */
14     private static final String MESSAGE_KEY = "message-key";
15 
16     /** Template used to publish records; injected by Spring. */
17     @Autowired
18     private KafkaTemplate<String, String> kafkaTemplate;
19 
20     /**
21      * Publishes {@code text} via {@code sendDefault}, i.e. to the template's default topic,
22      * using the fixed record key.
23      *
24      * @param text message payload to send
25      */
26     public void send(String text) {
27         kafkaTemplate.sendDefault(MESSAGE_KEY, text);
28     }
29 
30 }

建立一个Kafka消息的消费程序类,如下所示:

 1 package com.demo.consumer;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerRecord;
 4 import org.springframework.kafka.annotation.KafkaListener;
 5 import org.springframework.stereotype.Service;
 6 
 7 /**
 8  * Kafka consumer that prints every record received from the {@code test} topic.
 9  */
10 @Service
11 public class KafkaMessageConsumer {
12 
13     /**
14      * Handles one record received through the {@code @KafkaListener} subscription and writes
15      * its key and value to standard error.
16      *
17      * @param record the consumed record
18      */
19     @KafkaListener(topics = "test")
20     public void receiveMessage(ConsumerRecord<String, String> record) {
21         final String key = record.key();
22         final String value = record.value();
23         System.err.println("【*** 接收消息 ***】 key = " + key + " , value = " + value);
24     }
25 
26 }

通过控制器调用KafkaMessageProducer进行消息发送(访问/messageProducer即可触发),由于Kafka已经配置了自动创建主题,所以即使现在主题不存在,也不影响程序执行。

 1 package com.demo.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.demo.producer.KafkaMessageProducer;
 9 
10 /**
11  * Demo controller that pushes a batch of test messages to Kafka when its endpoint is requested.
12  */
13 @Controller
14 public class KafkaMessageController {
15 
16     /** Total number of messages sent per request. */
17     private static final int MESSAGE_COUNT = 20000;
18 
19     /** Number of messages sent between two pauses. */
20     private static final int BATCH_SIZE = 20;
21 
22     /** Pause, in milliseconds, inserted before each batch. */
23     private static final long BATCH_PAUSE_MILLIS = 1000L;
24 
25     /** Producer service used to send each message; injected by Spring. */
26     @Autowired
27     private KafkaMessageProducer kafkaMessageProducer;
28 
29     /**
30      * Sends {@code MESSAGE_COUNT} messages, sleeping before every batch of {@code BATCH_SIZE}
31      * to throttle the producer. The calling thread blocks until all messages have been handed
32      * to the producer, or returns early if it is interrupted while sleeping.
33      */
34     @RequestMapping(value = "/messageProducer")
35     @ResponseBody
36     public void findAll() {
37         for (int i = 0; i < MESSAGE_COUNT; i++) {
38             if (i % BATCH_SIZE == 0) {
39                 try {
40                     Thread.sleep(BATCH_PAUSE_MILLIS);
41                 } catch (InterruptedException e) {
42                     // An interrupt is a request to cancel: restore the interrupt status for
43                     // the caller and stop, so the remaining messages are not sent.
44                     Thread.currentThread().interrupt();
45                     return;
46                 }
47             }
48             kafkaMessageProducer.send("Kafka producer message : " + i);
49         }
50     }
51 }

如果启动项目后持续输出下面的警告(客户端无法连接到Broker),如下所示:

1 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
3 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
4 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient   : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

出现该警告是因为Broker向客户端公布的地址是localhost,客户端按该地址无法连接到服务器。修改Kafka服务端配置文件server.properties中的以下两项配置并重启Kafka,即可从外部主机连接服务器上的Kafka,问题解决:

1 # 监听所有网卡地址,允许外部主机连接
2 listeners=PLAINTEXT://0.0.0.0:9092  
3 # 对外公布给客户端的Broker地址(客户端将使用该地址连接)
4 advertised.listeners=PLAINTEXT://192.168.110.142:9092