zl程序教程

您现在的位置是:首页 >  Python

当前栏目

【网关开发】4.Openresty 使用events插件进行事件通知

2023-04-18 15:24:19 时间

背景

在某些业务场景下,比如数据更新,程序执行,只需要某个进程执行一次,但是其他进程需要知道本次执行的结果,所以就需要一个通知机制,主工作进程执行程序,执行之后的结果通知给其他进程,当然本质上也是通过共享内存进行处理。

场景逻辑

源码准备

本次插件来源依然是来自kong的插件
插件地址:https://github.com/Kong/lua-resty-worker-events
使用就是一个文件lib/resty/worker 下的events.lua 不需要编译与安装
将其放在自己的resty 目录下
本次代码地址:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/init.lua

主进程准备

我们需要一个worker进行任务的执行,以及推送event,为了与openresty的master做区别,我们可以将这个进行命名为main worker进程
如何确定main worker呢?因为所有worker在启动时都是平级的,所以我们可以任意选择worker作为main worker,所以我们利用第一个入共享内存数据的进程作为main worker。相当于一个锁。
每次启动时,因为master先执行,可以在这个阶段清空共享内存,启动其他进程时在重新选择main worker

local module_name = (...):match("(.-)[^%.]+$")
local cjson = require "cjson.safe"
local upstream_conf = require(module_name .. "config")
local upstream_shm = ngx.shared[upstream_conf.events_shm_name]
-- 这里是引入的配置文件
-- on_init 需要在master执行,也就是init_by_lua_file 中执行
local function on_init()
    -- 删除master 
    upstream_shm:delete("upstream_master")
end

-- 判断main worker的函数
local function is_master()
    local master = upstream_shm:get("upstream_master")
    if master then
        return false
    end
    upstream_shm:set("upstream_master","true")
    return true
end

程序中利用is_master()返回值判断是普通worker进程还是main worker

events程序

 -- worker init  事件配置等初始化。shm 是使用共享内存的名字 upstream_conf.events_shm_name = events
    local ev = require "resty.worker.events"
    local events_ok, err =
    ev.configure(
    {
        shm = upstream_conf.events_shm_name,
        timeout = 2, -- life time of unique event data in shm
        interval = 0.1, -- poll interval (seconds)
        wait_interval = 0.010, -- wait before retry fetching event data
        wait_max = 0.5, -- max wait time before discarding event
        shm_retries = 100 -- number of retries when the shm returns "no memory" on posting an event
    })
    if not events_ok then
        ngx.log(ngx.ERR, "failed to init events, err: "..tostring(err))
    end

  -- 定义事件列表
  local events = ev.event_list(
        upstream_conf.watch_path, -- available as _M.events._source
        "full_sync",                -- available as _M.events.full_sync
        "sync_keys"                  -- available as _M.events.sync_keys
  )
  -- 这里包含两个事件full_sync和sync_keys , _source 应该用任意字符串就可以
  
 --所有workers 注册事件处理函数
   local my_callback = function(data, event, source, pid)
        if event == events.full_sync then
            -- do sth
        elseif event == events.sync_keys then
            -- do sth
        end
        ngx.log(ngx.INFO,"get data event: "..cjson.encode(event).."data :"..data.."pid :"..tostring(pid).." now pid: "..tostring(ngx.worker.pid()))
    end
    ev.register(my_callback, events._source, events.full_sync)
-- ev.register(my_callback, events._source, events.sync_keys)

  -- master 发送事件
  if is_master() then
      local raise_event = function(p, event, data)
          ngx.log(ngx.INFO,"master post event ")
          return ev.post(events._source, event, data)
      end
      -- raise_event(nil, events.full_sync, "test_event")
      ngx.timer.at(0, raise_event,events.full_sync, "test_event")
  end 

测试

nginx 进程列表

思考与总结

直接执行 raise_event 和 ngx.timer.at(0) 有什么区别的?如果是ngx.timer.at(1) 有什么不一样的现象吗?
例如我们的master 启动4个worker进程,他们分别会如何打印 “get data event:......”的日志呢,可以观察pid实验一下。
我们上面的程序正常情况应该是4条(3条普通worker +1 条main worker)。

  1. 直接调用raise_event 会打印5条日志,其中1条是main worker ,另外4条的进程全部是is shutting down的进程,也就是正在销毁的4个进程还是会收到事件。这时另外3个进程还没有register成功,post event 自然不会收到数据。
  2. 使用ngx.timer.at(0) 会打印 5 ~8 条不等,因为是ngx.timer是启动lua协程启动,但是这并不能保证其他进程的register会成功,所以除了上面的5条外,还有部分成功register进程会收到数据
    3.将timer调长,由于我们的进程并没有太多处理工作,所以reload时is shutting down的进程很快就会退出,register也很快就会成功,所以一般会按照预想的打印4条

这个在数据同步等设计时可能需要考虑一下,否则可能会出现意想不到数据不同步的问题。
整体在插件使用时比较简单,只是在融合其他插件进行程序设计时需要多考虑一些。
后续会集合etcd、lmdb、cache、events 进行整体融合实现注册中心的功能。