| | |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "regexp" |
| | | "strings" |
| | | "syscall" |
| | | "time" |
| | | |
| | | "apsClient/pkg/logx" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/bhomeclient.git" |
| | |
| | | |
| | | "github.com/gogo/protobuf/proto" |
| | | "github.com/jinzhu/gorm" |
| | | "github.com/satori/go.uuid" |
| | | "github.com/mitchellh/mapstructure" |
| | | "github.com/muesli/cache2go" |
| | | ) |
| | | |
| | | var ( |
| | | agent = SyncServer{} |
| | | agent = SyncServer{} |
| | | dependProcs = []string{ |
| | | bhomeclient.Proc_System_Service, |
| | | } |
| | | |
| | | sqlMsgSeqCache = cache2go.Cache("syncSqlMsg") |
| | | ) |
| | | |
| | | const ( |
| | |
| | | Proc string `json:"procName"` // 进程名 |
| | | Topic string `json:"topic"` // 主题 |
| | | Payload []byte `json:"payload"` // 消息体,自行解析 |
| | | } |
| | | |
| | | type SqlMsg struct { |
| | | Id string |
| | | Sql string |
| | | Version string |
| | | } |
| | | |
| | | type SyncServer struct { |
| | |
| | | agent.queryTableTopic = procName + "/serf/query/sqls" |
| | | |
| | | // 设置日志回调 |
| | | db.SetLogger(&DbLogger{}) |
| | | db.SetLogger(&agent) |
| | | |
| | | // 先关闭日志 |
| | | db.LogMode(false) |
| | | //db.LogMode(false) |
| | | |
| | | return &agent |
| | | } |
| | |
| | | bhomedbapi.InitDoReq(client.RequestOnly) |
| | | //bhomedbapi.InitLog(logger.Debug) |
| | | |
| | | // 需要等待system-service进程成功启动后,才能获取集群状态(或者保证程序启动时获取到正确的状态) |
| | | tryTimes := 0 |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-q: |
| | | initChan <- false |
| | | return |
| | | default: |
| | | if tryTimes < 15 { |
| | | clients, err := client.GetRegisteredClient() |
| | | if err == nil && len(clients) > 0 { |
| | | var existingProcs []string |
| | | for _, c := range clients { |
| | | if c.Online { |
| | | existingProcs = append(existingProcs, string(c.Proc.ProcId)) |
| | | } |
| | | } |
| | | if diff := arrayContains(existingProcs, dependProcs); diff == "" { |
| | | break loop |
| | | } else { |
| | | logx.Errorf("Proc: %s is not running!", diff) |
| | | time.Sleep(time.Second * 1) |
| | | } |
| | | } else { |
| | | tryTimes++ |
| | | time.Sleep(time.Second * 5) |
| | | } |
| | | } else { |
| | | logx.Errorf("tried 15 times, client.GetRegisteredClient failed") |
| | | initChan <- false |
| | | return |
| | | } |
| | | } |
| | | } |
| | | |
| | | go client.StartServer(nil) |
| | | |
| | | ss.bhClient = client |
| | |
| | | // 启动后查询一次集群状态 |
| | | ss.QueryClusterStat() |
| | | |
| | | if ss.ClusterStatus != "" { |
| | | ss.sqlDB.LogMode(true) |
| | | } |
| | | //if ss.ClusterStatus != "" { |
| | | //ss.sqlDB.LogMode(true) |
| | | //} |
| | | |
| | | initChan <- true |
| | | <-q |
| | |
| | | os.Exit(0) |
| | | } |
| | | |
| | | func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { |
| | | func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error { |
| | | sqlMsg := SqlMsg{ |
| | | Id: uuid.NewV4().String(), |
| | | Sql: sql, |
| | | } |
| | | |
| | | bMsg, _ := json.Marshal(sqlMsg) |
| | | |
| | | var msg = ProcMessageEvent{ |
| | | Owner: ss.ServerId, |
| | | Target: targetId, |
| | | Proc: ss.ProcName, |
| | | Topic: ss.syncSqlTopic, |
| | | Payload: payload, |
| | | Payload: bMsg, |
| | | } |
| | | |
| | | b, err := json.Marshal(msg) |
| | |
| | | return err |
| | | } |
| | | |
| | | fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId) |
| | | logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId) |
| | | return ss.bhClient.Publish(serfSyncTopic, b) |
| | | } |
| | | |
| | |
| | | // 处理同步全量数据的请求 |
| | | if string(busMsg.Topic) == ss.queryTableTopic { |
| | | if ss.ClusterStatus == "master" { |
| | | fmt.Println("接收到同步全量数据请求") |
| | | logx.Debugf("接收到同步全量数据请求.") |
| | | ss.handleSyncTableMessage(busMsg.Data) |
| | | } |
| | | } |
| | |
| | | // 创建集群, 开启日志跟踪, 设置角色master |
| | | ss.clusterEventFn(EventCreateCluster) |
| | | ss.ClusterStatus = "master" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | |
| | | case "join": |
| | | // 加入集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventJoinCluster) |
| | | ss.onJoinCluster() |
| | | ss.ClusterStatus = "slave" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | |
| | | case "leave": |
| | | // 退出集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventLeaveCluster) |
| | | ss.ClusterStatus = "" |
| | | ss.sqlDB.LogMode(false) |
| | | //ss.sqlDB.LogMode(true) |
| | | case "slave2master": |
| | | ss.clusterEventFn(EventSlave2Master) |
| | | ss.ClusterStatus = "master" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | case "master2slave": |
| | | ss.clusterEventFn(EventMaster2Slave) |
| | | ss.ClusterStatus = "slave" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | fmt.Println("删除本地的同步库数据失败,", err.Error()) |
| | | logx.Errorf("删除本地的同步库数据失败, %s", err.Error()) |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // 查询集群状态, 返回 master, slave, leave |
| | | func (ss *SyncServer) QueryClusterStat() string { |
| | | func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply { |
| | | clusterStatTopic := "/data/api-v/cluster/status" |
| | | req := bhomeclient.Request{ |
| | | Path: clusterStatTopic, |
| | |
| | | if err != nil { |
| | | fmt.Println("RequestTopic error", err.Error()) |
| | | |
| | | return "" |
| | | return reply |
| | | } |
| | | |
| | | ss.ClusterStatus = reply.Msg |
| | | |
| | | fmt.Println("当前集群状态:", ss.ClusterStatus) |
| | | logx.Debugf("当前集群状态: %s", ss.ClusterStatus) |
| | | |
| | | return reply.Msg |
| | | return reply |
| | | } |
| | | |
| | | func (ss *SyncServer) handleDbLoggerPrint() { |
| | |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | ss.pubSyncSqlMessage(syncSql, "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | sendSize = 0 |
| | |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | ss.pubSyncSqlMessage(syncSql, "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | func (ss *SyncServer) handleClusterMessage(msg []byte) { |
| | | //fmt.Println("clusterMessage:", string(msg)) |
| | | sql := string(msg) |
| | | func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) { |
| | | var msg SqlMsg |
| | | err := json.Unmarshal(clusterMsgData,&msg) |
| | | if err != nil { |
| | | logx.Errorf(" Unmarshal cluster message error, %s",err.Error()) |
| | | return |
| | | } |
| | | |
| | | // 判断消息是否曾经接收过 |
| | | if sqlMsgSeqCache.Exists(msg.Id) { |
| | | logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql) |
| | | return |
| | | } |
| | | |
| | | // 记录消息id, 半小时过期 |
| | | sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true) |
| | | |
| | | logx.Infof("clusterMessage:%s", msg.Sql) |
| | | sql := msg.Sql |
| | | |
| | | if len(sql) <= 0 { |
| | | return |
| | |
| | | } |
| | | } |
| | | |
| | | // serf 同步数据的限制为92160 byte |
| | | func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { |
| | | sizeLimit := 61440 |
| | | targetId := string(msg) |
| | | fmt.Println("同步全量数据给节点:", targetId) |
| | | |
| | | //fmt.Println("同步全量数据给节点:", targetId) |
| | | sqls, err := DumpTables(ss.sqlDB, ss.syncTables) |
| | | if err != nil { |
| | | fmt.Println("DumpTables error, ", err.Error()) |
| | | logx.Errorf("DumpTables error: %s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | logx.Infof("DumpTables sql:%v", sqls) |
| | | syncSql := strings.Join(sqls, ";") |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | if len(syncSql) < sizeLimit { |
| | | err = ss.pubSyncSqlMessage(syncSql, targetId) |
| | | } else { |
| | | shard := "" |
| | | for _, sql := range sqls { |
| | | if len(shard)+len(sql) > sizeLimit { |
| | | err = ss.pubSyncSqlMessage(shard, targetId) |
| | | shard = "" |
| | | } |
| | | |
| | | shard = fmt.Sprintf("%s%s;", shard, sql) |
| | | } |
| | | |
| | | if len(shard) > 0 { |
| | | err = ss.pubSyncSqlMessage(shard, targetId) |
| | | } |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (ss *SyncServer) Print(values ...interface{}) { |
| | | var ( |
| | | level = values[0] |
| | | ) |
| | | |
| | | //fmt.Println("dblogger", values) |
| | | |
| | | if level == "sql" { |
| | | msgArr := gorm.LogFormatter(values...) |
| | | sql := msgArr[3].(string) |
| | | logx.Infof("sql: %v", sql) |
| | | sql = strings.TrimPrefix(sql, " ") |
| | | if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") { |
| | | affected := values[5].(int64) |
| | | if affected > 0 { //执行成功 |
| | | //判断操作的是哪张表 |
| | | whereIdx := strings.Index(sql, "WHERE") |
| | | sqlWithTable := sql |
| | | if whereIdx > -1 { |
| | | sqlWithTable = sql[:whereIdx] |
| | | } |
| | | |
| | | //fmt.Println("判断是哪张表 sqlWithTable:", sqlWithTable) |
| | | |
| | | insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert |
| | | updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update |
| | | delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete |
| | | |
| | | if insertReg.MatchString(sqlWithTable) { |
| | | //fmt.Println("插入操作") |
| | | for _, t := range agent.syncTables { |
| | | reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) |
| | | if reg.MatchString(sqlWithTable) { |
| | | //fmt.Println("属于同步表:", t) |
| | | // 判断是在集群内, 同步消息, 判断两种角色, 为避免其他出现状态 |
| | | if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { |
| | | syncSqlChan <- sql |
| | | } |
| | | } |
| | | } |
| | | } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) { |
| | | //fmt.Println("删除或者更新") |
| | | for _, t := range agent.syncTables { |
| | | reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) |
| | | if reg.MatchString(sqlWithTable) { |
| | | //fmt.Println("属于同步表:", t) |
| | | if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { |
| | | syncSqlChan <- sql |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | fmt.Println("dbLogger level!=sql") |
| | | } |
| | | } |
| | | |
| | | func arrayContains(list []string, arr []string) string { |
| | | if arr == nil || list == nil { |
| | | return "" |
| | | } |
| | | |
| | | for _, s := range arr { |
| | | isExist := false |
| | | for _, t := range list { |
| | | if s == t { |
| | | isExist = true |
| | | break |
| | | } |
| | | } |
| | | |
| | | if !isExist { |
| | | return s |
| | | } |
| | | } |
| | | |
| | | return "" |
| | | } |
| | | |
| | | type NodeInfo struct { |
| | | NodeID string `json:"node_id,omitempty"` |
| | | NodeIp string `json:"node_ip,omitempty"` |
| | | NodeName string `json:"node_name,omitempty"` |
| | | ClusterID string `json:"cluster_id"` |
| | | CreateTime string `json:"create_time"` |
| | | DeviceType string `json:"device_type"` |
| | | DriftState string `json:"drift_state"` |
| | | Online bool `json:"online"` |
| | | } |
| | | |
| | | func QueryClusterStatusAndNodeQuantity() (string, int) { |
| | | reply := agent.QueryClusterStat() |
| | | if reply == nil { |
| | | return "", 0 |
| | | } |
| | | var nodes []NodeInfo |
| | | err := mapstructure.Decode(reply.Data, &nodes) |
| | | if err != nil { |
| | | logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err) |
| | | return reply.Msg, 0 |
| | | } |
| | | return reply.Msg, len(nodes) |
| | | } |