From 3a706d3378aa3626501370352963883fd2783558 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 28 十一月 2023 11:24:49 +0800 Subject: [PATCH] 添加appcenter,完善算法上传下载功能 --- system-service/serf/handler.go | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 151 insertions(+), 6 deletions(-) diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index a6fc49b..a90d4ba 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -1,9 +1,11 @@ package serf import ( + "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "basic.com/valib/serf.git/serf" "encoding/json" + "fmt" "github.com/golang/protobuf/proto" "github.com/hashicorp/memberlist" "github.com/satori/go.uuid" @@ -15,6 +17,7 @@ "time" "vamicro/config" "vamicro/system-service/bhome_msg_dev" + "vamicro/system-service/models" ) type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) @@ -77,8 +80,63 @@ logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") SyncVirtualIpChan <- ev.Payload } +func HandleUserEventChangeMaster(ev serf.UserEvent) { + masterId := string(ev.Payload) + localId := Agent.LocalMember().Name + logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId) + if masterId == localId { + return + } -//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 + var clusterDb models.Node + localNode, err := clusterDb.FindNodeById(localId) + if err == nil { + if localNode.DriftState == "master" { + logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave") + + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: localNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "master2slave", + } + + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) + } + } + } + + clusterDb.UpdateDriftStateByNodeId("master", masterId, false) +} + +func HandleUserEventSyncMessage(ev serf.UserEvent) { + logger.Info("receive a UserEventSyncMessage event") + var procMsg ProcMessageEvent + err := json.Unmarshal(ev.Payload, &procMsg) + if err != nil { + logger.Error("sqlUe unmarshal err:", err) + return + } + + // 鑷繁鍙戦�佺殑娑堟伅涓嶅鐞� + if procMsg.Owner != config.Server.AnalyServerId { + // 鍒ゆ柇鏄惁鏈夋寚瀹氱殑鎺ユ敹鐩爣 + if procMsg.Target == "" || procMsg.Target == config.Server.AnalyServerId { + SyncProcMessageChan <- ev.Payload + } + } +} + +// 鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 func HandleSyncRegisterInfo(ev serf.UserEvent) { logger.Debug("HandleSyncRegisterInfo") var si bhome_msg_dev.MsgDevRegisterInfo @@ -167,7 +225,7 @@ } } -//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 +// 澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 func HandleOtherQuery(ev *serf.Query) { var reqBody RequestSerfTopicMsg var ret []byte @@ -222,10 +280,46 @@ func HandleEventMemberLeave(ev serf.MemberEvent) { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] + logger.Info("Event Member Leave, Members:", ev.Members) + leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" flag, e := executeSqlByGorm([]string{leaveSql}) - logger.Info("EventMemberLeave,current Members:", ev.Members) + // 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐� + var clusterDb models.Node + leaveNode, err := clusterDb.FindNodeById(leaveMember.Name) + if err == nil { + logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode) + if leaveNode.DriftState == "master" { + firstNode, _ := clusterDb.FindFirstNode() + logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode) + logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name) + if firstNode.NodeId == Agent.LocalMember().Name { + logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�") + clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true) + + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: firstNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } + + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) + } + } + } + } + logLT := "" logT := time.Now().Format("2006-01-02 15:04:05") logSql := strings.ReplaceAll(leaveSql, "'", "''") @@ -243,8 +337,40 @@ func HandleEventMemberJoin(ev serf.MemberEvent) { if ev.Members != nil && len(ev.Members) == 1 { - leaveMember := ev.Members[0] - joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + joinMember := ev.Members[0] + + timeUnix := time.Now().Unix() + fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + + // 鏇存柊鍦ㄧ嚎鐘舵�� + updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name) + + // 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂� + joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) + flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql}) + + logger.Info("EventMemberJoin,current Members:", ev.Members) + logLT := "" + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(joinSql, "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} + +func HandleEventMemberFail(ev serf.MemberEvent) { + if ev.Members != nil && len(ev.Members) == 1 { + joinMember := ev.Members[0] + + //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name) flag, e := executeSqlByGorm([]string{joinSql}) logger.Info("EventMemberJoin,current Members:", ev.Members) @@ -259,6 +385,25 @@ if e != nil { logErr = e.Error() } - executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} + +func HandleUpdateMemberStatus() { + if Agent == nil { + return + } + + for _, member := range Agent.Serf().Members() { + alive := 0 + if member.Status == serf.StatusAlive { + alive = 1 + } + + sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name) + _, err := executeSqlByGorm([]string{sql}) + if err != nil { + logger.Error(err) + } } } -- Gitblit v1.8.0