zl程序教程

您现在的位置是:首页 >  Java

当前栏目

kafka集群搭建及Java客户端使用

2023-03-20 14:58:23 时间

kafka集群搭建及Java客户端使用

kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 优势:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
  • 可扩展性:kafka集群支持热扩展;
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  • 容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);
  • 高并发:支持数千个客户端同时读写。

术语

  1. Record(消息):Kafka处理的主要对象。
  2. Topic(主题):主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
  3. Partition(分区):一个有序不变的消息序列。每个Topic下可以有多个分区。
  4. Offset(消息位移):表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  5. Replica(副本):Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者(leader)副本和追随者(follower)副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  6. Broker(代理):Kafka以集群的方式运行,集群中的每一台服务器称之为一个代理(broker)Producer(生产者):消息生产者,向Broker发送消息的客户端。
  7. Consumer(消费者):消息消费者,从Broker读取消息的客户端。
  8. ConsumerOffset(消费者位移):表征消费者消费进度,每个消费者都有自己的消费者位移
  9. ConsumerGroup(消费者组):每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息。每一个Topic,下面可以有多个分区(Partition)日志文件。Partition是一个有序的message序列,这些message按顺序添加到一个叫做commitlog的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。每个consumer是基于自己在commitlog中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commitlog中的消息,当然我可以通过指定offset来重复消费某些消息,或者跳过某些消息。

应用场景

  1. 日志收集:用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;
  2. 消息系统:解耦生产者和消费者、缓存消息等;
  3. 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;
  4. 运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
  5. 流式处理:比如sparkstreaming和storm

kafka使用与集群搭建

环境准备 Kafka是用Scala语言开发的,运行在JVM上,在安装Kafka之前需要先安装JDK kafka依赖zookeeper,需要先安装zookeeper,下载地址

wgethttp://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-
3.4.9.tar.gz
tar-zxvfzookeeper-3.4.9.tar.gz
cdzookeeper-3.4.9
cpconf/zoo_sample.cfgconf/zoo.cfg
#启动zookeeper服务
bin/zkServer.shstart
#启动客户端
bin/zkCli.sh

下载kafka安装包 参考官网安装步骤

wgethttps://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
tar-zxvf kafka_2.11-1.1.1.tgz
cd kafka_2.11-1.1.1.tgz

启动kafka服务

bin/kafka-server-start.sh config/server.properties

server.properties是kafka核心配置文件(官网)

Property

Default

Description

broker.id

0

每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可

log.dirs

/tmp/kafka-logs

kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行

listeners

9092

server接受客户端连接的端口

zookeeper.connect

localhost:2181

zooKeeper连接字符串的格式为hostname:port,此处hostname和port分别ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为hostname1:port1, hostname2:port2,hostname3:port3

log.retention.hours

168

每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样

min.insync.replicas

1

当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常

delete.topic.enable

false

是否运行删除主题

创建主题

#创建分区数是1,副本数是1的主题为test的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看命令帮助
bin/kafka-topics.sh

启动Producer发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动Consumer消费消息

# --group 指定消费组 --from-beginning 从头开始消费 
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- group consumer1 --from-beginning
#	查看消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 单播消息:kafka中,在同一个消费组里,一条消息只能被某一个消费者消费
  • 多播消息:针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组

集群配置 配置3个broker

>	cp config/server.properties config/server-1.properties
>	cp config/server.properties config/server-2.properties
config/server-1.properties:
	broker.id=1
	listeners=PLAINTEXT://:9093
	log.dir=/tmp/kafka-logs-1
config/server-2.properties:
	broker.id=2
	listeners=PLAINTEXT://:9094
	log.dir=/tmp/kafka-logs-2
#启动broker
bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties
#创建一个 副本为3,分区为3的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 - -partitions 3 --topic my-replicated-topic
#	查看topic的情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
  • leader节点负责给定partition的所有读写请求。
  • replicas表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • isr是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

Java中kafka‐clients应用 Java中使用kafka,引入maven依赖

>
  >org.apache.kafka>
  >kafka-clients>
  >1.1.1>
>

具体Java客户端学习源码地址:项目源码