--use coroutine,watch realtime
|
local _M = {}
|
|
local json = require "cjson"
|
local shell = require "resty.shell"
|
|
local function print_map_keys()
|
local storage = _M.conf.storage
|
|
local keys = storage:get_keys(1024)
|
if #keys > 0 then
|
ngx.log(ngx.ERR, table.concat(storage:get_keys(1024),","))
|
else
|
ngx.log(ngx.ERR, "storage empty")
|
end
|
end
|
|
local function clean_keys(keys, storage)
|
for k, v in pairs(keys) do
|
if not v then
|
ngx.log(ngx.ERR, "delete:" .. k)
|
storage:delete(k)
|
end
|
end
|
end
|
|
local function get_nodes(key, storage)
|
local key_map = {}
|
local stor_keys = storage:get_keys(1024)
|
for k, v in pairs(stor_keys) do
|
key_map[v] = false
|
end
|
|
local status, stdout, err, reason, status = shell.run("docker exec etcd etcdctl get --prefix " .. key .. " -w=json")
|
--ngx.log(ngx.ERR, stdout)
|
|
local resp = json.decode(stdout)
|
|
if not resp then
|
ngx.log(ngx.ERR, "cli:get resp is nil")
|
return
|
end
|
|
local kvs = resp.kvs or {}
|
if not kvs or #kvs == 0 then
|
ngx.log(ngx.ERR, "resp.body.kvs is nil")
|
return
|
end
|
|
for i = 1, #kvs do
|
local kv = kvs[i]
|
if kv.value then
|
local node_key = ngx.decode_base64(kv.key)
|
local node_value = ngx.decode_base64(kv.value)
|
|
storage:set(node_key, node_value)
|
if key_map[node_key] ~= nil then
|
key_map[node_key] = true
|
end
|
end
|
end
|
|
clean_keys(key_map, storage)
|
|
-- 打印测试
|
print_map_keys()
|
end
|
|
local function watch(premature, tkey, storage)
|
get_nodes(tkey, storage)
|
|
local ok, err = ngx.timer.at(5, watch, tkey, storage)
|
if not ok then
|
ngx.log(ngx.ERR, "Restart watch err:"..err)
|
end
|
end
|
|
-- 在nginx.conf中设置一个全局的aps_nodes_map
|
-- 通过连接etcd, 并监听/aps/nodes/ 前缀的key, 将注册到etcd的apsServer节点添加到aps_nodes_map中
|
function _M.init(conf)
|
-- Only one worker start the syncer, here will use worker_id == 0
|
if ngx.worker.id() ~= 0 then
|
return
|
end
|
|
_M.conf = conf
|
|
local storage = _M.conf.storage
|
local data = storage:get("init")
|
if data then
|
ngx.log(ngx.ERR, "watch etcd already started")
|
return
|
else
|
storage:set("init", true)
|
end
|
|
local ok, err = ngx.timer.at(0, watch, conf.key_node, storage)
|
if not ok then
|
ngx.log(ngx.ERR, "Error start api watch:"..err)
|
end
|
end
|
|
return _M
|