zhangzengfei
2023-10-08 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f
修复集群同步功能
13个文件已修改
305 ■■■■■ 已修改文件
sysinfo-service/main.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/controllers/cluster.go 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/main.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/cluster.go 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/db.go 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/defHeadPic.go 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/dbLogger.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/handler.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/serf.go 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/sync.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/SysService.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/clusterService.go 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/proc.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sysinfo-service/main.go
@@ -76,6 +76,7 @@
}
const urlPrefix = "/data/api-v"
func initFuncMap() (map[string]bhomeclient.MicroFunc, []string) {
    funcMap := make(map[string]bhomeclient.MicroFunc)
@@ -87,5 +88,6 @@
    for key,_ := range funcMap {
        pubTopics = append(pubTopics, key)
    }
    return funcMap, pubTopics
}
system-service/controllers/cluster.go
@@ -1,6 +1,7 @@
package controllers
import (
    "vamicro/config"
    "vamicro/system-service/models"
    "vamicro/system-service/service"
    "vamicro/system-service/vo"
@@ -13,6 +14,48 @@
type ClusterController struct {
}
// @Summary 查询当前集群状态
// @Description 查询本地集群
// @Produce json
// @Tags cluster
// @Success 200 {string} json "{"code":200, success:true, msg:"", data:""}"
// @Failure 500 {string} json "{"code":500, success:false, msg:"",data:""}"
// @Router /data/api-v/cluster/status [get]
func (cc ClusterController) GetClusterStat(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
    var clusterE models.Cluster
    var reply = bhomeclient.Reply{
        Success: false,
        Msg:     "leave",
        Data:    nil,
    }
    arr, err := clusterE.FindAll()
    if err == nil {
        if arr != nil && len(arr) > 0 {
            // 表示已加入集群
            reply.Success = true
            var nodeE models.Node
            nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId)
            logger.Debug("查询集群节点:", nodes)
            for _, node := range nodes {
                logger.Debug("节点:", node.NodeId, " servceId:", config.Server.AnalyServerId, " stat:", node.DriftState)
                if node.NodeId == config.Server.AnalyServerId {
                    if node.DriftState == "master" {
                        reply.Msg = "master"
                    } else {
                        reply.Msg = "slave"
                    }
                    break
                }
            }
        }
    }
    return &reply
}
// @Summary 查询本地集群
// @Description 查询本地集群
// @Produce json
system-service/main.go
@@ -3,6 +3,7 @@
import (
    "context"
    "flag"
    "fmt"
    _ "net/http/pprof"
    "os"
    "os/signal"
@@ -60,7 +61,7 @@
        Proc:        *proc,
        Channel:     nil,
        PubTopic:    pubTopics,
        SubTopic:    []string{versionControlS.AuthorizationUpdateTopic},
        SubTopic:    []string{versionControlS.AuthorizationUpdateTopic, "sync-proc-message-to-serf"},
        SubNetTopic: []string{},
    }
@@ -74,7 +75,7 @@
    bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic)
    bhomedbapi.InitDoReq(ms.RequestOnly)
    //bhomedbapi.InitLog(logger.Debug)
    bhomedbapi.InitLog(logger.Debug)
    //util.AuthCheck(ctx) //授权检查
@@ -178,6 +179,7 @@
    funcMap[urlPrefix+"/cluster/updateClusterName"] = clusterController.UpdateClusterName
    funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave
    funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode
    funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat
    sysMenuC := new(controllers.SysMenuController)
    funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree
@@ -271,11 +273,13 @@
    for key, _ := range funcMap {
        pubTopics = append(pubTopics, key)
    }
    return funcMap, pubTopics
}
// 测试接收全网消息
func dealSubMsg(ctx context.Context, ms *bhomeclient.MicroNode) {
    fmt.Println("dealSubMsg")
    for {
        select {
        case <-ctx.Done():
system-service/models/cluster.go
@@ -1,6 +1,7 @@
package models
import (
    "errors"
    "fmt"
    "strconv"
    "time"
@@ -48,40 +49,45 @@
    return result, nil
}
func (c *Cluster) Create() bool {
func (c *Cluster) Create() error {
    var localConfig LocalConfig
    e := localConfig.Select()
    if e != nil {
        return false
    err := localConfig.Select()
    if err != nil {
        return err
    }
    serverId := config.Server.AnalyServerId
    if serverId == "" {
        return false
        return errors.New("serverId 为空")
    }
    serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
    if e != nil {
        return false
        return e
    }
    var err error
    tx := db.Begin()
    defer func() {
        if err != nil && tx != nil {
            tx.Rollback()
        }
    }()
    sql := "insert into cluster (cluster_id,cluster_name,password,virtual_ip) values ('" + c.ClusterId + "','" + c.ClusterName + "','" + c.Password + "','" + c.VirtualIp + "')"
    if err = tx.Exec(sql).Error; err != nil {
        return false
        return err
    }
    timeUnix := time.Now().Unix()
    fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
    //添加本身节点
    sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) values ('" + serverId + "','" + c.ClusterId + "','" + localConfig.ServerName + "','" + serverId + "','" + (serverIp + ":" + strconv.Itoa(syncdb.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "')"
    sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type,drift_state) values ('" + serverId + "','" + c.ClusterId + "','" + localConfig.ServerName + "','" + serverId + "','" + (serverIp + ":" + strconv.Itoa(syncdb.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "','master')"
    if err = tx.Exec(sql).Error; err != nil {
        return false
        return err
    }
    tx.Commit()
    return true
    return nil
}
func (c *Cluster) UpdateClusterName(clusterName string, virtualIp string) bool {
system-service/models/db.go
@@ -4,7 +4,6 @@
    "basic.com/valib/logger.git"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/sqlite"
    "strings"
    "vamicro/config"
)
@@ -42,17 +41,9 @@
        &AuthConfig{},
        &AuthDevice{},
    )
    InitSysSettingData()
    InitDefHeadPicData()
    initDic()
    //添加传感器字典配置
    addSensorDic()
    //初始化事件推送字典
    setDataPushDic()
    //初始化人脸跟踪的字典
    setFaceTrackDic()
}
// GetDB ...
@@ -62,61 +53,4 @@
func CloseDB() {
    db.Close()
}
func initDic() {
    db.Exec("INSERT INTO dictionary SELECT 'dcf9a925-dd1d-4dc4-a30d-002f976366a4', '1', '是', 'endRecord', '是', 1, '0' where not exists (select 1 from dictionary where id='dcf9a925-dd1d-4dc4-a30d-002f976366a4')")
    db.Exec("INSERT INTO dictionary SELECT 'ec3e1a20-c5fd-4d3c-a57b-2d10f114bb8f', '0', '否', 'endRecord', '否', 2, '0' where not exists (select 1 from dictionary where id='ec3e1a20-c5fd-4d3c-a57b-2d10f114bb8f')")
    db.Exec("INSERT INTO dictionary SELECT '4a4a1f9c-1431-4c56-ac95-12792ff51fd1', '1', '是', 'change', '是', 1, '0' where not exists (select 1 from dictionary where id='4a4a1f9c-1431-4c56-ac95-12792ff51fd1')")
    db.Exec("INSERT INTO dictionary SELECT '8c330403-c36d-440f-92a1-b4101abcc168', '0', '否', 'change', '否', 2, '0' where not exists (select 1 from dictionary where id='8c330403-c36d-440f-92a1-b4101abcc168')")
    //添加默认的通道设置
}
func addSensorDic() {
    db.Exec("INSERT INTO dictionary SELECT 'edded040-f9a4-4bbb-9eb1-23a01eaf595b', 'FaceTemperature', '温度传感器', 'sensorType', '人脸测温', 1, '0' where not exists (select 1 from dictionary where id='edded040-f9a4-4bbb-9eb1-23a01eaf595b')")
}
func setDataPushDic() {
    var sqls []string
    //sqls = append(sqls, "delete from dictionary where type='EVENTRULETOPIC';")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'B2C94522-9077-768C-595D-85502C6A315E', 'camera', '摄像机', 'EVENTRULETOPIC', '主题-摄像机', 1, '0' where not exists (select 1 from dictionary where id='B2C94522-9077-768C-595D-85502C6A315E');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E', 'dbtable', '底库', 'EVENTRULETOPIC', '主题-底库', 2, '0' where not exists (select 1 from dictionary where id='E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '213E9B4A-4E5F-8E56-8279-6C9045621068', 'task', '场景', 'EVENTRULETOPIC', '主题-场景', 3, '0' where not exists (select 1 from dictionary where id='213E9B4A-4E5F-8E56-8279-6C9045621068');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '83536D8B-83D2-59B1-655B-1636DBFD8201', 'alarmLevel', '事件等级', 'EVENTRULETOPIC', '主题-报警等级', 5, '0' where not exists (select 1 from dictionary where id='83536D8B-83D2-59B1-655B-1636DBFD8201');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '0F082502-CEA8-AAA5-FB43-DD555F0BE7D7', 'cameraName', '设备名称', 'EVENTRULETOPIC', 'custom,option', 1, 'B2C94522-9077-768C-595D-85502C6A315E' where not exists (select 1 from dictionary where id='0F082502-CEA8-AAA5-FB43-DD555F0BE7D7');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1DEC0839-850A-C600-9070-69C871654CE6', 'cameraAddr', '设备地址', 'EVENTRULETOPIC', 'custom,option', 2, 'B2C94522-9077-768C-595D-85502C6A315E' where not exists (select 1 from dictionary where id='1DEC0839-850A-C600-9070-69C871654CE6');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '806E7B4A-9A42-4E79-8708-481BA748EEC8', 'alarmRules.#.alarmLevel', '级别', 'EVENTRULETOPIC', 'option', 0, '83536D8B-83D2-59B1-655B-1636DBFD8201' where not exists (select 1 from dictionary where id='806E7B4A-9A42-4E79-8708-481BA748EEC8');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7BBDDF13-AF28-49B9-AFAF-78E535C7CDDB', 'baseInfo.#.tableName', '名称', 'EVENTRULETOPIC', '底库的子选项', 0, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7BBDDF13-AF28-49B9-AFAF-78E535C7CDDB');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1C9392CD-9FC4-4A23-8DCD-AE2B8E53AF49', 'baseInfo.#.targetName', '人员姓名', 'EVENTRULETOPIC', '底库的子选项', 1, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='1C9392CD-9FC4-4A23-8DCD-AE2B8E53AF49');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7455827A-43A6-4A9D-9D6F-F2D030E3C7D9', 'baseInfo.#.monitorLevel', '人员等级', 'EVENTRULETOPIC', '底库的子选项', 2, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7455827A-43A6-4A9D-9D6F-F2D030E3C7D9');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '950DA518-8D94-4293-A686-D85E8F94BD0B', 'baseInfo.#.labels.idCard', '身份证号', 'EVENTRULETOPIC', '底库的子选项', 3, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='950DA518-8D94-4293-A686-D85E8F94BD0B');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7B72769F-CAFE-458B-8589-B792B4BF123C', 'baseInfo.#.labels.phone', '手机号', 'EVENTRULETOPIC', '底库的子选项', 4, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7B72769F-CAFE-458B-8589-B792B4BF123C');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '58B67911-CF9F-4468-9B46-3DB9F7765816', 'baseInfo.#.labels.plate', '车牌号', 'EVENTRULETOPIC', '底库的子选项', 5, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='58B67911-CF9F-4468-9B46-3DB9F7765816');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1EC955EA-E6A5-4CA6-AD9A-4769D3CE96D6', 'taskName', '名称', 'EVENTRULETOPIC', '场景的子选项', 5, '213E9B4A-4E5F-8E56-8279-6C9045621068' where not exists (select 1 from dictionary where id='1EC955EA-E6A5-4CA6-AD9A-4769D3CE96D6');")
    //sqls = append(sqls, "update dictionary set value='alarmRules.#.alarmLevel' where id='806E7B4A-9A42-4E79-8708-481BA748EEC8';")
    db.Exec(strings.Join(sqls, ""))
}
// 添加一些是否的字典值
func setFaceTrackDic() {
    var sqls []string
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '51739a24-20bd-4440-8ef5-47fe93200536', '1', '是', 'bForceSend', '是', 1, '0' where not exists (select 1 from dictionary where id='51739a24-20bd-4440-8ef5-47fe93200536');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'fd8f9b4b-679b-4504-a295-7bb64fcd9ec5', '0', '否', 'bForceSend', '否', 2, '0' where not exists (select 1 from dictionary where id='fd8f9b4b-679b-4504-a295-7bb64fcd9ec5');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'a25055c1-50be-4ed8-ab29-da670958fbd4', '1', '是', 'bForcePush', '是', 1, '0' where not exists (select 1 from dictionary where id='a25055c1-50be-4ed8-ab29-da670958fbd4');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '3c68a1fa-ae57-4e36-b418-719ef28db71e', '0', '否', 'bForcePush', '否', 2, '0' where not exists (select 1 from dictionary where id='3c68a1fa-ae57-4e36-b418-719ef28db71e');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '93d0d77a-ba85-47c7-867c-34f7fe1be553', '1', '是', 'bForceSave', '是', 1, '0' where not exists (select 1 from dictionary where id='93d0d77a-ba85-47c7-867c-34f7fe1be553');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '23dc5d8b-7894-4692-9042-2fa0aaf13375', '0', '否', 'bForceSave', '否', 2, '0' where not exists (select 1 from dictionary where id='23dc5d8b-7894-4692-9042-2fa0aaf13375');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'e1677c52-844a-4f03-afcd-391b477de87b', '1', '是', 'bReid', '是', 1, '0' where not exists (select 1 from dictionary where id='e1677c52-844a-4f03-afcd-391b477de87b');")
    // 进出入统计, 方向
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'f6e4104a-cb6b-41bc-b368-69c8a104466b', 'up', '上', 'inside', '上', 1, '0' where not exists (select 1 from dictionary where id='f6e4104a-cb6b-41bc-b368-69c8a104466b');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7f588975-ab59-4ab5-9c9e-af92f64d5353', 'down', '下', 'inside', '下', 2, '0' where not exists (select 1 from dictionary where id='7f588975-ab59-4ab5-9c9e-af92f64d5353');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'b24196de-c891-42b2-bf4a-dabfb2870397', 'right', '左', 'inside', '左', 3, '0' where not exists (select 1 from dictionary where id='b24196de-c891-42b2-bf4a-dabfb2870397');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'b4535930-e1a9-41b1-a4e5-2b3831f37748', 'left', '右', 'inside', '右', 4, '0' where not exists (select 1 from dictionary where id='b4535930-e1a9-41b1-a4e5-2b3831f37748');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1a691da7-989f-4492-958a-7e1f4a49b374', 'up', '上', 'outside', '上', 1, '0' where not exists (select 1 from dictionary where id='1a691da7-989f-4492-958a-7e1f4a49b374');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '34aadb9b-3132-4e60-a73f-dc4b6f67a619', 'down', '下', 'outside', '下', 2, '0' where not exists (select 1 from dictionary where id='34aadb9b-3132-4e60-a73f-dc4b6f67a619');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '04d56fa3-2285-470c-be42-a15d7ac4273f', 'right', '左', 'outside', '左', 3, '0' where not exists (select 1 from dictionary where id='04d56fa3-2285-470c-be42-a15d7ac4273f');")
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '5c7a68f2-dd0f-4c28-b703-f9626e0d19a2', 'left', '右', 'outside', '右', 4, '0' where not exists (select 1 from dictionary where id='5c7a68f2-dd0f-4c28-b703-f9626e0d19a2');")
    db.Exec(strings.Join(sqls, ""))
}
system-service/models/defHeadPic.go
@@ -1,6 +1,7 @@
package models
import "errors"
func InitDefHeadPicData() {
    var defHeadPic DefHeadPic
    defHeadPic.Id = "1.jpg"
@@ -34,6 +35,7 @@
    defHeadPic.Path = ""
    defHeadPic.Insert()
}
//报警声音表
type DefHeadPic struct {
    Id        string       `gorm:"column:id;primary_key;type:varchar(100);unique;" json:"id"`
@@ -45,7 +47,7 @@
}
func (v *DefHeadPic) Insert() (bool,error) {
    var tmp Voice
    var tmp DefHeadPic
    if tmp.Exist(v.Id) {
        return false, errors.New("文件不允许重名")
    }
@@ -59,8 +61,8 @@
    return false, errors.New("新增失败")
}
func (v *DefHeadPic) Exist(name string) bool {
    dbSelect := db.Table(v.TableName()).Where("name=?", name).First(&v)
func (v *DefHeadPic) Exist(id string) bool {
    dbSelect := db.Table(v.TableName()).Where("id=?", id).First(&v)
    if dbSelect.Error != nil || dbSelect.RowsAffected ==0 {
        return false
    }
@@ -74,4 +76,3 @@
    }
    return list,nil
}
system-service/serf/dbLogger.go
@@ -13,28 +13,15 @@
}
var SyncTables = []string{
    //"area",
    //"camera_area",
    //"cameras",
    //"gb28181_config",
    //"dbtablepersons",
    //"dbtables",
    "cluster",
    "cluster_node",
    "dictionary",
    "auth_config", //设备管理授权配置
    "t_device",     //设备信息表
    "t_device_app", //设备安装的app
    "t_device_sdk", //设备安装的sdk
}
func (dbLogger *DbLogger) Print(values ...interface{}) {
    var (
        level = values[0]
    )
    if level == "sql" {
        msgArr := gorm.LogFormatter(values...)
        sql := msgArr[3].(string)
system-service/serf/handler.go
@@ -77,6 +77,23 @@
    logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
    SyncVirtualIpChan <- ev.Payload
}
func HandleUserEventSyncMessage(ev serf.UserEvent) {
    logger.Info("receive a UserEventSyncMessage event")
    var procMsg ProcMessageEvent
    err := json.Unmarshal(ev.Payload, &procMsg)
    if err != nil {
        logger.Error("sqlUe unmarshal err:", err)
        return
    }
    // 自己发送的消息不处理
    if procMsg.Owner != config.Server.AnalyServerId {
        // 判断是否有指定的接收目标
        if procMsg.Target == "" || procMsg.Target == config.Server.AnalyServerId {
            SyncProcMessageChan <- ev.Payload
        }
    }
}
//收到其它节点主动将注册中心的所有topic通知到集群中
func HandleSyncRegisterInfo(ev serf.UserEvent) {
system-service/serf/serf.go
@@ -23,11 +23,15 @@
    UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
    UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
    DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
    UserEventSyncMessage            = "SyncMessageForProc"         // 为其他进程同步消息
    TcpTransportPort                = 30194                        //tcp传输大数据量接口
    SUserEventSyncMessage
)
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
var SyncVirtualIpChan = make(chan []byte, 512)
var SyncProcMessageChan = make(chan []byte, 512)
func HandleSerfEvent(event serf.Event) {
    switch ev := event.(type) {
@@ -42,6 +46,9 @@
            HandleSyncRegisterInfo(ev)
        } else if ev.Name == DataSystemSerfSubscribe {
            HandleDataSystemSerfSub(ev)
        } else if ev.Name == UserEventSyncMessage {
            logger.Debug("接收到SyncMessageForProc")
            HandleUserEventSyncMessage(ev)
        }
    case *serf.Query:
        if ev.Name == QueryEventUpdateDBData {
@@ -100,6 +107,14 @@
type SqlUserEvent struct {
    Owner string   `json:"owner"`
    Sql   []string `json:"sql"`
}
type ProcMessageEvent struct {
    Owner   string `json:"owner"`    // 发送者
    Target  string `json:"target"`   // 指定接收者
    Proc    string `json:"procName"` // 进程名
    Topic   string `json:"topic"`    // 主题
    Payload []byte `json:"payload"`  // 消息体,自行解析
}
type TableDesc struct {
@@ -193,7 +208,7 @@
    mbs := a.GroupMembers(clusterId)
    var specmembername string
    for _, m := range mbs {
        logger.Info("m", m)
        logger.Info("member", m)
        if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
            if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
                if strings.HasPrefix(m.Name, "DSVAD") {
@@ -206,7 +221,7 @@
            }
        }
    }
    logger.Info("mbs:", mbs, "specmembername:", specmembername)
    logger.Info("members:", mbs, "specmembername:", specmembername)
    if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
        return nil, errors.New("specmembername not found")
    }
@@ -245,8 +260,8 @@
                logger.Info("Query response's len:", len(msg))
                err := json.Unmarshal(msg, &dumpSqls)
                if err == nil {
                    logger.Error("dumpSql:", dumpSqls)
                    logger.Error("data dump success")
                    //logger.Error("dumpSql:", dumpSqls)
                    logger.Debug("data dump success")
                }
                return
            }
system-service/serf/sync.go
@@ -6,6 +6,7 @@
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/logger.git"
    "context"
    "encoding/json"
    "github.com/gogo/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/req"
@@ -112,6 +113,30 @@
            select {
            case <-ctx.Done():
                return
            case b := <-SyncProcMessageChan:
                {
                    var procMsg ProcMessageEvent
                    err := json.Unmarshal(b, &procMsg)
                    if err != nil {
                        logger.Error("Unmarshal ProcMessageEvent ", err.Error())
                    } else {
                        err = hms.Publish(procMsg.Topic, procMsg.Payload)
                        if err != nil {
                            logger.Error("hms.Publish error ", err.Error())
                        }
                    }
                }
            default:
                time.Sleep(50 * time.Millisecond)
            }
        }
    }()
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case b := <-syncSdkCompareCacheChan:
                {
                    logger.Debug("SyncSdkCompareCache in,len(b):", len(b))
system-service/service/SysService.go
@@ -412,6 +412,11 @@
        if err := json.Unmarshal(payloads, &AuthInfo); nil != err {
            logger.Error("handleSubMsg failed to persistent:", topic, string(payloads))
        }
    }
    if "sync-proc-message-to-serf" == topic {
        logger.Debug("handleSubMsg sync-proc-message-to-serf")
        ClusterSyncProcMessage(payloads)
    }
}
system-service/service/clusterService.go
@@ -112,19 +112,21 @@
    arr, err := clusterE.FindAll()
    if err == nil && (arr == nil || len(arr) == 0) {
        b := clusterE.Create()
        if b {
        err = clusterE.Create()
        if err == nil {
            serf.InitAgent(context.Background())
            chMsg := protomsg.DbChangeMessage{
                Id:     clusterE.ClusterId,
                Table:  protomsg.TableChanged_T_Cluster,
                Action: protomsg.DbAction_Insert,
                Info:   "",
                Info:   "create",
            }
            s.AddDbMessage(&chMsg)
            return true, clusterId
        } else {
            logger.Error("初始化集群数据库信息失败. ", err.Error())
        }
    } else {
        if s.UpdateClusterName(clusterName, virtualIp) {
@@ -201,22 +203,21 @@
    agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
    if err == nil && agent != nil { //加入成功
        logger.Debug("AddCluster dbSync.Init success")
        agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
        serf.Agent = agent
        t := time.Now()
        syncTableDataFromCluster(joinArg)
        logger.Debugf("AddCluster  time=%v", time.Since(t))
        //if syncB {
        //    //需要重新初始化本地比对进程
        //    go serf.ReInitDbPersonCompareData()
        //    chMsg := protomsg.DbChangeMessage{
        //        Id:     joinArg.ClusterId,
        //        Table:  protomsg.TableChanged_T_Cluster,
        //        Action: protomsg.DbAction_Insert,
        //        Info:   "",
        //    }
        //
        //    s.AddDbMessage(&chMsg)
        chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "join",
        }
        s.AddDbMessage(&chMsg)
        logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
        return true, nil
        //} else {
@@ -332,7 +333,7 @@
            Id:     "",
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Delete,
            Info:   "",
            Info:   "leave",
        }
        logger.Debugf("Leave delete db time=%v", time.Since(t))
        tm := time.Now()
@@ -341,6 +342,7 @@
    }
    logger.Debugf("Leave success time=%v", time.Since(start))
    return true, nil
}
@@ -374,6 +376,7 @@
    db := models.GetDB()
    db.LogMode(false)
    defer db.LogMode(true)
    tx := db.Begin()
    defer func() {
        if err != nil && tx != nil {
@@ -385,16 +388,12 @@
    tx.Exec("PRAGMA foreign_keys=OFF")
    //1.删除本地的同步库数据
    for _, t := range serf.SyncTables {
        delSql := ""
        if t == "dbtables" {
            delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
        } else if t == "dbtablepersons" {
            delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
        } else {
            delSql = "delete from " + t + ""
        }
        delSql := "delete from " + t + ""
        err = tx.Exec(delSql).Error
        if err != nil {
            logger.Error("删除本地的同步库数据失败,", err.Error())
            logger.Error("sql:", delSql)
            return false
        }
    }
@@ -404,8 +403,9 @@
    dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
    if dumpSqls != nil && len(*dumpSqls) > 0 {
        for _, sqlStr := range *dumpSqls {
            logger.Debug("gorm exec dumpSql:", sqlStr)
            //logger.Debug("gorm exec dumpSql:", sqlStr)
            if err = tx.Exec(sqlStr).Error; err != nil {
                logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
                return false
            }
        }
@@ -457,3 +457,14 @@
    var lc models.Node
    return lc.FindIpByNode(nodeId)
}
func ClusterSyncProcMessage(payload []byte) {
    if serf.Agent == nil {
        logger.Error("未加入集群")
    }
    err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
    if err != nil {
        logger.Error("UserEventSyncMessage err:", err)
    }
}
system-service/service/proc.go
@@ -1,4 +1,5 @@
package service
const ProcName = "system-service"
const (
    ProcName = "system-service"
)