zl程序教程

您现在的位置是:首页 >  后端

当前栏目

springboot集成kafka及kafka web UI的使用

2023-09-14 09:02:01 时间

springboot集成kafka

application.properties

在这里插入图片描述

spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092

spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.transaction-id-prefix=transaction-id-


spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.streams.application-id=wordcount_id
spring.kafka.streams.client-id=app1
spring.kafka.streams.auto-startup=true
spring.kafka.streams.state-dir=/Users/admin/Desktop/checkpoint
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.processing.guarantee=exactly_once
        

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhangxueliang</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>

   <dependencies>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter</artifactId>
       </dependency>

       <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </dependency>



       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-streams</artifactId>
           <version>2.0.1</version>
       </dependency>

       <!--测试-->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>

   </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

@KafkaListener消费消息

在这里插入图片描述

@SendTo转发消息

在这里插入图片描述

kafka web UI创建topic

在这里插入图片描述
在这里插入图片描述

KSQL的使用

在这里插入图片描述

mock发送消息

在这里插入图片描述
此时消息就被处理后(加了个后缀)发到了topic03中:
在这里插入图片描述

使用KafkaTemplate发送消息

在这里插入图片描述
数据是发往topic02,但是进行了转发,topic03会收到加了后缀的消息数据:
在这里插入图片描述

开启事务

在这里插入图片描述
开启事务后发送消息有两种编码方式:

  • 使用executeInTransaction方法

在这里插入图片描述
此时消息就被转发到了topic03:
在这里插入图片描述

  • 所在类加@Transactional注解

在这里插入图片描述
测试:
在这里插入图片描述
查看转发的消息:
在这里插入图片描述