Apache Kafka Source Code Analysis – Broker Server

package kafka.server

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private var server: KafkaServer = null

  private def init() {
    server = new KafkaServer(serverConfig)
  }

  def startup() {
    try {
      server.startup()
    }
    catch {...}
  }
}

KafkaServer represents a single Kafka broker; this is the core of Kafka.
Just by looking at which modules startup() brings up, we can see what work the broker does; each module is analyzed below.


/**
 * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to start up and shutdown a single Kafka node.
 */
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
  private var isShuttingDown = new AtomicBoolean(false) // referenced below; declared near the top of the real source
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    /* start scheduler */
    kafkaScheduler.startup()

    /* setup zookeeper */
    zkClient = initZk()

    /* start log manager */
    logManager = createLogManager(zkClient)
    logManager.startup()

    socketServer = new SocketServer(config.brokerId,
                                    config.hostName,
                                    config.port,
                                    config.numNetworkThreads,
                                    config.queuedMaxRequests,
                                    config.socketSendBufferBytes,
                                    config.socketReceiveBufferBytes,
                                    config.socketRequestMaxBytes)
    socketServer.startup()

    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
    kafkaController = new KafkaController(config, zkClient)

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    replicaManager.startup()

    kafkaController.startup()

    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()

    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()
  }

2.1 KafkaScheduler

KafkaScheduler executes tasks in the background; it is implemented on top of ScheduledThreadPoolExecutor.


/**
 * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
 * 
 * It has a pool of kafka-scheduler- threads that do the actual work.
 * 
 * @param threads The number of threads in the thread pool
 * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
 * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
 */
@threadsafe
class KafkaScheduler(val threads: Int, 
                     val threadNamePrefix: String = "kafka-scheduler-", 
                     daemon: Boolean = true) extends Scheduler with Logging {
  @volatile private var executor: ScheduledThreadPoolExecutor = null
  private val schedulerThreadId = new AtomicInteger(0) // referenced below; declared in the real source

  override def startup() {
    this synchronized {
      executor = new ScheduledThreadPoolExecutor(threads) // create the ScheduledThreadPoolExecutor
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(new ThreadFactory() {
        def newThread(runnable: Runnable): Thread = 
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      })
    }
  }

  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) = {
    val runnable = new Runnable { // wrap fun in a Runnable
      def run() = {
        try {
          fun()
        } catch {...}
        finally {...}
      }
    }
    if(period >= 0) // a non-negative period means a periodic task; otherwise schedule once after the delay
      executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else
      executor.schedule(runnable, delay, unit)
  }
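A minimal usage sketch of the class above (the task names, messages, and timings here are illustrative, not from the source):

import java.util.concurrent.TimeUnit

val scheduler = new KafkaScheduler(threads = 1)
scheduler.startup()
// periodic task: runs every 30 seconds, starting 10 seconds from now
scheduler.schedule("demo-task", () => println("doing background work"),
                   delay = 10000, period = 30000, unit = TimeUnit.MILLISECONDS)
// one-shot task: a negative period disables periodic rescheduling
scheduler.schedule("one-shot", () => println("runs once"),
                   delay = 5000, period = -1, unit = TimeUnit.MILLISECONDS)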

2.2 Zookeeper Client

Since Kafka relies on ZooKeeper for configuration management, a ZkClient is created to communicate with the ZooKeeper ensemble.
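What initZk() does, roughly (paraphrased from the 0.8 source, not verbatim):

private def initZk(): ZkClient = {
  info("Connecting to zookeeper on " + config.zkConnect)
  val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
                              config.zkConnectionTimeoutMs, ZKStringSerializer)
  ZkUtils.setupCommonPaths(zkClient) // pre-create the standard persistent paths (/brokers, /consumers, ...)
  zkClient
}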

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
Apache Kafka Source Code Analysis – Log Management

 

2.4 ReplicaManager

The replica-related module newly added in 0.8.

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka Source Code Analysis – ReplicaManager

 

2.5 Kafka Socket Server

First of all, the broker server is a socket server: all interaction with a broker happens by sending requests to its socket port.

socketServer = new SocketServer(config.brokerId...)

KafkaApis

This class encapsulates the handling logic for all requests.
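At its core, KafkaApis is one big dispatch on the request id; a sketch of the shape of its handle() (the exact set of request keys and handler methods varies by version):

def handle(request: RequestChannel.Request) {
  request.requestId match {
    case RequestKeys.ProduceKey  => handleProducerRequest(request)
    case RequestKeys.FetchKey    => handleFetchRequest(request)
    case RequestKeys.OffsetsKey  => handleOffsetRequest(request)
    case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
    case requestId               => throw new KafkaException("Unknown api code " + requestId)
  }
}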



KafkaRequestHandler

Each handler thread in the KafkaRequestHandlerPool loops over the SocketServer's RequestChannel, taking requests off it and handing them to KafkaApis.
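A sketch of that loop (simplified from the 0.8 source; logging and error handling omitted):

class KafkaRequestHandler(id: Int, brokerId: Int,
                          requestChannel: RequestChannel, apis: KafkaApis) extends Runnable {
  def run() {
    while(true) {
      val req = requestChannel.receiveRequest() // blocks until a network thread enqueues a request
      if(req eq RequestChannel.AllDone)         // sentinel request used to shut the handler down
        return
      apis.handle(req)
    }
  }
}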



 

2.6 offsetManager

offsetManager = createOffsetManager()

It periodically purges expired offset entries, i.e. a compact operation:

scheduler.schedule(name = "offsets-cache-compactor",
                   fun = compact,
                   period = config.offsetsRetentionCheckIntervalMs,
                   unit = TimeUnit.MILLISECONDS)

It also provides the consumer-related offset operations; we won't dig into those, since we don't use the high-level consumer.

 

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

Apache Kafka Source Code Analysis – Controller

Since 0.8, to manage replicas, one broker acts as the master, i.e. the controller, which coordinates replica consistency.
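Controller election comes down to which broker manages to create an ephemeral znode first. A minimal sketch of the idea (the real 0.8 code goes through ZookeeperLeaderElector; this standalone version is a simplification):

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

def tryToBecomeController(zkClient: ZkClient, brokerId: Int): Boolean = {
  try {
    zkClient.createEphemeral("/controller", brokerId.toString) // first broker to succeed wins
    true  // we are the controller: start the state machines, drive leader elections, etc.
  } catch {
    case _: ZkNodeExistsException =>
      false // another broker is controller; watch the znode and retry when it disappears
  }
}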

2.8 TopicConfigManager



topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager handles changes to topic configs. Besides the global configuration, Kafka also has topic-level configuration, which can be set like this:

 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000

So how do these topic configs take effect?

Topic-level config is stored by default under

/brokers/topics/<topic_name>/config

But when there are many topics, to avoid creating too many watchers, a separate path is used to signal configuration changes:

/brokers/config_changes

So in addition to writing the config into the topic's config znode, the command above also creates a notification that tells the watcher which topic's config changed, e.g.

/brokers/config_changes/config_change_13321

The notification name carries a sequence suffix, used to tell whether it has already been processed.
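For illustration, publishing such a notification amounts to creating a persistent sequential znode whose payload names the topic; a sketch under the paths described above (not the verbatim admin-tool code):

// announce that `topic`'s config changed; ZooKeeper assigns the sequence suffix
zkClient.createPersistentSequential("/brokers/config_changes/config_change_", topic)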

/**
 * Process the given list of config changes
 */
private def processConfigChanges(notifications: Seq[String]) {
  if (notifications.size > 0) {
    info("Processing config change notification(s)...")
    val now = time.milliseconds
    val logs = logManager.logsByTopicPartition.toBuffer
    val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
    for (notification <- notifications) {
      val changeId = changeNumber(notification)
      if (changeId > lastExecutedChange) { // not processed yet
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
        val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
        if(jsonOpt.isDefined) {
          val json = jsonOpt.get
          val topic = json.substring(1, json.length - 1) // hacky way to dequote; extracts the topic name from the notification
          if (logsByTopic.contains(topic)) {
            /* combine the default properties with the overrides in zk to create the new LogConfig */
            val props = new Properties(logManager.defaultConfig.toProps)
            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
            val logConfig = LogConfig.fromProps(props)
            for (log <- logsByTopic(topic))
              log.config = logConfig // the actual update of the log config
            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
            purgeObsoleteNotifications(now, notifications) // delete notifications older than the retention window (10 minutes)
          }
        }
        lastExecutedChange = changeId
      }
    }
  }
}
This failover behavior is also fine: applying the same config more than once is harmless, and on every startup all unexpired notifications are processed again.

Moreover, a broker reloads the complete config from ZooKeeper when it restarts, so that case is covered as well; this mechanism mainly exists to update topic configs in real time.

 

2.9 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

This one is simple; as its comment says, it tells everyone "I am still alive"...

The implementation registers an ephemeral znode at

 /brokers/ids/[0...N] -- advertisedHost:advertisedPort

and registers again whenever the session expires (SessionExpired), a typical ZooKeeper pattern.
So watching this path is enough to know whether a broker is still alive.
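A sketch of the pattern (simplified; the real code goes through ZkUtils.registerBrokerInZk and a session-state listener):

import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState

class HealthcheckSketch(zkClient: ZkClient, brokerId: Int, host: String, port: Int) {
  def register() {
    // ephemeral: the znode disappears automatically if the broker's session dies
    zkClient.createEphemeral("/brokers/ids/" + brokerId, host + ":" + port)
  }
  def startup() {
    zkClient.subscribeStateChanges(new IZkStateListener {
      def handleStateChanged(state: KeeperState) {} // nothing to do on connect/disconnect
      def handleNewSession() { register() }         // session expired: re-register
    })
    register()
  }
}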

2.10 ControlledShutdown

Before 0.8, broker startup and shutdown were simple: initialize the components above, or stop them, and that was it.

But since 0.8, with replicas added, a broker cannot simply shut itself down. It must first notify the controller, and only after the controller has done its work, such as migrating partition leaders or taking replicas offline, can the broker actually shut down.

private def controlledShutdown()

The method is fairly long, but the logic is: find the controller, send it a ControlledShutdownRequest, and wait for the response; if this fails, the shutdown is an unclean shutdown.
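A hedged sketch of that retry loop, with findController and sendControlledShutdown as hypothetical stand-ins for the ZK lookup and BlockingChannel plumbing in the real method:

// minimal sketch of the controlled-shutdown retry loop (not the verbatim 0.8 code)
private def controlledShutdownSketch(maxRetries: Int): Boolean = {
  var remaining = maxRetries
  while (remaining > 0) {
    val controller = findController()                                  // hypothetical: read the controller id from ZK, resolve host:port
    val response = sendControlledShutdown(controller, config.brokerId) // hypothetical: ControlledShutdownRequest over a BlockingChannel
    if (response.errorCode == ErrorMapping.NoError && response.partitionsRemaining.isEmpty)
      return true                                                      // controller finished moving leaders off this broker
    remaining -= 1                                                     // retry: controller may have moved, or partitions remain
  }
  false                                                                // gave up; the caller proceeds with an unclean shutdown
}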


This article is excerpted from cnblogs (博客园); original publication date: 2014-02-14.

