zhangzengfei
2023-10-08 0bfb4e53db6d0cad8fe7a59945e86ac3adc7744e
添加数据同步服务
3个文件已添加
1个文件已修改
587 ■■■■■ 已修改文件
main.go 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serf/config.go 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serf/sqlite.go 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serf/sync.go 370 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go
@@ -6,7 +6,9 @@
    "apsClient/model"
    "apsClient/nsq"
    "apsClient/pkg/logx"
    "apsClient/pkg/sqlitex"
    "apsClient/router"
    "apsClient/serf"
    "apsClient/service/plc_address"
    "fmt"
    "net/http"
@@ -41,7 +43,27 @@
    ////提前加载任务
    //service.NewTaskService().GetTask()
    go shutdown()
    //go shutdown()
    // 启动数据同步
    var serfStartChan = make(chan bool)
    // 需要同步的表
    var syncTables = []string{
        "procedures",
        "process_model",
        "production_progress",
        "work_order",
    }
    agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
    agent.RegisterClusterEvent(serfClusterEvent)
    go agent.Serve(serfStartChan)
    <-serfStartChan
    // 判断当前集群状态
    //agent.ClusterStatus == "master"
    logx.Infof("apsClient start serve...")
    server := &http.Server{
        Addr:         fmt.Sprintf(":%d", conf.Conf.System.Port),
@@ -60,3 +82,21 @@
    logx.Infof("apsClient exited...")
    os.Exit(0)
}
func serfClusterEvent(stat int) {
    switch stat {
    case serf.EventCreateCluster:
        // 创建集群
    case serf.EventJoinCluster:
        // 加入集群
    case serf.EventLeaveCluster:
        // 退出集群
    case serf.EventSlave2Master:
        // 切换为主节点
    case serf.EventMaster2Slave:
        // 切换为子节点
    }
    fmt.Println("clusterEvent:", stat)
}
serf/config.go
New file
@@ -0,0 +1,38 @@
package serf
import (
    "fmt"
    "github.com/spf13/viper"
)
type vasystem struct {
    ServerName string `mapstructure:"serverName"`
    ServerID   string `mapstructure:"analyServerId"`
}
var Vasystem = &vasystem{}
// Init is an exported method that takes the environment starts the viper
// (external lib) and returns the configuration struct.
func init() {
    var err error
    v := viper.New()
    v.SetConfigType("yaml")
    v.SetConfigName("pro")
    v.AddConfigPath("")
    v.AddConfigPath("../config/")
    v.AddConfigPath("./config/")
    v.AddConfigPath("/opt/vasystem/config/")
    err = v.ReadInConfig()
    if err != nil {
        fmt.Println("error on parsing configuration file", err)
    }
    read2Conf(v)
}
func read2Conf(v *viper.Viper) {
    v.UnmarshalKey("server", Vasystem)
    fmt.Println("ServerID:", Vasystem.ServerID)
}
serf/sqlite.go
New file
@@ -0,0 +1,137 @@
package serf
import (
    "errors"
    "fmt"
    "regexp"
    "strings"
    "github.com/jinzhu/gorm"
)
type DumpSql struct {
    Sql string `json:"sql"`
}
type TableDesc struct {
    Cid       int         `json:"cid"`
    Name      string      `json:"name"`
    Type      string      `json:"type"`
    Notnull   bool        `json:"notnull"`
    DFltValue interface{} `json:"dflt_value"`
    Pk        int         `json:"pk"`
}
var syncSqlChan = make(chan string, 10)
func DumpTables(db *gorm.DB, tableNames []string) ([]string, error) {
    db.LogMode(false)
    defer db.LogMode(true)
    if tableNames != nil {
        var arr []string
        var dumpSql []DumpSql
        for _, table := range tableNames {
            fmt.Println("dump current tableName:", table)
            dumpSql = make([]DumpSql, 0)
            var tDescArr []TableDesc
            tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
            err := db.Raw(tSql).Scan(&tDescArr).Error
            fmt.Println("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
            if err != nil {
                return nil, errors.New("tableDesc err")
            }
            fmt.Println(table, "'Columns is:", tDescArr)
            if tDescArr == nil || len(tDescArr) == 0 {
                return nil, errors.New(table + " has no column")
            }
            var columnNames []string
            for _, col := range tDescArr {
                columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
            }
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
                table,
                strings.Join(columnNames, ","),
                table)
            //fmt.Println("tSql:", tSql)
            err = db.Raw(tSql).Scan(&dumpSql).Error
            if err != nil {
                return nil, errors.New("dump err")
            }
            if len(dumpSql) > 0 {
                for _, d := range dumpSql {
                    arr = append(arr, d.Sql)
                }
            }
        }
        return arr, nil
    }
    return nil, errors.New("tableNames is nil")
}
type DbLogger struct {
}
func (dbLogger *DbLogger) Print(values ...interface{}) {
    var (
        level = values[0]
    )
    fmt.Println("dblogger", values)
    if level == "sql" {
        msgArr := gorm.LogFormatter(values...)
        sql := msgArr[3].(string)
        sql = strings.TrimPrefix(sql, " ")
        if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
            affected := values[5].(int64)
            if affected > 0 { //执行成功
                //判断操作的是哪张表
                whereIdx := strings.Index(sql, "WHERE")
                sqlWithTable := sql
                if whereIdx > -1 {
                    sqlWithTable = sql[:whereIdx]
                }
                fmt.Println("判断是哪张表 sqlWithTable:", sqlWithTable)
                insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
                updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
                delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
                if insertReg.MatchString(sqlWithTable) {
                    fmt.Println("插入操作")
                    for _, t := range agent.syncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
                            fmt.Println("属于同步表:", t)
                            syncSqlChan <- sql
                        }
                    }
                } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
                    fmt.Println("删除或者更新")
                    for _, t := range agent.syncTables {
                        reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                        if reg.MatchString(sqlWithTable) {
                            fmt.Println("属于同步表:", t)
                            syncSqlChan <- sql
                        }
                    }
                }
            }
        }
    } else {
        fmt.Println("dbLogger level!=sql")
    }
}
serf/sync.go
New file
@@ -0,0 +1,370 @@
package serf
import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "github.com/gogo/protobuf/proto"
    "github.com/jinzhu/gorm"
)
var (
    agent = SyncServer{}
)
const (
    serfSyncTopic = "sync-proc-message-to-serf"
    EventCreateCluster = 0
    EventJoinCluster   = 1
    EventLeaveCluster  = 2
    EventMaster2Slave  = 3
    EventSlave2Master  = 4
)
type ProcMessageEvent struct {
    Owner   string `json:"owner"`    // 发送者
    Target  string `json:"target"`   // 指定接收者
    Proc    string `json:"procName"` // 进程名
    Topic   string `json:"topic"`    // 主题
    Payload []byte `json:"payload"`  // 消息体,自行解析
}
type SyncServer struct {
    ProcName        string   // 进程名称
    ServerId        string   // 本机id
    ClusterStatus   string   // 集群状态 master/slave 为空表示未加入集群
    syncSqlTopic    string   // 同步sql消息的主题
    queryTableTopic string   // 加入集群后请求集群数据的主题
    syncTables      []string // 需要同步的表
    sqlDB           *gorm.DB // 数据库
    bhClient        *bhomeclient.MicroNode
    clusterEventFn  func(int)
}
func InitAgent(procName string, syncTables []string, db *gorm.DB) *SyncServer {
    agent.ProcName = procName
    agent.ServerId = Vasystem.ServerID
    agent.sqlDB = db
    agent.syncTables = syncTables
    agent.syncSqlTopic = procName + "/serf/sync/sql"
    agent.queryTableTopic = procName + "/serf/query/sqls"
    // 设置日志回调
    db.SetLogger(&DbLogger{})
    // 先关闭日志
    db.LogMode(false)
    return &agent
}
func (ss *SyncServer) RegisterClusterEvent(fn func(int)) {
    ss.clusterEventFn = fn
}
func (ss *SyncServer) Serve(initChan chan bool) {
    proc := &bhomeclient.ProcInfo{
        Name: ss.ProcName, //进程名称
        ID:   ss.ProcName, //进程id
        Info: "",          //进程的描述信息,用于区分同一进程名称下多个进程
    }
    ctx, cancel := context.WithCancel(context.Background())
    var reg = &bhomeclient.RegisterInfo{
        Proc:        *proc,
        Channel:     nil,
        PubTopic:    []string{},
        SubTopic:    []string{bhomeclient.Proc_System_Service, ss.syncSqlTopic, ss.queryTableTopic},
        SubNetTopic: []string{},
    }
    q := make(chan os.Signal, 1)
    signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
    client, err := bhomeclient.NewMicroNode(ctx, q, ss.ServerId, reg, nil)
    if err != nil {
        initChan <- false
        return
    }
    bhomedbapi.InitGetNetNode(client.GetLocalNetNodeByTopic)
    bhomedbapi.InitDoReq(client.RequestOnly)
    //bhomedbapi.InitLog(logger.Debug)
    go client.StartServer(nil)
    ss.bhClient = client
    go ss.subBusMessage(ctx)
    go ss.handleDbLoggerPrint()
    // 启动后查询一次集群状态
    ss.QueryClusterStat()
    if ss.ClusterStatus != "" {
        ss.sqlDB.LogMode(true)
    }
    initChan <- true
    <-q
    client.DeRegister()
    cancel()
    client.Free()
    os.Exit(0)
}
func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
    var msg = ProcMessageEvent{
        Owner:   ss.ServerId,
        Target:  targetId,
        Proc:    ss.ProcName,
        Topic:   ss.syncSqlTopic,
        Payload: payload,
    }
    b, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    return ss.bhClient.Publish(serfSyncTopic, b)
}
// 请求同步表的全量数据, 发送自己的id
func (ss *SyncServer) pubSyncTableMessage() error {
    var msg = ProcMessageEvent{
        Owner:   ss.ServerId,
        Proc:    ss.ProcName,
        Topic:   ss.queryTableTopic,
        Payload: []byte(ss.ServerId),
    }
    b, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId)
    return ss.bhClient.Publish(serfSyncTopic, b)
}
func (ss *SyncServer) subBusMessage(ctx context.Context) {
    //fmt.Println("sub bus msg")
    for {
        select {
        case <-ctx.Done():
            fmt.Println("sub bus msg exit")
            return
        case busMsg := <-ss.bhClient.SubCh:
            if string(busMsg.Topic) == ss.syncSqlTopic {
                ss.handleClusterMessage(busMsg.Data)
            }
            // 处理同步全量数据的请求
            if string(busMsg.Topic) == ss.queryTableTopic {
                if ss.ClusterStatus == "master" {
                    fmt.Println("接收到同步全量数据请求")
                    ss.handleSyncTableMessage(busMsg.Data)
                }
            }
            // system-service发送的消息
            if string(busMsg.Topic) == bhomeclient.Proc_System_Service {
                var clusterMsg = &protomsg.DbChangeMessage{}
                if err := proto.Unmarshal(busMsg.Data, clusterMsg); err != nil {
                    if err = json.Unmarshal(busMsg.Data, clusterMsg); err != nil {
                        fmt.Println("proto.Unmarshal ", err.Error())
                        continue
                    }
                }
                if clusterMsg.Table == protomsg.TableChanged_T_Cluster {
                    switch clusterMsg.Info {
                    case "create":
                        // 创建集群, 开启日志跟踪, 设置角色master
                        ss.clusterEventFn(EventCreateCluster)
                        ss.ClusterStatus = "master"
                        ss.sqlDB.LogMode(true)
                    case "join":
                        // 加入集群, 开启日志跟踪, 设置角色slave
                        ss.clusterEventFn(EventJoinCluster)
                        ss.onJoinCluster()
                        ss.ClusterStatus = "slave"
                        ss.sqlDB.LogMode(true)
                    case "leave":
                        // 退出集群, 开启日志跟踪, 设置角色slave
                        ss.clusterEventFn(EventLeaveCluster)
                        ss.ClusterStatus = ""
                        ss.sqlDB.LogMode(false)
                    }
                }
            }
        }
    }
}
// 加入集群, 清空本地表, 同步集群内数据
func (ss *SyncServer) onJoinCluster() {
    var err error
    db := ss.sqlDB
    tx := db.Begin()
    defer func() {
        if err != nil && tx != nil {
            tx.Rollback()
        }
    }()
    tx.Exec("PRAGMA foreign_keys=OFF")
    //1.删除本地的同步库数据
    for _, t := range ss.syncTables {
        delSql := "delete from " + t + ""
        err = tx.Exec(delSql).Error
        if err != nil {
            fmt.Println("删除本地的同步库数据失败,", err.Error())
        }
    }
    //4.开启reference
    tx.Exec("PRAGMA foreign_keys=ON")
    tx.Commit()
    // 拉取集群内的同步库数据到本地数据库表中
    ss.pubSyncTableMessage()
}
func (ss *SyncServer) onLeaveCluster() {
}
func (ss *SyncServer) onCreateCluster() {
}
// 查询集群状态, 返回 master, slave, leave
func (ss *SyncServer) QueryClusterStat() string {
    clusterStatTopic := "/data/api-v/cluster/status"
    req := bhomeclient.Request{
        Path:   clusterStatTopic,
        Method: "POST",
    }
    reply, err := ss.bhClient.RequestTopic(ss.ServerId, req, 3000)
    if err != nil {
        fmt.Println("RequestTopic error", err.Error())
        return ""
    }
    ss.ClusterStatus = reply.Msg
    fmt.Println("当前集群状态:", ss.ClusterStatus)
    return reply.Msg
}
func (ss *SyncServer) handleDbLoggerPrint() {
    sqlBuf := make([]string, 0)
    ticker := time.NewTicker(3 * time.Second)
    sendSize := 0 //serf MaxUserEventSize is 9*1024
    for {
        select {
        case <-ticker.C:
            if len(sqlBuf) > 0 {
                syncSql := strings.Join(sqlBuf, "")
                //fmt.Println("同步sql语句:", syncSql)
                ss.pubSyncSqlMessage([]byte(syncSql), "")
                sqlBuf = append([]string{})
                sendSize = 0
            }
        case sql := <-syncSqlChan:
            if sendSize+len(sql) > (9*1024 - 1024) {
                if len(sqlBuf) > 0 {
                    syncSql := strings.Join(sqlBuf, "")
                    //fmt.Println("同步sql语句:", syncSql)
                    ss.pubSyncSqlMessage([]byte(syncSql), "")
                    sqlBuf = append([]string{})
                }
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
                sendSize = len(sql)
            } else {
                s := strings.TrimRight(sql, ";")
                sqlBuf = append(sqlBuf, s+";")
                sendSize = sendSize + len(sql)
            }
        }
    }
}
func (ss *SyncServer) handleClusterMessage(msg []byte) {
    //fmt.Println("clusterMessage:", string(msg))
    sql := string(msg)
    if len(sql) <= 0 {
        return
    }
    db := ss.sqlDB
    if db != nil {
        db.LogMode(false)
        defer db.LogMode(true)
        var err error
        tx := db.Begin()
        defer func() {
            if err != nil && tx != nil {
                tx.Rollback()
            }
        }()
        result := tx.Exec(sql)
        err = result.Error
        if err != nil {
            fmt.Println("ExecuteSqlByGorm err:", err, ",sql:", sql)
        }
        if result.RowsAffected == 0 {
            fmt.Println("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
        }
        tx.Commit()
    }
}
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
    targetId := string(msg)
    fmt.Println("同步全量数据给节点:", targetId)
    sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
    if err != nil {
        fmt.Println("DumpTables error, ", err.Error())
        return err
    }
    syncSql := strings.Join(sqls, ";")
    err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
    return err
}