zhangqian
2023-12-26 a2667454aa39ea72b0e3660dca7dc4f468d712a2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
--use coroutine,watch realtime
local _M = {}
 
local json = require "cjson"
local shell = require "resty.shell"
 
local function print_map_keys()
    local storage = _M.conf.storage
 
    local keys = storage:get_keys(1024)
    if #keys > 0 then
        ngx.log(ngx.ERR, table.concat(storage:get_keys(1024),","))
    else 
        ngx.log(ngx.ERR, "storage empty")
    end
end
 
local function clean_keys(keys, storage)
    for k, v in pairs(keys) do
        if not v then
            ngx.log(ngx.ERR, "delete:" .. k)
            storage:delete(k)
        end
    end
end
 
local function get_nodes(key, storage)
    local key_map = {}
    local stor_keys = storage:get_keys(1024)
    for k, v in pairs(stor_keys) do
        key_map[v] = false
    end
 
    local status, stdout, err, reason, status = shell.run("docker exec etcd etcdctl get --prefix " .. key .. " -w=json")
    --ngx.log(ngx.ERR, stdout)
    
    local resp = json.decode(stdout)
 
    if not resp then
        ngx.log(ngx.ERR, "cli:get resp is nil")
        return
    end
 
    local kvs = resp.kvs or {}
    if not kvs or #kvs == 0 then
        ngx.log(ngx.ERR, "resp.body.kvs is nil")
        return
    end
    
    for i = 1, #kvs do
        local kv = kvs[i]
        if kv.value then
            local node_key = ngx.decode_base64(kv.key)
            local node_value = ngx.decode_base64(kv.value)
            
            storage:set(node_key, node_value)
            if key_map[node_key] ~= nil then
                key_map[node_key] = true
            end
        end
    end
 
    clean_keys(key_map, storage)
 
    -- 打印测试
    print_map_keys()
end
 
local function watch(premature, tkey, storage)    
    get_nodes(tkey, storage)
 
    local ok, err = ngx.timer.at(5, watch, tkey, storage)
    if not ok then
        ngx.log(ngx.ERR, "Restart watch err:"..err)
    end
end
 
-- 在nginx.conf中设置一个全局的aps_nodes_map
-- 通过连接etcd, 并监听/aps/nodes/ 前缀的key, 将注册到etcd的apsServer节点添加到aps_nodes_map中
function _M.init(conf)
    -- Only one worker start the syncer, here will use worker_id == 0
    if ngx.worker.id() ~= 0 then
        return
    end
 
    _M.conf = conf
 
    local storage = _M.conf.storage
    local data = storage:get("init")
    if data then
        ngx.log(ngx.ERR, "watch etcd already started")
        return
    else 
        storage:set("init", true)
    end
 
    local ok, err = ngx.timer.at(0, watch, conf.key_node, storage)
    if not ok then
        ngx.log(ngx.ERR, "Error start api watch:"..err)
    end
end
 
return _M