go.mod | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
go.sum | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
nsq/consumer.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
nsq/nsq.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
pkg/nsqclient/consumer.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
go.mod
@@ -4,6 +4,9 @@ require ( basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 basic.com/valib/bhomeclient.git v1.2.1 basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 @@ -11,6 +14,7 @@ github.com/go-redis/redis/v8 v8.11.4 github.com/goburrow/modbus v0.1.0 github.com/gofrs/uuid v4.4.0+incompatible github.com/gogo/protobuf v1.3.2 github.com/gorilla/websocket v1.5.0 github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 github.com/jinzhu/gorm v1.9.16 @@ -33,8 +37,10 @@ ) require ( basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 // indirect github.com/IBM/netaddr v1.5.0 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/ajg/form v1.5.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect @@ -54,6 +60,7 @@ github.com/goburrow/serial v0.1.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gopacket/gopacket v1.1.1 // indirect @@ -94,6 +101,8 @@ golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.12.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect go.sum
@@ -1,5 +1,13 @@ basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25 h1:sZyNfIISgP1eoY94LG48Kav6HYVLem6EzaEbCeXlcXQ= basic.com/aps/nsqclient.git v0.0.0-20230517072415-37491f4a5d25/go.mod h1:1RnwEtePLR7ATQorQTxdgvs1o7uuUy1Vw8W7GYtVnoY= basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 h1:h6dF39g4oqEMY0wDcFG3W4wYpeTNFjwWMp8TmFKnrAg= basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= basic.com/valib/bhomeclient.git v1.2.1 h1:2q5hcf8V0lSTJSX+WcJh9rI/LDnD+We65+yXK04rjfY= basic.com/valib/bhomeclient.git v1.2.1/go.mod h1:QEPxNQPQjTTVrsMI+AyVRzq/bI99lxmct1BgCmu/Pvs= basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e h1:LYulJQfA5y0/y51KOTQfAGIA/l6r6cQ36Fvrwfce5Pk= basic.com/valib/bhomedbapi.git v0.0.0-20220825084023-fe74ddd6ae6e/go.mod h1:no2OZ7ght2oZ6thE+e0w3UdJjgjt/TgKlWSwdPs9GAc= basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7 h1:ck7olnaIkX0bJ3LfY3n1yaaRBjYlYJYrdieDqigfIKQ= basic.com/valib/c_bhomebus.git v0.0.0-20230203061815-9f24b2f398b7/go.mod h1:QFq9kACofwTIwjtY3xyfrSnhr3NLWJt8WSvwnnEta1U= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -48,6 +56,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/ajankovic/xdiff v0.0.1 h1:V1cj8t5xwYzm6ZGPqPOlAc9AIajXuTEn41D/1MJBWMM= github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/antchfx/xpath v0.0.0-20170515025933-1f3266e77307 h1:C735MoY/X+UOx6SECmHk5pVOj51h839Ph13pEoY8UmU= github.com/apache/plc4x/plc4go v0.0.0-20230817065839-dd203446b558 h1:d3INvMf4ei9qlX10We5+z/+dQnmmCx0J0wflcZVihGo= @@ -148,6 +158,8 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= @@ -179,6 +191,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -246,6 +260,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= @@ -620,6 +635,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= @@ -629,6 +645,7 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= @@ -700,6 +717,8 @@ google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -716,6 +735,8 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= main.go
@@ -12,9 +12,6 @@ "apsClient/service/plc_address" "fmt" "net/http" "os" "os/signal" "syscall" "time" ) @@ -28,22 +25,13 @@ return } if err := nsq.Init(); err != nil { logx.Errorf("nsq Init err:%v", err) return } if err := crontask.InitTask(); err != nil { logx.Errorf("crontab task Init err:%v", err) return } //加载plc写入地址 plc_address.LoadAddressFromFile() ////提前加载任务 //service.NewTaskService().GetTask() //go shutdown() // 启动数据同步 var serfStartChan = make(chan bool) @@ -58,11 +46,15 @@ agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) agent.RegisterClusterEvent(serfClusterEvent) go agent.Serve(serfStartChan) <-serfStartChan // 判断当前集群状态 //agent.ClusterStatus == "master" if agent.ClusterStatus != "slave" { if err := nsq.Init(); err != nil { logx.Errorf("nsq Init err:%v", err) return } } logx.Infof("apsClient start serve...") server := &http.Server{ @@ -74,28 +66,15 @@ logx.Error(server.ListenAndServe().Error()) } func shutdown() { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM) <-quit logx.Infof("apsClient exited...") os.Exit(0) } func serfClusterEvent(stat int) { switch stat { case serf.EventCreateCluster: // 创建集群 case serf.EventJoinCluster: // 加入集群 case serf.EventLeaveCluster: // 退出集群 case serf.EventSlave2Master: // 切换为主节点 case serf.EventMaster2Slave: // 切换为子节点 case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster: if err := nsq.Init(); err != nil { logx.Errorf("nsq Init err:%v", err) return } case serf.EventJoinCluster, serf.EventMaster2Slave: nsq.Stop() } fmt.Println("clusterEvent:", stat) nsq/consumer.go
@@ -5,12 +5,11 @@ "apsClient/constvar" "apsClient/pkg/logx" "apsClient/pkg/nsqclient" "context" "fmt" ) func Consume(topic, channel string) (err error) { c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel) func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) { c, err = nsqclient.NewNsqConsumer(topic, channel) if err != nil { logx.Errorf("NewNsqConsumer err:%v", err) return nsq/nsq.go
@@ -6,20 +6,32 @@ "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/safe" "basic.com/aps/nsqclient.git" "context" "errors" "fmt" "sync" "time" ) func Init() error { type consumerManager struct { ctx context.Context clients sync.Map } var defaultConsumerManager *consumerManager func init() { defaultConsumerManager = new(consumerManager) } func (c *consumerManager) init() error { if len(conf.Conf.NsqConf.NodeId) <= 0 { return errors.New("no NodeId") } if err := initProducer(); err != nil { return err } safe.Go(func() { caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) var addressResult common.ResponsePlcAddress @@ -29,40 +41,54 @@ } }) safe.Go(func() { err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) var topics = []string{ constvar.NsqTopicScheduleTask, constvar.NsqTopicSendPlcAddress, constvar.NsqTopicProcessParamsResponse, constvar.NsqTopicApsProcessParams, constvar.NsqTopicDeviceUpdate, } }) safe.Go(func() { err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) for _, t := range topics { topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId) c.AddConsumer(topic) } }) safe.Go(func() { err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } }) safe.Go(func() { err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } }) safe.Go(func() { err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } }) return nil } func (c *consumerManager) AddConsumer(topic string) { client, err := NewConsumer(topic, conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } c.clients.Store(topic, client) safe.Go(func() { if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { logx.Errorf("RunLookupd err:%v", err) return } } else { if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil { logx.Errorf("Run err:%v", err) return } } }) } func (c *consumerManager) stop() { c.clients.Range(func(key, value any) bool { if consumer, ok := value.(*nsqclient.NsqConsumer); ok { nsqclient.DestroyNsqConsumer(consumer) } return true }) } func Init() error { return defaultConsumerManager.init() } func Stop() { defaultConsumerManager.stop() } pkg/nsqclient/consumer.go
@@ -18,7 +18,7 @@ channel string } func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { conf := nsq.NewConfig() conf.MaxAttempts = 0 conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 @@ -33,7 +33,6 @@ } return &NsqConsumer{ consumer: consumer, ctx: ctx, topic: topic, channel: channel, }, nil