| | |
| | | package serf |
| | | |
| | | import ( |
| | | "apsClient/pkg/logx" |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "github.com/mitchellh/mapstructure" |
| | | "os" |
| | | "os/signal" |
| | | "regexp" |
| | |
| | | "syscall" |
| | | "time" |
| | | |
| | | "apsClient/pkg/logx" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/bhomeclient.git" |
| | | "basic.com/valib/bhomedbapi.git" |
| | | |
| | | "github.com/gogo/protobuf/proto" |
| | | "github.com/jinzhu/gorm" |
| | | "github.com/satori/go.uuid" |
| | | "github.com/mitchellh/mapstructure" |
| | | "github.com/muesli/cache2go" |
| | | ) |
| | | |
| | | var ( |
| | |
| | | dependProcs = []string{ |
| | | bhomeclient.Proc_System_Service, |
| | | } |
| | | |
| | | sqlMsgSeqCache = cache2go.Cache("syncSqlMsg") |
| | | ) |
| | | |
| | | const ( |
| | |
| | | Proc string `json:"procName"` // 进程名 |
| | | Topic string `json:"topic"` // 主题 |
| | | Payload []byte `json:"payload"` // 消息体,自行解析 |
| | | } |
| | | |
// SqlMsg is the envelope that wraps SQL text sent between nodes: the sender
// JSON-encodes it and places the bytes in the Payload of a ProcMessageEvent,
// and the cluster-message handler decodes it again. The struct has no field
// tags, so encoding/json uses the field names themselves as keys.
| | | type SqlMsg struct { |
// Id identifies one message. The sender fills it with a freshly generated
// UUID string, and the receiver looks it up in a cache of already-seen ids
// to discard duplicate deliveries.
| | | Id string |
// Sql carries the SQL text being synchronized.
| | | Sql string |
// Version is neither written nor read in the visible code.
// NOTE(review): intended meaning unconfirmed — verify against callers.
| | | Version string |
| | | } |
| | | |
| | | type SyncServer struct { |
| | |
| | | os.Exit(0) |
| | | } |
| | | |
| | | func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { |
| | | func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error { |
| | | sqlMsg := SqlMsg{ |
| | | Id: uuid.NewV4().String(), |
| | | Sql: sql, |
| | | } |
| | | |
| | | bMsg, _ := json.Marshal(sqlMsg) |
| | | |
| | | var msg = ProcMessageEvent{ |
| | | Owner: ss.ServerId, |
| | | Target: targetId, |
| | | Proc: ss.ProcName, |
| | | Topic: ss.syncSqlTopic, |
| | | Payload: payload, |
| | | Payload: bMsg, |
| | | } |
| | | |
| | | b, err := json.Marshal(msg) |
| | |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | ss.pubSyncSqlMessage(syncSql, "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | sendSize = 0 |
| | |
| | | syncSql := strings.Join(sqlBuf, "") |
| | | //fmt.Println("同步sql语句:", syncSql) |
| | | |
| | | ss.pubSyncSqlMessage([]byte(syncSql), "") |
| | | ss.pubSyncSqlMessage(syncSql, "") |
| | | |
| | | sqlBuf = append([]string{}) |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | func (ss *SyncServer) handleClusterMessage(msg []byte) { |
| | | logx.Infof("clusterMessage:", string(msg)) |
| | | sql := string(msg) |
| | | func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) { |
| | | var msg SqlMsg |
| | | err := json.Unmarshal(clusterMsgData,&msg) |
| | | if err != nil { |
| | | logx.Errorf(" Unmarshal cluster message error, %s",err.Error()) |
| | | return |
| | | } |
| | | |
| | | // 判断消息是否曾经接收过 |
| | | if sqlMsgSeqCache.Exists(msg.Id) { |
| | | logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql) |
| | | return |
| | | } |
| | | |
| | | // 记录消息id, 半小时过期 |
| | | sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true) |
| | | |
| | | logx.Infof("clusterMessage:%s", msg.Sql) |
| | | sql := msg.Sql |
| | | |
| | | if len(sql) <= 0 { |
| | | return |
| | |
| | | logx.Infof("DumpTables sql:%v", sqls) |
| | | syncSql := strings.Join(sqls, ";") |
| | | if len(syncSql) < sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | err = ss.pubSyncSqlMessage(syncSql, targetId) |
| | | } else { |
| | | shard := "" |
| | | for _, sql := range sqls { |
| | | if len(shard)+len(sql) > sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | err = ss.pubSyncSqlMessage(shard, targetId) |
| | | shard = "" |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | if len(shard) > 0 { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | err = ss.pubSyncSqlMessage(shard, targetId) |
| | | } |
| | | } |
| | | |