From abfd401180d1eb09c8ed1c24797c3e503e45fa08 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 07 二月 2021 15:02:50 +0800
Subject: [PATCH] 调整超时为3s
---
hbusc.go | 299 +++++++++++++++++++++++++++++++++++++----------------------
1 files changed, 189 insertions(+), 110 deletions(-)
diff --git a/hbusc.go b/hbusc.go
index def5a75..640b55c 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -7,7 +7,6 @@
"errors"
"fmt"
"os"
- "strconv"
"sync"
"time"
)
@@ -48,23 +47,24 @@
sockSub *sockClient //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞�
sockWorker *sockClient //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client
- mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
+ //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
chSub chan TransInfo
- chReply chan TransInfo
}
//鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo) {
+func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
var data []byte
var key int
for {
select {
case <-ctx.Done():
+ logFn("recvRoutine ctx.Done")
wg.Done()
return
default:
- if n := s.RecvfromTimeout(&data, &key, 10);n == 0 {
+ n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+ if n == 0 {
var info MsgInfo
if err := json.Unmarshal(data, &info);err == nil {
ch <- TransInfo{
@@ -74,7 +74,29 @@
data = []byte{}
key = 0
+ } else {
+ logFn("unmarshal to MsgInfo err:", err)
}
+ } else {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }
+}
+
+func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
+ for {
+ select {
+ case <-ctx.Done():
+ logFn("recvandsendRoutine ctx.Done")
+ wg.Done()
+ return
+ default:
+ n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+ if n != 0 {
+ logFn("RecvandsendTimeout success")
+ } else {
+ //time.Sleep(100 * time.Millisecond)
}
}
}
@@ -82,11 +104,11 @@
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
- handle := &BHBus{
+ handle := &BHBus {
+ ctx: ctx,
conf: config,
m: make(map[string]*sockServer),
chSub: make(chan TransInfo, config.chSize),
- chReply: make(chan TransInfo, config.chSize),
}
var err error
@@ -148,30 +170,32 @@
}
var rMsg []bhomebus.Mesg
- n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
- handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
+ n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
+ handle.printLog("regSock.SendandrecvTimeout n:", n)
if n == 1 && len(rMsg) == 1 {
- var cr CommonReply
+ var cr Reply
if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
handle.printLog("unmarshal regReply err:", err)
return nil, errors.New("unmarshal regReply err:"+err.Error())
} else {
- if cr.Status == REPLY_SUCCESS {
- var rr RegisterReply
- if err = json.Unmarshal(cr.Body, &rr);err ==nil {
- regR = &rr
- break loop
+ if cr.Success {
+ if rpd,err := json.Marshal(cr.Data);err ==nil {
+ var rr RegisterReply
+ if err = json.Unmarshal(rpd, &rr); err == nil {
+ regR = &rr
+ break loop
+ } else {
+ handle.printLog("unmarshal RegisterReply err:", err)
+ }
} else {
- handle.printLog("unmarshal RegisterReply err:", err)
+ handle.printLog("marshal cr.Data err:", err)
}
-
} else {
- handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc)
+ handle.printLog("cr:", cr)
}
-
}
} else {
- time.Sleep(100 * time.Millisecond)
+ time.Sleep(1 * time.Second)
}
}
}
@@ -181,7 +205,7 @@
for _, v := range ri.Channel {
if k,ok := regR.ChannelKey[v];ok {
s := bhomebus.OpenSocket()
- s.Bind(int(k))
+ s.ForceBind(int(k))
handle.m[v] = &sockServer{
sock: s,
info: &ri.Proc,
@@ -189,24 +213,29 @@
}
}
- handle.wg = &sync.WaitGroup{}
-
- sockReply := bhomebus.OpenSocket()
- sockReply.Bind(int(regR.ReplyKey))
- handle.wg.Add(1)
- //serve server reply
- go recvRoutine(ctx, sockReply, handle.wg, handle.chReply)
- handle.sockRep = &sockServer{
- sock: sockReply,
- info: &ri.Proc,
- }
-
//缁存寔蹇冭烦鐨剆ocket
sockHB := bhomebus.OpenSocket()
+ handle.printLog("open sockHB")
handle.sockHB = &sockClient{
sock: sockHB,
peer: int(regR.HeartbeatKey),
}
+
+ handle.wg = &sync.WaitGroup{}
+
+ if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
+ sockReply := bhomebus.OpenSocket()
+ sockReply.ForceBind(int(regR.ReplyKey))
+ handle.printLog("after pubTopic forceBind")
+ //handle.wg.Add(1)
+ //serve server reply
+ //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
+ handle.sockRep = &sockServer{
+ sock: sockReply,
+ info: &ri.Proc,
+ }
+ }
+
//鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey
sockPub := bhomebus.OpenSocket()
@@ -215,19 +244,24 @@
peer: -1,
}
- //璁㈤槄娑堟伅鐨剆ocket
- sockSub := bhomebus.OpenSocket()
- //璁㈤槄鎵�鏈変富棰�
- for _,v := range ri.SubTopic {
- sockSub.Sub(v)
- }
+ //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
+ if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
+ //璁㈤槄娑堟伅鐨剆ocket
+ sockSub := bhomebus.OpenSocket()
+ //璁㈤槄鎵�鏈変富棰�
+ handle.printLog("start Sub topics")
+ for _,v := range ri.SubTopic {
+ subN := sockSub.Sub(v)
+ handle.printLog("subTopic:", v, " ret n:", subN)
+ }
- //鍚姩璁㈤槄淇℃伅鎺ユ敹
- handle.wg.Add(1)
- go recvRoutine(ctx, sockSub, handle.wg, handle.chSub)
- handle.sockSub = &sockClient{
- sock: sockSub,
- peer: -1,
+ //鍚姩璁㈤槄淇℃伅鎺ユ敹
+ handle.wg.Add(1)
+ go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
+ handle.sockSub = &sockClient{
+ sock: sockSub,
+ peer: -1,
+ }
}
sockWorker := bhomebus.OpenSocket()
@@ -255,8 +289,8 @@
return err
}
- h.mtxWorker.Lock()
- defer h.mtxWorker.Unlock()
+ //h.mtxWorker.Lock()
+ //defer h.mtxWorker.Unlock()
netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
Key: h.conf.regKey,
})
@@ -277,7 +311,9 @@
//Release
func (h *BHBus) Free() {
+ h.printLog("call BHBus free")
h.wg.Wait()
+ h.printLog("h.wg.Wait done")
for _,v := range h.m {
v.sock.Close()
}
@@ -355,74 +391,112 @@
//鑾峰彇topic瀵瑰簲鐨刱ey
//濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
//濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
h.mtxNode.Lock()
defer h.mtxNode.Unlock()
var nodes []bhomebus.NetNode
- if h.nodes != nil {
- for _,n := range h.nodes {
- if serverId != "" { //鑾峰彇鎸囧畾鑺傜偣鐨�
- if n.SvrInfo.ID == serverId {
- if k,ok := n.Topic2Key[topic];ok {
- nodes = append(nodes, bhomebus.NetNode{
- IPHost:n.SvrInfo.IP,
- Port:n.SvrInfo.Port,
- Key:k,
- })
- }
- }
- } else { //鑾峰彇鎵�鏈夎妭鐐圭殑
- if k,ok := n.Topic2Key[topic];ok {
- nodes = append(nodes, bhomebus.NetNode{
- IPHost:n.SvrInfo.IP,
- Port:n.SvrInfo.Port,
- Key:k,
- })
- }
- }
+ reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+ Key: h.sockWorker.peer,
+ })
+ reqD,err := json.Marshal(MsgInfo{
+ SrcProc: *srcProc,
+ MsgType: MesgType_ReqRep,
+ Topic: TOPIC_QUERYKEY,
+ Body: []byte(topic),
+ })
+ if err != nil {
+ return nil, fmt.Errorf("marshal req err:%s", err.Error())
+ }
+ var ret []bhomebus.Mesg
+ n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
+ if n > 0 {
+ var reply Reply
+ err = json.Unmarshal(ret[0].Data, &reply)
+ if err != nil {
+ h.printLog("unmarshal err:", err)
+ return nil, err
}
+
+ if reply.Success {
+ rd,err := json.Marshal(reply.Data)
+ if err == nil {
+ err = json.Unmarshal(rd, &nodes)
+ if err == nil {
+ return nodes, nil
+ } else {
+ h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
+ return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
+ }
+ } else {
+ return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
+ }
+
+ } else {
+ h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
+ return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
+ }
+ } else {
+ return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
}
- if len(nodes) == 0 {
- return nil,fmt.Errorf("topic not found in nodes")
- }
- return nodes, nil
}
-func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
+func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
- rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
+ rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
+ h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err)
if err != nil {
return nil, err
}
//2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲��
data, err := json.Marshal(*req)
+ h.printLog("marshal(*req) err:", err)
if err != nil {
return nil, err
}
var ret []bhomebus.Mesg
- if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs);n == 0 {
- if len(ret) > 0 {
- if err = json.Unmarshal(ret[0].Data, resp); err == nil {
- return resp, nil
- }
+ n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
+
+ if n > 0 && len(ret) > 0 {
+ var resp Reply
+ if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
+ return &resp, nil
+ } else {
+ h.printLog("unmarshal ret[0].Data err:", err)
+ return nil, err
}
+ } else {
+ h.printLog("Request n: ", n, " len(ret): ", len(ret))
}
return nil, fmt.Errorf("request err")
}
-func (h *BHBus) Reply(replyKey int, i MsgInfo) error {
- data,err := json.Marshal(i)
- if err != nil {
- return err
- }
+func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
+ var ret []bhomebus.Mesg
- n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
- if n != 0 {
- return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+ n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
+
+ if n > 0 && len(ret) > 0 {
+ return ret[0].Data, nil
+ } else {
+ h.printLog("Request n: ", n, " len(ret): ", len(ret))
}
- return nil
+ return nil, fmt.Errorf("request err")
}
+
+//func (h *BHBus) Reply(replyKey int, i *Reply) error {
+// data,err := json.Marshal(*i)
+// if err != nil {
+// return err
+// }
+//
+// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
+// h.printLog("reply to key:", replyKey, " n:",n)
+// if n != 0 {
+// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+// }
+// return nil
+//}
//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
@@ -432,8 +506,8 @@
if err != nil {
return err
}
- h.mtxWorker.Lock()
- defer h.mtxWorker.Unlock()
+ //h.mtxWorker.Lock()
+ //defer h.mtxWorker.Unlock()
n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
if n != 0 {
return fmt.Errorf("sendOnly ret n:%d", n)
@@ -441,7 +515,7 @@
return nil
}
-func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) {
+func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
data, err := json.Marshal(*req)
if err != nil {
return nil, err
@@ -449,15 +523,17 @@
rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
Key: KEY_QUERY,
})
- h.mtxWorker.Lock()
- defer h.mtxWorker.Unlock()
+ //h.mtxWorker.Lock()
+ //defer h.mtxWorker.Unlock()
var ret []bhomebus.Mesg
- if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut);n == 0 {
- if len(ret) > 0 {
- var cr *CommonReply
- if err = json.Unmarshal(ret[0].Data, cr); err == nil {
- return cr, nil
- }
+ n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
+ h.printLog("requestCenter n:", n, "len(ret):", len(ret))
+ if n > 0 && len(ret) > 0{
+ var cr Reply
+ if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
+ return &cr, nil
+ } else {
+ h.printLog("unmarshal to CommonReply err:", err)
}
}
return nil, fmt.Errorf("request center err")
@@ -468,7 +544,7 @@
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
data,err := json.Marshal(*msg)
if err == nil {
- if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 {
+ if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
return nil
} else {
return fmt.Errorf("pub err n:%d", n)
@@ -476,6 +552,14 @@
}
return err
+}
+
+func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
+ data,err := json.Marshal(*msg)
+ if err == nil {
+ return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
+ }
+ return -1
}
//杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
@@ -500,18 +584,13 @@
//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
- if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
- return nil,nil, -1
+func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
+ if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+ return nil
}
- if len(h.chSub) >1 {
+ if len(h.chSub) >0 {
m := <-h.chSub
subMsg = m.info
- }
- if len(h.chReply) > 1 {
- m := <-h.chReply
- replyMsg = m.info
- replyKey = m.port
}
return
}
\ No newline at end of file
--
Gitblit v1.8.0