zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python中使用Redis的批处理工具pipeline(这种方法从底层思考效率还是低于“订阅发布机制”)

PythonRedis方法效率工具 发布 机制 还是
2023-09-11 14:14:44 时间

一、pipeline出现的原因

1.Redis执行命令的过程

redis客户端执行一条命令的过程:

发送命令-〉命令排队-〉命令执行-〉返回结果

使用python给redis发送命令时的过程:

  1. 客户端发送请求,获取socket,阻塞等待返回;
  2. 服务端执行命令并将结果返回给客户端;

2.效率提升

当redis需要执行的命令较多时,这样的一来一回的网络传输所消耗的时间被称为RTT(Round Trip Time),显而易见,如果可以将这些命令作为一个请求一次性发送给服务端,并一次性将结果返回客户端,会节约很多网络传输的消耗,可以大大提升响应时间。因此我们python中通过pipeline来进行效率提升。

二、pepeline的性能

1、未使用pipeline执行N条命令

2、使用了pipeline执行N条命令

两种方式中:使用Pipeline执行速度比逐条执行要快,特别是客户端与服务端的网络延迟越大,性能体能越明显

三、原生批命令与Pipeline对比

原始批命令:(mset, mget)

  • 原生批命令是原子性,pipeline是非原子性(原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功要么都失败)
  • 原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性
  • 原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成

四、pipeline的简单使用

1.简单的使用

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')
# 创建管道对象
pipe = r.pipeline()
pipe.set('name', '张三')
pipe.set('age', 15)
pipe.set('gender', '男')
# 执行
pipe.execute()

2.pipeline支持命令写在一起

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')
pipe = r.pipeline()
# pipeline支持命令写在一起
pipe.set('hello', 'redis').sadd('faz', 'baz').incr('num').execute()

3.pipeline配合上下文管理器

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')
# 创建管道对象,一般配合上下文管理器使用
with r.pipeline() as pipe:
    pipe.set('name', '张三')
    pipe.set('age1', '12')
    pipe.set('age2', '121')
    pipe.set('age3', '1212')
    pipe.set('age4', '12121')
    try:
        # 执行
        pipe.execute()
    except Exception as e:
        print e

4.批量接收pipeline的值

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')

# 往num_list中添加100个值
with r.pipeline() as pipe:
    for i in range(1, 101):
        pipe.lpush('num_list'.format(i), i)
    try:
        pipe.execute()
    except Exception as e:
        print e

# 从num_list中取出所有值
num_len=r.llen('num_list')
with r.pipeline() as pipe:
    for i in range(num_len):
        pipe.rpop('num_list')
    try:
        result = pipe.execute()
        print result
    except Exception as e:
        print e

批量接收pipeline的值会以列表形式返回

5.pipeline配合事务的操作

默认pipeline中支持事务,若想关闭事务,则创建pipeline的时候:

pipe = r.pipeline(transaction=False)

错误总结

①开启事务时候命令出错

把set写成settt造成语法错误

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')
# 创建管道对象,一般配合上下文管理器使用
with r.pipeline() as pipe:
    pipe.set('name', '张三')
    pipe.settt('age',18)
    try:
        # 执行
        pipe.execute()
    except Exception as e:
        print e

语法错误,整个事务无法执行,控制台报错,数据库也不会执行。

②开启事务时候运行出错

错将str求长度写成列表求长度的命令,在redis执行过程中报错

# coding:utf-8
import redis

r = redis.StrictRedis.from_url('redis://127.0.0.1/0')
# 创建管道对象,一般配合上下文管理器使用
with r.pipeline() as pipe:
    pipe.set('name', '张三')
    pipe.llen('name')
    pipe.set('name2', '李四')
    try:
        # 执行
        pipe.execute()
    except Exception as e:
        print e

在执行过程中发现命令报错,那么命令错误的语句无法执行,不会影响其他命令

未改进的小例子

import redis
import time
import json


host = "127.0.0.1"
port = 6379
password = "ics12345"
redis_list_name = "dga-metadata"

redis_conn = redis.Redis(host, port, 0, password, decode_responses=True)
pipe = redis_conn.pipeline()
count = 0
www = 10000
while www:
    #data = redis_conn.rpop(redis_list_name)
    pipe.rpop(redis_list_name)
    #print(data)
    www -= 1
    count += 1
    if count % 1000 == 0:
        print(count)
data = pipe.execute()
print("end", len(data))
aaa = []
for item in data:
    if not item is None:
        json.loads(item)
        aaa.append(item)
print("end2", len(aaa))