package client
|
|
import (
|
"basic.com/valib/go-aiot.git/aiotProto/aiot"
|
"basic.com/valib/go-aiot.git/util"
|
"bufio"
|
"encoding/binary"
|
"encoding/json"
|
"errors"
|
uuid "github.com/satori/go.uuid"
|
"go.uber.org/zap"
|
"io"
|
"net"
|
"sync"
|
"time"
|
)
|
|
// Message size and timing defaults.
const (
	// DefaultBufferSize is the maximum message length in bytes (512 KiB).
	DefaultBufferSize = 512 << 10

	// DefaultHeartbeatInterval is the interval between heartbeat packets.
	DefaultHeartbeatInterval = 15 * time.Second

	// DefaultReaderTimeOut is the timeout used when reading data.
	DefaultReaderTimeOut = time.Minute
)
|
|
// State is the connection state of a Client.
type State int32

// Possible connection states.
const (
	// StateInit means the client has been initialized.
	StateInit = State(iota)
	// StateConnected means the client is connected.
	StateConnected
	// StateRegistered means the client is registered.
	StateRegistered
	// StateDisconnected means the connection has been closed.
	StateDisconnected
)
|
|
// 客户端连接结构体
|
type Client struct {
|
net.Conn
|
// 注册包
|
deviceRegister *aiot.DeviceRegister
|
// 关闭锁
|
closeLock *sync.Mutex
|
// 读取锁
|
readLock *sync.Mutex
|
// 写入锁
|
writeLock *sync.Mutex
|
// 连接地址
|
addr string
|
// 设备ID
|
deviceId string
|
// 消息读取buffer
|
Reader *bufio.Reader
|
// 消息写入buffer
|
Writer *bufio.Writer
|
// 写入通道
|
writeChan chan []byte
|
// 连接状态
|
state State
|
// 报文头
|
tmpByte4Slice []byte
|
// waiter
|
waitGroup *util.WaitGroupWrapper
|
// 回调函数
|
clientCallback ClientCallBack
|
// 心跳包
|
heartBeatProto *aiot.HeartBeatProto
|
// logger
|
Logger *zap.SugaredLogger
|
}
|
|
// 初始化客户端
|
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
|
logger.Debug("New Client...")
|
return &Client{
|
deviceRegister: deviceRegister,
|
readLock: new(sync.Mutex),
|
closeLock: new(sync.Mutex),
|
writeLock: new(sync.Mutex),
|
addr: addr,
|
deviceId: clientId,
|
writeChan: make(chan []byte),
|
state: StateInit,
|
tmpByte4Slice: make([]byte, 4),
|
waitGroup: &util.WaitGroupWrapper{},
|
clientCallback: callBack,
|
heartBeatProto: &aiot.HeartBeatProto{},
|
Logger: logger,
|
}
|
}
|
|
// 设置回掉接口
|
func (c *Client) SetCallBack(callBack ClientCallBack) {
|
c.clientCallback = callBack
|
}
|
|
// 初始化连接属性
|
func (c *Client) InitClient() {
|
// 初始化当前属性值
|
c.Conn = nil
|
c.closeLock = new(sync.Mutex)
|
c.readLock = new(sync.Mutex)
|
c.writeLock = new(sync.Mutex)
|
c.writeChan = make(chan []byte)
|
c.state = StateInit
|
c.tmpByte4Slice = make([]byte, 4)
|
c.waitGroup = &util.WaitGroupWrapper{}
|
}
|
|
// 启动服务
|
func (c *Client) StartSrv() {
|
// 判断连接状态,避免重复连接
|
c.Logger.Debug("Start client service...")
|
if c.IsConnected(){
|
c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
|
return
|
}
|
|
// 地址是否可用
|
if c.addr == "" {
|
c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
|
return
|
}
|
|
// 连接TCP
|
c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
|
tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
|
if err != nil {
|
c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
|
return
|
}
|
conn, err := net.DialTCP("tcp", nil, tcpAddr)
|
if err != nil {
|
c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
|
return
|
}
|
c.Conn = conn
|
|
// 设置连接状态
|
c.SetState(StateConnected)
|
c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
|
|
// 启用读取通道
|
c.SetRWBuf()
|
|
// 发送注册信息
|
c.writeRegister()
|
|
// 启用心跳
|
c.waitGroup.Wrap(c.writeHeartBeat)
|
c.Wait()
|
c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
|
}
|
|
// 设置连接状态
|
func (c *Client) SetState(state State) {
|
c.state = state
|
}
|
|
// 指定节点ID
|
func (c *Client) SetDeviceId(deviceId string) {
|
c.deviceId = deviceId
|
}
|
|
// 启用读写通道
|
func (c *Client) SetRWBuf() {
|
// 设置数据发送和接收上限
|
c.Writer = bufio.NewWriterSize(c.Conn, DefaultBufferSize)
|
c.Reader = bufio.NewReaderSize(c.Conn, DefaultBufferSize)
|
|
// 启用读取通道
|
c.waitGroup.Wrap(c.readLoop)
|
|
// 启用写入通道
|
c.waitGroup.Wrap(c.writeLoop)
|
}
|
|
// 消息写入通道
|
func (c *Client) writeLoop() {
|
var err error
|
var body []byte
|
byte4 := make([]byte,4)
|
for {
|
select {
|
case bodyByte := <- c.writeChan:
|
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
|
body = append(byte4, bodyByte...)
|
c.writeLock.Lock()
|
_,err = c.Conn.Write(body)
|
c.writeLock.Unlock()
|
if err != nil {
|
c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
|
c.Close()
|
c.Logger.Warn("writeLoop Done...")
|
return
|
}
|
err = c.Writer.Flush()
|
c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
|
if err != nil {
|
c.Logger.Error("Fail to write flush", zap.Error(err))
|
}
|
}
|
}
|
}
|
|
// 发送消息
|
func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
|
// 关闭的连接不能写入
|
if c.IsClosed() {
|
c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
|
return nil,nil
|
}
|
|
// 拼装并发送消息
|
body := &aiot.Protocol{
|
Receiver: aiot.RECEIVER_TO_SAAS,
|
SenderId: senderId,
|
MsgType: msgType,
|
ReqType: reqType,
|
Data: data,
|
MsgProto: msgProto,
|
}
|
|
// 发送消息
|
_ = c.WriteBody(body)
|
return body, nil
|
}
|
|
func (c *Client) WriteBody(body *aiot.Protocol) error {
|
defer func() {
|
if err := recover();err != nil {
|
c.Logger.Error("Write Body Error:", err)
|
}
|
}()
|
|
// 通道已关闭,不能写入
|
if c.IsClosed() {
|
errMsg := "Can not write msg into closed chain"
|
c.Logger.Warn(errMsg, zap.Any("msg",body))
|
return errors.New(errMsg)
|
}
|
|
// 消息ID默认处理
|
if body.MsgProto == nil {
|
body.MsgProto = GetMsgProto("")
|
}
|
msgData, err := json.Marshal(body)
|
if err != nil {
|
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
|
return err
|
}
|
c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
|
c.writeChan <- msgData
|
return nil
|
}
|
|
// 发送注册包
|
func (c *Client) writeRegister() {
|
c.Logger.Debug("registering...")
|
data := c.deviceRegister
|
msgData, _ := json.Marshal(data)
|
_, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
|
if err != nil {
|
c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
|
}
|
}
|
|
// 发送心跳包
|
func (c *Client) writeHeartBeat() {
|
c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
|
pingData, _ := json.Marshal(c.heartBeatProto)
|
t := time.NewTicker(DefaultHeartbeatInterval)
|
defer func() {
|
t.Stop()
|
c.Close()
|
}()
|
|
// 循环发送
|
for{
|
select {
|
case <- t.C:
|
if c.IsClosed() {
|
t.Stop()
|
return
|
}
|
go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
|
}
|
}
|
}
|
|
// 发送业务包请求
|
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
|
defer func() {
|
if err := recover();err != nil {
|
c.Logger.Error("Write Body Error:", err)
|
}
|
}()
|
body := &aiot.Protocol{}
|
body.Receiver = receiver
|
body.SenderId = senderId
|
body.MsgProto = msgProto
|
body.MsgType = aiot.MSG_TYPE_BUSINESS
|
body.ReqType = aiot.REQ_TYPE_REQUEST
|
body.Data = data
|
c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
|
msgData, err := json.Marshal(body)
|
if err != nil {
|
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
|
return err
|
}
|
c.writeChan <- msgData
|
return nil
|
}
|
|
// 消息读取通道
|
func (c *Client) readLoop() {
|
var err error
|
var length uint32
|
for {
|
c.tmpByte4Slice = make([]byte, 4)
|
_ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
|
// 读取长度
|
c.readLock.Lock()
|
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
|
c.readLock.Unlock()
|
if err != nil {
|
if err == io.EOF {
|
c.Logger.Error("Fail to read request byte4", zap.Error(err))
|
err = nil
|
} else {
|
c.Close()
|
return
|
}
|
break
|
}
|
length = binary.BigEndian.Uint32(c.tmpByte4Slice)
|
if length > DefaultBufferSize {
|
c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
|
}
|
// 读取body
|
bodyByte := make([]byte, length)
|
c.readLock.Lock()
|
_, err = io.ReadFull(c.Reader, bodyByte)
|
c.readLock.Unlock()
|
if err != nil {
|
if err == io.EOF {
|
c.Logger.Error("Fail to read request body", zap.Error(err))
|
err = nil
|
} else {
|
c.Close()
|
return
|
}
|
break
|
}
|
body := &aiot.Protocol{}
|
err = json.Unmarshal(bodyByte, body)
|
if err != nil {
|
c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
|
}
|
c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
|
// 处理回调
|
c.onMessage(body)
|
}
|
|
c.Logger.Warn("ReadLoop Done...")
|
// 关闭连接
|
c.Close()
|
}
|
|
// 处理回调
|
func (c *Client) onMessage (body *aiot.Protocol) {
|
// 未封装callback,只写日志
|
if c.clientCallback == nil {
|
c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
|
return
|
}
|
|
// 根据消息类型分类
|
switch body.MsgType {
|
// 心跳回复
|
case aiot.MSG_TYPE_HEART_BEAT:
|
go c.clientCallback.OnHeartBeat(c, body)
|
return
|
|
// 注册回复
|
case aiot.MSG_TYPE_REGISTER:
|
go c.clientCallback.OnRegister(c, body)
|
return
|
|
// 设备控制
|
case aiot.MSG_TYPE_CONTROL:
|
go c.clientCallback.OnDeviceControl(c,body)
|
return
|
|
// 数据下发
|
case aiot.MSG_TYPE_DATA_REPORT:
|
go c.clientCallback.OnDataReport(c,body)
|
return
|
|
// 其他业务消息
|
case aiot.MSG_TYPE_BUSINESS:
|
switch body.ReqType {
|
// 被请求
|
case aiot.REQ_TYPE_REQUEST:
|
go c.clientCallback.OnRequest(c,body)
|
return
|
|
// 被响应
|
case aiot.REQ_TYPE_RESPONSE:
|
go c.clientCallback.OnResponse(c,body)
|
return
|
}
|
return
|
}
|
}
|
|
// 拼装消息ID
|
func GetMsgProto(msgId string) *aiot.MsgIdProto {
|
// 新消息
|
if msgId == "" {
|
return &aiot.MsgIdProto{
|
MsgId: uuid.NewV4().String(),
|
}
|
}
|
|
// 回复消息
|
return &aiot.MsgIdProto{
|
MsgId: uuid.NewV4().String(),
|
PreMsgId: msgId,
|
}
|
}
|
|
// 获取deviceId
|
func (c *Client) GetDeviceId() string {
|
return c.deviceId
|
}
|
|
// 获取连接状态
|
func (c *Client) GetState() State {
|
return c.state
|
}
|
|
// 判断连接是否关闭
|
func (c *Client) IsClosed() bool {
|
return c.state == StateDisconnected
|
}
|
|
// 连接是否在线
|
func (c *Client) IsConnected () bool {
|
return c.state == StateConnected || c.state == StateRegistered
|
}
|
|
// 阻塞等待
|
func (c *Client) Wait() {
|
c.waitGroup.Wait()
|
}
|
|
// 关闭TCP
|
func (c *Client) Close() {
|
c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
|
c.closeLock.Lock()
|
defer c.closeLock.Unlock()
|
// 关闭通道
|
if !c.IsClosed() {
|
_ = c.Conn.Close()
|
if c.IsConnected() {
|
c.clientCallback.OnClose(c)
|
}
|
close(c.writeChan)
|
}
|
|
// 设置连接属性
|
c.SetState(StateDisconnected)
|
c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
|
}
|