From 10c65da3d2af7056f48d9301e83f53f102f76e18 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期一, 30 十月 2023 14:52:11 +0800 Subject: [PATCH] fix --- api/v1/task.go | 188 +++++++++++++++++++++++++++++----------------- 1 files changed, 118 insertions(+), 70 deletions(-) diff --git a/api/v1/task.go b/api/v1/task.go index ce5b49d..cd04286 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -4,18 +4,14 @@ "apsClient/conf" "apsClient/constvar" "apsClient/model" - "apsClient/model/common" "apsClient/model/request" "apsClient/model/response" - "apsClient/nsq" "apsClient/pkg/contextx" "apsClient/pkg/ecode" "apsClient/pkg/logx" - "apsClient/pkg/safe" "apsClient/service" "apsClient/service/plc_address" "errors" - "fmt" "github.com/gin-gonic/gin" "github.com/jinzhu/gorm" "github.com/spf13/cast" @@ -179,15 +175,6 @@ }) } - safe.Go(func() { - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) - var addressResult common.ResponsePlcAddress - err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3) - if err != nil { - logx.Infof("get plc address err: %v", err.Error()) - } - }) - resp := response.ProcessParamsResponse{ Number: processModel.Number, Params: processParamsArr, @@ -226,25 +213,26 @@ ctx.Fail(ecode.ParamsErr) return } - err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished, procedure.Channel) + + err := model.WithTransaction(func(db *gorm.DB) error { + err := service.NewTaskService().UpdateProcedureStatusAndChannel(db, id, model.ProcedureStatusFinished, procedure.Channel, procedure.ProcessModelNumber) + if err != nil { + return err + } + record := model.TaskStatusSync{ + WorkOrderId: procedure.WorkOrderID, + ProcedureID: procedure.ProcedureID, + DeviceId: procedure.DeviceID, + IsProcessing: false, + IsFinish: true, + } + return service.NewTaskService().SaveTaskStatusSync(db, &record) + }) + if err != nil { - logx.Errorf("UpdateProcedureStatus err: %v", err.Error()) + logx.Errorf("TaskFinish UpdateProcedureStatus err: %v", err.Error()) ctx.Fail(ecode.UnknownErr) return - } - - msg := &common.MsgTaskStatusUpdate{ - WorkOrderId: procedure.WorkOrderID, - ProcedureID: procedure.ProceduresInfo.ProcedureID, - DeviceId: procedure.ProceduresInfo.DeviceID, - IsProcessing: false, - IsFinish: true, - } - - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "") - err = caller.Send(msg) - if err != nil { - logx.Errorf("send task status update msg error:%v", err.Error()) } service.TaskFlagUnset(procedure.Channel) @@ -290,7 +278,23 @@ ctx.FailWithMsg(ecode.ParamsErr, "鏈幏鍙栧埌宸ヨ壓鍙傛暟锛岃鍦ㄥ伐鑹烘ā鍨嬪簱涓笂浼狅紒") return } - + plcConfig, code := service.NewDevicePlcService().GetDevicePlc() + if code != ecode.OK || plcConfig.ID == 0 { + ctx.FailWithMsg(ecode.NeedConfirmedErr, "璇峰厛閰嶇疆PLC") + return + } + plcConfig.MaxTryTimes = 2 + err = SendParams(processModel.ParamsMap, plcConfig) + if err != nil { + ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆锛�") + return + } + plcConfig.CurrentTryTimes = 0 + err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, procedure.Channel, order.Amount.IntPart()) + if err != nil { + ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆锛�") + return + } err = model.WithTransaction(func(db *gorm.DB) error { err = taskService.UpdateProcedureStatusAndChannel(db, params.ProcedureId, model.ProcedureStatusProcessing, procedure.Channel, processModel.Number) if err != nil { @@ -300,52 +304,25 @@ if err != nil { return err } - return service.NewProgressService().Add(db, procedure, order) + err = service.NewProgressService().Add(db, procedure, order) + if err != nil { + return err + } + record := model.TaskStatusSync{ + WorkOrderId: procedure.WorkOrderID, + ProcedureID: procedure.ProcedureID, + DeviceId: procedure.DeviceID, + IsProcessing: true, + IsFinish: false, + } + return service.NewTaskService().SaveTaskStatusSync(db, &record) }) if err != nil { logx.Errorf("SendProcessParams update order and procedure status error:%v", err.Error()) ctx.FailWithMsg(ecode.DBErr, "鏇存敼宸ュ崟鐘舵�佸け璐�") return } - plcConfig, code := service.NewDevicePlcService().GetDevicePlc() - if code != ecode.OK || plcConfig.ID == 0 { - ctx.FailWithMsg(ecode.NeedConfirmedErr, "璇峰厛閰嶇疆PLC") - return - } - plcConfig.MaxTryTimes = 2 - err = SendParams(processModel.ParamsMap, plcConfig) - if err != nil { - logx.Errorf("SendProcessParams: %v", err.Error()) - err = model.WithTransaction(func(db *gorm.DB) error { - err = taskService.UpdateProcedureStatusAndChannel(db, params.ProcedureId, model.ProcedureStatusWaitProcess, procedure.Channel, "") - if err != nil { - return err - } - return nil - }) - ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆锛�") - return - } - plcConfig.CurrentTryTimes = 0 - err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, procedure.Channel, order.Amount.IntPart()) - if err != nil { - ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆") - return - } - msg := &common.MsgTaskStatusUpdate{ - WorkOrderId: procedure.WorkOrderID, - ProcedureID: procedure.ProceduresInfo.ProcedureID, - DeviceId: procedure.ProceduresInfo.DeviceID, - IsProcessing: true, - IsFinish: false, - } - - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "") - err = caller.Send(msg) - if err != nil { - logx.Errorf("send task status update msg error:%v", err.Error()) - } service.TaskFlagSet(procedure.Channel) ctx.Ok() } @@ -399,7 +376,7 @@ ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆锛�") return } - _ = taskService.UpdateProcedureStatusAndChannel(nil, params.ProcedureId, model.ProcedureStatusProcessing, procedure.Channel, processModel.Number) + _ = taskService.UpdateProcessModelNumber(nil, params.ProcedureId, processModel.Number) ctx.Ok() } @@ -437,3 +414,74 @@ logx.Info("----------------涓嬪彂宸ヨ壓鍙傛暟瀹屾瘯-----------------") return nil } + +// TaskListByChannel +// @Tags Task +// @Summary 鑾峰彇浠诲姟鍒楄〃2 +// @Produce application/json +// @Param object query request.SimpleTaskList true "鏌ヨ鍙傛暟" +// @Success 200 {object} contextx.Response{data=map[int32]response.taskResponse} "鎴愬姛" +// @Router /v1/task/listByChannel [get] +func (slf *TaskApi) TaskListByChannel(c *gin.Context) { + var params request.TaskListByChannel + ctx, ok := contextx.NewContext(c, ¶ms) + if !ok { + return + } + + channelAmount, err := service.NewDevicePlcService().GetDeviceChannelAmount() + if err != nil { + ctx.FailWithMsg(ecode.NeedConfirmedErr, err.Error()) + return + } + + dataMap := make(map[int32]*response.TaskResponse, channelAmount) + if params.Channel != nil { + taskResponse, err := getTaskResponseByChannel(params, *params.Channel) + if err != nil { + ctx.FailWithMsg(ecode.DBErr, err.Error()) + return + } + dataMap[*params.Channel] = taskResponse + ctx.OkWithDetailed(dataMap) + return + } + + //涓嶄紶channel鍙栨墍鏈塩hannel鐨� + var wg sync.WaitGroup + var mu sync.Mutex + for i := 0; i < channelAmount; i++ { + wg.Add(1) + go func(channel int32) { + defer wg.Done() + taskResponse, err := getTaskResponseByChannel(params, channel) + if err != nil { + ctx.FailWithMsg(ecode.DBErr, err.Error()) + return + } + mu.Lock() + defer mu.Unlock() + dataMap[channel] = taskResponse + }(int32(i)) + } + wg.Wait() + ctx.OkWithDetailed(dataMap) +} + +func getTaskResponseByChannel(params request.TaskListByChannel, channel int32) (taskResponse *response.TaskResponse, err error) { + taskResponse, err = service.NewTaskService().GetTask2(params.Offset, params.Limit, []int32{channel}, params.Type) //鍙栬繘琛屼腑鐨勬垨鏈紑濮嬬殑 + if err != nil { + return + } + + nowTs := time.Now().Unix() + flagMap := make(map[int32]struct{}, 0) + for _, task := range taskResponse.Tasks { + if _, ok := flagMap[task.Channel]; !ok && !service.TaskFlagGet(task.Channel) && task.Procedure.StartTime <= nowTs { + task.CanStarted = true + flagMap[task.Channel] = struct{}{} + } + } + taskResponse.Prompt = conf.Conf.Prompt + return +} -- Gitblit v1.8.0