zhangzengfei
2023-10-18 daac628c64069633787588372dea22499ac35396
修复集群功能
5个文件已修改
84 ■■■■■ 已修改文件
system-service/controllers/cluster.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/iwlist/iwlist.go 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/handler.go 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/serf.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/clusterService.go 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/controllers/cluster.go
@@ -25,7 +25,7 @@
    var clusterE models.Cluster
    var reply = bhomeclient.Reply{
        Success: false,
        Msg:     "leave",
        Msg:     "",
        Data:    nil,
    }
@@ -37,9 +37,9 @@
            var nodeE models.Node
            nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId)
            logger.Debug("查询集群节点:", nodes)
            //logger.Debug("查询集群节点:", nodes)
            for _, node := range nodes {
                logger.Debug("节点:", node.NodeId, " servceId:", config.Server.AnalyServerId, " stat:", node.DriftState)
                //logger.Debug("节点:", node.NodeId, " serverId:", config.Server.AnalyServerId, " stat:", node.DriftState)
                if node.NodeId == config.Server.AnalyServerId {
                    if node.DriftState == "master" {
                        reply.Msg = "master"
@@ -50,6 +50,7 @@
                    break
                }
            }
            reply.Data = nodes
        }
    }
system-service/iwlist/iwlist.go
@@ -1,6 +1,7 @@
package iwlist
import (
    "fmt"
    "os/exec"
    "regexp"
    "strconv"
@@ -44,7 +45,9 @@
func Scan(interfaceName string) ([]Cell, error) {
    // execute iwlist for scanning wireless networks
    cmd := exec.Command("iwlist", interfaceName, "scan")
    // 博通网卡不加sudo扫描不到网络
    cmdStr := fmt.Sprintf("sudo iwlist %s scan", interfaceName)
    cmd := exec.Command("/bin/bash", "-c", cmdStr)
    out, err := cmd.CombinedOutput()
    if err != nil {
        return nil, err
system-service/serf/handler.go
@@ -4,6 +4,7 @@
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/serf"
    "encoding/json"
    "fmt"
    "github.com/golang/protobuf/proto"
    "github.com/hashicorp/memberlist"
    "github.com/satori/go.uuid"
@@ -239,7 +240,7 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
    if ev.Members != nil && len(ev.Members) == 1 {
        leaveMember := ev.Members[0]
        leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
        leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
        flag, e := executeSqlByGorm([]string{leaveSql})
        logger.Info("EventMemberLeave,current Members:", ev.Members)
@@ -260,8 +261,13 @@
func HandleEventMemberJoin(ev serf.MemberEvent) {
    if ev.Members != nil && len(ev.Members) == 1 {
        leaveMember := ev.Members[0]
        joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
        joinMember := ev.Members[0]
        timeUnix := time.Now().Unix()
        fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
        //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
        joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
        flag, e := executeSqlByGorm([]string{joinSql})
        logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -276,6 +282,6 @@
        if e != nil {
            logErr = e.Error()
        }
        executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
        executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
    }
}
system-service/serf/serf.go
@@ -64,7 +64,7 @@
        } else if event.EventType() == serf.EventMemberJoin {
            HandleEventMemberJoin(ev)
        }
        logger.Error("serf MemberEvent ", event.EventType())
    default:
        logger.Warn("Unknown event type: %s\n", ev.EventType().String())
    }
system-service/service/clusterService.go
@@ -3,6 +3,7 @@
import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    sysSync "sync"
    "time"
@@ -190,6 +191,7 @@
        logger.Debug("AddCluster JoinCluster len(joinIps)=0")
        return false, errors.New("加入的目标ip不能为空")
    }
    logger.Debug("AddCluster joinIps:", joinIps)
    joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
@@ -207,29 +209,29 @@
        serf.Agent = agent
        t := time.Now()
        syncTableDataFromCluster(joinArg)
        syncClusterNodes := syncTableDataFromCluster(joinArg)
        logger.Debugf("AddCluster  time=%v", time.Since(t))
        //if syncB {
        chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "join",
        }
        if syncClusterNodes {
            chMsg := protomsg.DbChangeMessage{
                Id:     joinArg.ClusterId,
                Table:  protomsg.TableChanged_T_Cluster,
                Action: protomsg.DbAction_Insert,
                Info:   "join",
            }
        s.AddDbMessage(&chMsg)
        logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
        return true, nil
        //} else {
        //    logger.Debug("AddCluster syncTableDataFromCluster fail")
        //    if agent != nil {
        //        agent.Leave()
        //        err = agent.Shutdown()
        //        if err != nil {
        //            logger.Debug("AddCluster agent shutdown err:", err)
        //        }
        //    }
        //}
            s.AddDbMessage(&chMsg)
            logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
            return true, nil
        } else {
            logger.Debug("AddCluster syncTableDataFromCluster fail")
            agent.Leave()
            err = agent.Shutdown()
            if err != nil {
                logger.Debug("AddCluster agent shutdown err:", err)
            }
            return false, errors.New("加入集群失败")
        }
    } else {
        logger.Debug("AddCluster dbSync.Init err:", err)
        if agent != nil {
@@ -239,7 +241,9 @@
        }
    }
    logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
    return false, errors.New("加入集群失败")
}
@@ -403,7 +407,7 @@
    dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
    if dumpSqls != nil && len(*dumpSqls) > 0 {
        for _, sqlStr := range *dumpSqls {
            //logger.Debug("gorm exec dumpSql:", sqlStr)
            logger.Debug("gorm exec dumpSql:", sqlStr)
            if err = tx.Exec(sqlStr).Error; err != nil {
                logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
                return false
@@ -423,8 +427,11 @@
    serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
    if e1 != nil || serverIp == "" {
        err = errors.New("get serverIp err")
        logger.Error("get serverIp err")
        return false
    }
    logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
    if nodeName == "" {
        nodeName = serverIp
@@ -436,7 +443,9 @@
        logger.Debug("add cur node err:", err)
        return false
    }
    if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
    joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
    if err = tx.Exec(joinSql).Error; err != nil {
        logger.Debug("update isDelete err:", err)
        return false
    }
@@ -445,6 +454,7 @@
    tx.Exec("PRAGMA foreign_keys=ON")
    tx.Commit()
    serf.SyncSql([]string{sql})
    return true
}