基于serf的数据库同步模块库
zhangzengfei
2023-05-15 12f52d5835388f22fdecb2a890d58e5425688c8e
config.go
@@ -19,34 +19,47 @@
import (
   "fmt"
   "net"
   "os"
   "strconv"
   "strings"
   //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   "github.com/hashicorp/memberlist"
   "github.com/hashicorp/serf/cmd/serf/command/agent"
   "github.com/hashicorp/serf/serf"
   "basic.com/valib/serf.git/cmd/serf/command/agent"
   "basic.com/valib/serf.git/serf"
)
const (
   DefaultBindPort    = 5000//30190
   DefaultRPCPort     = 7373//30191
   DefaultBindPort    = 30190
   DefaultRPCPort     = 30191
   DefaultClusterPort = 30192
   ModeSingle         = "single"
   ModeCluster        = "cluster"
   retryMaxAttempts   = 3
   groupExpect        = 3
   DefaultEncryptKey   = "bjbasic@aiotlink"
   tagKeyClusterID  = "syncer-cluster-name"
   DefaultEncryptKey  = "bjbasic@aiotlink"
   tagKeyClusterID    = "syncer-cluster-name"
   TagKeyClusterPort  = "syncer-cluster-port"
   TagKeyRPCPort      = "syncer-rpc-port"
   BroadcastIP        = "255.255.255.255"
   BroadcastPort      = 30193
   BroadcastInterval  = 5
   MaxQueryRespSize   = 50 * 1024 * 1024
   MaxQuerySize       = 50 * 1024 * 1024
   MaxUserEventSize   = 9 * 1024
   ReplayOnJoinDefault = false
   SnapshotPathDefault = "./serfSnapShot"
   MaxEventBufferCount = 2048
   TcpTransportPort = 30194 //tcp传输大数据量接口
)
// DefaultConfig default config
func DefaultConfig() *Config {
   agentConf := agent.DefaultConfig()
   agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
   agentConf.QuerySizeLimit = 50 * 1024 *1024
   agentConf.UserEventSizeLimit = 1024
   agentConf.QueryResponseSizeLimit = MaxQueryRespSize
   agentConf.QuerySizeLimit = MaxQuerySize
   agentConf.UserEventSizeLimit = MaxUserEventSize
   agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
   agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
   return &Config{
@@ -78,8 +91,30 @@
   return nil
}
func isFileRightful(filePath string) bool {
   if filePath != "" {
      _, err := os.Stat(filePath)
      if err != nil && os.IsNotExist(err) {
         pos := strings.LastIndex(filePath, "/")
         if pos != -1 {
            filePath = filePath[0:pos]
         }
         _, err = os.Stat(filePath)
         if err == nil || !os.IsNotExist(err) {
            return true
         } else {
            return false
         }
      } else {
         return false
      }
   }
   return false
}
// convertToSerf convert Config to serf.Config
func (c *Config) convertToSerf() (*serf.Config, error) {
func (c *Config) convertToSerf(snapshotPath string) (*serf.Config, error) {
   serfConf := serf.DefaultConfig()
   bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
@@ -102,7 +137,10 @@
   serfConf.MemberlistConfig.BindPort = bindPort
   serfConf.NodeName = c.NodeName
   serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
   serfConf.Tags = map[string]string{
      TagKeyRPCPort: strconv.Itoa(c.RPCPort),
      "role": "slave",
   }
   if c.ClusterID != "" {
      serfConf.Tags[tagKeyClusterID] = c.ClusterID
@@ -112,6 +150,20 @@
   if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
      c.RetryMaxAttempts = retryMaxAttempts
   }
   c.SnapshotPath = SnapshotPathDefault
   if isFileRightful(snapshotPath) {
      c.SnapshotPath = snapshotPath
   }
   c.ReplayOnJoin = ReplayOnJoinDefault
   serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit
   serfConf.QuerySizeLimit = c.QuerySizeLimit
   serfConf.UserEventSizeLimit = c.UserEventSizeLimit
   serfConf.SnapshotPath = c.SnapshotPath
   serfConf.EventBuffer = MaxEventBufferCount
   return serfConf, nil
}
@@ -133,4 +185,3 @@
   return addr.IP.String(), addr.Port, nil
}