zhangzengfei
2023-11-30 408f252ff3382ace333d96e85f49980a0e0b9b6f
hbusc.go
@@ -1,8 +1,8 @@
package bhomeclient
import (
   "basic.com/valib/bhshmq.git/api/bhsgo"
   "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
   "basic.com/valib/c_bhomebus.git/api/bhsgo"
   "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
   "context"
   "encoding/json"
   "errors"
@@ -11,6 +11,8 @@
   "sync"
   "time"
   "unsafe"
   "github.com/bytedance/sonic"
)
type MsgReq struct {
@@ -126,8 +128,6 @@
   handle.printLog("register done!" )
   handle.wg = &sync.WaitGroup{}
   //有订阅消息才需要启动协程接收消息
   if len(ri.SubTopic) > 0 {
      handle.printLog("sub topics")
@@ -169,13 +169,13 @@
   for {
      select {
      case <-ctx.Done():
         logFn("recvRoutine ctx.Done")
         logFn("recvSubRoutine ctx.Done")
         wg.Done()
         return
      default:
         if bhsgo.ReadSub(&procId, &msg, 100) {
            ch <- msg
            logFn("ReadSub topic:", string(msg.Topic), " data:", string(msg.Data))
            logFn("ReadSub topic:", string(msg.Topic), " len(data):", len(msg.Data))
            procId = ""
            msg.Reset()
@@ -190,6 +190,15 @@
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
   h.printLog("DeRegister")
   req := bhome_msg.ProcInfo{
      ProcId: []byte(h.ri.Proc.ID),
      Name: []byte(h.ri.Proc.Name),
   }
   reply := bhome_msg.MsgCommonReply{}
   if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
      h.printLog("Unregister false! ")
      return errors.New("Unregister false! ")
   }
   return nil
}
@@ -259,6 +268,7 @@
   if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
      var reply Reply
      if err := json.Unmarshal(mrt.Data, &reply); err != nil {
         h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
         return nil,err
      }
@@ -289,13 +299,18 @@
}
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
   data,err := json.Marshal(*i)
   defer func() {
      i = nil
   }()
   data,err := sonic.Marshal(i)
   if err != nil {
      return err
   }
   rep := bhome_msg.MsgRequestTopicReply{
      Data: data,
   }
   if bhsgo.SendReply(src, &rep) {
      return nil
   }