zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

在微信小程序上做一个「博客园年度总结」:使用redis存储数据

Redis存储微信程序数据 使用 一个 总结
2023-06-13 09:17:13 时间

前面写过一篇博客:解决前端获取接口数据太慢的一种思路

当时是由于博客园接口获取数据比较慢,所以从博客园拿到数据后,先把数据存到一个文件中,再从文件中读取数据,这样就不必每次都请求接口了

本次用redis来实现这个功能:把数据存储到redis中,再从redis中读取

1、本地安装redis

因为是在本地进行调试,所以要先在自己的电脑中安装redis

mac下安装redis可参考:

https://www.jianshu.com/p/2972c8e6d2f6

2、安装rdm

为了方便查看redis,安装一个rdm软件,下载传送门:

链接: https://pan.baidu.com/s/1HSydI8sthcJ1ZgbIutfiAQ 密码: 3ehk

3、使用redis

关于如何在python中使用redis,可以参考这篇博客,写的比较清楚,传送门:

https://zhuanlan.zhihu.com/p/374381314

(1)定义操作redis的基本方法

先创建一个conf_redis_db.py文件

# coding: utf-8
"""
author: hmk
detail: 
create_time: 
"""

import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True, charset='UTF-8', encoding='UTF-8')

# r.set("name", "测试", ex=50)
# print(r.get("name"))

EXPIRES_TIME = 86400  # 过期时间,s


def set_redis_data(key, data):
    """redis写入数据"""
    r.set(name=key, value=data, ex=EXPIRES_TIME)


def get_redis_data(key):
    """redis读取数据"""
    data = r.get(key)
    return data

(2)把博客园接口数据存储到redis中

修改 get_cnblogs_data.py

博客园的数据是通过get_blogs_api()方法获取的,如下

    def get_blogs_api(self, blog_name):
        """获取个人随笔列表接口"""
        
        flag = True
        try:
            blogs = []
            i = 1
            while flag is True:

                url = "https://api.cnblogs.com/api/blogs/{}/posts?pageIndex={}".format(blog_name, i)
                res = requests.get(url=url, headers=self.headers)
                data = res.json()

                if data:
                    # 如果接口有返回数据,就把数据追加到blogs中,同时页码+1
                    blogs += data
                    i += 1
                else:
                    # 如果接口返回空,说明当前传入的页码已经没有没有数据了,结束循环
                    # print(data)
                    flag = False

            new_blogs = list(map(self.deal_blogs, blogs))  # 调用map函数处理博客原始数据

            first_blog = new_blogs[-1]  # 发布的第一篇博客
 
            sort_blogs = sorted(new_blogs, key=lambda item: item["ViewCount"], reverse=True)  # 按照ViewCount排序,降序

            view_max_10 = sort_blogs[0:5]  # 浏览量前10的文章

            """提取2022年的月度数据并处理"""
            blog_date1 = [i["PostDate"][0:7] for i in new_blogs]  # 提取每条数据的年月,组成一个列表
            temp = Counter(blog_date1)
            month_blog_date = dict(temp)

            months = ["2022-01", "2022-02", "2022-03", "2022-04", "2022-05", "2022-06",
                      "2022-07", "2022-08", "2022-09", "2022-10", "2022-11", "2022-12"]

            month_result = []  # 2022年每月博客新增数量

            for j in months:  # 遍历日期范围列表
                if j in month_blog_date:
                    # 如果一个日期在bug列表中,说明这个日期有值,取bug字典中该日期的值赋给bug_num,同时date取当前日期,组合为一个字典
                    month_result.append({"date": j, "value": month_blog_date[j]})
                else:
                    # 否则这个日期对应的value=0
                    month_result.append({"date": j, "value": 0})

            now_year_blog_sum = sum([i["value"] for i in month_result])  # 2022年新增博客总数

            """提取年度数据并处理"""
            blog_date2 = [i["PostDate"][0:4] for i in new_blogs]  # 提取每条数据的年,组成一个列表
            year_blog_date = dict(Counter(blog_date2))
            begin_year = first_blog["PostDate"][0:4]  # 取发布的第一篇博客所在的年份,因为这就是博客起始年份
            end_year = get_now_year()  # 取当年年份为结束年份

            date_gap = int(end_year) - int(begin_year) + 1  # 计算年份差

            years = []
            for i in range(date_gap):
                years.append(str(int(begin_year) + i))

            year_result = []  # 每年博客新增数量
            for j in years:  # 遍历日期范围列表
                if j in year_blog_date:
                    # 如果一个日期在bug列表中,说明这个日期有值,取bug字典中该日期的值赋给bug_num,同时date取当前日期,组合为一个字典
                    year_result.append({"date": j, "value": year_blog_date[j]})
                else:
                    # 否则这个日期对应的value=0
                    year_result.append({"date": j, "value": 0})


            res = {
                "first_blog": first_blog,  # 发布的第一篇博客
                "view_max_10": view_max_10,  # 浏览量前10的文章
                "now_year_blog_sum": now_year_blog_sum,  # 2022年新增博客总数
                "month_result": month_result,  # 2022年每月博客新增数量
                "year_result": year_result  # 每年博客新增数量
            }

            return res

        except Exception as e:
            raise e

再写一个方法把上述返回的数据存到redis中

from utils.conf_redis_db import *


def save_blogs(self, name):
    data = self.get_blogs_api(name)
    # print(data)

    set_redis_data("blogs_data", json.dumps(data, ensure_ascii=False))  # 需要将data转为字符串,同时防止中文乱码设置ensure_ascii

因为get_blogs_api()返回的数据为字典格式,不能直接放到redis中,需要转为字符串

这里使用json.dumps()进行转换,同时指定ensure_ascii为False,以避免中文乱码

执行这个方法后,redis中会存储blogs_data键,效果如下,

(3)调用redis数据

打开cnblog.py,修改GetBlogs方法,通过读取redis中的key获取数据

因为在向redis写入数据时,设置了过期时间,每隔24h会失效,当key失效时,我们会获取到的是null

所以需要判断这个情况,当key失效后,重新向redis写入数据

class GetBlogs(Resource):
    """接口:获取个人随笔列表"""

    """使用redis数据"""
    @staticmethod
    def get():
        blog_app = cn_blogs.conf["cn_blogs"]["blogApp"]  # request.args.get("blog_app")

        data = r.get("blogs_data")
        if data is None: # 如果从redis读取到空,说明key失效了,此时调用save_blogs()方法,重新向redis写入数据
            cn_blogs.save_blogs(blog_app)
            return r.get("blogs_data")
        else:
            return data