From aea48291949e54554f5c484cca32a1d3119d8e76 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期一, 07 六月 2021 15:06:34 +0800
Subject: [PATCH] add rpc
---
agent.go | 53 ++++++++++++++++++++++----
config.go | 27 +++++++++++++
2 files changed, 71 insertions(+), 9 deletions(-)
diff --git a/agent.go b/agent.go
index 22dbcfd..822583b 100644
--- a/agent.go
+++ b/agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
*agent.Agent
+ ipc *agent.AgentIPC
conf *Config
readyCh chan struct{}
errorCh chan error
@@ -91,15 +92,30 @@
}
}
-// Start agent
+// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
+ a.RegisterEventHandler(a)
err := a.Agent.Start()
if err != nil {
logger.Error(err, "start serf agent failed")
a.errorCh <- err
return
}
- a.RegisterEventHandler(a)
+
+ ipc := a.startIPC()
+ a.ipc = ipc
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ a.Agent.Shutdown()
+ a.ipc.Shutdown()
+ return
+ default:
+ time.Sleep(1*time.Second)
+ }
+ }
+ }()
err = a.retryJoin(ctx)
if err != nil {
@@ -112,6 +128,25 @@
go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
+func (a *Agent) startIPC() *agent.AgentIPC {
+ // Parse the bind address information
+ bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
+ bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
+
+ // Setup the RPC listener
+ rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
+ if err != nil {
+ logger.Error("Error starting RPC listener:", err)
+ return nil
+ }
+ ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
+ logger.Debug("RPC addr:", a.conf.RPCAddr)
+ logger.Debug("Snapshot:", a.conf.SnapshotPath)
+ logger.Debug("Profile:", a.conf.Profile)
+ logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
+ logger.Debug("bindAddr:", bindAddr)
+ return ipc
+}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -276,8 +311,8 @@
}
//Init serf Init
-func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
- agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
+func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
if err != nil {
logger.Error("InitNode failed, error: %s", err)
return agent, err
@@ -293,8 +328,10 @@
}
//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
+func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
conf := DefaultConfig()
+ conf.MergeConf(c)
+
logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
conf.ClusterID = clusterID
conf.NodeName = nodeID
@@ -315,11 +352,9 @@
return agent, err
}
- agent.Start(context.Background())
+ agent.Start(conf.Ctx)
//<- agent.readyCh
- go func() {
- agent.ShutdownCh()
- }()
+
time.Sleep(time.Second)
logger.Info("Stats:", agent.Agent.Serf().Stats())
logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
diff --git a/config.go b/config.go
index 643226a..855d299 100644
--- a/config.go
+++ b/config.go
@@ -17,6 +17,7 @@
package syncdb
import (
+ "context"
"fmt"
"net"
"os"
@@ -67,6 +68,31 @@
}
}
+func (c *Config) MergeConf(s *Config) {
+ if s != nil {
+ if s.Ctx != nil {
+ c.Ctx = s.Ctx
+ } else {
+ c.Ctx = context.Background()
+ }
+ c.BindAddr = s.BindAddr
+ c.RPCAddr = s.RPCAddr
+ c.RPCPort = s.RPCPort
+ //serf蹇収鍦板潃
+ if s.SnapshotPath != "" {
+ c.SnapshotPath = s.SnapshotPath
+ }
+ if s.EncryptKey != "" {
+ //鎶ユ枃鍔犲瘑鐨刱ey
+ c.EncryptKey = s.EncryptKey
+ }
+ if s.RPCAuthKey != "" {
+ //RPC璁よ瘉鐨刱ey
+ c.RPCAuthKey = s.RPCAuthKey
+ }
+ }
+}
+
// Config struct
type Config struct {
// config from serf agent
@@ -79,6 +105,7 @@
// port to communicate between cluster members
ClusterPort int `yaml:"cluster_port"`
RPCPort int `yaml:"-"`
+ Ctx context.Context
}
// readConfigFile reads configuration from config file
--
Gitblit v1.8.0