From 81a82791a041420f8bd2232f69310a5c52952291 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 01 三月 2022 16:49:02 +0800
Subject: [PATCH] 锁
---
client/client.go | 67 ++++++++++++++++++++-------------
1 files changed, 40 insertions(+), 27 deletions(-)
diff --git a/client/client.go b/client/client.go
index 3d6035b..eb7f1a0 100644
--- a/client/client.go
+++ b/client/client.go
@@ -38,6 +38,8 @@
StateDisconnected
)
+var syncReq map[string]chan *aiot.Protocol
+
// 杩炴帴鐘舵��
type State int32
@@ -46,8 +48,8 @@
net.Conn
// 娉ㄥ唽鍖�
deviceRegister *aiot.DeviceRegister
- // 鍏抽棴閿�
- closeLock *sync.Mutex
+ // 瀹㈡埛绔攣
+ lock *sync.Mutex
// 娑堟伅閿�
msgLock *sync.Mutex
// 杩炴帴鍦板潃
@@ -81,8 +83,7 @@
logger.Debug("New Client...")
return &Client{
deviceRegister: deviceRegister,
- closeLock: new(sync.Mutex),
- msgLock: new(sync.Mutex),
+ lock: new(sync.Mutex),
addr: addr,
deviceId: clientId,
writeChan: make(chan []byte),
@@ -105,8 +106,7 @@
func (c *Client) InitClient() {
// 鍒濆鍖栧綋鍓嶅睘鎬у��
c.Conn = nil
- c.closeLock = new(sync.Mutex)
- c.msgLock = new(sync.Mutex)
+ c.lock = new(sync.Mutex)
c.writeChan = make(chan []byte)
c.exitChan = make(chan int8)
c.state = StateInit
@@ -197,11 +197,9 @@
case bodyByte := <- c.writeChan:
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
body = append(byte4, bodyByte...)
- c.closeLock.Lock()
- c.msgLock.Lock()
+ c.lock.Lock()
_,err = c.Conn.Write(body)
- c.closeLock.Unlock()
- c.msgLock.Unlock()
+ c.lock.Unlock()
if err != nil {
c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
c.Close()
@@ -220,15 +218,15 @@
// 鍙戦�佹秷鎭�
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
// 閿�
- c.closeLock.Lock()
+ c.lock.Lock()
// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
if c.IsClosed() {
c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
- c.closeLock.Unlock()
+ c.lock.Unlock()
return nil,nil
}
- c.closeLock.Unlock()
+ c.lock.Unlock()
// 鎷艰骞跺彂閫佹秷鎭�
body := &aiot.Protocol{
@@ -245,18 +243,19 @@
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
body.Receiver = aiot.RECEIVER_TO_SAAS
- c.Logger.Debug("Write Body...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
msgData, err := json.Marshal(body)
if err != nil {
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
+ c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -296,6 +295,11 @@
// 鍙戦�佷笟鍔″寘璇锋眰
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
body := &aiot.Protocol{}
body.Receiver = receiver
body.SenderId = senderId
@@ -309,11 +313,7 @@
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -322,14 +322,17 @@
var err error
var length uint32
for {
+ c.tmpByte4Slice = make([]byte, 4)
c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
// 璇诲彇闀垮害
+ c.lock.Lock()
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+ c.lock.Unlock()
if err != nil {
if err == io.EOF {
+ c.Logger.Error("Fail to read request byte4", zap.Error(err))
err = nil
} else {
- c.Logger.Error("Fail to read request", zap.Error(err))
c.Close()
return
}
@@ -341,9 +344,19 @@
}
// 璇诲彇body
bodyByte := make([]byte, length)
- c.closeLock.Lock()
+ c.lock.Lock()
_, err = io.ReadFull(c.Reader, bodyByte)
- c.closeLock.Unlock()
+ c.lock.Unlock()
+ if err != nil {
+ if err == io.EOF {
+ c.Logger.Error("Fail to read request body", zap.Error(err))
+ err = nil
+ } else {
+ c.Close()
+ return
+ }
+ break
+ }
body := &aiot.Protocol{}
err = json.Unmarshal(bodyByte, body)
if err != nil {
@@ -445,8 +458,8 @@
// 鍏抽棴TCP
func (c *Client) Close() {
c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
- c.closeLock.Lock()
- defer c.closeLock.Unlock()
+ c.lock.Lock()
+ defer c.lock.Unlock()
// 鍏抽棴閫氶亾
if !c.IsClosed() {
--
Gitblit v1.8.0