Kafka 生产者 TCP 管理
2023-09-14 09:14:49 时间
Kafka 的所有通信都是基于 TCP
TCP/HTTP 区别:
- 用 TCP 多路复用请求,同时轮询多个连接的能力
- HTTP 库都简陋
多路复用请求 (multiplexing request) :
- 将多个数据流合并到底层单一物理连接中的过程
- TCP 的多路复用请求会在一条物理连接上创建 n 个虚拟连接,每个虚拟连接负责流转各自对应的数据流
开发生产者的步骤:
- 构造生产者对象所需的参数对象
- 利用参数对象,创建 KafkaProducer 对象实例
- 用 KafkaProducer 的 send 发送消息
- 调用 KafkaProducer 的 close 关闭生产者并释放各种系统资源
Properties props = new Properties();
props.put("参数 1", "参数 1 的值");
props.put("参数 2", "参数 2 的值");
//...
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
//...
}
Java Producer 管理 TCP 连接:
- KafkaProducer 实例创建时, 启动 Sender 线程,并与
bootstrap.servers
中所有 Broker 的 TCP 连接 - KafkaProducer 实例首次更新元数据信息后,还会与集群中所有 Broker 的 TCP 连接
- 当 Producer 发送消息到某台 Broker 时,发现没有与该 Broker 的 TCP 连接,就会立即创建连接
- 当 Producer 端
connections.max.idle.ms > 0
,TCP 连接会自动关闭 - 当
connections.max.idle.ms = -1
,TCP 连接将无法关闭,成为僵尸连接
创建 TCP 连接
创建 KafkaProducer 实例时,生产者会在后台创建并启动 Sender 线程
- 该 Sender 线程开始运行时,先会创建与 Broker 的连接
Producer 启动时,会与 bootstrap.servers
内的 Broker 连接
- 生产中,只要配置 3~4 台在
bootstrap.servers
中就行
日志:
bootstrap.servers
配置了 localhost:9092,localhost:9093- KafkaProducer 创建后,会创建与两台 Broker 的 TCP 连接
- 日志最后一行 : Producer 向某一台 Broker 发送了 METADATA 请求,尝试获取集群的元数据信息
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)
TCP 连接可能被创建地方 :
- 更新元数据后 : 当 Producer 更新集群的元数据信息后,当与某些 Broker 没有连接,就创建一个 TCP 连接
- 在消息发送时 : 当要发送消息时,当与目标 Broker 没有连接,就创建一个 TCP 连接
Producer 更新集群元数据信息的场景 :
- 当 Producer 向不存在的主题发送消息时,Broker 会告诉 Producer 该主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,尝试获取最新的元数据信息
- Producer 通过
metadata.max.age.ms
(默认值: 300000) 定期更新元数据信息。不管集群是否有变化,Producer 每metadata.max.age.ms
会强制刷新一次元数据
关闭 TCP 连接
Producer 关闭 TCP 连接的方式:
- 用户主动关闭
- Kafka 自动关闭
主动关闭 :
kill -9
杀掉 Producer- 调用
producer.close()
关闭
Kafka 自动关闭:
- 当在
connections.max.idle.ms
(默认: 9 分钟) 内没有任何请求某个 TCP 连接,主动关闭 TCP 连接 connections.max.idle.ms=-1
( Producer ),TCP 连接为永久长连接,只能被 keepalive 探活机制关闭
Kafka 自动关闭,对客户端是被动关闭状态 (passive close )
- 被动关闭的后果 : 产生大量的 CLOSE_WAIT 连接
相关文章
- Kafka vs RocketMQ—— Topic数量对单机性能的影响
- kafka学习之-雅虎开源管理工具Kafka Manager
- 来吧,1分钟带你玩转Kafka
- kafka集群中jmx端口设置
- Kafka压测— 搞垮kafka的方法(转)
- kafka文档
- KAFKA安装+配置详解+常用操作+监控
- kafka 第一次小整理(草稿篇)————整理一下自己的认知
- FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable) java.io.FileNotFoundException: /tmp/kafka-logs/.lock (Permission denied)
- TCP 三次握手四次挥手, ack 报文的大小.tcp和udp的不同之处、tcp如何保证可靠的、tcp滑动窗口解释
- 【云原生 | Kubernetes 系列】---Kafka 集群安装配置手册
- pyspark kafka createDirectStream和createStream 区别
- 大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)
- Kafka消费者 TCP管理
- 解开Kafka神秘的面纱(一):kafka架构与应用场景
- 解开Kafka神秘的面纱(三):kafka单机部署和集群部署
- Apache Kafka 特性