From d13ed6d79af61db6333ad5cc3200d96b6410d064 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期三, 02 三月 2022 10:17:46 +0800
Subject: [PATCH] close

---
 server/server.go |  246 +++++++++++++++++++++++++-----------------------
 1 files changed, 129 insertions(+), 117 deletions(-)

diff --git a/server/server.go b/server/server.go
index 4a9b6aa..ae2f50a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -4,7 +4,7 @@
 	"basic.com/valib/go-aiot.git/aiotProto/aiot"
 	"basic.com/valib/go-aiot.git/client"
 	"basic.com/valib/go-aiot.git/util"
-	"basic.com/valib/logger.git"
+	"encoding/json"
 	uuid "github.com/satori/go.uuid"
 	"go.uber.org/zap"
 	"net"
@@ -39,10 +39,125 @@
 	ClusterBlackList map[string]struct{}
 	// 鍥炶皟鎺ュ彛
 	serverCallBack ServerCallBack
+	// logger
+	Logger *zap.SugaredLogger
 }
 
 // 鍏ㄥ眬鏈嶅姟
 var Srv *Server
+
+// 涓婇攣
+func (s *Server) Lock() {
+	s.deviceLock.Lock()
+}
+
+// 瑙i攣
+func (s *Server) UnLock() {
+	s.deviceLock.Unlock()
+}
+
+// 鍒濆鍖栨湇鍔�
+func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
+	logger.Debug("New server...", zap.String("addr", addr))
+	return &Server{
+		addr: addr,
+		waitGroup: &util.WaitGroupWrapper{},
+		serverId: serverId,
+		deviceLock: new(sync.RWMutex),
+		Clusters: make(map[string]*client.Client),
+		Devices: make(map[string]struct{}),
+		ClusterDevice: make(map[string]map[string]struct{}),
+		ClusterBlackList: make(map[string]struct{}),
+		serverCallBack: serverCallBack,
+		Logger: logger,
+	}
+}
+
+// 鍚姩鏈嶅姟
+func (s *Server) StartSrv() error {
+	s.Logger.Debug("Start server...", zap.String("addr", s.addr))
+	// 閿欒
+	var err error
+	// tcpAddr
+	s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
+	if err != nil {
+		s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+		return err
+	}
+
+	// 鐩戝惉
+	s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
+	if err != nil {
+		s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
+		return err
+	}
+
+	// 鏀跺埌杩炴帴
+	s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
+	s.waitGroup.Wrap(func() {
+		for {
+			// 鑾峰彇杩炴帴
+			clientConn, err := s.tcpListener.Accept()
+			if err != nil {
+				// 璁╁嚭grouting
+				if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
+					s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
+					runtime.Gosched()
+					continue
+				}
+
+				// 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
+				if !strings.Contains(err.Error(), "use of closed network connection") {
+					s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
+				}
+				break
+			}
+
+			// 澶勭悊杩炴帴
+			s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+			go s.Handler(clientConn)
+		}
+	})
+
+	// wait
+	s.waitGroup.Wait()
+	s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
+	return nil
+}
+
+// 澶勭悊杩炴帴
+func (s *Server) Handler(clientConn net.Conn) {
+	s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
+
+	// 涓存椂ID
+	tplClientId := uuid.NewV4().String()
+
+	// 娉ㄥ唽淇℃伅
+	cliRegister := &aiot.DeviceRegister{}
+
+	// 鍒濆鍖栬繛鎺�
+	cliCon := &Clients{}
+	cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
+
+	// 璁剧疆杩炴帴鐘舵��
+	cli.SetState(client.StateConnected)
+	cli.Conn = clientConn
+
+	// 鍚敤璇诲啓鍙ユ焺
+	cli.SetRWBuf()
+
+	// wait
+	cli.Wait()
+	cli.Close()
+}
+
+// 闆嗙兢鏄惁娉ㄥ唽
+func (s *Server) IsMasterOnline(masterId string) bool {
+	if _,ok := s.ClusterDevice[masterId];ok {
+		return true
+	}
+	return false
+}
 
 // 閫氳繃masterId鑾峰彇闆嗙兢ID
 func (s *Server) GetClusterIdByMasterId(masterId string) string {
@@ -67,39 +182,6 @@
 	s.Lock()
 	defer s.UnLock()
 	s.ClusterMaster[masterId] = clusterId
-}
-
-// 闆嗙兢鏄惁娉ㄥ唽
-func (s *Server) IsMasterOnline(masterId string) bool {
-	if _,ok := s.ClusterDevice[masterId];ok {
-		return true
-	}
-	return false
-}
-
-// 鍒濆鍖栨湇鍔�
-func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server {
-	return &Server{
-		addr: addr,
-		waitGroup: &util.WaitGroupWrapper{},
-		serverId: serverId,
-		deviceLock: new(sync.RWMutex),
-		Clusters: make(map[string]*client.Client),
-		Devices: make(map[string]struct{}),
-		ClusterDevice: make(map[string]map[string]struct{}),
-		ClusterBlackList: make(map[string]struct{}),
-		serverCallBack: serverCallBack,
-	}
-}
-
-// 涓婇攣
-func (s *Server) Lock() {
-	s.deviceLock.Lock()
-}
-
-// 瑙i攣
-func (s *Server) UnLock() {
-	s.deviceLock.Unlock()
 }
 
 // 娉ㄥ唽闆嗙兢ID
@@ -157,8 +239,9 @@
 }
 
 // 娉ㄥ唽璁惧淇℃伅
-func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool {
-	if len(nodeIds) == 0 {
+func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
+
+	if len(registerData.DeviceList) == 0 {
 		return true
 	}
 	// 閿�
@@ -171,95 +254,24 @@
 	}
 
 	// 娣诲姞璁惧ID
-	for _, nodeId := range nodeIds{
-		s.ClusterDevice[masterId][nodeId] = struct{}{}
-		s.Devices[nodeId] = struct{}{}
+	if s.ClusterDevice[masterId] == nil {
+		s.ClusterDevice[masterId] = make(map[string]struct{})
+	}
+	for _, node := range registerData.DeviceList{
+		s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
+		s.Devices[node.DeviceId] = struct{}{}
 	}
 	return true
 }
 
-// 鍚姩鏈嶅姟
-func (s *Server) StartSrv() error {
-	// 閿欒
-	var err error
-	// tcpAddr
-	s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
-	if err != nil {
-		logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
-		return err
-	}
-
-	// 鐩戝惉
-	s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
-	if err != nil {
-		logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
-		return err
-	}
-
-	// 鏀跺埌杩炴帴
-	logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
-	s.waitGroup.Wrap(func() {
-		for {
-			// 鑾峰彇杩炴帴
-			clientConn, err := s.tcpListener.Accept()
-			if err != nil {
-				// 璁╁嚭grouting
-				if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
-					logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
-					runtime.Gosched()
-					continue
-				}
-
-				// 涓嶈兘浣跨敤宸插叧闂殑杩炴帴
-				if !strings.Contains(err.Error(), "use of closed network connection") {
-					logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
-				}
-				break
-			}
-
-			// 澶勭悊杩炴帴
-			logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
-			go s.Handler(clientConn)
-		}
-	})
-
-	// wait
-	s.waitGroup.Wait()
-	logger.Warn("Tcp server exist", zap.String("addr", s.addr))
-	return nil
-}
-
-// 澶勭悊杩炴帴
-func (s *Server) Handler(clientConn net.Conn) {
-	logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
-
-	// 涓存椂ID
-	tplClientId := uuid.NewV4().String()
-
-	// 娉ㄥ唽淇℃伅
-	cliRegister := &aiot.DeviceRegister{}
-
-	// 鍒濆鍖栬繛鎺�
-	cliCon := &Clients{}
-	cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon)
-
-	// 璁剧疆杩炴帴鐘舵��
-	cli.SetState(client.StateConnected)
-	cli.Conn = clientConn
-
-	// 鍚敤璇诲啓鍙ユ焺
-	cli.SetRWBuf()
-
-	// wait
-	cli.Wait()
-}
-
 // 澶勭悊璁惧娉ㄥ唽
-func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
+func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) {
 	// 璁剧疆鑺傜偣ID
 	cli.SetDeviceId(msg.SenderId)
 	// 娣诲姞闆嗙兢ID
 	s.SetCluster(msg.SenderId, cli)
 	// 璁剧疆闆嗙兢
-	s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds)
+	registerData := &aiot.DeviceRegister{}
+	_ = json.Unmarshal(msg.Data, registerData)
+	s.SetDeviceList(msg.SenderId, registerData)
 }

--
Gitblit v1.8.0