From abfd401180d1eb09c8ed1c24797c3e503e45fa08 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 07 二月 2021 15:02:50 +0800
Subject: [PATCH] 调整超时为3s

---
 hbusc.go |  317 +++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 215 insertions(+), 102 deletions(-)

diff --git a/hbusc.go b/hbusc.go
index e5d4e33..640b55c 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -1,4 +1,4 @@
-package mc
+package bhomeclient
 
 import (
 	"basic.com/valib/bhomebus.git"
@@ -7,7 +7,6 @@
 	"errors"
 	"fmt"
 	"os"
-	"strconv"
 	"sync"
 	"time"
 )
@@ -48,23 +47,24 @@
 	sockSub *sockClient  //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞�
 
 	sockWorker 	*sockClient  //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client
-	mtxWorker 	sync.Mutex	 //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
+	//mtxWorker 	sync.Mutex	 //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
 
 	chSub chan TransInfo
-	chReply chan TransInfo
 }
 
 //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo) {
+func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
 	var data []byte
 	var key int
 	for {
 		select {
 		case <-ctx.Done():
+			logFn("recvRoutine ctx.Done")
 			wg.Done()
 			return
 		default:
-			if n := s.RecvfromTimeout(&data, &key, 10);n == 0 {
+			n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+			if n == 0 {
 				var info MsgInfo
 				if err := json.Unmarshal(data, &info);err == nil {
 					ch <- TransInfo{
@@ -74,7 +74,29 @@
 
 					data = []byte{}
 					key = 0
+				} else {
+					logFn("unmarshal to MsgInfo err:", err)
 				}
+			} else {
+				time.Sleep(100 * time.Millisecond)
+			}
+		}
+	}
+}
+
+func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
+	for {
+		select {
+		case <-ctx.Done():
+			logFn("recvandsendRoutine ctx.Done")
+			wg.Done()
+			return
+		default:
+			n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+			if n != 0 {
+				logFn("RecvandsendTimeout success")
+			} else {
+				//time.Sleep(100 * time.Millisecond)
 			}
 		}
 	}
@@ -82,11 +104,11 @@
 
 //Register
 func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
-	handle := &BHBus{
+	handle := &BHBus {
+		ctx: ctx,
 		conf: config,
 		m: make(map[string]*sockServer),
 		chSub: make(chan TransInfo, config.chSize),
-		chReply: make(chan TransInfo, config.chSize),
 	}
 
 	var err error
@@ -148,30 +170,32 @@
 			}
 
 			var rMsg []bhomebus.Mesg
-			n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
-			handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
+			n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
+			handle.printLog("regSock.SendandrecvTimeout n:", n)
 			if n == 1 && len(rMsg) == 1 {
-				var cr CommonReply
+				var cr Reply
 				if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
 					handle.printLog("unmarshal regReply err:", err)
 					return nil, errors.New("unmarshal regReply err:"+err.Error())
 				} else {
-					if cr.Status == REPLY_SUCCESS {
-						var rr RegisterReply
-						if err = json.Unmarshal(cr.Body, &rr);err ==nil {
-							regR = &rr
-							break loop
+					if cr.Success {
+						if rpd,err := json.Marshal(cr.Data);err ==nil {
+							var rr RegisterReply
+							if err = json.Unmarshal(rpd, &rr); err == nil {
+								regR = &rr
+								break loop
+							} else {
+								handle.printLog("unmarshal RegisterReply err:", err)
+							}
 						} else {
-							handle.printLog("unmarshal RegisterReply err:", err)
+							handle.printLog("marshal cr.Data err:", err)
 						}
-
 					} else {
-						handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc)
+						handle.printLog("cr:", cr)
 					}
-
 				}
 			} else {
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(1 * time.Second)
 			}
 		}
 	}
@@ -181,7 +205,7 @@
 	for _, v := range ri.Channel {
 		if k,ok := regR.ChannelKey[v];ok {
 			s := bhomebus.OpenSocket()
-			s.Bind(int(k))
+			s.ForceBind(int(k))
 			handle.m[v] = &sockServer{
 				sock: s,
 				info: &ri.Proc,
@@ -189,24 +213,29 @@
 		}
 	}
 
-	handle.wg = &sync.WaitGroup{}
-
-	sockReply := bhomebus.OpenSocket()
-	sockReply.Bind(int(regR.ReplyKey))
-	handle.wg.Add(1)
-	//serve server reply
-	go recvRoutine(ctx, sockReply, handle.wg, handle.chReply)
-	handle.sockRep = &sockServer{
-		sock: sockReply,
-		info: &ri.Proc,
-	}
-
 	//缁存寔蹇冭烦鐨剆ocket
 	sockHB := bhomebus.OpenSocket()
+	handle.printLog("open sockHB")
 	handle.sockHB = &sockClient{
 		sock: sockHB,
 		peer: int(regR.HeartbeatKey),
 	}
+
+	handle.wg = &sync.WaitGroup{}
+
+	if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
+		sockReply := bhomebus.OpenSocket()
+		sockReply.ForceBind(int(regR.ReplyKey))
+		handle.printLog("after pubTopic forceBind")
+		//handle.wg.Add(1)
+		//serve server reply
+		//go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
+		handle.sockRep = &sockServer{
+			sock: sockReply,
+			info: &ri.Proc,
+		}
+	}
+
 
 	//鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey
 	sockPub := bhomebus.OpenSocket()
@@ -215,19 +244,24 @@
 		peer: -1,
 	}
 
-	//璁㈤槄娑堟伅鐨剆ocket
-	sockSub := bhomebus.OpenSocket()
-	//璁㈤槄鎵�鏈変富棰�
-	for _,v := range ri.SubTopic {
-		sockSub.Sub(v)
-	}
+	//鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
+	if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
+		//璁㈤槄娑堟伅鐨剆ocket
+		sockSub := bhomebus.OpenSocket()
+		//璁㈤槄鎵�鏈変富棰�
+		handle.printLog("start Sub topics")
+		for _,v := range ri.SubTopic {
+			subN := sockSub.Sub(v)
+			handle.printLog("subTopic:", v, " ret n:", subN)
+		}
 
-	//鍚姩璁㈤槄淇℃伅鎺ユ敹
-	handle.wg.Add(1)
-	go recvRoutine(ctx, sockSub, handle.wg, handle.chSub)
-	handle.sockSub = &sockClient{
-		sock: sockSub,
-		peer: -1,
+		//鍚姩璁㈤槄淇℃伅鎺ユ敹
+		handle.wg.Add(1)
+		go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
+		handle.sockSub = &sockClient{
+			sock: sockSub,
+			peer: -1,
+		}
 	}
 
 	sockWorker := bhomebus.OpenSocket()
@@ -255,8 +289,8 @@
 		return err
 	}
 
-	h.mtxWorker.Lock()
-	defer h.mtxWorker.Unlock()
+	//h.mtxWorker.Lock()
+	//defer h.mtxWorker.Unlock()
 	netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
 		Key: h.conf.regKey,
 	})
@@ -277,7 +311,9 @@
 
 //Release
 func (h *BHBus) Free() {
+	h.printLog("call BHBus free")
 	h.wg.Wait()
+	h.printLog("h.wg.Wait done")
 	for _,v := range h.m {
 		v.sock.Close()
 	}
@@ -355,74 +391,112 @@
 //鑾峰彇topic瀵瑰簲鐨刱ey
 //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
 //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
 	h.mtxNode.Lock()
 	defer h.mtxNode.Unlock()
 	var nodes []bhomebus.NetNode
-	if h.nodes != nil {
-		for _,n := range h.nodes {
-			if serverId != "" { //鑾峰彇鎸囧畾鑺傜偣鐨�
-				if n.SvrInfo.ID == serverId {
-					if k,ok := n.Topic2Key[topic];ok {
-						nodes = append(nodes, bhomebus.NetNode{
-							IPHost:n.SvrInfo.IP,
-							Port:n.SvrInfo.Port,
-							Key:k,
-						})
-					}
-				}
-			} else { //鑾峰彇鎵�鏈夎妭鐐圭殑
-				if k,ok := n.Topic2Key[topic];ok {
-					nodes = append(nodes, bhomebus.NetNode{
-						IPHost:n.SvrInfo.IP,
-						Port:n.SvrInfo.Port,
-						Key:k,
-					})
-				}
-			}
+	reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+		Key: h.sockWorker.peer,
+	})
+	reqD,err := json.Marshal(MsgInfo{
+		SrcProc: *srcProc,
+		MsgType: MesgType_ReqRep,
+		Topic:   TOPIC_QUERYKEY,
+		Body:    []byte(topic),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("marshal req err:%s", err.Error())
+	}
+	var ret []bhomebus.Mesg
+	n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
+	if n > 0 {
+		var reply Reply
+		err = json.Unmarshal(ret[0].Data, &reply)
+		if err != nil {
+			h.printLog("unmarshal err:", err)
+			return nil, err
 		}
+
+		if reply.Success {
+			rd,err := json.Marshal(reply.Data)
+			if err == nil {
+				err = json.Unmarshal(rd, &nodes)
+				if err == nil {
+					return nodes, nil
+				} else {
+					h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
+					return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
+				}
+			} else {
+				return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
+			}
+
+		} else {
+			h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
+			return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
+		}
+	} else {
+		return nil,	fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
 	}
-	if len(nodes) == 0 {
-		return nil,fmt.Errorf("topic not found in nodes")
-	}
-	return nodes, nil
 }
 
-func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
+func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
 	//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
-	rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
+	rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
+	h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err)
 	if err != nil {
 		return nil, err
 	}
 	//2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲��
 	data, err := json.Marshal(*req)
+	h.printLog("marshal(*req) err:", err)
 	if err != nil {
 		return nil, err
 	}
 	var ret []bhomebus.Mesg
 
-	if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs);n == 0 {
-		if len(ret) > 0 {
-			if err = json.Unmarshal(ret[0].Data, resp); err == nil {
-				return resp, nil
-			}
+	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
+
+	if n > 0 && len(ret) > 0 {
+		var resp Reply
+		if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
+			return &resp, nil
+		} else {
+			h.printLog("unmarshal ret[0].Data err:", err)
+			return nil, err
 		}
+	} else {
+		h.printLog("Request n: ", n, " len(ret): ", len(ret))
 	}
 	return nil, fmt.Errorf("request err")
 }
 
-func (h *BHBus) Reply(replyKey int, i MsgInfo) error {
-	data,err := json.Marshal(i)
-	if err != nil {
-		return err
-	}
+func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
+	var ret []bhomebus.Mesg
 
-	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
-	if n != 0 {
-		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
+
+	if n > 0 && len(ret) > 0 {
+		return ret[0].Data, nil
+	} else {
+		h.printLog("Request n: ", n, " len(ret): ", len(ret))
 	}
-	return nil
+	return nil, fmt.Errorf("request err")
 }
+
+//func (h *BHBus) Reply(replyKey int, i *Reply) error {
+//	data,err := json.Marshal(*i)
+//	if err != nil {
+//		return err
+//	}
+//
+//	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
+//	h.printLog("reply to key:", replyKey, " n:",n)
+//	if n != 0 {
+//		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+//	}
+//	return nil
+//}
 
 
 //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
@@ -432,8 +506,8 @@
 	if err != nil {
 		return err
 	}
-	h.mtxWorker.Lock()
-	defer h.mtxWorker.Unlock()
+	//h.mtxWorker.Lock()
+	//defer h.mtxWorker.Unlock()
 	n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
 	if n != 0 {
 		return fmt.Errorf("sendOnly ret n:%d", n)
@@ -441,11 +515,36 @@
 	return nil
 }
 
+func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
+	data, err := json.Marshal(*req)
+	if err != nil {
+		return nil, err
+	}
+	rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+		Key: KEY_QUERY,
+	})
+	//h.mtxWorker.Lock()
+	//defer h.mtxWorker.Unlock()
+	var ret []bhomebus.Mesg
+	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
+	h.printLog("requestCenter n:", n, "len(ret):", len(ret))
+	if n > 0 && len(ret) > 0{
+		var cr Reply
+		if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
+			return &cr, nil
+		} else {
+			h.printLog("unmarshal to CommonReply err:", err)
+		}
+	}
+	return nil, fmt.Errorf("request center err")
+}
+
+
 //鍚戜富棰橀�氶亾涓彂甯冩秷鎭�
 func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
 	data,err := json.Marshal(*msg)
 	if err == nil {
-		if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 {
+		if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
 			return nil
 		} else {
 			return fmt.Errorf("pub err n:%d", n)
@@ -453,6 +552,14 @@
 	}
 
 	return err
+}
+
+func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
+	data,err := json.Marshal(*msg)
+	if err == nil {
+		return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
+	}
+	return -1
 }
 
 //杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
@@ -464,20 +571,26 @@
 	}
 }
 
+//娉ㄩ攢璁㈤槄鐨勪富棰�
+func (h *BHBus) DeSub(topics []string) {
+	if topics != nil {
+		for _,t := range topics {
+			if n := h.sockSub.sock.Desub(t); n != 0 {
+				h.printLog("DeSub topic:", t, " n:", n)
+			}
+		}
+	}
+}
+
 
 //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
-	if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-		return nil,nil, -1
+func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
+	if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+		return nil
 	}
-	if len(h.chSub) >1 {
+	if len(h.chSub) >0 {
 		m  := <-h.chSub
 		subMsg = m.info
-	}
-	if len(h.chReply) > 1 {
-		m := <-h.chReply
-		replyMsg = m.info
-		replyKey = m.port
 	}
 	return
 }
\ No newline at end of file

--
Gitblit v1.8.0