From 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期日, 08 十月 2023 11:24:37 +0800 Subject: [PATCH] 修复集群同步功能 --- system-service/models/defHeadPic.go | 23 +- sysinfo-service/main.go | 28 ++- system-service/controllers/cluster.go | 43 ++++++ system-service/service/clusterService.go | 57 ++++--- system-service/service/proc.go | 5 system-service/serf/dbLogger.go | 15 -- system-service/serf/handler.go | 21 ++ system-service/main.go | 8 system-service/models/db.go | 68 --------- system-service/serf/serf.go | 23 ++ system-service/models/cluster.go | 28 ++- system-service/service/SysService.go | 19 +- system-service/serf/sync.go | 25 +++ 13 files changed, 207 insertions(+), 156 deletions(-) diff --git a/sysinfo-service/main.go b/sysinfo-service/main.go index 23e3e25..7e56ff0 100644 --- a/sysinfo-service/main.go +++ b/sysinfo-service/main.go @@ -18,10 +18,10 @@ var ( procName = service.ProcName - proc = &bhomeclient.ProcInfo{ + proc = &bhomeclient.ProcInfo{ Name: procName, //杩涚▼鍚嶇О - ID: procName, //杩涚▼id - Info: "", //杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋� + ID: procName, //杩涚▼id + Info: "", //杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋� } env = flag.String("e", "pro", "") ) @@ -33,24 +33,24 @@ config.Init(*env) // 鏃ュ織鍒濆鍖� - var logFile = config.LogConf.Path + "vamicro-"+procName+".log" + var logFile = config.LogConf.Path + "vamicro-" + procName + ".log" logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("log init success !") } -func main(){ +func main() { // pprof 鐢ㄤ簬鍒嗘瀽鎬ц兘 //go func() { // logger.Info(http.ListenAndServe("0.0.0.0:6079", nil)) //}() - fm,pubTopics := initFuncMap() + fm, pubTopics := initFuncMap() ctx, cancel := context.WithCancel(context.Background()) - var reg = &bhomeclient.RegisterInfo { - Proc: *proc, - Channel: nil, - PubTopic: pubTopics, - SubTopic: []string{}, + var reg = &bhomeclient.RegisterInfo{ + Proc: *proc, + Channel: nil, + PubTopic: pubTopics, + SubTopic: []string{}, SubNetTopic: []string{}, } @@ -58,7 +58,7 @@ signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug) - if err !=nil { + if err != nil { return } @@ -76,6 +76,7 @@ } const urlPrefix = "/data/api-v" + func initFuncMap() (map[string]bhomeclient.MicroFunc, []string) { funcMap := make(map[string]bhomeclient.MicroFunc) @@ -84,8 +85,9 @@ funcMap[urlPrefix+"/sysinfo/showProcesses"] = controllers.ShowVasystemProcesses var pubTopics []string - for key,_ := range funcMap { + for key, _ := range funcMap { pubTopics = append(pubTopics, key) } + return funcMap, pubTopics } diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go index bcc88ec..eb8bddf 100644 --- a/system-service/controllers/cluster.go +++ b/system-service/controllers/cluster.go @@ -1,6 +1,7 @@ package controllers import ( + "vamicro/config" "vamicro/system-service/models" "vamicro/system-service/service" "vamicro/system-service/vo" @@ -13,6 +14,48 @@ type ClusterController struct { } +// @Summary 鏌ヨ褰撳墠闆嗙兢鐘舵�� +// @Description 鏌ヨ鏈湴闆嗙兢 +// @Produce json +// @Tags cluster +// @Success 200 {string} json "{"code":200, success:true, msg:"", data:""}" +// @Failure 500 {string} json "{"code":500, success:false, msg:"",data:""}" +// @Router /data/api-v/cluster/status [get] +func (cc ClusterController) GetClusterStat(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { + var clusterE models.Cluster + var reply = bhomeclient.Reply{ + Success: false, + Msg: "leave", + Data: nil, + } + + arr, err := clusterE.FindAll() + if err == nil { + if arr != nil && len(arr) > 0 { + // 琛ㄧず宸插姞鍏ラ泦缇� + reply.Success = true + + var nodeE models.Node + nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId) + logger.Debug("鏌ヨ闆嗙兢鑺傜偣:", nodes) + for _, node := range nodes { + logger.Debug("鑺傜偣:", node.NodeId, " servceId:", config.Server.AnalyServerId, " stat:", node.DriftState) + if node.NodeId == config.Server.AnalyServerId { + if node.DriftState == "master" { + reply.Msg = "master" + + } else { + reply.Msg = "slave" + } + break + } + } + } + } + + return &reply +} + // @Summary 鏌ヨ鏈湴闆嗙兢 // @Description 鏌ヨ鏈湴闆嗙兢 // @Produce json diff --git a/system-service/main.go b/system-service/main.go index 168e931..8e6fd0a 100644 --- a/system-service/main.go +++ b/system-service/main.go @@ -3,6 +3,7 @@ import ( "context" "flag" + "fmt" _ "net/http/pprof" "os" "os/signal" @@ -60,7 +61,7 @@ Proc: *proc, Channel: nil, PubTopic: pubTopics, - SubTopic: []string{versionControlS.AuthorizationUpdateTopic}, + SubTopic: []string{versionControlS.AuthorizationUpdateTopic, "sync-proc-message-to-serf"}, SubNetTopic: []string{}, } @@ -74,7 +75,7 @@ bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic) bhomedbapi.InitDoReq(ms.RequestOnly) - //bhomedbapi.InitLog(logger.Debug) + bhomedbapi.InitLog(logger.Debug) //util.AuthCheck(ctx) //鎺堟潈妫�鏌� @@ -178,6 +179,7 @@ funcMap[urlPrefix+"/cluster/updateClusterName"] = clusterController.UpdateClusterName funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode + funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat sysMenuC := new(controllers.SysMenuController) funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree @@ -271,11 +273,13 @@ for key, _ := range funcMap { pubTopics = append(pubTopics, key) } + return funcMap, pubTopics } // 娴嬭瘯鎺ユ敹鍏ㄧ綉娑堟伅 func dealSubMsg(ctx context.Context, ms *bhomeclient.MicroNode) { + fmt.Println("dealSubMsg") for { select { case <-ctx.Done(): diff --git a/system-service/models/cluster.go b/system-service/models/cluster.go index 9a69c53..781906d 100644 --- a/system-service/models/cluster.go +++ b/system-service/models/cluster.go @@ -1,6 +1,7 @@ package models import ( + "errors" "fmt" "strconv" "time" @@ -48,40 +49,45 @@ return result, nil } -func (c *Cluster) Create() bool { +func (c *Cluster) Create() error { var localConfig LocalConfig - e := localConfig.Select() - if e != nil { - return false + err := localConfig.Select() + if err != nil { + return err } + serverId := config.Server.AnalyServerId if serverId == "" { - return false + return errors.New("serverId 涓虹┖") } + serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) if e != nil { - return false + return e } - var err error + tx := db.Begin() defer func() { if err != nil && tx != nil { tx.Rollback() } }() + sql := "insert into cluster (cluster_id,cluster_name,password,virtual_ip) values ('" + c.ClusterId + "','" + c.ClusterName + "','" + c.Password + "','" + c.VirtualIp + "')" if err = tx.Exec(sql).Error; err != nil { - return false + return err } timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") //娣诲姞鏈韩鑺傜偣 - sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) values ('" + serverId + "','" + c.ClusterId + "','" + localConfig.ServerName + "','" + serverId + "','" + (serverIp + ":" + strconv.Itoa(syncdb.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "')" + sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type,drift_state) values ('" + serverId + "','" + c.ClusterId + "','" + localConfig.ServerName + "','" + serverId + "','" + (serverIp + ":" + strconv.Itoa(syncdb.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "','master')" if err = tx.Exec(sql).Error; err != nil { - return false + return err } + tx.Commit() - return true + + return nil } func (c *Cluster) UpdateClusterName(clusterName string, virtualIp string) bool { diff --git a/system-service/models/db.go b/system-service/models/db.go index 2a02bdc..f51d0a5 100644 --- a/system-service/models/db.go +++ b/system-service/models/db.go @@ -4,7 +4,6 @@ "basic.com/valib/logger.git" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/sqlite" - "strings" "vamicro/config" ) @@ -42,17 +41,9 @@ &AuthConfig{}, &AuthDevice{}, ) + InitSysSettingData() InitDefHeadPicData() - - initDic() - - //娣诲姞浼犳劅鍣ㄥ瓧鍏搁厤缃� - addSensorDic() - //鍒濆鍖栦簨浠舵帹閫佸瓧鍏� - setDataPushDic() - //鍒濆鍖栦汉鑴歌窡韪殑瀛楀吀 - setFaceTrackDic() } // GetDB ... @@ -62,61 +53,4 @@ func CloseDB() { db.Close() -} - -func initDic() { - db.Exec("INSERT INTO dictionary SELECT 'dcf9a925-dd1d-4dc4-a30d-002f976366a4', '1', '鏄�', 'endRecord', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='dcf9a925-dd1d-4dc4-a30d-002f976366a4')") - db.Exec("INSERT INTO dictionary SELECT 'ec3e1a20-c5fd-4d3c-a57b-2d10f114bb8f', '0', '鍚�', 'endRecord', '鍚�', 2, '0' where not exists (select 1 from dictionary where id='ec3e1a20-c5fd-4d3c-a57b-2d10f114bb8f')") - db.Exec("INSERT INTO dictionary SELECT '4a4a1f9c-1431-4c56-ac95-12792ff51fd1', '1', '鏄�', 'change', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='4a4a1f9c-1431-4c56-ac95-12792ff51fd1')") - db.Exec("INSERT INTO dictionary SELECT '8c330403-c36d-440f-92a1-b4101abcc168', '0', '鍚�', 'change', '鍚�', 2, '0' where not exists (select 1 from dictionary where id='8c330403-c36d-440f-92a1-b4101abcc168')") - //娣诲姞榛樿鐨勯�氶亾璁剧疆 -} - -func addSensorDic() { - db.Exec("INSERT INTO dictionary SELECT 'edded040-f9a4-4bbb-9eb1-23a01eaf595b', 'FaceTemperature', '娓╁害浼犳劅鍣�', 'sensorType', '浜鸿劯娴嬫俯', 1, '0' where not exists (select 1 from dictionary where id='edded040-f9a4-4bbb-9eb1-23a01eaf595b')") -} - -func setDataPushDic() { - var sqls []string - //sqls = append(sqls, "delete from dictionary where type='EVENTRULETOPIC';") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'B2C94522-9077-768C-595D-85502C6A315E', 'camera', '鎽勫儚鏈�', 'EVENTRULETOPIC', '涓婚-鎽勫儚鏈�', 1, '0' where not exists (select 1 from dictionary where id='B2C94522-9077-768C-595D-85502C6A315E');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E', 'dbtable', '搴曞簱', 'EVENTRULETOPIC', '涓婚-搴曞簱', 2, '0' where not exists (select 1 from dictionary where id='E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '213E9B4A-4E5F-8E56-8279-6C9045621068', 'task', '鍦烘櫙', 'EVENTRULETOPIC', '涓婚-鍦烘櫙', 3, '0' where not exists (select 1 from dictionary where id='213E9B4A-4E5F-8E56-8279-6C9045621068');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '83536D8B-83D2-59B1-655B-1636DBFD8201', 'alarmLevel', '浜嬩欢绛夌骇', 'EVENTRULETOPIC', '涓婚-鎶ヨ绛夌骇', 5, '0' where not exists (select 1 from dictionary where id='83536D8B-83D2-59B1-655B-1636DBFD8201');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '0F082502-CEA8-AAA5-FB43-DD555F0BE7D7', 'cameraName', '璁惧鍚嶇О', 'EVENTRULETOPIC', 'custom,option', 1, 'B2C94522-9077-768C-595D-85502C6A315E' where not exists (select 1 from dictionary where id='0F082502-CEA8-AAA5-FB43-DD555F0BE7D7');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1DEC0839-850A-C600-9070-69C871654CE6', 'cameraAddr', '璁惧鍦板潃', 'EVENTRULETOPIC', 'custom,option', 2, 'B2C94522-9077-768C-595D-85502C6A315E' where not exists (select 1 from dictionary where id='1DEC0839-850A-C600-9070-69C871654CE6');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '806E7B4A-9A42-4E79-8708-481BA748EEC8', 'alarmRules.#.alarmLevel', '绾у埆', 'EVENTRULETOPIC', 'option', 0, '83536D8B-83D2-59B1-655B-1636DBFD8201' where not exists (select 1 from dictionary where id='806E7B4A-9A42-4E79-8708-481BA748EEC8');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7BBDDF13-AF28-49B9-AFAF-78E535C7CDDB', 'baseInfo.#.tableName', '鍚嶇О', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 0, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7BBDDF13-AF28-49B9-AFAF-78E535C7CDDB');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1C9392CD-9FC4-4A23-8DCD-AE2B8E53AF49', 'baseInfo.#.targetName', '浜哄憳濮撳悕', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 1, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='1C9392CD-9FC4-4A23-8DCD-AE2B8E53AF49');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7455827A-43A6-4A9D-9D6F-F2D030E3C7D9', 'baseInfo.#.monitorLevel', '浜哄憳绛夌骇', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 2, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7455827A-43A6-4A9D-9D6F-F2D030E3C7D9');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '950DA518-8D94-4293-A686-D85E8F94BD0B', 'baseInfo.#.labels.idCard', '韬唤璇佸彿', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 3, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='950DA518-8D94-4293-A686-D85E8F94BD0B');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7B72769F-CAFE-458B-8589-B792B4BF123C', 'baseInfo.#.labels.phone', '鎵嬫満鍙�', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 4, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='7B72769F-CAFE-458B-8589-B792B4BF123C');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '58B67911-CF9F-4468-9B46-3DB9F7765816', 'baseInfo.#.labels.plate', '杞︾墝鍙�', 'EVENTRULETOPIC', '搴曞簱鐨勫瓙閫夐」', 5, 'E945DFFC-CD7D-AEED-3F0B-7C88FCD9134E' where not exists (select 1 from dictionary where id='58B67911-CF9F-4468-9B46-3DB9F7765816');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1EC955EA-E6A5-4CA6-AD9A-4769D3CE96D6', 'taskName', '鍚嶇О', 'EVENTRULETOPIC', '鍦烘櫙鐨勫瓙閫夐」', 5, '213E9B4A-4E5F-8E56-8279-6C9045621068' where not exists (select 1 from dictionary where id='1EC955EA-E6A5-4CA6-AD9A-4769D3CE96D6');") - //sqls = append(sqls, "update dictionary set value='alarmRules.#.alarmLevel' where id='806E7B4A-9A42-4E79-8708-481BA748EEC8';") - db.Exec(strings.Join(sqls, "")) -} - -// 娣诲姞涓�浜涙槸鍚︾殑瀛楀吀鍊� -func setFaceTrackDic() { - var sqls []string - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '51739a24-20bd-4440-8ef5-47fe93200536', '1', '鏄�', 'bForceSend', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='51739a24-20bd-4440-8ef5-47fe93200536');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'fd8f9b4b-679b-4504-a295-7bb64fcd9ec5', '0', '鍚�', 'bForceSend', '鍚�', 2, '0' where not exists (select 1 from dictionary where id='fd8f9b4b-679b-4504-a295-7bb64fcd9ec5');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'a25055c1-50be-4ed8-ab29-da670958fbd4', '1', '鏄�', 'bForcePush', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='a25055c1-50be-4ed8-ab29-da670958fbd4');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '3c68a1fa-ae57-4e36-b418-719ef28db71e', '0', '鍚�', 'bForcePush', '鍚�', 2, '0' where not exists (select 1 from dictionary where id='3c68a1fa-ae57-4e36-b418-719ef28db71e');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '93d0d77a-ba85-47c7-867c-34f7fe1be553', '1', '鏄�', 'bForceSave', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='93d0d77a-ba85-47c7-867c-34f7fe1be553');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '23dc5d8b-7894-4692-9042-2fa0aaf13375', '0', '鍚�', 'bForceSave', '鍚�', 2, '0' where not exists (select 1 from dictionary where id='23dc5d8b-7894-4692-9042-2fa0aaf13375');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'e1677c52-844a-4f03-afcd-391b477de87b', '1', '鏄�', 'bReid', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='e1677c52-844a-4f03-afcd-391b477de87b');") - - // 杩涘嚭鍏ョ粺璁�, 鏂瑰悜 - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'f6e4104a-cb6b-41bc-b368-69c8a104466b', 'up', '涓�', 'inside', '涓�', 1, '0' where not exists (select 1 from dictionary where id='f6e4104a-cb6b-41bc-b368-69c8a104466b');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '7f588975-ab59-4ab5-9c9e-af92f64d5353', 'down', '涓�', 'inside', '涓�', 2, '0' where not exists (select 1 from dictionary where id='7f588975-ab59-4ab5-9c9e-af92f64d5353');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'b24196de-c891-42b2-bf4a-dabfb2870397', 'right', '宸�', 'inside', '宸�', 3, '0' where not exists (select 1 from dictionary where id='b24196de-c891-42b2-bf4a-dabfb2870397');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select 'b4535930-e1a9-41b1-a4e5-2b3831f37748', 'left', '鍙�', 'inside', '鍙�', 4, '0' where not exists (select 1 from dictionary where id='b4535930-e1a9-41b1-a4e5-2b3831f37748');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '1a691da7-989f-4492-958a-7e1f4a49b374', 'up', '涓�', 'outside', '涓�', 1, '0' where not exists (select 1 from dictionary where id='1a691da7-989f-4492-958a-7e1f4a49b374');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '34aadb9b-3132-4e60-a73f-dc4b6f67a619', 'down', '涓�', 'outside', '涓�', 2, '0' where not exists (select 1 from dictionary where id='34aadb9b-3132-4e60-a73f-dc4b6f67a619');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '04d56fa3-2285-470c-be42-a15d7ac4273f', 'right', '宸�', 'outside', '宸�', 3, '0' where not exists (select 1 from dictionary where id='04d56fa3-2285-470c-be42-a15d7ac4273f');") - sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '5c7a68f2-dd0f-4c28-b703-f9626e0d19a2', 'left', '鍙�', 'outside', '鍙�', 4, '0' where not exists (select 1 from dictionary where id='5c7a68f2-dd0f-4c28-b703-f9626e0d19a2');") - - db.Exec(strings.Join(sqls, "")) } diff --git a/system-service/models/defHeadPic.go b/system-service/models/defHeadPic.go index 6552019..7b0a75a 100644 --- a/system-service/models/defHeadPic.go +++ b/system-service/models/defHeadPic.go @@ -1,6 +1,7 @@ package models import "errors" + func InitDefHeadPicData() { var defHeadPic DefHeadPic defHeadPic.Id = "1.jpg" @@ -34,18 +35,19 @@ defHeadPic.Path = "" defHeadPic.Insert() } -//鎶ヨ澹伴煶琛� + +// 鎶ヨ澹伴煶琛� type DefHeadPic struct { - Id string `gorm:"column:id;primary_key;type:varchar(100);unique;" json:"id"` - Path string `gorm:"column:path" json:"path"` + Id string `gorm:"column:id;primary_key;type:varchar(100);unique;" json:"id"` + Path string `gorm:"column:path" json:"path"` } func (DefHeadPic) TableName() string { return "def_head_pic" } -func (v *DefHeadPic) Insert() (bool,error) { - var tmp Voice +func (v *DefHeadPic) Insert() (bool, error) { + var tmp DefHeadPic if tmp.Exist(v.Id) { return false, errors.New("鏂囦欢涓嶅厑璁搁噸鍚�") } @@ -59,9 +61,9 @@ return false, errors.New("鏂板澶辫触") } -func (v *DefHeadPic) Exist(name string) bool { - dbSelect := db.Table(v.TableName()).Where("name=?", name).First(&v) - if dbSelect.Error != nil || dbSelect.RowsAffected ==0 { +func (v *DefHeadPic) Exist(id string) bool { + dbSelect := db.Table(v.TableName()).Where("id=?", id).First(&v) + if dbSelect.Error != nil || dbSelect.RowsAffected == 0 { return false } return true @@ -70,8 +72,7 @@ func (v *DefHeadPic) FindAll() (list []DefHeadPic, err error) { err = db.Table(v.TableName()).Find(&list).Order("id asc").Error if err != nil { - return nil,err + return nil, err } - return list,nil + return list, nil } - diff --git a/system-service/serf/dbLogger.go b/system-service/serf/dbLogger.go index 12317e6..0517be8 100644 --- a/system-service/serf/dbLogger.go +++ b/system-service/serf/dbLogger.go @@ -13,28 +13,15 @@ } var SyncTables = []string{ - //"area", - //"camera_area", - //"cameras", - //"gb28181_config", - //"dbtablepersons", - //"dbtables", - "cluster", "cluster_node", - "dictionary", - - "auth_config", //璁惧绠$悊鎺堟潈閰嶇疆 - - "t_device", //璁惧淇℃伅琛� - "t_device_app", //璁惧瀹夎鐨刟pp - "t_device_sdk", //璁惧瀹夎鐨剆dk } func (dbLogger *DbLogger) Print(values ...interface{}) { var ( level = values[0] ) + if level == "sql" { msgArr := gorm.LogFormatter(values...) sql := msgArr[3].(string) diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index a6fc49b..c5546db 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -77,8 +77,25 @@ logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") SyncVirtualIpChan <- ev.Payload } +func HandleUserEventSyncMessage(ev serf.UserEvent) { + logger.Info("receive a UserEventSyncMessage event") + var procMsg ProcMessageEvent + err := json.Unmarshal(ev.Payload, &procMsg) + if err != nil { + logger.Error("sqlUe unmarshal err:", err) + return + } -//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 + // 鑷繁鍙戦�佺殑娑堟伅涓嶅鐞� + if procMsg.Owner != config.Server.AnalyServerId { + // 鍒ゆ柇鏄惁鏈夋寚瀹氱殑鎺ユ敹鐩爣 + if procMsg.Target == "" || procMsg.Target == config.Server.AnalyServerId { + SyncProcMessageChan <- ev.Payload + } + } +} + +// 鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 func HandleSyncRegisterInfo(ev serf.UserEvent) { logger.Debug("HandleSyncRegisterInfo") var si bhome_msg_dev.MsgDevRegisterInfo @@ -167,7 +184,7 @@ } } -//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 +// 澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 func HandleOtherQuery(ev *serf.Query) { var reqBody RequestSerfTopicMsg var ret []byte diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index 55ecb72..0a14a19 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -23,11 +23,15 @@ UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 + UserEventSyncMessage = "SyncMessageForProc" // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭� TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛 + + SUserEventSyncMessage ) var SyncDbTablePersonCacheChan = make(chan []byte, 512) var SyncVirtualIpChan = make(chan []byte, 512) +var SyncProcMessageChan = make(chan []byte, 512) func HandleSerfEvent(event serf.Event) { switch ev := event.(type) { @@ -42,6 +46,9 @@ HandleSyncRegisterInfo(ev) } else if ev.Name == DataSystemSerfSubscribe { HandleDataSystemSerfSub(ev) + } else if ev.Name == UserEventSyncMessage { + logger.Debug("鎺ユ敹鍒癝yncMessageForProc") + HandleUserEventSyncMessage(ev) } case *serf.Query: if ev.Name == QueryEventUpdateDBData { @@ -100,6 +107,14 @@ type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` +} + +type ProcMessageEvent struct { + Owner string `json:"owner"` // 鍙戦�佽�� + Target string `json:"target"` // 鎸囧畾鎺ユ敹鑰� + Proc string `json:"procName"` // 杩涚▼鍚� + Topic string `json:"topic"` // 涓婚 + Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 } type TableDesc struct { @@ -193,7 +208,7 @@ mbs := a.GroupMembers(clusterId) var specmembername string for _, m := range mbs { - logger.Info("m", m) + logger.Info("member", m) if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { if strings.HasPrefix(m.Name, "DSVAD") { @@ -206,7 +221,7 @@ } } } - logger.Info("mbs:", mbs, "specmembername:", specmembername) + logger.Info("members:", mbs, "specmembername:", specmembername) if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐� return nil, errors.New("specmembername not found") } @@ -245,8 +260,8 @@ logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err == nil { - logger.Error("dumpSql:", dumpSqls) - logger.Error("data dump success") + //logger.Error("dumpSql:", dumpSqls) + logger.Debug("data dump success") } return } diff --git a/system-service/serf/sync.go b/system-service/serf/sync.go index ed1089c..c5710da 100644 --- a/system-service/serf/sync.go +++ b/system-service/serf/sync.go @@ -6,6 +6,7 @@ "basic.com/valib/bhomeclient.git" "basic.com/valib/logger.git" "context" + "encoding/json" "github.com/gogo/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/req" @@ -112,6 +113,30 @@ select { case <-ctx.Done(): return + case b := <-SyncProcMessageChan: + { + var procMsg ProcMessageEvent + err := json.Unmarshal(b, &procMsg) + if err != nil { + logger.Error("Unmarshal ProcMessageEvent ", err.Error()) + } else { + err = hms.Publish(procMsg.Topic, procMsg.Payload) + if err != nil { + logger.Error("hms.Publish error ", err.Error()) + } + } + + } + default: + time.Sleep(50 * time.Millisecond) + } + } + }() + go func() { + for { + select { + case <-ctx.Done(): + return case b := <-syncSdkCompareCacheChan: { logger.Debug("SyncSdkCompareCache in,len(b):", len(b)) diff --git a/system-service/service/SysService.go b/system-service/service/SysService.go index 779b65e..5a95364 100644 --- a/system-service/service/SysService.go +++ b/system-service/service/SysService.go @@ -165,7 +165,7 @@ return true, isComplete, "" } -//upgrade +// upgrade func (sv SysService) Upgrade(identifier string, filename string) (bool, error) { if !bakBeforeUpgrade() { return false, errors.New("鏇存柊鍓嶅浠藉け璐�") @@ -226,7 +226,7 @@ return true } -//鏇存柊绯荤粺绋嬪簭 +// 鏇存柊绯荤粺绋嬪簭 func updatePatch(identifier string, ext string) bool { configPatchPath := "" if config.Server.PatchPath != "" { @@ -349,7 +349,7 @@ return cmd.Output() } -//涓婁紶澹伴煶鏂囦欢 +// 涓婁紶澹伴煶鏂囦欢 func (sv SysService) UploadVoice(fileBytes []byte, filename string) (string, error) { fileExt := path.Ext(filename) fileExt = strings.ToLower(fileExt) @@ -382,18 +382,18 @@ return weedFilePath, err } -//鑾峰彇鎵�鏈夐厤缃� +// 鑾峰彇鎵�鏈夐厤缃� func (sv SysService) GetAllSetting() (settings []models.SysSetting, err error) { settingModel := models.SysSetting{} return settingModel.GetAllSetting() } -//鏇存柊閰嶇疆 +// 鏇存柊閰嶇疆 func (sv SysService) SaveSetting(setting models.SysSetting) (err error) { return setting.SaveSetting(true) } -//鑾峰彇纭洏鍓╀綑绌洪棿 +// 鑾峰彇纭洏鍓╀綑绌洪棿 func (sv SysService) DiskInfo(dev string) (uint64, uint64) { var stat syscall.Statfs_t err := syscall.Statfs(dev, &stat) @@ -406,13 +406,18 @@ return All, Free } -//鍚屾鏇存柊璁剧疆 +// 鍚屾鏇存柊璁剧疆 func PersistentWrapper(topic string, payloads []byte) { if versionControlS.AuthorizationUpdateTopic == topic { if err := json.Unmarshal(payloads, &AuthInfo); nil != err { logger.Error("handleSubMsg failed to persistent:", topic, string(payloads)) } } + + if "sync-proc-message-to-serf" == topic { + logger.Debug("handleSubMsg sync-proc-message-to-serf") + ClusterSyncProcMessage(payloads) + } } // 鑾峰彇鎺堟潈鏂瑰紡鍜屾巿鏉冨瘑鐮� diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index afe172b..2f978ef 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -112,19 +112,21 @@ arr, err := clusterE.FindAll() if err == nil && (arr == nil || len(arr) == 0) { - b := clusterE.Create() - if b { + err = clusterE.Create() + if err == nil { serf.InitAgent(context.Background()) chMsg := protomsg.DbChangeMessage{ Id: clusterE.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, - Info: "", + Info: "create", } s.AddDbMessage(&chMsg) return true, clusterId + } else { + logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error()) } } else { if s.UpdateClusterName(clusterName, virtualIp) { @@ -201,22 +203,21 @@ agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) if err == nil && agent != nil { //鍔犲叆鎴愬姛 logger.Debug("AddCluster dbSync.Init success") + agent.RegisterHandleEventFunc(serf.HandleSerfEvent) serf.Agent = agent t := time.Now() syncTableDataFromCluster(joinArg) logger.Debugf("AddCluster time=%v", time.Since(t)) //if syncB { - // //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ - // go serf.ReInitDbPersonCompareData() - // chMsg := protomsg.DbChangeMessage{ - // Id: joinArg.ClusterId, - // Table: protomsg.TableChanged_T_Cluster, - // Action: protomsg.DbAction_Insert, - // Info: "", - // } - // - // s.AddDbMessage(&chMsg) + chMsg := protomsg.DbChangeMessage{ + Id: joinArg.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "join", + } + + s.AddDbMessage(&chMsg) logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) return true, nil //} else { @@ -332,7 +333,7 @@ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Delete, - Info: "", + Info: "leave", } logger.Debugf("Leave delete db time=%v", time.Since(t)) tm := time.Now() @@ -341,6 +342,7 @@ } logger.Debugf("Leave success time=%v", time.Since(start)) + return true, nil } @@ -374,6 +376,7 @@ db := models.GetDB() db.LogMode(false) defer db.LogMode(true) + tx := db.Begin() defer func() { if err != nil && tx != nil { @@ -385,16 +388,12 @@ tx.Exec("PRAGMA foreign_keys=OFF") //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 for _, t := range serf.SyncTables { - delSql := "" - if t == "dbtables" { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == "dbtablepersons" { - delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" - } else { - delSql = "delete from " + t + "" - } + delSql := "delete from " + t + "" + err = tx.Exec(delSql).Error if err != nil { + logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + logger.Error("sql:", delSql) return false } } @@ -404,8 +403,9 @@ dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) if dumpSqls != nil && len(*dumpSqls) > 0 { for _, sqlStr := range *dumpSqls { - logger.Debug("gorm exec dumpSql:", sqlStr) + //logger.Debug("gorm exec dumpSql:", sqlStr) if err = tx.Exec(sqlStr).Error; err != nil { + logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) return false } } @@ -457,3 +457,14 @@ var lc models.Node return lc.FindIpByNode(nodeId) } + +func ClusterSyncProcMessage(payload []byte) { + if serf.Agent == nil { + logger.Error("鏈姞鍏ラ泦缇�") + } + + err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) + if err != nil { + logger.Error("UserEventSyncMessage err:", err) + } +} diff --git a/system-service/service/proc.go b/system-service/service/proc.go index 98437af..f0d7d4e 100644 --- a/system-service/service/proc.go +++ b/system-service/service/proc.go @@ -1,4 +1,5 @@ package service - -const ProcName = "system-service" +const ( + ProcName = "system-service" +) -- Gitblit v1.8.0