From a32c642ffa2b9f05e3b55fc88b2c2bed3277a080 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 08 三月 2022 10:00:30 +0800
Subject: [PATCH] ignore

---
 client/client.go |  183 +++++++++++++++++++++++++++++----------------
 1 files changed, 117 insertions(+), 66 deletions(-)

diff --git a/client/client.go b/client/client.go
index 74a8d3a..4688786 100644
--- a/client/client.go
+++ b/client/client.go
@@ -3,10 +3,10 @@
 import (
 	"basic.com/valib/go-aiot.git/aiotProto/aiot"
 	"basic.com/valib/go-aiot.git/util"
-	"basic.com/valib/logger.git"
 	"bufio"
 	"encoding/binary"
 	"encoding/json"
+	"errors"
 	uuid "github.com/satori/go.uuid"
 	"go.uber.org/zap"
 	"io"
@@ -23,8 +23,6 @@
 	DefaultHeartbeatInterval = 15 * time.Second
 	// 璇诲彇鏁版嵁瓒呮椂鏃堕棿
 	DefaultReaderTimeOut = 60 * time.Second
-	// 杩炴帴灏濊瘯闂撮殧
-	DefaultNetRetry = 10 * time.Second
 )
 
 // 杩炴帴鐘舵��
@@ -49,8 +47,10 @@
 	deviceRegister *aiot.DeviceRegister
 	// 鍏抽棴閿�
 	closeLock *sync.Mutex
-	// 娑堟伅閿�
-	msgLock *sync.Mutex
+	// 璇诲彇閿�
+	readLock *sync.Mutex
+	// 鍐欏叆閿�
+	writeLock *sync.Mutex
 	// 杩炴帴鍦板潃
 	addr string
 	// 璁惧ID
@@ -61,8 +61,6 @@
 	Writer *bufio.Writer
 	// 鍐欏叆閫氶亾
 	writeChan chan []byte
-	// 閫�鍑洪�氶亾
-	exitChan chan int8
 	// 杩炴帴鐘舵��
 	state State
 	// 鎶ユ枃澶�
@@ -73,24 +71,27 @@
 	clientCallback ClientCallBack
 	// 蹇冭烦鍖�
 	heartBeatProto *aiot.HeartBeatProto
+	// logger
+	Logger *zap.SugaredLogger
 }
 
 // 鍒濆鍖栧鎴风
-func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack) *Client {
+func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
 	logger.Debug("New Client...")
 	return &Client{
 		deviceRegister: deviceRegister,
+		readLock: new(sync.Mutex),
 		closeLock: new(sync.Mutex),
-		msgLock: new(sync.Mutex),
+		writeLock: new(sync.Mutex),
 		addr: addr,
 		deviceId: clientId,
 		writeChan: make(chan []byte),
-		exitChan: make(chan int8),
 		state: StateInit,
 		tmpByte4Slice: make([]byte, 4),
 		waitGroup: &util.WaitGroupWrapper{},
 		clientCallback: callBack,
 		heartBeatProto: &aiot.HeartBeatProto{},
+		Logger: logger,
 	}
 }
 
@@ -104,9 +105,9 @@
 	// 鍒濆鍖栧綋鍓嶅睘鎬у��
 	c.Conn = nil
 	c.closeLock = new(sync.Mutex)
-	c.msgLock = new(sync.Mutex)
+	c.readLock = new(sync.Mutex)
+	c.writeLock = new(sync.Mutex)
 	c.writeChan = make(chan []byte)
-	c.exitChan = make(chan int8)
 	c.state = StateInit
 	c.tmpByte4Slice = make([]byte, 4)
 	c.waitGroup = &util.WaitGroupWrapper{}
@@ -115,35 +116,35 @@
 // 鍚姩鏈嶅姟
 func (c *Client) StartSrv() {
 	// 鍒ゆ柇杩炴帴鐘舵�侊紝閬垮厤閲嶅杩炴帴
-	logger.Debug("Start client service...")
+	c.Logger.Debug("Start client service...")
 	if c.IsConnected(){
-		logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
+		c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
 		return
 	}
 
 	// 鍦板潃鏄惁鍙敤
 	if c.addr == "" {
-		logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
+		c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
 		return
 	}
 
 	// 杩炴帴TCP
-	logger.Debug("Connecting to service", zap.String("addr", c.addr))
+	c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
 	tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
 	if err != nil {
-		logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+		c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
 		return
 	}
 	conn, err := net.DialTCP("tcp", nil, tcpAddr)
 	if err != nil {
-		logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+		c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
 		return
 	}
 	c.Conn = conn
 
 	// 璁剧疆杩炴帴鐘舵��
 	c.SetState(StateConnected)
-	logger.Debug("Client service connected.", zap.String("addr", c.addr))
+	c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
 
 	// 鍚敤璇诲彇閫氶亾
 	c.SetRWBuf()
@@ -154,7 +155,7 @@
 	// 鍚敤蹇冭烦
 	c.waitGroup.Wrap(c.writeHeartBeat)
 	c.Wait()
-	logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
+	c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
 }
 
 // 璁剧疆杩炴帴鐘舵��
@@ -187,43 +188,38 @@
 	byte4 := make([]byte,4)
 	for {
 		select {
-		case <- c.exitChan:
-			logger.Debug("Close client", zap.String("deviceId", c.deviceId))
-			c.Close()
-			logger.Warn("writeLoop Done...")
-			return
 		case bodyByte := <- c.writeChan:
 			binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
 			body = append(byte4, bodyByte...)
+			c.writeLock.Lock()
 			_,err = c.Conn.Write(body)
+			c.writeLock.Unlock()
 			if err != nil {
-				logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
+				c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
 				c.Close()
-				logger.Warn("writeLoop Done...")
+				c.Logger.Warn("writeLoop Done...")
 				return
 			}
 			err = c.Writer.Flush()
+			c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
 			if err != nil {
-				logger.Error("Fail to write flush", zap.Error(err))
+				c.Logger.Error("Fail to write flush", zap.Error(err))
 			}
 		}
 	}
 }
 
 // 鍙戦�佹秷鎭�
-func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
-	// 閿�
-	c.closeLock.Lock()
-	defer c.closeLock.Unlock()
-
+func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
 	// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
 	if c.IsClosed() {
-		logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
+		c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
 		return nil,nil
 	}
 
 	// 鎷艰骞跺彂閫佹秷鎭�
 	body := &aiot.Protocol{
+		Receiver: aiot.RECEIVER_TO_SAAS,
 		SenderId: senderId,
 		MsgType: msgType,
 		ReqType: reqType,
@@ -232,38 +228,52 @@
 	}
 
 	// 鍙戦�佹秷鎭�
-	c.WriteBody(body)
+	_ = c.WriteBody(body)
 	return body, nil
 }
 
 func (c *Client) WriteBody(body *aiot.Protocol) error {
-	body.Receiver = aiot.RECEIVER_TO_SAAS
-	logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+	defer func() {
+		if err := recover();err != nil  {
+			c.Logger.Error("Write Body Error:", err)
+		}
+	}()
+
+	// 閫氶亾宸插叧闂紝涓嶈兘鍐欏叆
+	if c.IsClosed() {
+		errMsg := "Can not write msg into closed chain"
+		c.Logger.Warn(errMsg, zap.Any("msg",body))
+		return errors.New(errMsg)
+	}
+
+	// 娑堟伅ID榛樿澶勭悊
+	if body.MsgProto == nil {
+		body.MsgProto = GetMsgProto("")
+	}
 	msgData, err := json.Marshal(body)
 	if err != nil {
-		logger.Error("Fail to Marshal send data", zap.Error(err))
+		c.Logger.Error("Fail to Marshal send data", zap.Error(err))
 		return err
 	}
-	c.msgLock.Lock()
-	defer c.msgLock.Unlock()
+	c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
 	c.writeChan <- msgData
 	return nil
 }
 
 // 鍙戦�佹敞鍐屽寘
 func (c *Client) writeRegister() {
-	logger.Debug("registering...")
+	c.Logger.Debug("registering...")
 	data := c.deviceRegister
 	msgData, _ := json.Marshal(data)
-	_, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
+	_, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
 	if err != nil {
-		logger.Error("Fail to send device register", zap.Any("msg", msgData))
+		c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
 	}
 }
 
 // 鍙戦�佸績璺冲寘
 func (c *Client) writeHeartBeat() {
-	logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
+	c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
 	pingData, _ := json.Marshal(c.heartBeatProto)
 	t := time.NewTicker(DefaultHeartbeatInterval)
 	defer func() {
@@ -279,9 +289,33 @@
 				t.Stop()
 				return
 			}
-			go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto(""))
+			go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
 		}
 	}
+}
+
+// 鍙戦�佷笟鍔″寘璇锋眰
+func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+	defer func() {
+		if err := recover();err != nil  {
+			c.Logger.Error("Write Body Error:", err)
+		}
+	}()
+	body := &aiot.Protocol{}
+	body.Receiver = receiver
+	body.SenderId = senderId
+	body.MsgProto = msgProto
+	body.MsgType = aiot.MSG_TYPE_BUSINESS
+	body.ReqType = aiot.REQ_TYPE_REQUEST
+	body.Data = data
+	c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+	msgData, err := json.Marshal(body)
+	if err != nil {
+		c.Logger.Error("Fail to Marshal send data", zap.Error(err))
+		return err
+	}
+	c.writeChan <- msgData
+	return nil
 }
 
 // 娑堟伅璇诲彇閫氶亾
@@ -289,14 +323,17 @@
 	var err error
 	var length uint32
 	for {
-		c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
+		c.tmpByte4Slice = make([]byte, 4)
+		_ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
 		// 璇诲彇闀垮害
+		c.readLock.Lock()
 		_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+		c.readLock.Unlock()
 		if err != nil {
 			if err == io.EOF {
+				c.Logger.Error("Fail to read request byte4", zap.Error(err))
 				err = nil
 			} else {
-				logger.Error("Fail to read request", zap.Error(err))
 				c.Close()
 				return
 			}
@@ -304,31 +341,43 @@
 		}
 		length = binary.BigEndian.Uint32(c.tmpByte4Slice)
 		if length > DefaultBufferSize {
-			logger.Error("Fail to read request data from io", zap.Uint32("length",length))
+			c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
 		}
 		// 璇诲彇body
 		bodyByte := make([]byte, length)
-		c.closeLock.Lock()
+		c.readLock.Lock()
 		_, err = io.ReadFull(c.Reader, bodyByte)
-		c.closeLock.Unlock()
+		c.readLock.Unlock()
+		if err != nil {
+			if err == io.EOF {
+				c.Logger.Error("Fail to read request body", zap.Error(err))
+				err = nil
+			} else {
+				c.Close()
+				return
+			}
+			break
+		}
 		body := &aiot.Protocol{}
 		err = json.Unmarshal(bodyByte, body)
 		if err != nil {
-			logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
+			c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
 		}
-		logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+		c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
 		// 澶勭悊鍥炶皟
 		c.onMessage(body)
 	}
 
-	logger.Warn("ReadLoop Done...")
+	c.Logger.Warn("ReadLoop Done...")
+	// 鍏抽棴杩炴帴
+	c.Close()
 }
 
 // 澶勭悊鍥炶皟
 func (c *Client) onMessage (body *aiot.Protocol) {
 	// 鏈皝瑁卌allback锛屽彧鍐欐棩蹇�
 	if c.clientCallback == nil {
-		logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
+		c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
 		return
 	}
 
@@ -336,12 +385,12 @@
 	switch body.MsgType {
 	// 蹇冭烦鍥炲
 	case aiot.MSG_TYPE_HEART_BEAT:
-		c.clientCallback.OnHeartBeat(c,body)
+		go c.clientCallback.OnHeartBeat(c, body)
 		return
 
 	// 娉ㄥ唽鍥炲
 	case aiot.MSG_TYPE_REGISTER:
-		c.clientCallback.OnRegister(c,body)
+		go c.clientCallback.OnRegister(c, body)
 		return
 
 	// 璁惧鎺у埗
@@ -372,7 +421,7 @@
 }
 
 // 鎷艰娑堟伅ID
-func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto {
+func GetMsgProto(msgId string) *aiot.MsgIdProto {
 	// 鏂版秷鎭�
 	if msgId == "" {
 		return &aiot.MsgIdProto{
@@ -392,6 +441,11 @@
 	return c.deviceId
 }
 
+// 鑾峰彇杩炴帴鐘舵��
+func (c *Client) GetState() State {
+	return c.state
+}
+
 // 鍒ゆ柇杩炴帴鏄惁鍏抽棴
 func (c *Client) IsClosed() bool {
 	return c.state == StateDisconnected
@@ -409,22 +463,19 @@
 
 // 鍏抽棴TCP
 func (c *Client) Close() {
-	logger.Debug("Closing connect", zap.String("addr", c.addr))
+	c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
 	c.closeLock.Lock()
 	defer c.closeLock.Unlock()
-
 	// 鍏抽棴閫氶亾
 	if !c.IsClosed() {
-		c.Conn.Close()
+		_ = c.Conn.Close()
 		if c.IsConnected() {
 			c.clientCallback.OnClose(c)
 		}
-
-		// 璁剧疆杩炴帴灞炴��
-		c.SetState(StateDisconnected)
-
-		// 鍏抽棴绠¢亾
-		close(c.exitChan)
 		close(c.writeChan)
 	}
+
+	// 璁剧疆杩炴帴灞炴��
+	c.SetState(StateDisconnected)
+	c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
 }

--
Gitblit v1.8.0