zhangqian
2023-10-09 41a9bf370cff977ff8afe122a7610e07fa6c3b80
加入serf集群
6个文件已修改
191 ■■■■■ 已修改文件
go.mod 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 51 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pkg/nsqclient/consumer.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod
@@ -4,6 +4,9 @@
require (
    basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25
    basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3
    basic.com/valib/bhomeclient.git v1.2.1
    basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e
    github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558
    github.com/gin-contrib/cors v1.4.0
    github.com/gin-gonic/gin v1.9.1
@@ -11,6 +14,7 @@
    github.com/go-redis/redis/v8 v8.11.4
    github.com/goburrow/modbus v0.1.0
    github.com/gofrs/uuid v4.4.0+incompatible
    github.com/gogo/protobuf v1.3.2
    github.com/gorilla/websocket v1.5.0
    github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4
    github.com/jinzhu/gorm v1.9.16
@@ -33,8 +37,10 @@
)
require (
    basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 // indirect
    github.com/IBM/netaddr v1.5.0 // indirect
    github.com/KyleBanks/depth v1.2.1 // indirect
    github.com/ajg/form v1.5.1 // indirect
    github.com/bytedance/sonic v1.9.1 // indirect
    github.com/cespare/xxhash/v2 v2.2.0 // indirect
    github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
@@ -54,6 +60,7 @@
    github.com/goburrow/serial v0.1.0 // indirect
    github.com/goccy/go-json v0.10.2 // indirect
    github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
    github.com/golang/protobuf v1.5.3 // indirect
    github.com/golang/snappy v0.0.1 // indirect
    github.com/google/uuid v1.3.0 // indirect
    github.com/gopacket/gopacket v1.1.1 // indirect
@@ -94,6 +101,8 @@
    golang.org/x/sys v0.11.0 // indirect
    golang.org/x/text v0.12.0 // indirect
    golang.org/x/tools v0.12.0 // indirect
    google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
    google.golang.org/grpc v1.55.0 // indirect
    google.golang.org/protobuf v1.30.0 // indirect
    gopkg.in/ini.v1 v1.67.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
go.sum
@@ -1,5 +1,13 @@
basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 h1:sZyNfIISgP1eoY94LG48Kav6HYVLem6EzaEbCeXlcXQ=
basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25/go.mod h1:1RnwEtePLR7ATQorQTxdgvs1o7uuUy1Vw8W7GYtVnoY=
basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 h1:h6dF39g4oqEMY0wDcFG3W4wYpeTNFjwWMp8TmFKnrAg=
basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/bhomeclient.git v1.2.1 h1:2q5hcf8V0lSTJSX+WcJh9rI/LDnD+We65+yXK04rjfY=
basic.com/valib/bhomeclient.git v1.2.1/go.mod h1:QEPxNQPQjTTVrsMI+AyVRzq/bI99lxmct1BgCmu/Pvs=
basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e h1:LYulJQfA5y0/y51KOTQfAGIA/l6r6cQ36Fvrwfce5Pk=
basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e/go.mod h1:no2OZ7ght2oZ6thE+e0w3UdJjgjt/TgKlWSwdPs9GAc=
basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 h1:ck7olnaIkX0bJ3LfY3n1yaaRBjYlYJYrdieDqigfIKQ=
basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7/go.mod h1:QFq9kACofwTIwjtY3xyfrSnhr3NLWJt8WSvwnnEta1U=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@@ -48,6 +56,8 @@
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/ajankovic/xdiff v0.0.1 h1:V1cj8t5xwYzm6ZGPqPOlAc9AIajXuTEn41D/1MJBWMM=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 h1:C735MoY/X+UOx6SECmHk5pVOj51h839Ph13pEoY8UmU=
github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 h1:d3INvMf4ei9qlX10We5+z/+dQnmmCx0J0wflcZVihGo=
@@ -148,6 +158,8 @@
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
@@ -179,6 +191,8 @@
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -246,6 +260,7 @@
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
@@ -620,6 +635,7 @@
golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
@@ -629,6 +645,7 @@
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
@@ -700,6 +717,8 @@
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -716,6 +735,8 @@
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
main.go
@@ -12,9 +12,6 @@
    "apsClient/service/plc_address"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)
@@ -28,22 +25,13 @@
        return
    }
    if err := nsq.Init(); err != nil {
        logx.Errorf("nsq Init err:%v", err)
        return
    }
    if err := crontask.InitTask(); err != nil {
        logx.Errorf("crontab task Init err:%v", err)
        return
    }
    //加载plc写入地址
    plc_address.LoadAddressFromFile()
    ////提前加载任务
    //service.NewTaskService().GetTask()
    //go shutdown()
    // 启动数据同步
    var serfStartChan = make(chan bool)
@@ -58,11 +46,15 @@
    agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
    agent.RegisterClusterEvent(serfClusterEvent)
    go agent.Serve(serfStartChan)
    <-serfStartChan
    // 判断当前集群状态
    //agent.ClusterStatus == "master"
    if agent.ClusterStatus != "slave" {
        if err := nsq.Init(); err != nil {
            logx.Errorf("nsq Init err:%v", err)
            return
        }
    }
    logx.Infof("apsClient start serve...")
    server := &http.Server{
@@ -74,29 +66,16 @@
    logx.Error(server.ListenAndServe().Error())
}
func shutdown() {
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    logx.Infof("apsClient exited...")
    os.Exit(0)
}
func serfClusterEvent(stat int) {
    switch stat {
    case serf.EventCreateCluster:
        // 创建集群
    case serf.EventJoinCluster:
        // 加入集群
    case serf.EventLeaveCluster:
        // 退出集群
    case serf.EventSlave2Master:
        // 切换为主节点
    case serf.EventMaster2Slave:
        // 切换为子节点
    case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster:
        if err := nsq.Init(); err != nil {
            logx.Errorf("nsq Init err:%v", err)
            return
        }
    case serf.EventJoinCluster, serf.EventMaster2Slave:
        nsq.Stop()
    }
    fmt.Println("clusterEvent:", stat)
}
}
nsq/consumer.go
@@ -5,12 +5,11 @@
    "apsClient/constvar"
    "apsClient/pkg/logx"
    "apsClient/pkg/nsqclient"
    "context"
    "fmt"
)
func Consume(topic, channel string) (err error) {
    c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel)
func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) {
    c, err = nsqclient.NewNsqConsumer(topic, channel)
    if err != nil {
        logx.Errorf("NewNsqConsumer err:%v", err)
        return
nsq/nsq.go
@@ -6,20 +6,32 @@
    "apsClient/model/common"
    "apsClient/pkg/logx"
    "apsClient/pkg/safe"
    "basic.com/aps/nsqclient.git"
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)
func Init() error {
type consumerManager struct {
    ctx     context.Context
    clients sync.Map
}
var defaultConsumerManager *consumerManager
func init() {
    defaultConsumerManager = new(consumerManager)
}
func (c *consumerManager) init() error {
    if len(conf.Conf.NsqConf.NodeId) <= 0 {
        return errors.New("no NodeId")
    }
    if err := initProducer(); err != nil {
        return err
    }
    safe.Go(func() {
        caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
@@ -29,40 +41,54 @@
        }
    })
    safe.Go(func() {
        err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        if err != nil {
            logx.Errorf("start nsq consume err: %v", err)
        }
    })
    safe.Go(func() {
        err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        if err != nil {
            logx.Errorf("start nsq consume err: %v", err)
        }
    })
    safe.Go(func() {
        err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        if err != nil {
            logx.Errorf("start nsq consume err: %v", err)
        }
    })
    safe.Go(func() {
        err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        if err != nil {
            logx.Errorf("start nsq consume err: %v", err)
        }
    })
    safe.Go(func() {
        err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        if err != nil {
            logx.Errorf("start nsq consume err: %v", err)
        }
    })
    var topics = []string{
        constvar.NsqTopicScheduleTask,
        constvar.NsqTopicSendPlcAddress,
        constvar.NsqTopicProcessParamsResponse,
        constvar.NsqTopicApsProcessParams,
        constvar.NsqTopicDeviceUpdate,
    }
    for _, t := range topics {
        topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
        c.AddConsumer(topic)
    }
    return nil
}
func (c *consumerManager) AddConsumer(topic string) {
    client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
    if err != nil {
        logx.Errorf("start nsq consume err: %v", err)
    }
    c.clients.Store(topic, client)
    safe.Go(func() {
        if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
            if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
                logx.Errorf("RunLookupd err:%v", err)
                return
            }
        } else {
            if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil {
                logx.Errorf("Run err:%v", err)
                return
            }
        }
    })
}
func (c *consumerManager) stop() {
    c.clients.Range(func(key, value any) bool {
        if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
            nsqclient.DestroyNsqConsumer(consumer)
        }
        return true
    })
}
func Init() error {
    return defaultConsumerManager.init()
}
func Stop() {
    defaultConsumerManager.stop()
}
pkg/nsqclient/consumer.go
@@ -18,7 +18,7 @@
    channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
@@ -33,7 +33,6 @@
    }
    return &NsqConsumer{
        consumer: consumer,
        ctx:      ctx,
        topic:    topic,
        channel:  channel,
    }, nil