基于serf的数据库同步模块库
zhangzengfei
2023-05-15 2863a050be2530afc452e48aae8b4be9b3965ebd
config.go
@@ -17,14 +17,17 @@
package syncdb
import (
   "context"
   "fmt"
   "net"
   "os"
   "strconv"
   "strings"
   //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   "github.com/hashicorp/memberlist"
   "github.com/hashicorp/serf/cmd/serf/command/agent"
   "github.com/hashicorp/serf/serf"
   "basic.com/valib/serf.git/cmd/serf/command/agent"
   "basic.com/valib/serf.git/serf"
)
const (
@@ -44,12 +47,10 @@
   BroadcastInterval  = 5
   MaxQueryRespSize   = 50 * 1024 * 1024
   MaxQuerySize       = 50 * 1024 * 1024
   MaxUserEventSize   = 9 * 1024
   MaxUserEventSize   = 9 * 1024 * 10
   ReplayOnJoinDefault = false
   SnapshotPathDefault = "/opt/vasystem/serfSnapShot"
   SnapshotPathDefault = "./serfSnapShot"
   MaxEventBufferCount = 2048
   TcpTransportPort = 30194 //tcp传输大数据量接口
)
// DefaultConfig default config
@@ -64,6 +65,32 @@
      Mode:        ModeSingle,
      Config:      agentConf,
      ClusterPort: DefaultClusterPort,
      Ctx:        context.Background(),
   }
}
func (c *Config) MergeConf(s *Config) {
   if s != nil {
      if s.Ctx != nil {
         c.Ctx = s.Ctx
      } else {
         c.Ctx = context.Background()
      }
      c.BindAddr = s.BindAddr
      c.RPCAddr = s.RPCAddr
      c.RPCPort = s.RPCPort
      //serf快照地址
      if s.SnapshotPath != "" {
         c.SnapshotPath = s.SnapshotPath
      }
      if s.EncryptKey != "" {
         //报文加密的key
         c.EncryptKey = s.EncryptKey
      }
      if s.RPCAuthKey != "" {
         //RPC认证的key
         c.RPCAuthKey = s.RPCAuthKey
      }
   }
}
@@ -71,14 +98,15 @@
type Config struct {
   // config from serf agent
   *agent.Config
   Mode string `json:"mode"`
   Mode       string       `json:"mode"`
   // name to group members into cluster
   ClusterID string `json:"cluster_name"`
   ClusterID    string       `json:"cluster_name"`
   // port to communicate between cluster members
   ClusterPort int `yaml:"cluster_port"`
   RPCPort     int `yaml:"-"`
   ClusterPort int       `yaml:"cluster_port"`
   RPCPort     int       `yaml:"-"`
   Ctx       context.Context
}
// readConfigFile reads configuration from config file
@@ -89,8 +117,30 @@
   return nil
}
func isFileRightful(filePath string) bool {
   if filePath != "" {
      _, err := os.Stat(filePath)
      if err != nil && os.IsNotExist(err) {
         pos := strings.LastIndex(filePath, "/")
         if pos != -1 {
            filePath = filePath[0:pos]
         }
         _, err = os.Stat(filePath)
         if err == nil || !os.IsNotExist(err) {
            return true
         } else {
            return false
         }
      } else {
         return false
      }
   }
   return false
}
// convertToSerf convert Config to serf.Config
func (c *Config) convertToSerf() (*serf.Config, error) {
func (c *Config) convertToSerf(snapshotPath string) (*serf.Config, error) {
   serfConf := serf.DefaultConfig()
   bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
@@ -113,7 +163,7 @@
   serfConf.MemberlistConfig.BindPort = bindPort
   serfConf.NodeName = c.NodeName
   serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
   serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort), "role": "slave"}
   if c.ClusterID != "" {
      serfConf.Tags[tagKeyClusterID] = c.ClusterID
@@ -123,7 +173,12 @@
   if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
      c.RetryMaxAttempts = retryMaxAttempts
   }
   c.SnapshotPath = SnapshotPathDefault
   if isFileRightful(snapshotPath) {
      c.SnapshotPath = snapshotPath
   }
   c.ReplayOnJoin = ReplayOnJoinDefault
   serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit