基于serf的数据库同步模块库
liuxiaolong
2019-08-09 1bd25d38c58785102004948b420e931578be7012
config.go
@@ -14,13 +14,14 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
package syncdb
import (
   "fmt"
   "net"
   "strconv"
   "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   "github.com/hashicorp/memberlist"
   "github.com/hashicorp/serf/cmd/serf/command/agent"
   "github.com/hashicorp/serf/serf"
@@ -34,14 +35,24 @@
   ModeCluster        = "cluster"
   retryMaxAttempts   = 3
   groupExpect        = 3
   tagKeyClusterName  = "syncer-cluster-name"
   DefaultEncryptKey  = "bjbasic@aiotlink"
   tagKeyClusterID    = "syncer-cluster-name"
   TagKeyClusterPort  = "syncer-cluster-port"
   TagKeyRPCPort      = "syncer-rpc-port"
   BroadcastIP        = "255.255.255.255"
   BroadcastPort      = 30193
   BroadcastInterval  = 5
   MaxQueryRespSize   = 50 * 1024 * 1024
   MaxQuerySize       = 50 * 1024 * 1024
   MaxUserEventSize   = 5 * 1024
)
// DefaultConfig default config
func DefaultConfig() *Config {
   agentConf := agent.DefaultConfig()
   agentConf.QueryResponseSizeLimit = MaxQueryRespSize
   agentConf.QuerySizeLimit = MaxQuerySize
   agentConf.UserEventSizeLimit = MaxUserEventSize
   agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
   agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
   return &Config{
@@ -58,7 +69,7 @@
   Mode string `json:"mode"`
   // name to group members into cluster
   ClusterName string `json:"cluster_name"`
   ClusterID string `json:"cluster_name"`
   // port to communicate between cluster members
   ClusterPort int `yaml:"cluster_port"`
@@ -77,7 +88,7 @@
func (c *Config) convertToSerf() (*serf.Config, error) {
   serfConf := serf.DefaultConfig()
   bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
   bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
   if err != nil {
      return nil, fmt.Errorf("invalid bind address: %s", err)
   }
@@ -96,10 +107,11 @@
   serfConf.MemberlistConfig.BindAddr = bindIP
   serfConf.MemberlistConfig.BindPort = bindPort
   serfConf.NodeName = c.NodeName
   serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
   if c.ClusterName != "" {
      serfConf.Tags[tagKeyClusterName] = c.ClusterName
   if c.ClusterID != "" {
      serfConf.Tags[tagKeyClusterID] = c.ClusterID
      serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
   }
@@ -108,3 +120,22 @@
   }
   return serfConf, nil
}
// SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
   _, _, err := net.SplitHostPort(address)
   if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
      address = fmt.Sprintf("%s:%d", address, defaultPort)
      _, _, err = net.SplitHostPort(address)
   }
   if err != nil {
      return "", 0, err
   }
   addr, err := net.ResolveTCPAddr("tcp", address)
   if err != nil {
      return "", 0, err
   }
   return addr.IP.String(), addr.Port, nil
}