基于serf的数据库同步模块库
liuxiaolong
2019-10-11 50462d9b005381c89dea9fe8b2f505cc489bcbc7
config.go
@@ -14,13 +14,14 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
package syncdb
import (
   "fmt"
   "net"
   "strconv"
   "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
   "github.com/hashicorp/memberlist"
   "github.com/hashicorp/serf/cmd/serf/command/agent"
   "github.com/hashicorp/serf/serf"
@@ -34,14 +35,29 @@
   ModeCluster        = "cluster"
   retryMaxAttempts   = 3
   groupExpect        = 3
   tagKeyClusterName  = "syncer-cluster-name"
   DefaultEncryptKey  = "bjbasic@aiotlink"
   tagKeyClusterID    = "syncer-cluster-name"
   TagKeyClusterPort  = "syncer-cluster-port"
   TagKeyRPCPort      = "syncer-rpc-port"
   BroadcastIP        = "255.255.255.255"
   BroadcastPort      = 30193
   BroadcastInterval  = 5
   MaxQueryRespSize   = 50 * 1024 * 1024
   MaxQuerySize       = 50 * 1024 * 1024
   MaxUserEventSize   = 500 * 1024
   ReplayOnJoinDefault = false
   SnapshotPathDefault = "/opt/vasystem/serfSnapShot"
   MaxEventBufferCount = 2048
   TcpTransportPort = 30194 //tcp传输大数据量接口
)
// DefaultConfig default config
func DefaultConfig() *Config {
   agentConf := agent.DefaultConfig()
   agentConf.QueryResponseSizeLimit = MaxQueryRespSize
   agentConf.QuerySizeLimit = MaxQuerySize
   agentConf.UserEventSizeLimit = MaxUserEventSize
   agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
   agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
   return &Config{
@@ -58,7 +74,7 @@
   Mode string `json:"mode"`
   // name to group members into cluster
   ClusterName string `json:"cluster_name"`
   ClusterID string `json:"cluster_name"`
   // port to communicate between cluster members
   ClusterPort int `yaml:"cluster_port"`
@@ -77,7 +93,7 @@
func (c *Config) convertToSerf() (*serf.Config, error) {
   serfConf := serf.DefaultConfig()
   bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
   bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
   if err != nil {
      return nil, fmt.Errorf("invalid bind address: %s", err)
   }
@@ -96,15 +112,44 @@
   serfConf.MemberlistConfig.BindAddr = bindIP
   serfConf.MemberlistConfig.BindPort = bindPort
   serfConf.NodeName = c.NodeName
   serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
   if c.ClusterName != "" {
      serfConf.Tags[tagKeyClusterName] = c.ClusterName
   if c.ClusterID != "" {
      serfConf.Tags[tagKeyClusterID] = c.ClusterID
      serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
   }
   if c.Mode == ModeCluster && c.RetryMaxAttempts <= 0 {
      c.RetryMaxAttempts = retryMaxAttempts
   }
   c.SnapshotPath = SnapshotPathDefault
   c.ReplayOnJoin = ReplayOnJoinDefault
   serfConf.QueryResponseSizeLimit = c.QueryResponseSizeLimit
   serfConf.QuerySizeLimit = c.QuerySizeLimit
   serfConf.UserEventSizeLimit = c.UserEventSizeLimit
   serfConf.SnapshotPath = c.SnapshotPath
   serfConf.EventBuffer = MaxEventBufferCount
   return serfConf, nil
}
// SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
   _, _, err := net.SplitHostPort(address)
   if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
      address = fmt.Sprintf("%s:%d", address, defaultPort)
      _, _, err = net.SplitHostPort(address)
   }
   if err != nil {
      return "", 0, err
   }
   addr, err := net.ResolveTCPAddr("tcp", address)
   if err != nil {
      return "", 0, err
   }
   return addr.IP.String(), addr.Port, nil
}