SpringBoot 集成 Kafka
2023-04-18 16:26:44 时间
SpringBoot 集成 Kafka
1 Docker 安装 Kafka
2 Kafka 创建 Topic
创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)
PS C:\Users\Administrator> docker exec -it kafka /bin/sh
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.
3 Java 创建 Topic
package com.xu.mq.demo.test.service;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.admin.NewTopic;
/**
 * Kafka bootstrap configuration that declares the topics used by the demo.
 *
 * <p>Each {@link NewTopic} bean describes one topic (name, partition count, replication factor).
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    /** Topic name constant: {@code AudioUploadTopic}. */
    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    /** Topic name constant: {@code TextUploadTopic}. */
    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    /**
     * Declares {@link #AUDIO_UPLOAD_TOPIC}.
     *
     * @return topic definition with 1 partition and replication factor 1
     */
    @Bean
    public NewTopic audioUploadTopic() {
        // 1 partition, replication factor 1
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    /**
     * Declares {@link #TEXT_UPLOAD_TOPIC}.
     *
     * @return topic definition with 3 partitions and replication factor 1
     */
    @Bean
    public NewTopic textUploadTopic() {
        // 3 partitions, replication factor 1.
        // The replication factor cannot exceed the number of available brokers; on the
        // single-broker setup used here a factor of 2 is rejected with
        // InvalidReplicationFactorException. Raise it only on a multi-broker cluster.
        return new NewTopic(TEXT_UPLOAD_TOPIC, 3, (short) 1);
    }
}
4 SpringBoot 集成 Kafka
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM: Spring Boot 2.7.8. Most dependencies below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this demo project. -->
<groupId>com.xu</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Hutool utility library; the only dependency here with an explicit version. -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.12</version>
</dependency>
<!-- Kafka Streams library. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<!-- Spring for Apache Kafka (KafkaTemplate, @KafkaListener, KafkaAdmin). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Test support for Spring Kafka; test scope only. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Web starter; provides the REST layer used by the controller. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot test starter; test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lombok (used for @Slf4j); marked optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot DevTools; runtime scope and optional. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; Lombok is listed in its excludes configuration. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 application.yml
server:
  port: 8001
spring:
  application:
    name: hello-kafka
  kafka:
    # Comma-separated list of host:port pairs used for the initial connection to the
    # Kafka cluster (Kafka's default port is 9092)
    bootstrap-servers: 192.168.1.92:9092
    producer:
      # Number of retries when a send fails
      retries: 3
      # Message compression: none, gzip, snappy, lz4 or zstd (default: none).
      # lz4 favours speed; gzip and zstd generally achieve higher compression ratios.
      compression-type: none
      # Total memory available to the producer for buffering records: 32 MB
      buffer-memory: 33554432
      # Maximum size of one batch sent to the broker: 16 KB
      batch-size: 16384
      # acks = 0   The producer does not wait for any acknowledgement from the server. The record is
      #            added to the socket buffer and considered sent. There is no guarantee the server
      #            received it, the retries setting has no effect (the client will generally not know
      #            about failures), and the offset returned for each record is always -1.
      # acks = 1   The leader writes the record to its local log and responds without waiting for
      #            acknowledgement from all followers. If the leader fails right after acknowledging
      #            but before the followers have replicated the record, the record is lost.
      # acks = all The leader waits for the full set of in-sync replicas to acknowledge the record.
      #            The record is not lost as long as at least one in-sync replica stays alive. This
      #            is the strongest guarantee and is equivalent to acks = -1.
      acks: -1
      # Key / value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Enables transactions. When set, retries must be > 0 and acks must be all. Kafka transactions
      # can then be used through kafkaTemplate.executeInTransaction or @Transactional.
      transaction-id-prefix: transaction
      # The settings below have no dedicated spring.kafka.producer.* property; Spring Boot only
      # passes them to the Kafka producer when they are listed here under their native names.
      properties:
        # Back-off between retries (ms)
        retry.backoff.ms: 500
        # Maximum size of a single request: 20 MB
        max.request.size: 20971520
        # How long to wait for a batch to fill up; if batch-size is not reached within this time
        # the batch is sent to the cluster anyway (ms)
        linger.ms: 30
        # Preserves message ordering when a send is retried, at the cost of throughput
        max.in.flight.requests.per.connection: 1
        # Enables idempotent sends (per partition)
        enable.idempotence: true
        # How long send() may block when the producer buffer is full (Kafka default: 60 s); 6 s here
        max.block.ms: 6000
    consumer:
      group-id: KafkaGroup
      # Interval between automatic offset commits (ms); only used when enable-auto-commit is true
      auto-commit-interval: 1000
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the latest offset (consume only newly produced records)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: earliest
      # Whether offsets are committed automatically (default: true). Set to false and commit
      # manually to avoid duplicated or lost records.
      enable-auto-commit: false
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned by one poll when consuming in batches
      max-poll-records: 50
      # Increase max.poll.interval.ms when processing is slow to avoid unnecessary rebalances.
      # Declared under the consumer so it is not passed to the producer and admin clients,
      # which do not know this setting.
      properties:
        max.poll.interval.ms: 600000
    listener:
      missing-topics-fatal: false
      # Number of listener threads
      concurrency: 4
      # RECORD            commit after each record has been processed by the listener
      # BATCH             commit after each batch returned by poll() has been processed
      # TIME              commit after a batch has been processed and more than TIME has elapsed
      #                   since the last commit
      # COUNT             commit after a batch has been processed and at least COUNT records have
      #                   been processed since the last commit
      # COUNT_TIME        commit when either the TIME or the COUNT condition is met
      # MANUAL            commit after a batch has been processed and Acknowledgment.acknowledge()
      #                   has been called
      # MANUAL_IMMEDIATE  commit immediately when Acknowledgment.acknowledge() is called
      #                   (the mode normally used)
      ack-mode: manual-immediate
      # Timeout used when polling the consumer (ms)
      poll-timeout: 3000
4.3 KafkaApplication.java
package com.xu.mq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
/**
 * Spring Boot entry point of the Kafka demo; {@code @EnableKafka} is declared explicitly.
 *
 * <p>NOTE(review): the package declaration shown for this file is {@code com.xu.mq.demo}, while
 * the other classes of the demo are declared under {@code com.xu.kafka.*} and the startup log
 * reports {@code com.xu.kafka.KafkaApplication}. Component scanning starts at this class's
 * package, so confirm the package is {@code com.xu.kafka}.
 *
 * @author Administrator
 */
@EnableKafka
@SpringBootApplication
public class KafkaApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        // The primary source must be this @SpringBootApplication class.
        SpringApplication.run(KafkaApplication.class, args);
    }
}
4.4 CustomizePartitioner.java
自定义消息推送的分区:可以根据具体的业务逻辑,用 Java 实现自定义的分区规则;也可以直接使用 Kafka 的默认分区策略。
package com.xu.kafka.config;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
 * {@link Partitioner} implementation that routes every record to partition 0,
 * regardless of topic, key or value.
 *
 * @author Administrator
 */
public class CustomizePartitioner implements Partitioner {

    /** Partition number returned for every record. */
    private static final int TARGET_PARTITION = 0;

    /**
     * Accepts the configuration map without reading anything from it.
     *
     * @param configs configuration entries (ignored)
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to configure.
    }

    /**
     * Chooses the partition for a record. None of the arguments is inspected.
     *
     * @param topic      the topic name
     * @param key        the key to partition on (or null if no key)
     * @param keyBytes   the serialized key to partition on (or null if no key)
     * @param value      the value to partition on, or null
     * @param valueBytes the serialized value to partition on, or null
     * @param cluster    the current cluster metadata
     * @return always {@code 0}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return TARGET_PARTITION;
    }

    /** Does nothing. */
    @Override
    public void close() {
        // Nothing to release.
    }
}
4.5 KafkaInitialConfig.java
package com.xu.kafka.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.admin.NewTopic;
/**
 * Kafka bootstrap configuration that declares the topics used by the demo.
 *
 * <p>Each {@link NewTopic} bean describes one topic (name, partition count, replication factor).
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    /** Topic name constant: {@code AudioUploadTopic}. */
    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    /** Topic name constant: {@code TextUploadTopic}. */
    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    /**
     * Declares {@link #AUDIO_UPLOAD_TOPIC}.
     *
     * @return topic definition with 1 partition and replication factor 1
     */
    @Bean
    public NewTopic audioUploadTopic() {
        // 1 partition, replication factor 1
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    /**
     * Declares {@link #TEXT_UPLOAD_TOPIC}.
     *
     * @return topic definition with 3 partitions and replication factor 1
     */
    @Bean
    public NewTopic textUploadTopic() {
        // 3 partitions, replication factor 1.
        // The replication factor cannot exceed the number of available brokers; on the
        // single-broker setup used here a factor of 2 is rejected with
        // InvalidReplicationFactorException. Raise it only on a multi-broker cluster.
        return new NewTopic(TEXT_UPLOAD_TOPIC, 3, (short) 1);
    }
}
4.6 SendMessageController.java 生产者
package com.xu.kafka.message.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.xu.kafka.config.KafkaInitialConfig;
import lombok.extern.slf4j.Slf4j;
/**
* @author Administrator
*/
@Slf4j
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private RetryTemplate retryTemplate;
/**
* KafkaTemplate 发送消息 同步
*
* @param msg
*/
@GetMapping("/test1/{msg}")
public void test1(@PathVariable("msg") String msg) {
String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg);
}
/**
* KafkaTemplate 发送消息 异步
*
* @param msg
*/
@GetMapping("/test2/{msg}")
public void test2(@PathVariable("msg") String msg) throws Exception {
String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg).get();
}
/**
* KafkaTemplate 发送消息 同步 有回调
*
* @param msg
*/
@GetMapping("/test3/{msg}")
public void test3(@PathVariable("msg") String msg) {
String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.send(topic, key, msg).addCallback(success -> {
System.out.println("发送成功 " + success);
}, fail -> {
System.out.println("发送失败 " + fail);
});
}
/**
* RetryTemplate 发送消息 同步 有回调
*
* @param msg
*/
@GetMapping("/test4/{msg}")
public void test4(@PathVariable("msg") String msg) {
String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
retryTemplate.execute(retryCallback -> {
kafkaTemplate.send(topic, key, msg);
log.info("Kafka 发送成功 Topic:{}, Key:{}, Count:{}", topic, key, retryCallback.getRetryCount());
return "success " + retryCallback.getRetryCount();
}, recoveryCallback -> {
// 重试后仍然失败后需要执行的代码
log.info("Kafka 发送失败 Topic:{}, Key{}, Count:{}, Info:{}", topic, key, recoveryCallback.getRetryCount(),
recoveryCallback.getLastThrowable().getMessage());
return "failure " + recoveryCallback.getRetryCount();
});
}
/**
* KafkaTemplate 发送消息 事物
*
* @param msg
*/
@GetMapping("/test5/{msg}")
public void test5(@PathVariable("msg") String msg) {
String key = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
String topic = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC;
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic, key, msg);
throw new RuntimeException("fail");
});
}
}
4.7 KafkaConsumer.java 消费者
package com.xu.kafka.message.controller;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.xu.kafka.config.KafkaInitialConfig;
import lombok.extern.slf4j.Slf4j;
/**
 * Listener component that prints every record received from
 * {@link KafkaInitialConfig#AUDIO_UPLOAD_TOPIC} and acknowledges it.
 *
 * @author Administrator
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * Handles one record from a single topic. The listener declares no group id of its own.
     *
     * @param record the received record
     * @param ack    handle used to acknowledge the record once it has been printed
     */
    @KafkaListener(topics = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC)
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // Read the record fields first, in the order they appear in the output line.
        final int partition = record.partition();
        final long offset = record.offset();
        final String key = record.key();
        final String value = record.value();
        final long timestamp = record.timestamp();

        final String line = String.format(
                "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                partition, offset, key, value, timestamp);
        System.out.print(line);

        // Acknowledge only after the record has been printed.
        ack.acknowledge();
    }
}
分区 = 2, 偏移量 = 16, key = 11111111, 内容 = 1111111111111111111111111111,创建消息的时间戳 =1676983128560
5 启动服务
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.7.8)
2023-02-22 10:20:51.142 INFO 38584 --- [ restartedMain] com.xu.kafka.KafkaApplication : Starting KafkaApplication using Java 1.8.0_301 on Hyacinth with PID 38584 (E:\SourceCode\Idea\kafka\target\classes started by Administrator in E:\SourceCode\Idea\kafka)
2023-02-22 10:20:51.143 INFO 38584 --- [ restartedMain] com.xu.kafka.KafkaApplication : No active profile set, falling back to 1 default profile: "default"
2023-02-22 10:20:51.200 INFO 38584 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-02-22 10:20:51.200 INFO 38584 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2023-02-22 10:20:53.010 INFO 38584 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8001 (http)
2023-02-22 10:20:53.021 INFO 38584 --- [ restartedMain] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-02-22 10:20:53.021 INFO 38584 --- [ restartedMain] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.71]
2023-02-22 10:20:53.140 INFO 38584 --- [ restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-02-22 10:20:53.140 INFO 38584 --- [ restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1939 ms
2023-02-22 10:20:53.683 INFO 38584 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2023-02-22 10:20:53.755 INFO 38584 --- [ restartedMain] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [192.168.1.92:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2023-02-22 10:20:53.847 WARN 38584 --- [ restartedMain] o.a.k.clients.admin.AdminClientConfig : The configuration 'max.poll.interval.ms' was supplied but isn't a known config.
2023-02-22 10:20:53.849 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-02-22 10:20:53.849 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:53.849 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677032453848
2023-02-22 10:20:54.234 ERROR 38584 --- [ restartedMain] o.springframework.kafka.core.KafkaAdmin : Failed to create topics
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
2023-02-22 10:20:54.236 ERROR 38584 --- [ restartedMain] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Failed to create topics; nested exception is org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
at org.springframework.kafka.core.KafkaAdmin.addTopics(KafkaAdmin.java:450) [spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:300) [spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:201) [spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:171) [spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:974) [spring-beans-5.3.25.jar:5.3.25]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) [spring-context-5.3.25.jar:5.3.25]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) [spring-context-5.3.25.jar:5.3.25]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) [spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) [spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) [spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) [spring-boot-2.7.8.jar:2.7.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) [spring-boot-2.7.8.jar:2.7.8]
at com.xu.kafka.KafkaApplication.main(KafkaApplication.java:15) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_301]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_301]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_301]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_301]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.7.8.jar:2.7.8]
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
2023-02-22 10:20:54.236 INFO 38584 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
2023-02-22 10:20:54.239 INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2023-02-22 10:20:54.240 INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-22 10:20:54.240 INFO 38584 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2023-02-22 10:20:54.266 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [192.168.1.92:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = KafkaGroup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2023-02-22 10:20:54.300 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-02-22 10:20:54.300 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.300 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677032454299
2023-02-22 10:20:54.301 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.309 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [192.168.1.92:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-2
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = KafkaGroup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2023-02-22 10:20:54.315 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-02-22 10:20:54.315 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.315 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677032454315
2023-02-22 10:20:54.315 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.317 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [192.168.1.92:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-3
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = KafkaGroup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2023-02-22 10:20:54.325 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-02-22 10:20:54.325 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.325 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677032454325
2023-02-22 10:20:54.325 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.326 WARN 38584 --- [ntainer#0-1-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.327 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [192.168.1.92:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KafkaGroup-4
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = KafkaGroup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2023-02-22 10:20:54.328 INFO 38584 --- [ntainer#0-1-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.332 INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.335 INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.340 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-02-22 10:20:54.340 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-02-22 10:20:54.340 WARN 38584 --- [ntainer#0-2-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.340 INFO 38584 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1677032454340
2023-02-22 10:20:54.340 INFO 38584 --- [ntainer#0-2-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.340 INFO 38584 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Subscribed to topic(s): AudioUploadTopic
2023-02-22 10:20:54.340 INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.343 INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.361 WARN 38584 --- [ntainer#0-3-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=UNKNOWN_TOPIC_OR_PARTITION}
2023-02-22 10:20:54.361 INFO 38584 --- [ntainer#0-3-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.363 INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.365 INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.367 INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.367 INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.368 INFO 38584 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-3, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.368 INFO 38584 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-2, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.374 INFO 38584 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8001 (http) with context path ''
2023-02-22 10:20:54.375 INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] Request joining group due to: need to re-join with the given member-id
2023-02-22 10:20:54.376 INFO 38584 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-4, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.389 WARN 38584 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Error while fetching metadata with correlation id 2 : {AudioUploadTopic=LEADER_NOT_AVAILABLE}
2023-02-22 10:20:54.390 INFO 38584 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Cluster ID: 1-P2cI8YRgSflmbY8vMd1w
2023-02-22 10:20:54.390 INFO 38584 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] Discovered group coordinator 192.168.1.92:9092 (id: 2147482646 rack: null)
2023-02-22 10:20:54.391 INFO 38584 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-KafkaGroup-1, groupId=KafkaGroup] (Re-)joining group
2023-02-22 10:20:54.394 INFO 38584 --- [ restartedMain] com.xu.kafka.KafkaApplication : Started KafkaApplication in 3.708 seconds (JVM running for 4.278)
6 效果测试
生产者发送消息后,回调打印的结果如下。其中 recordMetadata=AudioUploadTopic-0@4 表示该消息已写入 AudioUploadTopic 的 0 号分区,offset 为 4:
发送成功 SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]
相关文章
- DB2权限与操作
- PostgreSQL 9.1 Alpha 2版本发布(附下载)
- DB2临时表定义的方法
- DB2创建用户的方法
- 『数据密集型应用系统设计』读书笔记(三)
- IBM发布Cognos 10 首次融合社交网络协作和分析功能
- DB2性能优化的十大注意事项
- SQL SERVER连接DB2数据库
- DB2索引创建原则
- 带您了解DB2索引结构
- DB2索引类型介绍
- Oracle、SQL和DB2分页查询写法介绍
- DB2分区数据库的前滚操作
- 两种DB2分区数据库恢复方式
- DB2分区数据库备份的实现方法
- 三类DB2数据库备份方案
- DB2 logfilsiz参数设置
- Cassandra查询语句CQL的小技巧
- DB2批量执行SQL脚本的实现
- DB2命令行的连接