| | |
| | | package serf |
| | | |
| | | import ( |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/serf.git/serf" |
| | | "encoding/json" |
| | | "github.com/golang/protobuf/proto" |
| | | "github.com/hashicorp/memberlist" |
| | | "github.com/satori/go.uuid" |
| | | "path/filepath" |
| | | "reflect" |
| | | "runtime" |
| | | "strconv" |
| | | "strings" |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/system-service/bhome_msg_dev" |
| | | ) |
| | | |
| | | type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) |
| | | |
| | | var rpcHandlers map[string]RpcHandle |
| | | |
| | | func init() { |
| | | rpcHandlers = make(map[string]RpcHandle) |
| | | } |
| | | |
| | | // RegisterRpcHandles |
| | | func RegisterRpcHandles(fs ...RpcHandle) { |
| | | for _, f := range fs { |
| | | name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName |
| | | nameEnd := filepath.Ext(name) |
| | | name = strings.TrimPrefix(nameEnd, ".") |
| | | if _, ok := rpcHandlers[name]; !ok { |
| | | rpcHandlers[name] = f |
| | | } |
| | | } |
| | | } |
| | | |
| | | /*****************************UserEvent***************************************/ |
| | | func HandleUserEventSyncSql(ev serf.UserEvent) { |
| | | logger.Info("receive a UserEventSyncSql event") |
| | | var sqlUe SqlUserEvent |
| | | err := json.Unmarshal(ev.Payload, &sqlUe) |
| | | if err != nil { |
| | | logger.Error("sqlUe unmarshal err:", err) |
| | | return |
| | | } |
| | | |
| | | logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql) |
| | | if sqlUe.Owner != config.Server.AnalyServerId { |
| | | go func() { |
| | | flag, e := executeSqlByGorm(sqlUe.Sql) |
| | | logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e) |
| | | logLT := strconv.Itoa(int(ev.LTime)) |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''") |
| | | logResult := "0" |
| | | if flag { |
| | | logResult = "1" |
| | | } |
| | | logErr := "" |
| | | if e != nil { |
| | | logErr = e.Error() |
| | | } |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"}) |
| | | }() |
| | | } |
| | | } |
| | | |
| | | func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) { |
| | | logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload)) |
| | | SyncDbTablePersonCacheChan <- ev.Payload |
| | | } |
| | | |
| | | func HandleUserEventSyncVirtualIp(ev serf.UserEvent) { |
| | | logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") |
| | | SyncVirtualIpChan <- ev.Payload |
| | | } |
| | | |
| | | //收到其它节点主动将注册中心的所有topic通知到集群中 |
| | | func HandleSyncRegisterInfo(ev serf.UserEvent) { |
| | | logger.Debug("HandleSyncRegisterInfo") |
| | | var si bhome_msg_dev.MsgDevRegisterInfo |
| | | if err := proto.Unmarshal(ev.Payload, &si); err == nil { |
| | | logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId) |
| | | if string(si.DevId) != config.Server.AnalyServerId { |
| | | compareRPool(&si) |
| | | } |
| | | } else { |
| | | logger.Error("HandleSyncRegisterInfo unmarshal err:", err) |
| | | } |
| | | } |
| | | |
| | | func HandleDataSystemSerfSub(ev serf.UserEvent) { |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleDataSystemSerfSub bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish(DataSystemSerfSubscribe, ev.Payload) |
| | | if err != nil { |
| | | logger.Error("HandleDataSystemSerfSub pub err:", err) |
| | | } |
| | | } |
| | | |
| | | /*****************************Query***************************************/ |
| | | func HandleQueryEventUpdateDBData(ev *serf.Query) { |
| | | logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId) |
| | | var fromP QueryTableDataParam |
| | | err := json.Unmarshal(ev.Payload, &fromP) |
| | | if err != nil { |
| | | logger.Error("Query tableNames unmarshal err") |
| | | if err := ev.Respond([]byte("request unmarshal err")); err != nil { |
| | | logger.Error("query.Respond err: %s\n", err) |
| | | return |
| | | } |
| | | |
| | | return |
| | | } |
| | | logger.Info("Query tableNames:", fromP.Tables) |
| | | datas, err := DumpTables(fromP.Tables) |
| | | if err != nil { |
| | | logger.Error("queryByGorm err:", err) |
| | | if err := ev.Respond([]byte("queryByGorm err")); err != nil { |
| | | logger.Error("query.Respond err: %s\n", err) |
| | | return |
| | | } |
| | | return |
| | | } |
| | | bytesReturn, err := json.Marshal(datas) |
| | | logger.Info("results.len: ", len(bytesReturn)) |
| | | |
| | | var targetNode *memberlist.Node |
| | | nodes := Agent.Serf().Memberlist().Members() |
| | | if nodes != nil && len(nodes) > 0 { |
| | | for _, n := range nodes { |
| | | if n.Name == fromP.From { |
| | | targetNode = n |
| | | break |
| | | } |
| | | } |
| | | } |
| | | logger.Debug("targetNode:", targetNode.Name) |
| | | if targetNode != nil { |
| | | go func() { |
| | | addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) |
| | | sendErr := rawSendTcpMsg(addr, bytesReturn) |
| | | |
| | | logLT := strconv.Itoa(int(ev.LTime)) |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''") |
| | | logResult := "0" |
| | | logErr := "" |
| | | if sendErr == nil { |
| | | logResult = "1" |
| | | logger.Debug("sendToTcp success") |
| | | } else { |
| | | logErr = sendErr.Error() |
| | | logger.Debug("sendToTcp err:", sendErr) |
| | | } |
| | | |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | }() |
| | | } else { |
| | | logger.Debug("targetNode is nil") |
| | | } |
| | | } |
| | | |
| | | //处理其他的一些query请求 |
| | | func HandleOtherQuery(ev *serf.Query) { |
| | | var reqBody RequestSerfTopicMsg |
| | | var ret []byte |
| | | if err := json.Unmarshal(ev.Payload, &reqBody); err != nil { |
| | | ret = []byte(err.Error()) |
| | | } else { |
| | | if err, data := QueryLocalProc(reqBody); err != nil { |
| | | ret = []byte(err.Error()) |
| | | } else { |
| | | b, e := json.Marshal(data) |
| | | if e != nil { |
| | | ret = []byte(e.Error()) |
| | | } else { |
| | | ret = b |
| | | } |
| | | } |
| | | } |
| | | |
| | | if err := ev.Respond(ret); err != nil { |
| | | logger.Debug("HandleOtherQuery err:", err) |
| | | return |
| | | } |
| | | } |
| | | |
| | | func HandleQueryRpc(ev *serf.Query) { |
| | | var ret []byte |
| | | var arg RpcParamTopic |
| | | err := json.Unmarshal(ev.Payload, &arg) |
| | | if err == nil { |
| | | if f, ok := rpcHandlers[arg.Topic]; ok { |
| | | resp, e := f(arg) |
| | | if e == nil { |
| | | if data, me := json.Marshal(resp); me == nil { |
| | | ret = data |
| | | } else { |
| | | logger.Debug("marshal resp err:", e) |
| | | } |
| | | } else { |
| | | logger.Debug("call f err:", e) |
| | | } |
| | | } else { |
| | | logger.Debug("rpcHandlers not contains topic:", arg.Topic) |
| | | } |
| | | } else { |
| | | logger.Debug("unmarshal RpcParamTopic err:", err) |
| | | } |
| | | if rErr := ev.Respond(ret); rErr != nil { |
| | | logger.Debug("HandleQueryRpc err:", rErr) |
| | | } |
| | | } |
| | | |
| | | func HandleEventMemberLeave(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | leaveMember := ev.Members[0] |
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" |
| | | flag, e := executeSqlByGorm([]string{leaveSql}) |
| | | |
| | | logger.Info("EventMemberLeave,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(leaveSql, "'", "''") |
| | | logResult := "0" |
| | | if flag { |
| | | logResult = "1" |
| | | } |
| | | logErr := "" |
| | | if e != nil { |
| | | logErr = e.Error() |
| | | } |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | |
| | | func HandleEventMemberJoin(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | leaveMember := ev.Members[0] |
| | | joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | flag, e := executeSqlByGorm([]string{joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(joinSql, "'", "''") |
| | | logResult := "0" |
| | | if flag { |
| | | logResult = "1" |
| | | } |
| | | logErr := "" |
| | | if e != nil { |
| | | logErr = e.Error() |
| | | } |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | package serf
|
| | |
|
| | | import (
|
| | | "basic.com/valib/logger.git"
|
| | | "basic.com/valib/serf.git/serf"
|
| | | "encoding/json"
|
| | | "github.com/golang/protobuf/proto"
|
| | | "github.com/hashicorp/memberlist"
|
| | | "github.com/satori/go.uuid"
|
| | | "path/filepath"
|
| | | "reflect"
|
| | | "runtime"
|
| | | "strconv"
|
| | | "strings"
|
| | | "time"
|
| | | "vamicro/config"
|
| | | "vamicro/system-service/bhome_msg_dev"
|
| | | )
|
| | |
|
| | | type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
|
| | |
|
| | | var rpcHandlers map[string]RpcHandle
|
| | |
|
| | | func init() {
|
| | | rpcHandlers = make(map[string]RpcHandle)
|
| | | }
|
| | |
|
| | | // RegisterRpcHandles
|
| | | func RegisterRpcHandles(fs ...RpcHandle) {
|
| | | for _, f := range fs {
|
| | | name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
|
| | | nameEnd := filepath.Ext(name)
|
| | | name = strings.TrimPrefix(nameEnd, ".")
|
| | | if _, ok := rpcHandlers[name]; !ok {
|
| | | rpcHandlers[name] = f
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | /*****************************UserEvent***************************************/
|
| | | func HandleUserEventSyncSql(ev serf.UserEvent) {
|
| | | logger.Info("receive a UserEventSyncSql event")
|
| | | var sqlUe SqlUserEvent
|
| | | err := json.Unmarshal(ev.Payload, &sqlUe)
|
| | | if err != nil {
|
| | | logger.Error("sqlUe unmarshal err:", err)
|
| | | return
|
| | | }
|
| | |
|
| | | logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
|
| | | if sqlUe.Owner != config.Server.AnalyServerId {
|
| | | go func() {
|
| | | flag, e := executeSqlByGorm(sqlUe.Sql)
|
| | | logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
|
| | | logLT := strconv.Itoa(int(ev.LTime))
|
| | | logT := time.Now().Format("2006-01-02 15:04:05")
|
| | | logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
|
| | | logResult := "0"
|
| | | if flag {
|
| | | logResult = "1"
|
| | | }
|
| | | logErr := ""
|
| | | if e != nil {
|
| | | logErr = e.Error()
|
| | | }
|
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
|
| | | }()
|
| | | }
|
| | | }
|
| | |
|
| | | func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
|
| | | logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
|
| | | SyncDbTablePersonCacheChan <- ev.Payload
|
| | | }
|
| | |
|
| | | func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
|
| | | logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
|
| | | SyncVirtualIpChan <- ev.Payload
|
| | | }
|
| | |
|
| | | //收到其它节点主动将注册中心的所有topic通知到集群中
|
| | | func HandleSyncRegisterInfo(ev serf.UserEvent) {
|
| | | logger.Debug("HandleSyncRegisterInfo")
|
| | | var si bhome_msg_dev.MsgDevRegisterInfo
|
| | | if err := proto.Unmarshal(ev.Payload, &si); err == nil {
|
| | | logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
|
| | | if string(si.DevId) != config.Server.AnalyServerId {
|
| | | compareRPool(&si)
|
| | | }
|
| | | } else {
|
| | | logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
|
| | | }
|
| | | }
|
| | |
|
| | | func HandleDataSystemSerfSub(ev serf.UserEvent) {
|
| | | h := GetBusHandle()
|
| | | if h == nil {
|
| | | logger.Error("HandleDataSystemSerfSub bus handle is nil")
|
| | | return
|
| | | }
|
| | | err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
|
| | | if err != nil {
|
| | | logger.Error("HandleDataSystemSerfSub pub err:", err)
|
| | | }
|
| | | }
|
| | |
|
| | | /*****************************Query***************************************/
|
| | | func HandleQueryEventUpdateDBData(ev *serf.Query) {
|
| | | logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
|
| | | var fromP QueryTableDataParam
|
| | | err := json.Unmarshal(ev.Payload, &fromP)
|
| | | if err != nil {
|
| | | logger.Error("Query tableNames unmarshal err")
|
| | | if err := ev.Respond([]byte("request unmarshal err")); err != nil {
|
| | | logger.Error("query.Respond err: %s\n", err)
|
| | | return
|
| | | }
|
| | |
|
| | | return
|
| | | }
|
| | | logger.Info("Query tableNames:", fromP.Tables)
|
| | | datas, err := DumpTables(fromP.Tables)
|
| | | if err != nil {
|
| | | logger.Error("queryByGorm err:", err)
|
| | | if err := ev.Respond([]byte("queryByGorm err")); err != nil {
|
| | | logger.Error("query.Respond err: %s\n", err)
|
| | | return
|
| | | }
|
| | | return
|
| | | }
|
| | | bytesReturn, err := json.Marshal(datas)
|
| | | logger.Info("results.len: ", len(bytesReturn))
|
| | |
|
| | | var targetNode *memberlist.Node
|
| | | nodes := Agent.Serf().Memberlist().Members()
|
| | | if nodes != nil && len(nodes) > 0 {
|
| | | for _, n := range nodes {
|
| | | if n.Name == fromP.From {
|
| | | targetNode = n
|
| | | break
|
| | | }
|
| | | }
|
| | | }
|
| | | logger.Debug("targetNode:", targetNode.Name)
|
| | | if targetNode != nil {
|
| | | go func() {
|
| | | addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
|
| | | sendErr := rawSendTcpMsg(addr, bytesReturn)
|
| | |
|
| | | logLT := strconv.Itoa(int(ev.LTime))
|
| | | logT := time.Now().Format("2006-01-02 15:04:05")
|
| | | logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
|
| | | logResult := "0"
|
| | | logErr := ""
|
| | | if sendErr == nil {
|
| | | logResult = "1"
|
| | | logger.Debug("sendToTcp success")
|
| | | } else {
|
| | | logErr = sendErr.Error()
|
| | | logger.Debug("sendToTcp err:", sendErr)
|
| | | }
|
| | |
|
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
|
| | | }()
|
| | | } else {
|
| | | logger.Debug("targetNode is nil")
|
| | | }
|
| | | }
|
| | |
|
| | | //处理其他的一些query请求
|
| | | func HandleOtherQuery(ev *serf.Query) {
|
| | | var reqBody RequestSerfTopicMsg
|
| | | var ret []byte
|
| | | if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
|
| | | ret = []byte(err.Error())
|
| | | } else {
|
| | | if err, data := QueryLocalProc(reqBody); err != nil {
|
| | | ret = []byte(err.Error())
|
| | | } else {
|
| | | b, e := json.Marshal(data)
|
| | | if e != nil {
|
| | | ret = []byte(e.Error())
|
| | | } else {
|
| | | ret = b
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | if err := ev.Respond(ret); err != nil {
|
| | | logger.Debug("HandleOtherQuery err:", err)
|
| | | return
|
| | | }
|
| | | }
|
| | |
|
| | | func HandleQueryRpc(ev *serf.Query) {
|
| | | var ret []byte
|
| | | var arg RpcParamTopic
|
| | | err := json.Unmarshal(ev.Payload, &arg)
|
| | | if err == nil {
|
| | | if f, ok := rpcHandlers[arg.Topic]; ok {
|
| | | resp, e := f(arg)
|
| | | if e == nil {
|
| | | if data, me := json.Marshal(resp); me == nil {
|
| | | ret = data
|
| | | } else {
|
| | | logger.Debug("marshal resp err:", e)
|
| | | }
|
| | | } else {
|
| | | logger.Debug("call f err:", e)
|
| | | }
|
| | | } else {
|
| | | logger.Debug("rpcHandlers not contains topic:", arg.Topic)
|
| | | }
|
| | | } else {
|
| | | logger.Debug("unmarshal RpcParamTopic err:", err)
|
| | | }
|
| | | if rErr := ev.Respond(ret); rErr != nil {
|
| | | logger.Debug("HandleQueryRpc err:", rErr)
|
| | | }
|
| | | }
|
| | |
|
| | | func HandleEventMemberLeave(ev serf.MemberEvent) {
|
| | | if ev.Members != nil && len(ev.Members) == 1 {
|
| | | leaveMember := ev.Members[0]
|
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
|
| | | flag, e := executeSqlByGorm([]string{leaveSql})
|
| | |
|
| | | logger.Info("EventMemberLeave,current Members:", ev.Members)
|
| | | logLT := ""
|
| | | logT := time.Now().Format("2006-01-02 15:04:05")
|
| | | logSql := strings.ReplaceAll(leaveSql, "'", "''")
|
| | | logResult := "0"
|
| | | if flag {
|
| | | logResult = "1"
|
| | | }
|
| | | logErr := ""
|
| | | if e != nil {
|
| | | logErr = e.Error()
|
| | | }
|
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
|
| | | }
|
| | | }
|
| | |
|
| | | func HandleEventMemberJoin(ev serf.MemberEvent) {
|
| | | if ev.Members != nil && len(ev.Members) == 1 {
|
| | | leaveMember := ev.Members[0]
|
| | | joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
|
| | | flag, e := executeSqlByGorm([]string{joinSql})
|
| | |
|
| | | logger.Info("EventMemberJoin,current Members:", ev.Members)
|
| | | logLT := ""
|
| | | logT := time.Now().Format("2006-01-02 15:04:05")
|
| | | logSql := strings.ReplaceAll(joinSql, "'", "''")
|
| | | logResult := "0"
|
| | | if flag {
|
| | | logResult = "1"
|
| | | }
|
| | | logErr := ""
|
| | | if e != nil {
|
| | | logErr = e.Error()
|
| | | }
|
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
|
| | | }
|
| | | }
|