基于serf的数据库同步模块库
liuxiaolong
2021-06-07 aea48291949e54554f5c484cca32a1d3119d8e76
add rpc
2个文件已修改
80 ■■■■ 已修改文件
agent.go 53 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
    *agent.Agent
    ipc *agent.AgentIPC
    conf         *Config
    readyCh      chan struct{}
    errorCh      chan error
@@ -91,15 +92,30 @@
    }
}
// Start agent
// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
    a.RegisterEventHandler(a)
    err := a.Agent.Start()
    if err != nil {
        logger.Error(err, "start serf agent failed")
        a.errorCh <- err
        return
    }
    a.RegisterEventHandler(a)
    ipc := a.startIPC()
    a.ipc = ipc
    go func() {
        for {
            select {
            case <-ctx.Done():
                a.Agent.Shutdown()
                a.ipc.Shutdown()
                return
            default:
                time.Sleep(1*time.Second)
            }
        }
    }()
    err = a.retryJoin(ctx)
    if err != nil {
@@ -112,6 +128,25 @@
    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
func (a *Agent) startIPC() *agent.AgentIPC {
    // Parse the bind address information
    bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
    bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
    // Setup the RPC listener
    rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
    if err != nil {
        logger.Error("Error starting RPC listener:", err)
        return nil
    }
    ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
    logger.Debug("RPC addr:", a.conf.RPCAddr)
    logger.Debug("Snapshot:", a.conf.SnapshotPath)
    logger.Debug("Profile:", a.conf.Profile)
    logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
    logger.Debug("bindAddr:", bindAddr)
    return ipc
}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -276,8 +311,8 @@
}
//Init serf Init
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
    if err != nil {
        logger.Error("InitNode failed, error: %s", err)
        return agent, err
@@ -293,8 +328,10 @@
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
    conf := DefaultConfig()
    conf.MergeConf(c)
    logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    conf.ClusterID = clusterID
    conf.NodeName = nodeID
@@ -315,11 +352,9 @@
        return agent, err
    }
    agent.Start(context.Background())
    agent.Start(conf.Ctx)
    //<- agent.readyCh
    go func() {
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    logger.Info("Stats:", agent.Agent.Serf().Stats())
    logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
config.go
@@ -17,6 +17,7 @@
package syncdb
import (
    "context"
    "fmt"
    "net"
    "os"
@@ -67,6 +68,31 @@
    }
}
func (c *Config) MergeConf(s *Config) {
    if s != nil {
        if s.Ctx != nil {
            c.Ctx = s.Ctx
        } else {
            c.Ctx = context.Background()
        }
        c.BindAddr = s.BindAddr
        c.RPCAddr = s.RPCAddr
        c.RPCPort = s.RPCPort
        //serf快照地址
        if s.SnapshotPath != "" {
            c.SnapshotPath = s.SnapshotPath
        }
        if s.EncryptKey != "" {
            //报文加密的key
            c.EncryptKey = s.EncryptKey
        }
        if s.RPCAuthKey != "" {
            //RPC认证的key
            c.RPCAuthKey = s.RPCAuthKey
        }
    }
}
// Config struct
type Config struct {
    // config from serf agent
@@ -79,6 +105,7 @@
    // port to communicate between cluster members
    ClusterPort int         `yaml:"cluster_port"`
    RPCPort     int         `yaml:"-"`
    Ctx         context.Context
}
// readConfigFile reads configuration from config file