Message queue: Kafka producer code examples (note: Kafka uses its own binary protocol, not AMQP)
2023-09-11 14:16:16
No Callback:
package io.veer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal fire-and-forget producer: sends five records to topic "zion"
 * without a callback and without waiting for acknowledgement.
 */
public class Producer {

    public static void main(String[] args) {
        // Cluster bootstrap addresses plus String serializers for key and value.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees close() (which flushes buffered records)
        // even if send() throws — the original leaked the producer in that case.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i));
            }
        }
    }
}
Callback:
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Asynchronous producer with a completion callback: on success the
 * callback prints the topic and partition the record landed on.
 */
public class Producer {

    public static void main(String[] args) {
        // Cluster endpoints and String serializers for both key and value.
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        // Callback is a functional interface, so a lambda replaces the
        // anonymous inner class; e == null means the send succeeded.
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("zion", "etymology" + i), (metadata, e) -> {
                if (e == null) {
                    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                            metadata.topic(), metadata.partition());
                }
            });
        }
        producer.close();
    }
}
同步发送:
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Synchronous-send demo: calling get() on the Future returned by send()
 * blocks until the broker acknowledges each record.
 *
 * @throws ExecutionException   if a send fails on the broker side
 * @throws InterruptedException if the blocking get() is interrupted
 */
public class Producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources: the original leaked the producer whenever
        // get() threw ExecutionException/InterruptedException mid-loop.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                // .get() turns the async send into a synchronous one
                kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i)).get();
            }
        }
    }
}
指定partition, partition必须存在, 否则hang:
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that targets an explicit partition (0) of topic "zion".
 * NOTE(review): the targeted partition must exist, otherwise the send
 * hangs waiting for metadata (per the surrounding notes).
 */
public class Producer {

    public static void main(String[] args) {
        // Broker list and String serializers.
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        // ProducerRecord(topic, partition, key, value): partition 0,
        // empty-string key; callback prints where the record landed.
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("zion", 0, "", "etymology" + i), (metadata, e) -> {
                if (e == null) {
                    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                            metadata.topic(), metadata.partition());
                }
            });
        }
        producer.close();
    }
}
不指定partition, 根据key的hash值取模
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer that supplies a key ("moniker") but no partition: the default
 * partitioner then routes by the key's hash.
 */
public class Producer {

    public static void main(String[] args) {
        // Broker list and String serializers.
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        // Same key on every record -> all five land on the same partition.
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("zion", "moniker", "etymology" + i), (metadata, e) -> {
                if (e == null) {
                    System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                            metadata.topic(), metadata.partition());
                }
            });
        }
        producer.close();
    }
}
既没有partition值又没有key值的情况下，Kafka采用Sticky Partition（黏性分区器），会随机选择一个分区，并尽可能一直使用该分区，待该分区的batch已满或者已完成，Kafka再随机选择一个分区进行使用（和上一次的分区不同）
public class Producer{ public static void main(String[] args) throws InterruptedException{ Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 创建KafkaProducer对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 发送数据 for(int i = 0; i < 50; i++){ kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), new Callback(){ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e){ if(e == null){ System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n", recordMetadata.topic(), recordMetadata.partition()); } } }); Thread.sleep(2); } kafkaProducer.close(); } }
自定义partitioner:
package io.veer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner that routes by record value: values containing
 * "moniker" go to partition 0, everything else to partition 1.
 * NOTE(review): assumes the topic has at least 2 partitions — confirm.
 */
public class MyPartitioner implements Partitioner {

    /**
     * @param s       topic name (unused)
     * @param o       record key (unused)
     * @param bytes   serialized key (unused)
     * @param o1      record value; routing is based on its string form
     * @param bytes1  serialized value (unused)
     * @param cluster cluster metadata (unused)
     * @return 0 when the value contains "moniker", otherwise 1
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // String.valueOf is null-safe; the original o1.toString() threw an
        // NPE for records sent with a null value (maps null -> "null" -> 1).
        String msg = String.valueOf(o1);
        System.out.println("msg = " + msg);
        return msg.contains("moniker") ? 0 : 1;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> map) {
        // No configuration needed.
    }
}
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer wired to the custom {@code MyPartitioner}: "etymology*" values
 * should land on partition 1 and "moniker*" values on partition 0.
 */
public class Producer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner. Class reference instead of the
        // string literal "io.veer.MyPartitioner": compile-time checked and
        // survives renames/moves.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

        // try-with-resources so the producer is closed (and flushed) even if
        // a send throws — the original leaked it in that case.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            // Batch 1: values without "moniker" -> expect partition 1.
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                                metadata.topic(), metadata.partition());
                    }
                });
                Thread.sleep(2);
            }
            // Batch 2: values containing "moniker" -> expect partition 0.
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("zion", "moniker" + i), (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                                metadata.topic(), metadata.partition());
                    }
                });
            }
        }
    }
}
设置参数
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Producer with throughput tuning: batch.size, linger.ms, buffer.memory
 * and lz4 compression, on top of the custom partitioner.
 */
public class Producer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "io.veer.MyPartitioner");
        // Bit shifts replace (int)/(long) Math.pow casts: same exact values
        // (16 KiB batch, 32 MiB buffer) without a float round trip.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 << 14);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1L << 25);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // try-with-resources ensures close/flush even on exception; the
        // trailing close supersedes the original's explicit close() call.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("zion", "etymology" + i), (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                                metadata.topic(), metadata.partition());
                    }
                });
                Thread.sleep(2);
            }
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("zion", "moniker" + i), (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                                metadata.topic(), metadata.partition());
                    }
                });
            }
            // Keep the JVM alive long enough to observe linger.ms batching.
            Thread.sleep(1000 * 5);
        }
    }
}
开启事务:
package io.veer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Transactional producer demo. A deliberate division by zero before
 * commitTransaction() forces the abort path, so none of the five sent
 * records become visible to read_committed consumers.
 */
public class Producer {

    public static void main(String[] args) throws InterruptedException {
        // Base config: brokers, serializers, custom partitioner.
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.8.11:9092,192.168.8.12:9092,192.168.8.13:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "io.veer.MyPartitioner");
        // Transactions require a globally unique transactional.id.
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
        // Throughput tuning: 16 KiB batches, 10 ms linger, 32 MiB buffer, lz4.
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) Math.pow(2, 14));
        config.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (long) Math.pow(2, 25));
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);
        producer.initTransactions();
        producer.beginTransaction();
        try {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("zion", "etymology" + i), (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("\033[37;7m Topic: %s, Partition: %s \033[0m\n",
                                metadata.topic(), metadata.partition());
                    }
                });
                Thread.sleep(2);
            }
            // Intentional failure: ArithmeticException skips the commit below.
            int b = 1 / 0;
            producer.commitTransaction();
        } catch (Exception e) {
            e.printStackTrace();
            // Roll back everything sent inside this transaction.
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}
数据有序:
相关文章
- kafka学习之-深入研究原理
- 来吧,1分钟带你玩转Kafka
- FlinkCDC读取MySQL并写入Kafka案例(com.alibaba.ververica)
- 【消息队列】kafka是如何保证消息不被重复消费的
- 设置Kafka集群的方法
- Kafka项目实战-用户日志上报实时统计之应用概述
- Kafka JAVA客户端代码示例--高级应用
- Kafka消息队列
- helm部署kafka完整记录
- kafka可视化客户端工具(Kafka Tool)的基本使用
- 如何让 Visual Studio Code 里显示 Cypress 的 intelligent code suggestion
- 【译】使用Apache Kafka构建流式数据平台(1)
- kafka详解一、Kafka简介
- Ubuntu 下Unable to install “Visual Studio Code“:snap “code“ has “install-snap“ change in progress
- 5种kafka消费端性能优化方法
- elk-日志方案--使用Filebeat收集日志并输出到Kafka
- Kafka 如何优化内存缓冲机制造成的频繁 GC 问题?
- 大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)