package serf
|
|
import (
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/serf"
|
"github.com/jinzhu/gorm"
|
"regexp"
|
"strings"
|
"time"
|
)
|
|
type DbLogger struct {
|
}
|
|
var SyncTables = []string{
|
"cluster",
|
"cluster_node",
|
}
|
|
func (dbLogger *DbLogger) Print(values ...interface{}) {
|
var (
|
level = values[0]
|
)
|
|
if level == "sql" {
|
msgArr := gorm.LogFormatter(values...)
|
sql := msgArr[3].(string)
|
sql = strings.TrimPrefix(sql, " ")
|
if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
|
affected := values[5].(int64)
|
if affected > 0 { //执行成功
|
//判断操作的是哪张表
|
whereIdx := strings.Index(sql, "WHERE")
|
sqlWithTable := sql
|
if whereIdx > -1 {
|
sqlWithTable = sql[:whereIdx]
|
}
|
insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
|
updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
|
delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete
|
|
if insertReg.MatchString(sqlWithTable) {
|
//logger.Debug("insertRegex match,sql:",sql)
|
for _, t := range SyncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
|
syncSqlChan <- sql
|
|
//if len(sql) > 100 {
|
// logger.Debug("AgentSync insert matchedTable:",t,",len(Sql):",len(sql))
|
//} else {
|
// logger.Debug("AgentSync insert matchedTable:",t,",Sql:",sql)
|
//}
|
}
|
}
|
} else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
|
//logger.Debug("update or delete Regex match,sql:",sql)
|
for _, t := range SyncTables {
|
reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
|
if reg.MatchString(sqlWithTable) {
|
|
syncSqlChan <- sql
|
|
//if len(sql) > 100 {
|
// logger.Debug("AgentSync update or delete matchedTable:",t,",len(Sql):",len(sql))
|
//} else {
|
// logger.Debug("AgentSync update or delete matchedTable:",t,",Sql:",sql)
|
//}
|
}
|
}
|
}
|
} else {
|
//if len(values) >100 {
|
// logger.Debug("exec affected=0,dbLogger len(values):",len(values))
|
//} else {
|
// logger.Debug("exec affected=0,dbLogger:",values)
|
//}
|
}
|
}
|
} else {
|
logger.Debug("dbLogger level!=sql")
|
}
|
}
|
|
var syncSqlChan = make(chan string)
|
|
func StartSyncSqlToSerf() {
|
sqlBuf := make([]string, 0)
|
ticker := time.NewTicker(3 * time.Second)
|
sendSize := 0 //serf MaxUserEventSize is 9*1024
|
for {
|
select {
|
case <-ticker.C:
|
if len(sqlBuf) > 0 {
|
syncSql := strings.Join(sqlBuf, "")
|
syncToSerf(syncSql)
|
|
sqlBuf = append([]string{})
|
sendSize = 0
|
}
|
case sql := <-syncSqlChan:
|
|
if sendSize+len(sql) > (serf.UserEventSizeLimit - 1024) {
|
if len(sqlBuf) > 0 {
|
syncSql := strings.Join(sqlBuf, "")
|
syncToSerf(syncSql)
|
sqlBuf = append([]string{})
|
}
|
|
s := strings.TrimRight(sql, ";")
|
sqlBuf = append(sqlBuf, s+";")
|
sendSize = len(sql)
|
} else {
|
s := strings.TrimRight(sql, ";")
|
sqlBuf = append(sqlBuf, s+";")
|
|
sendSize = sendSize + len(sql)
|
}
|
}
|
}
|
}
|
|
func syncToSerf(sql string) {
|
if Agent != nil {
|
SyncSql([]string{sql})
|
} else {
|
logger.Debug("syncToSerf Agent is nil")
|
}
|
}
|