package serf

import (
	"encoding/json"
	"path/filepath"
	"reflect"
	"runtime"
	"strconv"
	"strings"
	"time"

	"basic.com/valib/logger.git"
	"basic.com/valib/serf.git/serf"
	"github.com/golang/protobuf/proto"
	"github.com/hashicorp/memberlist"
	"github.com/satori/go.uuid"
	"vamicro/config"
	"vamicro/system-service/bhome_msg_dev"
)

// RpcHandle is the signature of a handler that can be invoked through a
// serf RPC query. It receives the decoded RpcParamTopic and returns the
// node responses to send back to the caller.
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)

// rpcHandlers maps a handler's short function name to the handler itself.
// It is filled by RegisterRpcHandles and read by HandleQueryRpc.
var rpcHandlers = make(map[string]RpcHandle)

// RegisterRpcHandles registers every given handler under its short function
// name (the part after the last '.' of the fully qualified name reported by
// the runtime, e.g. "a.b.c.d.FuncName" -> "FuncName").
//
// A name that is already registered is left untouched, so the first
// registration wins. Nil handlers are ignored.
func RegisterRpcHandles(fs ...RpcHandle) {
	for _, f := range fs {
		if f == nil {
			// A nil func resolves to the empty name and would panic when called.
			continue
		}
		fullName := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
		// filepath.Ext returns the suffix starting at the final '.', which for a
		// qualified function name is ".FuncName".
		name := strings.TrimPrefix(filepath.Ext(fullName), ".")
		if _, exists := rpcHandlers[name]; !exists {
			rpcHandlers[name] = f
		}
	}
}

// escapeSqlLiteral doubles single quotes so that s can be embedded inside a
// single-quoted SQL string literal.
func escapeSqlLiteral(s string) string {
	return strings.ReplaceAll(s, "'", "''")
}

// writeSqlSyncHis inserts one row into sql_sync_his describing an operation:
// lTime is the event time as text (may be empty), sqlText the statement or
// description that was processed, from the originating node, ok and execErr
// the outcome. All text values are quote-escaped before being embedded in the
// statement. A failure to write the row is logged and otherwise ignored.
func writeSqlSyncHis(lTime, sqlText, from string, ok bool, execErr error) {
	result := "0"
	if ok {
		result = "1"
	}
	errText := ""
	if execErr != nil {
		errText = execErr.Error()
	}
	createTime := time.Now().Format("2006-01-02 15:04:05")

	insertSql := "insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" +
		uuid.NewV4().String() + "','" +
		escapeSqlLiteral(lTime) + "','" +
		createTime + "','" +
		escapeSqlLiteral(sqlText) + "','" +
		escapeSqlLiteral(from) + "'," +
		result + ",'" +
		escapeSqlLiteral(errText) + "')"

	if _, err := executeSqlByGorm([]string{insertSql}); err != nil {
		logger.Error("insert sql_sync_his err:", err)
	}
}

/*****************************UserEvent***************************************/

// HandleUserEventSyncSql decodes a SqlUserEvent from the event payload and,
// when the event was not produced by this node (Owner differs from
// config.Server.AnalyServerId), executes its statements in a background
// goroutine and records the outcome in sql_sync_his.
func HandleUserEventSyncSql(ev serf.UserEvent) {
	logger.Info("receive a UserEventSyncSql event")

	var sqlUe SqlUserEvent
	if err := json.Unmarshal(ev.Payload, &sqlUe); err != nil {
		logger.Error("sqlUe unmarshal err:", err)
		return
	}
	logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)

	if sqlUe.Owner == config.Server.AnalyServerId {
		// Events originating from this node are not re-applied.
		return
	}

	go func() {
		ok, execErr := executeSqlByGorm(sqlUe.Sql)
		logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", ok, ", err:", execErr)

		// FormatUint avoids truncating the event time on platforms where int is 32 bits.
		lTime := strconv.FormatUint(uint64(ev.LTime), 10)
		writeSqlSyncHis(lTime, strings.Join(sqlUe.Sql, ";"), sqlUe.Owner, ok, execErr)
	}()
}

// HandleUserEventSyncDbTablePersonCache forwards the event payload to
// SyncDbTablePersonCacheChan.
func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
	logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
	SyncDbTablePersonCacheChan <- ev.Payload
}

// HandleUserEventSyncVirtualIp forwards the event payload to SyncVirtualIpChan.
func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
	logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
	SyncVirtualIpChan <- ev.Payload
}

// HandleSyncRegisterInfo handles another node broadcasting all topics of its
// registration center to the cluster. The payload is decoded as a
// MsgDevRegisterInfo and, unless it came from this node, passed to
// compareRPool.
func HandleSyncRegisterInfo(ev serf.UserEvent) {
	logger.Debug("HandleSyncRegisterInfo")

	var si bhome_msg_dev.MsgDevRegisterInfo
	if err := proto.Unmarshal(ev.Payload, &si); err != nil {
		logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
		return
	}

	devID := string(si.DevId)
	logger.Debug("HandleSyncRegisterInfo si.DevId:", devID, " config.Server.AnalyServerId:", config.Server.AnalyServerId)
	if devID != config.Server.AnalyServerId {
		compareRPool(&si)
	}
}

// HandleDataSystemSerfSub republishes the event payload on the local bus
// under the DataSystemSerfSubscribe topic. It logs and returns when no bus
// handle is available.
func HandleDataSystemSerfSub(ev serf.UserEvent) {
	h := GetBusHandle()
	if h == nil {
		logger.Error("HandleDataSystemSerfSub bus handle is nil")
		return
	}
	if err := h.Publish(DataSystemSerfSubscribe, ev.Payload); err != nil {
		logger.Error("HandleDataSystemSerfSub pub err:", err)
	}
}

/*****************************Query***************************************/

// respondQueryError sends msg as the query response and logs a failure to
// deliver it.
func respondQueryError(ev *serf.Query, msg string) {
	if err := ev.Respond([]byte(msg)); err != nil {
		logger.Error("query.Respond err:", err)
	}
}

// HandleQueryEventUpdateDBData answers a request for table data. The payload
// is decoded as a QueryTableDataParam, the requested tables are dumped and
// JSON-encoded, and the result is sent over TCP (port TcpTransportPort) to
// the cluster member whose name equals the request's From field. The send is
// performed in a background goroutine and its outcome is recorded in
// sql_sync_his.
//
// Decode, dump and encode failures are reported to the requester through the
// query response. When no member matches From, nothing is sent.
func HandleQueryEventUpdateDBData(ev *serf.Query) {
	logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)

	var fromP QueryTableDataParam
	if err := json.Unmarshal(ev.Payload, &fromP); err != nil {
		logger.Error("Query tableNames unmarshal err", err)
		respondQueryError(ev, "request unmarshal err")
		return
	}
	logger.Info("Query tableNames:", fromP.Tables)

	datas, err := DumpTables(fromP.Tables)
	if err != nil {
		logger.Error("queryByGorm err:", err)
		respondQueryError(ev, "queryByGorm err")
		return
	}

	bytesReturn, err := json.Marshal(datas)
	if err != nil {
		logger.Error("marshal table data err:", err)
		respondQueryError(ev, "marshal err")
		return
	}
	logger.Info("results.len: ", len(bytesReturn))

	// Locate the requesting node among the current cluster members.
	var targetNode *memberlist.Node
	for _, n := range Agent.Serf().Memberlist().Members() {
		if n.Name == fromP.From {
			targetNode = n
			break
		}
	}
	if targetNode == nil {
		logger.Debug("targetNode is nil")
		return
	}
	logger.Debug("targetNode:", targetNode.Name)

	go func() {
		addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
		sendErr := rawSendTcpMsg(addr, bytesReturn)
		if sendErr == nil {
			logger.Debug("sendToTcp success")
		} else {
			logger.Debug("sendToTcp err:", sendErr)
		}

		lTime := strconv.FormatUint(uint64(ev.LTime), 10)
		writeSqlSyncHis(lTime, "QueryEventUpdateDBData from "+targetNode.Name, targetNode.Name, sendErr == nil, sendErr)
	}()
}

// HandleOtherQuery handles the remaining query requests: the payload is
// decoded as a RequestSerfTopicMsg, passed to QueryLocalProc, and the
// JSON-encoded result is sent as the response. If any step fails, the error
// text is sent as the response instead.
func HandleOtherQuery(ev *serf.Query) {
	if err := ev.Respond(otherQueryReply(ev.Payload)); err != nil {
		logger.Debug("HandleOtherQuery err:", err)
	}
}

// otherQueryReply builds the response body for HandleOtherQuery.
func otherQueryReply(payload []byte) []byte {
	var reqBody RequestSerfTopicMsg
	if err := json.Unmarshal(payload, &reqBody); err != nil {
		return []byte(err.Error())
	}
	// QueryLocalProc returns the error as its first result.
	err, data := QueryLocalProc(reqBody)
	if err != nil {
		return []byte(err.Error())
	}
	b, err := json.Marshal(data)
	if err != nil {
		return []byte(err.Error())
	}
	return b
}

// HandleQueryRpc dispatches an RPC query: the payload is decoded as an
// RpcParamTopic, the handler registered under its Topic is invoked, and the
// JSON-encoded result is sent as the response. On any failure the failure is
// logged and an empty response is sent.
func HandleQueryRpc(ev *serf.Query) {
	if err := ev.Respond(rpcQueryReply(ev.Payload)); err != nil {
		logger.Debug("HandleQueryRpc err:", err)
	}
}

// rpcQueryReply builds the response body for HandleQueryRpc; it returns nil
// when the request cannot be decoded, dispatched, executed or encoded.
func rpcQueryReply(payload []byte) []byte {
	var arg RpcParamTopic
	if err := json.Unmarshal(payload, &arg); err != nil {
		logger.Debug("unmarshal RpcParamTopic err:", err)
		return nil
	}

	f, ok := rpcHandlers[arg.Topic]
	if !ok {
		logger.Debug("rpcHandlers not contains topic:", arg.Topic)
		return nil
	}

	resp, err := f(arg)
	if err != nil {
		logger.Debug("call f err:", err)
		return nil
	}

	data, err := json.Marshal(resp)
	if err != nil {
		logger.Debug("marshal resp err:", err)
		return nil
	}
	return data
}

// HandleEventMemberLeave marks the leaving node as deleted in cluster_node
// (isDelete=1) and records the operation in sql_sync_his. Events that do not
// carry exactly one member are ignored.
func HandleEventMemberLeave(ev serf.MemberEvent) {
	if len(ev.Members) != 1 {
		return
	}
	leaveMember := ev.Members[0]

	leaveSql := "update cluster_node set isDelete=1 where node_id='" + escapeSqlLiteral(leaveMember.Name) + "'"
	ok, execErr := executeSqlByGorm([]string{leaveSql})
	logger.Info("EventMemberLeave,current Members:", ev.Members)

	// Member events carry no event time here, so lTime is recorded as empty.
	writeSqlSyncHis("", leaveSql, leaveMember.Name, ok, execErr)
}

// HandleEventMemberJoin marks the joining node as present in cluster_node
// (isDelete=0) and records the operation in sql_sync_his. Events that do not
// carry exactly one member are ignored.
func HandleEventMemberJoin(ev serf.MemberEvent) {
	if len(ev.Members) != 1 {
		return
	}
	joinMember := ev.Members[0]

	joinSql := "update cluster_node set isDelete=0 where node_id='" + escapeSqlLiteral(joinMember.Name) + "'"
	ok, execErr := executeSqlByGorm([]string{joinSql})
	logger.Info("EventMemberJoin,current Members:", ev.Members)

	// Member events carry no event time here, so lTime is recorded as empty.
	writeSqlSyncHis("", joinSql, joinMember.Name, ok, execErr)
}