From 15ba78e620fd0c1e7f505cebd4032979676c7eb2 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 28 六月 2022 16:12:50 +0800
Subject: [PATCH] scale MaxUserEventSize to 90k
---
agent.go | 53 ++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 44 insertions(+), 9 deletions(-)
diff --git a/agent.go b/agent.go
index 22dbcfd..822583b 100644
--- a/agent.go
+++ b/agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
*agent.Agent
+ ipc *agent.AgentIPC
conf *Config
readyCh chan struct{}
errorCh chan error
@@ -91,15 +92,30 @@
}
}
-// Start agent
+// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
+ a.RegisterEventHandler(a)
err := a.Agent.Start()
if err != nil {
logger.Error(err, "start serf agent failed")
a.errorCh <- err
return
}
- a.RegisterEventHandler(a)
+
+ ipc := a.startIPC()
+ a.ipc = ipc
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ a.Agent.Shutdown()
+ a.ipc.Shutdown()
+ return
+ default:
+ time.Sleep(1*time.Second)
+ }
+ }
+ }()
err = a.retryJoin(ctx)
if err != nil {
@@ -112,6 +128,25 @@
go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
+func (a *Agent) startIPC() *agent.AgentIPC {
+ // Parse the bind address information
+ bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
+ bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
+
+ // Setup the RPC listener
+ rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
+ if err != nil {
+ logger.Error("Error starting RPC listener:", err)
+ return nil
+ }
+ ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
+ logger.Debug("RPC addr:", a.conf.RPCAddr)
+ logger.Debug("Snapshot:", a.conf.SnapshotPath)
+ logger.Debug("Profile:", a.conf.Profile)
+ logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
+ logger.Debug("bindAddr:", bindAddr)
+ return ipc
+}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -276,8 +311,8 @@
}
//Init serf Init
-func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
- agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
+func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
if err != nil {
logger.Error("InitNode failed, error: %s", err)
return agent, err
@@ -293,8 +328,10 @@
}
//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
+func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
conf := DefaultConfig()
+ conf.MergeConf(c)
+
logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
conf.ClusterID = clusterID
conf.NodeName = nodeID
@@ -315,11 +352,9 @@
return agent, err
}
- agent.Start(context.Background())
+ agent.Start(conf.Ctx)
//<- agent.readyCh
- go func() {
- agent.ShutdownCh()
- }()
+
time.Sleep(time.Second)
logger.Info("Stats:", agent.Agent.Serf().Stats())
logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
--
Gitblit v1.8.0