java 多线程消费kfk队列消息案例
2023-09-27 14:25:58 时间
package org.training.hadoop.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaConsumerExample
{
//config public static Properties getConfig() Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "testGroup"); props.put("enable.auto.commit", "true"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; public void consumeMessage() // launch 3 threads to consume int numConsumers = 3; final String topic = "test1"; final ExecutorService executor = Executors.newFixedThreadPool(numConsumers); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 final List KafkaConsumerRunner consumers = new ArrayList KafkaConsumerRunner for (int i = 0; i numConsumers; i++) { KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic); consumers.add(consumer); executor.submit(consumer); //关闭线程并清理--------------------------- //当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,清理 Runtime.getRuntime().addShutdownHook(new Thread() @Override public void run() for (KafkaConsumerRunner consumer : consumers) { consumer.shutdown(); //关闭线程 executor.shutdown(); try { //当前线程阻塞,直到 //等所有已提交的任务(包括正在跑的和队列中等待的)执行完或者等超时时间到 //或者线程被中断,抛出InterruptedExcepti executor.awaitTermination(5000, TimeUnit.MILLISECONDS); catch (InterruptedException e) { e.printStackTrace(); // Thread to consume kafka data public static class KafkaConsumerRunner implements Runnable private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer String, String consumer; private final String topic; public KafkaConsumerRunner(String topic) Properties props = getConfig(); consumer = new KafkaConsumer String, String (props); this.topic = topic; public void handleRecord(ConsumerRecord record) System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value()); public void run() try { // subscribe consumer.subscribe(Arrays.asList(topic)); while (!closed.get()) { //read data ConsumerRecords String, String records = consumer.poll(10000); //poll方法消费数据,心跳机制通知broker是否正常 // Handle new records for (ConsumerRecord String, String record : records) { handleRecord(record); //打印消费数据 catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) { throw e; finally { consumer.close(); // Shutdown hook which can be called from a separate thread public void shutdown() closed.set(true); consumer.wakeup(); public static void main(String[] args) KafkaConsumerExample example = new KafkaConsumerExample(); example.consumeMessage(); }
}
Java报告推送失败补偿机制;钉钉群通知消息核心代码 Java报告推送失败补偿机制,超过次数后使用钉钉通知开发 自动补偿实现: 要求方法调用的过程中,失败的时候,系统有办法进行自动重试,重试达到一定次数后,钉钉通知开发。 实现设计:注解,反射,定时任务
重磅消息!弃用 Java 8、Apache Kafka 3.0 发布! 什么是Kafka? Apache Kafka是分布式发布订阅消息传递系统和强大的队列,可以处理大量数据,并使您能够将消息从一个端点传递到另一个终端。Kafka适用于离线和在线消息消费。Kafka消息被保留在磁盘上,并在集群内复制以防止数据丢失。Kafka建立在ZooKeeper同步服务之上。它与Apache Storm和Spark完美结合,实时流式传输数据分析。 作者:zhulin1028 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关文章
- 3.Java 加解密技术系列之 SHA
- Java 集合框架
- Java中的强制类型转换
- Java字符串String 集合的迭代器
- 18 案例:开发JAVA采集程序
- ES5.6.12客户端连接报错Caused by: java.lang.ClassNotFoundException: org.elasticsearch.http.AbstractHttpServe
- java 常见几种发送http请求案例
- 第40节:Java中的IO知识案例
- JVM(Java虚拟机)优化大全和案例实战
- Java程序员惯性思维的一个错误
- java中的构造块、静态块等说明
- Java中发送http的get、post请求
- 详细分析Java中断机制[转]
- 【java养成】:案例(模拟用户注册、点歌系统)
- 【java养成】:案例(打印三角形,超市购物、随机点名)
- 【Java养成实例】:练习案例
- Java基础案例 | 第二弹(持续更新...xdm冲啊)
- Java基础案例 | 第一弹(持续更新...冲冲冲)
- Java多线程基础(一)---Thread API(join深度详解、源码分析和案例分析之代码实现,优雅关闭线程三种方式)
- Java的++自增
- Java使用spire进行word文档的替换
- Java 解析pdf文档内容实战案例
- Java ThreadPoolExecutor机制