zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python操作RabbitMQ

2023-09-11 14:13:58 时间

来源:http://www.cnblogs.com/phennry/p/5713274.html

本篇博客主要介绍如何通过Python来操作管理RabbitMQ消息队列,大家在工作中可能遇到很多类似RabbitMQ这种消息队列的中间件,如:ZeroMQ、ActiveMQ、MetaMQ等,我们学会了如何操作RabbitMQ的话基本上操作其他的队列都是一通百通。

 一、RabbitMQ安装

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统,它遵循Mozilla Pulic License开源协议。

MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用链接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

1,yum安装rabbitmq

#安装配置epel源
  rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
#安装Erlang
  yum -y insatll erlang
 
#安装RabbitMQ
  yum -y install rabbitmq-server
 
#注意:
   service rabbitmq-server start/stop

2,安装API

#pip安装:
  pip install pika
 
#源码安装:
  https://pypi.python.org/pypi/pika  #官网地址
之前我们在介绍线程,进程的时候介绍过python中自带的队列用法,下面我们通过一段代码复习一下:
#生产者消费者模型,解耦的意思就是两个程序之间,互相没有关联了,互不影响。
import Queue
import threading
import time
q = Queue.Queue(20)      #队列里最多存放20个元素
  
def productor(arg):            #生成者,创建30个线程来请求吃包子,往队列里添加请求元素
    q.put(str(arg) + '- 包子') 
  
for i in range(30):
    t = threading.Thread(target=productor,args=(i,))
    t.start()
  
def consumer(arg):       #消费者,接收到队列请求以后开始生产包子,来消费队列里的请求
    while True:
        print(arg,q.get())
        time.sleep(2)
  
for j in range(3):
    t = threading.Thread(target=consumer,args=(j,))
    t.start()

二、通过Python来操作RabbitMQ队列

     上面我们已经将环境装备好,下面我们通过Pika模块来对Rabbitmq队列来进行操作,对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

1,基本用法

####################################生产者#####################################
 
import pika
 
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))  
#创建一个链接对象,对象中绑定rabbitmq的IP地址
 
 
channel=connection.channel()        #创建一个频道
 
channel.queue_declare(queue='name1')  #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
 
channel.basic_publish(exchange='',
                      routing_key='name1',   #指定队列名称
                      body='Hello World!')   #往该队列中发送一个消息
print(" [x] Sent 'Hello World!'")
connection.close()                           #发送完关闭链接
#####################################消费者######################################
 
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))
#创建一个链接对象,对象中绑定rabbitmq的IP地址
 
channel = connection.channel()         #创建一个频道
 
channel.queue_declare(queue='name1')   #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
 
def callback(ch, method, properties, body):   #callback函数负责接收队列里的消息
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,              #从队列里去消息
                      queue='name1',         #指定队列名
                      no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

acknowledgment 消息不丢失

   上面的例子中如果我们将no-ack=False ,那么当消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么RabbitMQ会重新将该任务添加到队列中。

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))
channel = connection.channel()
 
channel.queue_declare(queue='name1')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)   #向生成者发送消费完毕的确认信息,然后生产者将此条消息同队列里剔除
 
channel.basic_consume(callback,
                      queue='name1',                             
                      no_ack=False)                    #如果no_ack=False,当消费者down掉了,RabbitMQ会重新将该任务添加到队列中
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

上例如果消费者中断后如果不超过10秒,重新链接的时候数据还在。当超过10秒之后,消费者往生产者发送了ack,重新链接的时候数据将消失。

durable消息不丢失

    消费者down掉后我们知道怎么处理了,如果我的RabbitMQ服务down掉了该怎么办呢?

消息队列是可以做持久化,如果我们在生产消息的时候就指定某条消息需要做持久化,那么RabbitMQ发现有问题时,就会将消息保存到硬盘,持久化下来。

####################################生产者#####################################
#!/usr/bin/env python
  
import pika
  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))
  
channel = connection.channel()
  
channel.queue_declare(queue='name2', durable=True)    #指定队列持久化
  
channel.basic_publish(exchange='',
                      routing_key='name2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,            #指定消息持久化
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
#####################################消费者######################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))
  
channel = connection.channel()
  
channel.queue_declare(queue='name2', durable=True)
  
  
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)
  
channel.basic_consume(callback,
                      queue='name2',
                      no_ack=False)
  
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走的,例如:消费者1去队列中获取奇数序列任务,消费者2去队列中获取偶数序列的任务,消费者1处理的比较快而消费者2处理的比较慢,那么消费者1就会一直处于繁忙的状态,为了解决这个问题在需要加入下面代码:

channel.basic_qos(prefetch_count=1)  :表示谁来获取,不再按照奇偶数 排列

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='name1')
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
 
channel.basic_consume(callback,
                      queue='name1',
                      no_ack=False)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2,发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。

    在RabbitMQ中,所有生产者提交的消息都有Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储,RabbitMQ提供了四种Exchange:fanout、direct、topic、header。由于header模式在实际工作中用的比较少,下面主要对前三种进行比较。

exchange type = fanout :任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

  ​为了方便理解,应用了上面这张图,可以清晰的看到相互之间的关系,当我们设置成fanout模式时,如何操作请看下面代码:

####################################发布者#####################################
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='test_fanout',
                         type='fanout')
 
message = '4456'
channel.basic_publish(exchange='test_fanout',
                      routing_key='',
                      body=message)
print(' [x] Sent %r' % message)
connection.close()
####################################订阅者#####################################
 
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='test_fanout',        #创建一个exchange
                         type='fanout')                 #任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上
 
#随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
#绑定
channel.queue_bind(exchange='test_fanout',
                   queue=queue_name)                    #exchange绑定后端队列
 
print('<------------->')
 
def callback(ch,method,properties,body):
    print(' [x] %r' % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

exchange type = direct:任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue上(关键字发送)

   之前事例,发送消息时明确指定了某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。



 发布者:

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_test',
                         type='direct')
 
severity = 'info'         #设置一个key,
message = '99999'
channel.basic_publish(exchange='direct_test',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

​订阅者1:

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_test',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = ['error','info',]      #绑定队列,并发送关键字error,info
for severity in severities:
    channel.queue_bind(exchange='direct_test',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
订阅者2:
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_test',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = ['error',]
for severity in severities:
    channel.queue_bind(exchange='direct_test',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

结论:当我们将发布者的key设置成Error的时候两个队列对可以收到Exchange的消息,当我们将key设置成info后,只有订阅者1可以收到Exchange的消息。

 exchange type = topic:任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上(模糊匹配)

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。

  • # :表示可以匹配0个或多个单词;

  • * :表示只能匹配一个单词。

#发送路由值        队列中
www.cnblogs.com    www.* --->#无法匹配
www.cnblogs.com    www.# --->#匹配成功
发布者:
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
 
message = ' '.join(sys.argv[2:]) or 'Hello World!'
 
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
 
connection.close()
 
 
#执行方式: 
python xxx.py name1   #name1为routing_key
订阅者:
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
 
#执行方式:
python xxx,py name1

更多相关内容请参考以下连接:

http://www.rabbitmq.com/documentation.html

http://blog.csdn.net/songfreeman/article/details/50945025



python采用pika库使用rabbitmq总结,多篇笔记和示例

http://blog.csdn.net/chenjiebin/article/details/8253433

这一段时间学习了下rabbitmq,在学习的过程中,发现国内关于Python采用pika库使用rabbitmq的资料很少,官网有这方面的资料,不过是都英文的。于是笔者结合自己的理解,就这方面内容写了一些示例,总共有七篇笔记,分享出来。

笔记依次是循序渐进的,笔记内贴出的代码笔者都实际运行过,运行系统ubuntu 12.04,rabbitmq版本是2.7.1,python版本是2.7.3。

因为笔记里提到一些名词,虽然叫法不一样,不过都是表达同样的事物,所以有必要先说明下,以免产生疑惑。主要是两个名词:

  • producer 直译为生成者,就是产生消息的东东,笔记里提到的发送者、发送端都是一个意思。如果把消息比喻成任务,也可以理解为任务分配者。
  • consumer 直译为消费者,就是接收消息的东东 ,笔记里提到的接收者、接收端都是一个意思。如果把消息比喻成任务,也可以理解为工作者。

1、ubuntu安装rabbitmq和python的使用实现

这篇主要记录了在ubuntu下安装rabbitmq服务的过程和安装python pika库的过程,并演示了单向发送消息的工作方式。

2、python使用rabbitmq实例二,工作队列

继上一篇,演示了多个接收端情况下,消息发送的工作方式。

3、python使用rabbitmq实例三,交换机

前面两篇的示例,都只使用了一个队列,消息是依次发送给绑定到该队列的接收端。如果要广播出去,就要使用交换机,本篇演示了交换机的工作方式。

4、python使用rabbitmq实例四,路由键

第三篇的消息是广播出去的,所有接收端都会接收到,如果要精确指明消息的接收端,就要使用路由键,本篇主要演示了路由键的工作方式。

5、python使用rabbitmq实例五,路由键模糊匹配

第四篇的路由键是精确匹配的,有时用需要模糊匹配,本篇主要演示了路由键模糊匹配的工作方式。

6、python使用rabbitmq实例六,远程结果返回

前面五篇的消息都是发送出去就完事了,接收端并没有将结果返回给发送端。有些情况下需要接收端将接收到的消息处理后再返回给发送端,本篇演示了这种情况的处理方式。

7、python使用rabbitmq实例七,相互关联编号correlation id

上一篇只是发送单条消息,返回的结果自然是对应该条消息,但是如果同时发出多条消息,就会返回多个结果,如何将发送的消息和返回的结果一一对应起来呢?本篇演示了correlation id的工作方式,就是用来解决这个问题的。


使用python开发RabbitMQ应用

使用python开发RabbitMQ应用

(参考了RabbitMQ网站上提供的英文版本入门指南: http://www.rabbitmq.com/getstarted.html

 

测试环境:CentOS 6.2

1,测试环境准备

安装python(一般系统都自带了python)

安装RabbitMQ server可以参考前面的文章。

安装pika

使用pip安装的时候可能会报错:

importerror no module named pkg_resources

请用下面命令解决这个问题:

$ curl https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py | python

然后还可能出现:

pkg_resources.distributionnotfound pip==1.4.1

这时候先把pip卸载掉, sudo yum remove python-pip

然后去下载最新的get-pip.py文件,python get-pip.py安装

在/etc/profile里面将/usr/local/python27/bin加入PATH最前面

 

把rabbitmq server启动一下和准备好测试目录rabbitmq_app:

$ /usr/local/rabbitmq/sbin/rabbitmq-server -detached

$ cd ~

$ mkdir -p test /rabbitmq_app

$ cd test /rabbitmq_app

$ mkdir tut1 tut2 tut3 tut4 tut5 tut6

2,实例一:来个hello world程序

$ cd tut1

$ vim send.py (代码如下)

$ vim receive.py (代码如下)

首先是消息发送程序: send.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. # -*- coding: utf-8 -*-  
  3. import sys  
  4. import pika  
  5. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
  6. channel = connection.channel()  
  7. channel.queue_declare(queue = 'hello')  
  8. if len (sys.argv) < 2 :  
  9.      print 'message is empty!'  
  10.      sys.exit(0)  
  11. message = sys.argv[1]  
  12. channel.basic_publish(exchange = '', routing_key='hello', body = message)  
  13. print "[x] sent: '" + message + "'\n"  
  14. connection.close()  

 

跑一下send.py发送一个消息

$ python send.py 'Hello World!'

$ python send.py '你好刀哥'

$ /usr/local/rabbitmq/sbin/rabbitmqctl list_queues

Listing queues ...

hello   2

... done .

如果你也看到hello队列里面有一个消息的话,就证明可以发消息了。

然后写一个接收消息脚本:receive.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. # -*- coding: utf-8 -*-  
  3. import pika  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' ))  
  5. channel = connection.channel()  
  6. channel.queue_declare(queue = 'hello' )  
  7. print '[*] Waiting for messages. To exit press CTRL+C'  
  8.    
  9. def callback(ch, method, properties, body):  
  10.      print body  
  11.    
  12. channel.basic_consume(callback, queue = 'hello' , no_ack = True )  
  13. channel.start_consuming()  

 

其中第12行的 no_ack=True 表示消费完了这个消息以后不主动把完成状态通知rabbitmq。

然后开另外一个shell,执行一下receive.py

$ python receive.py

[*] Waiting for messages. To exit press CTRL+C

Hello World!

你好刀哥

 

3,实例二:工作队列(work queue / task queue)

一般应用于把比较耗时的任务从主线任务分离出来。比如一个http页面请求,里面需要发送带大附件的邮件、或者是要处理一张头像图片等。这类型工作队列的 处理端一般有多个worker进程,分担队列里面的任务。这就有点负载均衡的策略在里面了。尽量做到每个进程的工作量比较平均,而且是完成了一个任务才接 第二个任务。看看我们的实现吧。

$ cd tut2

$ vim manager.py (代码如下)

$ vim worker.py (代码如下)

首先是消息发送程序: manager.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. # -*- coding: utf-8 -*-  
  3. import pika  
  4. import sys  
  5. parameters = pika.ConnectionParameters(host = 'localhost' )  
  6. connection = pika.BlockingConnection(parameters)  
  7. channel = connection.channel()  
  8. channel.queue_declare(queue = 'task_queue' , durable = True )  
  9. message = ' ' .join(sys.argv[ 1 :]) or "Hello World!"  
  10. channel.basic_publish(exchange = '',  
  11.                        routing_key = 'task_queue' ,  
  12.                        body = message,  
  13.                        properties = pika.BasicProperties(  
  14.                           delivery_mode = 2 , # make message persistent  
  15.                        ))  
  16. print " [x] Sent %r" % (message,)  
  17. connection.close()  

 

其中第8行的 durable=True 声明了队列需要持久化,第14行的 delivery_mode = 2 声明了队列的消息需要持久化。

 

然后写一个接收消息脚本:worker.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. # -*- coding: utf-8 -*-  
  3. import pika  
  4. import time  
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.          host = 'localhost' ))  
  7. channel = connection.channel()  
  8. channel.queue_declare(queue = 'task_queue' , durable = True )  
  9. print ' [*] Waiting for messages. To exit press CTRL+C'  
  10.    
  11. def callback(ch, method, properties, body):  
  12.      print " [x] Received %r" % (body,)  
  13.      time.sleep( body.count( '.' ) )  
  14.      print " [x] Done"  
  15.      ch.basic_ack(delivery_tag = method.delivery_tag)  
  16.    
  17. channel.basic_qos(prefetch_count = 1 )  
  18. channel.basic_consume(callback,  
  19.                        queue = 'task_queue' )  
  20. channel.start_consuming()  

 

其中第15行的 basic_ack 是执行完任务通知rabbitmq,第17行的basic_qos是告诉rabbitmq只有当worker完成了任务以后才分派1条新的消息,实现公平分派。

测试方法,开3个bash,2个跑worker,1个跑manager:

$ python manager.py task1.

$ python manager.py task2..

$ python manager.py task3...

$ python manager.py task4....

点号数量决定worker工作的时间( 其实是睡觉时间,呵呵 time.sleep(body.count('.')) )。

而在worker那边,可以看到每个worker都处理了两个任务。

这种分配机制就是所谓的循环调度(Round-robin dispatching)

 

4,实例三:发布和订阅

发布订阅模式,简单来说就像是广播,一个消息发布出来以后,所有订阅者都能听到,至于接收到这个信息以后大家做什么就看具体个人了。

 

啊!怎么忽然冒出个X,是什么玩意!这个X就是所谓的exchange,简单来说就是消息的管家,由他决定接收到的信息是放特定的队列,还是所有队列,还是直接丢弃。

其实在前两个实例里面,已经用到了exchange (channel.basic_publish(exchange='',...),这个exchange的名字为空,外号无名(人若无名,便可专心练剑~)。他会把你的消息都转达给routing_key指明的队列。

当我们声明了exchange以后,我们需要为queue和exchange建立联系,这时候,就要用到绑定(binding)了。

$ cd tut3

$ vim emitlog.py (代码如下)

$ vim recelog.py (代码如下)

emitlog.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.    
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.          host = 'localhost' ))  
  7. channel = connection.channel()  
  8.    
  9. channel.exchange_declare(exchange = 'logs' ,  
  10.                           type = 'fanout' )  
  11.    
  12. message = ' ' .join(sys.argv[ 1 :]) or "info: Hello World!"  
  13. channel.basic_publish(exchange = 'logs' ,  
  14.                        routing_key = '',  
  15.                        body = message)  
  16. print " [x] Sent %r" % (message,)  
  17. connection.close()  

 

recelog.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. import pika  
  3.    
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.          host = 'localhost' ))  
  6. channel = connection.channel()  
  7. channel.exchange_declare(exchange = 'logs' ,  
  8.                           type = 'fanout' )  
  9. result = channel.queue_declare(exclusive = True )  
  10. queue_name = result.method.queue  
  11. channel.queue_bind(exchange = 'logs' ,  
  12.                     queue = queue_name)  
  13. print ' [*] Waiting for logs. To exit press CTRL+C'  
  14.    
  15. def callback(ch, method, properties, body):  
  16.      print " [x] %r" % (body,)  
  17.    
  18. channel.basic_consume(callback,  
  19.                        queue = queue_name,  
  20.                        no_ack = True )  
  21. channel.start_consuming()  

 

测试:

和前一个实例差不多。开3个bash,2个跑recelog,1个跑emitlog。查看recelog是否都收到emitlog发送的消息。代码里面用 了一个fanout(意思是成扇形展开)类型的exchange,只要和exchange绑定的queue都能收到一份消息的 copy,routing_key会被忽略掉。

 

5,路由模式 (选择接收信息)

$ cd tut4

$ vim emitlog.py (代码如下)

$ vim recelog.py (代码如下)

emitlog.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.    
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.          host = 'localhost' ))  
  7. channel = connection.channel()  
  8. channel.exchange_declare(exchange = 'direct_logs' ,  
  9.                           type = 'direct' )  
  10. severity = sys.argv[ 1 ] if len (sys.argv) > 1 else 'info'  
  11. message = ' ' .join(sys.argv[ 2 :]) or 'Hello World!'  
  12. channel.basic_publish(exchange = 'direct_logs' ,  
  13.                        routing_key = severity,  
  14.                        body = message)  
  15. print " [x] Sent %r:%r" % (severity, message)  
  16. connection.close()  

 

这里声明exchange时类型定义为direct(直接匹配),就是说只有当一个信息的routing_key和队列的binding_key一 致时,信息才会被放入到这个队列。消息发布给exchange时必须带上routing_key。其实在消息生产端,队列这个概念是透明的。

 

recelog.py

Python代码   收藏代码
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.    
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.          host = 'localhost' ))  
  7. channel = connection.channel()  
  8.    
  9. channel.exchange_declare(exchange = 'direct_logs' ,  
  10.                           type = 'direct' )  
  11. result = channel.queue_declare(exclusive = True )  
  12. queue_name = result.method.queue  
  13. severities = sys.argv[ 1 :]  
  14. if not severities:  
  15.      print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \  
  16.                           (sys.argv[ 0 ],)  
  17.      sys.exit( 1 )  
  18. for severity in severities:  
  19.      channel.queue_bind(exchange = 'direct_logs' ,  
  20.                         queue = queue_name,  
  21.                         routing_key = severity)  
  22. print ' [*] Waiting for logs. To exit press CTRL+C'  
  23. def callback(ch, method, properties, body):  
  24.      print " [x] %r:%r" % (method.routing_key, body,)  
  25. channel.basic_consume(callback,  
  26.                        queue = queue_name,  
  27.                        no_ack = True )  
  28. channel.start_consuming()  

 

这里首先定义exchange,和消息发送端是一样的。然后定义队列,队列是自动命名,并且只要进程终止,队列就会终止。然后把队列和 exchange绑定,绑定时的routing_key是用户输入的,如果输入多个key,就做多次的绑定。注意这里的队列还是一个。如果你需要建立两个 队列,就得跑两次这个python脚本。

 

6,topic和rpc

官方tutorial还有两个高级一点的实例,topic和rpc,这里就不作说明了,留着大家学学英文吧 :)

RabbitMQ提供了很多消息队列客户端代码,比如python,java,c等等,大家可以根据产品或项目的实际情况选择。关键是原理必须搞懂。

 

其他资源:

中文入门篇:  http://adamlu.net/dev/2011/09/rabbitmq-get-started/


利用Python学习RabbitMQ消息队列

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

rabbitmq基本管理命令:

一步启动Erlang node和Rabbit应用:sudo rabbitmq-server

在后台启动Rabbit node:sudo rabbitmq-server -detached

关闭整个节点(包括应用):sudo rabbitmqctl stop

add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]

Demo:

producer.py

#!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚蓝行
# 博客 http://www.cnblogs.com/duanv/
import pika
import sys
#创建连接connection到localhost
con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建虚拟连接channel
cha = con.channel()
#创建队列anheng,durable参数为真时,队列将持久化;exclusive为真时,建立临时队列
result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
#创建名为yanfa,类型为fanout的exchange,其他类型还有direct和topic,如果指定durable为真,exchange将持久化
cha.exchange_declare(durable=False,
          exchange='yanfa',
          type='direct',)
#绑定exchange和queue,result.method.queue获取的是队列名称
cha.queue_bind(exchange='yanfa', 
       queue=result.method.queue,
       routing_key='',) 
#公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
cha.basic_qos(prefetch_count=)
#发送信息到队列‘anheng'
message = ' '.join(sys.argv[:])
#消息持久化指定delivery_mode=;
cha.basic_publish(exchange='',
         routing_key='anheng',
         body=message,
         properties=pika.BasicProperties(
          delivery_mode = ,
        ))
print '[x] Sent %r' % (message,)
#关闭连接
con.close()

consumer.py

#!/usr/bin/env python
# -*- coding: utf_ -*-
# Date: 年月日
# Author:蔚蓝行
# 博客 http://www.cnblogs.com/duanv/
import pika
#建立连接connection到localhost
con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建虚拟连接channel
cha = con.channel()
#创建队列anheng
result=cha.queue_declare(queue='anheng',durable=True)
#创建名为yanfa,类型为fanout的交换机,其他类型还有direct和topic
cha.exchange_declare(durable=False,
          exchange='yanfa', 
          type='direct',)
#绑定exchange和queue,result.method.queue获取的是队列名称
cha.queue_bind(exchange='yanfa',
       queue=result.method.queue,
       routing_key='',)
#公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
cha.basic_qos(prefetch_count=)
print ' [*] Waiting for messages. To exit press CTRL+C'
#定义回调函数
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  ch.basic_ack(delivery_tag = method.delivery_tag)
cha.basic_consume(callback,
         queue='anheng',
         no_ack=False,)
cha.start_consuming()

一、概念:

Connection: 一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。建立在上述的TCP连接中。数据流动都是在Channel中进行的。一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

二、队列:

首先建立一个Connection,然后建立Channels,在channel上建立队列

建立时指定durable参数为真,队列将持久化;指定exclusive为真,队列为临时队列,关闭consumer后该队列将不再存在,一般情况下建立临时队列并不指定队列名称,rabbitmq将随机起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

区别:durable是队列持久化与否,如果为真,队列将在rabbitmq服务重启后仍存在,如果为假,rabbitmq服务重启前不会消失,与consumer关闭与否无关;

而exclusive是建立临时队列,当consumer关闭后,该队列就会被删除

三、exchange和bind

Exchange中durable参数指定exchange是否持久化,exchange参数指定exchange名称,type指定exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别指定要进行bind的exchange和queue,routing_key为可选参数。

Exchange的三种模式:

Direct:

任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串);

2.这种模式下不需要将Exchange进行任何绑定(bind)操作;

3.消息传递时需要一个“routing_key”,可以简单的理解为要发送到的队列名字;

4.如果vhost中不存在routing_key中指定的队列名,则该消息会被抛弃。

Demo中虽然声明了一个exchange='yanfa'和queue='anheng'的bind,但是在后面发送消息时并没有使用该exchange和bind,而是采用了direct的模式,没有指定exchange,而是指定了routing_key的名称为队列名,消息将发送到指定队列。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

Fanout:

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式

2.这种模式不需要routing_key

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

Demo中创建了一个将一个exchange和一个queue进行fanout类型的bind.但是发送信息时没有用到它,如果要用到它,只要在发送消息时指定该exchange的名称即可,该exchange就会将消息发送到所有和它bind的队列中。在fanout模式下,指定的routing_key是无效的 。

Topic:

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。

2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

四、任务分发

1.Rabbitmq的任务是循环分发的,如果开启两个consumer,producer发送的信息是轮流发送到两个consume的。

2.在producer端使用cha.basic_publish()来发送消息,其中body参数就是要发送的消息,properties=pika.BasicProperties(delivery_mode = 2,)启用消息持久化,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.在接收端使用cha.basic_consume()无限循环监听,如果设置no-ack参数为真,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告诉rabbitmq消息已经正确处理。如果没有这条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分发:设置cha.basic_qos(prefetch_count=1),这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。

五、注意:

生产者和消费者都应该声明建立队列,网上教程上说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

可能因为版本问题,在我的测试中如果第二次声明建立的队列属性和第一次不完全相同,将报类似这种错406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次创建属性不同,将报这种错406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次声明建立队列也出现这个错误,说明之前存在名字相同的队列且本次声明的某些属性和之前声明不同,可通过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明建立另一名称的队列或删除原有队列,如果原有队列是非持久化的,可通过重启rabbitmq服务删除原有队列,如果原有队列是持久化的,只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限(先确认该vhost中没有其他有用队列)。

sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

以上内容是小编给大家介绍的利用Python学习RabbitMQ消息队列,希望大家喜欢。