zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Kafka的定义和安装与配置

2023-06-13 09:13:30 时间

一、Kafka是什么?

定义:Kafka是一个基于zookeeper协调的分布式、多副本的(replica)、支持分区的(partition)系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写的项目。

二、Kafka的安装与配置

一、Docker安装kafka

\

Kafka是用Scala语言开发的,运行在JVM上,在安装Kafka之前需要先安装JDK。

\

yum install java-1.8.0-openjdk* -y

\

下载zookeeper镜像

\

docker pull wurstmeister/zookeeper

\

开放端口

\

firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload
firewall-cmd --query-port=2181/tcp
systemctl restart docker

\

启动镜像生成容器

\

docker run -dit --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
-t wurstmeister/zookeeper

\

下载kafka镜像

\

docker pull wurstmeister/kafka

\

开放端口

\

firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --query-port=9092/tcp
systemctl restart docker

\

启动kafka镜像生成容器

\

docker run -dit --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=172.21.17.47:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.17.47:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

\

参数说明:

\

-e KAFKA_BROKER_ID=0  在kafka集群中,每个kafka都有一个BROKER_ID来区分自己 -e KAFKA_ZOOKEEPER_CONNECT=ip:2181/kafka 配置zookeeper管理kafka的路径ip:2181/kafka,ip地址改为内网ip -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092  把kafka的地址端口注册给zookeeper,ip地址改成内网ip -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口 -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

\

验证kafka是否可以使用

\

进入容器

\

docker exec -it kafka bash

\

进入 /opt/kafka_2.13-2.7.1/bin/ 目录下

\

cd /opt/kafka_2.13-2.7.1/bin/

\

运行kafka生产者发送消息

\

./kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic sun

\

发送消息

\

{"datas[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

\

运行kafka消费者接收消息

\

./kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --topic sun --from-beginning

\

停止zookeeper和kafka

\

docker stop zookeeper
docker rm zookeeper
docker stop kafka
docker rm kafka

\

二、安装包安装kafka

\

一、安装JDK


yum install java-1.8.0-openjdk* -y

\

二、安装Zookeeper


wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
cd apache-zookeeper-3.5.8-bin/
cp conf/zoo_sample.cfg conf/zoo.cfg
cd /home/apache-zookeeper-3.5.8-bin/bin
./zkServer.sh start
./zkCli.sh
ls /

\

打印结果:

\

[zk: localhost:2181(CONNECTED) 5] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, kafka, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 6]

\

三、安装Kafka


wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
tar -xzf kafka_2.11-2.4.1.tgz
cd /home/kafka_2.11-2.4.1/config
vim config/server.properties

\

配置文件编辑

\

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://内网ip:9092   
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=内网ip:2181

\

如果填写外网ip可能会遇上这种情况:

\

四、启动并验证kafka

\

启动kafka

\

/home/kafka_2.11-2.4.1/bin/kafka-server-start.sh config/server.properties &

\

进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树

\

/home/apache-zookeeper-3.5.8-bin/bin/zkCli.sh
ls /
ls /brokers/ids

\

校验kafka

\

创建主题

\

/home/kafka_2.11-2.4.1/bin/kafka-topics.sh --create --zookeeper 106.14.132.94:2181 --replication-factor 1 --partitions 1 --topic test

\

查看kafka中目前存在的topic

\

/home/kafka_2.11-2.4.1/bin/kafka-topics.sh --list --zookeeper 106.14.132.94:2181

\

发送消息

\

/home/kafka_2.11-2.4.1/bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092 --topic test

\

this is a msg

\

消费消息

\

/home/kafka_2.11-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092--topic test

\

消费之前的消息

\

/home/kafka_2.11-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092 --from-beginning --topic test

\

通过jps命令查看运行的情况

\

对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。

\

三、集群搭建与使用

一、集群配置

首先,我们需要建立好其他2个broker的配置文件:

\

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

\

配置文件的需要修改的内容分别如下: config/server-1.properties:

\

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://106.14.132.94:9093   
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=106.14.132.94:2181

\

config/server-2.properties:

\

broker.id=2
listeners=PLAINTEXT://106.14.132.94:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=106.14.132.94:2181

\

目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

\

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

\

二、验证

查看zookeeper确认集群节点是否都注册成功:

\

ls /brokers/ids

\

现在我们创建一个新的topic,副本数设置为3,分区数设置为2:

\

bin/kafka-topics.sh --create --zookeeper 106.14.132.94:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

\

查看下topic的情况

\

 bin/kafka-topics.sh --describe --zookeeper 106.14.132.94:2181 --topic my-replicated-topic

\

以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。 leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾) replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。 isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

\

现在我们向新建的 my-replicated-topic 中发送一些message,kafka集群可以加上所有kafka节点:

\

bin/kafka-console-producer.sh --broker-list 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --topic my-replicated-topic

\

my test msg 1 my test msg 2

\

现在开始消费:

\

bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --from-beginning --topic my-replicated-topic

\

my test msg 1 my test msg 2

\

现在我们来测试我们容错性,因为broker1目前是my-replicated-topic的分区0的leader,所以我们要将其kill

\

ps -ef | grep server.properties
kill 14776

\

现在再执行命令:

\

bin/kafka-topics.sh --describe --zookeeper 106.14.132.94:9092 --topic my-replicated-topic

\

我们可以看到,分区0的leader节点已经变成了broker 0。要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR(in-sync replica)中进行的。 此时,我们依然可以 消费新消息:

\

bin/kafka-console-consumer.sh --bootstrap-server 106.14.132.94:9092,106.14.132.94:9093,106.14.132.94:9094 --from-beginning --topic my-replicated-topic

\

my test msg 1 my test msg 2

\

查看主题分区对应的leader信息:

\

get /brokers/topics/my-relicated-topic/partitions/0/state

\

kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。

三、集群消费

\

log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka集群支持配置一个partition备份的数量。针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。

\

Producers

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

\

Consumers

传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)

\

  1. queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
  2. publish-subscribe模式:消息会被广播给所有的consumer。

\

Kafka基于这2种模式提供了一种consumer的抽象概念:消费组(consumer group)。

\

  • queue模式:所有的consumer都位于同一个consumer group 下。
  • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

\

\

上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, A有2个consumer instances ,B有4个。通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logical subscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。

四、Kafka可视化管理工具kafka-manager

\

安装及基本使用可参考:Java廖志伟