package models
|
|
import (
|
"basic.com/valib/logger.git"
|
"github.com/jinzhu/gorm"
|
"regexp"
|
"strings"
|
"time"
|
)
|
|
// DbLogger is a stateless logger type; its Print method receives database
// log entries and forwards qualifying write statements for synchronisation.
type DbLogger struct{}
|
|
// SyncTables lists the table names whose successful write statements are
// queued for synchronisation by (*DbLogger).Print.
var SyncTables = []string{
	"area",
	"camera_area",
	"cameras",
}
|
|
// UserEventSizeLimit is the byte budget StartSync applies to one batch of
// buffered SQL before flushing it as a single sync event.
const UserEventSizeLimit = 8192
|
|
func (dbLogger *DbLogger) Print(values ...interface{}) {
|
var (
|
level = values[0]
|
)
|
if level == "sql" {
|
msgArr := gorm.LogFormatter(values...)
|
sql := msgArr[3].(string)
|
sql = strings.TrimPrefix(sql, " ")
|
if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma"){
|
affected := values[5].(int64)
|
if affected > 0 {//执行成功
|
//判断操作的是哪张表
|
whereIdx := strings.Index(sql, "WHERE")
|
sqlWithTable := sql
|
if whereIdx > -1{
|
sqlWithTable = sql[:whereIdx]
|
}
|
insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`)//insert
|
updateReg := regexp.MustCompile(`^\s*(?i:update)\s`)//update
|
delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)//delete
|
|
if insertReg.MatchString(sqlWithTable) {
|
logger.Debug("insertRegex match,sql:",sql)
|
for _,t :=range SyncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:`+t+`)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
|
syncSqlChan <- sql
|
|
if len(sql) > 100 {
|
logger.Debug("AgentSync insert matchedTable:",t,",len(Sql):",len(sql))
|
} else {
|
logger.Debug("AgentSync insert matchedTable:",t,",Sql:",sql)
|
}
|
}
|
}
|
} else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
|
logger.Debug("update or delete Regex match,sql:",sql)
|
for _,t :=range SyncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:`+t+`)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
|
syncSqlChan <- sql
|
|
if len(sql) > 100 {
|
logger.Debug("AgentSync update or delete matchedTable:",t,",len(Sql):",len(sql))
|
} else {
|
logger.Debug("AgentSync update or delete matchedTable:",t,",Sql:",sql)
|
}
|
}
|
}
|
}
|
} else {
|
if len(values) >100 {
|
logger.Debug("exec affected=0,dbLogger len(values):",len(values))
|
} else {
|
logger.Debug("exec affected=0,dbLogger:",values)
|
}
|
}
|
}
|
} else {
|
logger.Debug("dbLogger level!=sql")
|
}
|
}
|
|
// syncSqlChan carries SQL statements from (*DbLogger).Print to StartSync.
// It buffers up to 512 statements.
var (
	syncSqlChan = make(chan string, 512)
)
|
func StartSync() {
|
|
sqlBuf := make([]string, 0)
|
ticker := time.NewTicker(3 * time.Second)
|
sendSize := 0//serf MaxUserEventSize is 9*1024
|
for {
|
select {
|
case <-ticker.C:
|
if len(sqlBuf) >0 {
|
syncSql := strings.Join(sqlBuf, "")
|
syncSqlEvent(syncSql)
|
|
sqlBuf = append([]string{})
|
sendSize = 0
|
}
|
case sql := <-syncSqlChan:
|
|
if sendSize + len(sql) > UserEventSizeLimit {
|
if len(sqlBuf) >0 {
|
syncSql := strings.Join(sqlBuf, "")
|
syncSqlEvent(syncSql)
|
sqlBuf = append([]string{})
|
}
|
|
s:=strings.TrimRight(sql,";")
|
sqlBuf = append(sqlBuf, s+";")
|
sendSize = len(sql)
|
} else {
|
s:=strings.TrimRight(sql,";")
|
sqlBuf = append(sqlBuf,s+";")
|
|
sendSize = sendSize + len(sql)
|
}
|
}
|
}
|
}
|
|
func syncSqlEvent(sql string) {
|
//使用softbus同步sql到集群内其他节点
|
logger.Debug("syncSqlEvent sql:", sql)
|
}
|