rabbitmq 的hello world程序
2023-09-27 14:22:26 时间
一、引入rabbitmq依赖
首先在pom中引入rabbitmq的依赖,如下所示:
<!--引入rabbitmq依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
二、创建消息的生产者和消费者
创建消息的生产者,如下代码所示:
package com;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 消息生产者
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException,TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//由连接工厂创建连接
Connection conn = factory.newConnection();
//通过连接创建通道
Channel channel = conn.createChannel();
//声明交换器以及exchange类型为direct
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//设置路由键
String routingKey = "jasonKey";
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//要发送到queue的消息
String message = "jason";
//发布消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者生产了消息:" + message );
//关闭通道和连接
channel.close();
conn.close();
}
}
创建消息的消费者,如下代码所示:
package com;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息消费者
*
*/
public class Recive {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立到代理服务器到连接
final Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "jasonKey";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(QUEUE_NAME, exchangeName, routingKey);
//消费消息
channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// properties 这是消息的一些相关属性,例如内容编码、内容类型等
String routingKey = envelope.getRoutingKey();
System.out.println("消费的路由键:" + routingKey);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
String bodyStr = new String(body, "UTF-8");
if("jason".equals(bodyStr)){
System.out.println("消费者从队列中获得了消息,并且可以根据这个消息进行一些业务操作:" + bodyStr);
}
try {
channel.close();
conn.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
});
}
}
三、运行结果
先执行生产者程序,后执行消费者程序,运行程序结果如下:
四、总结
消费者和生产者,关键在于消息的传递以及消费。
生产者把消息发送到队列,消费者从队列中取出消息并进行消费,这其中还需要一个监听器,来实时监听这个队列,如果里面有了消息,消费者就进行消费。
生产者和queue之间通过exchange来进行关联,消费者也是一样的。
这就需要非常清楚 routing key、exchange、binding key、queue 的概念以及它们之间的关系。还需要清楚的知道消息的分发策略,即exchange的四种类型(direct、fanout、topic、headers)以及它们的区别,各自有什么不同。
相关文章
- 微信小程序 - 分包加载(分包使用)
- c#Winform程序调用app.config文件配置数据库连接字符串 SQL Server文章目录 浅谈SQL Server中统计对于查询的影响 有关索引的DMV SQL Server中的执行引擎入门 【译】表变量和临时表的比较 对于表列数据类型选择的一点思考 SQL Server复制入门(一)----复制简介 操作系统中的进程与线程
- java程序编译
- 如何使用linux程序mdadm创建软件RAID1软阵列
- [UWP]使用AlphaMaskEffect提升故障艺术动画的性能(顺便介绍怎么使用性能探测器分析UWP程序)
- 微信小程序通过api接口将json数据展现到小程序示例
- 有关回旋数组的程序
- 微信小程序实战–集阅读与电影于一体的小程序项目(八)
- [转]使用互斥对象让程序只运行一次(delphi)