package cache

import (
	"fmt"

	"github.com/go-mysql-org/go-mysql/canal"
	"sdkCompare/config"
)

// Compile-time check that MyEventHandler implements canal.EventHandler.
var _ canal.EventHandler = (*MyEventHandler)(nil)

// MyEventHandler receives canal events. It embeds canal.DummyEventHandler so
// that only the callbacks of interest (OnRow, String) need to be overridden.
type MyEventHandler struct {
	canal.DummyEventHandler
}

// OnRow is invoked by canal for every row-level event. Events without a
// header (treated as historical messages) and events for any table other
// than config.DbPersonCompInfo.PersonTable are ignored. Matching events are
// printed to stdout as "<table> <action> <rows>".
//
// It always returns nil.
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
	// Guard against incomplete events before dereferencing e.Table, and skip
	// historical messages (nil header).
	if e == nil || e.Header == nil || e.Table == nil {
		return nil
	}
	// Skip tables we are not interested in.
	if e.Table.Name != config.DbPersonCompInfo.PersonTable {
		return nil
	}

	fmt.Printf("%s %s %v\n", e.Table.Name, e.Action, e.Rows)
	return nil
}

// String returns the handler's name.
func (h *MyEventHandler) String() string {
	return "MyEventHandler"
}

// WatchDB builds a canal from the connection settings in
// config.DbPersonCompInfo, registers a MyEventHandler on it, and runs it.
//
// It returns an error if the canal cannot be created, or whatever error
// c.Run reports; both are wrapped with context and remain inspectable via
// errors.Is / errors.As.
func WatchDB() error {
	cfg := canal.NewDefaultConfig()
	cfg.Addr = config.DbPersonCompInfo.MysqlAddr
	cfg.User = config.DbPersonCompInfo.Username
	cfg.Password = config.DbPersonCompInfo.Password

	// Database and table to watch.
	cfg.Dump.TableDB = config.DbPersonCompInfo.Database
	cfg.Dump.Tables = []string{config.DbPersonCompInfo.PersonTable}
	cfg.ParseTime = true

	c, err := canal.NewCanal(cfg)
	if err != nil {
		return fmt.Errorf("creating canal for %q: %w", cfg.Addr, err)
	}

	// Register a handler to handle RowsEvent.
	c.SetEventHandler(&MyEventHandler{})

	// NOTE(review): the canal is never closed here; consider calling c.Close()
	// once Run returns, after confirming Close is safe to call on a canal
	// whose Run has already failed in the go-mysql version in use.

	// Start canal.
	if err := c.Run(); err != nil {
		return fmt.Errorf("running canal on %q: %w", cfg.Addr, err)
	}
	return nil
}