基于serf的数据库同步模块库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
 
import (
    "fmt"
    "strconv"
 
    "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
    "github.com/hashicorp/memberlist"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
)
 
const (
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
    retryMaxAttempts   = 3
    groupExpect        = 3
    tagKeyClusterName  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
)
 
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
        Mode:        ModeSingle,
        Config:      agentConf,
        ClusterPort: DefaultClusterPort,
    }
}
 
// Config struct
type Config struct {
    // config from serf agent
    *agent.Config
    Mode string `json:"mode"`
 
    // name to group members into cluster
    ClusterName string `json:"cluster_name"`
 
    // port to communicate between cluster members
    ClusterPort int `yaml:"cluster_port"`
    RPCPort     int `yaml:"-"`
}
 
// readConfigFile reads configuration from config file
func (c *Config) readConfigFile(filepath string) error {
    if filepath != "" {
        // todo:
    }
    return nil
}
 
// convertToSerf convert Config to serf.Config
func (c *Config) convertToSerf() (*serf.Config, error) {
    serfConf := serf.DefaultConfig()
 
    bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
    if err != nil {
        return nil, fmt.Errorf("invalid bind address: %s", err)
    }
 
    switch c.Profile {
    case "lan":
        serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
    case "wan":
        serfConf.MemberlistConfig = memberlist.DefaultWANConfig()
    case "local":
        serfConf.MemberlistConfig = memberlist.DefaultLocalConfig()
    default:
        serfConf.MemberlistConfig = memberlist.DefaultLANConfig()
    }
 
    serfConf.MemberlistConfig.BindAddr = bindIP
    serfConf.MemberlistConfig.BindPort = bindPort
    serfConf.NodeName = c.NodeName
    serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
 
    if c.ClusterName != "" {
        serfConf.Tags[tagKeyClusterName] = c.ClusterName
        serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
    }
 
    if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
        c.RetryMaxAttempts = retryMaxAttempts
    }
    return serfConf, nil
}