zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

kafka单节点的安装,部署,使用

2023-09-14 09:08:20 时间

1、kafka官网:http://kafka.apache.org/downloads

jdk:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

参考书籍:Apache kafka实战

下载安装如下所示:

我下载的是kafka_2.11-2.1.0

然后点击如下进行下载即可:

 下载jdk,根据自己的需求下载所需的jdk版本,64位还是32位的。我下载的32位的jdk-8u191-linux-i586.tar.gz。

 然后将下载好的jdk-8u191-linux-i586.tar.gz和kafka_2.11-2.1.0.tgz传输到自己的机器上面,找个特定的目录,这样方便自己进行管理。

2、然后将jdk-8u191-linux-i586.tar.gz和kafka_2.11-2.1.0.tgz。解压缩到特定的目录里面,方便管理。

配置jdk的环境变量:

3、启动服务器,配置好jdk和kafka就可以启动了,启动之前要先启动zookeeper服务器,zookeeper是为kafka提高协调服务的工具。kafka已经内置了一个zookeeper服务器以及一些启动脚本。

 

启动好zookeeper以后,然后启动kafka服务器,可以配置一下kafka的config/server.properties里面的配置,如下所示:

 1 server.properties配置中需要关注以下几个参数:
 2 
 3 # The id of the broker. This must be set to a unique integer for each broker.
 4 # 表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
 5 broker.id=0
 6 
 7 # listeners=PLAINTEXT://0.0.0.0:9092
 8 # broker对外提供的服务入口地址 
 9 listeners=PLAINTEXT://192.168.110.142:9092
10 
11 # 设置存放消息日志的地址
12 # A comma separated list of directories under which to store log files
13 log.dirs=/tmp/kafka-logs
14 
15 # zookeeper.connect=localhost:2181
16 # kafka所需的zookeeper的集群地址
17 zookeeper.connect=192.168.110.142:2181


 [KafkaServer id=0] started (kafka.server.KafkaServer)说明kafka服务器启动成功,默认的服务端口是9092。

也可以后台启动kafka,这样可以方便操作,如下所示:

如果要在后台运行Kafka服务,那么可以在启动命令中加入-daemon参数或&字符,示例如下:

可以通过jps命令查看Kafka服务进程是否已经启动,jps命令只是用来确认Kafka服务的进程已经正常启动,示例如下: 

 

4、创建topic,服务器启动后,我们需要创建一个主题(topic)用于消息的发送和接受。这一步将创建一个名称为test的topic,该topic只有一个分区(partition),且该partition也只有一个副本(replica)处理消息。

注意:为了要创建topic,要保证刚才启动的zookeeper和kafka的终端不被关闭。打开一个新的终端。

然后topic创建成功了,使用命令查看该topic的状态,还可以通过--describe展示主题的更多具体信息。

其中--zookeeper指定了Kafka所连接的ZooKeeper服务地址,--topic指定了所要创建主题的名称,--replication-factor 指定了副本因子,--partitions 指定了分区个数,--create是创建主题的动作指令。

查看所有主题,查看主题详情,命令如下所示:

 

 

 5、发送消息,即生产消息。kafka默认提供了脚本工具可以不断的接受标准输入并将他们发送到kafka的某个topic上面,用户在控制台终端下启动该命令,输入一行文本数据,然后该脚本将改行文本封装成一条kafka消息发送给指定的topic。打开新的终端,执行命令。

--bootstrap-server指定了连接kafka集群的地址,--topic指定了生产者发送消息的主题。

 6、消费消息,消费者,kafka提供了一对应的脚本用于消费某些topic下的消息并打印到标准输出。打开新的终端。执行如下命令。--bootstrap-server指定了连接kafka集群的地址,--topic指定了消费端订阅的主题。

 


2020-12-02 16:15:45

1、搭建Kafka运行环境还需要涉及ZooKeeper,Kafka和ZooKeeper都是运行在JVM之上的服务,所以还需要安装JDK,Kafka从2.0.0版本开始就不再支持JDK7及以下版本。

  安装JDK的第一步就是下载JDK 1.8的安装包,这里略过。ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。

  除了可以使用kafka自带的zookeeper,还可以自己安装zookeeper,这里使用自己安装的zookeeper,解压缩完毕也可以将环境变量配置到/etc/profile里面的,然后使用source刷新一下使其生效,修改 ZooKeeper 的配置文件。首先进入zookeeper/conf 目录,并将zoo_sample.cfg文件修改为zoo.cfg,如下所示:

然后修改zoo.cfg配置文件,zoo.cfg文件的内容参考如下:

默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录:

1 [root@localhost conf]# mkdir /tmp/zookeeper/data
2 [root@localhost conf]# mkdir /tmp/zookeeper/log
3 [root@localhost conf]# 

然后,在/tmp/zookeeper/data目录下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号,如下所示:

启动Zookeeper服务,然后可以通过zkServer.sh status命令查看Zookeeper服务状态,详情如下: 

可以关闭Zookeeper服务,如下所示:

启动kafka,由于我之前已经使用过kafka和kafka自带的zookeeper,所以再和单独的zookeeper进行集成的时候,报下面的错误,如下所示:

 1 [root@localhost kafka_2.12-2.6.0]# bin/kafka-server-start.sh config/server.properties 
 2 [2020-12-02 18:25:43,495] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
 3 [2020-12-02 18:25:44,471] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
 4 [2020-12-02 18:25:44,613] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
 5 [2020-12-02 18:25:44,636] INFO starting (kafka.server.KafkaServer)
 6 [2020-12-02 18:25:44,637] INFO Connecting to zookeeper on 192.168.110.142:2181 (kafka.server.KafkaServer)
 7 [2020-12-02 18:25:44,690] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.110.142:2181. (kafka.zookeeper.ZooKeeperClient)
 8 [2020-12-02 18:25:44,716] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
 9 [2020-12-02 18:25:44,716] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
10 [2020-12-02 18:25:44,716] INFO Client environment:java.version=1.8.0_191 (org.apache.zookeeper.ZooKeeper)
11 [2020-12-02 18:25:44,717] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
12 [2020-12-02 18:25:44,717] INFO Client environment:java.home=/home/hadoop/soft/jdk1.8.0_191/jre (org.apache.zookeeper.ZooKeeper)
13 [2020-12-02 18:25:44,717] INFO Client environment:java.class.path=/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/activation-1.1.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/argparse4j-0.7.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/audience-annotations-0.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/commons-cli-1.4.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/commons-lang3-3.8.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-api-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-basic-auth-extension-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-file-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-json-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-mirror-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-mirror-client-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-runtime-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/connect-transforms-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/hk2-api-2.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/hk2-locator-2.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/hk2-utils-2.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-annotations-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-core-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-databind-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jakarta.inject-2.5.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/javassist-3.22.0-CR2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/javassist-3.26.0-GA.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jaxb-api-2.3.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-client-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-common-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-container-servlet-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-hk2-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-media-jaxb-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jersey-server-2.28.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-server-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-servlet-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/jopt-simple-5.0.4.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka_2.12-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka_2.12-2.6.0-sources.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-clients-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-log4j-appender-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-streams-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-streams-examples-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-streams-scala_2.12-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-streams-test-utils-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/kafka-tools-2.6.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/log4j-1.2.17.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/lz4-java-1.7.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/maven-artifact-3.6.3.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-buffer-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-codec-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-common-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-handler-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-resolver-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-transport-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-transport-native-epoll-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/netty-transport-native-unix-common-4.1.50.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/paranamer-2.8.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/plexus-utils-3.2.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/reflections-0.9.12.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/rocksdbjni-5.18.4.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/scala-collection-compat_2.12-2.1.6.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/scala-java8-compat_2.12-0.9.1.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/scala-library-2.12.11.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/scala-reflect-2.12.11.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/slf4j-api-1.7.30.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/snappy-java-1.1.7.3.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/validation-api-2.0.1.Final.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/zookeeper-3.5.8.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/zookeeper-jute-3.5.8.jar:/home/hadoop/soft/kafka_2.12-2.6.0/bin/../libs/zstd-jni-1.4.4-7.jar (org.apache.zookeeper.ZooKeeper)
14 [2020-12-02 18:25:44,718] INFO Client environment:java.library.path=/usr/java/packages/lib/i386:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
15 [2020-12-02 18:25:44,718] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
16 [2020-12-02 18:25:44,718] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
17 [2020-12-02 18:25:44,719] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
18 [2020-12-02 18:25:44,719] INFO Client environment:os.arch=i386 (org.apache.zookeeper.ZooKeeper)
19 [2020-12-02 18:25:44,719] INFO Client environment:os.version=2.6.32-358.el6.i686 (org.apache.zookeeper.ZooKeeper)
20 [2020-12-02 18:25:44,720] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
21 [2020-12-02 18:25:44,720] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
22 [2020-12-02 18:25:44,720] INFO Client environment:user.dir=/home/hadoop/soft/kafka_2.12-2.6.0 (org.apache.zookeeper.ZooKeeper)
23 [2020-12-02 18:25:44,720] INFO Client environment:os.memory.free=979MB (org.apache.zookeeper.ZooKeeper)
24 [2020-12-02 18:25:44,720] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
25 [2020-12-02 18:25:44,720] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
26 [2020-12-02 18:25:44,725] INFO Initiating client connection, connectString=192.168.110.142:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@4ad17b (org.apache.zookeeper.ZooKeeper)
27 [2020-12-02 18:25:44,736] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
28 [2020-12-02 18:25:44,757] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
29 [2020-12-02 18:25:44,775] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
30 [2020-12-02 18:25:44,911] INFO Opening socket connection to server 192.168.110.142/192.168.110.142:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
31 [2020-12-02 18:25:44,919] INFO Socket connection established, initiating session, client: /192.168.110.142:36977, server: 192.168.110.142/192.168.110.142:2181 (org.apache.zookeeper.ClientCnxn)
32 [2020-12-02 18:25:44,934] INFO Session establishment complete on server 192.168.110.142/192.168.110.142:2181, sessionid = 0x1762665e5960003, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
33 [2020-12-02 18:25:44,944] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
34 [2020-12-02 18:25:45,415] INFO Cluster ID = 312rhsudTGaFvK2fdbCjPQ (kafka.server.KafkaServer)
35 [2020-12-02 18:25:45,461] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
36 kafka.common.InconsistentClusterIdException: The Cluster ID 312rhsudTGaFvK2fdbCjPQ doesn't match stored clusterId Some(d1w4HeUaTIykN_WZQBiSGQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
37     at kafka.server.KafkaServer.startup(KafkaServer.scala:235)
38     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
39     at kafka.Kafka$.main(Kafka.scala:82)
40     at kafka.Kafka.main(Kafka.scala)
41 [2020-12-02 18:25:45,473] INFO shutting down (kafka.server.KafkaServer)
42 [2020-12-02 18:25:45,482] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
43 [2020-12-02 18:25:45,592] INFO Session: 0x1762665e5960003 closed (org.apache.zookeeper.ZooKeeper)
44 [2020-12-02 18:25:45,594] INFO EventThread shut down for session: 0x1762665e5960003 (org.apache.zookeeper.ClientCnxn)
45 [2020-12-02 18:25:45,596] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
46 [2020-12-02 18:25:45,610] INFO shut down completed (kafka.server.KafkaServer)
47 [2020-12-02 18:25:45,621] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
48 [2020-12-02 18:25:45,631] INFO shutting down (kafka.server.KafkaServer)

找到server.properties配置文件,log.dirs=/tmp/kafka-logs,将此目录下面的所有信息删除掉,重启kafka即可,不过此操作之前的信息包含offset都全部删除了,谨慎操作。