package serf
|
|
import (
|
"context"
|
"encoding/json"
|
"fmt"
|
"os"
|
"os/signal"
|
"regexp"
|
"strings"
|
"syscall"
|
"time"
|
|
"apsClient/pkg/logx"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/bhomeclient.git"
|
"basic.com/valib/bhomedbapi.git"
|
|
"github.com/gogo/protobuf/proto"
|
"github.com/jinzhu/gorm"
|
"github.com/satori/go.uuid"
|
"github.com/mitchellh/mapstructure"
|
"github.com/muesli/cache2go"
|
)
|
|
var (
|
agent = SyncServer{}
|
dependProcs = []string{
|
bhomeclient.Proc_System_Service,
|
}
|
|
sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
|
)
|
|
// serfSyncTopic is the bus topic that pubSyncSqlMessage and
// pubSyncTableMessage publish to.
const serfSyncTopic = "sync-proc-message-to-serf"

// Cluster events passed to the callback registered with RegisterClusterEvent.
const (
	EventCreateCluster = iota // this node created a cluster
	EventJoinCluster          // this node joined a cluster
	EventLeaveCluster         // this node left its cluster
	EventMaster2Slave         // role changed from master to slave
	EventSlave2Master         // role changed from slave to master
)
|
|
// ProcMessageEvent is the JSON envelope published on serfSyncTopic by
// pubSyncSqlMessage and pubSyncTableMessage.
type ProcMessageEvent struct {
	Owner   string `json:"owner"`    // sender (this node's ServerId)
	Target  string `json:"target"`   // designated receiver; pubSyncTableMessage leaves it empty
	Proc    string `json:"procName"` // process name
	Topic   string `json:"topic"`    // topic the payload belongs to
	Payload []byte `json:"payload"`  // message body; interpretation is up to the receiver
}
|
|
// SqlMsg is the payload of a replicated-SQL message. pubSyncSqlMessage sets
// Id to a fresh UUID and Sql to the statement text; handleClusterMessage
// uses Id with sqlMsgSeqCache to drop duplicates. It has no JSON tags, so
// it is encoded with the Go field names.
// NOTE(review): Version is not populated by pubSyncSqlMessage.
type SqlMsg struct {
	Id      string
	Sql     string
	Version string
}
|
|
// SyncServer replicates writes on selected database tables to the other
// cluster nodes over the bhome message bus. The package-level instance
// (agent) is configured by InitAgent and run by Serve.
type SyncServer struct {
	ProcName      string // process name
	ServerId      string // id of this node
	ClusterStatus string // cluster role, "master" or "slave"; empty means not in a cluster
	// NOTE(review): ClusterStatus is written by the subBusMessage goroutine and
	// read in Print on whichever goroutine runs a query, without locking —
	// confirm this race is acceptable.

	syncSqlTopic    string   // topic carrying replicated SQL
	queryTableTopic string   // topic used after joining a cluster to request the cluster's table data
	syncTables      []string // tables whose writes are replicated
	sqlDB           *gorm.DB // database handle

	bhClient *bhomeclient.MicroNode // bus node; assigned in Serve

	clusterEventFn func(int) // callback set by RegisterClusterEvent; receives Event* constants
}
|
|
func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer {
|
agent.ProcName = procName
|
agent.ServerId = Vasystem.ServerID
|
agent.sqlDB = db
|
agent.syncTables = syncTables
|
agent.syncSqlTopic = procName + "/serf/sync/sql"
|
agent.queryTableTopic = procName + "/serf/query/sqls"
|
|
// 设置日志回调
|
db.SetLogger(&agent)
|
|
// 先关闭日志
|
//db.LogMode(false)
|
|
return &agent
|
}
|
|
func (ss *SyncServer) RegisterClusterEvent(fn func(int)) {
|
ss.clusterEventFn = fn
|
}
|
|
func (ss *SyncServer) Serve(initChan chan bool) {
|
proc := &bhomeclient.ProcInfo{
|
Name: ss.ProcName, //进程名称
|
ID: ss.ProcName, //进程id
|
Info: "", //进程的描述信息,用于区分同一进程名称下多个进程
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
var reg = &bhomeclient.RegisterInfo{
|
Proc: *proc,
|
Channel: nil,
|
PubTopic: []string{},
|
SubTopic: []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic},
|
SubNetTopic: []string{},
|
}
|
|
q := make(chan os.Signal, 1)
|
signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
|
|
client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil)
|
if err != nil {
|
initChan <- false
|
return
|
}
|
|
bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic)
|
bhomedbapi.InitDoReq(client.RequestOnly)
|
//bhomedbapi.InitLog(logger.Debug)
|
|
// 需要等待system-service进程成功启动后,才能获取集群状态(或者保证程序启动时获取到正确的状态)
|
tryTimes := 0
|
loop:
|
for {
|
select {
|
case <-q:
|
initChan <- false
|
return
|
default:
|
if tryTimes < 15 {
|
clients, err := client.GetRegisteredClient()
|
if err == nil && len(clients) > 0 {
|
var existingProcs []string
|
for _, c := range clients {
|
if c.Online {
|
existingProcs = append(existingProcs, string(c.Proc.ProcId))
|
}
|
}
|
if diff := arrayContains(existingProcs, dependProcs); diff == "" {
|
break loop
|
} else {
|
logx.Errorf("Proc: %s is not running!", diff)
|
time.Sleep(time.Second * 1)
|
}
|
} else {
|
tryTimes++
|
time.Sleep(time.Second * 5)
|
}
|
} else {
|
logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
|
initChan <- false
|
return
|
}
|
}
|
}
|
|
go client.StartServer(nil)
|
|
ss.bhClient = client
|
|
go ss.subBusMessage(ctx)
|
|
go ss.handleDbLoggerPrint()
|
|
// 启动后查询一次集群状态
|
ss.QueryClusterStat()
|
|
//if ss.ClusterStatus != "" {
|
//ss.sqlDB.LogMode(true)
|
//}
|
|
initChan <- true
|
<-q
|
|
client.DeRegister()
|
cancel()
|
client.Free()
|
|
os.Exit(0)
|
}
|
|
func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
|
sqlMsg := SqlMsg{
|
Id: uuid.NewV4().String(),
|
Sql: sql,
|
}
|
|
bMsg, _ := json.Marshal(sqlMsg)
|
|
var msg = ProcMessageEvent{
|
Owner: ss.ServerId,
|
Target: targetId,
|
Proc: ss.ProcName,
|
Topic: ss.syncSqlTopic,
|
Payload: bMsg,
|
}
|
|
b, err := json.Marshal(msg)
|
if err != nil {
|
return err
|
}
|
|
return ss.bhClient.Publish(serfSyncTopic, b)
|
}
|
|
// 请求同步表的全量数据, 发送自己的id
|
func (ss *SyncServer) pubSyncTableMessage() error {
|
var msg = ProcMessageEvent{
|
Owner: ss.ServerId,
|
Proc: ss.ProcName,
|
Topic: ss.queryTableTopic,
|
Payload: []byte(ss.ServerId),
|
}
|
|
b, err := json.Marshal(msg)
|
if err != nil {
|
return err
|
}
|
|
logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId)
|
return ss.bhClient.Publish(serfSyncTopic, b)
|
}
|
|
func (ss *SyncServer) subBusMessage(ctx context.Context) {
|
//fmt.Println("sub bus msg")
|
|
for {
|
select {
|
case <-ctx.Done():
|
fmt.Println("sub bus msg exit")
|
return
|
case busMsg := <-ss.bhClient.SubCh:
|
if string(busMsg.Topic) == ss.syncSqlTopic {
|
ss.handleClusterMessage(busMsg.Data)
|
}
|
|
// 处理同步全量数据的请求
|
if string(busMsg.Topic) == ss.queryTableTopic {
|
if ss.ClusterStatus == "master" {
|
logx.Debugf("接收到同步全量数据请求.")
|
ss.handleSyncTableMessage(busMsg.Data)
|
}
|
}
|
|
// system-service发送的消息
|
if string(busMsg.Topic) == bhomeclient.Proc_System_Service {
|
var clusterMsg = &protomsg.DbChangeMessage{}
|
|
if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil {
|
if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil {
|
fmt.Println("proto.Unmarshal ", err.Error())
|
continue
|
}
|
}
|
|
if clusterMsg.Table == protomsg.TableChanged_T_Cluster {
|
switch clusterMsg.Info {
|
case "create":
|
// 创建集群, 开启日志跟踪, 设置角色master
|
ss.clusterEventFn(EventCreateCluster)
|
ss.ClusterStatus = "master"
|
//ss.sqlDB.LogMode(true)
|
|
case "join":
|
// 加入集群, 开启日志跟踪, 设置角色slave
|
ss.clusterEventFn(EventJoinCluster)
|
ss.onJoinCluster()
|
ss.ClusterStatus = "slave"
|
//ss.sqlDB.LogMode(true)
|
|
case "leave":
|
// 退出集群, 开启日志跟踪, 设置角色slave
|
ss.clusterEventFn(EventLeaveCluster)
|
ss.ClusterStatus = ""
|
//ss.sqlDB.LogMode(true)
|
case "slave2master":
|
ss.clusterEventFn(EventSlave2Master)
|
ss.ClusterStatus = "master"
|
//ss.sqlDB.LogMode(true)
|
case "master2slave":
|
ss.clusterEventFn(EventMaster2Slave)
|
ss.ClusterStatus = "slave"
|
//ss.sqlDB.LogMode(true)
|
}
|
}
|
}
|
}
|
}
|
}
|
|
// 加入集群, 清空本地表, 同步集群内数据
|
func (ss *SyncServer) onJoinCluster() {
|
var err error
|
|
db := ss.sqlDB
|
|
tx := db.Begin()
|
defer func() {
|
if err != nil && tx != nil {
|
tx.Rollback()
|
}
|
}()
|
|
tx.Exec("PRAGMA foreign_keys=OFF")
|
//1.删除本地的同步库数据
|
for _, t := range ss.syncTables {
|
delSql := "delete from " + t + ""
|
|
err = tx.Exec(delSql).Error
|
if err != nil {
|
logx.Errorf("删除本地的同步库数据失败, %s", err.Error())
|
}
|
}
|
|
//4.开启reference
|
tx.Exec("PRAGMA foreign_keys=ON")
|
tx.Commit()
|
|
// 拉取集群内的同步库数据到本地数据库表中
|
ss.pubSyncTableMessage()
|
}
|
|
func (ss *SyncServer) onLeaveCluster() {
|
|
}
|
|
func (ss *SyncServer) onCreateCluster() {
|
|
}
|
|
// 查询集群状态, 返回 master, slave, leave
|
func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
|
clusterStatTopic := "/data/api-v/cluster/status"
|
req := bhomeclient.Request{
|
Path: clusterStatTopic,
|
Method: "POST",
|
}
|
|
reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000)
|
if err != nil {
|
fmt.Println("RequestTopic error", err.Error())
|
|
return reply
|
}
|
|
ss.ClusterStatus = reply.Msg
|
|
logx.Debugf("当前集群状态: %s", ss.ClusterStatus)
|
|
return reply
|
}
|
|
func (ss *SyncServer) handleDbLoggerPrint() {
|
sqlBuf := make([]string, 0)
|
ticker := time.NewTicker(3 * time.Second)
|
sendSize := 0 //serf MaxUserEventSize is 9*1024
|
for {
|
select {
|
case <-ticker.C:
|
if len(sqlBuf) > 0 {
|
syncSql := strings.Join(sqlBuf, "")
|
|
//fmt.Println("同步sql语句:", syncSql)
|
ss.pubSyncSqlMessage(syncSql, "")
|
|
sqlBuf = append([]string{})
|
sendSize = 0
|
}
|
case sql := <-syncSqlChan:
|
if sendSize+len(sql) > (9*1024 - 1024) {
|
if len(sqlBuf) > 0 {
|
syncSql := strings.Join(sqlBuf, "")
|
//fmt.Println("同步sql语句:", syncSql)
|
|
ss.pubSyncSqlMessage(syncSql, "")
|
|
sqlBuf = append([]string{})
|
}
|
|
s := strings.TrimRight(sql, ";")
|
sqlBuf = append(sqlBuf, s+";")
|
sendSize = len(sql)
|
} else {
|
s := strings.TrimRight(sql, ";")
|
sqlBuf = append(sqlBuf, s+";")
|
|
sendSize = sendSize + len(sql)
|
}
|
}
|
}
|
}
|
|
func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
|
var msg SqlMsg
|
err := json.Unmarshal(clusterMsgData,&msg)
|
if err != nil {
|
logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
|
return
|
}
|
|
// 判断消息是否曾经接收过
|
if sqlMsgSeqCache.Exists(msg.Id) {
|
logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql)
|
return
|
}
|
|
// 记录消息id, 半小时过期
|
sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
|
|
logx.Infof("clusterMessage:%s", msg.Sql)
|
sql := msg.Sql
|
|
if len(sql) <= 0 {
|
return
|
}
|
|
db := ss.sqlDB
|
if db != nil {
|
db.LogMode(false)
|
defer db.LogMode(true)
|
|
var err error
|
tx := db.Begin()
|
defer func() {
|
if err != nil && tx != nil {
|
tx.Rollback()
|
}
|
}()
|
result := tx.Exec(sql)
|
err = result.Error
|
if err != nil {
|
fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql)
|
}
|
if result.RowsAffected == 0 {
|
fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
|
}
|
tx.Commit()
|
}
|
}
|
|
// serf 同步数据的限制为92160 byte
|
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
|
sizeLimit := 61440
|
targetId := string(msg)
|
|
//fmt.Println("同步全量数据给节点:", targetId)
|
sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
|
if err != nil {
|
logx.Errorf("DumpTables error: %s", err.Error())
|
return err
|
}
|
|
logx.Infof("DumpTables sql:%v", sqls)
|
syncSql := strings.Join(sqls, ";")
|
if len(syncSql) < sizeLimit {
|
err = ss.pubSyncSqlMessage(syncSql, targetId)
|
} else {
|
shard := ""
|
for _, sql := range sqls {
|
if len(shard)+len(sql) > sizeLimit {
|
err = ss.pubSyncSqlMessage(shard, targetId)
|
shard = ""
|
}
|
|
shard = fmt.Sprintf("%s%s;", shard, sql)
|
}
|
|
if len(shard) > 0 {
|
err = ss.pubSyncSqlMessage(shard, targetId)
|
}
|
}
|
|
return err
|
}
|
|
func (ss *SyncServer) Print(values ...interface{}) {
|
var (
|
level = values[0]
|
)
|
|
//fmt.Println("dblogger", values)
|
|
if level == "sql" {
|
msgArr := gorm.LogFormatter(values...)
|
sql := msgArr[3].(string)
|
logx.Infof("sql: %v", sql)
|
sql = strings.TrimPrefix(sql, " ")
|
if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
|
affected := values[5].(int64)
|
if affected > 0 { //执行成功
|
//判断操作的是哪张表
|
whereIdx := strings.Index(sql, "WHERE")
|
sqlWithTable := sql
|
if whereIdx > -1 {
|
sqlWithTable = sql[:whereIdx]
|
}
|
|
//fmt.Println("判断是哪张表 sqlWithTable:", sqlWithTable)
|
|
insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
|
updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
|
delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete
|
|
if insertReg.MatchString(sqlWithTable) {
|
//fmt.Println("插入操作")
|
for _, t := range agent.syncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
//fmt.Println("属于同步表:", t)
|
// 判断是在集群内, 同步消息, 判断两种角色, 为避免其他出现状态
|
if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
|
syncSqlChan <- sql
|
}
|
}
|
}
|
} else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
|
//fmt.Println("删除或者更新")
|
for _, t := range agent.syncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
//fmt.Println("属于同步表:", t)
|
if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
|
syncSqlChan <- sql
|
}
|
}
|
}
|
}
|
}
|
}
|
} else {
|
fmt.Println("dbLogger level!=sql")
|
}
|
}
|
|
// arrayContains checks whether every element of arr occurs in list.
//
// It returns the first element of arr, in order, that is missing from
// list, or "" when all of them are present. An empty or nil arr is
// trivially contained and yields "".
//
// A nil or empty list contains nothing, so in that case the first element
// of arr is returned. Serve relies on this: when no process is online, its
// list is nil and the dependency must be reported as missing.
//
// Note that an empty-string element of arr is indistinguishable from
// "all present".
func arrayContains(list []string, arr []string) string {
	present := make(map[string]struct{}, len(list))
	for _, s := range list {
		present[s] = struct{}{}
	}

	for _, s := range arr {
		if _, ok := present[s]; !ok {
			return s
		}
	}

	return ""
}
|
|
// NodeInfo describes one cluster member as listed in the Data of the
// cluster-status reply. QueryClusterStatusAndNodeQuantity decodes the reply
// into []NodeInfo but only uses the number of entries.
//
// NOTE(review): mapstructure.Decode matches `mapstructure` tags (or field
// names, case-insensitively), not the `json` tags below. Snake_case keys
// such as "node_id" may therefore leave these fields empty. Confirm the
// reply's key format before relying on individual fields.
type NodeInfo struct {
	NodeID     string `json:"node_id,omitempty"`
	NodeIp     string `json:"node_ip,omitempty"`
	NodeName   string `json:"node_name,omitempty"`
	ClusterID  string `json:"cluster_id"`
	CreateTime string `json:"create_time"`
	DeviceType string `json:"device_type"`
	DriftState string `json:"drift_state"`
	Online     bool   `json:"online"`
}
|
|
func QueryClusterStatusAndNodeQuantity() (string, int) {
|
reply := agent.QueryClusterStat()
|
if reply == nil {
|
return "", 0
|
}
|
var nodes []NodeInfo
|
err := mapstructure.Decode(reply.Data, &nodes)
|
if err != nil {
|
logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
|
return reply.Msg, 0
|
}
|
return reply.Msg, len(nodes)
|
}
|