From 8d2a95fc0eeabe1b13d0a914c9ec2845d42c0be3 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期四, 19 十月 2023 11:32:57 +0800
Subject: [PATCH] 添加主从serf切换事件
---
nsq/nsq.go | 25 +++++++++----------------
1 files changed, 9 insertions(+), 16 deletions(-)
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 2035f15..888cf08 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -3,15 +3,13 @@
import (
"apsClient/conf"
"apsClient/constvar"
- "apsClient/model/common"
"apsClient/pkg/logx"
+ "apsClient/pkg/nsqclient"
"apsClient/pkg/safe"
- "basic.com/aps/nsqclient.git"
"context"
"errors"
"fmt"
"sync"
- "time"
)
type consumerManager struct {
@@ -32,14 +30,6 @@
if err := initProducer(); err != nil {
return err
}
- safe.Go(func() {
- caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
- var addressResult common.ResponsePlcAddress
- err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
- if err != nil {
- logx.Infof("get plc address err: %v", err.Error())
- }
- })
var topics = []string{
constvar.NsqTopicScheduleTask,
@@ -47,6 +37,7 @@
constvar.NsqTopicProcessParamsResponse,
constvar.NsqTopicApsProcessParams,
constvar.NsqTopicDeviceUpdate,
+ constvar.NsqTopicPullDataResponse,
}
for _, t := range topics {
topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
@@ -56,12 +47,12 @@
}
func (c *consumerManager) AddConsumer(topic string) {
- client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
- if err != nil {
- logx.Errorf("start nsq consume err: %v", err)
- }
- c.clients.Store(topic, client)
safe.Go(func() {
+ client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
+ if err != nil {
+ logx.Errorf("start nsq consume err: %v", err)
+ }
+ c.clients.Store(topic, client)
if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
logx.Errorf("RunLookupd err:%v", err)
@@ -73,6 +64,7 @@
return
}
}
+ logx.Infof("add consumer success, topic:%v", topic)
})
}
@@ -80,6 +72,7 @@
c.clients.Range(func(key, value any) bool {
if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
nsqclient.DestroyNsqConsumer(consumer)
+ logx.Infof("try stop consumer, topic : %v", key)
}
return true
})
--
Gitblit v1.8.0