From 0bfb4e53db6d0cad8fe7a59945e86ac3adc7744e Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期日, 08 十月 2023 14:12:40 +0800 Subject: [PATCH] 添加数据同步服务 --- serf/sync.go | 370 +++++++++++++++++++++++++++++++++++++ main.go | 42 ++++ serf/sqlite.go | 137 +++++++++++++ serf/config.go | 38 +++ 4 files changed, 586 insertions(+), 1 deletions(-) diff --git a/main.go b/main.go index f9585bf..00d2599 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,9 @@ "apsClient/model" "apsClient/nsq" "apsClient/pkg/logx" + "apsClient/pkg/sqlitex" "apsClient/router" + "apsClient/serf" "apsClient/service/plc_address" "fmt" "net/http" @@ -41,7 +43,27 @@ ////鎻愬墠鍔犺浇浠诲姟 //service.NewTaskService().GetTask() - go shutdown() + //go shutdown() + + // 鍚姩鏁版嵁鍚屾 + var serfStartChan = make(chan bool) + // 闇�瑕佸悓姝ョ殑琛� + var syncTables = []string{ + "procedures", + "process_model", + "production_progress", + "work_order", + } + + agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) + agent.RegisterClusterEvent(serfClusterEvent) + go agent.Serve(serfStartChan) + + <-serfStartChan + + // 鍒ゆ柇褰撳墠闆嗙兢鐘舵�� + //agent.ClusterStatus == "master" + logx.Infof("apsClient start serve...") server := &http.Server{ Addr: fmt.Sprintf(":%d", conf.Conf.System.Port), @@ -60,3 +82,21 @@ logx.Infof("apsClient exited...") os.Exit(0) } + +func serfClusterEvent(stat int) { + switch stat { + + case serf.EventCreateCluster: + // 鍒涘缓闆嗙兢 + case serf.EventJoinCluster: + // 鍔犲叆闆嗙兢 + case serf.EventLeaveCluster: + // 閫�鍑洪泦缇� + case serf.EventSlave2Master: + // 鍒囨崲涓轰富鑺傜偣 + case serf.EventMaster2Slave: + // 鍒囨崲涓哄瓙鑺傜偣 + } + + fmt.Println("clusterEvent:", stat) +} \ No newline at end of file diff --git a/serf/config.go b/serf/config.go new file mode 100644 index 0000000..70dfa1b --- /dev/null +++ b/serf/config.go @@ -0,0 +1,38 @@ +package serf + +import ( + "fmt" + "github.com/spf13/viper" +) + +type vasystem struct { + ServerName string `mapstructure:"serverName"` + ServerID string `mapstructure:"analyServerId"` +} + +var Vasystem = &vasystem{} + +// Init is an exported method that takes the environment starts the viper +// (external lib) and returns the configuration struct. +func init() { + var err error + v := viper.New() + v.SetConfigType("yaml") + v.SetConfigName("pro") + v.AddConfigPath("") + v.AddConfigPath("../config/") + v.AddConfigPath("./config/") + v.AddConfigPath("/opt/vasystem/config/") + + err = v.ReadInConfig() + if err != nil { + fmt.Println("error on parsing configuration file", err) + } + + read2Conf(v) +} + +func read2Conf(v *viper.Viper) { + v.UnmarshalKey("server", Vasystem) + fmt.Println("ServerID:", Vasystem.ServerID) +} diff --git a/serf/sqlite.go b/serf/sqlite.go new file mode 100644 index 0000000..79021d9 --- /dev/null +++ b/serf/sqlite.go @@ -0,0 +1,137 @@ +package serf + +import ( + "errors" + "fmt" + "regexp" + "strings" + + "github.com/jinzhu/gorm" +) + +type DumpSql struct { + Sql string `json:"sql"` +} + +type TableDesc struct { + Cid int `json:"cid"` + Name string `json:"name"` + Type string `json:"type"` + Notnull bool `json:"notnull"` + DFltValue interface{} `json:"dflt_value"` + Pk int `json:"pk"` +} + +var syncSqlChan = make(chan string, 10) + +func DumpTables(db *gorm.DB, tableNames []string) ([]string, error) { + db.LogMode(false) + defer db.LogMode(true) + + if tableNames != nil { + var arr []string + var dumpSql []DumpSql + + for _, table := range tableNames { + fmt.Println("dump current tableName:", table) + + dumpSql = make([]DumpSql, 0) + var tDescArr []TableDesc + + tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table) + err := db.Raw(tSql).Scan(&tDescArr).Error + + fmt.Println("tDescArr err:", err, "len(tDescArr)=", len(tDescArr)) + if err != nil { + return nil, errors.New("tableDesc err") + } + + fmt.Println(table, "'Columns is:", tDescArr) + if tDescArr == nil || len(tDescArr) == 0 { + return nil, errors.New(table + " has no column") + } + + var columnNames []string + for _, col := range tDescArr { + columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name)) + } + + tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`, + table, + strings.Join(columnNames, ","), + table) + //fmt.Println("tSql:", tSql) + + err = db.Raw(tSql).Scan(&dumpSql).Error + if err != nil { + return nil, errors.New("dump err") + } + + if len(dumpSql) > 0 { + for _, d := range dumpSql { + arr = append(arr, d.Sql) + } + } + } + + return arr, nil + } + + return nil, errors.New("tableNames is nil") +} + +type DbLogger struct { +} + +func (dbLogger *DbLogger) Print(values ...interface{}) { + var ( + level = values[0] + ) + + fmt.Println("dblogger", values) + + if level == "sql" { + msgArr := gorm.LogFormatter(values...) + sql := msgArr[3].(string) + sql = strings.TrimPrefix(sql, " ") + if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") { + affected := values[5].(int64) + if affected > 0 { //鎵ц鎴愬姛 + //鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛� + whereIdx := strings.Index(sql, "WHERE") + sqlWithTable := sql + if whereIdx > -1 { + sqlWithTable = sql[:whereIdx] + } + + fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable) + + insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert + updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update + delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete + + if insertReg.MatchString(sqlWithTable) { + fmt.Println("鎻掑叆鎿嶄綔") + for _, t := range agent.syncTables { + reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) + if reg.MatchString(sqlWithTable) { + fmt.Println("灞炰簬鍚屾琛�:", t) + syncSqlChan <- sql + } + } + } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) { + fmt.Println("鍒犻櫎鎴栬�呮洿鏂�") + for _, t := range agent.syncTables { + reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) + if reg.MatchString(sqlWithTable) { + fmt.Println("灞炰簬鍚屾琛�:", t) + syncSqlChan <- sql + } + } + } + } + } + } else { + fmt.Println("dbLogger level!=sql") + } +} diff --git a/serf/sync.go b/serf/sync.go new file mode 100644 index 0000000..b49eebf --- /dev/null +++ b/serf/sync.go @@ -0,0 +1,370 @@ +package serf + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "basic.com/pubsub/protomsg.git" + "basic.com/valib/bhomeclient.git" + "basic.com/valib/bhomedbapi.git" + + "github.com/gogo/protobuf/proto" + "github.com/jinzhu/gorm" +) + +var ( + agent = SyncServer{} +) + +const ( + serfSyncTopic = "sync-proc-message-to-serf" + + EventCreateCluster = 0 + EventJoinCluster = 1 + EventLeaveCluster = 2 + EventMaster2Slave = 3 + EventSlave2Master = 4 +) + +type ProcMessageEvent struct { + Owner string `json:"owner"` // 鍙戦�佽�� + Target string `json:"target"` // 鎸囧畾鎺ユ敹鑰� + Proc string `json:"procName"` // 杩涚▼鍚� + Topic string `json:"topic"` // 涓婚 + Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 +} + +type SyncServer struct { + ProcName string // 杩涚▼鍚嶇О + ServerId string // 鏈満id + ClusterStatus string // 闆嗙兢鐘舵�� master/slave 涓虹┖琛ㄧず鏈姞鍏ラ泦缇� + syncSqlTopic string // 鍚屾sql娑堟伅鐨勪富棰� + queryTableTopic string // 鍔犲叆闆嗙兢鍚庤姹傞泦缇ゆ暟鎹殑涓婚 + syncTables []string // 闇�瑕佸悓姝ョ殑琛� + sqlDB *gorm.DB // 鏁版嵁搴� + bhClient *bhomeclient.MicroNode + clusterEventFn func(int) +} + +func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer { + agent.ProcName = procName + agent.ServerId = Vasystem.ServerID + agent.sqlDB = db + agent.syncTables = syncTables + agent.syncSqlTopic = procName + "/serf/sync/sql" + agent.queryTableTopic = procName + "/serf/query/sqls" + + // 璁剧疆鏃ュ織鍥炶皟 + db.SetLogger(&DbLogger{}) + // 鍏堝叧闂棩蹇� + db.LogMode(false) + + return &agent +} + +func (ss *SyncServer) RegisterClusterEvent(fn func(int)) { + ss.clusterEventFn = fn +} + +func (ss *SyncServer) Serve(initChan chan bool) { + proc := &bhomeclient.ProcInfo{ + Name: ss.ProcName, //杩涚▼鍚嶇О + ID: ss.ProcName, //杩涚▼id + Info: "", //杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋� + } + + ctx, cancel := context.WithCancel(context.Background()) + var reg = &bhomeclient.RegisterInfo{ + Proc: *proc, + Channel: nil, + PubTopic: []string{}, + SubTopic: []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic}, + SubNetTopic: []string{}, + } + + q := make(chan os.Signal, 1) + signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) + + client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil) + if err != nil { + initChan <- false + return + } + + bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic) + bhomedbapi.InitDoReq(client.RequestOnly) + //bhomedbapi.InitLog(logger.Debug) + + go client.StartServer(nil) + + ss.bhClient = client + + go ss.subBusMessage(ctx) + + go ss.handleDbLoggerPrint() + + // 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬� + ss.QueryClusterStat() + + if ss.ClusterStatus != "" { + ss.sqlDB.LogMode(true) + } + + initChan <- true + <-q + + client.DeRegister() + cancel() + client.Free() + + os.Exit(0) +} + +func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { + var msg = ProcMessageEvent{ + Owner: ss.ServerId, + Target: targetId, + Proc: ss.ProcName, + Topic: ss.syncSqlTopic, + Payload: payload, + } + + b, err := json.Marshal(msg) + if err != nil { + return err + } + + return ss.bhClient.Publish(serfSyncTopic, b) +} + +// 璇锋眰鍚屾琛ㄧ殑鍏ㄩ噺鏁版嵁, 鍙戦�佽嚜宸辩殑id +func (ss *SyncServer) pubSyncTableMessage() error { + var msg = ProcMessageEvent{ + Owner: ss.ServerId, + Proc: ss.ProcName, + Topic: ss.queryTableTopic, + Payload: []byte(ss.ServerId), + } + + b, err := json.Marshal(msg) + if err != nil { + return err + } + + fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId) + return ss.bhClient.Publish(serfSyncTopic, b) +} + +func (ss *SyncServer) subBusMessage(ctx context.Context) { + //fmt.Println("sub bus msg") + + for { + select { + case <-ctx.Done(): + fmt.Println("sub bus msg exit") + return + case busMsg := <-ss.bhClient.SubCh: + if string(busMsg.Topic) == ss.syncSqlTopic { + ss.handleClusterMessage(busMsg.Data) + } + + // 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹� + if string(busMsg.Topic) == ss.queryTableTopic { + if ss.ClusterStatus == "master" { + fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�") + ss.handleSyncTableMessage(busMsg.Data) + } + } + + // system-service鍙戦�佺殑娑堟伅 + if string(busMsg.Topic) == bhomeclient.Proc_System_Service { + var clusterMsg = &protomsg.DbChangeMessage{} + + if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil { + if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil { + fmt.Println("proto.Unmarshal ", err.Error()) + continue + } + } + + if clusterMsg.Table == protomsg.TableChanged_T_Cluster { + switch clusterMsg.Info { + case "create": + // 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master + ss.clusterEventFn(EventCreateCluster) + ss.ClusterStatus = "master" + ss.sqlDB.LogMode(true) + + case "join": + // 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave + ss.clusterEventFn(EventJoinCluster) + ss.onJoinCluster() + ss.ClusterStatus = "slave" + ss.sqlDB.LogMode(true) + + case "leave": + // 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave + ss.clusterEventFn(EventLeaveCluster) + ss.ClusterStatus = "" + ss.sqlDB.LogMode(false) + } + } + } + } + } +} + +// 鍔犲叆闆嗙兢, 娓呯┖鏈湴琛�, 鍚屾闆嗙兢鍐呮暟鎹� +func (ss *SyncServer) onJoinCluster() { + var err error + + db := ss.sqlDB + + tx := db.Begin() + defer func() { + if err != nil && tx != nil { + tx.Rollback() + } + }() + + tx.Exec("PRAGMA foreign_keys=OFF") + //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 + for _, t := range ss.syncTables { + delSql := "delete from " + t + "" + + err = tx.Exec(delSql).Error + if err != nil { + fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + } + } + + //4.寮�鍚痳eference + tx.Exec("PRAGMA foreign_keys=ON") + tx.Commit() + + // 鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� + ss.pubSyncTableMessage() +} + +func (ss *SyncServer) onLeaveCluster() { + +} + +func (ss *SyncServer) onCreateCluster() { + +} + +// 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave +func (ss *SyncServer) QueryClusterStat() string { + clusterStatTopic := "/data/api-v/cluster/status" + req := bhomeclient.Request{ + Path: clusterStatTopic, + Method: "POST", + } + + reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000) + if err != nil { + fmt.Println("RequestTopic error", err.Error()) + + return "" + } + + ss.ClusterStatus = reply.Msg + + fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus) + + return reply.Msg +} + +func (ss *SyncServer) handleDbLoggerPrint() { + sqlBuf := make([]string, 0) + ticker := time.NewTicker(3 * time.Second) + sendSize := 0 //serf MaxUserEventSize is 9*1024 + for { + select { + case <-ticker.C: + if len(sqlBuf) > 0 { + syncSql := strings.Join(sqlBuf, "") + + //fmt.Println("鍚屾sql璇彞:", syncSql) + ss.pubSyncSqlMessage([]byte(syncSql), "") + + sqlBuf = append([]string{}) + sendSize = 0 + } + case sql := <-syncSqlChan: + if sendSize+len(sql) > (9*1024 - 1024) { + if len(sqlBuf) > 0 { + syncSql := strings.Join(sqlBuf, "") + //fmt.Println("鍚屾sql璇彞:", syncSql) + + ss.pubSyncSqlMessage([]byte(syncSql), "") + + sqlBuf = append([]string{}) + } + + s := strings.TrimRight(sql, ";") + sqlBuf = append(sqlBuf, s+";") + sendSize = len(sql) + } else { + s := strings.TrimRight(sql, ";") + sqlBuf = append(sqlBuf, s+";") + + sendSize = sendSize + len(sql) + } + } + } +} + +func (ss *SyncServer) handleClusterMessage(msg []byte) { + //fmt.Println("clusterMessage:", string(msg)) + sql := string(msg) + + if len(sql) <= 0 { + return + } + + db := ss.sqlDB + if db != nil { + db.LogMode(false) + defer db.LogMode(true) + + var err error + tx := db.Begin() + defer func() { + if err != nil && tx != nil { + tx.Rollback() + } + }() + result := tx.Exec(sql) + err = result.Error + if err != nil { + fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql) + } + if result.RowsAffected == 0 { + fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) + } + tx.Commit() + } +} + +func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { + targetId := string(msg) + fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) + sqls, err := DumpTables(ss.sqlDB, ss.syncTables) + if err != nil { + fmt.Println("DumpTables error, ", err.Error()) + return err + } + + syncSql := strings.Join(sqls, ";") + err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + + return err +} -- Gitblit v1.8.0