From 0bfb4e53db6d0cad8fe7a59945e86ac3adc7744e Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期日, 08 十月 2023 14:12:40 +0800
Subject: [PATCH] 添加数据同步服务
---
serf/sync.go | 370 +++++++++++++++++++++++++++++++++++++
main.go | 42 ++++
serf/sqlite.go | 137 +++++++++++++
serf/config.go | 38 +++
4 files changed, 586 insertions(+), 1 deletions(-)
diff --git a/main.go b/main.go
index f9585bf..00d2599 100644
--- a/main.go
+++ b/main.go
@@ -6,7 +6,9 @@
"apsClient/model"
"apsClient/nsq"
"apsClient/pkg/logx"
+ "apsClient/pkg/sqlitex"
"apsClient/router"
+ "apsClient/serf"
"apsClient/service/plc_address"
"fmt"
"net/http"
@@ -41,7 +43,27 @@
////鎻愬墠鍔犺浇浠诲姟
//service.NewTaskService().GetTask()
- go shutdown()
+ //go shutdown()
+
+ // 鍚姩鏁版嵁鍚屾
+ var serfStartChan = make(chan bool)
+ // 闇�瑕佸悓姝ョ殑琛�
+ var syncTables = []string{
+ "procedures",
+ "process_model",
+ "production_progress",
+ "work_order",
+ }
+
+ agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
+ agent.RegisterClusterEvent(serfClusterEvent)
+ go agent.Serve(serfStartChan)
+
+ <-serfStartChan
+
+ // 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
+ //agent.ClusterStatus == "master"
+
logx.Infof("apsClient start serve...")
server := &http.Server{
Addr: fmt.Sprintf(":%d", conf.Conf.System.Port),
@@ -60,3 +82,21 @@
logx.Infof("apsClient exited...")
os.Exit(0)
}
+
+func serfClusterEvent(stat int) {
+ switch stat {
+
+ case serf.EventCreateCluster:
+ // 鍒涘缓闆嗙兢
+ case serf.EventJoinCluster:
+ // 鍔犲叆闆嗙兢
+ case serf.EventLeaveCluster:
+ // 閫�鍑洪泦缇�
+ case serf.EventSlave2Master:
+ // 鍒囨崲涓轰富鑺傜偣
+ case serf.EventMaster2Slave:
+ // 鍒囨崲涓哄瓙鑺傜偣
+ }
+
+ fmt.Println("clusterEvent:", stat)
+}
\ No newline at end of file
diff --git a/serf/config.go b/serf/config.go
new file mode 100644
index 0000000..70dfa1b
--- /dev/null
+++ b/serf/config.go
@@ -0,0 +1,38 @@
+package serf
+
+import (
+ "fmt"
+ "github.com/spf13/viper"
+)
+
+type vasystem struct {
+ ServerName string `mapstructure:"serverName"`
+ ServerID string `mapstructure:"analyServerId"`
+}
+
+var Vasystem = &vasystem{}
+
+// Init is an exported method that takes the environment starts the viper
+// (external lib) and returns the configuration struct.
+func init() {
+ var err error
+ v := viper.New()
+ v.SetConfigType("yaml")
+ v.SetConfigName("pro")
+ v.AddConfigPath("")
+ v.AddConfigPath("../config/")
+ v.AddConfigPath("./config/")
+ v.AddConfigPath("/opt/vasystem/config/")
+
+ err = v.ReadInConfig()
+ if err != nil {
+ fmt.Println("error on parsing configuration file", err)
+ }
+
+ read2Conf(v)
+}
+
+func read2Conf(v *viper.Viper) {
+ v.UnmarshalKey("server", Vasystem)
+ fmt.Println("ServerID:", Vasystem.ServerID)
+}
diff --git a/serf/sqlite.go b/serf/sqlite.go
new file mode 100644
index 0000000..79021d9
--- /dev/null
+++ b/serf/sqlite.go
@@ -0,0 +1,137 @@
+package serf
+
+import (
+ "errors"
+ "fmt"
+ "regexp"
+ "strings"
+
+ "github.com/jinzhu/gorm"
+)
+
+type DumpSql struct {
+ Sql string `json:"sql"`
+}
+
+type TableDesc struct {
+ Cid int `json:"cid"`
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Notnull bool `json:"notnull"`
+ DFltValue interface{} `json:"dflt_value"`
+ Pk int `json:"pk"`
+}
+
+var syncSqlChan = make(chan string, 10)
+
+func DumpTables(db *gorm.DB, tableNames []string) ([]string, error) {
+ db.LogMode(false)
+ defer db.LogMode(true)
+
+ if tableNames != nil {
+ var arr []string
+ var dumpSql []DumpSql
+
+ for _, table := range tableNames {
+ fmt.Println("dump current tableName:", table)
+
+ dumpSql = make([]DumpSql, 0)
+ var tDescArr []TableDesc
+
+ tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
+ err := db.Raw(tSql).Scan(&tDescArr).Error
+
+ fmt.Println("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
+ if err != nil {
+ return nil, errors.New("tableDesc err")
+ }
+
+ fmt.Println(table, "'Columns is:", tDescArr)
+ if tDescArr == nil || len(tDescArr) == 0 {
+ return nil, errors.New(table + " has no column")
+ }
+
+ var columnNames []string
+ for _, col := range tDescArr {
+ columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
+ }
+
+ tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
+ table,
+ strings.Join(columnNames, ","),
+ table)
+ //fmt.Println("tSql:", tSql)
+
+ err = db.Raw(tSql).Scan(&dumpSql).Error
+ if err != nil {
+ return nil, errors.New("dump err")
+ }
+
+ if len(dumpSql) > 0 {
+ for _, d := range dumpSql {
+ arr = append(arr, d.Sql)
+ }
+ }
+ }
+
+ return arr, nil
+ }
+
+ return nil, errors.New("tableNames is nil")
+}
+
+type DbLogger struct {
+}
+
+func (dbLogger *DbLogger) Print(values ...interface{}) {
+ var (
+ level = values[0]
+ )
+
+ fmt.Println("dblogger", values)
+
+ if level == "sql" {
+ msgArr := gorm.LogFormatter(values...)
+ sql := msgArr[3].(string)
+ sql = strings.TrimPrefix(sql, " ")
+ if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
+ affected := values[5].(int64)
+ if affected > 0 { //鎵ц鎴愬姛
+ //鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛�
+ whereIdx := strings.Index(sql, "WHERE")
+ sqlWithTable := sql
+ if whereIdx > -1 {
+ sqlWithTable = sql[:whereIdx]
+ }
+
+ fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable)
+
+ insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
+ updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
+ delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete
+
+ if insertReg.MatchString(sqlWithTable) {
+ fmt.Println("鎻掑叆鎿嶄綔")
+ for _, t := range agent.syncTables {
+ reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+ if reg.MatchString(sqlWithTable) {
+ fmt.Println("灞炰簬鍚屾琛�:", t)
+ syncSqlChan <- sql
+ }
+ }
+ } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
+ fmt.Println("鍒犻櫎鎴栬�呮洿鏂�")
+ for _, t := range agent.syncTables {
+ reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+ if reg.MatchString(sqlWithTable) {
+ fmt.Println("灞炰簬鍚屾琛�:", t)
+ syncSqlChan <- sql
+ }
+ }
+ }
+ }
+ }
+ } else {
+ fmt.Println("dbLogger level!=sql")
+ }
+}
diff --git a/serf/sync.go b/serf/sync.go
new file mode 100644
index 0000000..b49eebf
--- /dev/null
+++ b/serf/sync.go
@@ -0,0 +1,370 @@
+package serf
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "os/signal"
+ "strings"
+ "syscall"
+ "time"
+
+ "basic.com/pubsub/protomsg.git"
+ "basic.com/valib/bhomeclient.git"
+ "basic.com/valib/bhomedbapi.git"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/jinzhu/gorm"
+)
+
+var (
+ agent = SyncServer{}
+)
+
+const (
+ serfSyncTopic = "sync-proc-message-to-serf"
+
+ EventCreateCluster = 0
+ EventJoinCluster = 1
+ EventLeaveCluster = 2
+ EventMaster2Slave = 3
+ EventSlave2Master = 4
+)
+
+type ProcMessageEvent struct {
+ Owner string `json:"owner"` // 鍙戦�佽��
+ Target string `json:"target"` // 鎸囧畾鎺ユ敹鑰�
+ Proc string `json:"procName"` // 杩涚▼鍚�
+ Topic string `json:"topic"` // 涓婚
+ Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SyncServer struct {
+ ProcName string // 杩涚▼鍚嶇О
+ ServerId string // 鏈満id
+ ClusterStatus string // 闆嗙兢鐘舵�� master/slave 涓虹┖琛ㄧず鏈姞鍏ラ泦缇�
+ syncSqlTopic string // 鍚屾sql娑堟伅鐨勪富棰�
+ queryTableTopic string // 鍔犲叆闆嗙兢鍚庤姹傞泦缇ゆ暟鎹殑涓婚
+ syncTables []string // 闇�瑕佸悓姝ョ殑琛�
+ sqlDB *gorm.DB // 鏁版嵁搴�
+ bhClient *bhomeclient.MicroNode
+ clusterEventFn func(int)
+}
+
+func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer {
+ agent.ProcName = procName
+ agent.ServerId = Vasystem.ServerID
+ agent.sqlDB = db
+ agent.syncTables = syncTables
+ agent.syncSqlTopic = procName + "/serf/sync/sql"
+ agent.queryTableTopic = procName + "/serf/query/sqls"
+
+ // 璁剧疆鏃ュ織鍥炶皟
+ db.SetLogger(&DbLogger{})
+ // 鍏堝叧闂棩蹇�
+ db.LogMode(false)
+
+ return &agent
+}
+
+func (ss *SyncServer) RegisterClusterEvent(fn func(int)) {
+ ss.clusterEventFn = fn
+}
+
+func (ss *SyncServer) Serve(initChan chan bool) {
+ proc := &bhomeclient.ProcInfo{
+ Name: ss.ProcName, //杩涚▼鍚嶇О
+ ID: ss.ProcName, //杩涚▼id
+ Info: "", //杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋�
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ var reg = &bhomeclient.RegisterInfo{
+ Proc: *proc,
+ Channel: nil,
+ PubTopic: []string{},
+ SubTopic: []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic},
+ SubNetTopic: []string{},
+ }
+
+ q := make(chan os.Signal, 1)
+ signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
+
+ client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil)
+ if err != nil {
+ initChan <- false
+ return
+ }
+
+ bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic)
+ bhomedbapi.InitDoReq(client.RequestOnly)
+ //bhomedbapi.InitLog(logger.Debug)
+
+ go client.StartServer(nil)
+
+ ss.bhClient = client
+
+ go ss.subBusMessage(ctx)
+
+ go ss.handleDbLoggerPrint()
+
+ // 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬�
+ ss.QueryClusterStat()
+
+ if ss.ClusterStatus != "" {
+ ss.sqlDB.LogMode(true)
+ }
+
+ initChan <- true
+ <-q
+
+ client.DeRegister()
+ cancel()
+ client.Free()
+
+ os.Exit(0)
+}
+
+func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+ var msg = ProcMessageEvent{
+ Owner: ss.ServerId,
+ Target: targetId,
+ Proc: ss.ProcName,
+ Topic: ss.syncSqlTopic,
+ Payload: payload,
+ }
+
+ b, err := json.Marshal(msg)
+ if err != nil {
+ return err
+ }
+
+ return ss.bhClient.Publish(serfSyncTopic, b)
+}
+
+// 璇锋眰鍚屾琛ㄧ殑鍏ㄩ噺鏁版嵁, 鍙戦�佽嚜宸辩殑id
+func (ss *SyncServer) pubSyncTableMessage() error {
+ var msg = ProcMessageEvent{
+ Owner: ss.ServerId,
+ Proc: ss.ProcName,
+ Topic: ss.queryTableTopic,
+ Payload: []byte(ss.ServerId),
+ }
+
+ b, err := json.Marshal(msg)
+ if err != nil {
+ return err
+ }
+
+ fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId)
+ return ss.bhClient.Publish(serfSyncTopic, b)
+}
+
+func (ss *SyncServer) subBusMessage(ctx context.Context) {
+ //fmt.Println("sub bus msg")
+
+ for {
+ select {
+ case <-ctx.Done():
+ fmt.Println("sub bus msg exit")
+ return
+ case busMsg := <-ss.bhClient.SubCh:
+ if string(busMsg.Topic) == ss.syncSqlTopic {
+ ss.handleClusterMessage(busMsg.Data)
+ }
+
+ // 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹�
+ if string(busMsg.Topic) == ss.queryTableTopic {
+ if ss.ClusterStatus == "master" {
+ fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�")
+ ss.handleSyncTableMessage(busMsg.Data)
+ }
+ }
+
+ // system-service鍙戦�佺殑娑堟伅
+ if string(busMsg.Topic) == bhomeclient.Proc_System_Service {
+ var clusterMsg = &protomsg.DbChangeMessage{}
+
+ if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil {
+ if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil {
+ fmt.Println("proto.Unmarshal ", err.Error())
+ continue
+ }
+ }
+
+ if clusterMsg.Table == protomsg.TableChanged_T_Cluster {
+ switch clusterMsg.Info {
+ case "create":
+ // 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master
+ ss.clusterEventFn(EventCreateCluster)
+ ss.ClusterStatus = "master"
+ ss.sqlDB.LogMode(true)
+
+ case "join":
+ // 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
+ ss.clusterEventFn(EventJoinCluster)
+ ss.onJoinCluster()
+ ss.ClusterStatus = "slave"
+ ss.sqlDB.LogMode(true)
+
+ case "leave":
+ // 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
+ ss.clusterEventFn(EventLeaveCluster)
+ ss.ClusterStatus = ""
+ ss.sqlDB.LogMode(false)
+ }
+ }
+ }
+ }
+ }
+}
+
+// 鍔犲叆闆嗙兢, 娓呯┖鏈湴琛�, 鍚屾闆嗙兢鍐呮暟鎹�
+func (ss *SyncServer) onJoinCluster() {
+ var err error
+
+ db := ss.sqlDB
+
+ tx := db.Begin()
+ defer func() {
+ if err != nil && tx != nil {
+ tx.Rollback()
+ }
+ }()
+
+ tx.Exec("PRAGMA foreign_keys=OFF")
+ //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
+ for _, t := range ss.syncTables {
+ delSql := "delete from " + t + ""
+
+ err = tx.Exec(delSql).Error
+ if err != nil {
+ fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+ }
+ }
+
+ //4.寮�鍚痳eference
+ tx.Exec("PRAGMA foreign_keys=ON")
+ tx.Commit()
+
+ // 鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓�
+ ss.pubSyncTableMessage()
+}
+
+func (ss *SyncServer) onLeaveCluster() {
+
+}
+
+func (ss *SyncServer) onCreateCluster() {
+
+}
+
+// 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave
+func (ss *SyncServer) QueryClusterStat() string {
+ clusterStatTopic := "/data/api-v/cluster/status"
+ req := bhomeclient.Request{
+ Path: clusterStatTopic,
+ Method: "POST",
+ }
+
+ reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000)
+ if err != nil {
+ fmt.Println("RequestTopic error", err.Error())
+
+ return ""
+ }
+
+ ss.ClusterStatus = reply.Msg
+
+ fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus)
+
+ return reply.Msg
+}
+
+func (ss *SyncServer) handleDbLoggerPrint() {
+ sqlBuf := make([]string, 0)
+ ticker := time.NewTicker(3 * time.Second)
+ sendSize := 0 //serf MaxUserEventSize is 9*1024
+ for {
+ select {
+ case <-ticker.C:
+ if len(sqlBuf) > 0 {
+ syncSql := strings.Join(sqlBuf, "")
+
+ //fmt.Println("鍚屾sql璇彞:", syncSql)
+ ss.pubSyncSqlMessage([]byte(syncSql), "")
+
+ sqlBuf = append([]string{})
+ sendSize = 0
+ }
+ case sql := <-syncSqlChan:
+ if sendSize+len(sql) > (9*1024 - 1024) {
+ if len(sqlBuf) > 0 {
+ syncSql := strings.Join(sqlBuf, "")
+ //fmt.Println("鍚屾sql璇彞:", syncSql)
+
+ ss.pubSyncSqlMessage([]byte(syncSql), "")
+
+ sqlBuf = append([]string{})
+ }
+
+ s := strings.TrimRight(sql, ";")
+ sqlBuf = append(sqlBuf, s+";")
+ sendSize = len(sql)
+ } else {
+ s := strings.TrimRight(sql, ";")
+ sqlBuf = append(sqlBuf, s+";")
+
+ sendSize = sendSize + len(sql)
+ }
+ }
+ }
+}
+
+func (ss *SyncServer) handleClusterMessage(msg []byte) {
+ //fmt.Println("clusterMessage:", string(msg))
+ sql := string(msg)
+
+ if len(sql) <= 0 {
+ return
+ }
+
+ db := ss.sqlDB
+ if db != nil {
+ db.LogMode(false)
+ defer db.LogMode(true)
+
+ var err error
+ tx := db.Begin()
+ defer func() {
+ if err != nil && tx != nil {
+ tx.Rollback()
+ }
+ }()
+ result := tx.Exec(sql)
+ err = result.Error
+ if err != nil {
+ fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql)
+ }
+ if result.RowsAffected == 0 {
+ fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
+ }
+ tx.Commit()
+ }
+}
+
+func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+ targetId := string(msg)
+ fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
+ sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
+ if err != nil {
+ fmt.Println("DumpTables error, ", err.Error())
+ return err
+ }
+
+ syncSql := strings.Join(sqls, ";")
+ err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+
+ return err
+}
--
Gitblit v1.8.0