package cache
|
|
import (
|
"fmt"
|
"sdkCompare/config"
|
|
"github.com/go-mysql-org/go-mysql/canal"
|
)
|
|
type MyEventHandler struct {
|
canal.DummyEventHandler
|
}
|
|
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
|
// 过滤历史消息和不关心的表
|
if e.Table.Name != config.DbPersonCompInfo.PersonTable || e.Header == nil {
|
return nil
|
}
|
|
fmt.Printf("%s %s %v\n", e.Table.Name, e.Action, e.Rows)
|
|
return nil
|
}
|
|
func (h *MyEventHandler) String() string {
|
return "MyEventHandler"
|
}
|
|
func WatchDB() error {
|
cfg := canal.NewDefaultConfig()
|
cfg.Addr = config.DbPersonCompInfo.MysqlAddr
|
cfg.User = config.DbPersonCompInfo.Username
|
cfg.Password = config.DbPersonCompInfo.Password
|
|
// 监听的数据库和表
|
cfg.Dump.TableDB = config.DbPersonCompInfo.Database
|
cfg.Dump.Tables = []string{config.DbPersonCompInfo.PersonTable}
|
|
cfg.ParseTime = true
|
|
c, err := canal.NewCanal(cfg)
|
if err != nil {
|
return err
|
}
|
|
// Register a handler to handle RowsEvent
|
c.SetEventHandler(&MyEventHandler{})
|
|
// Start canal
|
err = c.Run()
|
|
return err
|
}
|