liujiandao
2024-02-20 07eea46970759aba106f3db3f4bc24c518ab41de
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cache
 
import (
    "fmt"
    "sdkCompare/config"
 
    "github.com/go-mysql-org/go-mysql/canal"
)
 
type MyEventHandler struct {
    canal.DummyEventHandler
}
 
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
    // 过滤历史消息和不关心的表
    if e.Table.Name != config.DbPersonCompInfo.PersonTable || e.Header == nil {
        return nil
    }
 
    fmt.Printf("%s %s %v\n", e.Table.Name, e.Action, e.Rows)
 
    return nil
}
 
func (h *MyEventHandler) String() string {
    return "MyEventHandler"
}
 
func WatchDB() error {
    cfg := canal.NewDefaultConfig()
    cfg.Addr = config.DbPersonCompInfo.MysqlAddr
    cfg.User = config.DbPersonCompInfo.Username
    cfg.Password = config.DbPersonCompInfo.Password
 
    // 监听的数据库和表
    cfg.Dump.TableDB = config.DbPersonCompInfo.Database
    cfg.Dump.Tables = []string{config.DbPersonCompInfo.PersonTable}
 
    cfg.ParseTime = true
 
    c, err := canal.NewCanal(cfg)
    if err != nil {
        return err
    }
 
    // Register a handler to handle RowsEvent
    c.SetEventHandler(&MyEventHandler{})
 
    // Start canal
    err = c.Run()
 
    return err
}