package serf import ( "basic.com/valib/logger.git" "basic.com/valib/serf.git/serf" "github.com/jinzhu/gorm" "regexp" "strings" "time" ) type DbLogger struct { } var SyncTables = []string{ "cluster", "cluster_node", } func (dbLogger *DbLogger) Print(values ...interface{}) { var ( level = values[0] ) if level == "sql" { msgArr := gorm.LogFormatter(values...) sql := msgArr[3].(string) sql = strings.TrimPrefix(sql, " ") if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") { affected := values[5].(int64) if affected > 0 { //执行成功 //判断操作的是哪张表 whereIdx := strings.Index(sql, "WHERE") sqlWithTable := sql if whereIdx > -1 { sqlWithTable = sql[:whereIdx] } insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete if insertReg.MatchString(sqlWithTable) { //logger.Debug("insertRegex match,sql:",sql) for _, t := range SyncTables { reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) if reg.MatchString(sqlWithTable) { syncSqlChan <- sql //if len(sql) > 100 { // logger.Debug("AgentSync insert matchedTable:",t,",len(Sql):",len(sql)) //} else { // logger.Debug("AgentSync insert matchedTable:",t,",Sql:",sql) //} } } } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) { //logger.Debug("update or delete Regex match,sql:",sql) for _, t := range SyncTables { reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) if reg.MatchString(sqlWithTable) { syncSqlChan <- sql //if len(sql) > 100 { // logger.Debug("AgentSync update or delete matchedTable:",t,",len(Sql):",len(sql)) //} else { // logger.Debug("AgentSync update or delete matchedTable:",t,",Sql:",sql) //} } } } } else { //if len(values) >100 { // logger.Debug("exec affected=0,dbLogger len(values):",len(values)) //} else { // logger.Debug("exec affected=0,dbLogger:",values) //} } } } else { logger.Debug("dbLogger level!=sql") } } var syncSqlChan = make(chan string) func StartSyncSqlToSerf() { sqlBuf := make([]string, 0) ticker := time.NewTicker(3 * time.Second) sendSize := 0 //serf MaxUserEventSize is 9*1024 for { select { case <-ticker.C: if len(sqlBuf) > 0 { syncSql := strings.Join(sqlBuf, "") syncToSerf(syncSql) sqlBuf = append([]string{}) sendSize = 0 } case sql := <-syncSqlChan: if sendSize+len(sql) > (serf.UserEventSizeLimit - 1024) { if len(sqlBuf) > 0 { syncSql := strings.Join(sqlBuf, "") syncToSerf(syncSql) sqlBuf = append([]string{}) } s := strings.TrimRight(sql, ";") sqlBuf = append(sqlBuf, s+";") sendSize = len(sql) } else { s := strings.TrimRight(sql, ";") sqlBuf = append(sqlBuf, s+";") sendSize = sendSize + len(sql) } } } } func syncToSerf(sql string) { if Agent != nil { SyncSql([]string{sql}) } else { logger.Debug("syncToSerf Agent is nil") } }