zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

AMQP: Kafka code

2023-09-11 14:16:16 时间

 

No Callback:

package io.veer;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Fire-and-forget producer example: sends five string records to the "zion" topic
 * without registering a callback and without waiting for the send results.
 */
public class Producer {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when send() throws.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i));
      }
    }
  }
}

 

Callback:

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Asynchronous producer example with a completion callback: sends five string records
 * to the "zion" topic and prints the topic/partition of each acknowledged record.
 */
public class Producer {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when send() throws.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), Producer::report);
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

 

同步发送:

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Synchronous producer example: sends five string records to the "zion" topic and
 * blocks on each returned future before sending the next record.
 */
public class Producer {
  /**
   * @throws ExecutionException if a send fails (propagated from the future's get())
   * @throws InterruptedException if the thread is interrupted while waiting on get()
   */
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when get() throws.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 5; i++) {
        // Calling get() on the returned future makes the send synchronous.
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i)).get();
      }
    }
  }
}

 

 

指定partition(该partition必须存在, 否则send()会一直阻塞, 直到max.block.ms超时后抛出异常):

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Explicit-partition producer example: sends five string records with an empty key
 * to partition 0 of the "zion" topic and prints where each acknowledged record landed.
 */
public class Producer {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when send() throws.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data; the second constructor argument pins the target partition.
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", 0, "", "etymology" + i), Producer::report);
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

 

不指定partition但指定key时, 用key的hash值对topic的分区数取模来确定分区:

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Keyed producer example: sends five string records with the key "moniker" to the
 * "zion" topic (no explicit partition) and prints where each acknowledged record landed.
 */
public class Producer {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when send() throws.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data; every record carries the same key.
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "moniker", "etymology" + i), Producer::report);
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

既没有partition值又没有key值的情况下, Kafka采用Sticky Partitioner(黏性分区器): 随机选择一个分区, 并尽可能一直使用该分区; 待该分区的batch已满或者已发送完成后, Kafka再随机选择一个分区进行使用(和上一次的分区不同):

/**
 * Producer example without key or partition: sends fifty string records to the "zion"
 * topic, pausing 2 ms between sends, and prints where each acknowledged record landed.
 */
public class Producer {
  /**
   * @throws InterruptedException if the thread is interrupted during the pause between sends
   */
  public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Create the KafkaProducer; try-with-resources closes it even when the loop is interrupted.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 50; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), Producer::report);
        // Short pause between sends so the records are spread across more than one batch.
        Thread.sleep(2);
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

自定义partitioner:

package io.veer;


import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner that routes records by value content: values whose string form
 * contains "moniker" go to partition 0, everything else goes to partition 1.
 *
 * <p>NOTE(review): partition 1 is hard-coded, so this assumes the target topic has at
 * least two partitions; confirm against the topic configuration.
 */
public class MyPartitioner implements Partitioner {
  /**
   * Chooses the partition for a record based on its value.
   *
   * @param topic the topic name (unused)
   * @param key the record key (unused)
   * @param keyBytes the serialized key (unused)
   * @param value the record value; a null value is treated as the text "null"
   * @param valueBytes the serialized value (unused)
   * @param cluster the current cluster metadata (unused)
   * @return 0 when the value's string form contains "moniker", otherwise 1
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // String.valueOf avoids the NullPointerException that value.toString() raised for null values.
    String msg = String.valueOf(value);
    System.out.println("msg = " + msg);
    return msg.contains("moniker") ? 0 : 1;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }

  /** No configuration is read. */
  @Override
  public void configure(Map<String, ?> configs) {
  }
}
package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Custom-partitioner producer example: registers io.veer.MyPartitioner, then sends five
 * "etymology" records followed by five "moniker" records to the "zion" topic and prints
 * where each acknowledged record landed.
 */
public class Producer {
  /**
   * @throws InterruptedException if the thread is interrupted during the pause between sends
   */
  public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "io.veer.MyPartitioner");

    // Create the KafkaProducer; try-with-resources closes it even when the loop is interrupted.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), Producer::report);
        Thread.sleep(2);
      }
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "moniker" + i), Producer::report);
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

设置参数

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Tuned producer example: same sends as the custom-partitioner example, with batch size,
 * linger time, buffer memory and compression type set explicitly.
 */
public class Producer {
  /**
   * @throws InterruptedException if the thread is interrupted during one of the pauses
   */
  public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "io.veer.MyPartitioner");
    // 16 KiB (2^14) batch size, written as integer arithmetic instead of a cast from Math.pow.
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    // 32 MiB (2^25) buffer memory.
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32L * 1024 * 1024);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

    // Create the KafkaProducer; try-with-resources closes it even when a pause is interrupted.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      // Send the data
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), Producer::report);
        Thread.sleep(2);
      }
      for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("zion", "moniker" + i), Producer::report);
      }
      // NOTE(review): presumably kept so the callbacks are visible before shutdown; confirm it is still needed.
      Thread.sleep(1000 * 5);
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

 

开启事务:

package io.veer;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Transactional producer example: sends five records inside a transaction, then fails on
 * purpose so that the transaction is aborted instead of committed.
 */
public class Producer {
  /**
   * @throws InterruptedException declared for compatibility; interruptions inside the
   *     transaction are caught and lead to an abort
   */
  public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "io.veer.MyPartitioner");
    // Transactional id; must be globally unique
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
    // 16 KiB (2^14) batch size, written as integer arithmetic instead of a cast from Math.pow.
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    // 32 MiB (2^25) buffer memory.
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32L * 1024 * 1024);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

    // Create the KafkaProducer. The resource block also covers initTransactions() and
    // beginTransaction(), so a failure there no longer leaves the producer unclosed.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
      kafkaProducer.initTransactions();
      kafkaProducer.beginTransaction();
      try {
        for (int i = 0; i < 5; i++) {
          kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), Producer::report);
          Thread.sleep(2);
        }
        // Deliberate ArithmeticException: simulates a failure so the abort path below runs.
        int b = 1 / 0;
        kafkaProducer.commitTransaction();
      } catch (Exception e) {
        e.printStackTrace();
        kafkaProducer.abortTransaction();
      }
    }
  }

  /**
   * Completion callback: prints where the record landed, or the failure if the send did not succeed.
   *
   * @param recordMetadata metadata of the sent record; only read when {@code e} is null
   * @param e the send failure, or null on success
   */
  private static void report(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      // Previously failures were dropped silently; surface them instead.
      System.err.println("Failed to send record: " + e);
      return;
    }
    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition());
  }
}

 

数据有序: