package models

import (
	"regexp"
	"strings"
	"sync"
	"time"

	"basic.com/valib/logger.git"
	"github.com/jinzhu/gorm"
)

// DbLogger is a gorm logger whose Print method inspects every executed SQL
// statement and queues successful writes against the tables in SyncTables
// so that StartSync can batch them into sync events.
type DbLogger struct{}

// SyncTables lists the table names whose INSERT/UPDATE/DELETE statements are
// queued for synchronization.
var SyncTables = []string{
	"dbtablepersons",
	"dbtables",
}

// UserEventSizeLimit is the maximum number of SQL bytes StartSync batches
// into a single sync event. It is kept below serf's MaxUserEventSize (9*1024).
const UserEventSizeLimit = 8192

// Statement classifiers, compiled once instead of on every logged statement.
var (
	insertRe = regexp.MustCompile(`^\s*(?i:insert)\s`)
	updateRe = regexp.MustCompile(`^\s*(?i:update)\s`)
	deleteRe = regexp.MustCompile(`^\s*(?i:delete)\s`)
	// whereRe locates the WHERE keyword case-insensitively, consistent with
	// the other keyword checks in this file.
	whereRe = regexp.MustCompile(`(?i)\bwhere\b`)

	// tableRegexps caches one compiled matcher per table name
	// (string -> *regexp.Regexp). It is filled lazily because SyncTables is
	// an exported variable that may be changed after package initialization.
	tableRegexps sync.Map
)

// tableRegexp returns the cached matcher for table, compiling it on first
// use. The matcher accepts the table name, optionally double-quoted,
// surrounded by whitespace and compared case-insensitively.
func tableRegexp(table string) *regexp.Regexp {
	if cached, ok := tableRegexps.Load(table); ok {
		return cached.(*regexp.Regexp)
	}
	// QuoteMeta keeps a table name containing regex metacharacters from
	// changing the pattern or making MustCompile panic.
	re := regexp.MustCompile(`\s+"?(?i:` + regexp.QuoteMeta(table) + `)"?\s+`)
	tableRegexps.Store(table, re)
	return re
}

// Print receives gorm log entries. For entries of level "sql" it formats the
// statement with gorm.LogFormatter and, when the statement is an INSERT,
// UPDATE or DELETE that affected at least one row and targets a table listed
// in SyncTables, sends the SQL text to syncSqlChan. SELECT and PRAGMA
// statements are ignored.
//
// The send is a plain channel send, so Print blocks while syncSqlChan's
// buffer is full.
func (l *DbLogger) Print(values ...interface{}) {
	if len(values) == 0 || values[0] != "sql" {
		logger.Debug("dbLogger level!=sql")
		return
	}
	// The code below reads values[5] (rows affected), so a shorter entry
	// cannot be processed; bail out instead of panicking inside the logger.
	if len(values) < 6 {
		logger.Debug("dbLogger: malformed sql log entry, len(values):", len(values))
		return
	}

	formatted := gorm.LogFormatter(values...)
	if len(formatted) < 4 {
		logger.Debug("dbLogger: unexpected LogFormatter output, len:", len(formatted))
		return
	}
	sql, ok := formatted[3].(string)
	if !ok {
		logger.Debug("dbLogger: formatted sql is not a string")
		return
	}
	sql = strings.TrimPrefix(sql, " ")

	// Reads and PRAGMA statements never need to be synchronized.
	if strings.HasPrefix(sql, "SELECT") || strings.HasPrefix(sql, "select") ||
		strings.Contains(sql, "PRAGMA") || strings.Contains(sql, "pragma") {
		return
	}

	affected, ok := values[5].(int64)
	if !ok {
		logger.Debug("dbLogger: rows affected is not int64:", values[5])
		return
	}
	if affected <= 0 {
		// NOTE(review): this tests len(values), not len(sql); kept as in the
		// original so the log output is unchanged.
		if len(values) > 100 {
			logger.Debug("exec affected=0,dbLogger len(values):", len(values))
		} else {
			logger.Debug("exec affected=0,dbLogger:", values)
		}
		return
	}

	// The statement executed successfully. Work out which table it targets,
	// looking only at the text before WHERE so that names mentioned in the
	// condition are not mistaken for the target table.
	head := sql
	if loc := whereRe.FindStringIndex(sql); loc != nil {
		head = sql[:loc[0]]
	}

	var op string
	switch {
	case insertRe.MatchString(head):
		op = "insert"
		logger.Debug("insertRegex match,sql:", sql)
	case updateRe.MatchString(head), deleteRe.MatchString(head):
		op = "update or delete"
		logger.Debug("update or delete Regex match,sql:", sql)
	default:
		return
	}

	for _, table := range SyncTables {
		if !tableRegexp(table).MatchString(head) {
			continue
		}
		syncSqlChan <- sql
		if len(sql) > 100 {
			logger.Debug("AgentSync "+op+" matchedTable:", table, ",len(Sql):", len(sql))
		} else {
			logger.Debug("AgentSync "+op+" matchedTable:", table, ",Sql:", sql)
		}
		// Queue each statement once, even if it mentions several synced
		// tables; otherwise it would be handed to syncSqlEvent twice.
		return
	}
}

// syncSqlChan carries SQL statements from DbLogger.Print to StartSync.
var syncSqlChan = make(chan string, 512)

// StartSync batches the statements received on syncSqlChan and passes each
// batch to syncSqlEvent. A batch is flushed every 3 seconds, or earlier when
// adding the next statement would push it past UserEventSizeLimit bytes.
// Every statement is normalized to end in exactly one ';' before batching.
//
// StartSync loops forever and never returns.
func StartSync() {
	var (
		pending     []string
		pendingSize int
	)

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	// flush emits the pending batch, if any, and resets the buffer.
	flush := func() {
		if len(pending) == 0 {
			return
		}
		syncSqlEvent(strings.Join(pending, ""))
		pending = pending[:0]
		pendingSize = 0
	}

	for {
		select {
		case <-ticker.C:
			flush()
		case sql := <-syncSqlChan:
			stmt := strings.TrimRight(sql, ";") + ";"
			// Count the normalized statement so the size matches the bytes
			// that are actually joined into the event.
			if pendingSize+len(stmt) > UserEventSizeLimit {
				flush()
			}
			// A single statement larger than the limit is still queued and
			// ends up in a batch of its own.
			pending = append(pending, stmt)
			pendingSize += len(stmt)
		}
	}
}

// syncSqlEvent handles one batch of SQL. The original note says the batch is
// to be synchronized to the other nodes in the cluster via softbus; the code
// visible here only logs it.
func syncSqlEvent(sql string) {
	logger.Debug("syncSqlEvent sql:", sql)
}