From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期日, 25 四月 2021 11:36:03 +0800 Subject: [PATCH] 使用bhsgo by lichao --- /dev/null | 54 --- message.go | 45 -- micronode.go | 175 ++-------- broker.go | 10 hbusc.go | 626 +++++++++----------------------------- 5 files changed, 215 insertions(+), 695 deletions(-) diff --git a/broker.go b/broker.go index cc78e6e..d1ba2c1 100644 --- a/broker.go +++ b/broker.go @@ -1,17 +1,17 @@ package bhomeclient -import "basic.com/valib/bhomebus.git" +import "basic.com/valib/bhshmq.git/proto/source/bhome_msg" type Broker interface { //鍙戝竷鍒版湰鏈� - Publish(topic string, msg []byte) error + Publish(string, []byte) error //鍙戝竷鍒拌繙绋嬫満鍣� - PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error + PublishNet([]bhome_msg.BHAddress, string, []byte) error //璁㈤槄涓�浜涗富棰�,鍙姩鎬佹柊澧� - Subscribe(topics []string) + Subscribe([]string) //娉ㄩ攢璁㈤槄鐨勪富棰� - DeSub(topics []string) + DeSub([]string) } diff --git a/hbusc.go b/hbusc.go index 0735c04..6ccd790 100644 --- a/hbusc.go +++ b/hbusc.go @@ -1,63 +1,45 @@ package bhomeclient import ( - "basic.com/valib/bhomebus.git" + "basic.com/valib/bhshmq.git/api/bhsgo" + "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "context" "encoding/json" "errors" "fmt" "os" - "strconv" "sync" "time" + "unsafe" ) -type sockServer struct { - sock *bhomebus.Socket - info *ProcInfo -} - -type sockClient struct { - sock *bhomebus.Socket - peer int -} - -type TransInfo struct { - info *MsgInfo - port int +type MsgReq struct { + ProcId string + bhome_msg.MsgRequestTopic + Src unsafe.Pointer } type BHBus struct { - ctx context.Context + ctx context.Context - conf *Config + ri *RegisterInfo - nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 - mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� + conf *Config - m map[string]*sockServer + nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 + mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� - wg *sync.WaitGroup + wg *sync.WaitGroup - sockRep *sockServer //鍝嶅簲鍏朵粬杩涚▼request鐨剆ocket锛宻erver - - sockHB *sockClient //缁存寔蹇冭烦鐨剆ocket锛岀嚎绋嬪疄鏃跺彂閫侊紝闇�瑕佸崟鐙鐞� - - sockPub *sockClient //鍙戝竷涓婚鐨剆ocket锛岄渶瑕佸崟鐙瑂ocket澶勭悊 - - sockSub *sockClient //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞� - - sockWorker *sockClient //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client - //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� - - chSub chan TransInfo - chReply chan TransInfo + ChSub chan bhome_msg.MsgPublish + ChReply chan MsgReq } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� -func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) { - var data []byte - var key int +func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) { + var procId string + var msg bhome_msg.MsgRequestTopic + var src unsafe.Pointer for { select { case <-ctx.Done(): @@ -65,20 +47,16 @@ wg.Done() return default: - n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� - if n == 0 { - var info MsgInfo - if err := json.Unmarshal(data, &info);err == nil { - ch <- TransInfo{ - info: &info, - port: key, //杩欎釜key鍦ㄥ彂甯冭闃呮ā寮忎腑鏄痓us鐨刱ey锛屾槸涓浐瀹氬�硷紝涓婂眰鐢ㄤ笉鍒� - } - - data = []byte{} - key = 0 - } else { - logFn("unmarshal to MsgInfo err:", err) + if bhsgo.ReadRequest(&procId, &msg, &src, 100) { + ch <- MsgReq{ + procId, + msg, + src, } + + procId = "" + msg.Reset() + src = unsafe.Pointer(nil) } else { time.Sleep(100 * time.Millisecond) } @@ -86,59 +64,22 @@ } } -//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { -// for { -// select { -// case <-ctx.Done(): -// logFn("recvandsendRoutine ctx.Done") -// wg.Done() -// return -// default: -// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� -// if n != 0 { -// logFn("RecvandsendTimeout success") -// } else { -// //time.Sleep(100 * time.Millisecond) -// } -// } -// } -//} - //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { handle := &BHBus { - ctx: ctx, - conf: config, - m: make(map[string]*sockServer), - chSub: make(chan TransInfo, config.chSize), - chReply: make(chan TransInfo, config.chSize), + ctx: ctx, + conf: config, + ri: ri, + ChSub: make(chan bhome_msg.MsgPublish, config.chSize), + ChReply: make(chan MsgReq, config.chSize), } - var err error - err = bhomebus.Init("libshm_queue.so") - if err != nil { - handle.printLog("Init so err:", err) - return nil, err - } - err = bhomebus.ShmInit(512) - if err != nil { - handle.printLog("shmInit size err:", err) - return nil, err - } - regSock := bhomebus.OpenSocket() - if regSock == nil { - handle.printLog("Open Socket ret Nil") - return nil, errors.New("OpenSocket ret Nil") - } - defer func() { - regSock.Close() - handle.printLog("regSock.CLose") - }() - - var msg []byte - var regAddr []bhomebus.NetNode - var regR *RegisterReply //娉ㄥ唽缁撴灉淇℃伅 //濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐� + procI := bhome_msg.ProcInfo{ + ProcId: []byte(ri.Proc.ID), + Name: []byte(ri.Proc.Name), + } + var regReply bhome_msg.MsgCommonReply loop: for { select { @@ -146,165 +87,91 @@ handle.printLog("register <-q") return nil,errors.New("ctx is done") default: - if msg == nil { - rid, err := json.Marshal(*ri) - if err != nil { - handle.printLog("marshal registerInfo err:", err) - return nil, errors.New("marshal registerInfo err:"+err.Error()) - } - s := MsgInfo{ - SrcProc: ri.Proc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_REGISTER, - Body: rid, - } - handle.printLog("register MsgInfo:", s) - dRegData,err := json.Marshal(s) - if err != nil { - handle.printLog("marshal deregister msg err:", err) - return nil, err - } - handle.printLog(string(dRegData)) - msg = dRegData - } - if regAddr == nil { - regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: handle.conf.regKey, - }) - } - var rMsg []bhomebus.Mesg - n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - handle.printLog("regSock.SendandrecvTimeout n:", n) - if n == 1 && len(rMsg) == 1 { - var cr Reply - if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { - handle.printLog("unmarshal regReply err:", err) - return nil, errors.New("unmarshal regReply err:"+err.Error()) - } else { - if cr.Success { - if rpd,err := json.Marshal(cr.Data);err ==nil { - var rr RegisterReply - if err = json.Unmarshal(rpd, &rr); err == nil { - regR = &rr - break loop - } else { - handle.printLog("unmarshal RegisterReply err:", err) - } - } else { - handle.printLog("marshal cr.Data err:", err) - } - } else { - handle.printLog("cr:", cr) - } - } + if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) { + break loop } else { - time.Sleep(1 * time.Second) + time.Sleep(time.Second) } } } - handle.printLog("register Reply:", *regR) - - for _, v := range ri.Channel { - if k,ok := regR.ChannelKey[v];ok { - s := bhomebus.OpenSocket() - s.ForceBind(int(k)) - handle.m[v] = &sockServer{ - sock: s, - info: &ri.Proc, + if ri.PubTopic != nil && len(ri.PubTopic) > 0 { + topics := bhome_msg.MsgTopicList{} + var regTopicReply bhome_msg.MsgCommonReply + for _,t := range ri.PubTopic { + topics.TopicList = append(topics.TopicList, []byte(t)) + } + loopRT: + for { + select { + case <-q: + handle.printLog("RegisterTopics recv quit signal") + return nil, errors.New("RegisterTopics recv quit signal") + default: + if bhsgo.RegisterTopics(&topics, ®TopicReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.RegisterTopics success!!") + break loopRT + } else { + time.Sleep(time.Second) + } } } + + handle.wg.Add(1) + go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog) } - //缁存寔蹇冭烦鐨剆ocket - sockHB := bhomebus.OpenSocket() - handle.printLog("open sockHB") - handle.sockHB = &sockClient{ - sock: sockHB, - peer: int(regR.HeartbeatKey), - } + handle.printLog("register done!" ) handle.wg = &sync.WaitGroup{} - if ri.PubTopic != nil && len(ri.PubTopic) > 0 { - sockReply := bhomebus.OpenSocket() - sockReply.ForceBind(int(regR.ReplyKey)) - handle.printLog("after pubTopic forceBind") - handle.wg.Add(1) - //serve server reply - go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) - handle.sockRep = &sockServer{ - sock: sockReply, - info: &ri.Proc, - } - } - - - //鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey - sockPub := bhomebus.OpenSocket() - handle.sockPub = &sockClient{ - sock: sockPub, - peer: -1, - } - //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭� if ri.SubTopic != nil && len(ri.SubTopic) > 0 { - //璁㈤槄娑堟伅鐨剆ocket - sockSub := bhomebus.OpenSocket() - //璁㈤槄鎵�鏈変富棰� - handle.printLog("start Sub topics") + handle.printLog("sub topics") + var subList bhome_msg.MsgTopicList for _,v := range ri.SubTopic { - subN := sockSub.Sub(v) - handle.printLog("subTopic:", v, " ret n:", subN) + subList.TopicList = append(subList.TopicList, []byte(v)) } - //鍚姩璁㈤槄淇℃伅鎺ユ敹 - handle.wg.Add(1) - go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) - handle.sockSub = &sockClient{ - sock: sockSub, - peer: -1, + var subReply bhome_msg.MsgCommonReply + if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { + //鍚姩璁㈤槄淇℃伅鎺ユ敹 + handle.wg.Add(1) + go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) } - } - - sockWorker := bhomebus.OpenSocket() - handle.sockWorker = &sockClient{ - sock: sockWorker, - peer: int(regR.QueryTopicKey), } return handle, nil } +func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) { + var procId string + var msg bhome_msg.MsgPublish + for { + select { + case <-ctx.Done(): + logFn("recvRoutine ctx.Done") + wg.Done() + return + default: + if bhsgo.ReadSub(&procId, &msg, 100) { + ch <- msg + + procId = "" + msg.Reset() + } else { + //time.Sleep(100 * time.Millisecond) + } + } + } +} + //DeRegister func (h *BHBus) DeRegister(dri *RegisterInfo) error { - data, err := json.Marshal(*dri) - if err != nil { - return err - } - dRegData,err := json.Marshal(MsgInfo{ - MsgType: "", - Topic: TOPIC_DEREGISTER, - Body: data, - }) - if err != nil { - return err - } - - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.conf.regKey, - }) - var retMsg []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut) - if n == 0 { - return nil - } - h.printLog("DeRegister retMsg:", retMsg) - return fmt.Errorf("DeRegister n:%d", n) + h.printLog("DeRegister") + return nil } func (h *BHBus) printLog(v ...interface{}) { @@ -318,75 +185,27 @@ h.printLog("call BHBus free") h.wg.Wait() h.printLog("h.wg.Wait done") - for _,v := range h.m { - v.sock.Close() - } - if h.sockRep != nil { - h.sockRep.sock.Close() - h.sockRep = nil - } - if h.sockHB != nil { - h.sockHB.sock.Close() - h.sockHB = nil - } - if h.sockPub != nil { - h.sockPub.sock.Close() - h.sockPub = nil - } - if h.sockSub != nil { - h.sockSub.sock.Close() - h.sockSub = nil - } - if h.sockWorker != nil { - h.sockWorker.sock.Close() - h.sockWorker = nil - } - - h.printLog("BHBus Freed") } //HeartBeat send -func (h *BHBus) HeartBeat(info *HeartBeatInfo) error { - data, err := json.Marshal(*info) - if err == nil { - hbd,err := json.Marshal(MsgInfo{ - SrcProc: info.Proc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_HEARTBEAT, - Body: data, - }) - if err != nil { - h.printLog("marshal heartbeat msgInfo err:", err) - return err - } - var rMsg []bhomebus.Mesg - hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.sockHB.peer, - }) - //h.printLog("start send heartbeat") - n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) - - if n > 0 { - return nil - } else { - return fmt.Errorf("sockHB Sendandrecv ret n:%d", n) - } +func (h *BHBus) HeartBeat() error { + procI := bhome_msg.ProcInfo{ + ProcId: []byte(h.ri.Proc.ID), + Name: []byte(h.ri.Proc.Name), } - return err + var ret bhome_msg.MsgCommonReply + if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) { + return nil + } else { + return errors.New("send heartBeat return false") + } } -//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error { -// n := s.sock.SendtoTimeout(data, s.peer, timeout) -// if n == 0 { -// return nil -// } -// return errors.New("SendtoTimeout n:"+strconv.Itoa(n)) -//} + //鏇存柊涓婚鍒楄〃 -func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { +func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { h.mtxNode.Lock() defer h.mtxNode.Unlock() h.nodes = arr @@ -395,185 +214,92 @@ //鑾峰彇topic瀵瑰簲鐨刱ey //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key -func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) { - h.mtxNode.Lock() - defer h.mtxNode.Unlock() - var nodes []bhomebus.NetNode - reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.sockWorker.peer, - }) - reqD,err := json.Marshal(MsgInfo{ - SrcProc: *srcProc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_QUERYKEY, - Body: []byte(topic), - }) - if err != nil { - return nil, fmt.Errorf("marshal req err:%s", err.Error()) - } - var ret []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) - if n > 0 { - var reply Reply - err = json.Unmarshal(ret[0].Data, &reply) - if err != nil { - h.printLog("unmarshal err:", err) - return nil, err - } +func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) { - if reply.Success { - rd,err := json.Marshal(reply.Data) - if err == nil { - err = json.Unmarshal(rd, &nodes) - if err == nil { - return nodes, nil - } else { - h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data) - return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error()) - } - } else { - return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) - } - - } else { - h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data) - return nil, fmt.Errorf("REPLY msg:%s", reply.Msg) - } - } else { - return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) - } + return nil, nil } -func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) { +func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { //1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode - rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) - if err != nil { - h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err) - return nil, err - } - if rNodes == nil || len(rNodes) == 0 { - return nil, errors.New("rNodes empty, topic: "+ req.Topic) - } //2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲�� - data, err := json.Marshal(*req) - if err != nil { - h.printLog("marshal(*req) err:", err) - return nil, err - } - var ret []bhomebus.Mesg - - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs) - - if n > 0 && len(ret) > 0 { - var resp Reply - if err = json.Unmarshal(ret[0].Data, &resp); err == nil { - return &resp, nil - } else { - h.printLog("unmarshal ret[0].Data err:", err) - return nil, err + pid := "" + mrt := bhome_msg.MsgRequestTopicReply{} + dest := bhome_msg.BHAddress{} + if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) { + var reply Reply + if err := json.Unmarshal(mrt.Data, &reply); err != nil { + return nil,err } + + return &reply, nil } else { - h.printLog("Request n: ", n, " len(ret): ", len(ret)) - return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) + return nil, errors.New("request ") } } -func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) { - var ret []bhomebus.Mesg - - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut) - - if n > 0 && len(ret) > 0 { - return ret[0].Data, nil +func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) { + dest := bhome_msg.BHAddress{} + if destArr != nil && len(destArr) > 0 { + dest = destArr[0] + } + pid := "" + r := bhome_msg.MsgRequestTopicReply{} + if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) { + return r.Data, nil } else { - h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData)) - return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) + h.printLog("bhsgo.Request request err:", r.Errmsg) + return nil, errors.New("bhsgo.Request return false") } } -func (h *BHBus) Reply(replyKey int, i *Reply) error { +func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error { data,err := json.Marshal(*i) if err != nil { return err } - - n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) - h.printLog("reply to key:", replyKey, " n:",n) - if n != 0 { - return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) + rep := bhome_msg.MsgRequestTopicReply{ + Data: data, } - return nil + if bhsgo.SendReply(src, &rep) { + return nil + } + + return errors.New("reply return false") } +func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) { -//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. -//鏆撮湶鍦ㄤ笂灞傜殑锛屽彧鏈塼opic锛屾病鏈塳ey銆� -func (h *BHBus) SendOnly(key int, arg *MsgInfo) error { - data,err := json.Marshal(*arg) - if err != nil { - return err - } - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut) - if n != 0 { - return fmt.Errorf("sendOnly ret n:%d", n) - } - return nil -} - -func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) { - data, err := json.Marshal(*req) - if err != nil { - return nil, err - } - rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: KEY_QUERY, - }) - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - var ret []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) - h.printLog("requestCenter n:", n, "len(ret):", len(ret)) - if n > 0 && len(ret) > 0{ - var cr Reply - if err = json.Unmarshal(ret[0].Data, &cr); err == nil { - return &cr, nil - } else { - h.printLog("unmarshal to CommonReply err:", err) - } - } - return nil, fmt.Errorf("request center err") + return nil, errors.New("") } //鍚戜富棰橀�氶亾涓彂甯冩秷鎭� -func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { - data,err := json.Marshal(*msg) - if err == nil { - if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 { - return nil - } else { - return fmt.Errorf("pub err n:%d", n) - } +func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error { + if bhsgo.Publish(msg, h.conf.pubTimeOut) { + return nil + } else { + return fmt.Errorf("pub err ") } - - return err } -func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int { - data,err := json.Marshal(*msg) - if err == nil { - return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout) +func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int { + if bhsgo.Publish(msg, timeout) { + return 1 } return -1 } //杩藉姞璁㈤槄鐨勪富棰樻秷鎭� func (h *BHBus) Sub(topics []string) { - if topics != nil { - for _,t := range topics { - h.sockSub.sock.Sub(t) + if topics != nil && len(topics) >0 { + var subList bhome_msg.MsgTopicList + for _, v := range topics { + subList.TopicList = append(subList.TopicList, []byte(v)) + } + + var subReply bhome_msg.MsgCommonReply + if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) { + h.printLog("sub topics") } } } @@ -581,40 +307,6 @@ //娉ㄩ攢璁㈤槄鐨勪富棰� func (h *BHBus) DeSub(topics []string) { if topics != nil { - for _,t := range topics { - if n := h.sockSub.sock.Desub(t); n != 0 { - h.printLog("DeSub topic:", t, " n:", n) - } - } - } -} - -//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -//func (h *BHBus) GetMsg() (subMsg *MsgInfo) { -// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { -// return nil -// } -// if len(h.chSub) >0 { -// m := <-h.chSub -// subMsg = m.info -// } -// return -//} - - -func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { - if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil, nil, -1 } - if len(h.chSub) >0 { - m := <-h.chSub - subMsg = m.info - } - if len(h.chReply) >0 { - m := <-h.chReply - replyMsg = m.info - replyKey = m.port - } - return } \ No newline at end of file diff --git a/hbusc.proto b/hbusc.proto deleted file mode 100644 index 0feb99a..0000000 --- a/hbusc.proto +++ /dev/null @@ -1,54 +0,0 @@ -syntax = "proto3"; - -package hbusc; - -message ProcInfo { - string procID = 1; - string name = 2; - string label = 3; -} - -message RegisterInfo { - ProcInfo procInfo = 1; - repeated string channel = 2; - repeated string pubTopic = 3; - repeated string subTopic = 4; -} - -message RegisterInfoReply { - ProcInfo procInfo = 1; //杩涚▼淇℃伅 - map<string, int32> channelKey = 2; //棰勭暀 - int32 heartbeatKey = 3; //蹇冭烦淇℃伅鍙戦�佸埌鐨勭洰鐨刱ey - int32 updateTopicKey = 4; //鏇存柊鎰熷叴瓒d富棰樼殑key锛屽湪鍏跺畠杩涚▼涓缁戝畾 - int32 replyKey = 5; //鐢宠鍒扮殑鏈繘绋嬩綔涓簊erver鐢╧ey - int32 queryTopicKey = 6; -} - -message TopicInfo { - string topic = 1; - string topicType = 2; -} - -message TopicInfoReply { - TopicInfo info = 1; - int32 key = 2; -} - -message HeartBeatInfo { - string healthLevel = 1; // 鍋ュ悍绛夌骇 - int32 fps = 2; // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼) - string warnInfo = 3; // 鎶ヨ淇℃伅 - string errorInfo = 4; // 閿欒淇℃伅 - bytes otherInfo = 5; // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓� - int32 otherInfoSize = 6; // 鍏朵粬鐗规湁淇℃伅闀垮害 - ProcInfo procInfo = 7; //杩涚▼淇℃伅 -} - -message MsgInfo { - ProcInfo srcProc = 1; // 婧愯繘绋嬪熀鏈俊鎭� - string msgType = 2; // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑 - string topic= 3; // 鏈嶅姟涓婚 - int32 shmKey = 4; // 璇锋眰搴旂瓟鏁版嵁浣跨敤鐨刱ey锛屽叾浠栨暟鎹笉鐢紝寰呯‘璁� - bytes body = 5; // 鏁版嵁鍐呭锛屼簩杩涘埗缂栫爜鍚庣殑锛岄渶瑕佺‘瀹氱紪鐮佺被鍨嬪苟瑙g爜 - int32 bodyLen = 6; // 鏁版嵁闀垮害 -} \ No newline at end of file diff --git a/message.go b/message.go index 9ba85b9..9ce78da 100644 --- a/message.go +++ b/message.go @@ -35,8 +35,8 @@ ) type NodeList struct { - Ip string `json:"ip"` - Port int `json:"port"` + Ip string `json:"ip"` + Port int `json:"port"` } const ( @@ -60,43 +60,10 @@ } type RegisterInfo struct { - Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭� - Channel []string `json:"channel"` // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒� - PubTopic []string `json:"pubTopic"` // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰� - SubTopic []string `json:"subTopic"` // 杩涚▼璁㈤槄鐨勬湇鍔′富棰� -} - - -type RegisterReply struct { - TCPProxyIP string `json:"tcpProxyIP"` // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣↖P - TCPProxyPort int `json:"tcpProxyPort"` // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣ㄧ鍙� - HeartbeatKey int `json:"heartbeatKey"` // client鍙戦�佸績璺崇殑key - ReplyKey int `json:"replyKey"` // client鐨勫簲绛旀湇鍔ey - ChannelKey map[string]int `json:"channelKey"` // client鐨刢han瀵瑰簲鐨刱ey - QueryTopicKey int `json:"queryTopicKey"` // client鏌ヨtopic瀵瑰簲鐨刱ey鏃剁敤鍒扮殑key - Status int `json:"status"` // 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202 -} - -type HeartBeatInfo struct { - Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭� - HealthLevel string `json:"healthLevel"` // 鍋ュ悍绛夌骇 - Fps int `json:"fps"` // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼) - WarnInfo string `json:"warnInfo"` // 鎶ヨ淇℃伅 - ErrorInfo string `json:"errorInfo"` // 閿欒淇℃伅 - OtherInfo []byte `json:"otherInfo"` // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓� - OtherInfoSize int `json:"otherInfoSize"` // 鍏朵粬鐗规湁淇℃伅闀垮害 -} - -type HeartBeatReply struct { - Status int `json:"status"` // 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202 - Desc string `json:"desc"` // 璇锋眰鐘舵�佺殑鎻忚堪,鎴愬姛"success",澶辫触杩斿洖澶辫触鍘熷洜,濡傚績璺虫湇鍔℃湭鍚姩 -} - -type MsgInfo struct { - SrcProc ProcInfo `json:"srcProc"` // 婧愯繘绋嬪熀鏈俊鎭� - MsgType string `json:"msgType"` // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑 - Topic string `json:"topic"` // 璇锋眰鐨勫嚱鏁�,骞朵笉瀵瑰簲浠讳綍鐨剆hmKey,涓氬姟灞傜殑topic - Body []byte `json:"body"` // 璇锋眰鍐呭 + Proc ProcInfo `json:"proc"` // 杩涚▼鐨勪俊鎭� + Channel []string `json:"channel"` // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒� + PubTopic []string `json:"pubTopic"` // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰� + SubTopic []string `json:"subTopic"` // 杩涚▼璁㈤槄鐨勬湇鍔′富棰� } diff --git a/micronode.go b/micronode.go index 89e4088..a826412 100644 --- a/micronode.go +++ b/micronode.go @@ -1,7 +1,7 @@ package bhomeclient import ( - "basic.com/valib/bhomebus.git" + "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "context" "encoding/json" "errors" @@ -20,7 +20,7 @@ serverId string fnLog func(...interface{}) - SubCh chan *MsgInfo + SubCh chan *bhome_msg.MsgPublish mtx sync.Mutex started bool @@ -39,7 +39,7 @@ reg: reg, procInfo: ®.Proc, fnLog: fnLog, - SubCh: make(chan *MsgInfo, 512), + SubCh: make(chan *bhome_msg.MsgPublish, 512), } return mn, nil @@ -65,14 +65,6 @@ } func (ms *MicroNode) startHeartbeat() { - hbi := &HeartBeatInfo{ - HealthLevel: "health", - Fps: 12, - WarnInfo: "warn", - ErrorInfo: "error", - Proc: *ms.procInfo, - } - t := time.NewTicker(1 * time.Second) defer t.Stop() @@ -81,7 +73,9 @@ case <-ms.ctx.Done(): return case <-t.C: - ms.handle.HeartBeat(hbi) + ms.handle.HeartBeat() + default: + time.Sleep(500 * time.Millisecond) } } } @@ -110,52 +104,18 @@ select { case <- ms.ctx.Done(): return + case msgR := <-ms.handle.ChReply: //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� + go ms.serve(ms.handle.ctx, &msgR) + case msgS := <-ms.handle.ChSub: + ms.printLog("Recv Sub Message:", string(msgS.Data)) + ms.SubCh <- &msgS default: - msgS, msgR, keyR := ms.handle.GetMsg() - if msgS != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� - ms.printLog("Recv Sub Message:", string(msgS.Body)) - ms.SubCh <- msgS - } - if msgR != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� - go ms.serve(msgR, keyR) - } - time.Sleep(50 * time.Millisecond) } } - - //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 - //go ms.startRecvSubMsg() - //浣滀负server鍚姩 - //ms.serve() } ms.mtx.Unlock() } - -//寮�濮嬫帴鏀惰闃呮秷鎭� -//func (ms *MicroNode) startRecvSubMsg() { -// for { -// select { -// case <- ms.ctx.Done(): -// return -// default: -// msgS, msgR, keyR := ms.handle.GetMsg() -// if msgS != nil { -// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� -// ms.printLog("Recv Sub Message:", string(msgS.Body)) -// ms.SubCh <- msgS -// } -// if msgR != nil { -// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� -// go ms.serve(msgR, keyR) -// } -// -// time.Sleep(50 * time.Millisecond) -// } -// } -//} func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { t := time.Now() @@ -163,9 +123,9 @@ ms.printLog("1:", time.Since(t)) t = time.Now() rb, _ := json.Marshal(request) - msgR := &MsgInfo { - Topic: request.Path, - Body: rb, + msgR := &bhome_msg.MsgRequestTopic{ + Topic: []byte(request.Path), + Data: rb, } ms.printLog("2:", time.Since(t)) return ms.handle.Request(serverId, msgR, milliSecs) @@ -173,20 +133,20 @@ func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { rb, _ := json.Marshal(request) - msgR := &MsgInfo{ - Topic: request.Path, - Body: rb, + msgR := &bhome_msg.MsgRequestTopic{ + Topic: []byte(request.Path), + Data: rb, } return ms.handle.Request(serverId, msgR, milliSecs) } -func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) { - return ms.handle.RequestOnly(rData, nodes) +func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) { + return ms.handle.RequestOnly(req, dest) } //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級 -func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { +func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress { netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) if err != nil { ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) @@ -196,7 +156,7 @@ } //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛� -func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { +func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress { netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) if err != nil { return nil @@ -205,11 +165,7 @@ } func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) { - r := MsgInfo{ - SrcProc: *ms.procInfo, - MsgType: MesgType_ReqRep, - Topic: TOPIC_QUERYPROC, - } + r := bhome_msg.MsgRequestTopic{} cr, err := ms.handle.RequestCenter(&r) if err != nil { ms.printLog("requestCenter reply:", cr, "err:", err) @@ -234,76 +190,37 @@ return nil, fmt.Errorf("GetRegisteredClient list failed") } -//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { -// ri := &Reply{} -// if ms.handlers == nil { -// ri.Msg = "send wrong addr, check yourself!!!" -// } else { -// var msgR MsgInfo -// err := json.Unmarshal(rdata, &msgR) -// if err != nil { -// ri.Msg = err.Error() -// } else { -// var reqBody Request -// err = json.Unmarshal(rdata, &msgR.Body) -// if err != nil { -// ri.Msg = err.Error() -// } else { -// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) -// if f,ok := ms.handlers[reqBody.Path];ok { -// reqBody.SrcProc = msgR.SrcProc -// ri = f(&reqBody) -// ms.printLog("call funcMap f,reply:", *ri) -// } else { -// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) -// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" -// } -// } -// } -// } -// result, err := json.Marshal(*ri) -// if err != nil { -// sdata = nil -// } else { -// sdata = &result -// } -// return ri.Success -//} - -//func (ms *MicroNode) serve() { -// if ms.handlers == nil { -// return -// } -// for i:=0;i<10;i++ { -// ms.handle.wg.Add(1) -// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) -// } -//} - -func (ms *MicroNode) serve(msgR *MsgInfo, p int) { +func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) { if ms.handlers == nil { return } var reqBody Request var ri *Reply - err := json.Unmarshal(msgR.Body, &reqBody) + err := json.Unmarshal(msgR.Data, &reqBody) if err != nil { ms.printLog("serve unmarshal msgR.Body err:", err) ri = &Reply { Msg: err.Error(), } } else { - ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) + ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap) if f,ok := ms.handlers[reqBody.Path];ok { - reqBody.SrcProc = msgR.SrcProc + reqBody.SrcProc = ProcInfo{ + ID: msgR.ProcId, + } h := WrapperHandler{ ms, ms, } - ri = f(&h, &reqBody) - ms.printLog("call funcMap f,reply.Success:", ri.Success) + select { + case <-ctx.Done(): + ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!") + default: + ri = f(&h, &reqBody) + ms.printLog("call funcMap f,reply.Success:", ri.Success) + } } else { ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) ri = &Reply{ @@ -314,7 +231,7 @@ } } - retErr := ms.handle.Reply(p, ri) + retErr := ms.handle.Reply(msgR.Src, ri) if retErr != nil { ms.printLog("retErr:", retErr) } @@ -322,24 +239,22 @@ //鍙戝竷鍒版湰鏈� func (ms *MicroNode) Publish(topic string,msg []byte) error { - nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: 8, - }) + var nodes []bhome_msg.BHAddress return ms.PublishNet(nodes, topic, msg) } -func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error { - pi := &MsgInfo{ - Topic: topic, - Body: msg, +func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error { + pi := &bhome_msg.MsgPublish{ + Topic: []byte(topic), + Data: data, } return ms.handle.Pub(nodes, pi) } -func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int { - pi := &MsgInfo{ - Topic: topic, - Body: msg, +func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int { + pi := &bhome_msg.MsgPublish{ + Topic: []byte(topic), + Data: data, } return ms.handle.PubTimeout(nodes, pi, timeout) } -- Gitblit v1.8.0