zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java 多线程消费 Kafka 队列消息案例

2023-09-27 14:25:58 时间

package org.training.hadoop.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerExample
{

//config

public static Properties getConfig()

 Properties props = new Properties();

 props.put("bootstrap.servers", "localhost:9092");

 props.put("group.id", "testGroup");

 props.put("enable.auto.commit", "true");

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 return props;

public void consumeMessage()

 // launch 3 threads to consume

 int numConsumers = 3;

 final String topic = "test1";

 final ExecutorService executor = Executors.newFixedThreadPool(numConsumers); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待

 final List KafkaConsumerRunner consumers = new ArrayList KafkaConsumerRunner 

 for (int i = 0; i numConsumers; i++) {

 KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);

 consumers.add(consumer);

 executor.submit(consumer);

 //关闭线程并清理---------------------------

 //当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,清理

 Runtime.getRuntime().addShutdownHook(new Thread()

 @Override

 public void run()

 for (KafkaConsumerRunner consumer : consumers) {

 consumer.shutdown(); //关闭线程

 executor.shutdown();

 try {

 //当前线程阻塞,直到

 //等所有已提交的任务(包括正在跑的和队列中等待的)执行完或者等超时时间到

 //或者线程被中断,抛出InterruptedExcepti

 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);

 catch (InterruptedException e) {

 e.printStackTrace();

// Thread to consume kafka data

public static class KafkaConsumerRunner

 implements Runnable

 private final AtomicBoolean closed = new AtomicBoolean(false);

 private final KafkaConsumer String, String consumer;

 private final String topic;

 public KafkaConsumerRunner(String topic)

 Properties props = getConfig();

 consumer = new KafkaConsumer String, String (props);

 this.topic = topic;

 public void handleRecord(ConsumerRecord record)

 System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());

 public void run()

 try {

 // subscribe

 consumer.subscribe(Arrays.asList(topic));

 while (!closed.get()) {

 //read data

 ConsumerRecords String, String records = consumer.poll(10000); //poll方法消费数据,心跳机制通知broker是否正常

 // Handle new records

 for (ConsumerRecord String, String record : records) {

 handleRecord(record); //打印消费数据

 catch (WakeupException e) {

 // Ignore exception if closing

 if (!closed.get()) {

 throw e;

 finally {

 consumer.close();

 // Shutdown hook which can be called from a separate thread

 public void shutdown()

 closed.set(true);

 consumer.wakeup();

public static void main(String[] args)

 KafkaConsumerExample example = new KafkaConsumerExample();

 example.consumeMessage();

}

}


Java报告推送失败补偿机制;钉钉群通知消息核心代码 Java报告推送失败补偿机制,超过次数后使用钉钉通知开发 自动补偿实现: 要求方法调用的过程中,失败的时候,系统有办法进行自动重试,重试达到一定次数后,钉钉通知开发。 实现设计:注解,反射,定时任务
重磅消息!弃用 Java 8、Apache Kafka 3.0 发布!  什么是Kafka? Apache Kafka是分布式发布订阅消息传递系统和强大的队列,可以处理大量数据,并使您能够将消息从一个端点传递到另一个端点。Kafka适用于离线和在线消息消费。Kafka消息被保留在磁盘上,并在集群内复制以防止数据丢失。Kafka建立在ZooKeeper同步服务之上。它与Apache Storm和Spark完美结合,实时流式传输数据分析。 作者:zhulin1028 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。