zhangqian
2023-08-22 b483b294741920e90815c2d1f4c6827d9921310c
plc写入地址文件通过nsq从云端获取
5个文件已添加
8个文件已修改
417 ■■■■■ 已修改文件
api/v1/task.go 87 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
conf/apsClient.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
datafile/plc_address_key 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
datafile/plc_address_value 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/common/common.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/caller.go 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/plc_address/address_map.go 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/msg_handler_test.go 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/file/file.go 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/v1/task.go
@@ -2,15 +2,19 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/model/common"
    "apsClient/model/response"
    _ "apsClient/model/response"
    "apsClient/nsq"
    "apsClient/pkg/contextx"
    "apsClient/pkg/convertx"
    "apsClient/pkg/ecode"
    "apsClient/pkg/logx"
    "apsClient/pkg/plc"
    "apsClient/pkg/safe"
    "apsClient/service"
    "apsClient/service/plc_address"
    "errors"
    "fmt"
    "github.com/gin-gonic/gin"
@@ -62,7 +66,6 @@
        ctx.Fail(code)
        return
    }
    if procedure.Status != model.ProcedureStatusUnFinished {
        ctx.FailWithMsg(ecode.ParamsErr, "该工序已结束")
        return
@@ -73,7 +76,6 @@
        ctx.Fail(ecode.UnknownErr)
        return
    }
    params := service.GetProcessModelParams{
        WorkOrder: procedure.WorkOrderID,
        OrderId:   procedure.OrderID,
@@ -81,9 +83,7 @@
        Procedure: procedure.ProceduresInfo.ProcedureName,
        Device:    procedure.ProceduresInfo.DeviceName,
    }
    resp, err := service.ProcessModel{}.GetProcessModel(params)
    if err != nil {
        logx.Errorf("TaskStart Notice GetProcessModel error: %v", err.Error())
        ctx.FailWithMsg(ecode.UnknownErr, "未获取到工艺参数")
@@ -96,12 +96,12 @@
            Value: v,
        })
    }
    response := response.ProcessParamsResponse{
    data := response.ProcessParamsResponse{
        Number: resp.Number,
        Params: processParamsArr,
    }
    logx.Infof("TaskStart Notice GetProcessModel: %+v", resp)
    ctx.OkWithDetailed(response)
    ctx.OkWithDetailed(data)
}
// TaskFinish
@@ -166,11 +166,11 @@
    }
    params := service.GetProcessModelParams{
        WorkOrder: "",
        WorkOrder: procedure.WorkOrderID,
        OrderId:   procedure.OrderID,
        Product:   order.ProductName,
        Procedure: procedure.ProceduresInfo.ProcedureName,
        Device:    procedure.ProceduresInfo.DeviceID,
        Device:    procedure.ProceduresInfo.DeviceName,
    }
    resp, err := service.ProcessModel{}.GetProcessModel(params)
@@ -180,12 +180,12 @@
        return
    }
    fmt.Println("----------------开始下发工艺参数-----------------")
    for k, v := range resp.ParamsMap {
        fmt.Println(fmt.Sprintf("%v : %v", k, v))
        time.Sleep(time.Millisecond * 300)
    }
    fmt.Println("----------------下发工艺参数完毕-----------------")
    //fmt.Println("----------------开始下发工艺参数-----------------")
    //for k, v := range resp.ParamsMap {
    //    fmt.Println(fmt.Sprintf("%v : %v", k, v))
    //    time.Sleep(time.Millisecond * 300)
    //}
    //fmt.Println("----------------下发工艺参数完毕-----------------")
    //err = SendParams(resp.ParamsMap)
    //if err != nil {
@@ -194,10 +194,20 @@
    //    return
    //}
    safe.Go(func() {
        err = SendParams2(resp.ParamsMap, 0)
        if err != nil {
            logx.Errorf("SendProcessParams: %v", err.Error())
            return
        }
    })
    ctx.Ok()
}
func SendParams(paramsMap map[string]interface{}) error {
    if len(paramsMap) == 0 {
        return errors.New("empty params")
    }
    plcAddressMap := make(map[string]*int, len(conf.Conf.PLCAddresses))
    for _, item := range conf.Conf.PLCAddresses {
        plcAddressMap[item.FieldName] = &item.Address
@@ -227,3 +237,50 @@
    logx.Info("----------------下发工艺参数完毕-----------------")
    return nil
}
func SendParams2(paramsMap map[string]interface{}, tryTimes int) error {
    if len(paramsMap) == 0 {
        return errors.New("empty params")
    }
    if tryTimes > 2 {
        return errors.New("beyond max try time")
    }
    plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
    if code != ecode.OK {
        return errors.New("请先配置PLC")
    }
    conn, err := plc.NewModbusConnection(plcConfig.Address)
    if err != nil {
        return errors.New(fmt.Sprintf("连接plc失败: %v", err.Error()))
    }
    defer conn.Close()
    logx.Info("----------------开始下发工艺参数-----------------")
    var missNumbers int
    for k, v := range paramsMap {
        if address, ok := plc_address.Get(k); ok {
            result, err := plc.WriteHoldingRegister(conn, address, v)
            if err != nil {
                logx.Errorf("WriteHoldingRegister err:%v, address: %v, key: %v value: %v", err.Error(), address, k, v)
            } else {
                delete(paramsMap, k)
                logx.Infof("WriteHoldingRegister ok: key: %v, value: %v, result: %v", k, v, result)
            }
        } else {
            missNumbers++
        }
    }
    if missNumbers >= 1 {
        caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2)
        if err != nil {
            logx.Infof("SendParams2 err: %v", err.Error())
            return err
        }
        tryTimes++
        return SendParams2(paramsMap, tryTimes)
    }
    logx.Info("----------------下发工艺参数完毕-----------------")
    return nil
}
conf/apsClient.json
@@ -2,7 +2,7 @@
  "system": {
    "env": "develop",
    "port": 8003,
    "deviceId": "DeviceID 1",
    "deviceId": "DeviceID1",
    "netSetShellPath": "/data/network/",
    "netUpShellName": "up.sh",
    "netDownShellName": "down.sh"
constvar/const.go
@@ -1,7 +1,9 @@
package constvar
const (
    NsqTopicScheduleTask = "aps.%v.scheduleTask" // schedule task delivery
    NsqTopicScheduleTask   = "aps.%v.scheduleTask" // schedule task delivery
    // NsqTopicGetPlcAddress is the topic format used to request the PLC
    // address table; %v is filled with the NSQ node id by the callers.
    NsqTopicGetPlcAddress  = "aps.%v.getPlcAddress"
    // NsqTopicSendPlcAddress is the topic format on which the PLC address
    // table is received; %v is filled with the NSQ node id by the callers.
    NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress"
)
type PlcStartAddressType string
@@ -9,3 +11,9 @@
const (
    PlcStartAddressTypeFinishNumber = "1"
)
// Locations of the locally persisted PLC address table. The path and a file
// name are concatenated directly, so PlcAddressDataPath keeps its trailing
// slash.
const (
    PlcAddressDataPath          = "datafile/"         // directory holding both files
    PlcAddressDataKeyFileName   = "plc_address_key"   // parameter names, one per line
    PlcAddressDataValueFileName = "plc_address_value" // register addresses, one per line
)
datafile/plc_address_key
New file
@@ -0,0 +1,3 @@
压强
时间
温度
datafile/plc_address_value
New file
@@ -0,0 +1,3 @@
1000
1001
1002
model/common/common.go
@@ -54,3 +54,14 @@
        Procedures []*ProductProcedure `json:"procedures"` // 工序列表
    }
)
// PLC write addresses: request/response payloads exchanged over NSQ.
type (
    // RequestPlcAddress asks for the PLC address table of one device.
    RequestPlcAddress struct {
        DeviceId string
    }
    // ResponsePlcAddress carries the address table as two newline-separated
    // lists that are matched up line by line: KeyData holds the parameter
    // names and AddressData the register addresses. The fields have no JSON
    // tags, and encoding/json encodes []byte values as base64 strings.
    ResponsePlcAddress struct {
        KeyData     []byte
        AddressData []byte
    }
)
nsq/caller.go
New file
@@ -0,0 +1,48 @@
package nsq
import (
    "encoding/json"
    "errors"
    "time"
)
// Caller describes a request/response call over NSQ: send msg and wait up to
// duration for the raw reply.
//
// NOTE(review): DefaultCaller.Call in this file takes (input, output, timeout)
// and returns only an error, so *DefaultCaller does not satisfy this
// interface as declared — confirm which signature is intended.
type Caller interface {
    Call(msg []byte, duration time.Duration) ([]byte, error)
}
// DefaultCaller performs a request/response exchange over a pair of NSQ
// topics: requests are published to RequestTopic and replies are expected on
// ResponseTopic.
type DefaultCaller struct {
    NsqChannel    string
    RequestTopic  string
    ResponseTopic string
}

// NewCaller returns a DefaultCaller bound to the given request and response
// topics. NsqChannel is left empty.
func NewCaller(requestTopic, responseTopic string) *DefaultCaller {
    c := new(DefaultCaller)
    c.RequestTopic = requestTopic
    c.ResponseTopic = responseTopic
    return c
}
func (caller *DefaultCaller) Call(input interface{}, output interface{}, timeout time.Duration) error {
    msg, err := json.Marshal(input)
    if err != nil {
        return err
    }
    producer := GetProducer()
    err = producer.Publish(caller.RequestTopic, msg)
    if err != nil {
        return err
    }
    to := time.After(timeout)
    for {
        select {
        case <-to:
            return errors.New("timeout")
        case data := <-ReceivedMessageChan:
            if data.Topic == caller.ResponseTopic {
                return json.Unmarshal(data.Message, &output)
            }
        }
    }
}
nsq/consumer.go
@@ -2,6 +2,7 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/pkg/logx"
    "apsClient/pkg/nsqclient"
    "context"
@@ -17,8 +18,10 @@
    logx.Infof("Consume NewNsqConsumer topic:%v", topic)
    var handler MsgHandler
    switch topic {
    case fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId):
    case fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId):
        handler = new(ScheduleTask)
    case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId):
        handler = &PlcAddress{Topic: topic}
    }
    c.AddHandler(handler.HandleMessage)
nsq/msg_handler.go
@@ -2,13 +2,30 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/model/common"
    "apsClient/pkg/logx"
    "apsClient/pkg/structx"
    "apsClient/service/plc_address"
    "apsClient/utils/file"
    "encoding/json"
    "fmt"
    "github.com/spf13/cast"
    "gorm.io/gorm"
    "strings"
)
// ReceivedMessage is a raw NSQ message together with the topic it arrived on.
type ReceivedMessage struct {
    Topic   string
    Message []byte
}

// ReceivedMessageChan hands received messages from the message handlers to
// callers waiting for a reply. It is buffered with room for 1000 messages.
var ReceivedMessageChan = make(chan *ReceivedMessage, 1000)
type MsgHandler interface {
    HandleMessage(data []byte) (err error)
@@ -65,3 +82,47 @@
    }
    return nil
}
// PlcAddress handles PLC address table messages. Topic is the topic the
// handler was registered for; it is attached to the notification pushed to
// ReceivedMessageChan after a message has been processed.
type PlcAddress struct {
    Topic string
}
func (slf *PlcAddress) HandleMessage(data []byte) (err error) {
    logx.Infof("get an PlcAddress message :%s", data)
    var resp = new(common.ResponsePlcAddress)
    err = json.Unmarshal(data, &resp)
    if err != nil {
        logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error())
        return err
    }
    //写入到文件
    err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName), resp.KeyData)
    if err != nil {
        return err
    }
    err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName), resp.AddressData)
    if err != nil {
        return err
    }
    //写入到内存
    keyString := string(resp.KeyData)
    addressString := string(resp.AddressData)
    keys := strings.Split(keyString, "\n")
    addresses := strings.Split(addressString, "\n")
    if len(keys) != len(addresses) {
        logx.Error("plc address message error: key length not equal address length")
        return nil
    }
    for i := 0; i < len(keys); i++ {
        key := strings.ReplaceAll(keys[i], "\r", "")
        address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", ""))
        plc_address.Set(key, address)
    }
    //通知回复收到
    ReceivedMessageChan <- &ReceivedMessage{
        Topic:   slf.Topic,
        Message: data,
    }
    return nil
}
nsq/nsq.go
@@ -2,9 +2,13 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model/common"
    "apsClient/pkg/logx"
    "apsClient/pkg/safe"
    "errors"
    "fmt"
    "time"
)
func Init() error {
@@ -17,7 +21,20 @@
    }
    safe.Go(func() {
        _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
        caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2)
        if err != nil {
            logx.Infof("SendParams2 err: %v", err.Error())
        }
    })
    safe.Go(func() {
        _ = Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
    })
    safe.Go(func() {
        _ = Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
    })
    return nil
service/plc_address/address_map.go
New file
@@ -0,0 +1,70 @@
package plc_address
import (
    "apsClient/constvar"
    "apsClient/pkg/logx"
    "apsClient/utils/file"
    "fmt"
    "github.com/spf13/cast"
    "os"
    "strings"
    "sync"
)
// addressMap is a concurrency-safe table backed by sync.Map. Set stores int
// register addresses under string parameter names.
type addressMap struct {
    store sync.Map
}

// defaultAddressMap is the package-wide table used by Set and Get.
var defaultAddressMap *addressMap

// newAddressMap returns an empty, ready-to-use addressMap.
func newAddressMap() *addressMap {
    return new(addressMap)
}
func init() {
    defaultAddressMap = newAddressMap()
    LoadAddressFromFile()
}
// Set stores value as the register address for key in the package-wide
// address table, replacing any previous entry for that key.
func Set(key string, value int) {
    defaultAddressMap.store.Store(key, value)
}
func Get(key string) (value int, ok bool) {
    if v, ok := defaultAddressMap.store.Load(key); ok {
        return v.(int), ok
    }
    return 0, false
}
func LoadAddressFromFile() (loadOk bool) {
    keyFileName := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName)
    addressFileName := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName)
    if !file.Exists(keyFileName) || !file.Exists(addressFileName) {
        return
    }
    keyData, err := os.ReadFile(keyFileName)
    if err != nil {
        logx.Errorf("LoadAddressFromFile ReadFile err: %v", err.Error())
        return
    }
    addressData, err := os.ReadFile(addressFileName)
    if err != nil {
        logx.Errorf("LoadAddressFromFile ReadFile err: %v", err.Error())
        return
    }
    keyString := string(keyData)
    addressString := string(addressData)
    keys := strings.Split(keyString, "\n")
    addresses := strings.Split(addressString, "\n")
    if len(keys) != len(addresses) {
        logx.Error("plc address message error: key length not equal address length")
        return
    }
    for i := 0; i < len(keys); i++ {
        key := strings.ReplaceAll(keys[i], "\r", "")
        address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", ""))
        Set(key, address)
    }
    return true
}
test/msg_handler_test.go
@@ -1,7 +1,7 @@
package test
import (
    "apsClient/model/request"
    "apsClient/model/common"
    "apsClient/nsq"
    "encoding/json"
    "fmt"
@@ -13,30 +13,30 @@
func TestHandleMessage(t *testing.T) {
    Init()
    var tasks = make([]*request.DeliverScheduleTask, 0)
    startTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 08:00", time.Local)
    endTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-20 12:00", time.Local)
    var tasks = make([]*common.DeliverScheduleTask, 0)
    startTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 08:00", time.Local)
    endTime, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-23 12:00", time.Local)
    fmt.Println(startTime)
    fmt.Println(startTime.Unix())
    startTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 08:00", time.Local)
    endTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 18:00", time.Local)
    startTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 08:00", time.Local)
    endTime1, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 18:00", time.Local)
    startTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-19 18:00", time.Local)
    endTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-20 06:00", time.Local)
    startTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-22 18:00", time.Local)
    endTime2, _ := time.ParseInLocation("2006-01-02 15:04", "2023-08-23 06:00", time.Local)
    inputMaterials := []*request.ProcedureMaterial{{
    inputMaterials := []*common.ProcedureMaterial{{
        MaterialID:   "MaterialID 1",
        MaterialName: "这是一个输入物料名",
        Amount:       decimal.NewFromFloat(100),
        Unit:         "件",
    }}
    outputMaterials := []*request.ProcedureMaterial{{
    outputMaterials := []*common.ProcedureMaterial{{
        MaterialID:   "MaterialID",
        MaterialName: "这是一个输出物料名",
        Amount:       decimal.NewFromFloat(20),
        Unit:         "件",
    }}
    workers := []*request.ProcedureWorker{{
    workers := []*common.ProcedureWorker{{
        WorkerID:   "WorkerID 1",
        WorkerName: "张三",
        PhoneNum:   "18800000000",
@@ -51,8 +51,8 @@
            EndTime:    endTime2.Unix(),
        }}
    task1 := request.DeliverScheduleTask{
        WorkOrder: request.WorkOrder{
    task1 := common.DeliverScheduleTask{
        WorkOrder: common.WorkOrder{
            WorkOrderID: "WorkOrderID 1",
            OrderID:     "OrderID 1",
            ProductID:   "ProductID 1",
@@ -66,10 +66,10 @@
            StartTime:   startTime.Unix(),
            EndTime:     endTime.Unix(),
        },
        Procedures: []*request.ProductProcedure{{
        Procedures: []*common.ProductProcedure{{
            ProcedureID:     "ProcedureID 1",
            ProcedureName:   "ProcedureName 1",
            DeviceID:        "DeviceID 1",
            DeviceID:        "DeviceID1",
            DeviceName:      "DeviceName 1",
            StartTime:       startTime.Unix(),
            EndTime:         endTime.Unix(),
@@ -79,7 +79,7 @@
            Workers:         workers,
        }, {ProcedureID: "ProcedureID 2",
            ProcedureName:   "ProcedureName 2",
            DeviceID:        "DeviceID 1",
            DeviceID:        "DeviceID1",
            DeviceName:      "DeviceName 1",
            StartTime:       startTime.Unix(),
            EndTime:         endTime.Unix(),
@@ -89,8 +89,8 @@
            Workers:         workers,
        }},
    }
    task2 := request.DeliverScheduleTask{
        WorkOrder: request.WorkOrder{
    task2 := common.DeliverScheduleTask{
        WorkOrder: common.WorkOrder{
            WorkOrderID: "WorkOrderID 2",
            OrderID:     "OrderID 2",
            ProductID:   "ProductID 2",
@@ -104,10 +104,10 @@
            StartTime:   startTime.Unix(),
            EndTime:     endTime.Unix(),
        },
        Procedures: []*request.ProductProcedure{{
        Procedures: []*common.ProductProcedure{{
            ProcedureID:     "ProcedureID 3",
            ProcedureName:   "ProcedureName 3",
            DeviceID:        "DeviceID 1",
            DeviceID:        "DeviceID1",
            DeviceName:      "DeviceName 1",
            StartTime:       startTime.Unix(),
            EndTime:         endTime.Unix(),
@@ -118,7 +118,7 @@
        }, {
            ProcedureID:     "ProcedureID 4",
            ProcedureName:   "ProcedureName 4",
            DeviceID:        "DeviceID 1",
            DeviceID:        "DeviceID1",
            DeviceName:      "DeviceName 1",
            StartTime:       startTime.Unix(),
            EndTime:         endTime.Unix(),
utils/file/file.go
New file
@@ -0,0 +1,56 @@
package file
import (
    "errors"
    "os"
    "path/filepath"
)
// WriteFile writes content to filename, creating the file if it does not
// exist and replacing its previous content otherwise. Missing parent
// directories are created first.
//
// New directories get mode 0755 and new files mode 0644, both subject to the
// process umask.
func WriteFile(filename string, content []byte) error {
    // MkdirAll is a no-op for an existing directory and also creates nested
    // parents.
    if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
        return err
    }
    // os.WriteFile opens the file write-only, truncates it and always closes
    // the handle.
    return os.WriteFile(filename, content, 0644)
}
// Exists reports whether the given path (file or directory) exists.
func Exists(path string) bool {
    _, statErr := os.Stat(path) // os.Stat fetches the file information
    return statErr == nil || os.IsExist(statErr)
}
// IsDir reports whether the given path is a directory. A path that cannot be
// stat'ed is reported as false.
func IsDir(path string) bool {
    if info, statErr := os.Stat(path); statErr == nil {
        return info.IsDir()
    }
    return false
}
// IsFile reports whether the given path exists and is not a directory. A path
// that cannot be stat'ed, for example because it does not exist, is reported
// as false.
func IsFile(path string) bool {
    info, err := os.Stat(path)
    return err == nil && !info.IsDir()
}
// ReadFile returns the content of filename. A missing file is reported with
// the error "file not exist"; any other failure is returned as produced by
// os.ReadFile.
func ReadFile(filename string) (content []byte, err error) {
    // Read directly instead of stat-then-read: this avoids a race between
    // the two calls and keeps non-"missing" errors intact.
    content, err = os.ReadFile(filename)
    if errors.Is(err, os.ErrNotExist) {
        return nil, errors.New("file not exist")
    }
    return content, err
}