New file |
| | |
| | | package serf |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "strings" |
| | | "syscall" |
| | | "time" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/bhomeclient.git" |
| | | "basic.com/valib/bhomedbapi.git" |
| | | |
| | | "github.com/gogo/protobuf/proto" |
| | | "github.com/jinzhu/gorm" |
| | | ) |
| | | |
| | | var ( |
| | | agent = SyncServer{} |
| | | ) |
| | | |
// serfSyncTopic is the bus topic on which ProcMessageEvent envelopes are
// published by pubSyncSqlMessage and pubSyncTableMessage.
const serfSyncTopic = "sync-proc-message-to-serf"

// Cluster event codes handed to the callback registered through
// RegisterClusterEvent.
const (
	EventCreateCluster = iota // 0: a cluster was created
	EventJoinCluster          // 1: this node joined a cluster
	EventLeaveCluster         // 2: this node left the cluster
	EventMaster2Slave         // 3: role change master -> slave
	EventSlave2Master         // 4: role change slave -> master
)
| | | |
// ProcMessageEvent is the JSON envelope published on serfSyncTopic.
type ProcMessageEvent struct {
	// Owner identifies the sender.
	Owner string `json:"owner"`
	// Target names the designated receiver.
	Target string `json:"target"`
	// Proc is the process name.
	Proc string `json:"procName"`
	// Topic is the message topic.
	Topic string `json:"topic"`
	// Payload is the message body; the receiver parses it itself.
	Payload []byte `json:"payload"`
}
| | | |
| | | type SyncServer struct { |
| | | ProcName string // 进程名称 |
| | | ServerId string // 本机id |
| | | ClusterStatus string // 集群状态 master/slave 为空表示未加入集群 |
| | | syncSqlTopic string // 同步sql消息的主题 |
| | | queryTableTopic string // 加入集群后请求集群数据的主题 |
| | | syncTables []string // 需要同步的表 |
| | | sqlDB *gorm.DB // 数据库 |
| | | bhClient *bhomeclient.MicroNode |
| | | clusterEventFn func(int) |
| | | } |
| | | |
| | | func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer { |
| | | agent.ProcName = procName |
| | | agent.ServerId = Vasystem.ServerID |
| | | agent.sqlDB = db |
| | | agent.syncTables = syncTables |
| | | agent.syncSqlTopic = procName + "/serf/sync/sql" |
| | | agent.queryTableTopic = procName + "/serf/query/sqls" |
| | | |
| | | // 设置日志回调 |
| | | db.SetLogger(&DbLogger{}) |
| | | // 先关闭日志 |
| | | db.LogMode(false) |
| | | |
| | | return &agent |
| | | } |
| | | |
| | | func (ss *SyncServer) RegisterClusterEvent(fn func(int)) { |
| | | ss.clusterEventFn = fn |
| | | } |
| | | |
| | | func (ss *SyncServer) Serve(initChan chan bool) { |
| | | proc := &bhomeclient.ProcInfo{ |
| | | Name: ss.ProcName, //进程名称 |
| | | ID: ss.ProcName, //进程id |
| | | Info: "", //进程的描述信息,用于区分同一进程名称下多个进程 |
| | | } |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | var reg = &bhomeclient.RegisterInfo{ |
| | | Proc: *proc, |
| | | Channel: nil, |
| | | PubTopic: []string{}, |
| | | SubTopic: []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic}, |
| | | SubNetTopic: []string{}, |
| | | } |
| | | |
| | | q := make(chan os.Signal, 1) |
| | | signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) |
| | | |
| | | client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil) |
| | | if err != nil { |
| | | initChan <- false |
| | | return |
| | | } |
| | | |
| | | bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic) |
| | | bhomedbapi.InitDoReq(client.RequestOnly) |
| | | //bhomedbapi.InitLog(logger.Debug) |
| | | |
| | | go client.StartServer(nil) |
| | | |
| | | ss.bhClient = client |
| | | |
| | | go ss.subBusMessage(ctx) |
| | | |
| | | go ss.handleDbLoggerPrint() |
| | | |
| | | // 启动后查询一次集群状态 |
| | | ss.QueryClusterStat() |
| | | |
| | | if ss.ClusterStatus != "" { |
| | | ss.sqlDB.LogMode(true) |
| | | } |
| | | |
| | | initChan <- true |
| | | <-q |
| | | |
| | | client.DeRegister() |
| | | cancel() |
| | | client.Free() |
| | | |
| | | os.Exit(0) |
| | | } |
| | | |
| | | func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { |
| | | var msg = ProcMessageEvent{ |
| | | Owner: ss.ServerId, |
| | | Target: targetId, |
| | | Proc: ss.ProcName, |
| | | Topic: ss.syncSqlTopic, |
| | | Payload: payload, |
| | | } |
| | | |
| | | b, err := json.Marshal(msg) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return ss.bhClient.Publish(serfSyncTopic, b) |
| | | } |
| | | |
| | | // 请求同步表的全量数据, 发送自己的id |
| | | func (ss *SyncServer) pubSyncTableMessage() error { |
| | | var msg = ProcMessageEvent{ |
| | | Owner: ss.ServerId, |
| | | Proc: ss.ProcName, |
| | | Topic: ss.queryTableTopic, |
| | | Payload: []byte(ss.ServerId), |
| | | } |
| | | |
| | | b, err := json.Marshal(msg) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId) |
| | | return ss.bhClient.Publish(serfSyncTopic, b) |
| | | } |
| | | |
| | | func (ss *SyncServer) subBusMessage(ctx context.Context) { |
| | | //fmt.Println("sub bus msg") |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | fmt.Println("sub bus msg exit") |
| | | return |
| | | case busMsg := <-ss.bhClient.SubCh: |
| | | if string(busMsg.Topic) == ss.syncSqlTopic { |
| | | ss.handleClusterMessage(busMsg.Data) |
| | | } |
| | | |
| | | // 处理同步全量数据的请求 |
| | | if string(busMsg.Topic) == ss.queryTableTopic { |
| | | if ss.ClusterStatus == "master" { |
| | | fmt.Println("接收到同步全量数据请求") |
| | | ss.handleSyncTableMessage(busMsg.Data) |
| | | } |
| | | } |
| | | |
| | | // system-service发送的消息 |
| | | if string(busMsg.Topic) == bhomeclient.Proc_System_Service { |
| | | var clusterMsg = &protomsg.DbChangeMessage{} |
| | | |
| | | if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil { |
| | | if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil { |
| | | fmt.Println("proto.Unmarshal ", err.Error()) |
| | | continue |
| | | } |
| | | } |
| | | |
| | | if clusterMsg.Table == protomsg.TableChanged_T_Cluster { |
| | | switch clusterMsg.Info { |
| | | case "create": |
| | | // 创建集群, 开启日志跟踪, 设置角色master |
| | | ss.clusterEventFn(EventCreateCluster) |
| | | ss.ClusterStatus = "master" |
| | | ss.sqlDB.LogMode(true) |
| | | |
| | | case "join": |
| | | // 加入集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventJoinCluster) |
| | | ss.onJoinCluster() |
| | | ss.ClusterStatus = "slave" |
| | | ss.sqlDB.LogMode(true) |
| | | |
| | | case "leave": |
| | | // 退出集群, 开启日志跟踪, 设置角色slave |
| | | ss.clusterEventFn(EventLeaveCluster) |
| | | ss.ClusterStatus = "" |
| | | ss.sqlDB.LogMode(false) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 加入集群, 清空本地表, 同步集群内数据 |
| | | func (ss *SyncServer) onJoinCluster() { |
| | | var err error |
| | | |
| | | db := ss.sqlDB |
| | | |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | | for _, t := range ss.syncTables { |
| | | delSql := "delete from " + t + "" |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | fmt.Println("删除本地的同步库数据失败,", err.Error()) |
| | | } |
| | | } |
| | | |
| | | //4.开启reference |
| | | tx.Exec("PRAGMA foreign_keys=ON") |
| | | tx.Commit() |
| | | |
| | | // 拉取集群内的同步库数据到本地数据库表中 |
| | | ss.pubSyncTableMessage() |
| | | } |
| | | |
| | | func (ss *SyncServer) onLeaveCluster() { |
| | | |
| | | } |
| | | |
| | | func (ss *SyncServer) onCreateCluster() { |
| | | |
| | | } |
| | | |
| | | // 查询集群状态, 返回 master, slave, leave |
| | | func (ss *SyncServer) QueryClusterStat() string { |
| | | clusterStatTopic := "/data/api-v/cluster/status" |
| | | req := bhomeclient.Request{ |
| | | Path: clusterStatTopic, |
| | | Method: "POST", |
| | | } |
| | | |
| | | reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000) |
| | | if err != nil { |
| | | fmt.Println("RequestTopic error", err.Error()) |
| | | |
| | | return "" |
| | | } |
| | | |
| | | ss.ClusterStatus = reply.Msg |
| | | |
| | | fmt.Println("当前集群状态:", ss.ClusterStatus) |
| | | |
| | | return reply.Msg |
| | | } |
| | | |
| | | func (ss *SyncServer) handleDbLoggerPrint() { |
| | | sqlBuf := make([]string, 0) |
| | | ticker := time.NewTicker(3 * time.Second) |
| | | sendSize := 0 //serf MaxUserEventSize is 9*1024 |
| | | for { |
| | | select { |
| | | case <-ticker.C: |
| | | if len(sqlBuf) > 0 { |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | sendSize = 0 |
| | | } |
| | | case sql := <-syncSqlChan: |
| | | if sendSize+len(sql) > (9*1024 - 1024) { |
| | | if len(sqlBuf) > 0 { |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | } |
| | | |
| | | s := strings.TrimRight(sql, ";") |
| | | sqlBuf = append(sqlBuf, s+";") |
| | | sendSize = len(sql) |
| | | } else { |
| | | s := strings.TrimRight(sql, ";") |
| | | sqlBuf = append(sqlBuf, s+";") |
| | | |
| | | sendSize = sendSize + len(sql) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (ss *SyncServer) handleClusterMessage(msg []byte) { |
| | | //fmt.Println("clusterMessage:", string(msg)) |
| | | sql := string(msg) |
| | | |
| | | if len(sql) <= 0 { |
| | | return |
| | | } |
| | | |
| | | db := ss.sqlDB |
| | | if db != nil { |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | |
| | | var err error |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | result := tx.Exec(sql) |
| | | err = result.Error |
| | | if err != nil { |
| | | fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql) |
| | | } |
| | | if result.RowsAffected == 0 { |
| | | fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | } |
| | | tx.Commit() |
| | | } |
| | | } |
| | | |
| | | func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { |
| | | targetId := string(msg) |
| | | fmt.Println("同步全量数据给节点:", targetId) |
| | | sqls, err := DumpTables(ss.sqlDB, ss.syncTables) |
| | | if err != nil { |
| | | fmt.Println("DumpTables error, ", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | syncSql := strings.Join(sqls, ";") |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | |
| | | return err |
| | | } |