package serf

import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "os/signal"
    "regexp"
    "strings"
    "syscall"
    "time"

    "apsClient/pkg/logx"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "github.com/gogo/protobuf/proto"
    "github.com/jinzhu/gorm"
    "github.com/mitchellh/mapstructure"
    "github.com/muesli/cache2go"
    "github.com/satori/go.uuid"
)

var (
    agent       = SyncServer{}
    dependProcs = []string{
        bhomeclient.Proc_System_Service,
    }
    sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")

    // syncSqlChan carries SQL statements captured by the gorm logger hook to
    // handleDbLoggerPrint. The declaration was missing from the original
    // source; the buffer size here is an assumption.
    syncSqlChan = make(chan string, 1024)
)

const (
    serfSyncTopic = "sync-proc-message-to-serf"

    EventCreateCluster = 0
    EventJoinCluster   = 1
    EventLeaveCluster  = 2
    EventMaster2Slave  = 3
    EventSlave2Master  = 4
)

type ProcMessageEvent struct {
    Owner   string `json:"owner"`    // sender
    Target  string `json:"target"`   // designated receiver
    Proc    string `json:"procName"` // process name
    Topic   string `json:"topic"`    // topic
    Payload []byte `json:"payload"`  // message body; the receiver parses it itself
}

type SqlMsg struct {
    Id      string
    Sql     string
    Version string
}

type SyncServer struct {
    ProcName        string   // process name
    ServerId        string   // local node id
    ClusterStatus   string   // cluster status: master/slave, empty means not in a cluster
    syncSqlTopic    string   // topic for synchronized SQL messages
    queryTableTopic string   // topic for requesting full cluster data after joining
    syncTables      []string // tables to keep in sync
    sqlDB           *gorm.DB // database handle
    bhClient        *bhomeclient.MicroNode
    clusterEventFn  func(int)
}

func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer {
    agent.ProcName = procName
    agent.ServerId = Vasystem.ServerID
    agent.sqlDB = db
    agent.syncTables = syncTables
    agent.syncSqlTopic = procName + "/serf/sync/sql"
    agent.queryTableTopic = procName + "/serf/query/sqls"

    // Register this agent as the gorm logger callback.
    db.SetLogger(&agent)
    // Keep SQL logging off for now.
    //db.LogMode(false)
    return &agent
}

func (ss *SyncServer) RegisterClusterEvent(fn func(int)) {
    ss.clusterEventFn = fn
}

func (ss *SyncServer) Serve(initChan chan bool) {
    proc := &bhomeclient.ProcInfo{
        Name: ss.ProcName, // process name
        ID:   ss.ProcName, // process id
        Info: "",          // description, used to distinguish processes sharing the same name
    }
    ctx, cancel := context.WithCancel(context.Background())
    var reg = &bhomeclient.RegisterInfo{
        Proc:        *proc,
        Channel:     nil,
        PubTopic:    []string{},
        SubTopic:    []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic},
        SubNetTopic: []string{},
    }

    q := make(chan os.Signal, 1)
    signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)

    client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil)
    if err != nil {
        initChan <- false
        return
    }
    bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic)
    bhomedbapi.InitDoReq(client.RequestOnly)
    //bhomedbapi.InitLog(logger.Debug)

    // The cluster status can only be read after system-service has started,
    // so wait for the dependent processes before continuing (this also
    // guarantees a correct status at startup).
    tryTimes := 0
loop:
    for {
        select {
        case <-q:
            initChan <- false
            return
        default:
            if tryTimes < 15 {
                clients, err := client.GetRegisteredClient()
                if err == nil && len(clients) > 0 {
                    var existingProcs []string
                    for _, c := range clients {
                        if c.Online {
                            existingProcs = append(existingProcs, string(c.Proc.ProcId))
                        }
                    }
                    if diff := arrayContains(existingProcs, dependProcs); diff == "" {
                        break loop
                    } else {
                        logx.Errorf("Proc: %s is not running!", diff)
                        time.Sleep(time.Second * 1)
                    }
                } else {
                    tryTimes++
                    time.Sleep(time.Second * 5)
                }
            } else {
                logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
                initChan <- false
                return
            }
        }
    }

    go client.StartServer(nil)
    ss.bhClient = client

    go ss.subBusMessage(ctx)
    go ss.handleDbLoggerPrint()

    // Query the cluster status once after startup.
    ss.QueryClusterStat()
    //if ss.ClusterStatus != "" {
    //    ss.sqlDB.LogMode(true)
    //}

    initChan <- true

    <-q
    client.DeRegister()
    cancel()
    client.Free()
    os.Exit(0)
}
func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
    sqlMsg := SqlMsg{
        Id:  uuid.NewV4().String(),
        Sql: sql,
    }
    bMsg, _ := json.Marshal(sqlMsg)
    var msg = ProcMessageEvent{
        Owner:   ss.ServerId,
        Target:  targetId,
        Proc:    ss.ProcName,
        Topic:   ss.syncSqlTopic,
        Payload: bMsg,
    }
    b, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    return ss.bhClient.Publish(serfSyncTopic, b)
}

// Request the full data of the synchronized tables, sending our own id.
func (ss *SyncServer) pubSyncTableMessage() error {
    var msg = ProcMessageEvent{
        Owner:   ss.ServerId,
        Proc:    ss.ProcName,
        Topic:   ss.queryTableTopic,
        Payload: []byte(ss.ServerId),
    }
    b, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    logx.Debugf("joined cluster, requesting a full data sync, id:%s", ss.ServerId)
    return ss.bhClient.Publish(serfSyncTopic, b)
}

func (ss *SyncServer) subBusMessage(ctx context.Context) {
    //fmt.Println("sub bus msg")
    for {
        select {
        case <-ctx.Done():
            fmt.Println("sub bus msg exit")
            return
        case busMsg := <-ss.bhClient.SubCh:
            if string(busMsg.Topic) == ss.syncSqlTopic {
                ss.handleClusterMessage(busMsg.Data)
            }
            // Handle requests for a full data sync.
            if string(busMsg.Topic) == ss.queryTableTopic {
                if ss.ClusterStatus == "master" {
                    logx.Debugf("received a full data sync request.")
                    ss.handleSyncTableMessage(busMsg.Data)
                }
            }
            // Messages published by system-service.
            if string(busMsg.Topic) == bhomeclient.Proc_System_Service {
                var clusterMsg = &protomsg.DbChangeMessage{}
                if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil {
                    if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil {
                        fmt.Println("proto.Unmarshal ", err.Error())
                        continue
                    }
                }
                if clusterMsg.Table == protomsg.TableChanged_T_Cluster {
                    switch clusterMsg.Info {
                    case "create":
                        // Cluster created: this node becomes master.
                        ss.clusterEventFn(EventCreateCluster)
                        ss.ClusterStatus = "master"
                        //ss.sqlDB.LogMode(true)
                    case "join":
                        // Joined a cluster: this node becomes a slave.
                        ss.clusterEventFn(EventJoinCluster)
                        ss.onJoinCluster()
                        ss.ClusterStatus = "slave"
                        //ss.sqlDB.LogMode(true)
                    case "leave":
                        // Left the cluster: clear the role.
                        ss.clusterEventFn(EventLeaveCluster)
                        ss.ClusterStatus = ""
                        //ss.sqlDB.LogMode(true)
                    case "slave2master":
                        ss.clusterEventFn(EventSlave2Master)
                        ss.ClusterStatus = "master"
                        //ss.sqlDB.LogMode(true)
                    case "master2slave":
                        ss.clusterEventFn(EventMaster2Slave)
                        ss.ClusterStatus = "slave"
                        //ss.sqlDB.LogMode(true)
                    }
                }
            }
        }
    }
}

// On joining a cluster: clear the local tables, then sync data from the cluster.
func (ss *SyncServer) onJoinCluster() {
    var err error
    db := ss.sqlDB
    tx := db.Begin()
    defer func() {
        if err != nil && tx != nil {
            tx.Rollback()
        }
    }()
    tx.Exec("PRAGMA foreign_keys=OFF")
    // 1. Delete the local data of the synchronized tables.
    for _, t := range ss.syncTables {
        delSql := "delete from " + t
        err = tx.Exec(delSql).Error
        if err != nil {
            logx.Errorf("failed to delete local synchronized table data, %s", err.Error())
        }
    }
    // 2. Re-enable foreign key checks.
    tx.Exec("PRAGMA foreign_keys=ON")
    if err == nil {
        tx.Commit()
    }

    // Pull the synchronized tables' data from the cluster into the local database.
    ss.pubSyncTableMessage()
}

func (ss *SyncServer) onLeaveCluster() {
}

func (ss *SyncServer) onCreateCluster() {
}

// Query the cluster status; the reply message is master, slave, or leave.
func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
    clusterStatTopic := "/data/api-v/cluster/status"
    req := bhomeclient.Request{
        Path:   clusterStatTopic,
        Method: "POST",
    }
    reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000)
    if err != nil {
        fmt.Println("RequestTopic error", err.Error())
        return reply
    }
    ss.ClusterStatus = reply.Msg
    logx.Debugf("current cluster status: %s", ss.ClusterStatus)
    return reply
}

func (ss *SyncServer) handleDbLoggerPrint() {
    sqlBuf := make([]string, 0)
    ticker := time.NewTicker(3 * time.Second)
    sendSize := 0 // serf MaxUserEventSize is 9*1024
    for {
        select {
        case <-ticker.C:
            if len(sqlBuf) > 0 {
                syncSql := strings.Join(sqlBuf, "")
                //fmt.Println("sync sql:", syncSql)
                ss.pubSyncSqlMessage(syncSql, "")
                sqlBuf = sqlBuf[:0]
                sendSize = 0
            }
        case sql := <-syncSqlChan:
            if sendSize+len(sql) > (9*1024 - 1024) {
                if len(sqlBuf) > 0 {
                    syncSql := strings.Join(sqlBuf, "")
                    //fmt.Println("sync sql:", syncSql)
                    ss.pubSyncSqlMessage(syncSql, "")
                    sqlBuf = sqlBuf[:0]
                }
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
                sendSize = len(sql)
            } else {
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
                sendSize = sendSize + len(sql)
            }
        }
    }
}

func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
    var msg SqlMsg
    err := json.Unmarshal(clusterMsgData, &msg)
    if err != nil {
        logx.Errorf("Unmarshal cluster message error, %s", err.Error())
        return
    }
    // Skip messages that have already been received.
    if sqlMsgSeqCache.Exists(msg.Id) {
        logx.Infof("clusterMessage: duplicate message received, %s", msg.Sql)
        return
    }
    // Remember the message id; the entry expires after half an hour.
    sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)

    logx.Infof("clusterMessage:%s", msg.Sql)
    sql := msg.Sql
    if len(sql) <= 0 {
        return
    }

    db := ss.sqlDB
    if db != nil {
        db.LogMode(false)
        defer db.LogMode(true)

        var err error
        tx := db.Begin()
        defer func() {
            if err != nil && tx != nil {
                tx.Rollback()
            }
        }()

        result := tx.Exec(sql)
        err = result.Error
        if err != nil {
            fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql)
        }
        if result.RowsAffected == 0 {
            fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
        }
        // Commit only when the statement succeeded; the deferred rollback
        // handles the error case.
        if err == nil {
            tx.Commit()
        }
    }
}

// serf limits a synchronized user event to 92160 bytes.
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
    sizeLimit := 61440
    targetId := string(msg)
    //fmt.Println("sync full data to node:", targetId)
    sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
    if err != nil {
        logx.Errorf("DumpTables error: %s", err.Error())
        return err
    }
    logx.Infof("DumpTables sql:%v", sqls)
    syncSql := strings.Join(sqls, ";")
    if len(syncSql) < sizeLimit {
        err = ss.pubSyncSqlMessage(syncSql, targetId)
    } else {
        shard := ""
        for _, sql := range sqls {
            if len(shard)+len(sql) > sizeLimit {
                err = ss.pubSyncSqlMessage(shard, targetId)
                shard = ""
            }
            shard = fmt.Sprintf("%s%s;", shard, sql)
        }
        if len(shard) > 0 {
            err = ss.pubSyncSqlMessage(shard, targetId)
        }
    }
    return err
}

func (ss *SyncServer) Print(values ...interface{}) {
    var (
        level = values[0]
    )
    //fmt.Println("dblogger", values)
    if level == "sql" {
        msgArr := gorm.LogFormatter(values...)
        sql := msgArr[3].(string)
        logx.Infof("sql: %v", sql)
        sql = strings.TrimPrefix(sql, " ")
        if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") &&
            !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
            affected := values[5].(int64)
            if affected > 0 {
                // The statement succeeded; work out which table it touched.
                whereIdx := strings.Index(sql, "WHERE")
                sqlWithTable := sql
                if whereIdx > -1 {
                    sqlWithTable = sql[:whereIdx]
                }
                //fmt.Println("table detection, sqlWithTable:", sqlWithTable)
                insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
                updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
                delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
                if insertReg.MatchString(sqlWithTable) {
                    //fmt.Println("insert operation")
                    for _, t := range agent.syncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
                            //fmt.Println("belongs to synced table:", t)
                            // Only forward statements while this node is part of a cluster;
                            // check both roles explicitly to avoid unexpected states.
                            if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
                                syncSqlChan <- sql
                            }
                        }
                    }
                } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
                    //fmt.Println("delete or update")
                    for _, t := range agent.syncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
                            //fmt.Println("belongs to synced table:", t)
                            if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
                                syncSqlChan <- sql
                            }
                        }
                    }
                }
            }
        }
    } else {
        fmt.Println("dbLogger level!=sql")
    }
}

// arrayContains returns the first entry of arr that is missing from list,
// or "" if every entry is present.
func arrayContains(list []string, arr []string) string {
    if len(arr) == 0 {
        return ""
    }
    for _, s := range arr {
        isExist := false
        for _, t := range list {
            if s == t {
                isExist = true
                break
            }
        }
        if !isExist {
            return s
        }
    }
    return ""
}

type NodeInfo struct {
    NodeID     string `json:"node_id,omitempty"`
    NodeIp     string `json:"node_ip,omitempty"`
    NodeName   string `json:"node_name,omitempty"`
    ClusterID  string `json:"cluster_id"`
    CreateTime string `json:"create_time"`
    DeviceType string `json:"device_type"`
    DriftState string `json:"drift_state"`
    Online     bool   `json:"online"`
}

func QueryClusterStatusAndNodeQuantity() (string, int) {
    reply := agent.QueryClusterStat()
    if reply == nil {
        return "", 0
    }
    var nodes []NodeInfo
    err := mapstructure.Decode(reply.Data, &nodes)
    if err != nil {
        logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
        return reply.Msg, 0
    }
    return reply.Msg, len(nodes)
}
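
// exampleUsage is a minimal usage sketch, not part of the production wiring:
// the process name, the table list, and the way the *gorm.DB handle is obtained
// are illustrative assumptions only. It shows the intended call order:
// InitAgent, then RegisterClusterEvent, then Serve in its own goroutine.
func exampleUsage(db *gorm.DB) {
    // Hypothetical process name and table list; replace with real values.
    srv := InitAgent("example-proc", []string{"example_table"}, db)

    // Optional: react to cluster membership changes (create/join/leave/role switch).
    srv.RegisterClusterEvent(func(event int) {
        logx.Infof("cluster event: %d", event)
    })

    // Serve blocks until the process receives a termination signal, so it runs
    // in its own goroutine; initChan reports whether bus registration succeeded.
    initChan := make(chan bool, 1)
    go srv.Serve(initChan)
    if ok := <-initChan; !ok {
        logx.Errorf("sync agent failed to initialize")
    }
}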