From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 25 四月 2021 11:36:03 +0800
Subject: [PATCH] 使用bhsgo by lichao

---
 hbusc.go |  626 ++++++++++++++------------------------------------------
 1 files changed, 159 insertions(+), 467 deletions(-)

diff --git a/hbusc.go b/hbusc.go
index 0735c04..6ccd790 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -1,63 +1,45 @@
 package bhomeclient
 
 import (
-	"basic.com/valib/bhomebus.git"
+	"basic.com/valib/bhshmq.git/api/bhsgo"
+	"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"os"
-	"strconv"
 	"sync"
 	"time"
+	"unsafe"
 )
 
-type sockServer struct {
-	sock *bhomebus.Socket
-	info *ProcInfo
-}
-
-type sockClient struct {
-	sock *bhomebus.Socket
-	peer int
-}
-
-type TransInfo struct {
-	info *MsgInfo
-	port int
+type MsgReq struct {
+	ProcId 		string
+	bhome_msg.MsgRequestTopic
+	Src 		unsafe.Pointer
 }
 
 type BHBus struct {
-	ctx context.Context
+	ctx 		context.Context
 
-	conf *Config
+	ri 			*RegisterInfo
 
-	nodes []NodeInfo     //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
-	mtxNode sync.Mutex   //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
+	conf 		*Config
 
-	m map[string]*sockServer
+	nodes 		[]NodeInfo     //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
+	mtxNode 	sync.Mutex   //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
 
-	wg *sync.WaitGroup
+	wg 			*sync.WaitGroup
 
-	sockRep *sockServer  //鍝嶅簲鍏朵粬杩涚▼request鐨剆ocket锛宻erver
-
-	sockHB *sockClient  //缁存寔蹇冭烦鐨剆ocket锛岀嚎绋嬪疄鏃跺彂閫侊紝闇�瑕佸崟鐙鐞�
-
-	sockPub *sockClient  //鍙戝竷涓婚鐨剆ocket锛岄渶瑕佸崟鐙瑂ocket澶勭悊
-
-	sockSub *sockClient  //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞�
-
-	sockWorker 	*sockClient  //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client
-	//mtxWorker 	sync.Mutex	 //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
-
-	chSub chan TransInfo
-	chReply chan TransInfo
+	ChSub   chan bhome_msg.MsgPublish
+	ChReply chan MsgReq
 }
 
 //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
-	var data []byte
-	var key int
+func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
+	var procId string
+	var msg bhome_msg.MsgRequestTopic
+	var src unsafe.Pointer
 	for {
 		select {
 		case <-ctx.Done():
@@ -65,20 +47,16 @@
 			wg.Done()
 			return
 		default:
-			n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-			if n == 0 {
-				var info MsgInfo
-				if err := json.Unmarshal(data, &info);err == nil {
-					ch <- TransInfo{
-						info: &info,
-						port: key,  //杩欎釜key鍦ㄥ彂甯冭闃呮ā寮忎腑鏄痓us鐨刱ey锛屾槸涓浐瀹氬�硷紝涓婂眰鐢ㄤ笉鍒�
-					}
-
-					data = []byte{}
-					key = 0
-				} else {
-					logFn("unmarshal to MsgInfo err:", err)
+			if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
+				ch <- MsgReq{
+					procId,
+					msg,
+					src,
 				}
+
+				procId = ""
+				msg.Reset()
+				src = unsafe.Pointer(nil)
 			} else {
 				time.Sleep(100 * time.Millisecond)
 			}
@@ -86,59 +64,22 @@
 	}
 }
 
-//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
-//	for {
-//		select {
-//		case <-ctx.Done():
-//			logFn("recvandsendRoutine ctx.Done")
-//			wg.Done()
-//			return
-//		default:
-//			n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-//			if n != 0 {
-//				logFn("RecvandsendTimeout success")
-//			} else {
-//				//time.Sleep(100 * time.Millisecond)
-//			}
-//		}
-//	}
-//}
-
 //Register
 func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
 	handle := &BHBus {
-		ctx: ctx,
-		conf: config,
-		m: make(map[string]*sockServer),
-		chSub: make(chan TransInfo, config.chSize),
-		chReply: make(chan TransInfo, config.chSize),
+		ctx:     ctx,
+		conf:    config,
+		ri:      ri,
+		ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
+		ChReply: make(chan MsgReq, config.chSize),
 	}
 
-	var err error
-	err = bhomebus.Init("libshm_queue.so")
-	if err != nil {
-		handle.printLog("Init so err:", err)
-		return nil, err
-	}
-	err = bhomebus.ShmInit(512)
-	if err != nil {
-		handle.printLog("shmInit size err:", err)
-		return nil, err
-	}
-	regSock := bhomebus.OpenSocket()
-	if regSock == nil {
-		handle.printLog("Open Socket ret Nil")
-		return nil, errors.New("OpenSocket ret Nil")
-	}
-	defer func() {
-		regSock.Close()
-		handle.printLog("regSock.CLose")
-	}()
-
-	var msg []byte
-	var regAddr []bhomebus.NetNode
-	var regR *RegisterReply  //娉ㄥ唽缁撴灉淇℃伅
 	//濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐�
+	procI := bhome_msg.ProcInfo{
+		ProcId: []byte(ri.Proc.ID),
+		Name: []byte(ri.Proc.Name),
+	}
+	var regReply bhome_msg.MsgCommonReply
 loop:
 	for {
 		select {
@@ -146,165 +87,91 @@
 			handle.printLog("register <-q")
 			return nil,errors.New("ctx is done")
 		default:
-			if msg == nil {
-				rid, err := json.Marshal(*ri)
-				if err != nil {
-					handle.printLog("marshal registerInfo err:", err)
-					return nil, errors.New("marshal registerInfo err:"+err.Error())
-				}
-				s := MsgInfo{
-					SrcProc: ri.Proc,
-					MsgType: MesgType_ReqRep,
-					Topic:   TOPIC_REGISTER,
-					Body:    rid,
-				}
-				handle.printLog("register MsgInfo:", s)
-				dRegData,err := json.Marshal(s)
-				if err != nil {
-					handle.printLog("marshal deregister msg err:", err)
-					return nil, err
-				}
-				handle.printLog(string(dRegData))
-				msg = dRegData
-			}
-			if regAddr == nil {
-				regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
-					Key: handle.conf.regKey,
-				})
-			}
 
-			var rMsg []bhomebus.Mesg
-			n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
-			handle.printLog("regSock.SendandrecvTimeout n:", n)
-			if n == 1 && len(rMsg) == 1 {
-				var cr Reply
-				if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
-					handle.printLog("unmarshal regReply err:", err)
-					return nil, errors.New("unmarshal regReply err:"+err.Error())
-				} else {
-					if cr.Success {
-						if rpd,err := json.Marshal(cr.Data);err ==nil {
-							var rr RegisterReply
-							if err = json.Unmarshal(rpd, &rr); err == nil {
-								regR = &rr
-								break loop
-							} else {
-								handle.printLog("unmarshal RegisterReply err:", err)
-							}
-						} else {
-							handle.printLog("marshal cr.Data err:", err)
-						}
-					} else {
-						handle.printLog("cr:", cr)
-					}
-				}
+			if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
+				break loop
 			} else {
-				time.Sleep(1 * time.Second)
+				time.Sleep(time.Second)
 			}
 		}
 	}
 
-	handle.printLog("register Reply:", *regR)
-
-	for _, v := range ri.Channel {
-		if k,ok := regR.ChannelKey[v];ok {
-			s := bhomebus.OpenSocket()
-			s.ForceBind(int(k))
-			handle.m[v] = &sockServer{
-				sock: s,
-				info: &ri.Proc,
+	if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
+		topics := bhome_msg.MsgTopicList{}
+		var regTopicReply bhome_msg.MsgCommonReply
+		for _,t := range ri.PubTopic {
+			topics.TopicList = append(topics.TopicList, []byte(t))
+		}
+	loopRT:
+		for {
+			select {
+			case <-q:
+				handle.printLog("RegisterTopics recv quit signal")
+				return nil, errors.New("RegisterTopics recv quit signal")
+			default:
+				if bhsgo.RegisterTopics(&topics, &regTopicReply, handle.conf.sendTimeOut) {
+					handle.printLog("bhsgo.RegisterTopics success!!")
+					break loopRT
+				} else {
+					time.Sleep(time.Second)
+				}
 			}
 		}
+
+		handle.wg.Add(1)
+		go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
 	}
 
-	//缁存寔蹇冭烦鐨剆ocket
-	sockHB := bhomebus.OpenSocket()
-	handle.printLog("open sockHB")
-	handle.sockHB = &sockClient{
-		sock: sockHB,
-		peer: int(regR.HeartbeatKey),
-	}
+	handle.printLog("register done!" )
 
 	handle.wg = &sync.WaitGroup{}
 
-	if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
-		sockReply := bhomebus.OpenSocket()
-		sockReply.ForceBind(int(regR.ReplyKey))
-		handle.printLog("after pubTopic forceBind")
-		handle.wg.Add(1)
-		//serve server reply
-		go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
-		handle.sockRep = &sockServer{
-			sock: sockReply,
-			info: &ri.Proc,
-		}
-	}
-
-
-	//鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey
-	sockPub := bhomebus.OpenSocket()
-	handle.sockPub = &sockClient{
-		sock: sockPub,
-		peer: -1,
-	}
-
 	//鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
 	if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
-		//璁㈤槄娑堟伅鐨剆ocket
-		sockSub := bhomebus.OpenSocket()
-		//璁㈤槄鎵�鏈変富棰�
-		handle.printLog("start Sub topics")
+		handle.printLog("sub topics")
+		var subList bhome_msg.MsgTopicList
 		for _,v := range ri.SubTopic {
-			subN := sockSub.Sub(v)
-			handle.printLog("subTopic:", v, " ret n:", subN)
+			subList.TopicList = append(subList.TopicList, []byte(v))
 		}
 
-		//鍚姩璁㈤槄淇℃伅鎺ユ敹
-		handle.wg.Add(1)
-		go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
-		handle.sockSub = &sockClient{
-			sock: sockSub,
-			peer: -1,
+		var subReply bhome_msg.MsgCommonReply
+		if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
+			//鍚姩璁㈤槄淇℃伅鎺ユ敹
+			handle.wg.Add(1)
+			go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
 		}
-	}
-
-	sockWorker := bhomebus.OpenSocket()
-	handle.sockWorker = &sockClient{
-		sock: sockWorker,
-		peer: int(regR.QueryTopicKey),
 	}
 
 	return handle, nil
 }
 
+func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
+	var procId string
+	var msg bhome_msg.MsgPublish
+	for {
+		select {
+		case <-ctx.Done():
+			logFn("recvRoutine ctx.Done")
+			wg.Done()
+			return
+		default:
+			if bhsgo.ReadSub(&procId, &msg, 100) {
+				ch <- msg
+
+				procId = ""
+				msg.Reset()
+			} else {
+				//time.Sleep(100 * time.Millisecond)
+			}
+		}
+	}
+}
+
 //DeRegister
 func (h *BHBus) DeRegister(dri *RegisterInfo) error {
-	data, err := json.Marshal(*dri)
-	if err != nil {
-		return err
-	}
 
-	dRegData,err := json.Marshal(MsgInfo{
-		MsgType: "",
-		Topic: TOPIC_DEREGISTER,
-		Body: data,
-	})
-	if err != nil {
-		return err
-	}
-
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: h.conf.regKey,
-	})
-	var retMsg []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
-	if n == 0 {
-		return nil
-	}
-	h.printLog("DeRegister retMsg:", retMsg)
-	return fmt.Errorf("DeRegister n:%d", n)
+	h.printLog("DeRegister")
+	return nil
 }
 
 func (h *BHBus) printLog(v ...interface{}) {
@@ -318,75 +185,27 @@
 	h.printLog("call BHBus free")
 	h.wg.Wait()
 	h.printLog("h.wg.Wait done")
-	for _,v := range h.m {
-		v.sock.Close()
-	}
-	if h.sockRep != nil {
-		h.sockRep.sock.Close()
-		h.sockRep = nil
-	}
-	if h.sockHB != nil {
-		h.sockHB.sock.Close()
-		h.sockHB = nil
-	}
-	if h.sockPub != nil {
-		h.sockPub.sock.Close()
-		h.sockPub = nil
-	}
-	if h.sockSub != nil {
-		h.sockSub.sock.Close()
-		h.sockSub = nil
-	}
-	if h.sockWorker != nil {
-		h.sockWorker.sock.Close()
-		h.sockWorker = nil
-	}
-
-	h.printLog("BHBus Freed")
 }
 
 
 //HeartBeat send
-func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
-	data, err := json.Marshal(*info)
-	if err == nil {
-		hbd,err := json.Marshal(MsgInfo{
-			SrcProc: info.Proc,
-			MsgType: MesgType_ReqRep,
-			Topic:   TOPIC_HEARTBEAT,
-			Body:    data,
-		})
-		if err != nil {
-			h.printLog("marshal heartbeat msgInfo err:", err)
-			return err
-		}
-		var rMsg []bhomebus.Mesg
-		hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-			Key: h.sockHB.peer,
-		})
-		//h.printLog("start send heartbeat")
-		n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
-		//h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
-
-		if n > 0 {
-			return nil
-		} else {
-			return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
-		}
+func (h *BHBus) HeartBeat() error {
+	procI := bhome_msg.ProcInfo{
+		ProcId: []byte(h.ri.Proc.ID),
+		Name: []byte(h.ri.Proc.Name),
 	}
-	return err
+	var ret bhome_msg.MsgCommonReply
+	if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
+		return nil
+	} else {
+		return errors.New("send heartBeat return false")
+	}
 }
 
-//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
-//	n := s.sock.SendtoTimeout(data, s.peer, timeout)
-//	if n == 0 {
-//		return nil
-//	}
-//	return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
-//}
+
 
 //鏇存柊涓婚鍒楄〃
-func (h *BHBus)  UpdateNodeTopics(arr []NodeInfo) {
+func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
 	h.mtxNode.Lock()
 	defer h.mtxNode.Unlock()
 	h.nodes = arr
@@ -395,185 +214,92 @@
 //鑾峰彇topic瀵瑰簲鐨刱ey
 //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
 //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
-	h.mtxNode.Lock()
-	defer h.mtxNode.Unlock()
-	var nodes []bhomebus.NetNode
-	reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: h.sockWorker.peer,
-	})
-	reqD,err := json.Marshal(MsgInfo{
-		SrcProc: *srcProc,
-		MsgType: MesgType_ReqRep,
-		Topic:   TOPIC_QUERYKEY,
-		Body:    []byte(topic),
-	})
-	if err != nil {
-		return nil, fmt.Errorf("marshal req err:%s", err.Error())
-	}
-	var ret []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
-	if n > 0 {
-		var reply Reply
-		err = json.Unmarshal(ret[0].Data, &reply)
-		if err != nil {
-			h.printLog("unmarshal err:", err)
-			return nil, err
-		}
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) {
 
-		if reply.Success {
-			rd,err := json.Marshal(reply.Data)
-			if err == nil {
-				err = json.Unmarshal(rd, &nodes)
-				if err == nil {
-					return nodes, nil
-				} else {
-					h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
-					return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
-				}
-			} else {
-				return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
-			}
-
-		} else {
-			h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
-			return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
-		}
-	} else {
-		return nil,	fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
-	}
+	return nil, nil
 }
 
-func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
+func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
 	//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
-	rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
-	if err != nil {
-		h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err)
-		return nil, err
-	}
-	if rNodes == nil || len(rNodes) == 0 {
-		return nil, errors.New("rNodes empty, topic: "+ req.Topic)
-	}
 	//2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲��
-	data, err := json.Marshal(*req)
-	if err != nil {
-		h.printLog("marshal(*req) err:", err)
-		return nil, err
-	}
-	var ret []bhomebus.Mesg
-
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
-
-	if n > 0 && len(ret) > 0 {
-		var resp Reply
-		if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
-			return &resp, nil
-		} else {
-			h.printLog("unmarshal ret[0].Data err:", err)
-			return nil, err
+	pid := ""
+	mrt := bhome_msg.MsgRequestTopicReply{}
+	dest := bhome_msg.BHAddress{}
+	if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
+		var reply Reply
+		if err := json.Unmarshal(mrt.Data, &reply); err != nil {
+			return nil,err
 		}
+
+		return &reply, nil
 	} else {
-		h.printLog("Request n: ", n, " len(ret): ", len(ret))
-		return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+		return nil, errors.New("request ")
 	}
 }
 
-func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
-	var ret []bhomebus.Mesg
-
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
-
-	if n > 0 && len(ret) > 0 {
-		return ret[0].Data, nil
+func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) {
+	dest := bhome_msg.BHAddress{}
+	if destArr != nil && len(destArr) > 0 {
+		dest = destArr[0]
+	}
+	pid := ""
+	r := bhome_msg.MsgRequestTopicReply{}
+	if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
+		return r.Data, nil
 	} else {
-		h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData))
-		return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+		h.printLog("bhsgo.Request request err:", r.Errmsg)
+		return nil, errors.New("bhsgo.Request return false")
 	}
 }
 
-func (h *BHBus) Reply(replyKey int, i *Reply) error {
+func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
 	data,err := json.Marshal(*i)
 	if err != nil {
 		return err
 	}
-
-	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
-	h.printLog("reply to key:", replyKey, " n:",n)
-	if n != 0 {
-		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+	rep := bhome_msg.MsgRequestTopicReply{
+		Data: data,
 	}
-	return nil
+	if bhsgo.SendReply(src, &rep) {
+		return nil
+	}
+
+	return errors.New("reply return false")
 }
 
+func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) {
 
-//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
-//鏆撮湶鍦ㄤ笂灞傜殑锛屽彧鏈塼opic锛屾病鏈塳ey銆�
-func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
-	data,err := json.Marshal(*arg)
-	if err != nil {
-		return err
-	}
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
-	if n != 0 {
-		return fmt.Errorf("sendOnly ret n:%d", n)
-	}
-	return nil
-}
-
-func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
-	data, err := json.Marshal(*req)
-	if err != nil {
-		return nil, err
-	}
-	rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: KEY_QUERY,
-	})
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	var ret []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
-	h.printLog("requestCenter n:", n, "len(ret):", len(ret))
-	if n > 0 && len(ret) > 0{
-		var cr Reply
-		if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
-			return &cr, nil
-		} else {
-			h.printLog("unmarshal to CommonReply err:", err)
-		}
-	}
-	return nil, fmt.Errorf("request center err")
+	return nil, errors.New("")
 }
 
 
 //鍚戜富棰橀�氶亾涓彂甯冩秷鎭�
-func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
-	data,err := json.Marshal(*msg)
-	if err == nil {
-		if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
-			return nil
-		} else {
-			return fmt.Errorf("pub err n:%d", n)
-		}
+func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
+	if bhsgo.Publish(msg, h.conf.pubTimeOut) {
+		return nil
+	} else {
+		return fmt.Errorf("pub err ")
 	}
-
-	return err
 }
 
-func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
-	data,err := json.Marshal(*msg)
-	if err == nil {
-		return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
+func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
+	if bhsgo.Publish(msg, timeout) {
+		return 1
 	}
 	return -1
 }
 
 //杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
 func (h *BHBus) Sub(topics []string) {
-	if topics != nil {
-		for _,t := range topics {
-			h.sockSub.sock.Sub(t)
+	if topics != nil && len(topics) >0 {
+		var subList bhome_msg.MsgTopicList
+		for _, v := range topics {
+			subList.TopicList = append(subList.TopicList, []byte(v))
+		}
+
+		var subReply bhome_msg.MsgCommonReply
+		if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
+			h.printLog("sub topics")
 		}
 	}
 }
@@ -581,40 +307,6 @@
 //娉ㄩ攢璁㈤槄鐨勪富棰�
 func (h *BHBus) DeSub(topics []string) {
 	if topics != nil {
-		for _,t := range topics {
-			if n := h.sockSub.sock.Desub(t); n != 0 {
-				h.printLog("DeSub topic:", t, " n:", n)
-			}
-		}
-	}
-}
 
-
-//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
-//	if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-//		return nil
-//	}
-//	if len(h.chSub) >0 {
-//		m  := <-h.chSub
-//		subMsg = m.info
-//	}
-//	return
-//}
-
-
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
-	if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-		return nil, nil, -1
 	}
-	if len(h.chSub) >0 {
-		m  := <-h.chSub
-		subMsg = m.info
-	}
-	if len(h.chReply) >0 {
-		m := <-h.chReply
-		replyMsg = m.info
-		replyKey = m.port
-	}
-	return
 }
\ No newline at end of file

--
Gitblit v1.8.0