From acf384f3d6653f4696446b113177140f491c4e38 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 12 四月 2022 09:58:10 +0800
Subject: [PATCH] 更新aiot
---
server/server.go | 246 +++++++++++++++++++++++++-----------------------
1 files changed, 129 insertions(+), 117 deletions(-)
diff --git a/server/server.go b/server/server.go
index 4a9b6aa..ae2f50a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -4,7 +4,7 @@
"basic.com/valib/go-aiot.git/aiotProto/aiot"
"basic.com/valib/go-aiot.git/client"
"basic.com/valib/go-aiot.git/util"
- "basic.com/valib/logger.git"
+ "encoding/json"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"net"
@@ -39,10 +39,125 @@
ClusterBlackList map[string]struct{}
// 鍥炶皟鎺ュ彛
serverCallBack ServerCallBack
+ // logger
+ Logger *zap.SugaredLogger
}
// 鍏ㄥ眬鏈嶅姟
var Srv *Server
+
+// 涓婇攣
+func (s *Server) Lock() {
+ s.deviceLock.Lock()
+}
+
+// 瑙i攣
+func (s *Server) UnLock() {
+ s.deviceLock.Unlock()
+}
+
+// 鍒濆鍖栨湇鍔�
+func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
+ logger.Debug("New server...", zap.String("addr", addr))
+ return &Server{
+ addr: addr,
+ waitGroup: &util.WaitGroupWrapper{},
+ serverId: serverId,
+ deviceLock: new(sync.RWMutex),
+ Clusters: make(map[string]*client.Client),
+ Devices: make(map[string]struct{}),
+ ClusterDevice: make(map[string]map[string]struct{}),
+ ClusterBlackList: make(map[string]struct{}),
+ serverCallBack: serverCallBack,
+ Logger: logger,
+ }
+}
+
+// 鍚姩鏈嶅姟
+func (s *Server) StartSrv() error {
+ s.Logger.Debug("Start server...", zap.String("addr", s.addr))
+ // 閿欒
+ var err error
+ // tcpAddr
+ s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
+ if err != nil {
+ s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+ return err
+ }
+
+ // 鐩戝惉
+ s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
+ if err != nil {
+ s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+ return err
+ }
+
+ // 鏀跺埌杩炴帴
+ s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
+ s.waitGroup.Wrap(func() {
+ for {
+ // 鑾峰彇杩炴帴
+ clientConn, err := s.tcpListener.Accept()
+ if err != nil {
+ // 璁╁嚭grouting
+ if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
+ s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
+ runtime.Gosched()
+ continue
+ }
+
+ // 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
+ if !strings.Contains(err.Error(), "use of closed network connection") {
+ s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
+ }
+ break
+ }
+
+ // 澶勭悊杩炴帴
+ s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+ go s.Handler(clientConn)
+ }
+ })
+
+ // wait
+ s.waitGroup.Wait()
+ s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
+ return nil
+}
+
+// 澶勭悊杩炴帴
+func (s *Server) Handler(clientConn net.Conn) {
+ s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+
+ // 涓存椂ID
+ tplClientId := uuid.NewV4().String()
+
+ // 娉ㄥ唽淇℃伅
+ cliRegister := &aiot.DeviceRegister{}
+
+ // 鍒濆鍖栬繛鎺�
+ cliCon := &Clients{}
+ cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
+
+ // 璁剧疆杩炴帴鐘舵��
+ cli.SetState(client.StateConnected)
+ cli.Conn = clientConn
+
+ // 鍚敤璇诲啓鍙ユ焺
+ cli.SetRWBuf()
+
+ // wait
+ cli.Wait()
+ cli.Close()
+}
+
+// 闆嗙兢鏄惁娉ㄥ唽
+func (s *Server) IsMasterOnline(masterId string) bool {
+ if _,ok := s.ClusterDevice[masterId];ok {
+ return true
+ }
+ return false
+}
// 閫氳繃masterId鑾峰彇闆嗙兢ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
@@ -67,39 +182,6 @@
s.Lock()
defer s.UnLock()
s.ClusterMaster[masterId] = clusterId
-}
-
-// 闆嗙兢鏄惁娉ㄥ唽
-func (s *Server) IsMasterOnline(masterId string) bool {
- if _,ok := s.ClusterDevice[masterId];ok {
- return true
- }
- return false
-}
-
-// 鍒濆鍖栨湇鍔�
-func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server {
- return &Server{
- addr: addr,
- waitGroup: &util.WaitGroupWrapper{},
- serverId: serverId,
- deviceLock: new(sync.RWMutex),
- Clusters: make(map[string]*client.Client),
- Devices: make(map[string]struct{}),
- ClusterDevice: make(map[string]map[string]struct{}),
- ClusterBlackList: make(map[string]struct{}),
- serverCallBack: serverCallBack,
- }
-}
-
-// 涓婇攣
-func (s *Server) Lock() {
- s.deviceLock.Lock()
-}
-
-// 瑙i攣
-func (s *Server) UnLock() {
- s.deviceLock.Unlock()
}
// 娉ㄥ唽闆嗙兢ID
@@ -157,8 +239,9 @@
}
// 娉ㄥ唽璁惧淇℃伅
-func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool {
- if len(nodeIds) == 0 {
+func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
+
+ if len(registerData.DeviceList) == 0 {
return true
}
// 閿�
@@ -171,95 +254,24 @@
}
// 娣诲姞璁惧ID
- for _, nodeId := range nodeIds{
- s.ClusterDevice[masterId][nodeId] = struct{}{}
- s.Devices[nodeId] = struct{}{}
+ if s.ClusterDevice[masterId] == nil {
+ s.ClusterDevice[masterId] = make(map[string]struct{})
+ }
+ for _, node := range registerData.DeviceList{
+ s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
+ s.Devices[node.DeviceId] = struct{}{}
}
return true
}
-// 鍚姩鏈嶅姟
-func (s *Server) StartSrv() error {
- // 閿欒
- var err error
- // tcpAddr
- s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
- if err != nil {
- logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
- return err
- }
-
- // 鐩戝惉
- s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
- if err != nil {
- logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
- return err
- }
-
- // 鏀跺埌杩炴帴
- logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
- s.waitGroup.Wrap(func() {
- for {
- // 鑾峰彇杩炴帴
- clientConn, err := s.tcpListener.Accept()
- if err != nil {
- // 璁╁嚭grouting
- if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
- logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
- runtime.Gosched()
- continue
- }
-
- // 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
- if !strings.Contains(err.Error(), "use of closed network connection") {
- logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
- }
- break
- }
-
- // 澶勭悊杩炴帴
- logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
- go s.Handler(clientConn)
- }
- })
-
- // wait
- s.waitGroup.Wait()
- logger.Warn("Tcp server exist", zap.String("addr", s.addr))
- return nil
-}
-
-// 澶勭悊杩炴帴
-func (s *Server) Handler(clientConn net.Conn) {
- logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
-
- // 涓存椂ID
- tplClientId := uuid.NewV4().String()
-
- // 娉ㄥ唽淇℃伅
- cliRegister := &aiot.DeviceRegister{}
-
- // 鍒濆鍖栬繛鎺�
- cliCon := &Clients{}
- cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon)
-
- // 璁剧疆杩炴帴鐘舵��
- cli.SetState(client.StateConnected)
- cli.Conn = clientConn
-
- // 鍚敤璇诲啓鍙ユ焺
- cli.SetRWBuf()
-
- // wait
- cli.Wait()
-}
-
// 澶勭悊璁惧娉ㄥ唽
-func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
+func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) {
// 璁剧疆鑺傜偣ID
cli.SetDeviceId(msg.SenderId)
// 娣诲姞闆嗙兢ID
s.SetCluster(msg.SenderId, cli)
// 璁剧疆闆嗙兢
- s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds)
+ registerData := &aiot.DeviceRegister{}
+ _ = json.Unmarshal(msg.Data, registerData)
+ s.SetDeviceList(msg.SenderId, registerData)
}
--
Gitblit v1.8.0