zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

kafka源码解析之十七消费者流程(客户端如何获取topic的数据)详解编程语言

2023-06-13 09:20:35 时间

特点:1)一个消息读取多次

  2)在一个处理过程中只消费某个broker上的partition的部分消息

  3)必须在程序中跟踪offset值

  4)必须找出指定TopicPartition中的lead broker

  5)必须处理broker的变动

客户端编程必须按照以下步骤:

  1)从所有活跃的broker中找出哪个是指定TopicPartition中的leader broker

  2)构造请求

  3)发送请求查询数据

  4)处理leader broker变更

客户端代码如下:

public class KafkaSimpleConsumer { 

 private List String  m_replicaBrokers = new ArrayList String  

 public KafkaSimpleConsumer() { 

 m_replicaBrokers = new ArrayList String  

 } 

 public static void main(String args[]) { 

 KafkaSimpleConsumer example = new KafkaSimpleConsumer(); 

 // 最大读取消息数量 

 long maxReads = Long.parseLong( 3  

 // 要订阅的topic 

 String topic =  mytopic  

 // 要查找的分区 

 int partition = Integer.parseInt( 0  

 // broker节点的ip 

 List String  seeds = new ArrayList String  

 seeds.add( 192.168.4.30  

 seeds.add( 192.168.4.31  

 seeds.add( 192.168.4.32  

 // 端口 

 int port = Integer.parseInt( 9092  

 try { 

 example.run(maxReads, topic, partition, seeds, port); 

 } catch (Exception e) { 

 System.out.println( Oops:  + e); 

 e.printStackTrace(); 

 } 

 } 

 public void run(long a_maxReads, String a_topic, int a_partition, List String  a_seedBrokers, int a_port) throws Exception { 

 // 获取指定Topic partition的元数据 

 PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); 

 if (metadata == null) { 

 System.out.println( Can t find metadata for Topic and Partition. Exiting  

 return; 

 } 

 if (metadata.leader() == null) { 

 System.out.println( Can t find Leader for Topic and Partition. Exiting  

 return; 

 } 

 //找到leader broker 

 String leadBroker = metadata.leader().host(); 

 String clientName =  Client_  + a_topic +  _  + a_partition; 

//链接leader broker 

 SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 

//获取topic的最新偏移量 

 long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); 

 int numErrors = 0; 

 while (a_maxReads   0) { 

 if (consumer == null) { 

 consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 

 } 

//本质上就是发送FetchRequest请求 

 FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build(); 

 FetchResponse fetchResponse = consumer.fetch(req); 

 if (fetchResponse.hasError()) { 

 numErrors++; 

 // Something went wrong! 

 short code = fetchResponse.errorCode(a_topic, a_partition); 

 System.out.println( Error fetching data from the Broker:  + leadBroker +   Reason:   + code); 

 if (numErrors   5) 

 break; 

 if (code == ErrorMapping.OffsetOutOfRangeCode()) { 

 // We asked for an invalid offset. For simple case ask for 

 // the last element to reset 

 readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); 

 continue; 

 } 

 consumer.close(); 

 consumer = null; 

 //处理topic的partition的leader发生变更的情况 

 leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); 

 continue; 

 } 

 numErrors = 0; 

 long numRead = 0; 

 for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { 

 long currentOffset = messageAndOffset.offset(); 

 if (currentOffset   readOffset) {//过滤旧的数据 

 System.out.println( Found an old offset:   + currentOffset +   Expecting:   + readOffset); 

 continue; 

 } 

 readOffset = messageAndOffset.nextOffset(); 

 ByteBuffer payload = messageAndOffset.message().payload(); 

 byte[] bytes = new byte[payload.limit()]; 

 payload.get(bytes); 

//打印消息 

 System.out.println(String.valueOf(messageAndOffset.offset()) +  :   + new String(bytes,  UTF-8 )); 

 numRead++; 

 a_maxReads--; 

 } 

 if (numRead == 0) { 

 try { 

 Thread.sleep(1000); 

 } catch (InterruptedException ie) { 

 } 

 } 

 } 

 if (consumer != null) 

 consumer.close(); 

 } 

 public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { 

 TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 

 Map TopicAndPartition, PartitionOffsetRequestInfo  requestInfo = new HashMap TopicAndPartition, PartitionOffsetRequestInfo  

 requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 

 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 

 OffsetResponse response = consumer.getOffsetsBefore(request); 

 

 if (response.hasError()) { 

 System.out.println( Error fetching data Offset Data the Broker. Reason:   + response.errorCode(topic, partition)); 

 return 0; 

 } 

 long[] offsets = response.offsets(topic, partition); 

 return offsets[0]; 

 } 

 /** 

 * @param a_oldLeader 

 * @param a_topic 

 * @param a_partition 

 * @param a_port 

 * @return String 

 * @throws Exception 

 *找一个leader broker,其实就是发送TopicMetadataRequest请求 

 */ 

 private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { 

 for (int i = 0; i   3; i++) { 

 boolean goToSleep = false; 

 PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); 

 if (metadata == null) { 

 goToSleep = true; 

 } else if (metadata.leader() == null) { 

 goToSleep = true; 

 } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())   i == 0) { 

 // first time through if the leader hasn t changed give 

 // ZooKeeper a second to recover 

 // second time, assume the broker did recover before failover, 

 // or it was a non-Broker issue 

 // 

 goToSleep = true; 

 } else { 

 return metadata.leader().host(); 

 } 

 if (goToSleep) { 

 try { 

 Thread.sleep(1000); 

 } catch (InterruptedException ie) { 

 } 

 } 

 } 

 System.out.println( Unable to find new leader after Broker failure. Exiting  

 throw new Exception( Unable to find new leader after Broker failure. Exiting  

 } 

 private PartitionMetadata findLeader(List String  a_seedBrokers, int a_port, String a_topic, int a_partition) { 

 PartitionMetadata returnMetaData = null; 

 loop: for (String seed : a_seedBrokers) { 

 SimpleConsumer consumer = null; 

 try { 

 consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,  leaderLookup  

 List String  topics = Collections.singletonList(a_topic); 

 TopicMetadataRequest req = new TopicMetadataRequest(topics); 

 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); 

 List TopicMetadata  metaData = resp.topicsMetadata(); 

 for (TopicMetadata item : metaData) { 

 for (PartitionMetadata part : item.partitionsMetadata()) { 

 if (part.partitionId() == a_partition) { 

 returnMetaData = part; 

 break loop; 

 } 

 } 

 } 

 } catch (Exception e) { 

 System.out.println( Error communicating with Broker [  + seed +  ] to find Leader for [  + a_topic +  ,   + a_partition +  ] Reason:   + e); 

 } finally { 

 if (consumer != null) 

 consumer.close(); 

 } 

 } 

 if (returnMetaData != null) { 

 m_replicaBrokers.clear(); 

 for (kafka.cluster.Broker replica : returnMetaData.replicas()) { 

 m_replicaBrokers.add(replica.host()); 

 } 

 } 

 return returnMetaData; 

 } 

}
17.2 高级消费者

特点:

1)消费过的数据无法再次消费,如果想要再次消费数据,要么换另一个group

2)为了记录每次消费的位置,必须提交TopicAndPartition的offset,offset提交支持两种方式:

①提交至ZK (频繁操作zk是效率比较低的)

②提交至kafka内部

3)客户端通过stream获取数据,stream即指的是来自一个或多个服务器上的一个或者多个partition的消息。每一个stream都对应一个单线程处理。因此,client能够设置满足自己需求的stream数目。总之,一个stream也许代表了多个服务器partion的消息的聚合,但是每一个partition都只能到一个stream。

4)consumer和partition的关系:

   ①如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数

  ②如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

   ③如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

 

客户端编程必须按照以下步骤:

1)设计topic和stream的关系,即K为topic,V为stream的个数N

2)开启N个消费组线程消费这N个stream

客户端代码如下:

import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.consumer.ConsumerIterator; /** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example * * @author Fung */ public class KafkaHighConsumer { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) { consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int numThreads) { Map String, Integer  topicCountMap = new HashMap String, Integer  //设计topic和stream的关系,即K为topic,V为stream的个数N topicCountMap.put(topic, new Integer(numThreads)); //获取numThreads个stream Map String, List KafkaStream byte[], byte[]  consumerMap = consumer .createMessageStreams(topicCountMap); List KafkaStream byte[], byte[]  streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(numThreads); int threadNumber = 0; //开启N个消费组线程消费这N个stream for (final KafkaStream stream : streams) { executor.submit(new ConsumerMsgTask(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put( zookeeper.connect , a_zookeeper); props.put( group.id , a_groupId); props.put( zookeeper.session.timeout.ms ,  400  props.put( zookeeper.sync.time.ms ,  200  props.put( auto.commit.interval.ms ,  1000  return new ConsumerConfig(props); } public static void main(String[] arg) { String[] args = { 172.168.63.221:2188 ,  group-1 ,  page_visits ,  12  String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic); demo.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } demo.shutdown(); } public class ConsumerMsgTask implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerMsgTask(KafkaStream stream, int threadNumber) { m_threadNumber = threadNumber; m_stream = stream; } public void run() {// KafkaStream的本质就是一个网络迭代器 ConsumerIterator byte[], byte[]  it = m_stream.iterator(); while (it.hasNext()) System.out.println( Thread   + m_threadNumber +  :   + new String(it.next().message())); System.out.println( Shutting down Thread:   + m_threadNumber); } } /** * Created by Administrator on 2016/4/11. */ public static class KafkaProducer { } }

其具体的消费逻辑如下:

ka.png

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11809.html

cgojava