zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

消息中间件 Kafka

Kafka 消息中间件
2023-06-13 09:15:29 时间

1. 简介

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。适用于需要可靠的数据传送的分布式环境。

2. 常用消息中间件对比

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

开发语言

java

erlang

java

scala

单机吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms级以内

可用性

高(主从)

高(主从)

非常高(分布式)

非常高(分布式)

功能特性

成熟的产品、较全的文档、各种协议支持好

并发能力强、性能好、延迟低

MQ功能比较完善,扩展性佳

只支持主要的MQ功能,主要应用于大数据领域

3. 选择建议

消息中间件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

4. Kafka

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka 官网为:http://kafka.apache.org/

名词解释 -- producer:发布消息的对象称之为主题生产者(Kafka topic producer) -- topic:Kafka 将消息分门别类,每一类的消息称之为一个主题(Topic) -- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers) -- broker:已发布的消息保存在一组服务器中,称之为 Kafka 集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker 拉数据,从而消费这些已发布的消息

5. Kafka 入门案例

导入 Kafka 客户端依赖

<dependency>
       <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
 </dependency>

编写消息生产者类 ProducerQuickstart -- 设置kafka的配置信息

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"HOST:PORT");
 //消息 key 的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 //消息 value 的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

-- 创建生产者对象

KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

-- 发送消息

ProducerRecord<String,String> record = new ProducerRecord<String,String>(“topic","key",“value");
producer.send(record);

-- 关闭消息通道

producer.close();

创建 ConsumerQuickStart 消费者类 -- 设置kafka的配置信息

//连接信息
properties.put(ConsumerConfig.*BOOTSTRAP_SERVERS_CONFIG*,"HOST:PORT");
//指定消费者组
properties.put(ConsumerConfig.*GROUP_ID_CONFIG*,"group2");
//反序列化的key和value
properties.put(ConsumerConfig.*KEY_DESERIALIZER_CLASS_CONFIG*,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.*VALUE_DESERIALIZER_CLASS_CONFIG*,"org.apache.kafka.common.serialization.StringDeserializer");

-- 创建消费者对象

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);

-- 订阅主题

consumer.subscribe(Collections.*singletonList*("itcast-001"));

-- 获取消息

ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.*ofMillis*(1000));

6. Kafka 解析

两种类型 -- 生产者发送消息,多个消费者同时订阅一个主题,只有一个消费者能收到消息(一对一)

-- 生产者发送消息,多个消费者同时订阅一个主题,所有消费者都能收到消息(一对多)

分区机制 Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition)可以处理更多的消息,不受单台服务器的限制,可以不受限的处理更多的数据

topic 解析 每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的

分区策略

分区策略

说明

轮询策略

按顺序轮流将每条数据分配到每个分区中

随机策略

每次都随机地将消息分配到每个分区

按键保存策略

生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储

7. Kafka高可用设计

集群 Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一

备份机制 Kafka 中消息的备份又叫做副本(Replica) Kafka 定义了两类副本: 领导者副本(Leader Replica) 追随者副本(Follower Replica)

-- 备份机制同步方式 ISR(in-sync replica)需要同步复制保存的follower 如果leader失效后,需要选出新的leader,选举的原则如下: 第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的 第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取 极端情况,就是所有副本都失效了,这时有两种方案 第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定 第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

8. Kafka生产者

发送类型 -- 同步发送:使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

//发送消息
try {
   RecordMetadata recordMetadata = producer.send(record).get();
   System.out.println(recordMetadata.offset());*//**获取偏移量
}catch (Exception e){
   e.printStackTrace();
 }

-- 异步发送:调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//发送消息
try {
   producer.send(record, new Callback() {
 @Override
 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
   if(e!=null){
     e.printStackTrace();
   }
   System.out.println(recordMetadata.offset());
 }
});
}catch (Exception e){
 e.printStackTrace();
}

参数详解(ack)

确认机制

说明

acks=0

生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快

acks=1(默认值)

只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应

acks=all

只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

参数详解(retries) 生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待 100ms

//设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

参数详解(消息压缩)

压缩算法

说明

snappy

占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用

lz4

占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观

gzip

占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在

    //消息压缩
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");

9. Kafka消费者

消费者组 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体 一个发布在Topic上消息被分发给此消费者组中的一个消费者 所有的消费者都在一个组中,那么这就变成了 queue 模型 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型

消息有序性 应用场景: 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序 ……

kafka 集群托管 4 个分区(P0-P3),2 个消费者组,消费组 A 有 2 个消费者,消费组 B 有 4 个 topic 分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证 Topic 的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理 Topic 的所有消息,那就只提供一个分区

提交和偏移量 kafka 不会像其他 JMS 队列那样需要得到消费者的确认,消费者可以使用 kafka 来追踪消息在分区的位置(偏移量)。消费者会往一个叫做 _consumer_offset 的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡

偏移量 如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理

如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

偏移量提交方式 -- 自动提交 当 enable.auto.commit 被设置为 true,提交方式就是让消费者自动提交偏移量,每隔 5 秒消费者会自动把从 poll() 方法接收的最大偏移量提交上去 -- 手动提交 当enable.auto.commit被设置为false可以有以下三种提交方式 •提交当前偏移量(同步提交) •异步提交 •同步和异步组合提交

提交当前偏移量(同步提交)

while (true){
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 for (ConsumerRecord<String, String> record : records) {
   System.out.println(record.value());
   System.out.println(record.key());
   try {
     consumer.commitSync();//同步提交当前最新的偏移量
   }catch (CommitFailedException e){
     System.out.println("记录提交失败的异常:"+e);
   }
 }
 }

异步提交

while (true){
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 for (ConsumerRecord<String, String> record : records) {
   System.out.println(record.value());
   System.out.println(record.key());
 }
 consumer.commitAsync(new OffsetCommitCallback() {
   @Override
   public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
     if(e!=null){
       System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
     }
   }
 });
 }

同步异步组合提交

try {
 while (true){
   ConsumerRecords<String, String> records = consumer.poll(Duration.*ofMillis*(1000));
   for (ConsumerRecord<String, String> record : records) {
     System.out.println(record.value());
     System.out.println(record.key());
   }
   consumer.commitAsync();
 }
 }catch (Exception e){+
   e.printStackTrace();
 System.out.println("记录错误信息:"+e);
 }finally {
 try {
   consumer.commitSync();
 }finally {
   consumer.close();
 }
 }

10. SpringBoot集成Kafka收发消息

导入 Kafka 相关依赖

<!-- kafkfa --> 
<dependency>     
   <groupId>org.springframework.kafka</groupId>
​     <artifactId>spring-kafka</artifactId>
​     <exclusions>
​         <exclusion>
​             <groupId>org.apache.kafka</groupId>
​             <artifactId>kafka-clients</artifactId>
​         </exclusion>
​     </exclusions>
 </dependency>
 <dependency>
​     <groupId>org.apache.kafka</groupId>
​     <artifactId>kafka-clients</artifactId>
 </dependency>
 <dependency>
​     <groupId>com.alibaba</groupId>
​     <artifactId>fastjson</artifactId>
 </dependency>

resources 下创建 application.yml 配置文件

server:
port: 9991
 spring:
application:
 name: kafka-demo
kafka:
 bootstrap-servers: HOST:PORT
 producer:
  retries: 10
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
 consumer:
  group-id: test-hello-group
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

消息生产者

@Autowired
 private KafkaTemplate<String,String> kafkaTemplate;
 @GetMapping("/hello")
 public String hello(){
 //第一个参数:topics 
 //第二个参数:消息内容
 kafkaTemplate.send("kafka-hello","World");
 return "ok";
 }

消息消费者

@Component
 public class HelloListener {
 @KafkaListener(topics = {"kafka-hello"})
 public void onMessage(String message){
   if(!StringUtils.isEmpty(message)){
     System.out.println(message);
   }
 }
 }

传递消息作为对象 目前 springboot 整合后的 kafka,因为序列化器是 StringSerializer,这个时候如果需要传递对象可以有两种方式 方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强(不推荐) 方式二:可以把要传递的对象进行转 json 字符串,接收消息后再转为对象(推荐) 以方式二为例:

发送消息

//发送消息
 User user = new User();
 user.setUsername("dragon");
 user.setAge(12);
 kafkaTemplate.send("kafka-hello", JSON.toJSONString(user));
 return "ok";

接收消息

User user = JSON.parseObject((String) value, User.class);