fix
zhangqian
2023-11-09 cacf363f222e354cad6db18c4a00fb1097d35c2c
crontask/cron_task.go
@@ -3,11 +3,14 @@
import (
   "apsClient/conf"
   "apsClient/constvar"
   "apsClient/model"
   "apsClient/model/common"
   "apsClient/nsq"
   "apsClient/pkg/ecode"
   "apsClient/pkg/logx"
   "apsClient/serf"
   "apsClient/service"
   "apsClient/service/problem"
   "fmt"
   "github.com/go-co-op/gocron"
   "github.com/spf13/cast"
@@ -78,7 +81,13 @@
   if isMaster {
      s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据
      s.Every(30).Seconds().Do(SyncTaskStatus)         //同步任务状态
      s.Every(10).Seconds().Do(CheckNsqConn)           //查询nsq连接
   }
   s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量
   s.Every(30).Seconds().Do(ProblemCheck)       //问题诊断
   s.StartAsync()
   return nil
}
@@ -143,11 +152,50 @@
   caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err := caller.Send(msg)
   if err != nil {
      logx.Errorf("send pull data msg error:%v", err.Error())
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
   err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
   msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err = caller.Send(msg)
   if err != nil {
      logx.Infof("get plc address err: %v", err.Error())
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
   msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModelPlcAddress}
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err = caller.Send(msg)
   if err != nil {
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
}
func QueryClusterStatus() {
   clusterStatus, nodeQuantity := serf.QueryClusterStatusAndNodeQuantity()
   conf.Conf.SerfClusterStatus = clusterStatus
   conf.Conf.ClusterNodeQuantity = nodeQuantity
}
func ProblemCheck() {
   problem.Check()
}
func CheckNsqConn() {
   var err error
   var status constvar.SystemStatusValue
   if nsq.Ping() {
      status = constvar.SystemStatusValueNormal
   } else {
      status = constvar.SystemStatusValueUnNormal
   }
   old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
   if err != nil {
      logx.Errorf("get nsq status err:%v", err)
      return
   }
   if old.Value == status {
      return
   }
   err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
   if err != nil {
      logx.Errorf("update nsq status err:%v", err)
   }
}