zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Kafka 生产者 TCP 管理

KafkaTCP 管理 生产者
2023-09-14 09:14:49 时间

Kafka 生产者 TCP 管理

Kafka 的所有通信都是基于 TCP

TCP/HTTP 区别:

  • 用 TCP 多路复用请求,同时轮询多个连接的能力
  • HTTP 库都简陋

多路复用请求 (multiplexing request) :

  • 将多个数据流合并到底层单一物理连接中的过程
  • TCP 的多路复用请求会在一条物理连接上创建 n 个虚拟连接,每个虚拟连接负责流转各自对应的数据流

开发生产者的步骤:

  1. 构造生产者对象所需的参数对象
  2. 利用参数对象,创建 KafkaProducer 对象实例
  3. 用 KafkaProducer 的 send 发送消息
  4. 调用 KafkaProducer 的 close 关闭生产者并释放各种系统资源
Properties props = new Properties();
props.put("参数 1", "参数 1 的值");
props.put("参数 2", "参数 2 的值");

//...
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
	producer.send(new ProducerRecord<String, String>(……), callback);
	//...
}

Java Producer 管理 TCP 连接:

  • KafkaProducer 实例创建时, 启动 Sender 线程,并与 bootstrap.servers 中所有 Broker 的 TCP 连接
  • KafkaProducer 实例首次更新元数据信息后,还会与集群中所有 Broker 的 TCP 连接
  • 当 Producer 发送消息到某台 Broker 时,发现没有与该 Broker 的 TCP 连接,就会立即创建连接
  • 当 Producer 端 connections.max.idle.ms > 0 ,TCP 连接会自动关闭
  • connections.max.idle.ms = -1,TCP 连接将无法关闭,成为僵尸连接

创建 TCP 连接

创建 KafkaProducer 实例时,生产者会在后台创建并启动 Sender 线程

  • 该 Sender 线程开始运行时,先会创建与 Broker 的连接

Producer 启动时,会与 bootstrap.servers内的 Broker 连接

  • 生产中,只要配置 3~4 台在 bootstrap.servers 中就行

日志:

  • bootstrap.servers 配置了 localhost:9092,localhost:9093
  • KafkaProducer 创建后,会创建与两台 Broker 的 TCP 连接
  • 日志最后一行 : Producer 向某一台 Broker 发送了 METADATA 请求,尝试获取集群的元数据信息
[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

TCP 连接可能被创建地方 :

  • 更新元数据后 : 当 Producer 更新集群的元数据信息后,当与某些 Broker 没有连接,就创建一个 TCP 连接
  • 在消息发送时 : 当要发送消息时,当与目标 Broker 没有连接,就创建一个 TCP 连接

Producer 更新集群元数据信息的场景 :

  • 当 Producer 向不存在的主题发送消息时,Broker 会告诉 Producer 该主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,尝试获取最新的元数据信息
  • Producer 通过 metadata.max.age.ms (默认值: 300000) 定期更新元数据信息。不管集群是否有变化,Producer 每 metadata.max.age.ms 会强制刷新一次元数据

关闭 TCP 连接

Producer 关闭 TCP 连接的方式:

  • 用户主动关闭
  • Kafka 自动关闭

主动关闭 :

  • kill -9 杀掉 Producer
  • 调用 producer.close() 关闭

Kafka 自动关闭:

  • 当在 connections.max.idle.ms (默认: 9 分钟) 内没有任何请求某个 TCP 连接,主动关闭 TCP 连接
  • connections.max.idle.ms=-1 ( Producer ),TCP 连接为永久长连接,只能被 keepalive 探活机制关闭

Kafka 自动关闭,对客户端是被动关闭状态 (passive close )

  • 被动关闭的后果 : 产生大量的 CLOSE_WAIT 连接