Spring Cloud Stream与Kafka集成示例
下面是一个完整的示例,它使用Spring Cloud Stream和Kafka来创建一个简单的消息处理器和发布器:
1. 添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2. 配置Kafka
在application.properties文件中添加以下配置:
propertiesCopy codespring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.configuration.acks=all
spring.cloud.stream.kafka.binder.configuration.retries=3
spring.cloud.stream.kafka.binder.configuration.batch.size=16384
spring.cloud.stream.kafka.binder.configuration.linger.ms=1
spring.cloud.stream.kafka.binder.configuration.buffer.memory=33554432
spring.cloud.stream.kafka.binder.configuration.compression.type=gzip
3. 创建消息处理器
@EnableBinding(MyProcessor.class)
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void handle(Message<String> message) {
System.out.println("Received message: " + message.getPayload());
}
public interface MyProcessor {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
}
在这个示例中,我们定义了一个名为MyProcessor的声明式接口,其中包含了一个名为myInput的输入通道和一个名为myOutput的输出通道。我们使用@EnableBinding注解告诉Spring Boot应用程序使用MyProcessor接口中定义的输入和输出通道。
然后,我们定义了一个@StreamListener注解的方法handle(),该方法处理从输入通道接收到的消息,并将其打印到控制台。
4. 创建消息发布器
@Component
public class MyPublisher {
@Autowired
private MyProcessor processor;
public void publish(String message) {
processor.output().send(MessageBuilder.withPayload(message).build());
}
}
在这个示例中,我们创建了一个名为MyPublisher的组件,并在其中注入了MyProcessor接口。我们还定义了一个名为publish()的方法,该方法使用processor.output().send()方法将一个带有有效载荷的消息发送到名为myOutput的输出通道中。
5. 测试应用程序
@RestController
public class MyController {
@Autowired
private MyPublisher publisher;
@PostMapping("/publish")
public void publishMessage(@RequestBody String message) {
publisher.publish(message);
}
}
在这个示例中,我们创建了一个名为MyController的REST控制器,并在其中注入了MyPublisher组件。我们还定义了一个名为publishMessage()的POST请求处理程序,该处理程序将消息正文作为输入,并使用MyPublisher组件将其发送到名为myOutput的输出通道中。
6. 运行应用程序
现在我们可以启动应用程序并测试它了。我们可以使用任何HTTP客户端向/publish端点发送POST请求,并将消息正文作为输入。
例如,我们可以使用curl命令向端口8080发送一条消息:
curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka!" http://localhost:8080/publish
应用程序应该在控制台上输出以下内容:
Received message: Hello, Kafka!
这证明消息已成功从myOutput输出通道发送到myInput输入通道,并由handle()方法处理。
相关文章
- spring注解解析流程_深入理解Kafka
- Spring学习笔记(三)——Spring注解开发&spring与Junit整合测试
- spring boot自动配置原理面试题_Spring boot面试
- Spring学习笔记(三十二)——SpringBoot中cache缓存的介绍和使用
- Spring学习笔记(三十五)——小技巧:配置全局跨域、全局⽇期格式化、读取本地json
- unity c#面试_spring面试题及答案
- 一个依赖搞定 Spring Boot 接口防盗刷
- 解释spring框架中bean的生命周期_Spring bean的生命周期
- Spring boot集成plumelog日志系统
- kafka学习之Kafka 的简介(一)
- 斗转星移,无人能及——Spring MVC
- taskscheduler java_Spring TaskScheduler使用实例解析
- SpringAOP(1)-spring源码详解(六)
- Spring Security 里的filer们
- Spring:IOC
- 《Spring核心技术》开篇:我要带你一步步调试Spring6.0源码啦!
- 不背锅运维:消息队列概念、kafka入门、Kafka Golang客户端库
- Spring for Apache Kafka 3.0 和 Spring for RabbitMQ 3.0 发布
- Spring Boot2.0之 整合Zookeeper集群详解编程语言
- 深入探究Kafka与Redis的对比(kafka与redis)
- Spring Boot(十五):spring boot+jpa+thymeleaf增删改查示例详解编程语言
- SpringMVC + Spring + Hibernate实战(通用配置)详解编程语言
- Maven搭建SSH(Struts2+Spring+Hibernate)框架入门教程(二)
- spring使用Spring整合Redis和Jedis构建高性能应用(redisjedis与)
- 从Linux启动Kafka:一步一步指南(linux启动kafka)
- Spring框架致力于搭建基于MSSQL的稳健开发体系(spring mssql)
- Linux环境下部署Kafka服务器实践(linux kafka)