| | |
| | | package syncdb |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "net" |
| | | "os" |
| | | "strconv" |
| | | "strings" |
| | | |
| | | //"github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| | | "github.com/hashicorp/memberlist" |
| | | "github.com/hashicorp/serf/cmd/serf/command/agent" |
| | | "github.com/hashicorp/serf/serf" |
| | | "basic.com/valib/serf.git/cmd/serf/command/agent" |
| | | "basic.com/valib/serf.git/serf" |
| | | ) |
| | | |
| | | const ( |
| | |
| | | ModeCluster = "cluster" |
| | | retryMaxAttempts = 3 |
| | | groupExpect = 3 |
| | | DefaultEncryptKey = "bjbasic@aiotlink" |
| | | tagKeyClusterID = "syncer-cluster-name" |
| | | DefaultEncryptKey = "bjbasic@aiotlink" |
| | | tagKeyClusterID = "syncer-cluster-name" |
| | | TagKeyClusterPort = "syncer-cluster-port" |
| | | TagKeyRPCPort = "syncer-rpc-port" |
| | | BroadcastIP = "255.255.255.255" |
| | | BroadcastPort = 30193 |
| | | BroadcastInterval = 5 |
| | | BroadcastIP = "255.255.255.255" |
| | | BroadcastPort = 30193 |
| | | BroadcastInterval = 5 |
| | | MaxQueryRespSize = 50 * 1024 * 1024 |
| | | MaxQuerySize = 50 * 1024 * 1024 |
| | | MaxUserEventSize = 9 * 1024 * 10 |
| | | ReplayOnJoinDefault = false |
| | | SnapshotPathDefault = "./serfSnapShot" |
| | | MaxEventBufferCount = 2048 |
| | | ) |
| | | |
| | | // DefaultConfig default config |
| | | func DefaultConfig() *Config { |
| | | agentConf := agent.DefaultConfig() |
| | | agentConf.QueryResponseSizeLimit = 50 * 1024 *1024 |
| | | agentConf.QuerySizeLimit = 50 * 1024 *1024 |
| | | agentConf.UserEventSizeLimit = 1024 |
| | | agentConf.QueryResponseSizeLimit = MaxQueryRespSize |
| | | agentConf.QuerySizeLimit = MaxQuerySize |
| | | agentConf.UserEventSizeLimit = MaxUserEventSize |
| | | agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort) |
| | | agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort) |
| | | return &Config{ |
| | | Mode: ModeSingle, |
| | | Config: agentConf, |
| | | ClusterPort: DefaultClusterPort, |
| | | Ctx: context.Background(), |
| | | } |
| | | } |
| | | |
| | | func (c *Config) MergeConf(s *Config) { |
| | | if s != nil { |
| | | if s.Ctx != nil { |
| | | c.Ctx = s.Ctx |
| | | } else { |
| | | c.Ctx = context.Background() |
| | | } |
| | | c.BindAddr = s.BindAddr |
| | | c.RPCAddr = s.RPCAddr |
| | | c.RPCPort = s.RPCPort |
| | | //serf快照地址 |
| | | if s.SnapshotPath != "" { |
| | | c.SnapshotPath = s.SnapshotPath |
| | | } |
| | | if s.EncryptKey != "" { |
| | | //报文加密的key |
| | | c.EncryptKey = s.EncryptKey |
| | | } |
| | | if s.RPCAuthKey != "" { |
| | | //RPC认证的key |
| | | c.RPCAuthKey = s.RPCAuthKey |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | type Config struct { |
| | | // config from serf agent |
| | | *agent.Config |
| | | Mode string `json:"mode"` |
| | | Mode string `json:"mode"` |
| | | |
| | | // name to group members into cluster |
| | | ClusterID string `json:"cluster_name"` |
| | | ClusterID string `json:"cluster_name"` |
| | | |
| | | // port to communicate between cluster members |
| | | ClusterPort int `yaml:"cluster_port"` |
| | | RPCPort int `yaml:"-"` |
| | | ClusterPort int `yaml:"cluster_port"` |
| | | RPCPort int `yaml:"-"` |
| | | Ctx context.Context |
| | | } |
| | | |
| | | // readConfigFile reads configuration from config file |
| | |
| | | return nil |
| | | } |
| | | |
// isFileRightful reports whether filePath is usable as the location of a file
// that does not exist yet: the path itself must be absent, and the directory
// that would contain it must not be missing.
//
// It returns false for an empty path, for a path that already exists, and for
// a path whose Stat fails with anything other than "not exist". The parent
// directory is accepted when Stat on it succeeds or fails with an error other
// than "not exist".
//
// The parent is derived from the last "/" in filePath. A path with no "/" is
// checked against the current directory, and a path directly under the root
// is checked against "/", so "snapshot" and "/snapshot" are judged by "." and
// "/" rather than by a second Stat of the file name or a Stat of "".
//
// NOTE(review): an already existing path is rejected; confirm with the caller
// that reusing an existing file is really meant to be refused.
func isFileRightful(filePath string) bool {
	if filePath == "" {
		return false
	}

	// Only a path that does not exist yet qualifies.
	if _, err := os.Stat(filePath); err == nil || !os.IsNotExist(err) {
		return false
	}

	var parent string
	switch pos := strings.LastIndex(filePath, "/"); {
	case pos < 0:
		// Bare file name: it would live in the current directory.
		parent = "."
	case pos == 0:
		// File directly under the root directory.
		parent = "/"
	default:
		parent = filePath[:pos]
	}

	_, err := os.Stat(parent)
	return err == nil || !os.IsNotExist(err)
}
| | | |
| | | // convertToSerf convert Config to serf.Config |
| | | func (c *Config) convertToSerf() (*serf.Config, error) { |
| | | func (c *Config) convertToSerf(snapshotPath string) (*serf.Config, error) { |
| | | serfConf := serf.DefaultConfig() |
| | | |
| | | bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort) |
| | |
| | | if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 { |
| | | c.RetryMaxAttempts = retryMaxAttempts |
| | | } |
| | | |
| | | c.SnapshotPath = SnapshotPathDefault |
| | | if isFileRightful(snapshotPath) { |
| | | c.SnapshotPath = snapshotPath |
| | | } |
| | | |
| | | c.ReplayOnJoin = ReplayOnJoinDefault |
| | | |
| | | serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit |
| | | serfConf.QuerySizeLimit = c.QuerySizeLimit |
| | | serfConf.UserEventSizeLimit = c.UserEventSizeLimit |
| | | serfConf.SnapshotPath = c.SnapshotPath |
| | | serfConf.EventBuffer = MaxEventBufferCount |
| | | |
| | | return serfConf, nil |
| | | } |
| | | |
| | |
| | | |
| | | return addr.IP.String(), addr.Port, nil |
| | | } |
| | | |