From 6f7957c42c409624ca7a05c54bd35752f996ba68 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期日, 07 二月 2021 15:24:10 +0800 Subject: [PATCH] rm recvandsend --- hbusc.go | 90 +++++++++++++++++++++++++++----------------- 1 files changed, 55 insertions(+), 35 deletions(-) diff --git a/hbusc.go b/hbusc.go index 640b55c..a4a6649 100644 --- a/hbusc.go +++ b/hbusc.go @@ -7,6 +7,7 @@ "errors" "fmt" "os" + "strconv" "sync" "time" ) @@ -50,6 +51,7 @@ //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� chSub chan TransInfo + chReply chan TransInfo } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� @@ -84,23 +86,23 @@ } } -func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { - for { - select { - case <-ctx.Done(): - logFn("recvandsendRoutine ctx.Done") - wg.Done() - return - default: - n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� - if n != 0 { - logFn("RecvandsendTimeout success") - } else { - //time.Sleep(100 * time.Millisecond) - } - } - } -} +//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { +// for { +// select { +// case <-ctx.Done(): +// logFn("recvandsendRoutine ctx.Done") +// wg.Done() +// return +// default: +// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� +// if n != 0 { +// logFn("RecvandsendTimeout success") +// } else { +// //time.Sleep(100 * time.Millisecond) +// } +// } +// } +//} //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { @@ -109,6 +111,7 @@ conf: config, m: make(map[string]*sockServer), chSub: make(chan TransInfo, config.chSize), + chReply: make(chan TransInfo, config.chSize), } var err error @@ -227,9 +230,9 @@ sockReply := bhomebus.OpenSocket() sockReply.ForceBind(int(regR.ReplyKey)) handle.printLog("after pubTopic forceBind") - //handle.wg.Add(1) + handle.wg.Add(1) //serve server reply - //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) + go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) handle.sockRep = &sockServer{ sock: sockReply, info: &ri.Proc, @@ -484,19 +487,19 @@ return nil, fmt.Errorf("request err") } -//func (h *BHBus) Reply(replyKey int, i *Reply) error { -// data,err := json.Marshal(*i) -// if err != nil { -// return err -// } -// -// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) -// h.printLog("reply to key:", replyKey, " n:",n) -// if n != 0 { -// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) -// } -// return nil -//} +func (h *BHBus) Reply(replyKey int, i *Reply) error { + data,err := json.Marshal(*i) + if err != nil { + return err + } + + n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) + h.printLog("reply to key:", replyKey, " n:",n) + if n != 0 { + return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) + } + return nil +} //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. @@ -584,13 +587,30 @@ //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -func (h *BHBus) GetMsg() (subMsg *MsgInfo) { - if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil +//func (h *BHBus) GetMsg() (subMsg *MsgInfo) { +// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { +// return nil +// } +// if len(h.chSub) >0 { +// m := <-h.chSub +// subMsg = m.info +// } +// return +//} + + +func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { + if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { + return nil, nil, -1 } if len(h.chSub) >0 { m := <-h.chSub subMsg = m.info } + if len(h.chReply) >0 { + m := <-h.chReply + replyMsg = m.info + replyKey = m.port + } return } \ No newline at end of file -- Gitblit v1.8.0