From 81afb6ffbf7f76f49644a1832dcfe241552d7e08 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期五, 05 二月 2021 18:20:09 +0800 Subject: [PATCH] add recvandsend --- hbusc.go | 65 +++++++++++++++++++------------- 1 files changed, 38 insertions(+), 27 deletions(-) diff --git a/hbusc.go b/hbusc.go index d0f9222..640b55c 100644 --- a/hbusc.go +++ b/hbusc.go @@ -7,7 +7,6 @@ "errors" "fmt" "os" - "strconv" "sync" "time" ) @@ -51,7 +50,6 @@ //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� chSub chan TransInfo - chReply chan TransInfo } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� @@ -86,13 +84,31 @@ } } +func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { + for { + select { + case <-ctx.Done(): + logFn("recvandsendRoutine ctx.Done") + wg.Done() + return + default: + n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� + if n != 0 { + logFn("RecvandsendTimeout success") + } else { + //time.Sleep(100 * time.Millisecond) + } + } + } +} + //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { - handle := &BHBus{ + handle := &BHBus { + ctx: ctx, conf: config, m: make(map[string]*sockServer), chSub: make(chan TransInfo, config.chSize), - chReply: make(chan TransInfo, config.chSize), } var err error @@ -211,9 +227,9 @@ sockReply := bhomebus.OpenSocket() sockReply.ForceBind(int(regR.ReplyKey)) handle.printLog("after pubTopic forceBind") - handle.wg.Add(1) + //handle.wg.Add(1) //serve server reply - go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) + //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) handle.sockRep = &sockServer{ sock: sockReply, info: &ri.Proc, @@ -468,19 +484,19 @@ return nil, fmt.Errorf("request err") } -func (h *BHBus) Reply(replyKey int, i *Reply) error { - data,err := json.Marshal(*i) - if err != nil { - return err - } - - n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) - h.printLog("reply to key:", replyKey, " n:",n) - if n != 0 { - return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) - } - return nil -} +//func (h *BHBus) Reply(replyKey int, i *Reply) error { +// data,err := json.Marshal(*i) +// if err != nil { +// return err +// } +// +// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) +// h.printLog("reply to key:", replyKey, " n:",n) +// if n != 0 { +// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) +// } +// return nil +//} //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. @@ -568,18 +584,13 @@ //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { - if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil,nil, -1 +func (h *BHBus) GetMsg() (subMsg *MsgInfo) { + if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { + return nil } if len(h.chSub) >0 { m := <-h.chSub subMsg = m.info - } - if len(h.chReply) >0 { - m := <-h.chReply - replyMsg = m.info - replyKey = m.port } return } \ No newline at end of file -- Gitblit v1.8.0