/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package syncdb import ( "fmt" "net" "os" "strconv" "strings" //"github.com/apache/servicecomb-service-center/syncer/pkg/utils" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/cmd/serf/command/agent" "github.com/hashicorp/serf/serf" ) const ( DefaultBindPort = 30190 DefaultRPCPort = 30191 DefaultClusterPort = 30192 ModeSingle = "single" ModeCluster = "cluster" retryMaxAttempts = 3 groupExpect = 3 DefaultEncryptKey = "bjbasic@aiotlink" tagKeyClusterID = "syncer-cluster-name" TagKeyClusterPort = "syncer-cluster-port" TagKeyRPCPort = "syncer-rpc-port" BroadcastIP = "255.255.255.255" BroadcastPort = 30193 BroadcastInterval = 5 MaxQueryRespSize = 50 * 1024 * 1024 MaxQuerySize = 50 * 1024 * 1024 MaxUserEventSize = 9 * 1024 ReplayOnJoinDefault = false SnapshotPathDefault = "/sdcard/serfSnapShot" MaxEventBufferCount = 2048 TcpTransportPort = 30194 //tcp传输大数据量接口 ) // DefaultConfig default config func DefaultConfig() *Config { agentConf := agent.DefaultConfig() agentConf.QueryResponseSizeLimit = MaxQueryRespSize agentConf.QuerySizeLimit = MaxQuerySize agentConf.UserEventSizeLimit = MaxUserEventSize agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort) agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort) return &Config{ Mode: ModeSingle, Config: agentConf, ClusterPort: DefaultClusterPort, } } // Config struct type Config struct { // config from serf agent *agent.Config Mode string `json:"mode"` // name to group members into cluster ClusterID string `json:"cluster_name"` // port to communicate between cluster members ClusterPort int `yaml:"cluster_port"` RPCPort int `yaml:"-"` } // readConfigFile reads configuration from config file func (c *Config) readConfigFile(filepath string) error { if filepath != "" { // todo: } return nil } func isFileRightful(filePath string) bool { if filePath != "" { _, err := os.Stat(filePath) if err != nil && os.IsNotExist(err) { pos := strings.LastIndex(filePath, "/") if pos != -1 { filePath = filePath[0:pos] } _, err = os.Stat(filePath) if err == nil || !os.IsNotExist(err) { return true } else { return false } } else { return false } } return false } // convertToSerf convert Config to serf.Config func (c *Config) convertToSerf(snapshotPath string) (*serf.Config, error) { serfConf := serf.DefaultConfig() bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort) if err != nil { return nil, fmt.Errorf("invalid bind address: %s", err) } switch c.Profile { case "lan": serfConf.MemberlistConfig = memberlist.DefaultLANConfig() case "wan": serfConf.MemberlistConfig = memberlist.DefaultWANConfig() case "local": serfConf.MemberlistConfig = memberlist.DefaultLocalConfig() default: serfConf.MemberlistConfig = memberlist.DefaultLANConfig() } serfConf.MemberlistConfig.BindAddr = bindIP serfConf.MemberlistConfig.BindPort = bindPort serfConf.NodeName = c.NodeName serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)} if c.ClusterID != "" { serfConf.Tags[tagKeyClusterID] = c.ClusterID serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort) } if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 { c.RetryMaxAttempts = retryMaxAttempts } c.SnapshotPath = SnapshotPathDefault if isFileRightful(snapshotPath) { c.SnapshotPath = snapshotPath } c.ReplayOnJoin = ReplayOnJoinDefault serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit serfConf.QuerySizeLimit = c.QuerySizeLimit serfConf.UserEventSizeLimit = c.UserEventSizeLimit serfConf.SnapshotPath = c.SnapshotPath serfConf.EventBuffer = MaxEventBufferCount return serfConf, nil } // SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort. func SplitHostPort(address string, defaultPort int) (string, int, error) { _, _, err := net.SplitHostPort(address) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { address = fmt.Sprintf("%s:%d", address, defaultPort) _, _, err = net.SplitHostPort(address) } if err != nil { return "", 0, err } addr, err := net.ResolveTCPAddr("tcp", address) if err != nil { return "", 0, err } return addr.IP.String(), addr.Port, nil }