Integrating Kafka with SpringBoot


1 Installing Kafka with Docker

Installing Kafka with Docker is covered in a separate article; this guide assumes a single-broker instance is already running in a container named kafka.
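
For reference, one common way to bring up such a single-broker instance (a sketch only; the images and environment values below are assumptions, not taken from the original installation guide):

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.8
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.92:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.92:9092 \
  wurstmeister/kafka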

2 Creating Topics in Kafka

Create two topics, topic1 and topic2, each with 1 partition and 1 replica (topics can also be created from Java code, as shown in the next section):

PS C:\Users\Administrator> docker exec -it kafka /bin/sh

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.
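
To double-check the result, the same kafka-topics.sh tool can list and describe the topics (run inside the same container session; --describe prints the partition and replica layout):

$ kafka-topics.sh --list --bootstrap-server localhost:9092

$ kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1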

3 Creating Topics from Java

package com.xu.mq.demo.test.service;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * Kafka initialization configuration class: creates the topics at startup.
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        // 1 partition, replication factor 1
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        // 3 partitions, replication factor 2 (needs at least 2 brokers; on the single-broker setup above this fails, see the startup log in section 5)
        return new NewTopic(TEXT_UPLOAD_TOPIC, 3, (short) 2);
    }

}
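
As a side note, spring-kafka also provides org.springframework.kafka.config.TopicBuilder, which reads a little clearer than the raw NewTopic constructor. A sketch of an equivalent bean for the first topic (the method name audioUploadTopicBuilt is just for illustration):

    @Bean
    public NewTopic audioUploadTopicBuilt() {
        // Same effect as audioUploadTopic() above: 1 partition, replication factor 1.
        return TopicBuilder.name(AUDIO_UPLOAD_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }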

4 Integrating Kafka with SpringBoot

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<groupId>com.xu</groupId>
	<artifactId>kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>kafka</name>

	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.8.12</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

4.2 application.yml

server:
  port: 8001

spring:
  application:
    name: hello-kafka
  kafka:
    # Comma-separated list of addresses for the initial connection to the Kafka cluster (Kafka's default port is 9092)
    bootstrap-servers: 192.168.1.92:9092
    producer:
      # Number of retries after a failed send
      retries: 3
      # Message compression: none, lz4, gzip or snappy; the default is none (gzip usually compresses best, lz4 and snappy are faster)
      compression-type: none
      # Send buffer size: 32 MB
      buffer-memory: 33554432
      # Batch up to 16 KB from the buffer per request to the broker
      batch-size: 16384
      # acks = 0: the producer does not wait for any acknowledgement from the server; the record is added to the socket buffer and considered sent. There is no guarantee the broker received it, the retries setting has no effect (the client usually won't learn of failures), and the offset returned for each record is always -1.
      # acks = 1: the leader writes the record to its local log and responds without waiting for all replicas. If the leader fails right after acknowledging, before the data is replicated to the other replicas, the record is lost.
      # acks = all (equivalent to -1): the leader waits for the full set of in-sync replicas to acknowledge. The record survives as long as at least one in-sync replica stays alive; this is the strongest guarantee.
      acks: -1
      # Key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Enables transactions: retries must be > 0 and acks must be all. Transactions can then be driven with kafkaTemplate.executeInTransaction or @Transactional.
      # Caution: with this prefix set, a plain kafkaTemplate.send() outside a transaction throws an exception (unless allowNonTransactional is enabled on the template).
      transaction-id-prefix: transaction
      # The keys below are not first-class Spring Boot properties and must sit under `properties`,
      # otherwise Spring Boot silently ignores them
      properties:
        # Backoff between retries
        retry.backoff.ms: 500
        # Maximum size of a single request: 20 MB
        max.request.size: 20971520
        # Send delay: if batch-size is not reached within this time, the batch is sent to the cluster anyway
        linger.ms: 30
        # Keeps messages ordered when retrying after failures, at the cost of throughput
        max.in.flight.requests.per.connection: 1
        # Idempotent sends (per partition)
        enable.idempotence: true
        # How long the producer blocks when its buffer is full (the Kafka default is 60 s)
        max.block.ms: 6000
    consumer:
      group-id: KafkaGroup
      # Offset commit interval (how long after receiving a message the offset is committed)
      auto-commit-interval: 1000
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the newest offset in the partition (only consume data produced after the reset)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: earliest
      # Whether to auto-commit offsets; the default is true. To avoid duplicated or lost messages, set it to false and commit offsets manually.
      enable-auto-commit: false
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned by a single poll()
      max-poll-records: 50
      # Raise max.poll.interval.ms when processing is slow, to avoid unnecessary rebalances.
      # It belongs here under consumer.properties; placed at the global spring.kafka.properties
      # level it also reaches the AdminClient, which logs the "'max.poll.interval.ms' was
      # supplied but isn't a known config" warning visible in the startup log below.
      properties:
        max.poll.interval.ms: 600000
    listener:
      missing-topics-fatal: false
      # Number of listener threads
      concurrency: 4
      # RECORD: commit after each record is processed by the listener
      # BATCH: commit after each batch returned by poll() is processed by the listener
      # TIME: commit after a poll() batch is processed and more than TIME has passed since the last commit
      # COUNT: commit after a poll() batch is processed and at least COUNT records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after a poll() batch is processed and Acknowledgment.acknowledge() has been called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called; this is the usual choice
      ack-mode: manual-immediate
      # Poll timeout
      poll-timeout: 3000

4.3 KafkaApplication.java

package com.xu.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @author Administrator
 */
@EnableKafka
@SpringBootApplication
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
	}

}

4.4 CustomizePartitioner.java

A custom partitioner controls which partition each message is routed to. Implement one in Java when routing needs business-specific logic, or simply rely on Kafka's default partitioner.

package com.xu.kafka.config;


import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author Administrator
 */
public class CustomizePartitioner implements Partitioner {

    /**
     * Custom partitioning rule.
     *
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   The serialized key to partition on (or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster    The current cluster metadata
     * @return the partition the record should be sent to
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Demo rule: route every record to partition 0; replace with real routing logic.
        return 0;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No additional configuration required.
    }

}
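
Defining the class alone has no effect; Kafka only uses it once it is registered as the producer's partitioner.class. One way to wire it in through the application.yml from section 4.2 (a sketch):

spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.xu.kafka.config.CustomizePartitioner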

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * Kafka initialization configuration class: creates the topics at startup.
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        // 1 partition, replication factor 1
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        // 3 partitions, replication factor 2 (needs at least 2 brokers; fails on the single-broker setup, see section 5)
        return new NewTopic(TEXT_UPLOAD_TOPIC, 3, (short) 2);
    }

}

4.6 SendMessageController.java (Producer)

package com.xu.kafka.message.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.xu.kafka.config.KafkaInitialConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Administrator
 */
@Slf4j
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RetryTemplate retryTemplate;

    /**
     * KafkaTemplate send, asynchronous (fire-and-forget: the returned future is ignored).
     *
     * @param msg the message payload
     */
    @GetMapping("/test1/{msg}")
    public void test1(@PathVariable("msg") String msg) {
        String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg);
    }

    /**
     * KafkaTemplate send, synchronous (get() blocks until the broker acknowledges).
     *
     * @param msg the message payload
     */
    @GetMapping("/test2/{msg}")
    public void test2(@PathVariable("msg") String msg) throws Exception {
        String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg).get();
    }

    /**
     * KafkaTemplate send, asynchronous with a completion callback.
     *
     * @param msg the message payload
     */
    @GetMapping("/test3/{msg}")
    public void test3(@PathVariable("msg") String msg) {
        String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.send(topic, key, msg).addCallback(success -> {
            System.out.println("Send succeeded\t" + success);
        }, fail -> {
            System.out.println("Send failed\t" + fail);
        });
    }

    /**
     * RetryTemplate send, with retry and recovery callbacks.
     *
     * @param msg the message payload
     */
    @GetMapping("/test4/{msg}")
    public void test4(@PathVariable("msg") String msg) {
        String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        retryTemplate.execute(retryCallback -> {
            kafkaTemplate.send(topic, key, msg);
            log.info("Kafka send succeeded Topic:{}, Key:{}, Count:{}", topic, key, retryCallback.getRetryCount());
            return "success " + retryCallback.getRetryCount();
        }, recoveryCallback -> {
            // Runs only after all retries have failed.
            log.info("Kafka send failed Topic:{}, Key:{}, Count:{}, Info:{}", topic, key, recoveryCallback.getRetryCount(),
                    recoveryCallback.getLastThrowable().getMessage());
            return "failure " + recoveryCallback.getRetryCount();
        });
    }

    /**
     * KafkaTemplate send inside a transaction; the RuntimeException forces a
     * rollback, so the message is never delivered.
     *
     * @param msg the message payload
     */
    @GetMapping("/test5/{msg}")
    public void test5(@PathVariable("msg") String msg) {
        String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(topic, key, msg);
            throw new RuntimeException("fail");
        });
    }
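
    /**
     * Sketch (not in the original article): the same transactional send driven
     * by Spring's declarative @Transactional instead of executeInTransaction.
     * Assumes the transaction-id-prefix from section 4.2 is set and requires
     * the import org.springframework.transaction.annotation.Transactional.
     */
    @Transactional
    @GetMapping("/test6/{msg}")
    public void test6(@PathVariable("msg") String msg) {
        kafkaTemplate.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, msg);
    }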

}
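
The controller autowires a RetryTemplate, but no such bean is defined anywhere in the article, and without one the application fails to start with NoSuchBeanDefinitionException. A minimal sketch of a possible bean definition (the class name and the 3-attempt / 500 ms values are assumptions, not taken from the original project; the spring-retry library itself arrives transitively with spring-kafka):

package com.xu.kafka.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * Provides the RetryTemplate injected by SendMessageController.
 */
@Configuration
public class RetryTemplateConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        // Retry a failing send at most 3 times.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);

        // Wait 500 ms between attempts.
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(500);
        template.setBackOffPolicy(backOffPolicy);

        return template;
    }

}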

4.7 KafkaConsumer.java (Consumer)

package com.xu.kafka.message.controller;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.xu.kafka.config.KafkaInitialConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Administrator
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * Listens on one topic with the consumer group configured in application.yml.
     *
     * @param record the received record
     * @param ack    manual acknowledgment handle (ack-mode: manual-immediate)
     */
    @KafkaListener(topics = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC)
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s, record timestamp = %d%n",
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
        // Manual offset commit, matching ack-mode: manual-immediate.
        ack.acknowledge();
    }

}
partition = 2, offset = 16, key = 11111111, value = 1111111111111111111111111111, record timestamp = 1676983128560
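
The configuration in 4.2 sets max-poll-records: 50, yet this listener handles one record per call. spring-kafka also supports list-typed batch listeners; a minimal sketch (assumption: it additionally requires spring.kafka.listener.type: batch, which section 4.2 does not set):

    /**
     * Batch variant (sketch, not in the original article): receives up to
     * max-poll-records records per poll. Requires spring.kafka.listener.type=batch.
     */
    @KafkaListener(topics = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC)
    public void batchConsumer(java.util.List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("partition = {}, offset = {}, value = {}",
                    record.partition(), record.offset(), record.value());
        }
        // One manual commit for the whole batch (ack-mode: manual-immediate).
        ack.acknowledge();
    }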


5 Starting the Application

The startup log below contains one expected error: the TextUploadTopic bean asks for a replication factor of 2, but the single-broker Docker setup only has one broker, so KafkaAdmin fails with InvalidReplicationFactorException. The AudioUploadTopic listeners still start normally.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.8)

2023-02-22 10:20:51.142  INFO 38584 --- [  restartedMain] com.xu.kafka.KafkaApplication            : Starting KafkaApplication using Java 1.8.0_301 on Hyacinth with PID 38584 (E:\SourceCode\Idea\kafka\target\classes started by Administrator in E:\SourceCode\Idea\kafka)
2023-02-22 10:20:51.143  INFO 38584 --- [  restartedMain] com.xu.kafka.KafkaApplication            : No active profile set, falling back to 1 default profile: "default"
2023-02-22 10:20:51.200  INFO 38584 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-02-22 10:20:51.200  INFO 38584 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2023-02-22 10:20:53.010  INFO 38584 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8001 (http)
2023-02-22 10:20:53.021  INFO 38584 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2023-02-22 10:20:53.021  INFO 38584 --- [  restartedMain] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.71]
2023-02-22 10:20:53.140  INFO 38584 --- [  restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2023-02-22 10:20:53.140  INFO 38584 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1939 ms
2023-02-22 10:20:53.683  INFO 38584 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2023-02-22 10:20:53.755  INFO 38584 --- [  restartedMain] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [192.168.1.92:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2023-02-22 10:20:53.847  WARN 38584 --- [  restartedMain] o.a.k.clients.admin.AdminClientConfig    : The configuration 'max.poll.interval.ms' was supplied but isn't a known config.
2023-02-22 10:20:53.849  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-02-22 10:20:53.849  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:53.849  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1677032453848
2023-02-22 10:20:54.234 ERROR 38584 --- [  restartedMain] o.springframework.kafka.core.KafkaAdmin  : Failed to create topics

org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

2023-02-22 10:20:54.236 ERROR 38584 --- [  restartedMain] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Failed to create topics; nested exception is org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
	at org.springframework.kafka.core.KafkaAdmin.addTopics(KafkaAdmin.java:450) [spring-kafka-2.8.11.jar:2.8.11]
	at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:300) [spring-kafka-2.8.11.jar:2.8.11]
	at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:201) [spring-kafka-2.8.11.jar:2.8.11]
	at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:171) [spring-kafka-2.8.11.jar:2.8.11]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974) [spring-beans-5.3.25.jar:5.3.25]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) [spring-context-5.3.25.jar:5.3.25]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) [spring-context-5.3.25.jar:5.3.25]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) [spring-boot-2.7.8.jar:2.7.8]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) [spring-boot-2.7.8.jar:2.7.8]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) [spring-boot-2.7.8.jar:2.7.8]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-2.7.8.jar:2.7.8]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) [spring-boot-2.7.8.jar:2.7.8]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) [spring-boot-2.7.8.jar:2.7.8]
	at com.xu.kafka.KafkaApplication.main(KafkaApplication.java:15) [classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_301]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_301]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_301]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_301]
	at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.7.8.jar:2.7.8]
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

2023-02-22 10:20:54.236  INFO 38584 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
2023-02-22 10:20:54.239  INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2023-02-22 10:20:54.240  INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-22 10:20:54.240  INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2023-02-22 10:20:54.266  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 1000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.1.92:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-KafkaGroup-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = KafkaGroup
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 600000
	max.poll.records = 50
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 45000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2023-02-22 10:20:54.300  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-02-22 10:20:54.300  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.300  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1677032454299
2023-02-22 10:20:54.301  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.309  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	(identical to the ConsumerConfig dump above, except client.id = consumer-KafkaGroup-2)

2023-02-22 10:20:54.315  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-02-22 10:20:54.315  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.315  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1677032454315
2023-02-22 10:20:54.315  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.317  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	(identical to the first ConsumerConfig dump, except client.id = consumer-KafkaGroup-3)

2023-02-22 10:20:54.325  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-02-22 10:20:54.325  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.325  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1677032454325
2023-02-22 10:20:54.325  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.326  WARN 38584 --- [ntainer#0-1-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.327  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	(identical to the first ConsumerConfig dump, except client.id = consumer-KafkaGroup-4)

2023-02-22 10:20:54.328  INFO 38584 --- [ntainer#0-1-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.332  INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.335  INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.340  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-02-22 10:20:54.340  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.340  WARN 38584 --- [ntainer#0-2-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.340  INFO 38584 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1677032454340
2023-02-22 10:20:54.340  INFO 38584 --- [ntainer#0-2-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.340  INFO 38584 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.340  INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.343  INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.361  WARN 38584 --- [ntainer#0-3-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.361  INFO 38584 --- [ntainer#0-3-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.363  INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.365  INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.367  INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.367  INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.368  INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.368  INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.374  INFO 38584 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8001 (http) with context path ''
2023-02-22 10:20:54.375  INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.376  INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.389  WARN 38584 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=LEADER_NOT_AVAILABLE}
2023-02-22 10:20:54.390  INFO 38584 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.390  INFO 38584 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.391  INFO 38584 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.394  INFO 38584 --- [  restartedMain] com.xu.kafka.KafkaApplication            : Started KafkaApplication in 3.708 seconds (JVM running for 4.278)

6 Testing
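
The endpoints can be exercised with any HTTP client, for example against the port 8001 configured in section 4.2:

curl http://localhost:8001/kafka/test1/hello
curl http://localhost:8001/kafka/test3/hello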


Send succeeded	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]
