From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 25 四月 2021 11:36:03 +0800
Subject: [PATCH] 使用bhsgo by lichao
---
/dev/null | 54 ---
message.go | 45 --
micronode.go | 175 ++--------
broker.go | 10
hbusc.go | 626 +++++++++-----------------------------
5 files changed, 215 insertions(+), 695 deletions(-)
diff --git a/broker.go b/broker.go
index cc78e6e..d1ba2c1 100644
--- a/broker.go
+++ b/broker.go
@@ -1,17 +1,17 @@
package bhomeclient
-import "basic.com/valib/bhomebus.git"
+import "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
type Broker interface {
//鍙戝竷鍒版湰鏈�
- Publish(topic string, msg []byte) error
+ Publish(string, []byte) error
//鍙戝竷鍒拌繙绋嬫満鍣�
- PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
+ PublishNet([]bhome_msg.BHAddress, string, []byte) error
//璁㈤槄涓�浜涗富棰�,鍙姩鎬佹柊澧�
- Subscribe(topics []string)
+ Subscribe([]string)
//娉ㄩ攢璁㈤槄鐨勪富棰�
- DeSub(topics []string)
+ DeSub([]string)
}
diff --git a/hbusc.go b/hbusc.go
index 0735c04..6ccd790 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -1,63 +1,45 @@
package bhomeclient
import (
- "basic.com/valib/bhomebus.git"
+ "basic.com/valib/bhshmq.git/api/bhsgo"
+ "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
"context"
"encoding/json"
"errors"
"fmt"
"os"
- "strconv"
"sync"
"time"
+ "unsafe"
)
-type sockServer struct {
- sock *bhomebus.Socket
- info *ProcInfo
-}
-
-type sockClient struct {
- sock *bhomebus.Socket
- peer int
-}
-
-type TransInfo struct {
- info *MsgInfo
- port int
+type MsgReq struct {
+ ProcId string
+ bhome_msg.MsgRequestTopic
+ Src unsafe.Pointer
}
type BHBus struct {
- ctx context.Context
+ ctx context.Context
- conf *Config
+ ri *RegisterInfo
- nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
- mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
+ conf *Config
- m map[string]*sockServer
+ nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
+ mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
- wg *sync.WaitGroup
+ wg *sync.WaitGroup
- sockRep *sockServer //鍝嶅簲鍏朵粬杩涚▼request鐨剆ocket锛宻erver
-
- sockHB *sockClient //缁存寔蹇冭烦鐨剆ocket锛岀嚎绋嬪疄鏃跺彂閫侊紝闇�瑕佸崟鐙鐞�
-
- sockPub *sockClient //鍙戝竷涓婚鐨剆ocket锛岄渶瑕佸崟鐙瑂ocket澶勭悊
-
- sockSub *sockClient //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞�
-
- sockWorker *sockClient //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client
- //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
-
- chSub chan TransInfo
- chReply chan TransInfo
+ ChSub chan bhome_msg.MsgPublish
+ ChReply chan MsgReq
}
//鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
- var data []byte
- var key int
+func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
+ var procId string
+ var msg bhome_msg.MsgRequestTopic
+ var src unsafe.Pointer
for {
select {
case <-ctx.Done():
@@ -65,20 +47,16 @@
wg.Done()
return
default:
- n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
- if n == 0 {
- var info MsgInfo
- if err := json.Unmarshal(data, &info);err == nil {
- ch <- TransInfo{
- info: &info,
- port: key, //杩欎釜key鍦ㄥ彂甯冭闃呮ā寮忎腑鏄痓us鐨刱ey锛屾槸涓浐瀹氬�硷紝涓婂眰鐢ㄤ笉鍒�
- }
-
- data = []byte{}
- key = 0
- } else {
- logFn("unmarshal to MsgInfo err:", err)
+ if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
+ ch <- MsgReq{
+ procId,
+ msg,
+ src,
}
+
+ procId = ""
+ msg.Reset()
+ src = unsafe.Pointer(nil)
} else {
time.Sleep(100 * time.Millisecond)
}
@@ -86,59 +64,22 @@
}
}
-//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
-// for {
-// select {
-// case <-ctx.Done():
-// logFn("recvandsendRoutine ctx.Done")
-// wg.Done()
-// return
-// default:
-// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-// if n != 0 {
-// logFn("RecvandsendTimeout success")
-// } else {
-// //time.Sleep(100 * time.Millisecond)
-// }
-// }
-// }
-//}
-
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
handle := &BHBus {
- ctx: ctx,
- conf: config,
- m: make(map[string]*sockServer),
- chSub: make(chan TransInfo, config.chSize),
- chReply: make(chan TransInfo, config.chSize),
+ ctx: ctx,
+ conf: config,
+ ri: ri,
+ ChSub: make(chan bhome_msg.MsgPublish, config.chSize),
+ ChReply: make(chan MsgReq, config.chSize),
}
- var err error
- err = bhomebus.Init("libshm_queue.so")
- if err != nil {
- handle.printLog("Init so err:", err)
- return nil, err
- }
- err = bhomebus.ShmInit(512)
- if err != nil {
- handle.printLog("shmInit size err:", err)
- return nil, err
- }
- regSock := bhomebus.OpenSocket()
- if regSock == nil {
- handle.printLog("Open Socket ret Nil")
- return nil, errors.New("OpenSocket ret Nil")
- }
- defer func() {
- regSock.Close()
- handle.printLog("regSock.CLose")
- }()
-
- var msg []byte
- var regAddr []bhomebus.NetNode
- var regR *RegisterReply //娉ㄥ唽缁撴灉淇℃伅
//濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐�
+ procI := bhome_msg.ProcInfo{
+ ProcId: []byte(ri.Proc.ID),
+ Name: []byte(ri.Proc.Name),
+ }
+ var regReply bhome_msg.MsgCommonReply
loop:
for {
select {
@@ -146,165 +87,91 @@
handle.printLog("register <-q")
return nil,errors.New("ctx is done")
default:
- if msg == nil {
- rid, err := json.Marshal(*ri)
- if err != nil {
- handle.printLog("marshal registerInfo err:", err)
- return nil, errors.New("marshal registerInfo err:"+err.Error())
- }
- s := MsgInfo{
- SrcProc: ri.Proc,
- MsgType: MesgType_ReqRep,
- Topic: TOPIC_REGISTER,
- Body: rid,
- }
- handle.printLog("register MsgInfo:", s)
- dRegData,err := json.Marshal(s)
- if err != nil {
- handle.printLog("marshal deregister msg err:", err)
- return nil, err
- }
- handle.printLog(string(dRegData))
- msg = dRegData
- }
- if regAddr == nil {
- regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: handle.conf.regKey,
- })
- }
- var rMsg []bhomebus.Mesg
- n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
- handle.printLog("regSock.SendandrecvTimeout n:", n)
- if n == 1 && len(rMsg) == 1 {
- var cr Reply
- if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
- handle.printLog("unmarshal regReply err:", err)
- return nil, errors.New("unmarshal regReply err:"+err.Error())
- } else {
- if cr.Success {
- if rpd,err := json.Marshal(cr.Data);err ==nil {
- var rr RegisterReply
- if err = json.Unmarshal(rpd, &rr); err == nil {
- regR = &rr
- break loop
- } else {
- handle.printLog("unmarshal RegisterReply err:", err)
- }
- } else {
- handle.printLog("marshal cr.Data err:", err)
- }
- } else {
- handle.printLog("cr:", cr)
- }
- }
+ if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) {
+ break loop
} else {
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
}
}
}
- handle.printLog("register Reply:", *regR)
-
- for _, v := range ri.Channel {
- if k,ok := regR.ChannelKey[v];ok {
- s := bhomebus.OpenSocket()
- s.ForceBind(int(k))
- handle.m[v] = &sockServer{
- sock: s,
- info: &ri.Proc,
+ if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
+ topics := bhome_msg.MsgTopicList{}
+ var regTopicReply bhome_msg.MsgCommonReply
+ for _,t := range ri.PubTopic {
+ topics.TopicList = append(topics.TopicList, []byte(t))
+ }
+ loopRT:
+ for {
+ select {
+ case <-q:
+ handle.printLog("RegisterTopics recv quit signal")
+ return nil, errors.New("RegisterTopics recv quit signal")
+ default:
+ if bhsgo.RegisterTopics(&topics, ®TopicReply, handle.conf.sendTimeOut) {
+ handle.printLog("bhsgo.RegisterTopics success!!")
+ break loopRT
+ } else {
+ time.Sleep(time.Second)
+ }
}
}
+
+ handle.wg.Add(1)
+ go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
}
- //缁存寔蹇冭烦鐨剆ocket
- sockHB := bhomebus.OpenSocket()
- handle.printLog("open sockHB")
- handle.sockHB = &sockClient{
- sock: sockHB,
- peer: int(regR.HeartbeatKey),
- }
+ handle.printLog("register done!" )
handle.wg = &sync.WaitGroup{}
- if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
- sockReply := bhomebus.OpenSocket()
- sockReply.ForceBind(int(regR.ReplyKey))
- handle.printLog("after pubTopic forceBind")
- handle.wg.Add(1)
- //serve server reply
- go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
- handle.sockRep = &sockServer{
- sock: sockReply,
- info: &ri.Proc,
- }
- }
-
-
- //鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey
- sockPub := bhomebus.OpenSocket()
- handle.sockPub = &sockClient{
- sock: sockPub,
- peer: -1,
- }
-
//鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
- //璁㈤槄娑堟伅鐨剆ocket
- sockSub := bhomebus.OpenSocket()
- //璁㈤槄鎵�鏈変富棰�
- handle.printLog("start Sub topics")
+ handle.printLog("sub topics")
+ var subList bhome_msg.MsgTopicList
for _,v := range ri.SubTopic {
- subN := sockSub.Sub(v)
- handle.printLog("subTopic:", v, " ret n:", subN)
+ subList.TopicList = append(subList.TopicList, []byte(v))
}
- //鍚姩璁㈤槄淇℃伅鎺ユ敹
- handle.wg.Add(1)
- go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
- handle.sockSub = &sockClient{
- sock: sockSub,
- peer: -1,
+ var subReply bhome_msg.MsgCommonReply
+ if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
+ //鍚姩璁㈤槄淇℃伅鎺ユ敹
+ handle.wg.Add(1)
+ go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
}
- }
-
- sockWorker := bhomebus.OpenSocket()
- handle.sockWorker = &sockClient{
- sock: sockWorker,
- peer: int(regR.QueryTopicKey),
}
return handle, nil
}
+func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
+ var procId string
+ var msg bhome_msg.MsgPublish
+ for {
+ select {
+ case <-ctx.Done():
+ logFn("recvRoutine ctx.Done")
+ wg.Done()
+ return
+ default:
+ if bhsgo.ReadSub(&procId, &msg, 100) {
+ ch <- msg
+
+ procId = ""
+ msg.Reset()
+ } else {
+ //time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }
+}
+
//DeRegister
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
- data, err := json.Marshal(*dri)
- if err != nil {
- return err
- }
- dRegData,err := json.Marshal(MsgInfo{
- MsgType: "",
- Topic: TOPIC_DEREGISTER,
- Body: data,
- })
- if err != nil {
- return err
- }
-
- //h.mtxWorker.Lock()
- //defer h.mtxWorker.Unlock()
- netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: h.conf.regKey,
- })
- var retMsg []bhomebus.Mesg
- n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
- if n == 0 {
- return nil
- }
- h.printLog("DeRegister retMsg:", retMsg)
- return fmt.Errorf("DeRegister n:%d", n)
+ h.printLog("DeRegister")
+ return nil
}
func (h *BHBus) printLog(v ...interface{}) {
@@ -318,75 +185,27 @@
h.printLog("call BHBus free")
h.wg.Wait()
h.printLog("h.wg.Wait done")
- for _,v := range h.m {
- v.sock.Close()
- }
- if h.sockRep != nil {
- h.sockRep.sock.Close()
- h.sockRep = nil
- }
- if h.sockHB != nil {
- h.sockHB.sock.Close()
- h.sockHB = nil
- }
- if h.sockPub != nil {
- h.sockPub.sock.Close()
- h.sockPub = nil
- }
- if h.sockSub != nil {
- h.sockSub.sock.Close()
- h.sockSub = nil
- }
- if h.sockWorker != nil {
- h.sockWorker.sock.Close()
- h.sockWorker = nil
- }
-
- h.printLog("BHBus Freed")
}
//HeartBeat send
-func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
- data, err := json.Marshal(*info)
- if err == nil {
- hbd,err := json.Marshal(MsgInfo{
- SrcProc: info.Proc,
- MsgType: MesgType_ReqRep,
- Topic: TOPIC_HEARTBEAT,
- Body: data,
- })
- if err != nil {
- h.printLog("marshal heartbeat msgInfo err:", err)
- return err
- }
- var rMsg []bhomebus.Mesg
- hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: h.sockHB.peer,
- })
- //h.printLog("start send heartbeat")
- n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
- //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
-
- if n > 0 {
- return nil
- } else {
- return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
- }
+func (h *BHBus) HeartBeat() error {
+ procI := bhome_msg.ProcInfo{
+ ProcId: []byte(h.ri.Proc.ID),
+ Name: []byte(h.ri.Proc.Name),
}
- return err
+ var ret bhome_msg.MsgCommonReply
+ if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
+ return nil
+ } else {
+ return errors.New("send heartBeat return false")
+ }
}
-//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
-// n := s.sock.SendtoTimeout(data, s.peer, timeout)
-// if n == 0 {
-// return nil
-// }
-// return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
-//}
+
//鏇存柊涓婚鍒楄〃
-func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
+func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
h.mtxNode.Lock()
defer h.mtxNode.Unlock()
h.nodes = arr
@@ -395,185 +214,92 @@
//鑾峰彇topic瀵瑰簲鐨刱ey
//濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
//濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
- h.mtxNode.Lock()
- defer h.mtxNode.Unlock()
- var nodes []bhomebus.NetNode
- reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: h.sockWorker.peer,
- })
- reqD,err := json.Marshal(MsgInfo{
- SrcProc: *srcProc,
- MsgType: MesgType_ReqRep,
- Topic: TOPIC_QUERYKEY,
- Body: []byte(topic),
- })
- if err != nil {
- return nil, fmt.Errorf("marshal req err:%s", err.Error())
- }
- var ret []bhomebus.Mesg
- n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
- if n > 0 {
- var reply Reply
- err = json.Unmarshal(ret[0].Data, &reply)
- if err != nil {
- h.printLog("unmarshal err:", err)
- return nil, err
- }
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) {
- if reply.Success {
- rd,err := json.Marshal(reply.Data)
- if err == nil {
- err = json.Unmarshal(rd, &nodes)
- if err == nil {
- return nodes, nil
- } else {
- h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
- return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
- }
- } else {
- return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
- }
-
- } else {
- h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
- return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
- }
- } else {
- return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
- }
+ return nil, nil
}
-func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
+func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
- rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
- if err != nil {
- h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err)
- return nil, err
- }
- if rNodes == nil || len(rNodes) == 0 {
- return nil, errors.New("rNodes empty, topic: "+ req.Topic)
- }
//2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲��
- data, err := json.Marshal(*req)
- if err != nil {
- h.printLog("marshal(*req) err:", err)
- return nil, err
- }
- var ret []bhomebus.Mesg
-
- n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
-
- if n > 0 && len(ret) > 0 {
- var resp Reply
- if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
- return &resp, nil
- } else {
- h.printLog("unmarshal ret[0].Data err:", err)
- return nil, err
+ pid := ""
+ mrt := bhome_msg.MsgRequestTopicReply{}
+ dest := bhome_msg.BHAddress{}
+ if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
+ var reply Reply
+ if err := json.Unmarshal(mrt.Data, &reply); err != nil {
+ return nil,err
}
+
+ return &reply, nil
} else {
- h.printLog("Request n: ", n, " len(ret): ", len(ret))
- return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+ return nil, errors.New("request ")
}
}
-func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
- var ret []bhomebus.Mesg
-
- n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
-
- if n > 0 && len(ret) > 0 {
- return ret[0].Data, nil
+func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) {
+ dest := bhome_msg.BHAddress{}
+ if destArr != nil && len(destArr) > 0 {
+ dest = destArr[0]
+ }
+ pid := ""
+ r := bhome_msg.MsgRequestTopicReply{}
+ if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
+ return r.Data, nil
} else {
- h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData))
- return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+ h.printLog("bhsgo.Request request err:", r.Errmsg)
+ return nil, errors.New("bhsgo.Request return false")
}
}
-func (h *BHBus) Reply(replyKey int, i *Reply) error {
+func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
data,err := json.Marshal(*i)
if err != nil {
return err
}
-
- n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
- h.printLog("reply to key:", replyKey, " n:",n)
- if n != 0 {
- return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+ rep := bhome_msg.MsgRequestTopicReply{
+ Data: data,
}
- return nil
+ if bhsgo.SendReply(src, &rep) {
+ return nil
+ }
+
+ return errors.New("reply return false")
}
+func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) {
-//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
-//鏆撮湶鍦ㄤ笂灞傜殑锛屽彧鏈塼opic锛屾病鏈塳ey銆�
-func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
- data,err := json.Marshal(*arg)
- if err != nil {
- return err
- }
- //h.mtxWorker.Lock()
- //defer h.mtxWorker.Unlock()
- n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
- if n != 0 {
- return fmt.Errorf("sendOnly ret n:%d", n)
- }
- return nil
-}
-
-func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
- data, err := json.Marshal(*req)
- if err != nil {
- return nil, err
- }
- rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: KEY_QUERY,
- })
- //h.mtxWorker.Lock()
- //defer h.mtxWorker.Unlock()
- var ret []bhomebus.Mesg
- n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
- h.printLog("requestCenter n:", n, "len(ret):", len(ret))
- if n > 0 && len(ret) > 0{
- var cr Reply
- if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
- return &cr, nil
- } else {
- h.printLog("unmarshal to CommonReply err:", err)
- }
- }
- return nil, fmt.Errorf("request center err")
+ return nil, errors.New("")
}
//鍚戜富棰橀�氶亾涓彂甯冩秷鎭�
-func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
- data,err := json.Marshal(*msg)
- if err == nil {
- if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
- return nil
- } else {
- return fmt.Errorf("pub err n:%d", n)
- }
+func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
+ if bhsgo.Publish(msg, h.conf.pubTimeOut) {
+ return nil
+ } else {
+ return fmt.Errorf("pub err ")
}
-
- return err
}
-func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
- data,err := json.Marshal(*msg)
- if err == nil {
- return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
+func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
+ if bhsgo.Publish(msg, timeout) {
+ return 1
}
return -1
}
//杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
func (h *BHBus) Sub(topics []string) {
- if topics != nil {
- for _,t := range topics {
- h.sockSub.sock.Sub(t)
+ if topics != nil && len(topics) >0 {
+ var subList bhome_msg.MsgTopicList
+ for _, v := range topics {
+ subList.TopicList = append(subList.TopicList, []byte(v))
+ }
+
+ var subReply bhome_msg.MsgCommonReply
+ if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
+ h.printLog("sub topics")
}
}
}
@@ -581,40 +307,6 @@
//娉ㄩ攢璁㈤槄鐨勪富棰�
func (h *BHBus) DeSub(topics []string) {
if topics != nil {
- for _,t := range topics {
- if n := h.sockSub.sock.Desub(t); n != 0 {
- h.printLog("DeSub topic:", t, " n:", n)
- }
- }
- }
-}
-
-//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
-// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-// return nil
-// }
-// if len(h.chSub) >0 {
-// m := <-h.chSub
-// subMsg = m.info
-// }
-// return
-//}
-
-
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
- if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
- return nil, nil, -1
}
- if len(h.chSub) >0 {
- m := <-h.chSub
- subMsg = m.info
- }
- if len(h.chReply) >0 {
- m := <-h.chReply
- replyMsg = m.info
- replyKey = m.port
- }
- return
}
\ No newline at end of file
diff --git a/hbusc.proto b/hbusc.proto
deleted file mode 100644
index 0feb99a..0000000
--- a/hbusc.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-syntax = "proto3";
-
-package hbusc;
-
-message ProcInfo {
- string procID = 1;
- string name = 2;
- string label = 3;
-}
-
-message RegisterInfo {
- ProcInfo procInfo = 1;
- repeated string channel = 2;
- repeated string pubTopic = 3;
- repeated string subTopic = 4;
-}
-
-message RegisterInfoReply {
- ProcInfo procInfo = 1; //杩涚▼淇℃伅
- map<string, int32> channelKey = 2; //棰勭暀
- int32 heartbeatKey = 3; //蹇冭烦淇℃伅鍙戦�佸埌鐨勭洰鐨刱ey
- int32 updateTopicKey = 4; //鏇存柊鎰熷叴瓒d富棰樼殑key锛屽湪鍏跺畠杩涚▼涓缁戝畾
- int32 replyKey = 5; //鐢宠鍒扮殑鏈繘绋嬩綔涓簊erver鐢╧ey
- int32 queryTopicKey = 6;
-}
-
-message TopicInfo {
- string topic = 1;
- string topicType = 2;
-}
-
-message TopicInfoReply {
- TopicInfo info = 1;
- int32 key = 2;
-}
-
-message HeartBeatInfo {
- string healthLevel = 1; // 鍋ュ悍绛夌骇
- int32 fps = 2; // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼)
- string warnInfo = 3; // 鎶ヨ淇℃伅
- string errorInfo = 4; // 閿欒淇℃伅
- bytes otherInfo = 5; // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓�
- int32 otherInfoSize = 6; // 鍏朵粬鐗规湁淇℃伅闀垮害
- ProcInfo procInfo = 7; //杩涚▼淇℃伅
-}
-
-message MsgInfo {
- ProcInfo srcProc = 1; // 婧愯繘绋嬪熀鏈俊鎭�
- string msgType = 2; // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑
- string topic= 3; // 鏈嶅姟涓婚
- int32 shmKey = 4; // 璇锋眰搴旂瓟鏁版嵁浣跨敤鐨刱ey锛屽叾浠栨暟鎹笉鐢紝寰呯‘璁�
- bytes body = 5; // 鏁版嵁鍐呭锛屼簩杩涘埗缂栫爜鍚庣殑锛岄渶瑕佺‘瀹氱紪鐮佺被鍨嬪苟瑙g爜
- int32 bodyLen = 6; // 鏁版嵁闀垮害
-}
\ No newline at end of file
diff --git a/message.go b/message.go
index 9ba85b9..9ce78da 100644
--- a/message.go
+++ b/message.go
@@ -35,8 +35,8 @@
)
type NodeList struct {
- Ip string `json:"ip"`
- Port int `json:"port"`
+ Ip string `json:"ip"`
+ Port int `json:"port"`
}
const (
@@ -60,43 +60,10 @@
}
type RegisterInfo struct {
- Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭�
- Channel []string `json:"channel"` // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒�
- PubTopic []string `json:"pubTopic"` // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰�
- SubTopic []string `json:"subTopic"` // 杩涚▼璁㈤槄鐨勬湇鍔′富棰�
-}
-
-
-type RegisterReply struct {
- TCPProxyIP string `json:"tcpProxyIP"` // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣↖P
- TCPProxyPort int `json:"tcpProxyPort"` // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣ㄧ鍙�
- HeartbeatKey int `json:"heartbeatKey"` // client鍙戦�佸績璺崇殑key
- ReplyKey int `json:"replyKey"` // client鐨勫簲绛旀湇鍔ey
- ChannelKey map[string]int `json:"channelKey"` // client鐨刢han瀵瑰簲鐨刱ey
- QueryTopicKey int `json:"queryTopicKey"` // client鏌ヨtopic瀵瑰簲鐨刱ey鏃剁敤鍒扮殑key
- Status int `json:"status"` // 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202
-}
-
-type HeartBeatInfo struct {
- Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭�
- HealthLevel string `json:"healthLevel"` // 鍋ュ悍绛夌骇
- Fps int `json:"fps"` // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼)
- WarnInfo string `json:"warnInfo"` // 鎶ヨ淇℃伅
- ErrorInfo string `json:"errorInfo"` // 閿欒淇℃伅
- OtherInfo []byte `json:"otherInfo"` // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓�
- OtherInfoSize int `json:"otherInfoSize"` // 鍏朵粬鐗规湁淇℃伅闀垮害
-}
-
-type HeartBeatReply struct {
- Status int `json:"status"` // 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202
- Desc string `json:"desc"` // 璇锋眰鐘舵�佺殑鎻忚堪,鎴愬姛"success",澶辫触杩斿洖澶辫触鍘熷洜,濡傚績璺虫湇鍔℃湭鍚姩
-}
-
-type MsgInfo struct {
- SrcProc ProcInfo `json:"srcProc"` // 婧愯繘绋嬪熀鏈俊鎭�
- MsgType string `json:"msgType"` // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑
- Topic string `json:"topic"` // 璇锋眰鐨勫嚱鏁�,骞朵笉瀵瑰簲浠讳綍鐨剆hmKey,涓氬姟灞傜殑topic
- Body []byte `json:"body"` // 璇锋眰鍐呭
+ Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭�
+ Channel []string `json:"channel"` // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒�
+ PubTopic []string `json:"pubTopic"` // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰�
+ SubTopic []string `json:"subTopic"` // 杩涚▼璁㈤槄鐨勬湇鍔′富棰�
}
diff --git a/micronode.go b/micronode.go
index 89e4088..a826412 100644
--- a/micronode.go
+++ b/micronode.go
@@ -1,7 +1,7 @@
package bhomeclient
import (
- "basic.com/valib/bhomebus.git"
+ "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
"context"
"encoding/json"
"errors"
@@ -20,7 +20,7 @@
serverId string
fnLog func(...interface{})
- SubCh chan *MsgInfo
+ SubCh chan *bhome_msg.MsgPublish
mtx sync.Mutex
started bool
@@ -39,7 +39,7 @@
reg: reg,
procInfo: ®.Proc,
fnLog: fnLog,
- SubCh: make(chan *MsgInfo, 512),
+ SubCh: make(chan *bhome_msg.MsgPublish, 512),
}
return mn, nil
@@ -65,14 +65,6 @@
}
func (ms *MicroNode) startHeartbeat() {
- hbi := &HeartBeatInfo{
- HealthLevel: "health",
- Fps: 12,
- WarnInfo: "warn",
- ErrorInfo: "error",
- Proc: *ms.procInfo,
- }
-
t := time.NewTicker(1 * time.Second)
defer t.Stop()
@@ -81,7 +73,9 @@
case <-ms.ctx.Done():
return
case <-t.C:
- ms.handle.HeartBeat(hbi)
+ ms.handle.HeartBeat()
+ default:
+ time.Sleep(500 * time.Millisecond)
}
}
}
@@ -110,52 +104,18 @@
select {
case <- ms.ctx.Done():
return
+ case msgR := <-ms.handle.ChReply: //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+ go ms.serve(ms.handle.ctx, &msgR)
+ case msgS := <-ms.handle.ChSub:
+ ms.printLog("Recv Sub Message:", string(msgS.Data))
+ ms.SubCh <- &msgS
default:
- msgS, msgR, keyR := ms.handle.GetMsg()
- if msgS != nil {
- //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
- ms.printLog("Recv Sub Message:", string(msgS.Body))
- ms.SubCh <- msgS
- }
- if msgR != nil {
- //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
- go ms.serve(msgR, keyR)
- }
-
time.Sleep(50 * time.Millisecond)
}
}
-
- //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
- //go ms.startRecvSubMsg()
- //浣滀负server鍚姩
- //ms.serve()
}
ms.mtx.Unlock()
}
-
-//寮�濮嬫帴鏀惰闃呮秷鎭�
-//func (ms *MicroNode) startRecvSubMsg() {
-// for {
-// select {
-// case <- ms.ctx.Done():
-// return
-// default:
-// msgS, msgR, keyR := ms.handle.GetMsg()
-// if msgS != nil {
-// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-// ms.printLog("Recv Sub Message:", string(msgS.Body))
-// ms.SubCh <- msgS
-// }
-// if msgR != nil {
-// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-// go ms.serve(msgR, keyR)
-// }
-//
-// time.Sleep(50 * time.Millisecond)
-// }
-// }
-//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
t := time.Now()
@@ -163,9 +123,9 @@
ms.printLog("1:", time.Since(t))
t = time.Now()
rb, _ := json.Marshal(request)
- msgR := &MsgInfo {
- Topic: request.Path,
- Body: rb,
+ msgR := &bhome_msg.MsgRequestTopic{
+ Topic: []byte(request.Path),
+ Data: rb,
}
ms.printLog("2:", time.Since(t))
return ms.handle.Request(serverId, msgR, milliSecs)
@@ -173,20 +133,20 @@
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
rb, _ := json.Marshal(request)
- msgR := &MsgInfo{
- Topic: request.Path,
- Body: rb,
+ msgR := &bhome_msg.MsgRequestTopic{
+ Topic: []byte(request.Path),
+ Data: rb,
}
return ms.handle.Request(serverId, msgR, milliSecs)
}
-func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
- return ms.handle.RequestOnly(rData, nodes)
+func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) {
+ return ms.handle.RequestOnly(req, dest)
}
//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress {
netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
if err != nil {
ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
@@ -196,7 +156,7 @@
}
//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
if err != nil {
return nil
@@ -205,11 +165,7 @@
}
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
- r := MsgInfo{
- SrcProc: *ms.procInfo,
- MsgType: MesgType_ReqRep,
- Topic: TOPIC_QUERYPROC,
- }
+ r := bhome_msg.MsgRequestTopic{}
cr, err := ms.handle.RequestCenter(&r)
if err != nil {
ms.printLog("requestCenter reply:", cr, "err:", err)
@@ -234,76 +190,37 @@
return nil, fmt.Errorf("GetRegisteredClient list failed")
}
-//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
-// ri := &Reply{}
-// if ms.handlers == nil {
-// ri.Msg = "send wrong addr, check yourself!!!"
-// } else {
-// var msgR MsgInfo
-// err := json.Unmarshal(rdata, &msgR)
-// if err != nil {
-// ri.Msg = err.Error()
-// } else {
-// var reqBody Request
-// err = json.Unmarshal(rdata, &msgR.Body)
-// if err != nil {
-// ri.Msg = err.Error()
-// } else {
-// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
-// if f,ok := ms.handlers[reqBody.Path];ok {
-// reqBody.SrcProc = msgR.SrcProc
-// ri = f(&reqBody)
-// ms.printLog("call funcMap f,reply:", *ri)
-// } else {
-// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
-// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
-// }
-// }
-// }
-// }
-// result, err := json.Marshal(*ri)
-// if err != nil {
-// sdata = nil
-// } else {
-// sdata = &result
-// }
-// return ri.Success
-//}
-
-//func (ms *MicroNode) serve() {
-// if ms.handlers == nil {
-// return
-// }
-// for i:=0;i<10;i++ {
-// ms.handle.wg.Add(1)
-// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
-// }
-//}
-
-func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
if ms.handlers == nil {
return
}
var reqBody Request
var ri *Reply
- err := json.Unmarshal(msgR.Body, &reqBody)
+ err := json.Unmarshal(msgR.Data, &reqBody)
if err != nil {
ms.printLog("serve unmarshal msgR.Body err:", err)
ri = &Reply {
Msg: err.Error(),
}
} else {
- ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+ ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
if f,ok := ms.handlers[reqBody.Path];ok {
- reqBody.SrcProc = msgR.SrcProc
+ reqBody.SrcProc = ProcInfo{
+ ID: msgR.ProcId,
+ }
h := WrapperHandler{
ms,
ms,
}
- ri = f(&h, &reqBody)
- ms.printLog("call funcMap f,reply.Success:", ri.Success)
+ select {
+ case <-ctx.Done():
+ ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
+ default:
+ ri = f(&h, &reqBody)
+ ms.printLog("call funcMap f,reply.Success:", ri.Success)
+ }
} else {
ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
ri = &Reply{
@@ -314,7 +231,7 @@
}
}
- retErr := ms.handle.Reply(p, ri)
+ retErr := ms.handle.Reply(msgR.Src, ri)
if retErr != nil {
ms.printLog("retErr:", retErr)
}
@@ -322,24 +239,22 @@
//鍙戝竷鍒版湰鏈�
func (ms *MicroNode) Publish(topic string,msg []byte) error {
- nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
- Key: 8,
- })
+ var nodes []bhome_msg.BHAddress
return ms.PublishNet(nodes, topic, msg)
}
-func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
- pi := &MsgInfo{
- Topic: topic,
- Body: msg,
+func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
+ pi := &bhome_msg.MsgPublish{
+ Topic: []byte(topic),
+ Data: data,
}
return ms.handle.Pub(nodes, pi)
}
-func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
- pi := &MsgInfo{
- Topic: topic,
- Body: msg,
+func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
+ pi := &bhome_msg.MsgPublish{
+ Topic: []byte(topic),
+ Data: data,
}
return ms.handle.PubTimeout(nodes, pi, timeout)
}
--
Gitblit v1.8.0