| | |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "github.com/mitchellh/mapstructure" |
| | | "os" |
| | | "os/signal" |
| | | "regexp" |
| | | "strings" |
| | | "syscall" |
| | | "time" |
| | |
| | | agent.queryTableTopic = procName + "/serf/query/sqls" |
| | | |
| | | // 设置日志回调 |
| | | db.SetLogger(&DbLogger{}) |
| | | db.SetLogger(&agent) |
| | | |
| | | // 先关闭日志 |
| | | db.LogMode(false) |
| | | //db.LogMode(false) |
| | | |
| | | return &agent |
| | | } |
| | |
| | | // 启动后查询一次集群状态 |
| | | ss.QueryClusterStat() |
| | | |
| | | if ss.ClusterStatus != "" { |
| | | ss.sqlDB.LogMode(true) |
| | | } |
| | | //if ss.ClusterStatus != "" { |
| | | //ss.sqlDB.LogMode(true) |
| | | //} |
| | | |
| | | initChan <- true |
| | | <-q |
| | |
| | | // 创建集群, 开启日志跟踪, 设置角色master |
| | | ss.clusterEventFn(EventCreateCluster) |
| | | ss.ClusterStatus = "master" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | |
| | | case "join": |
| | | // 加入集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventJoinCluster) |
| | | ss.onJoinCluster() |
| | | ss.ClusterStatus = "slave" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | |
| | | case "leave": |
| | | // 退出集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventLeaveCluster) |
| | | ss.ClusterStatus = "" |
| | | ss.sqlDB.LogMode(false) |
| | | //ss.sqlDB.LogMode(true) |
| | | case "slave2master": |
| | | ss.clusterEventFn(EventSlave2Master) |
| | | ss.ClusterStatus = "master" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | case "master2slave": |
| | | ss.clusterEventFn(EventMaster2Slave) |
| | | ss.ClusterStatus = "slave" |
| | | ss.sqlDB.LogMode(true) |
| | | //ss.sqlDB.LogMode(true) |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | // 查询集群状态, 返回 master, slave, leave |
| | | func (ss *SyncServer) QueryClusterStat() string { |
| | | func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply { |
| | | clusterStatTopic := "/data/api-v/cluster/status" |
| | | req := bhomeclient.Request{ |
| | | Path: clusterStatTopic, |
| | |
| | | if err != nil { |
| | | fmt.Println("RequestTopic error", err.Error()) |
| | | |
| | | return "" |
| | | return reply |
| | | } |
| | | |
| | | ss.ClusterStatus = reply.Msg |
| | | |
| | | logx.Debugf("当前集群状态: %s", ss.ClusterStatus) |
| | | |
| | | return reply.Msg |
| | | return reply |
| | | } |
| | | |
| | | func (ss *SyncServer) handleDbLoggerPrint() { |
| | |
| | | } |
| | | |
| | | func (ss *SyncServer) handleClusterMessage(msg []byte) { |
| | | //fmt.Println("clusterMessage:", string(msg)) |
| | | logx.Infof("clusterMessage:", string(msg)) |
| | | sql := string(msg) |
| | | |
| | | if len(sql) <= 0 { |
| | |
| | | } |
| | | } |
| | | |
| | | // serf 同步数据的限制为92160 byte |
| | | func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { |
| | | sizeLimit := 61440 |
| | | targetId := string(msg) |
| | | |
| | | //fmt.Println("同步全量数据给节点:", targetId) |
| | | sqls, err := DumpTables(ss.sqlDB, ss.syncTables) |
| | | if err != nil { |
| | | fmt.Println("DumpTables error, ", err.Error()) |
| | | logx.Errorf("DumpTables error: %s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | logx.Infof("DumpTables sql:%v", sqls) |
| | | syncSql := strings.Join(sqls, ";") |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | if len(syncSql) < sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | } else { |
| | | shard := "" |
| | | for _, sql := range sqls { |
| | | if len(shard)+len(sql) > sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | shard = "" |
| | | } |
| | | |
| | | shard = fmt.Sprintf("%s%s;", shard, sql) |
| | | } |
| | | |
| | | if len(shard) > 0 { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | } |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (ss *SyncServer) Print(values ...interface{}) { |
| | | var ( |
| | | level = values[0] |
| | | ) |
| | | |
| | | //fmt.Println("dblogger", values) |
| | | |
| | | if level == "sql" { |
| | | msgArr := gorm.LogFormatter(values...) |
| | | sql := msgArr[3].(string) |
| | | logx.Infof("sql: %v", sql) |
| | | sql = strings.TrimPrefix(sql, " ") |
| | | if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") { |
| | | affected := values[5].(int64) |
| | | if affected > 0 { //执行成功 |
| | | //判断操作的是哪张表 |
| | | whereIdx := strings.Index(sql, "WHERE") |
| | | sqlWithTable := sql |
| | | if whereIdx > -1 { |
| | | sqlWithTable = sql[:whereIdx] |
| | | } |
| | | |
| | | //fmt.Println("判断是哪张表 sqlWithTable:", sqlWithTable) |
| | | |
| | | insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert |
| | | updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update |
| | | delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete |
| | | |
| | | if insertReg.MatchString(sqlWithTable) { |
| | | //fmt.Println("插入操作") |
| | | for _, t := range agent.syncTables { |
| | | reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) |
| | | if reg.MatchString(sqlWithTable) { |
| | | //fmt.Println("属于同步表:", t) |
| | | // 判断是在集群内, 同步消息, 判断两种角色, 为避免其他出现状态 |
| | | if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { |
| | | syncSqlChan <- sql |
| | | } |
| | | } |
| | | } |
| | | } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) { |
| | | //fmt.Println("删除或者更新") |
| | | for _, t := range agent.syncTables { |
| | | reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) |
| | | if reg.MatchString(sqlWithTable) { |
| | | //fmt.Println("属于同步表:", t) |
| | | if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { |
| | | syncSqlChan <- sql |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | fmt.Println("dbLogger level!=sql") |
| | | } |
| | | } |
| | | |
| | | func arrayContains(list []string, arr []string) string { |
| | |
| | | |
| | | return "" |
| | | } |
| | | |
// NodeInfo is one entry of the node list decoded from the Data field of the
// cluster-status reply.
//
// The list is decoded with mapstructure, which reads the `mapstructure` tag
// and ignores `json` tags, so each field carries both tags with the same key.
type NodeInfo struct {
	NodeID     string `json:"node_id,omitempty" mapstructure:"node_id"`
	NodeIp     string `json:"node_ip,omitempty" mapstructure:"node_ip"`
	NodeName   string `json:"node_name,omitempty" mapstructure:"node_name"`
	ClusterID  string `json:"cluster_id" mapstructure:"cluster_id"`
	CreateTime string `json:"create_time" mapstructure:"create_time"`
	DeviceType string `json:"device_type" mapstructure:"device_type"`
	DriftState string `json:"drift_state" mapstructure:"drift_state"`
	Online     bool   `json:"online" mapstructure:"online"`
}
| | | |
| | | func QueryClusterStatusAndNodeQuantity() (string, int) { |
| | | reply := agent.QueryClusterStat() |
| | | if reply == nil { |
| | | return "", 0 |
| | | } |
| | | var nodes []NodeInfo |
| | | err := mapstructure.Decode(reply.Data, &nodes) |
| | | if err != nil { |
| | | logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err) |
| | | return reply.Msg, 0 |
| | | } |
| | | return reply.Msg, len(nodes) |
| | | } |