Atitit Kafka 使用总结 内容 Kafka2.0 50M1 启动 要启动zookeeper 先,比ativemp麻烦很多啊1 Kafka生产者 1 Kafka消费者2 2
Atitit Kafka 使用总结
内容
启动 要启动zookeeper 先,比ativemp麻烦很多啊1
zookeeper-server-start.bat D:\kafka_2.11-2.1.0\config\zookeeper.properties
// D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties
D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties
//\\ kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties
/**
* Kafka生产者
*/
public class KafkaProducerDemo extends Thread{
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String message = "message_" ;
String topic="hello_topic";
Producer producer = new KafkaProducer(props);
//Producer producer =new KafkaProducer(props);
//new Producer (new ProducerConfig(properties));
Future<RecordMetadata> Future_RecordMetadata= producer.send(new ProducerRecord<String, String>(topic, "val332222"));
System.out.println("Sent: " + message);
System.out.println(Future_RecordMetadata.get());
}
Kafka消费者
/**
* Kafka消费者
*/
public class KafkaConsumerCls {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "test");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("hello_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
kafka_2.12-1.1.0 生产与消费java实现示例 - cctext - 博客园
相关文章
- Kafka + Zookeeper集群搭建
- eruka快速刷新和kafka配置
- Flume和Kafka的组合使用
- 下载Kafka安装包
- zookeeper入门教程_dubbo和Zookeeper详解
- kafka学习之Kafka 的简介(一)
- Kafka 会不会丢消息?怎么处理的?
- Kafka 删除topic_kafka自动创建topic
- Kafka源码解析_kafka删除消费组命令
- Kafka入门经典教程_kafka菜鸟教程
- kafka应用场景包括_不是kafka适合的应用场景
- 画图搞懂Kafka的高可用方案-ISR机制如何保证写入数据时主从的数据同步
- 【Spring Boot实战与进阶】集成Kafka消息队列
- kafka 入门
- 字节面试官狂问我:kafka 是什么?有什么作用?
- 【愚公系列】2023年03月 MES生产制造执行系统-004.Kafka的使用
- 14 张图详解 Zookeeper + Kafka on K8S 环境部署
- 新浪微博从 Kafka 到 Pulsar 的演变
- Kafka解惑之时间轮 (TimingWheel)详解大数据
- kafka源码解析Word版详解编程语言
- kafka源码解析之七KafkaRequestHandlerPool详解编程语言
- kafka原理解析-《Learning Apache Kafka, 2nd Edition.pdf》详解编程语言
- 从Linux启动Kafka:一步一步指南(linux启动kafka)
- kafka 、 zookeeper 集群(二)
- 如何在Linux上安装Zookeeper?(linux安装zookeeper)
- Linux下搭建Kafka Stream架构的实践(linux kafka)
- Kafka连接Oracle数据库提高数据处理能力(kafka连oracle)
- 利用Kafka与Oracle实现实时数据交换(kafka oracle)