| | |
| | | * See the License for the specific language governing permissions and |
| | | * limitations under the License. |
| | | */ |
| | | package serf |
| | | package syncdb |
| | | |
| | | import ( |
| | | "fmt" |
| | | "net" |
| | | "strconv" |
| | | |
| | | "github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| | | //"github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| | | "github.com/hashicorp/memberlist" |
| | | "github.com/hashicorp/serf/cmd/serf/command/agent" |
| | | "github.com/hashicorp/serf/serf" |
| | |
| | | ModeCluster = "cluster" |
| | | retryMaxAttempts = 3 |
| | | groupExpect = 3 |
| | | tagKeyClusterName = "syncer-cluster-name" |
| | | DefaultEncryptKey = "bjbasic@aiotlink" |
| | | tagKeyClusterID = "syncer-cluster-name" |
| | | TagKeyClusterPort = "syncer-cluster-port" |
| | | TagKeyRPCPort = "syncer-rpc-port" |
| | | BroadcastIP = "255.255.255.255" |
| | | BroadcastPort = 30193 |
| | | BroadcastInterval = 5 |
| | | MaxQueryRespSize = 50 * 1024 * 1024 |
| | | MaxQuerySize = 50 * 1024 * 1024 |
| | | MaxUserEventSize = 5 * 1024 |
| | | ReplayOnJoinDefault = false |
| | | SnapshotPathDefault = "/opt/vasystem/serfSnapShot" |
| | | ) |
| | | |
| | | // DefaultConfig default config |
| | | func DefaultConfig() *Config { |
| | | agentConf := agent.DefaultConfig() |
| | | agentConf.QueryResponseSizeLimit = MaxQueryRespSize |
| | | agentConf.QuerySizeLimit = MaxQuerySize |
| | | agentConf.UserEventSizeLimit = MaxUserEventSize |
| | | agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort) |
| | | agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort) |
| | | return &Config{ |
| | |
| | | Mode string `json:"mode"` |
| | | |
| | | // name to group members into cluster |
| | | ClusterName string `json:"cluster_name"` |
| | | ClusterID string `json:"cluster_name"` |
| | | |
| | | // port to communicate between cluster members |
| | | ClusterPort int `yaml:"cluster_port"` |
| | |
| | | func (c *Config) convertToSerf() (*serf.Config, error) { |
| | | serfConf := serf.DefaultConfig() |
| | | |
| | | bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort) |
| | | bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort) |
| | | if err != nil { |
| | | return nil, fmt.Errorf("invalid bind address: %s", err) |
| | | } |
| | |
| | | serfConf.MemberlistConfig.BindAddr = bindIP |
| | | serfConf.MemberlistConfig.BindPort = bindPort |
| | | serfConf.NodeName = c.NodeName |
| | | |
| | | serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)} |
| | | |
| | | if c.ClusterName != "" { |
| | | serfConf.Tags[tagKeyClusterName] = c.ClusterName |
| | | if c.ClusterID != "" { |
| | | serfConf.Tags[tagKeyClusterID] = c.ClusterID |
| | | serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort) |
| | | } |
| | | |
| | | if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 { |
| | | c.RetryMaxAttempts = retryMaxAttempts |
| | | } |
| | | c.SnapshotPath = SnapshotPathDefault |
| | | c.ReplayOnJoin = ReplayOnJoinDefault |
| | | |
| | | serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit |
| | | serfConf.QuerySizeLimit = c.QuerySizeLimit |
| | | serfConf.UserEventSizeLimit = c.UserEventSizeLimit |
| | | serfConf.SnapshotPath = c.SnapshotPath |
| | | |
| | | return serfConf, nil |
| | | } |
| | | |
| | | // SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort. |
| | | func SplitHostPort(address string, defaultPort int) (string, int, error) { |
| | | _, _, err := net.SplitHostPort(address) |
| | | if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { |
| | | address = fmt.Sprintf("%s:%d", address, defaultPort) |
| | | _, _, err = net.SplitHostPort(address) |
| | | } |
| | | if err != nil { |
| | | return "", 0, err |
| | | } |
| | | |
| | | addr, err := net.ResolveTCPAddr("tcp", address) |
| | | if err != nil { |
| | | return "", 0, err |
| | | } |
| | | |
| | | return addr.IP.String(), addr.Port, nil |
| | | } |