From 160a425c85f128ed47f92aa15c7eb6f76d68610a Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期一, 28 二月 2022 17:26:10 +0800
Subject: [PATCH] 回复register
---
server/server.go | 227 ++++++++++++++++++++++----------------------
server/clienter.go | 28 +++-
server/callback.go | 12 +-
3 files changed, 139 insertions(+), 128 deletions(-)
diff --git a/server/callback.go b/server/callback.go
index 1aae81c..92e8aa9 100644
--- a/server/callback.go
+++ b/server/callback.go
@@ -7,18 +7,18 @@
// 鍥炶皟鎺ュ彛
type ServerCallBack interface {
+ // 鏀跺埌蹇冭烦
+ OnHeartBeat (c *client.Client, msg *aiot.Protocol)
+ // 璁惧娉ㄥ唽
+ OnRegister (c *client.Client, msg *aiot.Protocol)
// 鏀跺埌璇锋眰
OnRequest (c *client.Client, msg *aiot.Protocol)
// 鏀跺埌鍝嶅簲
OnResponse (c *client.Client, msg *aiot.Protocol)
- // 璁惧娉ㄥ唽
- OnRegister (c *client.Client, msg *aiot.Protocol)
- // 鏀跺埌蹇冭烦
- OnHeartBeat (c *client.Client, msg *aiot.Protocol)
- // 閫氶亾鍏抽棴
- OnClose (c *client.Client)
// 鏁版嵁涓婃姤
OnDataReport (c *client.Client, msg *aiot.Protocol)
// 璁惧鎺у埗
OnDeviceControl(c *client.Client, msg *aiot.Protocol)
+ // 閫氶亾鍏抽棴
+ OnClose (c *client.Client)
}
\ No newline at end of file
diff --git a/server/clienter.go b/server/clienter.go
index f18377e..710f769 100644
--- a/server/clienter.go
+++ b/server/clienter.go
@@ -39,6 +39,16 @@
func (c *Clients) OnRegister(cli *client.Client, msg *aiot.Protocol) error {
Srv.RegisterDevice(msg, cli)
go Srv.serverCallBack.OnRegister(cli, msg)
+ msgFeedBack := &aiot.Protocol{
+ Receiver: aiot.RECEIVER_TO_MASTER,
+ SenderId: Srv.serverId,
+ DeviceProto: msg.DeviceProto,
+ MsgType: aiot.MSG_TYPE_REGISTER,
+ ReqType: aiot.REQ_TYPE_RESPONSE,
+ MsgProto: cli.GetMsgProto(msg.MsgProto.MsgId),
+ Data: msg.Data,
+ }
+ _ = cli.WriteBody(msgFeedBack)
return nil
}
@@ -54,15 +64,6 @@
return nil
}
-// 瀹炵幇OnClose
-func (c *Clients) OnClose(cli *client.Client) {
- if Srv.IsMasterOnline(cli.GetDeviceId()) {
- Srv.RemoveCluster(cli.GetDeviceId())
- }
- go Srv.serverCallBack.OnClose(cli)
- return
-}
-
// 瀹炵幇OnDataReport
func (c *Clients) OnDataReport(cli *client.Client, msg *aiot.Protocol) error {
go Srv.serverCallBack.OnDataReport(cli, msg)
@@ -74,3 +75,12 @@
go Srv.serverCallBack.OnDeviceControl(cli, msg)
return nil
}
+
+// 瀹炵幇OnClose
+func (c *Clients) OnClose(cli *client.Client) {
+ if Srv.IsMasterOnline(cli.GetDeviceId()) {
+ Srv.RemoveCluster(cli.GetDeviceId())
+ }
+ go Srv.serverCallBack.OnClose(cli)
+ return
+}
diff --git a/server/server.go b/server/server.go
index c8f378e..e5fb458 100644
--- a/server/server.go
+++ b/server/server.go
@@ -46,6 +46,119 @@
// 鍏ㄥ眬鏈嶅姟
var Srv *Server
+// 涓婇攣
+func (s *Server) Lock() {
+ s.deviceLock.Lock()
+}
+
+// 瑙i攣
+func (s *Server) UnLock() {
+ s.deviceLock.Unlock()
+}
+
+// 鍒濆鍖栨湇鍔�
+func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
+ logger.Debug("New server...", zap.String("addr", addr))
+ return &Server{
+ addr: addr,
+ waitGroup: &util.WaitGroupWrapper{},
+ serverId: serverId,
+ deviceLock: new(sync.RWMutex),
+ Clusters: make(map[string]*client.Client),
+ Devices: make(map[string]struct{}),
+ ClusterDevice: make(map[string]map[string]struct{}),
+ ClusterBlackList: make(map[string]struct{}),
+ serverCallBack: serverCallBack,
+ Logger: logger,
+ }
+}
+
+// 鍚姩鏈嶅姟
+func (s *Server) StartSrv() error {
+ s.Logger.Debug("Start server...", zap.String("addr", s.addr))
+ // 閿欒
+ var err error
+ // tcpAddr
+ s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
+ if err != nil {
+ s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+ return err
+ }
+
+ // 鐩戝惉
+ s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
+ if err != nil {
+ s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+ return err
+ }
+
+ // 鏀跺埌杩炴帴
+ s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
+ s.waitGroup.Wrap(func() {
+ for {
+ // 鑾峰彇杩炴帴
+ clientConn, err := s.tcpListener.Accept()
+ if err != nil {
+ // 璁╁嚭grouting
+ if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
+ s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
+ runtime.Gosched()
+ continue
+ }
+
+ // 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
+ if !strings.Contains(err.Error(), "use of closed network connection") {
+ s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
+ }
+ break
+ }
+
+ // 澶勭悊杩炴帴
+ s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+ go s.Handler(clientConn)
+ }
+ })
+
+ // wait
+ s.waitGroup.Wait()
+ s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
+ return nil
+}
+
+// 澶勭悊杩炴帴
+func (s *Server) Handler(clientConn net.Conn) {
+ s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+
+ // 涓存椂ID
+ tplClientId := uuid.NewV4().String()
+
+ // 娉ㄥ唽淇℃伅
+ cliRegister := &aiot.DeviceRegister{}
+
+ // 鍒濆鍖栬繛鎺�
+ cliCon := &Clients{}
+ cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
+
+ // 璁剧疆杩炴帴鐘舵��
+ cli.SetState(client.StateConnected)
+ cli.Conn = clientConn
+
+ // 鍚敤璇诲啓鍙ユ焺
+ cli.SetRWBuf()
+
+ // wait
+ cli.Wait()
+ cli.Close()
+}
+
+// 闆嗙兢鏄惁娉ㄥ唽
+func (s *Server) IsMasterOnline(masterId string) bool {
+ if _,ok := s.ClusterDevice[masterId];ok {
+ return true
+ }
+ return false
+}
+
// 閫氳繃masterId鑾峰彇闆嗙兢ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
if clusterId,ok := s.ClusterMaster[masterId];ok {
@@ -69,41 +182,6 @@
s.Lock()
defer s.UnLock()
s.ClusterMaster[masterId] = clusterId
-}
-
-// 闆嗙兢鏄惁娉ㄥ唽
-func (s *Server) IsMasterOnline(masterId string) bool {
- if _,ok := s.ClusterDevice[masterId];ok {
- return true
- }
- return false
-}
-
-// 鍒濆鍖栨湇鍔�
-func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
- logger.Debug("New server...", zap.String("addr", addr))
- return &Server{
- addr: addr,
- waitGroup: &util.WaitGroupWrapper{},
- serverId: serverId,
- deviceLock: new(sync.RWMutex),
- Clusters: make(map[string]*client.Client),
- Devices: make(map[string]struct{}),
- ClusterDevice: make(map[string]map[string]struct{}),
- ClusterBlackList: make(map[string]struct{}),
- serverCallBack: serverCallBack,
- Logger: logger,
- }
-}
-
-// 涓婇攣
-func (s *Server) Lock() {
- s.deviceLock.Lock()
-}
-
-// 瑙i攣
-func (s *Server) UnLock() {
- s.deviceLock.Unlock()
}
// 娉ㄥ唽闆嗙兢ID
@@ -186,83 +264,6 @@
return true
}
-// 鍚姩鏈嶅姟
-func (s *Server) StartSrv() error {
- s.Logger.Debug("Start server...", zap.String("addr", s.addr))
- // 閿欒
- var err error
- // tcpAddr
- s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
- if err != nil {
- s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
- return err
- }
-
- // 鐩戝惉
- s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
- if err != nil {
- s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
- return err
- }
-
- // 鏀跺埌杩炴帴
- s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
- s.waitGroup.Wrap(func() {
- for {
- // 鑾峰彇杩炴帴
- clientConn, err := s.tcpListener.Accept()
- if err != nil {
- // 璁╁嚭grouting
- if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
- s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
- runtime.Gosched()
- continue
- }
-
- // 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
- if !strings.Contains(err.Error(), "use of closed network connection") {
- s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
- }
- break
- }
-
- // 澶勭悊杩炴帴
- s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
- go s.Handler(clientConn)
- }
- })
-
- // wait
- s.waitGroup.Wait()
- s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
- return nil
-}
-
-// 澶勭悊杩炴帴
-func (s *Server) Handler(clientConn net.Conn) {
- s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
-
- // 涓存椂ID
- tplClientId := uuid.NewV4().String()
-
- // 娉ㄥ唽淇℃伅
- cliRegister := &aiot.DeviceRegister{}
-
- // 鍒濆鍖栬繛鎺�
- cliCon := &Clients{}
- cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
-
- // 璁剧疆杩炴帴鐘舵��
- cli.SetState(client.StateConnected)
- cli.Conn = clientConn
-
- // 鍚敤璇诲啓鍙ユ焺
- cli.SetRWBuf()
-
- // wait
- cli.Wait()
-}
-
// 澶勭悊璁惧娉ㄥ唽
func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
// 璁剧疆鑺傜偣ID
@@ -271,6 +272,6 @@
s.SetCluster(msg.SenderId, cli)
// 璁剧疆闆嗙兢
registerData := &aiot.DeviceRegister{}
- json.Unmarshal(msg.Data, registerData)
+ _ = json.Unmarshal(msg.Data, registerData)
s.SetDeviceList(msg.SenderId, registerData)
}
--
Gitblit v1.8.0