From b483b294741920e90815c2d1f4c6827d9921310c Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期二, 22 八月 2023 16:24:54 +0800 Subject: [PATCH] plc写入地址文件通过nsq从云端获取 --- nsq/msg_handler.go | 61 ++++++++ service/plc_address/address_map.go | 70 ++++++++++ model/common/common.go | 11 + nsq/caller.go | 48 ++++++ test/msg_handler_test.go | 42 +++--- nsq/consumer.go | 5 api/v1/task.go | 87 ++++++++++-- datafile/plc_address_key | 3 utils/file/file.go | 56 ++++++++ constvar/const.go | 10 + datafile/plc_address_value | 3 conf/apsClient.json | 2 nsq/nsq.go | 19 ++ 13 files changed, 377 insertions(+), 40 deletions(-) diff --git a/api/v1/task.go b/api/v1/task.go index 893106b..b0719bc 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -2,15 +2,19 @@ import ( "apsClient/conf" + "apsClient/constvar" "apsClient/model" + "apsClient/model/common" "apsClient/model/response" - _ "apsClient/model/response" + "apsClient/nsq" "apsClient/pkg/contextx" "apsClient/pkg/convertx" "apsClient/pkg/ecode" "apsClient/pkg/logx" "apsClient/pkg/plc" + "apsClient/pkg/safe" "apsClient/service" + "apsClient/service/plc_address" "errors" "fmt" "github.com/gin-gonic/gin" @@ -62,7 +66,6 @@ ctx.Fail(code) return } - if procedure.Status != model.ProcedureStatusUnFinished { ctx.FailWithMsg(ecode.ParamsErr, "璇ュ伐搴忓凡缁撴潫") return @@ -73,7 +76,6 @@ ctx.Fail(ecode.UnknownErr) return } - params := service.GetProcessModelParams{ WorkOrder: procedure.WorkOrderID, OrderId: procedure.OrderID, @@ -81,9 +83,7 @@ Procedure: procedure.ProceduresInfo.ProcedureName, Device: procedure.ProceduresInfo.DeviceName, } - resp, err := service.ProcessModel{}.GetProcessModel(params) - if err != nil { logx.Errorf("TaskStart Notice GetProcessModel error: %v", err.Error()) ctx.FailWithMsg(ecode.UnknownErr, "鏈幏鍙栧埌宸ヨ壓鍙傛暟") @@ -96,12 +96,12 @@ Value: v, }) } - response := response.ProcessParamsResponse{ + data := response.ProcessParamsResponse{ Number: resp.Number, Params: processParamsArr, } logx.Infof("TaskStart Notice GetProcessModel: %+v", resp) - ctx.OkWithDetailed(response) + ctx.OkWithDetailed(data) } // TaskFinish @@ -166,11 +166,11 @@ } params := service.GetProcessModelParams{ - WorkOrder: "", + WorkOrder: procedure.WorkOrderID, OrderId: procedure.OrderID, Product: order.ProductName, Procedure: procedure.ProceduresInfo.ProcedureName, - Device: procedure.ProceduresInfo.DeviceID, + Device: procedure.ProceduresInfo.DeviceName, } resp, err := service.ProcessModel{}.GetProcessModel(params) @@ -180,12 +180,12 @@ return } - fmt.Println("----------------寮�濮嬩笅鍙戝伐鑹哄弬鏁�-----------------") - for k, v := range resp.ParamsMap { - fmt.Println(fmt.Sprintf("%v : %v", k, v)) - time.Sleep(time.Millisecond * 300) - } - fmt.Println("----------------涓嬪彂宸ヨ壓鍙傛暟瀹屾瘯-----------------") + //fmt.Println("----------------寮�濮嬩笅鍙戝伐鑹哄弬鏁�-----------------") + //for k, v := range resp.ParamsMap { + // fmt.Println(fmt.Sprintf("%v : %v", k, v)) + // time.Sleep(time.Millisecond * 300) + //} + //fmt.Println("----------------涓嬪彂宸ヨ壓鍙傛暟瀹屾瘯-----------------") //err = SendParams(resp.ParamsMap) //if err != nil { @@ -194,10 +194,20 @@ // return //} + safe.Go(func() { + err = SendParams2(resp.ParamsMap, 0) + if err != nil { + logx.Errorf("SendProcessParams: %v", err.Error()) + return + } + }) ctx.Ok() } func SendParams(paramsMap map[string]interface{}) error { + if len(paramsMap) == 0 { + return errors.New("empty params") + } plcAddressMap := make(map[string]*int, len(conf.Conf.PLCAddresses)) for _, item := range conf.Conf.PLCAddresses { plcAddressMap[item.FieldName] = &item.Address @@ -227,3 +237,50 @@ logx.Info("----------------涓嬪彂宸ヨ壓鍙傛暟瀹屾瘯-----------------") return nil } + +func SendParams2(paramsMap map[string]interface{}, tryTimes int) error { + if len(paramsMap) == 0 { + return errors.New("empty params") + } + if tryTimes > 2 { + return errors.New("beyond max try time") + } + plcConfig, code := service.NewDevicePlcService().GetDevicePlc() + if code != ecode.OK { + return errors.New("璇峰厛閰嶇疆PLC") + } + conn, err := plc.NewModbusConnection(plcConfig.Address) + if err != nil { + return errors.New(fmt.Sprintf("杩炴帴plc澶辫触: %v", err.Error())) + } + defer conn.Close() + + logx.Info("----------------寮�濮嬩笅鍙戝伐鑹哄弬鏁�-----------------") + var missNumbers int + for k, v := range paramsMap { + if address, ok := plc_address.Get(k); ok { + result, err := plc.WriteHoldingRegister(conn, address, v) + if err != nil { + logx.Errorf("WriteHoldingRegister err:%v, address: %v, key: %v value: %v", err.Error(), address, k, v) + } else { + delete(paramsMap, k) + logx.Infof("WriteHoldingRegister ok: key: %v, value: %v, result: %v", k, v, result) + } + } else { + missNumbers++ + } + } + if missNumbers >= 1 { + caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) + var addressResult common.ResponsePlcAddress + err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2) + if err != nil { + logx.Infof("SendParams2 err: %v", err.Error()) + return err + } + tryTimes++ + return SendParams2(paramsMap, tryTimes) + } + logx.Info("----------------涓嬪彂宸ヨ壓鍙傛暟瀹屾瘯-----------------") + return nil +} diff --git a/conf/apsClient.json b/conf/apsClient.json index 6549a1c..677bcec 100644 --- a/conf/apsClient.json +++ b/conf/apsClient.json @@ -2,7 +2,7 @@ "system": { "env": "develop", "port": 8003, - "deviceId": "DeviceID 1", + "deviceId": "DeviceID1", "netSetShellPath": "/data/network/", "netUpShellName": "up.sh", "netDownShellName": "down.sh" diff --git a/constvar/const.go b/constvar/const.go index 63eece0..5d30c70 100644 --- a/constvar/const.go +++ b/constvar/const.go @@ -1,7 +1,9 @@ package constvar const ( - NsqTopicScheduleTask = "aps.%v.scheduleTask" //鎺掔▼浠诲姟涓嬪彂 + NsqTopicScheduleTask = "aps.%v.scheduleTask" //鎺掔▼浠诲姟涓嬪彂 + NsqTopicGetPlcAddress = "aps.%v.getPlcAddress" + NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress" ) type PlcStartAddressType string @@ -9,3 +11,9 @@ const ( PlcStartAddressTypeFinishNumber = "1" ) + +const ( + PlcAddressDataPath = "datafile/" + PlcAddressDataKeyFileName = "plc_address_key" + PlcAddressDataValueFileName = "plc_address_value" +) diff --git a/datafile/plc_address_key b/datafile/plc_address_key new file mode 100644 index 0000000..83130ac --- /dev/null +++ b/datafile/plc_address_key @@ -0,0 +1,3 @@ +鍘嬪己 +鏃堕棿 +娓╁害 \ No newline at end of file diff --git a/datafile/plc_address_value b/datafile/plc_address_value new file mode 100644 index 0000000..3a9e4ec --- /dev/null +++ b/datafile/plc_address_value @@ -0,0 +1,3 @@ +1000 +1001 +1002 \ No newline at end of file diff --git a/model/common/common.go b/model/common/common.go index 4bbc606..580b2b8 100644 --- a/model/common/common.go +++ b/model/common/common.go @@ -54,3 +54,14 @@ Procedures []*ProductProcedure `json:"procedures"` // 宸ュ簭鍒楄〃 } ) + +// PLC鍐欏叆鍦板潃 +type ( + RequestPlcAddress struct { + DeviceId string + } + ResponsePlcAddress struct { + KeyData []byte + AddressData []byte + } +) diff --git a/nsq/caller.go b/nsq/caller.go new file mode 100644 index 0000000..36839cc --- /dev/null +++ b/nsq/caller.go @@ -0,0 +1,48 @@ +package nsq + +import ( + "encoding/json" + "errors" + "time" +) + +type Caller interface { + Call(msg []byte, duration time.Duration) ([]byte, error) +} + +type DefaultCaller struct { + NsqChannel string + RequestTopic string + ResponseTopic string +} + +func NewCaller(requestTopic, responseTopic string) *DefaultCaller { + return &DefaultCaller{ + NsqChannel: "", + RequestTopic: requestTopic, + ResponseTopic: responseTopic, + } +} + +func (caller *DefaultCaller) Call(input interface{}, output interface{}, timeout time.Duration) error { + msg, err := json.Marshal(input) + if err != nil { + return err + } + producer := GetProducer() + err = producer.Publish(caller.RequestTopic, msg) + if err != nil { + return err + } + to := time.After(timeout) + for { + select { + case <-to: + return errors.New("timeout") + case data := <-ReceivedMessageChan: + if data.Topic == caller.ResponseTopic { + return json.Unmarshal(data.Message, &output) + } + } + } +} diff --git a/nsq/consumer.go b/nsq/consumer.go index 0f0bbdc..feb2814 100644 --- a/nsq/consumer.go +++ b/nsq/consumer.go @@ -2,6 +2,7 @@ import ( "apsClient/conf" + "apsClient/constvar" "apsClient/pkg/logx" "apsClient/pkg/nsqclient" "context" @@ -17,8 +18,10 @@ logx.Infof("Consume NewNsqConsumer topic:%v", topic) var handler MsgHandler switch topic { - case fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId): + case fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId): handler = new(ScheduleTask) + case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId): + handler = &PlcAddress{Topic: topic} } c.AddHandler(handler.HandleMessage) diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index 9485653..a33cf9f 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -2,13 +2,30 @@ import ( "apsClient/conf" + "apsClient/constvar" "apsClient/model" "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/structx" + "apsClient/service/plc_address" + "apsClient/utils/file" "encoding/json" + "fmt" + "github.com/spf13/cast" "gorm.io/gorm" + "strings" ) + +type ReceivedMessage struct { + Topic string + Message []byte +} + +var ReceivedMessageChan chan *ReceivedMessage + +func init() { + ReceivedMessageChan = make(chan *ReceivedMessage, 1000) +} type MsgHandler interface { HandleMessage(data []byte) (err error) @@ -65,3 +82,47 @@ } return nil } + +type PlcAddress struct { + Topic string +} + +func (slf *PlcAddress) HandleMessage(data []byte) (err error) { + logx.Infof("get an PlcAddress message :%s", data) + var resp = new(common.ResponsePlcAddress) + err = json.Unmarshal(data, &resp) + if err != nil { + logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) + return err + } + //鍐欏叆鍒版枃浠� + err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName), resp.KeyData) + if err != nil { + return err + } + err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName), resp.AddressData) + if err != nil { + return err + } + //鍐欏叆鍒板唴瀛� + keyString := string(resp.KeyData) + addressString := string(resp.AddressData) + + keys := strings.Split(keyString, "\n") + addresses := strings.Split(addressString, "\n") + if len(keys) != len(addresses) { + logx.Error("plc address message error: key length not equal address length") + return nil + } + for i := 0; i < len(keys); i++ { + key := strings.ReplaceAll(keys[i], "\r", "") + address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", "")) + plc_address.Set(key, address) + } + //閫氱煡鍥炲鏀跺埌 + ReceivedMessageChan <- &ReceivedMessage{ + Topic: slf.Topic, + Message: data, + } + return nil +} diff --git a/nsq/nsq.go b/nsq/nsq.go index 542f31f..2710e13 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -2,9 +2,13 @@ import ( "apsClient/conf" + "apsClient/constvar" + "apsClient/model/common" + "apsClient/pkg/logx" "apsClient/pkg/safe" "errors" "fmt" + "time" ) func Init() error { @@ -17,7 +21,20 @@ } safe.Go(func() { - _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) + caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) + var addressResult common.ResponsePlcAddress + err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2) + if err != nil { + logx.Infof("SendParams2 err: %v", err.Error()) + } + }) + + safe.Go(func() { + _ = Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) + }) + + safe.Go(func() { + _ = Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) }) return nil diff --git a/service/plc_address/address_map.go b/service/plc_address/address_map.go new file mode 100644 index 0000000..2020f2e --- /dev/null +++ b/service/plc_address/address_map.go @@ -0,0 +1,70 @@ +package plc_address + +import ( + "apsClient/constvar" + "apsClient/pkg/logx" + "apsClient/utils/file" + "fmt" + "github.com/spf13/cast" + "os" + "strings" + "sync" +) + +type addressMap struct { + store sync.Map +} + +var defaultAddressMap *addressMap + +func newAddressMap() *addressMap { + return &addressMap{store: sync.Map{}} +} + +func init() { + defaultAddressMap = newAddressMap() + LoadAddressFromFile() +} + +func Set(key string, value int) { + defaultAddressMap.store.Store(key, value) +} + +func Get(key string) (value int, ok bool) { + if v, ok := defaultAddressMap.store.Load(key); ok { + return v.(int), ok + } + return 0, false +} + +func LoadAddressFromFile() (loadOk bool) { + keyFileName := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName) + addressFileName := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName) + if !file.Exists(keyFileName) || !file.Exists(addressFileName) { + return + } + keyData, err := os.ReadFile(keyFileName) + if err != nil { + logx.Errorf("LoadAddressFromFile ReadFile err: %v", err.Error()) + return + } + addressData, err := os.ReadFile(addressFileName) + if err != nil { + logx.Errorf("LoadAddressFromFile ReadFile err: %v", err.Error()) + return + } + keyString := string(keyData) + addressString := string(addressData) + keys := strings.Split(keyString, "\n") + addresses := strings.Split(addressString, "\n") + if len(keys) != len(addresses) { + logx.Error("plc address message error: key length not equal address length") + return + } + for i := 0; i < len(keys); i++ { + key := strings.ReplaceAll(keys[i], "\r", "") + address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", "")) + Set(key, address) + } + return true +} diff --git a/test/msg_handler_test.go b/test/msg_handler_test.go index c6a2c54..d2b0566 100644 --- a/test/msg_handler_test.go +++ b/test/msg_handler_test.go @@ -1,7 +1,7 @@ package test import ( - "apsClient/model/request" + "apsClient/model/common" "apsClient/nsq" "encoding/json" "fmt" @@ -13,30 +13,30 @@ func TestHandleMessage(t *testing.T) { Init() - var tasks = make([]*request.DeliverScheduleTask, 0) - startTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 08:00", time.Local) - endTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-20 12:00", time.Local) + var tasks = make([]*common.DeliverScheduleTask, 0) + startTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 08:00", time.Local) + endTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-23 12:00", time.Local) fmt.Println(startTime) fmt.Println(startTime.Unix()) - startTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 08:00", time.Local) - endTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 18:00", time.Local) + startTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 08:00", time.Local) + endTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 18:00", time.Local) - startTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 18:00", time.Local) - endTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-20 06:00", time.Local) + startTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 18:00", time.Local) + endTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-23 06:00", time.Local) - inputMaterials := []*request.ProcedureMaterial{{ + inputMaterials := []*common.ProcedureMaterial{{ MaterialID: "MaterialID 1", MaterialName: "杩欐槸涓�涓緭鍏ョ墿鏂欏悕", Amount: decimal.NewFromFloat(100), Unit: "浠�", }} - outputMaterials := []*request.ProcedureMaterial{{ + outputMaterials := []*common.ProcedureMaterial{{ MaterialID: "MaterialID", MaterialName: "杩欐槸涓�涓緭鍑虹墿鏂欏悕", Amount: decimal.NewFromFloat(20), Unit: "浠�", }} - workers := []*request.ProcedureWorker{{ + workers := []*common.ProcedureWorker{{ WorkerID: "WorkerID 1", WorkerName: "寮犱笁", PhoneNum: "18800000000", @@ -51,8 +51,8 @@ EndTime: endTime2.Unix(), }} - task1 := request.DeliverScheduleTask{ - WorkOrder: request.WorkOrder{ + task1 := common.DeliverScheduleTask{ + WorkOrder: common.WorkOrder{ WorkOrderID: "WorkOrderID 1", OrderID: "OrderID 1", ProductID: "ProductID 1", @@ -66,10 +66,10 @@ StartTime: startTime.Unix(), EndTime: endTime.Unix(), }, - Procedures: []*request.ProductProcedure{{ + Procedures: []*common.ProductProcedure{{ ProcedureID: "ProcedureID 1", ProcedureName: "ProcedureName 1", - DeviceID: "DeviceID 1", + DeviceID: "DeviceID1", DeviceName: "DeviceName 1", StartTime: startTime.Unix(), EndTime: endTime.Unix(), @@ -79,7 +79,7 @@ Workers: workers, }, {ProcedureID: "ProcedureID 2", ProcedureName: "ProcedureName 2", - DeviceID: "DeviceID 1", + DeviceID: "DeviceID1", DeviceName: "DeviceName 1", StartTime: startTime.Unix(), EndTime: endTime.Unix(), @@ -89,8 +89,8 @@ Workers: workers, }}, } - task2 := request.DeliverScheduleTask{ - WorkOrder: request.WorkOrder{ + task2 := common.DeliverScheduleTask{ + WorkOrder: common.WorkOrder{ WorkOrderID: "WorkOrderID 2", OrderID: "OrderID 2", ProductID: "ProductID 2", @@ -104,10 +104,10 @@ StartTime: startTime.Unix(), EndTime: endTime.Unix(), }, - Procedures: []*request.ProductProcedure{{ + Procedures: []*common.ProductProcedure{{ ProcedureID: "ProcedureID 3", ProcedureName: "ProcedureName 3", - DeviceID: "DeviceID 1", + DeviceID: "DeviceID1", DeviceName: "DeviceName 1", StartTime: startTime.Unix(), EndTime: endTime.Unix(), @@ -118,7 +118,7 @@ }, { ProcedureID: "ProcedureID 4", ProcedureName: "ProcedureName 4", - DeviceID: "DeviceID 1", + DeviceID: "DeviceID1", DeviceName: "DeviceName 1", StartTime: startTime.Unix(), EndTime: endTime.Unix(), diff --git a/utils/file/file.go b/utils/file/file.go new file mode 100644 index 0000000..7e2dcb1 --- /dev/null +++ b/utils/file/file.go @@ -0,0 +1,56 @@ +package file + +import ( + "errors" + "os" + "path/filepath" +) + +func WriteFile(filename string, content []byte) error { + path := filepath.Dir(filename) + if !IsDir(path) { + err := os.Mkdir(path, os.ModeDir) + if err != nil { + return err + } + } + dstFile, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC, 0777) + if err != nil { + return err + } + _, err = dstFile.Write(content) + return err +} + +// Exists 鍒ゆ柇鎵�缁欒矾寰勬枃浠�/鏂囦欢澶规槸鍚﹀瓨鍦� +func Exists(path string) bool { + _, err := os.Stat(path) //os.Stat鑾峰彇鏂囦欢淇℃伅 + if err != nil { + if os.IsExist(err) { + return true + } + return false + } + return true +} + +// IsDir 鍒ゆ柇鎵�缁欒矾寰勬槸鍚︿负鏂囦欢澶� +func IsDir(path string) bool { + s, err := os.Stat(path) + if err != nil { + return false + } + return s.IsDir() +} + +// IsFile 鍒ゆ柇鎵�缁欒矾寰勬槸鍚︿负鏂囦欢 +func IsFile(path string) bool { + return !IsDir(path) +} + +func ReadFile(filename string) (content []byte, err error) { + if !Exists(filename) { + return nil, errors.New("file not exist") + } + return os.ReadFile(filename) +} -- Gitblit v1.8.0