From 0bfb4e53db6d0cad8fe7a59945e86ac3adc7744e Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期日, 08 十月 2023 14:12:40 +0800
Subject: [PATCH] 添加数据同步服务

---
 serf/sync.go   |  370 +++++++++++++++++++++++++++++++++++++
 main.go        |   42 ++++
 serf/sqlite.go |  137 +++++++++++++
 serf/config.go |   38 +++
 4 files changed, 586 insertions(+), 1 deletions(-)

diff --git a/main.go b/main.go
index f9585bf..00d2599 100644
--- a/main.go
+++ b/main.go
@@ -6,7 +6,9 @@
 	"apsClient/model"
 	"apsClient/nsq"
 	"apsClient/pkg/logx"
+	"apsClient/pkg/sqlitex"
 	"apsClient/router"
+	"apsClient/serf"
 	"apsClient/service/plc_address"
 	"fmt"
 	"net/http"
@@ -41,7 +43,27 @@
 	////鎻愬墠鍔犺浇浠诲姟
 	//service.NewTaskService().GetTask()
 
-	go shutdown()
+	//go shutdown()
+
+	// 鍚姩鏁版嵁鍚屾
+	var serfStartChan = make(chan bool)
+	// 闇�瑕佸悓姝ョ殑琛�
+	var syncTables = []string{
+		"procedures",
+		"process_model",
+		"production_progress",
+		"work_order",
+	}
+
+	agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
+	agent.RegisterClusterEvent(serfClusterEvent)
+	go agent.Serve(serfStartChan)
+
+	<-serfStartChan
+
+	// 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
+	//agent.ClusterStatus == "master"
+
 	logx.Infof("apsClient start serve...")
 	server := &http.Server{
 		Addr:         fmt.Sprintf(":%d", conf.Conf.System.Port),
@@ -60,3 +82,21 @@
 	logx.Infof("apsClient exited...")
 	os.Exit(0)
 }
+
+func serfClusterEvent(stat int) {
+	switch stat {
+
+	case serf.EventCreateCluster:
+		// 鍒涘缓闆嗙兢
+	case serf.EventJoinCluster:
+		// 鍔犲叆闆嗙兢
+	case serf.EventLeaveCluster:
+		// 閫�鍑洪泦缇�
+	case serf.EventSlave2Master:
+		// 鍒囨崲涓轰富鑺傜偣
+	case serf.EventMaster2Slave:
+		// 鍒囨崲涓哄瓙鑺傜偣
+	}
+
+	fmt.Println("clusterEvent:", stat)
+}
\ No newline at end of file
diff --git a/serf/config.go b/serf/config.go
new file mode 100644
index 0000000..70dfa1b
--- /dev/null
+++ b/serf/config.go
@@ -0,0 +1,38 @@
+package serf
+
+import (
+	"fmt"
+	"github.com/spf13/viper"
+)
+
+type vasystem struct {
+	ServerName string `mapstructure:"serverName"`
+	ServerID   string `mapstructure:"analyServerId"`
+}
+
+var Vasystem = &vasystem{}
+
+// Init is an exported method that takes the environment starts the viper
+// (external lib) and returns the configuration struct.
+func init() {
+	var err error
+	v := viper.New()
+	v.SetConfigType("yaml")
+	v.SetConfigName("pro")
+	v.AddConfigPath("")
+	v.AddConfigPath("../config/")
+	v.AddConfigPath("./config/")
+	v.AddConfigPath("/opt/vasystem/config/")
+
+	err = v.ReadInConfig()
+	if err != nil {
+		fmt.Println("error on parsing configuration file", err)
+	}
+
+	read2Conf(v)
+}
+
+func read2Conf(v *viper.Viper) {
+	v.UnmarshalKey("server", Vasystem)
+	fmt.Println("ServerID:", Vasystem.ServerID)
+}
diff --git a/serf/sqlite.go b/serf/sqlite.go
new file mode 100644
index 0000000..79021d9
--- /dev/null
+++ b/serf/sqlite.go
@@ -0,0 +1,137 @@
+package serf
+
+import (
+	"errors"
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/jinzhu/gorm"
+)
+
+type DumpSql struct {
+	Sql string `json:"sql"`
+}
+
+type TableDesc struct {
+	Cid       int         `json:"cid"`
+	Name      string      `json:"name"`
+	Type      string      `json:"type"`
+	Notnull   bool        `json:"notnull"`
+	DFltValue interface{} `json:"dflt_value"`
+	Pk        int         `json:"pk"`
+}
+
+var syncSqlChan = make(chan string, 10)
+
+func DumpTables(db *gorm.DB, tableNames []string) ([]string, error) {
+	db.LogMode(false)
+	defer db.LogMode(true)
+
+	if tableNames != nil {
+		var arr []string
+		var dumpSql []DumpSql
+
+		for _, table := range tableNames {
+			fmt.Println("dump current tableName:", table)
+
+			dumpSql = make([]DumpSql, 0)
+			var tDescArr []TableDesc
+
+			tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
+			err := db.Raw(tSql).Scan(&tDescArr).Error
+
+			fmt.Println("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
+			if err != nil {
+				return nil, errors.New("tableDesc err")
+			}
+
+			fmt.Println(table, "'Columns is:", tDescArr)
+			if tDescArr == nil || len(tDescArr) == 0 {
+				return nil, errors.New(table + " has no column")
+			}
+
+			var columnNames []string
+			for _, col := range tDescArr {
+				columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
+			}
+
+			tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
+				table,
+				strings.Join(columnNames, ","),
+				table)
+			//fmt.Println("tSql:", tSql)
+
+			err = db.Raw(tSql).Scan(&dumpSql).Error
+			if err != nil {
+				return nil, errors.New("dump err")
+			}
+
+			if len(dumpSql) > 0 {
+				for _, d := range dumpSql {
+					arr = append(arr, d.Sql)
+				}
+			}
+		}
+
+		return arr, nil
+	}
+
+	return nil, errors.New("tableNames is nil")
+}
+
+type DbLogger struct {
+}
+
+func (dbLogger *DbLogger) Print(values ...interface{}) {
+	var (
+		level = values[0]
+	)
+
+	fmt.Println("dblogger", values)
+
+	if level == "sql" {
+		msgArr := gorm.LogFormatter(values...)
+		sql := msgArr[3].(string)
+		sql = strings.TrimPrefix(sql, " ")
+		if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
+			affected := values[5].(int64)
+			if affected > 0 { //鎵ц鎴愬姛
+				//鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛�
+				whereIdx := strings.Index(sql, "WHERE")
+				sqlWithTable := sql
+				if whereIdx > -1 {
+					sqlWithTable = sql[:whereIdx]
+				}
+
+				fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable)
+
+				insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
+				updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
+				delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
+
+				if insertReg.MatchString(sqlWithTable) {
+					fmt.Println("鎻掑叆鎿嶄綔")
+					for _, t := range agent.syncTables {
+						reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+						if reg.MatchString(sqlWithTable) {
+							fmt.Println("灞炰簬鍚屾琛�:", t)
+							syncSqlChan <- sql
+						}
+					}
+				} else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
+					fmt.Println("鍒犻櫎鎴栬�呮洿鏂�")
+					for _, t := range agent.syncTables {
+						reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+						if reg.MatchString(sqlWithTable) {
+							fmt.Println("灞炰簬鍚屾琛�:", t)
+							syncSqlChan <- sql
+						}
+					}
+				}
+			}
+		}
+	} else {
+		fmt.Println("dbLogger level!=sql")
+	}
+}
diff --git a/serf/sync.go b/serf/sync.go
new file mode 100644
index 0000000..b49eebf
--- /dev/null
+++ b/serf/sync.go
@@ -0,0 +1,370 @@
+package serf
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
+	"basic.com/pubsub/protomsg.git"
+	"basic.com/valib/bhomeclient.git"
+	"basic.com/valib/bhomedbapi.git"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/jinzhu/gorm"
+)
+
+var (
+	agent = SyncServer{}
+)
+
+const (
+	serfSyncTopic = "sync-proc-message-to-serf"
+
+	EventCreateCluster = 0
+	EventJoinCluster   = 1
+	EventLeaveCluster  = 2
+	EventMaster2Slave  = 3
+	EventSlave2Master  = 4
+)
+
+type ProcMessageEvent struct {
+	Owner   string `json:"owner"`    // 鍙戦�佽��
+	Target  string `json:"target"`   // 鎸囧畾鎺ユ敹鑰�
+	Proc    string `json:"procName"` // 杩涚▼鍚�
+	Topic   string `json:"topic"`    // 涓婚
+	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SyncServer struct {
+	ProcName        string   // 杩涚▼鍚嶇О
+	ServerId        string   // 鏈満id
+	ClusterStatus   string   // 闆嗙兢鐘舵�� master/slave 涓虹┖琛ㄧず鏈姞鍏ラ泦缇�
+	syncSqlTopic    string   // 鍚屾sql娑堟伅鐨勪富棰�
+	queryTableTopic string   // 鍔犲叆闆嗙兢鍚庤姹傞泦缇ゆ暟鎹殑涓婚
+	syncTables      []string // 闇�瑕佸悓姝ョ殑琛�
+	sqlDB           *gorm.DB // 鏁版嵁搴�
+	bhClient        *bhomeclient.MicroNode
+	clusterEventFn  func(int)
+}
+
+func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer {
+	agent.ProcName = procName
+	agent.ServerId = Vasystem.ServerID
+	agent.sqlDB = db
+	agent.syncTables = syncTables
+	agent.syncSqlTopic = procName + "/serf/sync/sql"
+	agent.queryTableTopic = procName + "/serf/query/sqls"
+
+	// 璁剧疆鏃ュ織鍥炶皟
+	db.SetLogger(&DbLogger{})
+	// 鍏堝叧闂棩蹇�
+	db.LogMode(false)
+
+	return &agent
+}
+
+func (ss *SyncServer) RegisterClusterEvent(fn func(int)) {
+	ss.clusterEventFn = fn
+}
+
+func (ss *SyncServer) Serve(initChan chan bool) {
+	proc := &bhomeclient.ProcInfo{
+		Name: ss.ProcName, //杩涚▼鍚嶇О
+		ID:   ss.ProcName, //杩涚▼id
+		Info: "",          //杩涚▼鐨勬弿杩颁俊鎭紝鐢ㄤ簬鍖哄垎鍚屼竴杩涚▼鍚嶇О涓嬪涓繘绋�
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var reg = &bhomeclient.RegisterInfo{
+		Proc:        *proc,
+		Channel:     nil,
+		PubTopic:    []string{},
+		SubTopic:    []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic},
+		SubNetTopic: []string{},
+	}
+
+	q := make(chan os.Signal, 1)
+	signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
+
+	client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil)
+	if err != nil {
+		initChan <- false
+		return
+	}
+
+	bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic)
+	bhomedbapi.InitDoReq(client.RequestOnly)
+	//bhomedbapi.InitLog(logger.Debug)
+
+	go client.StartServer(nil)
+
+	ss.bhClient = client
+
+	go ss.subBusMessage(ctx)
+
+	go ss.handleDbLoggerPrint()
+
+	// 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬�
+	ss.QueryClusterStat()
+
+	if ss.ClusterStatus != "" {
+		ss.sqlDB.LogMode(true)
+	}
+
+	initChan <- true
+	<-q
+
+	client.DeRegister()
+	cancel()
+	client.Free()
+
+	os.Exit(0)
+}
+
+func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+	var msg = ProcMessageEvent{
+		Owner:   ss.ServerId,
+		Target:  targetId,
+		Proc:    ss.ProcName,
+		Topic:   ss.syncSqlTopic,
+		Payload: payload,
+	}
+
+	b, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+
+	return ss.bhClient.Publish(serfSyncTopic, b)
+}
+
+// 璇锋眰鍚屾琛ㄧ殑鍏ㄩ噺鏁版嵁, 鍙戦�佽嚜宸辩殑id
+func (ss *SyncServer) pubSyncTableMessage() error {
+	var msg = ProcMessageEvent{
+		Owner:   ss.ServerId,
+		Proc:    ss.ProcName,
+		Topic:   ss.queryTableTopic,
+		Payload: []byte(ss.ServerId),
+	}
+
+	b, err := json.Marshal(msg)
+	if err != nil {
+		return err
+	}
+
+	fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId)
+	return ss.bhClient.Publish(serfSyncTopic, b)
+}
+
+func (ss *SyncServer) subBusMessage(ctx context.Context) {
+	//fmt.Println("sub bus msg")
+
+	for {
+		select {
+		case <-ctx.Done():
+			fmt.Println("sub bus msg exit")
+			return
+		case busMsg := <-ss.bhClient.SubCh:
+			if string(busMsg.Topic) == ss.syncSqlTopic {
+				ss.handleClusterMessage(busMsg.Data)
+			}
+
+			// 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹�
+			if string(busMsg.Topic) == ss.queryTableTopic {
+				if ss.ClusterStatus == "master" {
+					fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�")
+					ss.handleSyncTableMessage(busMsg.Data)
+				}
+			}
+
+			// system-service鍙戦�佺殑娑堟伅
+			if string(busMsg.Topic) == bhomeclient.Proc_System_Service {
+				var clusterMsg = &protomsg.DbChangeMessage{}
+
+				if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil {
+					if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil {
+						fmt.Println("proto.Unmarshal ", err.Error())
+						continue
+					}
+				}
+
+				if clusterMsg.Table == protomsg.TableChanged_T_Cluster {
+					switch clusterMsg.Info {
+					case "create":
+						// 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master
+						ss.clusterEventFn(EventCreateCluster)
+						ss.ClusterStatus = "master"
+						ss.sqlDB.LogMode(true)
+
+					case "join":
+						// 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
+						ss.clusterEventFn(EventJoinCluster)
+						ss.onJoinCluster()
+						ss.ClusterStatus = "slave"
+						ss.sqlDB.LogMode(true)
+
+					case "leave":
+						// 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
+						ss.clusterEventFn(EventLeaveCluster)
+						ss.ClusterStatus = ""
+						ss.sqlDB.LogMode(false)
+					}
+				}
+			}
+		}
+	}
+}
+
+// 鍔犲叆闆嗙兢, 娓呯┖鏈湴琛�, 鍚屾闆嗙兢鍐呮暟鎹�
+func (ss *SyncServer) onJoinCluster() {
+	var err error
+
+	db := ss.sqlDB
+
+	tx := db.Begin()
+	defer func() {
+		if err != nil && tx != nil {
+			tx.Rollback()
+		}
+	}()
+
+	tx.Exec("PRAGMA foreign_keys=OFF")
+	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
+	for _, t := range ss.syncTables {
+		delSql := "delete from " + t + ""
+
+		err = tx.Exec(delSql).Error
+		if err != nil {
+			fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+		}
+	}
+
+	//4.寮�鍚痳eference
+	tx.Exec("PRAGMA foreign_keys=ON")
+	tx.Commit()
+
+	// 鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓�
+	ss.pubSyncTableMessage()
+}
+
+func (ss *SyncServer) onLeaveCluster() {
+
+}
+
+func (ss *SyncServer) onCreateCluster() {
+
+}
+
+// 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave
+func (ss *SyncServer) QueryClusterStat() string {
+	clusterStatTopic := "/data/api-v/cluster/status"
+	req := bhomeclient.Request{
+		Path:   clusterStatTopic,
+		Method: "POST",
+	}
+
+	reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000)
+	if err != nil {
+		fmt.Println("RequestTopic error", err.Error())
+
+		return ""
+	}
+
+	ss.ClusterStatus = reply.Msg
+
+	fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus)
+
+	return reply.Msg
+}
+
+func (ss *SyncServer) handleDbLoggerPrint() {
+	sqlBuf := make([]string, 0)
+	ticker := time.NewTicker(3 * time.Second)
+	sendSize := 0 //serf MaxUserEventSize is 9*1024
+	for {
+		select {
+		case <-ticker.C:
+			if len(sqlBuf) > 0 {
+				syncSql := strings.Join(sqlBuf, "")
+
+				//fmt.Println("鍚屾sql璇彞:", syncSql)
+				ss.pubSyncSqlMessage([]byte(syncSql), "")
+
+				sqlBuf = append([]string{})
+				sendSize = 0
+			}
+		case sql := <-syncSqlChan:
+			if sendSize+len(sql) > (9*1024 - 1024) {
+				if len(sqlBuf) > 0 {
+					syncSql := strings.Join(sqlBuf, "")
+					//fmt.Println("鍚屾sql璇彞:", syncSql)
+
+					ss.pubSyncSqlMessage([]byte(syncSql), "")
+
+					sqlBuf = append([]string{})
+				}
+
+				s := strings.TrimRight(sql, ";")
+				sqlBuf = append(sqlBuf, s+";")
+				sendSize = len(sql)
+			} else {
+				s := strings.TrimRight(sql, ";")
+				sqlBuf = append(sqlBuf, s+";")
+
+				sendSize = sendSize + len(sql)
+			}
+		}
+	}
+}
+
+func (ss *SyncServer) handleClusterMessage(msg []byte) {
+	//fmt.Println("clusterMessage:", string(msg))
+	sql := string(msg)
+
+	if len(sql) <= 0 {
+		return
+	}
+
+	db := ss.sqlDB
+	if db != nil {
+		db.LogMode(false)
+		defer db.LogMode(true)
+
+		var err error
+		tx := db.Begin()
+		defer func() {
+			if err != nil && tx != nil {
+				tx.Rollback()
+			}
+		}()
+		result := tx.Exec(sql)
+		err = result.Error
+		if err != nil {
+			fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql)
+		}
+		if result.RowsAffected == 0 {
+			fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
+		}
+		tx.Commit()
+	}
+}
+
+func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+	targetId := string(msg)
+	fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
+	sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
+	if err != nil {
+		fmt.Println("DumpTables error, ", err.Error())
+		return err
+	}
+
+	syncSql := strings.Join(sqls, ";")
+	err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+
+	return err
+}

--
Gitblit v1.8.0