Kafka策略模式
2023-06-13 09:15:00 时间
公共kafka工具模块
针对于不同场景的消费消息
代码结构如下
- consumerListener
package com.adaspace.kafka.consumer;
import com.adaspace.kafka.handler.HandlerContext;
import com.adaspace.kafka.handler.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 公共kafka消费者监听器,使用策略模式来进行不同场景的消息处理
*
* @Author: Frost
* @Date: 2021/11/12 23:23
*/
@Slf4j
@Component
public class ConsumerListener {
@Resource
private HandlerContext handlerContext;
@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", groupId = "${kafka.listener.group-id}")
public void listen(ConsumerRecord record) {
log.info("监听kafka消息,topic={},partition={},offset={}", record.topic(), record.partition(), record.offset());
String topic = record.topic();
try {
MessageHandler handler = handlerContext.getHandler(topic);
String message = String.valueOf(record.value());
handler.handle(message);
} catch (Exception e) {
log.error("该topic消息策略不存在");
}
}
}
- handler
package com.adaspace.kafka.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author: Frost
* @Date: 2021/11/12 23:24
*/
@Component
public class HandlerContext {
@Autowired
public final Map<string, messagehandler=""> map = new ConcurrentHashMap<>();
/**
* 放入对应的策略
*
* @param map {key: topicName value:MessageHandler}
*/
public HandlerContext(Map<string, messagehandler=""> map) {
map.forEach(this.map::put);
}
/**
* 不同的topic进行MessageHandler的策略获取,通过公共kafka 监听器来触发不同的handler
*
* @param handler
* @return
*/
public MessageHandler getHandler(String handler) {
MessageHandler messageHandler = map.get(handler);
if (messageHandler == null) {
throw new RuntimeException();
}
return messageHandler;
}
}
</string,></string,>
- MessageHandler接口
package com.adaspace.kafka.handler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* @Author: Frost
* @Date: 2021/11/12 23:18
*/
public interface MessageHandler {
/**
* 处理器
* @param message 消费消息体
*/
void handle(String message);
}
- 示例handler 代码
通过Component 将topic 名注入spring,不同topic 进行不同策略实现
package com.adaspace.mp.order.handler;
import com.adaspace.kafka.handler.MessageHandler;
import com.adaspace.mp.order.domain.*;
import com.adaspace.mp.order.dto.event.AIKafkaRespMessage;
import com.adaspace.mp.order.dto.event.KafkaMessageTypeEnum;
import com.adaspace.mp.order.gatewayimpl.impl.OrderGatewayImpl;
import com.adaspace.mp.unispace.api.UnispaceFeignClient;
import com.adaspace.mp.unispace.dto.ProcessResult;
import com.adaspace.mp.unispace.dto.UnispaceLoadDataDto;
import com.alibaba.cola.dto.SingleResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.WeakHashMap;
/**
* @Author: Frost
* @Date: 2021/11/12 23:28
*/
@Component("kafkaTest")
@Slf4j
public class AiHandler implements MessageHandler {
/**
* {k:分区,v:offset} (待用)
*/
WeakHashMap<topicpartition, long=""> offsetMap = new WeakHashMap<>();
@Resource
private OrderGatewayImpl orderGatewayImpl;
@Override
public void handle(String message) {
JSON parse = (JSON) JSON.parse(message);
AIKafkaRespMessage aiKafkaRespMessage = JSON.toJavaObject(parse, AIKafkaRespMessage.class);
KafkaMessageTypeEnum kafkaMessageType = aiKafkaRespMessage.getKafkaMessageType();
switch (kafkaMessageType) {
case UNTREATED_TASK:
case AI_END_TASK:
orderGatewayImpl.updateOrderItemData(
aiKafkaRespMessage.getId(),
aiKafkaRespMessage.getStatus(),
aiKafkaRespMessage.getTargetTIFF()
);
Order order = orderGatewayImpl
.getOrder(aiKafkaRespMessage.getOrderId(), AlgorithmReqExtension.class, new OrderItem2Algorithm());
OrderItemData orderItemData =
order.getOrderItem().getOrderItemData()
.stream()
.filter(x -> x.getId()
.equals(aiKafkaRespMessage.getId())).findAny().orElse(null);
Assert.notNull(orderItemData, "接收orderItemData为null");
orderItemData.setStatus(aiKafkaRespMessage.getStatus());
orderItemData.setIsReady(aiKafkaRespMessage.getStatus() == 0);
orderGatewayImpl.fireEvent(order, OrderEventType.SHIPPED, aiKafkaRespMessage.getTargetTIFF());
break;
default:
log.info("消息消费未找到对应处理case");
}
}
}
相关文章
- Kafuka面试(整合Kafka两种模式区别)
- Kafka入门实战教程(8):常用的shell工具脚本
- 下载Kafka安装包
- redis一主一从哨兵模式_kafka主从复制
- kafka学习之Kafka 的简介(一)
- Kafka源码解析_kafka删除消费组命令
- Kafka如何删除topic中的部分数据_kafka修改topic副本数
- kafka删除topic数据_kafka查看topic列表
- 如何用Know Streaming 快速对Kafka Topic 完成扩缩副本操作
- Kafka扩分区和分区副本重分配之后消费组会自动均衡吗?
- 面试系列-kafka偏移量提交
- 字节面试官狂问我:kafka 是什么?有什么作用?
- Kafka常见面试题
- 快速理解Kafka分布式消息队列框架详解架构师
- kafka之三 Kafka 高可用详解大数据
- kafka源码解析之十二KafkaController(下篇)详解编程语言
- kafka源码解析之六SocketServer详解编程语言
- kafka 、 zookeeper 集群(一)
- 的数据同步从MySQL到Kafka:实现实时数据同步(mysql到kafka)
- Linux安装Kafka:一步一步操作指南(linux安装kafka)
- Oracle 数据流轻松集成 Kafka 服务:提高数据传输效率(oracle到kafka)
- 比较Redis vs Kafka(redis还是kafka)