package serf
|
|
import (
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/serf"
|
"encoding/json"
|
"github.com/golang/protobuf/proto"
|
"github.com/hashicorp/memberlist"
|
"github.com/satori/go.uuid"
|
"path/filepath"
|
"reflect"
|
"runtime"
|
"strconv"
|
"strings"
|
"time"
|
"vamicro/config"
|
"vamicro/system-service/bhome_msg_dev"
|
)
|
|
// RpcHandle is the signature of a handler that serves an RPC-style serf query.
// It receives the decoded request parameters and returns the node responses
// to send back, or an error.
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
|
|
var rpcHandlers map[string]RpcHandle
|
|
func init() {
|
rpcHandlers = make(map[string]RpcHandle)
|
}
|
|
// RegisterRpcHandles
|
func RegisterRpcHandles(fs ...RpcHandle) {
|
for _, f := range fs {
|
name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
|
nameEnd := filepath.Ext(name)
|
name = strings.TrimPrefix(nameEnd, ".")
|
if _, ok := rpcHandlers[name]; !ok {
|
rpcHandlers[name] = f
|
}
|
}
|
}
|
|
/*****************************UserEvent***************************************/
|
func HandleUserEventSyncSql(ev serf.UserEvent) {
|
logger.Info("receive a UserEventSyncSql event")
|
var sqlUe SqlUserEvent
|
err := json.Unmarshal(ev.Payload, &sqlUe)
|
if err != nil {
|
logger.Error("sqlUe unmarshal err:", err)
|
return
|
}
|
|
logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
|
if sqlUe.Owner != config.Server.AnalyServerId {
|
go func() {
|
flag, e := executeSqlByGorm(sqlUe.Sql)
|
logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
|
logLT := strconv.Itoa(int(ev.LTime))
|
logT := time.Now().Format("2006-01-02 15:04:05")
|
logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
|
logResult := "0"
|
if flag {
|
logResult = "1"
|
}
|
logErr := ""
|
if e != nil {
|
logErr = e.Error()
|
}
|
executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
|
}()
|
}
|
}
|
|
func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
|
logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
|
SyncDbTablePersonCacheChan <- ev.Payload
|
}
|
|
func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
|
logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
|
SyncVirtualIpChan <- ev.Payload
|
}
|
|
//收到其它节点主动将注册中心的所有topic通知到集群中
|
func HandleSyncRegisterInfo(ev serf.UserEvent) {
|
logger.Debug("HandleSyncRegisterInfo")
|
var si bhome_msg_dev.MsgDevRegisterInfo
|
if err := proto.Unmarshal(ev.Payload, &si); err == nil {
|
logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
|
if string(si.DevId) != config.Server.AnalyServerId {
|
compareRPool(&si)
|
}
|
} else {
|
logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
|
}
|
}
|
|
func HandleDataSystemSerfSub(ev serf.UserEvent) {
|
h := GetBusHandle()
|
if h == nil {
|
logger.Error("HandleDataSystemSerfSub bus handle is nil")
|
return
|
}
|
err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
|
if err != nil {
|
logger.Error("HandleDataSystemSerfSub pub err:", err)
|
}
|
}
|
|
/*****************************Query***************************************/
|
func HandleQueryEventUpdateDBData(ev *serf.Query) {
|
logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
|
var fromP QueryTableDataParam
|
err := json.Unmarshal(ev.Payload, &fromP)
|
if err != nil {
|
logger.Error("Query tableNames unmarshal err")
|
if err := ev.Respond([]byte("request unmarshal err")); err != nil {
|
logger.Error("query.Respond err: %s\n", err)
|
return
|
}
|
|
return
|
}
|
logger.Info("Query tableNames:", fromP.Tables)
|
datas, err := DumpTables(fromP.Tables)
|
if err != nil {
|
logger.Error("queryByGorm err:", err)
|
if err := ev.Respond([]byte("queryByGorm err")); err != nil {
|
logger.Error("query.Respond err: %s\n", err)
|
return
|
}
|
return
|
}
|
bytesReturn, err := json.Marshal(datas)
|
logger.Info("results.len: ", len(bytesReturn))
|
|
var targetNode *memberlist.Node
|
nodes := Agent.Serf().Memberlist().Members()
|
if nodes != nil && len(nodes) > 0 {
|
for _, n := range nodes {
|
if n.Name == fromP.From {
|
targetNode = n
|
break
|
}
|
}
|
}
|
logger.Debug("targetNode:", targetNode.Name)
|
if targetNode != nil {
|
go func() {
|
addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
|
sendErr := rawSendTcpMsg(addr, bytesReturn)
|
|
logLT := strconv.Itoa(int(ev.LTime))
|
logT := time.Now().Format("2006-01-02 15:04:05")
|
logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
|
logResult := "0"
|
logErr := ""
|
if sendErr == nil {
|
logResult = "1"
|
logger.Debug("sendToTcp success")
|
} else {
|
logErr = sendErr.Error()
|
logger.Debug("sendToTcp err:", sendErr)
|
}
|
|
executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
|
}()
|
} else {
|
logger.Debug("targetNode is nil")
|
}
|
}
|
|
//处理其他的一些query请求
|
func HandleOtherQuery(ev *serf.Query) {
|
var reqBody RequestSerfTopicMsg
|
var ret []byte
|
if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
|
ret = []byte(err.Error())
|
} else {
|
if err, data := QueryLocalProc(reqBody); err != nil {
|
ret = []byte(err.Error())
|
} else {
|
b, e := json.Marshal(data)
|
if e != nil {
|
ret = []byte(e.Error())
|
} else {
|
ret = b
|
}
|
}
|
}
|
|
if err := ev.Respond(ret); err != nil {
|
logger.Debug("HandleOtherQuery err:", err)
|
return
|
}
|
}
|
|
func HandleQueryRpc(ev *serf.Query) {
|
var ret []byte
|
var arg RpcParamTopic
|
err := json.Unmarshal(ev.Payload, &arg)
|
if err == nil {
|
if f, ok := rpcHandlers[arg.Topic]; ok {
|
resp, e := f(arg)
|
if e == nil {
|
if data, me := json.Marshal(resp); me == nil {
|
ret = data
|
} else {
|
logger.Debug("marshal resp err:", e)
|
}
|
} else {
|
logger.Debug("call f err:", e)
|
}
|
} else {
|
logger.Debug("rpcHandlers not contains topic:", arg.Topic)
|
}
|
} else {
|
logger.Debug("unmarshal RpcParamTopic err:", err)
|
}
|
if rErr := ev.Respond(ret); rErr != nil {
|
logger.Debug("HandleQueryRpc err:", rErr)
|
}
|
}
|
|
func HandleEventMemberLeave(ev serf.MemberEvent) {
|
if ev.Members != nil && len(ev.Members) == 1 {
|
leaveMember := ev.Members[0]
|
leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
|
flag, e := executeSqlByGorm([]string{leaveSql})
|
|
logger.Info("EventMemberLeave,current Members:", ev.Members)
|
logLT := ""
|
logT := time.Now().Format("2006-01-02 15:04:05")
|
logSql := strings.ReplaceAll(leaveSql, "'", "''")
|
logResult := "0"
|
if flag {
|
logResult = "1"
|
}
|
logErr := ""
|
if e != nil {
|
logErr = e.Error()
|
}
|
executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
|
}
|
}
|
|
func HandleEventMemberJoin(ev serf.MemberEvent) {
|
if ev.Members != nil && len(ev.Members) == 1 {
|
leaveMember := ev.Members[0]
|
joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
|
flag, e := executeSqlByGorm([]string{joinSql})
|
|
logger.Info("EventMemberJoin,current Members:", ev.Members)
|
logLT := ""
|
logT := time.Now().Format("2006-01-02 15:04:05")
|
logSql := strings.ReplaceAll(joinSql, "'", "''")
|
logResult := "0"
|
if flag {
|
logResult = "1"
|
}
|
logErr := ""
|
if e != nil {
|
logErr = e.Error()
|
}
|
executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
|
}
|
}
|