/*
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* contributor license agreements. See the NOTICE file distributed with
|
* this work for additional information regarding copyright ownership.
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* (the "License"); you may not use this file except in compliance with
|
* the License. You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package syncdb
|
|
import (
|
"fmt"
|
"net"
|
"os"
|
"strconv"
|
"strings"
|
|
//"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
|
"github.com/hashicorp/memberlist"
|
"github.com/hashicorp/serf/cmd/serf/command/agent"
|
"github.com/hashicorp/serf/serf"
|
)
|
|
// Package-wide defaults: listening ports, running modes, serf tag keys,
// broadcast settings and serf size limits.
const (
	// DefaultBindPort is the port DefaultConfig puts in the serf bind address;
	// convertToSerf also falls back to it when the bind address has no port.
	DefaultBindPort = 30190
	// DefaultRPCPort is the port DefaultConfig puts in the serf RPC address.
	DefaultRPCPort = 30191
	// DefaultClusterPort is the ClusterPort chosen by DefaultConfig.
	DefaultClusterPort = 30192
	// TcpTransportPort is the TCP port used for transferring large volumes of data.
	TcpTransportPort = 30194

	// ModeSingle is the Mode chosen by DefaultConfig.
	ModeSingle = "single"
	// ModeCluster is the Mode for which convertToSerf supplies a default
	// RetryMaxAttempts.
	ModeCluster = "cluster"

	// retryMaxAttempts replaces a non-positive RetryMaxAttempts in cluster mode.
	retryMaxAttempts = 3
	// groupExpect is not referenced in this part of the file; presumably the
	// expected number of members in a group — verify against its users.
	groupExpect = 3

	// DefaultEncryptKey is not referenced in this part of the file.
	// NOTE(review): a key committed to source is readable by anyone with
	// access to the code; confirm it is only a placeholder.
	DefaultEncryptKey = "bjbasic@aiotlink"

	// tagKeyClusterID is the serf tag that carries Config.ClusterID.
	tagKeyClusterID = "syncer-cluster-name"
	// TagKeyClusterPort is the serf tag that carries Config.ClusterPort.
	TagKeyClusterPort = "syncer-cluster-port"
	// TagKeyRPCPort is the serf tag that carries Config.RPCPort.
	TagKeyRPCPort = "syncer-rpc-port"

	// Broadcast settings; none of them is referenced in this part of the file.
	// NOTE(review): the unit of BroadcastInterval is not stated here
	// (presumably seconds) — confirm against its users.
	BroadcastIP       = "255.255.255.255"
	BroadcastPort     = 30193
	BroadcastInterval = 5

	// MaxQueryRespSize is the serf query response size limit, in bytes (50 MiB).
	MaxQueryRespSize = 50 << 20
	// MaxQuerySize is the serf query size limit, in bytes (50 MiB).
	MaxQuerySize = 50 << 20
	// MaxUserEventSize is the serf user event size limit, in bytes (9 KiB).
	MaxUserEventSize = 9 << 10
	// MaxEventBufferCount is the serf EventBuffer size set by convertToSerf.
	MaxEventBufferCount = 2048

	// ReplayOnJoinDefault is the ReplayOnJoin value forced by convertToSerf.
	ReplayOnJoinDefault = false
	// SnapshotPathDefault is the snapshot path used when the caller supplies
	// no acceptable one.
	SnapshotPathDefault = "/sdcard/serfSnapShot"
)
|
|
// DefaultConfig default config
|
func DefaultConfig() *Config {
|
agentConf := agent.DefaultConfig()
|
agentConf.QueryResponseSizeLimit = MaxQueryRespSize
|
agentConf.QuerySizeLimit = MaxQuerySize
|
agentConf.UserEventSizeLimit = MaxUserEventSize
|
agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
|
agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
|
return &Config{
|
Mode: ModeSingle,
|
Config: agentConf,
|
ClusterPort: DefaultClusterPort,
|
}
|
}
|
|
// Config struct
|
type Config struct {
|
// config from serf agent
|
*agent.Config
|
Mode string `json:"mode"`
|
|
// name to group members into cluster
|
ClusterID string `json:"cluster_name"`
|
|
// port to communicate between cluster members
|
ClusterPort int `yaml:"cluster_port"`
|
RPCPort int `yaml:"-"`
|
}
|
|
// readConfigFile reads configuration from config file
|
func (c *Config) readConfigFile(filepath string) error {
|
if filepath != "" {
|
// todo:
|
}
|
return nil
|
}
|
|
// isFileRightful reports whether filePath can be used as the location of a
// file that does not exist yet.
//
// It returns true only when filePath is non-empty, nothing exists at filePath,
// and the parent directory is not reported as missing. The parent is the part
// of filePath before the last "/": "/" for a root-level path such as "/name",
// and the current directory for a path without any "/".
//
// NOTE(review): a path that already exists is rejected, so a file created on
// a previous run will not be accepted again — confirm this is intended.
func isFileRightful(filePath string) bool {
	if filePath == "" {
		return false
	}

	// Accept only paths that are confirmed absent; an existing entry or a
	// Stat failure of any other kind is rejected.
	if _, err := os.Stat(filePath); !os.IsNotExist(err) {
		return false
	}

	// Slicing at the last "/" alone would turn "/name" into "" and leave
	// "name" unchanged, so those two cases are mapped to "/" and "." here.
	parent := "."
	switch pos := strings.LastIndex(filePath, "/"); {
	case pos > 0:
		parent = filePath[:pos]
	case pos == 0:
		parent = "/"
	}

	// The parent is acceptable unless Stat positively reports it as missing.
	_, err := os.Stat(parent)
	return !os.IsNotExist(err)
}
|
|
// convertToSerf convert Config to serf.Config
|
func (c *Config) convertToSerf(snapshotPath string) (*serf.Config, error) {
|
serfConf := serf.DefaultConfig()
|
|
bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
|
if err != nil {
|
return nil, fmt.Errorf("invalid bind address: %s", err)
|
}
|
|
switch c.Profile {
|
case "lan":
|
serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
|
case "wan":
|
serfConf.MemberlistConfig = memberlist.DefaultWANConfig()
|
case "local":
|
serfConf.MemberlistConfig = memberlist.DefaultLocalConfig()
|
default:
|
serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
|
}
|
|
serfConf.MemberlistConfig.BindAddr = bindIP
|
serfConf.MemberlistConfig.BindPort = bindPort
|
serfConf.NodeName = c.NodeName
|
|
serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
|
|
if c.ClusterID != "" {
|
serfConf.Tags[tagKeyClusterID] = c.ClusterID
|
serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
|
}
|
|
if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
|
c.RetryMaxAttempts = retryMaxAttempts
|
}
|
|
c.SnapshotPath = SnapshotPathDefault
|
if isFileRightful(snapshotPath) {
|
c.SnapshotPath = snapshotPath
|
}
|
|
c.ReplayOnJoin = ReplayOnJoinDefault
|
|
serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit
|
serfConf.QuerySizeLimit = c.QuerySizeLimit
|
serfConf.UserEventSizeLimit = c.UserEventSizeLimit
|
serfConf.SnapshotPath = c.SnapshotPath
|
serfConf.EventBuffer = MaxEventBufferCount
|
|
return serfConf, nil
|
}
|
|
// SplitHostPort splits address into an IP string and a port number.
//
// When address carries no port, defaultPort is used; this includes a bare
// IPv6 literal such as "::1". The host part is resolved with
// net.ResolveTCPAddr, so a host name is returned as its resolved IP. When the
// host part is empty (for example ":8080" or ""), the wildcard "0.0.0.0" is
// returned.
//
// An error is returned when address is malformed or cannot be resolved.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
	_, _, err := net.SplitHostPort(address)
	if ae, ok := err.(*net.AddrError); ok {
		switch {
		case ae.Err == "missing port in address":
			address = fmt.Sprintf("%s:%d", address, defaultPort)
			_, _, err = net.SplitHostPort(address)
		case net.ParseIP(address) != nil:
			// A bare IPv6 literal is rejected for its colons; bracket it
			// and attach the default port.
			address = net.JoinHostPort(address, strconv.Itoa(defaultPort))
			_, _, err = net.SplitHostPort(address)
		}
	}
	if err != nil {
		return "", 0, err
	}

	addr, err := net.ResolveTCPAddr("tcp", address)
	if err != nil {
		return "", 0, err
	}

	// An empty host resolves to a nil IP, whose String() is "<nil>"; report
	// the wildcard address instead.
	host := "0.0.0.0"
	if len(addr.IP) > 0 {
		host = addr.IP.String()
	}
	return host, addr.Port, nil
}
|