package plc import ( "apsClient/conf" "apsClient/pkg/logx" "context" "errors" "fmt" apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" "github.com/spf13/cast" "sync" "sync/atomic" "time" plc4go "github.com/apache/plc4x/plc4go/pkg/api" "github.com/apache/plc4x/plc4go/pkg/api/drivers" "github.com/apache/plc4x/plc4go/pkg/api/transports" ) type ConnectionManager struct { connections map[string]plc4go.PlcConnection mu sync.Mutex } func newPlcConnectionManager() *ConnectionManager { return &ConnectionManager{ connections: make(map[string]plc4go.PlcConnection), } } func (cm *ConnectionManager) GetConnection(address string) (plc4go.PlcConnection, bool) { cm.mu.Lock() defer cm.mu.Unlock() conn, ok := cm.connections[address] if !ok { return nil, false } //if ok, _ := cm.CheckConnect(conn, time.Second); !ok { // conn.Close() //} return conn, true } var connectionManager = newPlcConnectionManager() func (cm *ConnectionManager) AddConnection(address string, connection plc4go.PlcConnection) { cm.mu.Lock() defer cm.mu.Unlock() cm.connections[address] = connection } func (cm *ConnectionManager) CheckConnect(conn plc4go.PlcConnection, timeout time.Duration) (bool, error) { pingCh := conn.Ping() timer := time.NewTimer(timeout) select { case err := <-pingCh: if err == nil { return true, nil } return false, err.GetErr() case <-timer.C: return false, fmt.Errorf("connection timed out after %s", timeout) } } func GetModbusConnection(ipAddr string) (plc4go.PlcConnection, error) { if conn, ok := connectionManager.GetConnection(ipAddr); ok { return conn, nil } // 创建一个上下文,并设置 3 秒超时 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() conn, err := newGetModbusConnection(ctx, ipAddr) if err != nil { logx.Errorf("new modbus connection err: %v", err.Error()) return nil, err } connectionManager.AddConnection(ipAddr, conn) return conn, nil } func newGetModbusConnection(ctx context.Context, ipAddr string) (plc4go.PlcConnection, error) { // 创建驱动管理器 
driverManager := plc4go.NewPlcDriverManager() // 注册TCP传输 transports.RegisterTcpTransport(driverManager) // 注册驱动 //drivers.RegisterKnxDriver(driverManager) drivers.RegisterModbusTcpDriver(driverManager) // 通过TCP连接PLC设备 connectionString := fmt.Sprintf("modbus-tcp://%s", ipAddr) connectionRequestChanel := driverManager.GetConnection(connectionString) // 等待连接响应,同时考虑上下文的超时 select { case connectionResult := <-connectionRequestChanel: if err := connectionResult.GetErr(); err != nil { return nil, err } return connectionResult.GetConnection(), nil case <-ctx.Done(): return nil, ctx.Err() } } func readHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) { tag := fmt.Sprintf("tag:%v", address) tagAddress := getTagAddress(address, 1) // 读模式 readRequest, err := connection.ReadRequestBuilder().AddTagAddress(tag, tagAddress).Build() if err != nil { logx.Errorf("preparing read-request:%s\n", err.Error()) return nil, err } // 执行 readResult := <-readRequest.Execute() if err := readResult.GetErr(); err != nil { logx.Errorf("execting read-request:%s\n", err.Error()) return nil, err } // 判断响应码是否正确 if readResult.GetResponse().GetResponseCode(tag) != apiModel.PlcResponseCode_OK { logx.Errorf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode(tag).GetName()) return nil, nil } value := readResult.GetResponse().GetValue(tag) return value.GetRaw(), err } func readHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) { tag := fmt.Sprintf("tag:%v:%v", address, length) tagAddress := getTagAddress(address, length) // 读模式 readRequest, err := connection.ReadRequestBuilder().AddTagAddress(tag, tagAddress).Build() if err != nil { logx.Errorf("preparing read-request:%s\n", err.Error()) return nil, err } // 执行 readResult := <-readRequest.Execute() if err := readResult.GetErr(); err != nil { logx.Errorf("plc4x execute read-request:%s\n", err.Error()) return nil, err } // 判断响应码是否正确 if 
readResult.GetResponse().GetResponseCode(tag) != apiModel.PlcResponseCode_OK { logx.Errorf("plc4x response error code: %s", readResult.GetResponse().GetResponseCode(tag).GetName()) return nil, errors.New("error code: " + readResult.GetResponse().GetResponseCode(tag).GetName()) } value := readResult.GetResponse().GetValue(tag) var result []byte for _, val := range value.GetList() { result = append(result, val.GetRaw()...) } return result, nil } func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) { if length > 1 { return readHoldingRegisterList(connection, address, length) } return readHoldingRegisterSingle(connection, address) } func getTagAddress(address int, length int) string { intType := conf.Conf.PLC.ModbusIntType if intType == "" { intType = "DINT" } if length == 1 { return fmt.Sprintf("holding-register:%d:%v", address, intType) } else { return fmt.Sprintf("holding-register:%d:%v[%d]", address, intType, length) } } func WriteHoldingRegister(connection plc4go.PlcConnection, address int, value any) (string, error) { tag := fmt.Sprintf("tag:%v:w", address) var tagAddress string if cast.ToInt32(value) > 2<<16 { tagAddress = getTagAddress(address, 2) } else { tagAddress = getTagAddress(address, 1) } // 写模式 writeRequest, err := connection.WriteRequestBuilder().AddTagAddress(tag, tagAddress, value).Build() if err != nil { logx.Errorf("plc4x preparing read-request:%s\n", err.Error()) return "", err } // 执行 writeResult := <-writeRequest.Execute() if err := writeResult.GetErr(); err != nil { logx.Errorf("plc4x execute write-request:%s\n", err.Error()) return "", err } // 判断响应码是否正确 if writeResult.GetResponse().GetResponseCode(tag) != apiModel.PlcResponseCode_OK { logx.Errorf("plc4x response error code: %s", writeResult.GetResponse().GetResponseCode(tag).GetName()) return "", errors.New("error code: " + writeResult.GetResponse().GetResponseCode(tag).GetName()) } result := writeResult.GetResponse().String() return result, nil } 
// dealErr updates the failure budget from the outcome of an operation: a
// non-nil err consumes one remaining opportunity, a nil err restores the
// budget and marks the connection as up. ipAddr is currently unused.
func dealErr(err error, ipAddr string) {
	if err != nil {
		FailureRemainingOpportunitiesDecr() // one fewer failure allowed
		return
	}
	FailureRemainingOpportunitiesReset() // success: restore the budget
}

const (
	// defaultFailureRemainingOpportunities is the value the failure budget is
	// restored to by FailureRemainingOpportunitiesReset.
	defaultFailureRemainingOpportunities = 20
)

var (
	// connectionStatus is the flag reported by IsConnect. Its zero value is
	// false, so it reads false until the first reset.
	connectionStatus atomic.Bool
	// failureRemainingOpportunities counts how many more failures are allowed
	// before connectionStatus is cleared.
	failureRemainingOpportunities atomic.Int64
)

// IsConnect reports the current value of the connection status flag.
func IsConnect() bool {
	return connectionStatus.Load()
}

// FailureRemainingOpportunitiesDecr consumes one failure opportunity and
// clears the connection status once the remaining count is zero or negative.
func FailureRemainingOpportunitiesDecr() {
	if failureRemainingOpportunities.Add(-1) <= 0 {
		connectionStatus.Store(false)
	}
}

// FailureRemainingOpportunitiesReset restores the failure budget to
// defaultFailureRemainingOpportunities when it is below that value, and sets
// the connection status to true when it is not already set.
func FailureRemainingOpportunitiesReset() {
	if failureRemainingOpportunities.Load() < defaultFailureRemainingOpportunities {
		failureRemainingOpportunities.Store(defaultFailureRemainingOpportunities)
	}
	if !connectionStatus.Load() {
		connectionStatus.Store(true)
	}
}