zhangzengfei
2023-10-08 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package serf
 
import (
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/serf"
    "github.com/jinzhu/gorm"
    "regexp"
    "strings"
    "time"
)
 
// DbLogger is a stateless type whose Print method inspects gorm log records
// and queues qualifying write statements for cluster synchronisation.
type DbLogger struct{}
 
// SyncTables lists the table names whose successful INSERT/UPDATE/DELETE
// statements DbLogger.Print forwards for synchronisation.
var SyncTables = []string{"cluster", "cluster_node"}
 
func (dbLogger *DbLogger) Print(values ...interface{}) {
    var (
        level = values[0]
    )
 
    if level == "sql" {
        msgArr := gorm.LogFormatter(values...)
        sql := msgArr[3].(string)
        sql = strings.TrimPrefix(sql, " ")
        if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
            affected := values[5].(int64)
            if affected > 0 { //执行成功
                //判断操作的是哪张表
                whereIdx := strings.Index(sql, "WHERE")
                sqlWithTable := sql
                if whereIdx > -1 {
                    sqlWithTable = sql[:whereIdx]
                }
                insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
                updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
                delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
 
                if insertReg.MatchString(sqlWithTable) {
                    //logger.Debug("insertRegex match,sql:",sql)
                    for _, t := range SyncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
 
                            syncSqlChan <- sql
 
                            //if len(sql) > 100 {
                            //    logger.Debug("AgentSync insert matchedTable:",t,",len(Sql):",len(sql))
                            //} else {
                            //    logger.Debug("AgentSync insert matchedTable:",t,",Sql:",sql)
                            //}
                        }
                    }
                } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
                    //logger.Debug("update or delete Regex match,sql:",sql)
                    for _, t := range SyncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
 
                            syncSqlChan <- sql
 
                            //if len(sql) > 100 {
                            //    logger.Debug("AgentSync update or delete matchedTable:",t,",len(Sql):",len(sql))
                            //} else {
                            //    logger.Debug("AgentSync update or delete matchedTable:",t,",Sql:",sql)
                            //}
                        }
                    }
                }
            } else {
                //if len(values) >100 {
                //    logger.Debug("exec affected=0,dbLogger len(values):",len(values))
                //} else {
                //    logger.Debug("exec affected=0,dbLogger:",values)
                //}
            }
        }
    } else {
        logger.Debug("dbLogger level!=sql")
    }
}
 
// syncSqlChan carries SQL statements from DbLogger.Print to
// StartSyncSqlToSerf. It is unbuffered, so a send blocks until the
// receiving loop takes the statement.
var syncSqlChan = make(chan string)
 
func StartSyncSqlToSerf() {
    sqlBuf := make([]string, 0)
    ticker := time.NewTicker(3 * time.Second)
    sendSize := 0 //serf MaxUserEventSize is 9*1024
    for {
        select {
        case <-ticker.C:
            if len(sqlBuf) > 0 {
                syncSql := strings.Join(sqlBuf, "")
                syncToSerf(syncSql)
 
                sqlBuf = append([]string{})
                sendSize = 0
            }
        case sql := <-syncSqlChan:
 
            if sendSize+len(sql) > (serf.UserEventSizeLimit - 1024) {
                if len(sqlBuf) > 0 {
                    syncSql := strings.Join(sqlBuf, "")
                    syncToSerf(syncSql)
                    sqlBuf = append([]string{})
                }
 
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
                sendSize = len(sql)
            } else {
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
 
                sendSize = sendSize + len(sql)
            }
        }
    }
}
 
func syncToSerf(sql string) {
    if Agent != nil {
        SyncSql([]string{sql})
    } else {
        logger.Debug("syncToSerf Agent is nil")
    }
}