From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期日, 25 四月 2021 11:36:03 +0800 Subject: [PATCH] 使用bhsgo by lichao --- hbusc.go | 626 ++++++++++++++------------------------------------------ 1 files changed, 159 insertions(+), 467 deletions(-) diff --git a/hbusc.go b/hbusc.go index 0735c04..6ccd790 100644 --- a/hbusc.go +++ b/hbusc.go @@ -1,63 +1,45 @@ package bhomeclient import ( - "basic.com/valib/bhomebus.git" + "basic.com/valib/bhshmq.git/api/bhsgo" + "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "context" "encoding/json" "errors" "fmt" "os" - "strconv" "sync" "time" + "unsafe" ) -type sockServer struct { - sock *bhomebus.Socket - info *ProcInfo -} - -type sockClient struct { - sock *bhomebus.Socket - peer int -} - -type TransInfo struct { - info *MsgInfo - port int +type MsgReq struct { + ProcId string + bhome_msg.MsgRequestTopic + Src unsafe.Pointer } type BHBus struct { - ctx context.Context + ctx context.Context - conf *Config + ri *RegisterInfo - nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 - mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� + conf *Config - m map[string]*sockServer + nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 + mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� - wg *sync.WaitGroup + wg *sync.WaitGroup - sockRep *sockServer //鍝嶅簲鍏朵粬杩涚▼request鐨剆ocket锛宻erver - - sockHB *sockClient //缁存寔蹇冭烦鐨剆ocket锛岀嚎绋嬪疄鏃跺彂閫侊紝闇�瑕佸崟鐙鐞� - - sockPub *sockClient //鍙戝竷涓婚鐨剆ocket锛岄渶瑕佸崟鐙瑂ocket澶勭悊 - - sockSub *sockClient //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞� - - sockWorker *sockClient //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client - //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� - - chSub chan TransInfo - chReply chan TransInfo + ChSub chan bhome_msg.MsgPublish + ChReply chan MsgReq } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� -func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) { - var data []byte - var key int +func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) { + var procId string + var msg bhome_msg.MsgRequestTopic + var src unsafe.Pointer for { select { case <-ctx.Done(): @@ -65,20 +47,16 @@ wg.Done() return default: - n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� - if n == 0 { - var info MsgInfo - if err := json.Unmarshal(data, &info);err == nil { - ch <- TransInfo{ - info: &info, - port: key, //杩欎釜key鍦ㄥ彂甯冭闃呮ā寮忎腑鏄痓us鐨刱ey锛屾槸涓浐瀹氬�硷紝涓婂眰鐢ㄤ笉鍒� - } - - data = []byte{} - key = 0 - } else { - logFn("unmarshal to MsgInfo err:", err) + if bhsgo.ReadRequest(&procId, &msg, &src, 100) { + ch <- MsgReq{ + procId, + msg, + src, } + + procId = "" + msg.Reset() + src = unsafe.Pointer(nil) } else { time.Sleep(100 * time.Millisecond) } @@ -86,59 +64,22 @@ } } -//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { -// for { -// select { -// case <-ctx.Done(): -// logFn("recvandsendRoutine ctx.Done") -// wg.Done() -// return -// default: -// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� -// if n != 0 { -// logFn("RecvandsendTimeout success") -// } else { -// //time.Sleep(100 * time.Millisecond) -// } -// } -// } -//} - //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { handle := &BHBus { - ctx: ctx, - conf: config, - m: make(map[string]*sockServer), - chSub: make(chan TransInfo, config.chSize), - chReply: make(chan TransInfo, config.chSize), + ctx: ctx, + conf: config, + ri: ri, + ChSub: make(chan bhome_msg.MsgPublish, config.chSize), + ChReply: make(chan MsgReq, config.chSize), } - var err error - err = bhomebus.Init("libshm_queue.so") - if err != nil { - handle.printLog("Init so err:", err) - return nil, err - } - err = bhomebus.ShmInit(512) - if err != nil { - handle.printLog("shmInit size err:", err) - return nil, err - } - regSock := bhomebus.OpenSocket() - if regSock == nil { - handle.printLog("Open Socket ret Nil") - return nil, errors.New("OpenSocket ret Nil") - } - defer func() { - regSock.Close() - handle.printLog("regSock.CLose") - }() - - var msg []byte - var regAddr []bhomebus.NetNode - var regR *RegisterReply //娉ㄥ唽缁撴灉淇℃伅 //濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐� + procI := bhome_msg.ProcInfo{ + ProcId: []byte(ri.Proc.ID), + Name: []byte(ri.Proc.Name), + } + var regReply bhome_msg.MsgCommonReply loop: for { select { @@ -146,165 +87,91 @@ handle.printLog("register <-q") return nil,errors.New("ctx is done") default: - if msg == nil { - rid, err := json.Marshal(*ri) - if err != nil { - handle.printLog("marshal registerInfo err:", err) - return nil, errors.New("marshal registerInfo err:"+err.Error()) - } - s := MsgInfo{ - SrcProc: ri.Proc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_REGISTER, - Body: rid, - } - handle.printLog("register MsgInfo:", s) - dRegData,err := json.Marshal(s) - if err != nil { - handle.printLog("marshal deregister msg err:", err) - return nil, err - } - handle.printLog(string(dRegData)) - msg = dRegData - } - if regAddr == nil { - regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: handle.conf.regKey, - }) - } - var rMsg []bhomebus.Mesg - n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - handle.printLog("regSock.SendandrecvTimeout n:", n) - if n == 1 && len(rMsg) == 1 { - var cr Reply - if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { - handle.printLog("unmarshal regReply err:", err) - return nil, errors.New("unmarshal regReply err:"+err.Error()) - } else { - if cr.Success { - if rpd,err := json.Marshal(cr.Data);err ==nil { - var rr RegisterReply - if err = json.Unmarshal(rpd, &rr); err == nil { - regR = &rr - break loop - } else { - handle.printLog("unmarshal RegisterReply err:", err) - } - } else { - handle.printLog("marshal cr.Data err:", err) - } - } else { - handle.printLog("cr:", cr) - } - } + if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) { + break loop } else { - time.Sleep(1 * time.Second) + time.Sleep(time.Second) } } } - handle.printLog("register Reply:", *regR) - - for _, v := range ri.Channel { - if k,ok := regR.ChannelKey[v];ok { - s := bhomebus.OpenSocket() - s.ForceBind(int(k)) - handle.m[v] = &sockServer{ - sock: s, - info: &ri.Proc, + if ri.PubTopic != nil && len(ri.PubTopic) > 0 { + topics := bhome_msg.MsgTopicList{} + var regTopicReply bhome_msg.MsgCommonReply + for _,t := range ri.PubTopic { + topics.TopicList = append(topics.TopicList, []byte(t)) + } + loopRT: + for { + select { + case <-q: + handle.printLog("RegisterTopics recv quit signal") + return nil, errors.New("RegisterTopics recv quit signal") + default: + if bhsgo.RegisterTopics(&topics, ®TopicReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.RegisterTopics success!!") + break loopRT + } else { + time.Sleep(time.Second) + } } } + + handle.wg.Add(1) + go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog) } - //缁存寔蹇冭烦鐨剆ocket - sockHB := bhomebus.OpenSocket() - handle.printLog("open sockHB") - handle.sockHB = &sockClient{ - sock: sockHB, - peer: int(regR.HeartbeatKey), - } + handle.printLog("register done!" ) handle.wg = &sync.WaitGroup{} - if ri.PubTopic != nil && len(ri.PubTopic) > 0 { - sockReply := bhomebus.OpenSocket() - sockReply.ForceBind(int(regR.ReplyKey)) - handle.printLog("after pubTopic forceBind") - handle.wg.Add(1) - //serve server reply - go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) - handle.sockRep = &sockServer{ - sock: sockReply, - info: &ri.Proc, - } - } - - - //鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey - sockPub := bhomebus.OpenSocket() - handle.sockPub = &sockClient{ - sock: sockPub, - peer: -1, - } - //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭� if ri.SubTopic != nil && len(ri.SubTopic) > 0 { - //璁㈤槄娑堟伅鐨剆ocket - sockSub := bhomebus.OpenSocket() - //璁㈤槄鎵�鏈変富棰� - handle.printLog("start Sub topics") + handle.printLog("sub topics") + var subList bhome_msg.MsgTopicList for _,v := range ri.SubTopic { - subN := sockSub.Sub(v) - handle.printLog("subTopic:", v, " ret n:", subN) + subList.TopicList = append(subList.TopicList, []byte(v)) } - //鍚姩璁㈤槄淇℃伅鎺ユ敹 - handle.wg.Add(1) - go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) - handle.sockSub = &sockClient{ - sock: sockSub, - peer: -1, + var subReply bhome_msg.MsgCommonReply + if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { + //鍚姩璁㈤槄淇℃伅鎺ユ敹 + handle.wg.Add(1) + go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) } - } - - sockWorker := bhomebus.OpenSocket() - handle.sockWorker = &sockClient{ - sock: sockWorker, - peer: int(regR.QueryTopicKey), } return handle, nil } +func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) { + var procId string + var msg bhome_msg.MsgPublish + for { + select { + case <-ctx.Done(): + logFn("recvRoutine ctx.Done") + wg.Done() + return + default: + if bhsgo.ReadSub(&procId, &msg, 100) { + ch <- msg + + procId = "" + msg.Reset() + } else { + //time.Sleep(100 * time.Millisecond) + } + } + } +} + //DeRegister func (h *BHBus) DeRegister(dri *RegisterInfo) error { - data, err := json.Marshal(*dri) - if err != nil { - return err - } - dRegData,err := json.Marshal(MsgInfo{ - MsgType: "", - Topic: TOPIC_DEREGISTER, - Body: data, - }) - if err != nil { - return err - } - - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.conf.regKey, - }) - var retMsg []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut) - if n == 0 { - return nil - } - h.printLog("DeRegister retMsg:", retMsg) - return fmt.Errorf("DeRegister n:%d", n) + h.printLog("DeRegister") + return nil } func (h *BHBus) printLog(v ...interface{}) { @@ -318,75 +185,27 @@ h.printLog("call BHBus free") h.wg.Wait() h.printLog("h.wg.Wait done") - for _,v := range h.m { - v.sock.Close() - } - if h.sockRep != nil { - h.sockRep.sock.Close() - h.sockRep = nil - } - if h.sockHB != nil { - h.sockHB.sock.Close() - h.sockHB = nil - } - if h.sockPub != nil { - h.sockPub.sock.Close() - h.sockPub = nil - } - if h.sockSub != nil { - h.sockSub.sock.Close() - h.sockSub = nil - } - if h.sockWorker != nil { - h.sockWorker.sock.Close() - h.sockWorker = nil - } - - h.printLog("BHBus Freed") } //HeartBeat send -func (h *BHBus) HeartBeat(info *HeartBeatInfo) error { - data, err := json.Marshal(*info) - if err == nil { - hbd,err := json.Marshal(MsgInfo{ - SrcProc: info.Proc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_HEARTBEAT, - Body: data, - }) - if err != nil { - h.printLog("marshal heartbeat msgInfo err:", err) - return err - } - var rMsg []bhomebus.Mesg - hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.sockHB.peer, - }) - //h.printLog("start send heartbeat") - n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁� - //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) - - if n > 0 { - return nil - } else { - return fmt.Errorf("sockHB Sendandrecv ret n:%d", n) - } +func (h *BHBus) HeartBeat() error { + procI := bhome_msg.ProcInfo{ + ProcId: []byte(h.ri.Proc.ID), + Name: []byte(h.ri.Proc.Name), } - return err + var ret bhome_msg.MsgCommonReply + if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) { + return nil + } else { + return errors.New("send heartBeat return false") + } } -//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error { -// n := s.sock.SendtoTimeout(data, s.peer, timeout) -// if n == 0 { -// return nil -// } -// return errors.New("SendtoTimeout n:"+strconv.Itoa(n)) -//} + //鏇存柊涓婚鍒楄〃 -func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { +func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { h.mtxNode.Lock() defer h.mtxNode.Unlock() h.nodes = arr @@ -395,185 +214,92 @@ //鑾峰彇topic瀵瑰簲鐨刱ey //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key -func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) { - h.mtxNode.Lock() - defer h.mtxNode.Unlock() - var nodes []bhomebus.NetNode - reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: h.sockWorker.peer, - }) - reqD,err := json.Marshal(MsgInfo{ - SrcProc: *srcProc, - MsgType: MesgType_ReqRep, - Topic: TOPIC_QUERYKEY, - Body: []byte(topic), - }) - if err != nil { - return nil, fmt.Errorf("marshal req err:%s", err.Error()) - } - var ret []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) - if n > 0 { - var reply Reply - err = json.Unmarshal(ret[0].Data, &reply) - if err != nil { - h.printLog("unmarshal err:", err) - return nil, err - } +func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) { - if reply.Success { - rd,err := json.Marshal(reply.Data) - if err == nil { - err = json.Unmarshal(rd, &nodes) - if err == nil { - return nodes, nil - } else { - h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data) - return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error()) - } - } else { - return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) - } - - } else { - h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data) - return nil, fmt.Errorf("REPLY msg:%s", reply.Msg) - } - } else { - return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) - } + return nil, nil } -func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) { +func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { //1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode - rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) - if err != nil { - h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err) - return nil, err - } - if rNodes == nil || len(rNodes) == 0 { - return nil, errors.New("rNodes empty, topic: "+ req.Topic) - } //2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲�� - data, err := json.Marshal(*req) - if err != nil { - h.printLog("marshal(*req) err:", err) - return nil, err - } - var ret []bhomebus.Mesg - - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs) - - if n > 0 && len(ret) > 0 { - var resp Reply - if err = json.Unmarshal(ret[0].Data, &resp); err == nil { - return &resp, nil - } else { - h.printLog("unmarshal ret[0].Data err:", err) - return nil, err + pid := "" + mrt := bhome_msg.MsgRequestTopicReply{} + dest := bhome_msg.BHAddress{} + if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) { + var reply Reply + if err := json.Unmarshal(mrt.Data, &reply); err != nil { + return nil,err } + + return &reply, nil } else { - h.printLog("Request n: ", n, " len(ret): ", len(ret)) - return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) + return nil, errors.New("request ") } } -func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) { - var ret []bhomebus.Mesg - - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut) - - if n > 0 && len(ret) > 0 { - return ret[0].Data, nil +func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) { + dest := bhome_msg.BHAddress{} + if destArr != nil && len(destArr) > 0 { + dest = destArr[0] + } + pid := "" + r := bhome_msg.MsgRequestTopicReply{} + if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) { + return r.Data, nil } else { - h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData)) - return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) + h.printLog("bhsgo.Request request err:", r.Errmsg) + return nil, errors.New("bhsgo.Request return false") } } -func (h *BHBus) Reply(replyKey int, i *Reply) error { +func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error { data,err := json.Marshal(*i) if err != nil { return err } - - n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) - h.printLog("reply to key:", replyKey, " n:",n) - if n != 0 { - return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) + rep := bhome_msg.MsgRequestTopicReply{ + Data: data, } - return nil + if bhsgo.SendReply(src, &rep) { + return nil + } + + return errors.New("reply return false") } +func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) { -//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. -//鏆撮湶鍦ㄤ笂灞傜殑锛屽彧鏈塼opic锛屾病鏈塳ey銆� -func (h *BHBus) SendOnly(key int, arg *MsgInfo) error { - data,err := json.Marshal(*arg) - if err != nil { - return err - } - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut) - if n != 0 { - return fmt.Errorf("sendOnly ret n:%d", n) - } - return nil -} - -func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) { - data, err := json.Marshal(*req) - if err != nil { - return nil, err - } - rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: KEY_QUERY, - }) - //h.mtxWorker.Lock() - //defer h.mtxWorker.Unlock() - var ret []bhomebus.Mesg - n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) - h.printLog("requestCenter n:", n, "len(ret):", len(ret)) - if n > 0 && len(ret) > 0{ - var cr Reply - if err = json.Unmarshal(ret[0].Data, &cr); err == nil { - return &cr, nil - } else { - h.printLog("unmarshal to CommonReply err:", err) - } - } - return nil, fmt.Errorf("request center err") + return nil, errors.New("") } //鍚戜富棰橀�氶亾涓彂甯冩秷鎭� -func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { - data,err := json.Marshal(*msg) - if err == nil { - if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 { - return nil - } else { - return fmt.Errorf("pub err n:%d", n) - } +func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error { + if bhsgo.Publish(msg, h.conf.pubTimeOut) { + return nil + } else { + return fmt.Errorf("pub err ") } - - return err } -func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int { - data,err := json.Marshal(*msg) - if err == nil { - return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout) +func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int { + if bhsgo.Publish(msg, timeout) { + return 1 } return -1 } //杩藉姞璁㈤槄鐨勪富棰樻秷鎭� func (h *BHBus) Sub(topics []string) { - if topics != nil { - for _,t := range topics { - h.sockSub.sock.Sub(t) + if topics != nil && len(topics) >0 { + var subList bhome_msg.MsgTopicList + for _, v := range topics { + subList.TopicList = append(subList.TopicList, []byte(v)) + } + + var subReply bhome_msg.MsgCommonReply + if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) { + h.printLog("sub topics") } } } @@ -581,40 +307,6 @@ //娉ㄩ攢璁㈤槄鐨勪富棰� func (h *BHBus) DeSub(topics []string) { if topics != nil { - for _,t := range topics { - if n := h.sockSub.sock.Desub(t); n != 0 { - h.printLog("DeSub topic:", t, " n:", n) - } - } - } -} - -//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -//func (h *BHBus) GetMsg() (subMsg *MsgInfo) { -// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { -// return nil -// } -// if len(h.chSub) >0 { -// m := <-h.chSub -// subMsg = m.info -// } -// return -//} - - -func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { - if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil, nil, -1 } - if len(h.chSub) >0 { - m := <-h.chSub - subMsg = m.info - } - if len(h.chReply) >0 { - m := <-h.chReply - replyMsg = m.info - replyKey = m.port - } - return } \ No newline at end of file -- Gitblit v1.8.0