From a32c642ffa2b9f05e3b55fc88b2c2bed3277a080 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 08 三月 2022 10:00:30 +0800
Subject: [PATCH] ignore
---
client/client.go | 183 +++++++++++++++++++++++++++++----------------
1 files changed, 117 insertions(+), 66 deletions(-)
diff --git a/client/client.go b/client/client.go
index 74a8d3a..4688786 100644
--- a/client/client.go
+++ b/client/client.go
@@ -3,10 +3,10 @@
import (
"basic.com/valib/go-aiot.git/aiotProto/aiot"
"basic.com/valib/go-aiot.git/util"
- "basic.com/valib/logger.git"
"bufio"
"encoding/binary"
"encoding/json"
+ "errors"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io"
@@ -23,8 +23,6 @@
DefaultHeartbeatInterval = 15 * time.Second
// 璇诲彇鏁版嵁瓒呮椂鏃堕棿
DefaultReaderTimeOut = 60 * time.Second
- // 杩炴帴灏濊瘯闂撮殧
- DefaultNetRetry = 10 * time.Second
)
// 杩炴帴鐘舵��
@@ -49,8 +47,10 @@
deviceRegister *aiot.DeviceRegister
// 鍏抽棴閿�
closeLock *sync.Mutex
- // 娑堟伅閿�
- msgLock *sync.Mutex
+ // 璇诲彇閿�
+ readLock *sync.Mutex
+ // 鍐欏叆閿�
+ writeLock *sync.Mutex
// 杩炴帴鍦板潃
addr string
// 璁惧ID
@@ -61,8 +61,6 @@
Writer *bufio.Writer
// 鍐欏叆閫氶亾
writeChan chan []byte
- // 閫�鍑洪�氶亾
- exitChan chan int8
// 杩炴帴鐘舵��
state State
// 鎶ユ枃澶�
@@ -73,24 +71,27 @@
clientCallback ClientCallBack
// 蹇冭烦鍖�
heartBeatProto *aiot.HeartBeatProto
+ // logger
+ Logger *zap.SugaredLogger
}
// 鍒濆鍖栧鎴风
-func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack) *Client {
+func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
logger.Debug("New Client...")
return &Client{
deviceRegister: deviceRegister,
+ readLock: new(sync.Mutex),
closeLock: new(sync.Mutex),
- msgLock: new(sync.Mutex),
+ writeLock: new(sync.Mutex),
addr: addr,
deviceId: clientId,
writeChan: make(chan []byte),
- exitChan: make(chan int8),
state: StateInit,
tmpByte4Slice: make([]byte, 4),
waitGroup: &util.WaitGroupWrapper{},
clientCallback: callBack,
heartBeatProto: &aiot.HeartBeatProto{},
+ Logger: logger,
}
}
@@ -104,9 +105,9 @@
// 鍒濆鍖栧綋鍓嶅睘鎬у��
c.Conn = nil
c.closeLock = new(sync.Mutex)
- c.msgLock = new(sync.Mutex)
+ c.readLock = new(sync.Mutex)
+ c.writeLock = new(sync.Mutex)
c.writeChan = make(chan []byte)
- c.exitChan = make(chan int8)
c.state = StateInit
c.tmpByte4Slice = make([]byte, 4)
c.waitGroup = &util.WaitGroupWrapper{}
@@ -115,35 +116,35 @@
// 鍚姩鏈嶅姟
func (c *Client) StartSrv() {
// 鍒ゆ柇杩炴帴鐘舵�侊紝閬垮厤閲嶅杩炴帴
- logger.Debug("Start client service...")
+ c.Logger.Debug("Start client service...")
if c.IsConnected(){
- logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
+ c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
return
}
// 鍦板潃鏄惁鍙敤
if c.addr == "" {
- logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
+ c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
return
}
// 杩炴帴TCP
- logger.Debug("Connecting to service", zap.String("addr", c.addr))
+ c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
if err != nil {
- logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+ c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
return
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
- logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+ c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
return
}
c.Conn = conn
// 璁剧疆杩炴帴鐘舵��
c.SetState(StateConnected)
- logger.Debug("Client service connected.", zap.String("addr", c.addr))
+ c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
// 鍚敤璇诲彇閫氶亾
c.SetRWBuf()
@@ -154,7 +155,7 @@
// 鍚敤蹇冭烦
c.waitGroup.Wrap(c.writeHeartBeat)
c.Wait()
- logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
+ c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
}
// 璁剧疆杩炴帴鐘舵��
@@ -187,43 +188,38 @@
byte4 := make([]byte,4)
for {
select {
- case <- c.exitChan:
- logger.Debug("Close client", zap.String("deviceId", c.deviceId))
- c.Close()
- logger.Warn("writeLoop Done...")
- return
case bodyByte := <- c.writeChan:
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
body = append(byte4, bodyByte...)
+ c.writeLock.Lock()
_,err = c.Conn.Write(body)
+ c.writeLock.Unlock()
if err != nil {
- logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
+ c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
c.Close()
- logger.Warn("writeLoop Done...")
+ c.Logger.Warn("writeLoop Done...")
return
}
err = c.Writer.Flush()
+ c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
if err != nil {
- logger.Error("Fail to write flush", zap.Error(err))
+ c.Logger.Error("Fail to write flush", zap.Error(err))
}
}
}
}
// 鍙戦�佹秷鎭�
-func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
- // 閿�
- c.closeLock.Lock()
- defer c.closeLock.Unlock()
-
+func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
if c.IsClosed() {
- logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
+ c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
return nil,nil
}
// 鎷艰骞跺彂閫佹秷鎭�
body := &aiot.Protocol{
+ Receiver: aiot.RECEIVER_TO_SAAS,
SenderId: senderId,
MsgType: msgType,
ReqType: reqType,
@@ -232,38 +228,52 @@
}
// 鍙戦�佹秷鎭�
- c.WriteBody(body)
+ _ = c.WriteBody(body)
return body, nil
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
- body.Receiver = aiot.RECEIVER_TO_SAAS
- logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+
+ // 閫氶亾宸插叧闂紝涓嶈兘鍐欏叆
+ if c.IsClosed() {
+ errMsg := "Can not write msg into closed chain"
+ c.Logger.Warn(errMsg, zap.Any("msg",body))
+ return errors.New(errMsg)
+ }
+
+ // 娑堟伅ID榛樿澶勭悊
+ if body.MsgProto == nil {
+ body.MsgProto = GetMsgProto("")
+ }
msgData, err := json.Marshal(body)
if err != nil {
- logger.Error("Fail to Marshal send data", zap.Error(err))
+ c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- defer c.msgLock.Unlock()
+ c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
c.writeChan <- msgData
return nil
}
// 鍙戦�佹敞鍐屽寘
func (c *Client) writeRegister() {
- logger.Debug("registering...")
+ c.Logger.Debug("registering...")
data := c.deviceRegister
msgData, _ := json.Marshal(data)
- _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
+ _, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
if err != nil {
- logger.Error("Fail to send device register", zap.Any("msg", msgData))
+ c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
}
}
// 鍙戦�佸績璺冲寘
func (c *Client) writeHeartBeat() {
- logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
+ c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
pingData, _ := json.Marshal(c.heartBeatProto)
t := time.NewTicker(DefaultHeartbeatInterval)
defer func() {
@@ -279,9 +289,33 @@
t.Stop()
return
}
- go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto(""))
+ go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
}
}
+}
+
+// 鍙戦�佷笟鍔″寘璇锋眰
+func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+ body := &aiot.Protocol{}
+ body.Receiver = receiver
+ body.SenderId = senderId
+ body.MsgProto = msgProto
+ body.MsgType = aiot.MSG_TYPE_BUSINESS
+ body.ReqType = aiot.REQ_TYPE_REQUEST
+ body.Data = data
+ c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+ msgData, err := json.Marshal(body)
+ if err != nil {
+ c.Logger.Error("Fail to Marshal send data", zap.Error(err))
+ return err
+ }
+ c.writeChan <- msgData
+ return nil
}
// 娑堟伅璇诲彇閫氶亾
@@ -289,14 +323,17 @@
var err error
var length uint32
for {
- c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
+ c.tmpByte4Slice = make([]byte, 4)
+ _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
// 璇诲彇闀垮害
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+ c.readLock.Unlock()
if err != nil {
if err == io.EOF {
+ c.Logger.Error("Fail to read request byte4", zap.Error(err))
err = nil
} else {
- logger.Error("Fail to read request", zap.Error(err))
c.Close()
return
}
@@ -304,31 +341,43 @@
}
length = binary.BigEndian.Uint32(c.tmpByte4Slice)
if length > DefaultBufferSize {
- logger.Error("Fail to read request data from io", zap.Uint32("length",length))
+ c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
}
// 璇诲彇body
bodyByte := make([]byte, length)
- c.closeLock.Lock()
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, bodyByte)
- c.closeLock.Unlock()
+ c.readLock.Unlock()
+ if err != nil {
+ if err == io.EOF {
+ c.Logger.Error("Fail to read request body", zap.Error(err))
+ err = nil
+ } else {
+ c.Close()
+ return
+ }
+ break
+ }
body := &aiot.Protocol{}
err = json.Unmarshal(bodyByte, body)
if err != nil {
- logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
+ c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
}
- logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+ c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
// 澶勭悊鍥炶皟
c.onMessage(body)
}
- logger.Warn("ReadLoop Done...")
+ c.Logger.Warn("ReadLoop Done...")
+ // 鍏抽棴杩炴帴
+ c.Close()
}
// 澶勭悊鍥炶皟
func (c *Client) onMessage (body *aiot.Protocol) {
// 鏈皝瑁卌allback锛屽彧鍐欐棩蹇�
if c.clientCallback == nil {
- logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
+ c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
return
}
@@ -336,12 +385,12 @@
switch body.MsgType {
// 蹇冭烦鍥炲
case aiot.MSG_TYPE_HEART_BEAT:
- c.clientCallback.OnHeartBeat(c,body)
+ go c.clientCallback.OnHeartBeat(c, body)
return
// 娉ㄥ唽鍥炲
case aiot.MSG_TYPE_REGISTER:
- c.clientCallback.OnRegister(c,body)
+ go c.clientCallback.OnRegister(c, body)
return
// 璁惧鎺у埗
@@ -372,7 +421,7 @@
}
// 鎷艰娑堟伅ID
-func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto {
+func GetMsgProto(msgId string) *aiot.MsgIdProto {
// 鏂版秷鎭�
if msgId == "" {
return &aiot.MsgIdProto{
@@ -392,6 +441,11 @@
return c.deviceId
}
+// 鑾峰彇杩炴帴鐘舵��
+func (c *Client) GetState() State {
+ return c.state
+}
+
// 鍒ゆ柇杩炴帴鏄惁鍏抽棴
func (c *Client) IsClosed() bool {
return c.state == StateDisconnected
@@ -409,22 +463,19 @@
// 鍏抽棴TCP
func (c *Client) Close() {
- logger.Debug("Closing connect", zap.String("addr", c.addr))
+ c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
c.closeLock.Lock()
defer c.closeLock.Unlock()
-
// 鍏抽棴閫氶亾
if !c.IsClosed() {
- c.Conn.Close()
+ _ = c.Conn.Close()
if c.IsConnected() {
c.clientCallback.OnClose(c)
}
-
- // 璁剧疆杩炴帴灞炴��
- c.SetState(StateDisconnected)
-
- // 鍏抽棴绠¢亾
- close(c.exitChan)
close(c.writeChan)
}
+
+ // 璁剧疆杩炴帴灞炴��
+ c.SetState(StateDisconnected)
+ c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
}
--
Gitblit v1.8.0