| | |
| | | * See the License for the specific language governing permissions and |
| | | * limitations under the License. |
| | | */ |
| | | package serf |
| | | package syncdb |
| | | |
| | | import ( |
| | | "fmt" |
| | | "net" |
| | | "strconv" |
| | | |
| | | "github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| | | //"github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| | | "github.com/hashicorp/memberlist" |
| | | "github.com/hashicorp/serf/cmd/serf/command/agent" |
| | | "github.com/hashicorp/serf/serf" |
| | | ) |
| | | |
| | | const ( |
| | | DefaultBindPort = 30190 |
| | | DefaultRPCPort = 30191 |
| | | DefaultBindPort = 5000//30190 |
| | | DefaultRPCPort = 7373//30191 |
| | | DefaultClusterPort = 30192 |
| | | ModeSingle = "single" |
| | | ModeCluster = "cluster" |
| | | retryMaxAttempts = 3 |
| | | groupExpect = 3 |
| | | tagKeyClusterName = "syncer-cluster-name" |
| | | DefaultEncryptKey = "bjbasic@aiotlink" |
| | | tagKeyClusterID = "syncer-cluster-name" |
| | | TagKeyClusterPort = "syncer-cluster-port" |
| | | TagKeyRPCPort = "syncer-rpc-port" |
| | | ) |
| | |
| | | // DefaultConfig default config |
| | | func DefaultConfig() *Config { |
| | | agentConf := agent.DefaultConfig() |
| | | agentConf.QueryResponseSizeLimit = 50 * 1024 *1024 |
| | | agentConf.QuerySizeLimit = 50 * 1024 *1024 |
| | | agentConf.UserEventSizeLimit = 1024 |
| | | agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort) |
| | | agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort) |
| | | return &Config{ |
| | |
| | | Mode string `json:"mode"` |
| | | |
| | | // name to group members into cluster |
| | | ClusterName string `json:"cluster_name"` |
| | | ClusterID string `json:"cluster_name"` |
| | | |
| | | // port to communicate between cluster members |
| | | ClusterPort int `yaml:"cluster_port"` |
| | |
| | | func (c *Config) convertToSerf() (*serf.Config, error) { |
| | | serfConf := serf.DefaultConfig() |
| | | |
| | | bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort) |
| | | bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort) |
| | | if err != nil { |
| | | return nil, fmt.Errorf("invalid bind address: %s", err) |
| | | } |
| | |
| | | serfConf.MemberlistConfig.BindAddr = bindIP |
| | | serfConf.MemberlistConfig.BindPort = bindPort |
| | | serfConf.NodeName = c.NodeName |
| | | |
| | | serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)} |
| | | |
| | | if c.ClusterName != "" { |
| | | serfConf.Tags[tagKeyClusterName] = c.ClusterName |
| | | if c.ClusterID != "" { |
| | | serfConf.Tags[tagKeyClusterID] = c.ClusterID |
| | | serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort) |
| | | } |
| | | |
| | |
| | | } |
| | | return serfConf, nil |
| | | } |
| | | |
// SplitHostPort resolves address into its IP string and numeric port.
//
// When address carries no port, defaultPort is used instead. This covers
// host names ("example.org"), IPv4 literals ("10.0.0.1"), bracketed IPv6
// literals ("[::1]") and bare IPv6 literals ("::1"). Any other malformed
// address, or one that cannot be resolved as a TCP address, yields an error
// together with zero values for the IP and port.
//
// NOTE(review): when the host part is empty (for example ":7946") the
// resolved IP appears to be nil, so the returned host would be net.IP's
// "<nil>" string form; that pre-existing behaviour is left untouched here —
// confirm whether callers want a wildcard address instead.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
	if net.ParseIP(address) != nil {
		// A bare IP literal can never contain a port. JoinHostPort adds the
		// brackets an IPv6 literal needs; without this branch "::1" would be
		// rejected with "too many colons in address" rather than receiving
		// the default port.
		address = net.JoinHostPort(address, strconv.Itoa(defaultPort))
	} else if _, _, err := net.SplitHostPort(address); err != nil {
		// Only a missing port is recoverable; every other parse failure is
		// reported to the caller unchanged.
		ae, ok := err.(*net.AddrError)
		if !ok || ae.Err != "missing port in address" {
			return "", 0, err
		}
		address = fmt.Sprintf("%s:%d", address, defaultPort)
		if _, _, err = net.SplitHostPort(address); err != nil {
			return "", 0, err
		}
	}

	// Resolution normalises the host to an IP and parses the port number.
	addr, err := net.ResolveTCPAddr("tcp", address)
	if err != nil {
		return "", 0, err
	}

	return addr.IP.String(), addr.Port, nil
}
| | | |