zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

01_RabbitMQ【简单的消息队列】

2023-09-14 09:14:08 时间

RabbitMQ Tutorials — RabbitMQhttps://rabbitmq.com/getstarted.html​​​​​​1. 简单模式

P:消息的生产者;

C:消息的消费者;

红色框:消息队列;

2. 创建连接

public class ConnectionUtil {
    public static void main(String[] args) throws IOException {
        System.out.println(getConnection());
    }
 
    public static Connection getConnection() throws IOException {
        //定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置服务地址
        // connectionFactory.setHost("192.168.91.128");
        //设置端口
        connectionFactory.setPort(5672);
        //设置账号信息:用户名、密码、vhost
        connectionFactory.setUsername("lisi");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/ls");
        //通过工程获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

3. 生产者向队列中发送消息

/*
 * 生产者
 * */
public class Product {
    public static String QUEUE_NAME = "simple";

    public static void main(String[] args) throws IOException {
        //获得连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //消息内容
        String message = "hello rabbit";
        //发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭连接
        channel.close();
        connection.close();

    }
}

4. 消费者

/*
 * 消费者:接收消息
 * */
public class Consumer {
    public static String QUEUE_NAME = "simple";

    public static void main(String[] args) throws IOException, InterruptedException {
        //获得连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义通道的消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //监听队列(消费消息)
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        //获得消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

5. 访问浏览器