chenshijun
2019-09-06 a6f8f26249cc3c2f1fbb66c58a18b969b31534c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package androidSync
 
import (
    sdb "basic.com/Android/syncdb.git"
    "basic.com/valib/logger.git"
    "encoding/json"
    "strconv"
    "strings"
    "time"
)
 
/*
    每次开机后都会调用该接口,该接口会去查询数据库,确实之前是否已经加入过集群,若是已经加入集群,则开机自动加入
 */
func InitAgent(devID string) bool {
    sqlFindAllCluster := string("select * from " + dBNameCluster)
    clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster})
    if err == nil && clusters != nil && len(clusters) > 0 {
        c := clusters[0]
        sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'")
        nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId})
        if err == nil && nodes != nil && len(nodes) > 0 {
            var nodeIps []string
            for _, n := range nodes {
                if n.Values[0][3].(string) != devID {
                    nodeIps = append(nodeIps, n.Values[0][4].(string))
                }
            }
            agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps)
            if agent != nil {
                logger.Debug("sync.Agent init success!")
            } else {
                logger.Debug("sync.Agent init fail!")
            }
        }
    }
 
    return true
}
 
/*
    之前没有集群。调用此接口进行集群初始化,传入集群其他节点的ip,便于初始化后直接加入集群
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
    var ips []string
    if strAddrs == "" {
        ips = nil
    } else {
        ips = strings.Split(strAddrs, ";")
    }
 
    pwdFull := syncClusterKeyPrefix + password
 
    agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
    if agent == nil {
        logger.Error("sdb.Init")
        return false
    }
 
    return true
}
 
/*
    初始化时,若没能加入集群,可以通过该接口加入集群。
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func JoinByNodeAddrs(strAddrs string) bool {
    if strAddrs == "" {
        logger.Error("strAddrs == \"\"")
        return false
    }
    addrs := strings.Split(strAddrs, ";")
    err := agent.JoinByNodeAddrs(addrs)
    if err != nil {
        logger.Error("agent.JoinByNodeAddrs err:", err)
        return false
    }
 
    return true
}
 
/*
    加入集群,包含初始化节点SyncInit,并根据传入的集群其他节点列表自动加入集群
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
 
    isOk := SyncInit(clusterID, password, devID, strAddrs)
 
    if isOk { //加入成功
        logger.Debug("dbSync.Init success")
 
        if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
            logger.Error("加入集群失败!!!")
            return false
        }
    } else {
        logger.Error("dbSync.Init error")
        if agent != nil {
            err := agent.Shutdown()
            if err != nil {
                logger.Error("dbSync.Init err,shutdown err:", err)
            }
        }
        return false
    }
 
    return true
}
 
/*
    加入集群后,可以通过该接口获取集群的节点信息,不过最好直接查同步库的集群节点表
json vector
[
{Node1},
{Node2},
...
{Noden}
]
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
*/
func GetNodes() []byte {
    nodes := agent.GetNodes()
    strNode, err := json.Marshal(nodes)
    if err != nil {
        logger.Error("json.Marshal err:", err)
        return nil
    }
    return strNode
}
 
/*
    加入集群后, 清空本地的同步库数据,并从集群拉取最新的同步库数据
 */
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
    var err error
 
    //0.关闭reference
    foreignSql := string("PRAGMA foreign_keys=OFF")
    _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
    if err != nil {
        return false
    }
 
    //1.删除本地的同步库数据
    var sqls []string
    var delSql string
    for _, t := range syncTables {
        if t == dBNameTables {
            delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
        } else if t == dBNameTablePersons {
            delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
        } else {
            delSql = "delete from " + t
        }
        sqls = append(sqls, delSql)
    }
 
    //2.拉取集群内的同步库数据到本地数据库表中
    var dumpSqls *[]string
    dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
    if dumpSqls != nil {
        for _, dumpSql := range *dumpSqls {
            sqls = append(sqls, dumpSql)
        }
    }
 
    logger.Debug("成功添加当前节点到集群节点中")
 
    //3.将本节点加入到节点列表中
    timeUnix := time.Now().Unix()
    fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
 
    sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
        devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
        (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
    sqls = append(sqls, sqlSync)
 
    //4. 写入数据库
    _, err = sdb.ExecuteWriteSql(sqls, true)
    if err != nil {
        logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
        return false
    }
 
    //5. 同步该节点到集群
    agent.SyncSql([]string{sqlSync})
 
    //6.开启reference
    foreignSql = string("PRAGMA foreign_keys=ON")
    _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
    if err != nil {
        return false
    }
 
    return true
}
 
/*
    操作数据库后,需要调用该接口将对应的sql语句同步到集群
    strSql = "sql1;sql2;sql3;...;sqln"
 */
func SyncSql(strSql string) {
 
    sqls := strings.Split(strSql, ";")
 
    agent.SyncSql(sqls)
}
 
/*
    更新集群的名字
 */
func UpdateClusterName(clusterName, clusterID string) bool {
    sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'")
    _, err := sdb.ExecuteWriteSql([]string{sql}, false)
    if err != nil {
        return false
    }
 
    return true
}
 
/*
    退出集群
 */
func Leave() bool {
 
    if agent != nil {
        err := agent.Leave()
        if err != nil {
            logger.Debug("cluster leave err")
            return false
        }
        agent.Shutdown()
        agent = nil
 
        sqls := []string{"delete from cluster_node", "delete from cluster"}
 
        _, err = sdb.ExecuteWriteSql(sqls, false)
        if err != nil {
            return false
        }
    }
 
    return true
}