From 41a9bf370cff977ff8afe122a7610e07fa6c3b80 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期一, 09 十月 2023 17:02:16 +0800
Subject: [PATCH] 加入serf集群

---
 go.sum                    |   21 +++++
 pkg/nsqclient/consumer.go |    3 
 go.mod                    |    9 ++
 main.go                   |   51 +++---------
 nsq/consumer.go           |    5 
 nsq/nsq.go                |  102 ++++++++++++++++---------
 6 files changed, 112 insertions(+), 79 deletions(-)

diff --git a/go.mod b/go.mod
index b655972..f87ef64 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,9 @@
 
 require (
 	basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25
+	basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3
+	basic.com/valib/bhomeclient.git v1.2.1
+	basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e
 	github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558
 	github.com/gin-contrib/cors v1.4.0
 	github.com/gin-gonic/gin v1.9.1
@@ -11,6 +14,7 @@
 	github.com/go-redis/redis/v8 v8.11.4
 	github.com/goburrow/modbus v0.1.0
 	github.com/gofrs/uuid v4.4.0+incompatible
+	github.com/gogo/protobuf v1.3.2
 	github.com/gorilla/websocket v1.5.0
 	github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4
 	github.com/jinzhu/gorm v1.9.16
@@ -33,8 +37,10 @@
 )
 
 require (
+	basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 // indirect
 	github.com/IBM/netaddr v1.5.0 // indirect
 	github.com/KyleBanks/depth v1.2.1 // indirect
+	github.com/ajg/form v1.5.1 // indirect
 	github.com/bytedance/sonic v1.9.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
@@ -54,6 +60,7 @@
 	github.com/goburrow/serial v0.1.0 // indirect
 	github.com/goccy/go-json v0.10.2 // indirect
 	github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
+	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/google/uuid v1.3.0 // indirect
 	github.com/gopacket/gopacket v1.1.1 // indirect
@@ -94,6 +101,8 @@
 	golang.org/x/sys v0.11.0 // indirect
 	golang.org/x/text v0.12.0 // indirect
 	golang.org/x/tools v0.12.0 // indirect
+	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
+	google.golang.org/grpc v1.55.0 // indirect
 	google.golang.org/protobuf v1.30.0 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/go.sum b/go.sum
index e5d41eb..3832c1e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,13 @@
 basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 h1:sZyNfIISgP1eoY94LG48Kav6HYVLem6EzaEbCeXlcXQ=
 basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25/go.mod h1:1RnwEtePLR7ATQorQTxdgvs1o7uuUy1Vw8W7GYtVnoY=
+basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 h1:h6dF39g4oqEMY0wDcFG3W4wYpeTNFjwWMp8TmFKnrAg=
+basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/valib/bhomeclient.git v1.2.1 h1:2q5hcf8V0lSTJSX+WcJh9rI/LDnD+We65+yXK04rjfY=
+basic.com/valib/bhomeclient.git v1.2.1/go.mod h1:QEPxNQPQjTTVrsMI+AyVRzq/bI99lxmct1BgCmu/Pvs=
+basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e h1:LYulJQfA5y0/y51KOTQfAGIA/l6r6cQ36Fvrwfce5Pk=
+basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e/go.mod h1:no2OZ7ght2oZ6thE+e0w3UdJjgjt/TgKlWSwdPs9GAc=
+basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 h1:ck7olnaIkX0bJ3LfY3n1yaaRBjYlYJYrdieDqigfIKQ=
+basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7/go.mod h1:QFq9kACofwTIwjtY3xyfrSnhr3NLWJt8WSvwnnEta1U=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -48,6 +56,8 @@
 github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
 github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
 github.com/ajankovic/xdiff v0.0.1 h1:V1cj8t5xwYzm6ZGPqPOlAc9AIajXuTEn41D/1MJBWMM=
+github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
+github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
 github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 h1:C735MoY/X+UOx6SECmHk5pVOj51h839Ph13pEoY8UmU=
 github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 h1:d3INvMf4ei9qlX10We5+z/+dQnmmCx0J0wflcZVihGo=
@@ -148,6 +158,8 @@
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
 github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
@@ -179,6 +191,8 @@
 github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -246,6 +260,7 @@
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
@@ -620,6 +635,7 @@
 golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
 golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
 golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
@@ -629,6 +645,7 @@
 golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
 golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
@@ -700,6 +717,8 @@
 google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -716,6 +735,8 @@
 google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
 google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
+google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/main.go b/main.go
index 00d2599..569d9c6 100644
--- a/main.go
+++ b/main.go
@@ -12,9 +12,6 @@
 	"apsClient/service/plc_address"
 	"fmt"
 	"net/http"
-	"os"
-	"os/signal"
-	"syscall"
 	"time"
 )
 
@@ -28,22 +25,13 @@
 		return
 	}
 
-	if err := nsq.Init(); err != nil {
-		logx.Errorf("nsq Init err:%v", err)
-		return
-	}
-
 	if err := crontask.InitTask(); err != nil {
 		logx.Errorf("crontab task Init err:%v", err)
 		return
 	}
+
 	//鍔犺浇plc鍐欏叆鍦板潃
 	plc_address.LoadAddressFromFile()
-
-	////鎻愬墠鍔犺浇浠诲姟
-	//service.NewTaskService().GetTask()
-
-	//go shutdown()
 
 	// 鍚姩鏁版嵁鍚屾
 	var serfStartChan = make(chan bool)
@@ -58,11 +46,15 @@
 	agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
 	agent.RegisterClusterEvent(serfClusterEvent)
 	go agent.Serve(serfStartChan)
-
 	<-serfStartChan
 
 	// 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
-	//agent.ClusterStatus == "master"
+	if agent.ClusterStatus != "slave" {
+		if err := nsq.Init(); err != nil {
+			logx.Errorf("nsq Init err:%v", err)
+			return
+		}
+	}
 
 	logx.Infof("apsClient start serve...")
 	server := &http.Server{
@@ -74,29 +66,16 @@
 	logx.Error(server.ListenAndServe().Error())
 }
 
-func shutdown() {
-	quit := make(chan os.Signal, 1)
-	signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
-	<-quit
-
-	logx.Infof("apsClient exited...")
-	os.Exit(0)
-}
-
 func serfClusterEvent(stat int) {
 	switch stat {
-
-	case serf.EventCreateCluster:
-		// 鍒涘缓闆嗙兢
-	case serf.EventJoinCluster:
-		// 鍔犲叆闆嗙兢
-	case serf.EventLeaveCluster:
-		// 閫�鍑洪泦缇�
-	case serf.EventSlave2Master:
-		// 鍒囨崲涓轰富鑺傜偣
-	case serf.EventMaster2Slave:
-		// 鍒囨崲涓哄瓙鑺傜偣
+	case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster:
+		if err := nsq.Init(); err != nil {
+			logx.Errorf("nsq Init err:%v", err)
+			return
+		}
+	case serf.EventJoinCluster, serf.EventMaster2Slave:
+		nsq.Stop()
 	}
 
 	fmt.Println("clusterEvent:", stat)
-}
\ No newline at end of file
+}
diff --git a/nsq/consumer.go b/nsq/consumer.go
index d6e668a..879f641 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -5,12 +5,11 @@
 	"apsClient/constvar"
 	"apsClient/pkg/logx"
 	"apsClient/pkg/nsqclient"
-	"context"
 	"fmt"
 )
 
-func Consume(topic, channel string) (err error) {
-	c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel)
+func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) {
+	c, err = nsqclient.NewNsqConsumer(topic, channel)
 	if err != nil {
 		logx.Errorf("NewNsqConsumer err:%v", err)
 		return
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 0c293db..2035f15 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -6,20 +6,32 @@
 	"apsClient/model/common"
 	"apsClient/pkg/logx"
 	"apsClient/pkg/safe"
+	"basic.com/aps/nsqclient.git"
+	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 )
 
-func Init() error {
+type consumerManager struct {
+	ctx     context.Context
+	clients sync.Map
+}
+
+var defaultConsumerManager *consumerManager
+
+func init() {
+	defaultConsumerManager = new(consumerManager)
+}
+
+func (c *consumerManager) init() error {
 	if len(conf.Conf.NsqConf.NodeId) <= 0 {
 		return errors.New("no NodeId")
 	}
-
 	if err := initProducer(); err != nil {
 		return err
 	}
-
 	safe.Go(func() {
 		caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
 		var addressResult common.ResponsePlcAddress
@@ -29,40 +41,54 @@
 		}
 	})
 
-	safe.Go(func() {
-		err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
-		if err != nil {
-			logx.Errorf("start nsq consume err: %v", err)
-		}
-	})
-
-	safe.Go(func() {
-		err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
-		if err != nil {
-			logx.Errorf("start nsq consume err: %v", err)
-		}
-	})
-
-	safe.Go(func() {
-		err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
-		if err != nil {
-			logx.Errorf("start nsq consume err: %v", err)
-		}
-	})
-
-	safe.Go(func() {
-		err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
-		if err != nil {
-			logx.Errorf("start nsq consume err: %v", err)
-		}
-	})
-
-	safe.Go(func() {
-		err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
-		if err != nil {
-			logx.Errorf("start nsq consume err: %v", err)
-		}
-	})
-
+	var topics = []string{
+		constvar.NsqTopicScheduleTask,
+		constvar.NsqTopicSendPlcAddress,
+		constvar.NsqTopicProcessParamsResponse,
+		constvar.NsqTopicApsProcessParams,
+		constvar.NsqTopicDeviceUpdate,
+	}
+	for _, t := range topics {
+		topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
+		c.AddConsumer(topic)
+	}
 	return nil
 }
+
+func (c *consumerManager) AddConsumer(topic string) {
+	client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
+	if err != nil {
+		logx.Errorf("start nsq consume err: %v", err)
+	}
+	c.clients.Store(topic, client)
+	safe.Go(func() {
+		if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
+			if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
+				logx.Errorf("RunLookupd err:%v", err)
+				return
+			}
+		} else {
+			if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil {
+				logx.Errorf("Run err:%v", err)
+				return
+			}
+		}
+	})
+}
+
+func (c *consumerManager) stop() {
+	c.clients.Range(func(key, value any) bool {
+		if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
+			nsqclient.DestroyNsqConsumer(consumer)
+		}
+		return true
+	})
+}
+
+func Init() error {
+	return defaultConsumerManager.init()
+}
+
+func Stop() {
+	defaultConsumerManager.stop()
+}
diff --git a/pkg/nsqclient/consumer.go b/pkg/nsqclient/consumer.go
index a0df0b0..acbf91a 100644
--- a/pkg/nsqclient/consumer.go
+++ b/pkg/nsqclient/consumer.go
@@ -18,7 +18,7 @@
 	channel   string
 }
 
-func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
 	conf := nsq.NewConfig()
 	conf.MaxAttempts = 0
 	conf.MsgTimeout = 10 * time.Minute         // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
@@ -33,7 +33,6 @@
 	}
 	return &NsqConsumer{
 		consumer: consumer,
-		ctx:      ctx,
 		topic:    topic,
 		channel:  channel,
 	}, nil

--
Gitblit v1.8.0