springboot集成kafka及kafka web UI的使用
2023-09-14 09:02:01 时间
springboot集成kafka
application.properties
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092
spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.transaction-id-prefix=transaction-id-
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.streams.application-id=wordcount_id
spring.kafka.streams.client-id=app1
spring.kafka.streams.auto-startup=true
spring.kafka.streams.state-dir=/Users/admin/Desktop/checkpoint
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.processing.guarantee=exactly_once
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhangxueliang</groupId>
<artifactId>springboot-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.1</version>
</dependency>
<!--测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@KafkaListener消费消息
@SendTo转发消息
kafka web UI创建topic
KSQL的使用
mock发送消息
此时消息就被处理后(加了个后缀)发到了topic03中:
使用KafkaTemplate发送消息
数据是发往topic02,但是进行了转发,topic03会收到加了后缀的消息数据:
开启事务
开启事务后发送消息有两种编码方式:
- 使用executeInTransaction方法
此时消息就被转发到了topic03:
- 所在类加@Transactional注解
测试:
查看转发的消息:
相关文章
- springboot中.yml没有spring的小叶子标志解决办法
- springboot+多数据源配置
- SpringBoot应用和PostgreSQL数据库部署到Kubernetes上的一个例子
- Springboot怎么集成Thymeleaf模板引擎?
- 【springboot日志】logback.xml常用配置详解
- SpringBoot中对输出的json按字典表排序
- Springboot模拟https安全访问(使用Java提供的keytool命令生成证书)
- SpringBoot异常处理五种方式、Junit单元测试、热部署
- SpringBoot Profile机制灵活切换环境配置
- Springboot+WebUploader优雅实现超大文件的上传(一)
- Springboot中如何优雅的写好Controller层代码
- Springboot集成支付宝沙箱支付(退款功能)