From 41a9bf370cff977ff8afe122a7610e07fa6c3b80 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期一, 09 十月 2023 17:02:16 +0800
Subject: [PATCH] 加入serf集群
---
go.sum | 21 +++++
pkg/nsqclient/consumer.go | 3
go.mod | 9 ++
main.go | 51 +++---------
nsq/consumer.go | 5
nsq/nsq.go | 102 ++++++++++++++++---------
6 files changed, 112 insertions(+), 79 deletions(-)
diff --git a/go.mod b/go.mod
index b655972..f87ef64 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,9 @@
require (
basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25
+ basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3
+ basic.com/valib/bhomeclient.git v1.2.1
+ basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e
github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558
github.com/gin-contrib/cors v1.4.0
github.com/gin-gonic/gin v1.9.1
@@ -11,6 +14,7 @@
github.com/go-redis/redis/v8 v8.11.4
github.com/goburrow/modbus v0.1.0
github.com/gofrs/uuid v4.4.0+incompatible
+ github.com/gogo/protobuf v1.3.2
github.com/gorilla/websocket v1.5.0
github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4
github.com/jinzhu/gorm v1.9.16
@@ -33,8 +37,10 @@
)
require (
+ basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 // indirect
github.com/IBM/netaddr v1.5.0 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
+ github.com/ajg/form v1.5.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
@@ -54,6 +60,7 @@
github.com/goburrow/serial v0.1.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
+ github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopacket/gopacket v1.1.1 // indirect
@@ -94,6 +101,8 @@
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/tools v0.12.0 // indirect
+ google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
+ google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/go.sum b/go.sum
index e5d41eb..3832c1e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,13 @@
basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 h1:sZyNfIISgP1eoY94LG48Kav6HYVLem6EzaEbCeXlcXQ=
basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25/go.mod h1:1RnwEtePLR7ATQorQTxdgvs1o7uuUy1Vw8W7GYtVnoY=
+basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 h1:h6dF39g4oqEMY0wDcFG3W4wYpeTNFjwWMp8TmFKnrAg=
+basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/valib/bhomeclient.git v1.2.1 h1:2q5hcf8V0lSTJSX+WcJh9rI/LDnD+We65+yXK04rjfY=
+basic.com/valib/bhomeclient.git v1.2.1/go.mod h1:QEPxNQPQjTTVrsMI+AyVRzq/bI99lxmct1BgCmu/Pvs=
+basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e h1:LYulJQfA5y0/y51KOTQfAGIA/l6r6cQ36Fvrwfce5Pk=
+basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e/go.mod h1:no2OZ7ght2oZ6thE+e0w3UdJjgjt/TgKlWSwdPs9GAc=
+basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 h1:ck7olnaIkX0bJ3LfY3n1yaaRBjYlYJYrdieDqigfIKQ=
+basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7/go.mod h1:QFq9kACofwTIwjtY3xyfrSnhr3NLWJt8WSvwnnEta1U=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -48,6 +56,8 @@
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/ajankovic/xdiff v0.0.1 h1:V1cj8t5xwYzm6ZGPqPOlAc9AIajXuTEn41D/1MJBWMM=
+github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
+github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 h1:C735MoY/X+UOx6SECmHk5pVOj51h839Ph13pEoY8UmU=
github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 h1:d3INvMf4ei9qlX10We5+z/+dQnmmCx0J0wflcZVihGo=
@@ -148,6 +158,8 @@
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
@@ -179,6 +191,8 @@
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -246,6 +260,7 @@
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
@@ -620,6 +635,7 @@
golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
@@ -629,6 +645,7 @@
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
@@ -700,6 +717,8 @@
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -716,6 +735,8 @@
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
+google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/main.go b/main.go
index 00d2599..569d9c6 100644
--- a/main.go
+++ b/main.go
@@ -12,9 +12,6 @@
"apsClient/service/plc_address"
"fmt"
"net/http"
- "os"
- "os/signal"
- "syscall"
"time"
)
@@ -28,22 +25,13 @@
return
}
- if err := nsq.Init(); err != nil {
- logx.Errorf("nsq Init err:%v", err)
- return
- }
-
if err := crontask.InitTask(); err != nil {
logx.Errorf("crontab task Init err:%v", err)
return
}
+
//鍔犺浇plc鍐欏叆鍦板潃
plc_address.LoadAddressFromFile()
-
- ////鎻愬墠鍔犺浇浠诲姟
- //service.NewTaskService().GetTask()
-
- //go shutdown()
// 鍚姩鏁版嵁鍚屾
var serfStartChan = make(chan bool)
@@ -58,11 +46,15 @@
agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
agent.RegisterClusterEvent(serfClusterEvent)
go agent.Serve(serfStartChan)
-
<-serfStartChan
// 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
- //agent.ClusterStatus == "master"
+ if agent.ClusterStatus != "slave" {
+ if err := nsq.Init(); err != nil {
+ logx.Errorf("nsq Init err:%v", err)
+ return
+ }
+ }
logx.Infof("apsClient start serve...")
server := &http.Server{
@@ -74,29 +66,16 @@
logx.Error(server.ListenAndServe().Error())
}
-func shutdown() {
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
- <-quit
-
- logx.Infof("apsClient exited...")
- os.Exit(0)
-}
-
func serfClusterEvent(stat int) {
switch stat {
-
- case serf.EventCreateCluster:
- // 鍒涘缓闆嗙兢
- case serf.EventJoinCluster:
- // 鍔犲叆闆嗙兢
- case serf.EventLeaveCluster:
- // 閫�鍑洪泦缇�
- case serf.EventSlave2Master:
- // 鍒囨崲涓轰富鑺傜偣
- case serf.EventMaster2Slave:
- // 鍒囨崲涓哄瓙鑺傜偣
+ case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster:
+ if err := nsq.Init(); err != nil {
+ logx.Errorf("nsq Init err:%v", err)
+ return
+ }
+ case serf.EventJoinCluster, serf.EventMaster2Slave:
+ nsq.Stop()
}
fmt.Println("clusterEvent:", stat)
-}
\ No newline at end of file
+}
diff --git a/nsq/consumer.go b/nsq/consumer.go
index d6e668a..879f641 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -5,12 +5,11 @@
"apsClient/constvar"
"apsClient/pkg/logx"
"apsClient/pkg/nsqclient"
- "context"
"fmt"
)
-func Consume(topic, channel string) (err error) {
- c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel)
+func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) {
+ c, err = nsqclient.NewNsqConsumer(topic, channel)
if err != nil {
logx.Errorf("NewNsqConsumer err:%v", err)
return
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 0c293db..2035f15 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -6,20 +6,32 @@
"apsClient/model/common"
"apsClient/pkg/logx"
"apsClient/pkg/safe"
+ "basic.com/aps/nsqclient.git"
+ "context"
"errors"
"fmt"
+ "sync"
"time"
)
-func Init() error {
+type consumerManager struct {
+ ctx context.Context
+ clients sync.Map
+}
+
+var defaultConsumerManager *consumerManager
+
+func init() {
+ defaultConsumerManager = new(consumerManager)
+}
+
+func (c *consumerManager) init() error {
if len(conf.Conf.NsqConf.NodeId) <= 0 {
return errors.New("no NodeId")
}
-
if err := initProducer(); err != nil {
return err
}
-
safe.Go(func() {
caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
var addressResult common.ResponsePlcAddress
@@ -29,40 +41,54 @@
}
})
- safe.Go(func() {
- err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- })
-
- safe.Go(func() {
- err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- })
-
- safe.Go(func() {
- err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- })
-
- safe.Go(func() {
- err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- })
-
- safe.Go(func() {
- err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- })
-
+ var topics = []string{
+ constvar.NsqTopicScheduleTask,
+ constvar.NsqTopicSendPlcAddress,
+ constvar.NsqTopicProcessParamsResponse,
+ constvar.NsqTopicApsProcessParams,
+ constvar.NsqTopicDeviceUpdate,
+ }
+ for _, t := range topics {
+ topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
+ c.AddConsumer(topic)
+ }
return nil
}
+
+func (c *consumerManager) AddConsumer(topic string) {
+ client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
+ if err != nil {
+ logx.Errorf("start nsq consume err: %v", err)
+ }
+ c.clients.Store(topic, client)
+ safe.Go(func() {
+ if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
+ if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
+ logx.Errorf("RunLookupd err:%v", err)
+ return
+ }
+ } else {
+ if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil {
+ logx.Errorf("Run err:%v", err)
+ return
+ }
+ }
+ })
+}
+
+func (c *consumerManager) stop() {
+ c.clients.Range(func(key, value any) bool {
+ if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
+ nsqclient.DestroyNsqConsumer(consumer)
+ }
+ return true
+ })
+}
+
+func Init() error {
+ return defaultConsumerManager.init()
+}
+
+func Stop() {
+ defaultConsumerManager.stop()
+}
diff --git a/pkg/nsqclient/consumer.go b/pkg/nsqclient/consumer.go
index a0df0b0..acbf91a 100644
--- a/pkg/nsqclient/consumer.go
+++ b/pkg/nsqclient/consumer.go
@@ -18,7 +18,7 @@
channel string
}
-func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
conf := nsq.NewConfig()
conf.MaxAttempts = 0
conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
@@ -33,7 +33,6 @@
}
return &NsqConsumer{
consumer: consumer,
- ctx: ctx,
topic: topic,
channel: channel,
}, nil
--
Gitblit v1.8.0