基于serf的数据库同步模块库
liuxiaolong
2019-08-19 c1ed6226183527f60bf7a9f15b780a2c8f67e69a
add sync DbTablePerson UserEvent
1个文件已修改
40 ■■■■ 已修改文件
agent.go 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -40,6 +40,8 @@
const (
    QueryEventGetDB        = "GetDatabase"
    QueryEventUpdateDBData = "UpdateDBData"
    UserEventSyncSql       = "SyncSql"
    UserEventSyncDbTablePersonCache       = "SyncCache"
)
// Agent warps the serf agent
@@ -110,6 +112,8 @@
    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -118,18 +122,22 @@
    switch ev := event.(type) {
    case serf.UserEvent:
        //fmt.Println(string(ev.Payload))
        var sqlUe SqlUserEvent
        err := json.Unmarshal(ev.Payload, &sqlUe)
        if err !=nil {
            fmt.Println("sqlUe unmarshal err:",err)
            return
        if ev.Name == UserEventSyncSql {
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
            if err !=nil {
                fmt.Println("sqlUe unmarshal err:",err)
                return
            }
            if sqlUe.Owner != a.conf.NodeName {
                //results, err := ExecuteWriteSql(sqlArr)
                flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
                fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",flag)
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            SyncDbTablePersonCacheChan <- ev.Payload
        }
        if sqlUe.Owner != a.conf.NodeName {
            //results, err := ExecuteWriteSql(sqlArr)
            flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
            fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",flag)
        }
    case *serf.Query:
@@ -501,12 +509,20 @@
        fmt.Println("sqlUE marshal err:",err)
        return
    }
    err = a.UserEvent("SyncSql", ueB, false)
    err = a.UserEvent(UserEventSyncSql, ueB, false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
}
//更新同步库的比对缓存
func (a *Agent) SyncDbTablePersonCache(b []byte) {
    err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
    if err !=nil{
        fmt.Println("UserEventSyncDbTablePersonCache err:",err)
    }
}
//Init serf Init
func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)