From 41a9bf370cff977ff8afe122a7610e07fa6c3b80 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期一, 09 十月 2023 17:02:16 +0800 Subject: [PATCH] 加入serf集群 --- go.sum | 21 +++++ pkg/nsqclient/consumer.go | 3 go.mod | 9 ++ main.go | 51 +++--------- nsq/consumer.go | 5 nsq/nsq.go | 102 ++++++++++++++++--------- 6 files changed, 112 insertions(+), 79 deletions(-) diff --git a/go.mod b/go.mod index b655972..f87ef64 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,9 @@ require ( basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 + basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 + basic.com/valib/bhomeclient.git v1.2.1 + basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 @@ -11,6 +14,7 @@ github.com/go-redis/redis/v8 v8.11.4 github.com/goburrow/modbus v0.1.0 github.com/gofrs/uuid v4.4.0+incompatible + github.com/gogo/protobuf v1.3.2 github.com/gorilla/websocket v1.5.0 github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 github.com/jinzhu/gorm v1.9.16 @@ -33,8 +37,10 @@ ) require ( + basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 // indirect github.com/IBM/netaddr v1.5.0 // indirect github.com/KyleBanks/depth v1.2.1 // indirect + github.com/ajg/form v1.5.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect @@ -54,6 +60,7 @@ github.com/goburrow/serial v0.1.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gopacket/gopacket v1.1.1 // indirect @@ -94,6 +101,8 @@ golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.12.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index e5d41eb..3832c1e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,13 @@ basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 h1:sZyNfIISgP1eoY94LG48Kav6HYVLem6EzaEbCeXlcXQ= basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25/go.mod h1:1RnwEtePLR7ATQorQTxdgvs1o7uuUy1Vw8W7GYtVnoY= +basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 h1:h6dF39g4oqEMY0wDcFG3W4wYpeTNFjwWMp8TmFKnrAg= +basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= +basic.com/valib/bhomeclient.git v1.2.1 h1:2q5hcf8V0lSTJSX+WcJh9rI/LDnD+We65+yXK04rjfY= +basic.com/valib/bhomeclient.git v1.2.1/go.mod h1:QEPxNQPQjTTVrsMI+AyVRzq/bI99lxmct1BgCmu/Pvs= +basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e h1:LYulJQfA5y0/y51KOTQfAGIA/l6r6cQ36Fvrwfce5Pk= +basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e/go.mod h1:no2OZ7ght2oZ6thE+e0w3UdJjgjt/TgKlWSwdPs9GAc= +basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 h1:ck7olnaIkX0bJ3LfY3n1yaaRBjYlYJYrdieDqigfIKQ= +basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7/go.mod h1:QFq9kACofwTIwjtY3xyfrSnhr3NLWJt8WSvwnnEta1U= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -48,6 +56,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/ajankovic/xdiff v0.0.1 h1:V1cj8t5xwYzm6ZGPqPOlAc9AIajXuTEn41D/1MJBWMM= +github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= +github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 h1:C735MoY/X+UOx6SECmHk5pVOj51h839Ph13pEoY8UmU= github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 h1:d3INvMf4ei9qlX10We5+z/+dQnmmCx0J0wflcZVihGo= @@ -148,6 +158,8 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= @@ -179,6 +191,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -246,6 +260,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= @@ -620,6 +635,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= @@ -629,6 +645,7 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= @@ -700,6 +717,8 @@ google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -716,6 +735,8 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= +google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/main.go b/main.go index 00d2599..569d9c6 100644 --- a/main.go +++ b/main.go @@ -12,9 +12,6 @@ "apsClient/service/plc_address" "fmt" "net/http" - "os" - "os/signal" - "syscall" "time" ) @@ -28,22 +25,13 @@ return } - if err := nsq.Init(); err != nil { - logx.Errorf("nsq Init err:%v", err) - return - } - if err := crontask.InitTask(); err != nil { logx.Errorf("crontab task Init err:%v", err) return } + //鍔犺浇plc鍐欏叆鍦板潃 plc_address.LoadAddressFromFile() - - ////鎻愬墠鍔犺浇浠诲姟 - //service.NewTaskService().GetTask() - - //go shutdown() // 鍚姩鏁版嵁鍚屾 var serfStartChan = make(chan bool) @@ -58,11 +46,15 @@ agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) agent.RegisterClusterEvent(serfClusterEvent) go agent.Serve(serfStartChan) - <-serfStartChan // 鍒ゆ柇褰撳墠闆嗙兢鐘舵�� - //agent.ClusterStatus == "master" + if agent.ClusterStatus != "slave" { + if err := nsq.Init(); err != nil { + logx.Errorf("nsq Init err:%v", err) + return + } + } logx.Infof("apsClient start serve...") server := &http.Server{ @@ -74,29 +66,16 @@ logx.Error(server.ListenAndServe().Error()) } -func shutdown() { - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM) - <-quit - - logx.Infof("apsClient exited...") - os.Exit(0) -} - func serfClusterEvent(stat int) { switch stat { - - case serf.EventCreateCluster: - // 鍒涘缓闆嗙兢 - case serf.EventJoinCluster: - // 鍔犲叆闆嗙兢 - case serf.EventLeaveCluster: - // 閫�鍑洪泦缇� - case serf.EventSlave2Master: - // 鍒囨崲涓轰富鑺傜偣 - case serf.EventMaster2Slave: - // 鍒囨崲涓哄瓙鑺傜偣 + case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster: + if err := nsq.Init(); err != nil { + logx.Errorf("nsq Init err:%v", err) + return + } + case serf.EventJoinCluster, serf.EventMaster2Slave: + nsq.Stop() } fmt.Println("clusterEvent:", stat) -} \ No newline at end of file +} diff --git a/nsq/consumer.go b/nsq/consumer.go index d6e668a..879f641 100644 --- a/nsq/consumer.go +++ b/nsq/consumer.go @@ -5,12 +5,11 @@ "apsClient/constvar" "apsClient/pkg/logx" "apsClient/pkg/nsqclient" - "context" "fmt" ) -func Consume(topic, channel string) (err error) { - c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel) +func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) { + c, err = nsqclient.NewNsqConsumer(topic, channel) if err != nil { logx.Errorf("NewNsqConsumer err:%v", err) return diff --git a/nsq/nsq.go b/nsq/nsq.go index 0c293db..2035f15 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -6,20 +6,32 @@ "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/safe" + "basic.com/aps/nsqclient.git" + "context" "errors" "fmt" + "sync" "time" ) -func Init() error { +type consumerManager struct { + ctx context.Context + clients sync.Map +} + +var defaultConsumerManager *consumerManager + +func init() { + defaultConsumerManager = new(consumerManager) +} + +func (c *consumerManager) init() error { if len(conf.Conf.NsqConf.NodeId) <= 0 { return errors.New("no NodeId") } - if err := initProducer(); err != nil { return err } - safe.Go(func() { caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) var addressResult common.ResponsePlcAddress @@ -29,40 +41,54 @@ } }) - safe.Go(func() { - err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) - if err != nil { - logx.Errorf("start nsq consume err: %v", err) - } - }) - - safe.Go(func() { - err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) - if err != nil { - logx.Errorf("start nsq consume err: %v", err) - } - }) - - safe.Go(func() { - err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) - if err != nil { - logx.Errorf("start nsq consume err: %v", err) - } - }) - - safe.Go(func() { - err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) - if err != nil { - logx.Errorf("start nsq consume err: %v", err) - } - }) - - safe.Go(func() { - err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) - if err != nil { - logx.Errorf("start nsq consume err: %v", err) - } - }) - + var topics = []string{ + constvar.NsqTopicScheduleTask, + constvar.NsqTopicSendPlcAddress, + constvar.NsqTopicProcessParamsResponse, + constvar.NsqTopicApsProcessParams, + constvar.NsqTopicDeviceUpdate, + } + for _, t := range topics { + topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId) + c.AddConsumer(topic) + } return nil } + +func (c *consumerManager) AddConsumer(topic string) { + client, err := NewConsumer(topic, conf.Conf.System.DeviceId) + if err != nil { + logx.Errorf("start nsq consume err: %v", err) + } + c.clients.Store(topic, client) + safe.Go(func() { + if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { + if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { + logx.Errorf("RunLookupd err:%v", err) + return + } + } else { + if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil { + logx.Errorf("Run err:%v", err) + return + } + } + }) +} + +func (c *consumerManager) stop() { + c.clients.Range(func(key, value any) bool { + if consumer, ok := value.(*nsqclient.NsqConsumer); ok { + nsqclient.DestroyNsqConsumer(consumer) + } + return true + }) +} + +func Init() error { + return defaultConsumerManager.init() +} + +func Stop() { + defaultConsumerManager.stop() +} diff --git a/pkg/nsqclient/consumer.go b/pkg/nsqclient/consumer.go index a0df0b0..acbf91a 100644 --- a/pkg/nsqclient/consumer.go +++ b/pkg/nsqclient/consumer.go @@ -18,7 +18,7 @@ channel string } -func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { +func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { conf := nsq.NewConfig() conf.MaxAttempts = 0 conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪 @@ -33,7 +33,6 @@ } return &NsqConsumer{ consumer: consumer, - ctx: ctx, topic: topic, channel: channel, }, nil -- Gitblit v1.8.0