package serf
|
|
import (
|
"basic.com/syncdb.git"
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/serf"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"strings"
|
"sync"
|
"time"
|
"vamicro/config"
|
"vamicro/system-service/models"
|
)
|
|
const (
|
QueryEventUpdateDBData = "UpdateDBData"
|
QueryNodesByTopic = "queryNodeByTopic"
|
QueryRpc = "queryRpc"
|
UserEventSyncSql = "SyncSql"
|
UserEventSyncDbTablePersonCache = "SyncCache"
|
UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改
|
UserEventSyncRegisterInfo = "SyncRegisterInfo" //同步注册信息
|
DataSystemSerfSubscribe = "data-system-serf-subscribe" //各app从serf订阅消息
|
UserEventSyncMessage = "SyncMessageForProc" // 为其他进程同步消息
|
TcpTransportPort = 30194 //tcp传输大数据量接口
|
|
SUserEventSyncMessage
|
)
|
|
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
|
var SyncVirtualIpChan = make(chan []byte, 512)
|
var SyncProcMessageChan = make(chan []byte, 512)
|
|
func HandleSerfEvent(event serf.Event) {
|
switch ev := event.(type) {
|
case serf.UserEvent:
|
if ev.Name == UserEventSyncSql {
|
HandleUserEventSyncSql(ev)
|
} else if ev.Name == UserEventSyncDbTablePersonCache {
|
HandleUserEventSyncDbTablePersonCache(ev)
|
} else if ev.Name == UserEventSyncVirtualIp {
|
HandleUserEventSyncVirtualIp(ev)
|
} else if ev.Name == UserEventSyncRegisterInfo {
|
HandleSyncRegisterInfo(ev)
|
} else if ev.Name == DataSystemSerfSubscribe {
|
HandleDataSystemSerfSub(ev)
|
} else if ev.Name == UserEventSyncMessage {
|
logger.Debug("接收到SyncMessageForProc")
|
HandleUserEventSyncMessage(ev)
|
}
|
case *serf.Query:
|
if ev.Name == QueryEventUpdateDBData {
|
HandleQueryEventUpdateDBData(ev)
|
} else if ev.Name == QueryNodesByTopic {
|
HandleOtherQuery(ev)
|
} else if ev.Name == QueryRpc {
|
HandleQueryRpc(ev)
|
}
|
case serf.MemberEvent:
|
if event.EventType() == serf.EventMemberLeave {
|
HandleEventMemberLeave(ev)
|
} else if event.EventType() == serf.EventMemberJoin {
|
HandleEventMemberJoin(ev)
|
}
|
|
default:
|
logger.Warn("Unknown event type: %s\n", ev.EventType().String())
|
}
|
}
|
|
func executeSqlByGorm(sqls []string) (bool, error) {
|
if len(sqls) > 0 {
|
db := models.GetDB()
|
if db != nil {
|
db.LogMode(false)
|
defer db.LogMode(true)
|
var err error
|
tx := db.Begin()
|
defer func() {
|
if err != nil && tx != nil {
|
tx.Rollback()
|
}
|
}()
|
for _, sql := range sqls {
|
result := tx.Exec(sql)
|
err = result.Error
|
if err != nil {
|
logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
|
return false, err
|
}
|
if result.RowsAffected == 0 {
|
logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
|
err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
|
return false, err
|
}
|
}
|
tx.Commit()
|
return true, nil
|
}
|
return false, errors.New("db handle is nil")
|
}
|
return true, nil
|
}
|
|
type SqlUserEvent struct {
|
Owner string `json:"owner"`
|
Sql []string `json:"sql"`
|
}
|
|
type ProcMessageEvent struct {
|
Owner string `json:"owner"` // 发送者
|
Target string `json:"target"` // 指定接收者
|
Proc string `json:"procName"` // 进程名
|
Topic string `json:"topic"` // 主题
|
Payload []byte `json:"payload"` // 消息体,自行解析
|
}
|
|
type TableDesc struct {
|
Cid int `json:"cid"`
|
Name string `json:"name"`
|
Type string `json:"type"`
|
Notnull bool `json:"notnull"`
|
DFltValue interface{} `json:"dflt_value"`
|
Pk int `json:"pk"`
|
}
|
|
type DumpSql struct {
|
Sql string `json:"sql"`
|
}
|
|
const (
|
DbT_TableName = "dbtables"
|
DBP_TableName = "dbtablepersons"
|
)
|
|
func DumpTables(tableNames []string) ([]string, error) {
|
db := models.GetDB()
|
db.LogMode(false)
|
defer db.LogMode(true)
|
if tableNames != nil {
|
var arr []string
|
var dumpSql []DumpSql
|
for _, table := range tableNames {
|
logger.Info("dump current tableName:", table)
|
dumpSql = make([]DumpSql, 0)
|
var tDescArr []TableDesc
|
tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
|
err := db.Raw(tSql).Scan(&tDescArr).Error
|
logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
|
if err != nil {
|
return nil, errors.New("tableDesc err")
|
}
|
logger.Info(table, "'Columns is:", tDescArr)
|
if tDescArr == nil || len(tDescArr) == 0 {
|
return nil, errors.New(table + " has no column")
|
}
|
var columnNames []string
|
for _, col := range tDescArr {
|
columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
|
}
|
if table == DbT_TableName {
|
tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
|
table,
|
strings.Join(columnNames, ","),
|
table)
|
} else if table == DBP_TableName {
|
tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
|
table,
|
strings.Join(columnNames, ","),
|
table)
|
} else {
|
tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
|
table,
|
strings.Join(columnNames, ","),
|
table)
|
}
|
|
logger.Info("tSql:", tSql)
|
|
err = db.Raw(tSql).Scan(&dumpSql).Error
|
logger.Debug("dump err:", err)
|
if err != nil {
|
return nil, errors.New("dump err")
|
}
|
if len(dumpSql) > 0 {
|
for _, d := range dumpSql {
|
arr = append(arr, d.Sql)
|
}
|
}
|
|
}
|
return arr, nil
|
}
|
return nil, errors.New("tableNames is nil")
|
}
|
|
type QueryTableDataParam struct {
|
Tables []string `json:"tables"`
|
From string `json:"from"`
|
}
|
|
var QueryTcpResponseChan = make(chan []byte)
|
|
func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) {
|
//members: get name of first member
|
mbs := a.GroupMembers(clusterId)
|
var specmembername string
|
for _, m := range mbs {
|
logger.Info("member", m)
|
if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
|
if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
|
if strings.HasPrefix(m.Name, "DSVAD") {
|
specmembername = m.Name
|
break
|
}
|
} else {
|
specmembername = m.Name
|
break
|
}
|
}
|
}
|
logger.Info("members:", mbs, "specmembername:", specmembername)
|
if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
|
return nil, errors.New("specmembername not found")
|
}
|
|
//query: get db file.
|
params := serf.QueryParam{
|
FilterNodes: strings.Fields(specmembername),
|
}
|
|
//get db tables
|
var fromP = QueryTableDataParam{
|
Tables: tableNames,
|
From: config.Server.AnalyServerId,
|
}
|
tBytes, _ := json.Marshal(fromP)
|
|
resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms)
|
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
logger.Error("err: ", err)
|
}
|
logger.Info("Query.resp.err:", err, "resp:", resp)
|
|
var dumpSqls []string
|
|
var wg sync.WaitGroup
|
wg.Add(1)
|
ticker := time.NewTicker(timeout)
|
go func(tk *time.Ticker) {
|
defer tk.Stop()
|
defer wg.Done()
|
for {
|
select {
|
case <-tk.C:
|
return
|
case msg := <-QueryTcpResponseChan:
|
logger.Info("Query response's len:", len(msg))
|
err := json.Unmarshal(msg, &dumpSqls)
|
if err == nil {
|
//logger.Error("dumpSql:", dumpSqls)
|
logger.Debug("data dump success")
|
}
|
return
|
}
|
}
|
}(ticker)
|
wg.Wait()
|
return &dumpSqls, nil
|
}
|
|
func SyncSql(sqlOp []string) {
|
var sqlUe = SqlUserEvent{
|
Owner: config.Server.AnalyServerId,
|
Sql: sqlOp,
|
}
|
ueB, err := json.Marshal(sqlUe)
|
if err != nil {
|
logger.Error("sqlUE marshal err:", err)
|
return
|
}
|
err = Agent.UserEvent(UserEventSyncSql, ueB, false)
|
if err != nil {
|
logger.Error("sending sync sql event err: ", err)
|
}
|
}
|