基于serf的数据库同步模块库
liuxiaolong
2020-09-15 b4df2b1324b17e817bf0ab6f3f80471acc64610f
add more log,execute sql and rawSendTcpMsg use goroutine, show serf log
1个文件已修改
35 ■■■■ 已修改文件
agent.go 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -70,7 +70,7 @@
    // create serf agent with serf config
    logger.Info("conf.Config.EncryptKey:", conf.EncryptKey)
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    serfAgent, err := agent.Create(conf.Config, serfConf, logger.GetLogFile())
    if err != nil {
        return nil, err
    }
@@ -125,18 +125,21 @@
    switch ev := event.(type) {
    case serf.UserEvent:
        if ev.Name == UserEventSyncSql {
            logger.Info("receive a UserEventSyncSql event")
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
            if err !=nil {
                logger.Error("sqlUe unmarshal err:",err)
                return
            }
            logger.Info("ev.LTime:", ev.LTime ,"owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
            if sqlUe.Owner != a.conf.NodeName {
                go func() {
                flag, e := ExecuteSqlByGorm(sqlUe.Sql)
                evTime := uint64(ev.LTime)
                logger.Info("ev.LTime:",evTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
                    logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
                }()
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
@@ -189,16 +192,29 @@
            //for _, r := range rows {
            //    rowsReturn = append(rowsReturn, *r)
            //}
            logger.Info("receive QueryEventUpdateDBData, current node:", a.conf.NodeName)
            var fromP QueryTableDataParam
            err := json.Unmarshal(ev.Payload, &fromP)
            if err !=nil {
                logger.Error("Query tableNames unmarshal err")
                if query, ok := event.(*serf.Query); ok {
                    if err := query.Respond([]byte("request unmarshal err")); err != nil {
                        logger.Error("query.Respond err: %s\n", err)
                        return
                    }
                }
                return
            }
            logger.Info("Query tableNames:",fromP.Tables)
            datas, err := ExecuteQueryByGorm(fromP.Tables)
            if err !=nil {
                logger.Error("queryByGorm err")
                logger.Error("queryByGorm err:", err)
                if query, ok := event.(*serf.Query); ok {
                    if err := query.Respond([]byte("queryByGorm err")); err != nil {
                        logger.Error("query.Respond err: %s\n", err)
                        return
                    }
                }
                return
            }
            bytesReturn, err := json.Marshal(datas)
@@ -216,6 +232,7 @@
            }
            logger.Debug("targetNode:",targetNode.Name)
            if targetNode !=nil {
                go func() {
                addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
                sendErr := rawSendTcpMsg(addr, bytesReturn)
@@ -224,7 +241,7 @@
                } else {
                    logger.Debug("sendToTcp success")
                }
                }()
            } else {
                logger.Debug("targetNode is nil")
            }
@@ -486,7 +503,7 @@
var QueryTcpResponseChan = make(chan []byte)
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string,error) {
func (a *Agent) GetTableDataFromCluster(tableNames []string, timeout time.Duration) (*[]string,error) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
@@ -531,7 +548,7 @@
    var wg sync.WaitGroup
    wg.Add(1)
    ticker := time.NewTicker(300*time.Second)
    ticker := time.NewTicker(timeout)
    go func(tk *time.Ticker) {
        defer tk.Stop()
        defer wg.Done()