From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 25 四月 2021 11:36:03 +0800
Subject: [PATCH] 使用bhsgo by lichao

---
 /dev/null    |   54 ---
 message.go   |   45 --
 micronode.go |  175 ++--------
 broker.go    |   10 
 hbusc.go     |  626 +++++++++-----------------------------
 5 files changed, 215 insertions(+), 695 deletions(-)

diff --git a/broker.go b/broker.go
index cc78e6e..d1ba2c1 100644
--- a/broker.go
+++ b/broker.go
@@ -1,17 +1,17 @@
 package bhomeclient
 
-import "basic.com/valib/bhomebus.git"
+import "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
 
 type Broker interface {
 	//鍙戝竷鍒版湰鏈�
-	Publish(topic string, msg []byte) error
+	Publish(string, []byte) error
 
 	//鍙戝竷鍒拌繙绋嬫満鍣�
-	PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
+	PublishNet([]bhome_msg.BHAddress, string, []byte) error
 
 	//璁㈤槄涓�浜涗富棰�,鍙姩鎬佹柊澧�
-	Subscribe(topics []string)
+	Subscribe([]string)
 
 	//娉ㄩ攢璁㈤槄鐨勪富棰�
-	DeSub(topics []string)
+	DeSub([]string)
 }
diff --git a/hbusc.go b/hbusc.go
index 0735c04..6ccd790 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -1,63 +1,45 @@
 package bhomeclient
 
 import (
-	"basic.com/valib/bhomebus.git"
+	"basic.com/valib/bhshmq.git/api/bhsgo"
+	"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"os"
-	"strconv"
 	"sync"
 	"time"
+	"unsafe"
 )
 
-type sockServer struct {
-	sock *bhomebus.Socket
-	info *ProcInfo
-}
-
-type sockClient struct {
-	sock *bhomebus.Socket
-	peer int
-}
-
-type TransInfo struct {
-	info *MsgInfo
-	port int
+type MsgReq struct {
+	ProcId 		string
+	bhome_msg.MsgRequestTopic
+	Src 		unsafe.Pointer
 }
 
 type BHBus struct {
-	ctx context.Context
+	ctx 		context.Context
 
-	conf *Config
+	ri 			*RegisterInfo
 
-	nodes []NodeInfo     //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
-	mtxNode sync.Mutex   //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
+	conf 		*Config
 
-	m map[string]*sockServer
+	nodes 		[]NodeInfo     //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
+	mtxNode 	sync.Mutex   //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
 
-	wg *sync.WaitGroup
+	wg 			*sync.WaitGroup
 
-	sockRep *sockServer  //鍝嶅簲鍏朵粬杩涚▼request鐨剆ocket锛宻erver
-
-	sockHB *sockClient  //缁存寔蹇冭烦鐨剆ocket锛岀嚎绋嬪疄鏃跺彂閫侊紝闇�瑕佸崟鐙鐞�
-
-	sockPub *sockClient  //鍙戝竷涓婚鐨剆ocket锛岄渶瑕佸崟鐙瑂ocket澶勭悊
-
-	sockSub *sockClient  //璁㈤槄涓婚鐨剆ocket锛岀嚎绋嬪疄鏃舵帴鏀舵秷鎭紝闇�瑕佸崟鐙鐞�
-
-	sockWorker 	*sockClient  //鍙戠粰浠绘剰鐨剆erver,鐭殏鐨剅equest client
-	//mtxWorker 	sync.Mutex	 //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
-
-	chSub chan TransInfo
-	chReply chan TransInfo
+	ChSub   chan bhome_msg.MsgPublish
+	ChReply chan MsgReq
 }
 
 //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
-	var data []byte
-	var key int
+func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
+	var procId string
+	var msg bhome_msg.MsgRequestTopic
+	var src unsafe.Pointer
 	for {
 		select {
 		case <-ctx.Done():
@@ -65,20 +47,16 @@
 			wg.Done()
 			return
 		default:
-			n := s.RecvfromTimeout(&data, &key, 1000) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-			if n == 0 {
-				var info MsgInfo
-				if err := json.Unmarshal(data, &info);err == nil {
-					ch <- TransInfo{
-						info: &info,
-						port: key,  //杩欎釜key鍦ㄥ彂甯冭闃呮ā寮忎腑鏄痓us鐨刱ey锛屾槸涓浐瀹氬�硷紝涓婂眰鐢ㄤ笉鍒�
-					}
-
-					data = []byte{}
-					key = 0
-				} else {
-					logFn("unmarshal to MsgInfo err:", err)
+			if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
+				ch <- MsgReq{
+					procId,
+					msg,
+					src,
 				}
+
+				procId = ""
+				msg.Reset()
+				src = unsafe.Pointer(nil)
 			} else {
 				time.Sleep(100 * time.Millisecond)
 			}
@@ -86,59 +64,22 @@
 	}
 }
 
-//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
-//	for {
-//		select {
-//		case <-ctx.Done():
-//			logFn("recvandsendRoutine ctx.Done")
-//			wg.Done()
-//			return
-//		default:
-//			n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-//			if n != 0 {
-//				logFn("RecvandsendTimeout success")
-//			} else {
-//				//time.Sleep(100 * time.Millisecond)
-//			}
-//		}
-//	}
-//}
-
 //Register
 func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
 	handle := &BHBus {
-		ctx: ctx,
-		conf: config,
-		m: make(map[string]*sockServer),
-		chSub: make(chan TransInfo, config.chSize),
-		chReply: make(chan TransInfo, config.chSize),
+		ctx:     ctx,
+		conf:    config,
+		ri:      ri,
+		ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
+		ChReply: make(chan MsgReq, config.chSize),
 	}
 
-	var err error
-	err = bhomebus.Init("libshm_queue.so")
-	if err != nil {
-		handle.printLog("Init so err:", err)
-		return nil, err
-	}
-	err = bhomebus.ShmInit(512)
-	if err != nil {
-		handle.printLog("shmInit size err:", err)
-		return nil, err
-	}
-	regSock := bhomebus.OpenSocket()
-	if regSock == nil {
-		handle.printLog("Open Socket ret Nil")
-		return nil, errors.New("OpenSocket ret Nil")
-	}
-	defer func() {
-		regSock.Close()
-		handle.printLog("regSock.CLose")
-	}()
-
-	var msg []byte
-	var regAddr []bhomebus.NetNode
-	var regR *RegisterReply  //娉ㄥ唽缁撴灉淇℃伅
 	//濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐�
+	procI := bhome_msg.ProcInfo{
+		ProcId: []byte(ri.Proc.ID),
+		Name: []byte(ri.Proc.Name),
+	}
+	var regReply bhome_msg.MsgCommonReply
 loop:
 	for {
 		select {
@@ -146,165 +87,91 @@
 			handle.printLog("register <-q")
 			return nil,errors.New("ctx is done")
 		default:
-			if msg == nil {
-				rid, err := json.Marshal(*ri)
-				if err != nil {
-					handle.printLog("marshal registerInfo err:", err)
-					return nil, errors.New("marshal registerInfo err:"+err.Error())
-				}
-				s := MsgInfo{
-					SrcProc: ri.Proc,
-					MsgType: MesgType_ReqRep,
-					Topic:   TOPIC_REGISTER,
-					Body:    rid,
-				}
-				handle.printLog("register MsgInfo:", s)
-				dRegData,err := json.Marshal(s)
-				if err != nil {
-					handle.printLog("marshal deregister msg err:", err)
-					return nil, err
-				}
-				handle.printLog(string(dRegData))
-				msg = dRegData
-			}
-			if regAddr == nil {
-				regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
-					Key: handle.conf.regKey,
-				})
-			}
 
-			var rMsg []bhomebus.Mesg
-			n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
-			handle.printLog("regSock.SendandrecvTimeout n:", n)
-			if n == 1 && len(rMsg) == 1 {
-				var cr Reply
-				if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
-					handle.printLog("unmarshal regReply err:", err)
-					return nil, errors.New("unmarshal regReply err:"+err.Error())
-				} else {
-					if cr.Success {
-						if rpd,err := json.Marshal(cr.Data);err ==nil {
-							var rr RegisterReply
-							if err = json.Unmarshal(rpd, &rr); err == nil {
-								regR = &rr
-								break loop
-							} else {
-								handle.printLog("unmarshal RegisterReply err:", err)
-							}
-						} else {
-							handle.printLog("marshal cr.Data err:", err)
-						}
-					} else {
-						handle.printLog("cr:", cr)
-					}
-				}
+			if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
+				break loop
 			} else {
-				time.Sleep(1 * time.Second)
+				time.Sleep(time.Second)
 			}
 		}
 	}
 
-	handle.printLog("register Reply:", *regR)
-
-	for _, v := range ri.Channel {
-		if k,ok := regR.ChannelKey[v];ok {
-			s := bhomebus.OpenSocket()
-			s.ForceBind(int(k))
-			handle.m[v] = &sockServer{
-				sock: s,
-				info: &ri.Proc,
+	if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
+		topics := bhome_msg.MsgTopicList{}
+		var regTopicReply bhome_msg.MsgCommonReply
+		for _,t := range ri.PubTopic {
+			topics.TopicList = append(topics.TopicList, []byte(t))
+		}
+	loopRT:
+		for {
+			select {
+			case <-q:
+				handle.printLog("RegisterTopics recv quit signal")
+				return nil, errors.New("RegisterTopics recv quit signal")
+			default:
+				if bhsgo.RegisterTopics(&topics, &regTopicReply, handle.conf.sendTimeOut) {
+					handle.printLog("bhsgo.RegisterTopics success!!")
+					break loopRT
+				} else {
+					time.Sleep(time.Second)
+				}
 			}
 		}
+
+		handle.wg.Add(1)
+		go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
 	}
 
-	//缁存寔蹇冭烦鐨剆ocket
-	sockHB := bhomebus.OpenSocket()
-	handle.printLog("open sockHB")
-	handle.sockHB = &sockClient{
-		sock: sockHB,
-		peer: int(regR.HeartbeatKey),
-	}
+	handle.printLog("register done!" )
 
 	handle.wg = &sync.WaitGroup{}
 
-	if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
-		sockReply := bhomebus.OpenSocket()
-		sockReply.ForceBind(int(regR.ReplyKey))
-		handle.printLog("after pubTopic forceBind")
-		handle.wg.Add(1)
-		//serve server reply
-		go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
-		handle.sockRep = &sockServer{
-			sock: sockReply,
-			info: &ri.Proc,
-		}
-	}
-
-
-	//鍙戝竷娑堟伅鐨剆ocket, pub鏄皢娑堟伅鍙戝竷鍒癰us涓紝鎵�浠ヤ笉闇�瑕佹寚瀹歬ey
-	sockPub := bhomebus.OpenSocket()
-	handle.sockPub = &sockClient{
-		sock: sockPub,
-		peer: -1,
-	}
-
 	//鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
 	if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
-		//璁㈤槄娑堟伅鐨剆ocket
-		sockSub := bhomebus.OpenSocket()
-		//璁㈤槄鎵�鏈変富棰�
-		handle.printLog("start Sub topics")
+		handle.printLog("sub topics")
+		var subList bhome_msg.MsgTopicList
 		for _,v := range ri.SubTopic {
-			subN := sockSub.Sub(v)
-			handle.printLog("subTopic:", v, " ret n:", subN)
+			subList.TopicList = append(subList.TopicList, []byte(v))
 		}
 
-		//鍚姩璁㈤槄淇℃伅鎺ユ敹
-		handle.wg.Add(1)
-		go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
-		handle.sockSub = &sockClient{
-			sock: sockSub,
-			peer: -1,
+		var subReply bhome_msg.MsgCommonReply
+		if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
+			//鍚姩璁㈤槄淇℃伅鎺ユ敹
+			handle.wg.Add(1)
+			go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
 		}
-	}
-
-	sockWorker := bhomebus.OpenSocket()
-	handle.sockWorker = &sockClient{
-		sock: sockWorker,
-		peer: int(regR.QueryTopicKey),
 	}
 
 	return handle, nil
 }
 
+func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
+	var procId string
+	var msg bhome_msg.MsgPublish
+	for {
+		select {
+		case <-ctx.Done():
+			logFn("recvRoutine ctx.Done")
+			wg.Done()
+			return
+		default:
+			if bhsgo.ReadSub(&procId, &msg, 100) {
+				ch <- msg
+
+				procId = ""
+				msg.Reset()
+			} else {
+				//time.Sleep(100 * time.Millisecond)
+			}
+		}
+	}
+}
+
 //DeRegister
 func (h *BHBus) DeRegister(dri *RegisterInfo) error {
-	data, err := json.Marshal(*dri)
-	if err != nil {
-		return err
-	}
 
-	dRegData,err := json.Marshal(MsgInfo{
-		MsgType: "",
-		Topic: TOPIC_DEREGISTER,
-		Body: data,
-	})
-	if err != nil {
-		return err
-	}
-
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: h.conf.regKey,
-	})
-	var retMsg []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
-	if n == 0 {
-		return nil
-	}
-	h.printLog("DeRegister retMsg:", retMsg)
-	return fmt.Errorf("DeRegister n:%d", n)
+	h.printLog("DeRegister")
+	return nil
 }
 
 func (h *BHBus) printLog(v ...interface{}) {
@@ -318,75 +185,27 @@
 	h.printLog("call BHBus free")
 	h.wg.Wait()
 	h.printLog("h.wg.Wait done")
-	for _,v := range h.m {
-		v.sock.Close()
-	}
-	if h.sockRep != nil {
-		h.sockRep.sock.Close()
-		h.sockRep = nil
-	}
-	if h.sockHB != nil {
-		h.sockHB.sock.Close()
-		h.sockHB = nil
-	}
-	if h.sockPub != nil {
-		h.sockPub.sock.Close()
-		h.sockPub = nil
-	}
-	if h.sockSub != nil {
-		h.sockSub.sock.Close()
-		h.sockSub = nil
-	}
-	if h.sockWorker != nil {
-		h.sockWorker.sock.Close()
-		h.sockWorker = nil
-	}
-
-	h.printLog("BHBus Freed")
 }
 
 
 //HeartBeat send
-func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
-	data, err := json.Marshal(*info)
-	if err == nil {
-		hbd,err := json.Marshal(MsgInfo{
-			SrcProc: info.Proc,
-			MsgType: MesgType_ReqRep,
-			Topic:   TOPIC_HEARTBEAT,
-			Body:    data,
-		})
-		if err != nil {
-			h.printLog("marshal heartbeat msgInfo err:", err)
-			return err
-		}
-		var rMsg []bhomebus.Mesg
-		hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-			Key: h.sockHB.peer,
-		})
-		//h.printLog("start send heartbeat")
-		n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n浠h〃鎴愬姛鍙戦�佺殑鑺傜偣鐨勪釜鏁�
-		//h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
-
-		if n > 0 {
-			return nil
-		} else {
-			return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
-		}
+func (h *BHBus) HeartBeat() error {
+	procI := bhome_msg.ProcInfo{
+		ProcId: []byte(h.ri.Proc.ID),
+		Name: []byte(h.ri.Proc.Name),
 	}
-	return err
+	var ret bhome_msg.MsgCommonReply
+	if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
+		return nil
+	} else {
+		return errors.New("send heartBeat return false")
+	}
 }
 
-//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
-//	n := s.sock.SendtoTimeout(data, s.peer, timeout)
-//	if n == 0 {
-//		return nil
-//	}
-//	return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
-//}
+
 
 //鏇存柊涓婚鍒楄〃
-func (h *BHBus)  UpdateNodeTopics(arr []NodeInfo) {
+func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
 	h.mtxNode.Lock()
 	defer h.mtxNode.Unlock()
 	h.nodes = arr
@@ -395,185 +214,92 @@
 //鑾峰彇topic瀵瑰簲鐨刱ey
 //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
 //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
-	h.mtxNode.Lock()
-	defer h.mtxNode.Unlock()
-	var nodes []bhomebus.NetNode
-	reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: h.sockWorker.peer,
-	})
-	reqD,err := json.Marshal(MsgInfo{
-		SrcProc: *srcProc,
-		MsgType: MesgType_ReqRep,
-		Topic:   TOPIC_QUERYKEY,
-		Body:    []byte(topic),
-	})
-	if err != nil {
-		return nil, fmt.Errorf("marshal req err:%s", err.Error())
-	}
-	var ret []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
-	if n > 0 {
-		var reply Reply
-		err = json.Unmarshal(ret[0].Data, &reply)
-		if err != nil {
-			h.printLog("unmarshal err:", err)
-			return nil, err
-		}
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) {
 
-		if reply.Success {
-			rd,err := json.Marshal(reply.Data)
-			if err == nil {
-				err = json.Unmarshal(rd, &nodes)
-				if err == nil {
-					return nodes, nil
-				} else {
-					h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
-					return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
-				}
-			} else {
-				return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
-			}
-
-		} else {
-			h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
-			return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
-		}
-	} else {
-		return nil,	fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
-	}
+	return nil, nil
 }
 
-func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
+func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
 	//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
-	rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
-	if err != nil {
-		h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err)
-		return nil, err
-	}
-	if rNodes == nil || len(rNodes) == 0 {
-		return nil, errors.New("rNodes empty, topic: "+ req.Topic)
-	}
 	//2.灏嗚姹傝繑閫佸埌瀵瑰簲鐨剆erver,骞剁瓑寰呰繑鍥炲��
-	data, err := json.Marshal(*req)
-	if err != nil {
-		h.printLog("marshal(*req) err:", err)
-		return nil, err
-	}
-	var ret []bhomebus.Mesg
-
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
-
-	if n > 0 && len(ret) > 0 {
-		var resp Reply
-		if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
-			return &resp, nil
-		} else {
-			h.printLog("unmarshal ret[0].Data err:", err)
-			return nil, err
+	pid := ""
+	mrt := bhome_msg.MsgRequestTopicReply{}
+	dest := bhome_msg.BHAddress{}
+	if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
+		var reply Reply
+		if err := json.Unmarshal(mrt.Data, &reply); err != nil {
+			return nil,err
 		}
+
+		return &reply, nil
 	} else {
-		h.printLog("Request n: ", n, " len(ret): ", len(ret))
-		return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+		return nil, errors.New("request ")
 	}
 }
 
-func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
-	var ret []bhomebus.Mesg
-
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
-
-	if n > 0 && len(ret) > 0 {
-		return ret[0].Data, nil
+func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) {
+	dest := bhome_msg.BHAddress{}
+	if destArr != nil && len(destArr) > 0 {
+		dest = destArr[0]
+	}
+	pid := ""
+	r := bhome_msg.MsgRequestTopicReply{}
+	if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
+		return r.Data, nil
 	} else {
-		h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData))
-		return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
+		h.printLog("bhsgo.Request request err:", r.Errmsg)
+		return nil, errors.New("bhsgo.Request return false")
 	}
 }
 
-func (h *BHBus) Reply(replyKey int, i *Reply) error {
+func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
 	data,err := json.Marshal(*i)
 	if err != nil {
 		return err
 	}
-
-	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
-	h.printLog("reply to key:", replyKey, " n:",n)
-	if n != 0 {
-		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+	rep := bhome_msg.MsgRequestTopicReply{
+		Data: data,
 	}
-	return nil
+	if bhsgo.SendReply(src, &rep) {
+		return nil
+	}
+
+	return errors.New("reply return false")
 }
 
+func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) {
 
-//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
-//鏆撮湶鍦ㄤ笂灞傜殑锛屽彧鏈塼opic锛屾病鏈塳ey銆�
-func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
-	data,err := json.Marshal(*arg)
-	if err != nil {
-		return err
-	}
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
-	if n != 0 {
-		return fmt.Errorf("sendOnly ret n:%d", n)
-	}
-	return nil
-}
-
-func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
-	data, err := json.Marshal(*req)
-	if err != nil {
-		return nil, err
-	}
-	rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: KEY_QUERY,
-	})
-	//h.mtxWorker.Lock()
-	//defer h.mtxWorker.Unlock()
-	var ret []bhomebus.Mesg
-	n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
-	h.printLog("requestCenter n:", n, "len(ret):", len(ret))
-	if n > 0 && len(ret) > 0{
-		var cr Reply
-		if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
-			return &cr, nil
-		} else {
-			h.printLog("unmarshal to CommonReply err:", err)
-		}
-	}
-	return nil, fmt.Errorf("request center err")
+	return nil, errors.New("")
 }
 
 
 //鍚戜富棰橀�氶亾涓彂甯冩秷鎭�
-func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
-	data,err := json.Marshal(*msg)
-	if err == nil {
-		if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
-			return nil
-		} else {
-			return fmt.Errorf("pub err n:%d", n)
-		}
+func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
+	if bhsgo.Publish(msg, h.conf.pubTimeOut) {
+		return nil
+	} else {
+		return fmt.Errorf("pub err ")
 	}
-
-	return err
 }
 
-func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
-	data,err := json.Marshal(*msg)
-	if err == nil {
-		return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
+func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
+	if bhsgo.Publish(msg, timeout) {
+		return 1
 	}
 	return -1
 }
 
 //杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
 func (h *BHBus) Sub(topics []string) {
-	if topics != nil {
-		for _,t := range topics {
-			h.sockSub.sock.Sub(t)
+	if topics != nil && len(topics) >0 {
+		var subList bhome_msg.MsgTopicList
+		for _, v := range topics {
+			subList.TopicList = append(subList.TopicList, []byte(v))
+		}
+
+		var subReply bhome_msg.MsgCommonReply
+		if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
+			h.printLog("sub topics")
 		}
 	}
 }
@@ -581,40 +307,6 @@
 //娉ㄩ攢璁㈤槄鐨勪富棰�
 func (h *BHBus) DeSub(topics []string) {
 	if topics != nil {
-		for _,t := range topics {
-			if n := h.sockSub.sock.Desub(t); n != 0 {
-				h.printLog("DeSub topic:", t, " n:", n)
-			}
-		}
-	}
-}
 
-
-//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
-//	if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-//		return nil
-//	}
-//	if len(h.chSub) >0 {
-//		m  := <-h.chSub
-//		subMsg = m.info
-//	}
-//	return
-//}
-
-
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
-	if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-		return nil, nil, -1
 	}
-	if len(h.chSub) >0 {
-		m  := <-h.chSub
-		subMsg = m.info
-	}
-	if len(h.chReply) >0 {
-		m := <-h.chReply
-		replyMsg = m.info
-		replyKey = m.port
-	}
-	return
 }
\ No newline at end of file
diff --git a/hbusc.proto b/hbusc.proto
deleted file mode 100644
index 0feb99a..0000000
--- a/hbusc.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-syntax = "proto3";
-
-package hbusc;
-
-message ProcInfo {
-    string procID = 1;
-    string name = 2;
-    string label = 3;
-}
-
-message RegisterInfo {
-    ProcInfo procInfo = 1;
-    repeated string channel = 2;
-    repeated string pubTopic = 3;
-    repeated string subTopic = 4;
-}
-
-message RegisterInfoReply {
-    ProcInfo procInfo = 1; //杩涚▼淇℃伅
-    map<string, int32> channelKey = 2;  //棰勭暀
-    int32 heartbeatKey = 3;  //蹇冭烦淇℃伅鍙戦�佸埌鐨勭洰鐨刱ey
-    int32 updateTopicKey = 4; //鏇存柊鎰熷叴瓒d富棰樼殑key锛屽湪鍏跺畠杩涚▼涓缁戝畾
-    int32 replyKey = 5; //鐢宠鍒扮殑鏈繘绋嬩綔涓簊erver鐢╧ey
-    int32 queryTopicKey = 6;
-}
-
-message TopicInfo {
-    string topic = 1;
-    string topicType = 2;
-}
-
-message TopicInfoReply {
-    TopicInfo info = 1;
-    int32 key = 2;
-}
-
-message HeartBeatInfo {
-    string healthLevel = 1;     // 鍋ュ悍绛夌骇
-    int32 fps = 2;              // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼)
-    string warnInfo = 3;        // 鎶ヨ淇℃伅
-    string errorInfo = 4;       // 閿欒淇℃伅
-    bytes otherInfo = 5;        // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓�
-    int32 otherInfoSize = 6;    // 鍏朵粬鐗规湁淇℃伅闀垮害
-    ProcInfo procInfo = 7;      //杩涚▼淇℃伅
-}
-
-message MsgInfo {
-    ProcInfo srcProc = 1;       // 婧愯繘绋嬪熀鏈俊鎭�
-    string msgType = 2;         // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑
-    string topic= 3;            // 鏈嶅姟涓婚
-    int32 shmKey = 4;           // 璇锋眰搴旂瓟鏁版嵁浣跨敤鐨刱ey锛屽叾浠栨暟鎹笉鐢紝寰呯‘璁�
-    bytes body = 5;             // 鏁版嵁鍐呭锛屼簩杩涘埗缂栫爜鍚庣殑锛岄渶瑕佺‘瀹氱紪鐮佺被鍨嬪苟瑙g爜
-    int32 bodyLen = 6;          // 鏁版嵁闀垮害
-}
\ No newline at end of file
diff --git a/message.go b/message.go
index 9ba85b9..9ce78da 100644
--- a/message.go
+++ b/message.go
@@ -35,8 +35,8 @@
 )
 
 type NodeList struct {
-	Ip string `json:"ip"`
-	Port int `json:"port"`
+	Ip 			string 		`json:"ip"`
+	Port 		int 		`json:"port"`
 }
 
 const (
@@ -60,43 +60,10 @@
 }
 
 type RegisterInfo struct {
-	Proc       ProcInfo `json:"proc"`       // 杩涚▼鐨勪俊鎭�
-	Channel    []string `json:"channel"`    // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒�
-	PubTopic   []string `json:"pubTopic"`   // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰�
-	SubTopic   []string `json:"subTopic"`   // 杩涚▼璁㈤槄鐨勬湇鍔′富棰�
-}
-
-
-type RegisterReply struct {
-	TCPProxyIP     	string         		`json:"tcpProxyIP"`     // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣↖P
-	TCPProxyPort   	int            		`json:"tcpProxyPort"`   // BHomeCenter鍚姩鐨則cp浠g悊鏈嶅姟鍣ㄧ鍙�
-	HeartbeatKey   	int            		`json:"heartbeatKey"`   // client鍙戦�佸績璺崇殑key
-	ReplyKey       	int            		`json:"replyKey"`       // client鐨勫簲绛旀湇鍔ey
-	ChannelKey     	map[string]int 		`json:"channelKey"`     // client鐨刢han瀵瑰簲鐨刱ey
-	QueryTopicKey  	int            		`json:"queryTopicKey"`  // client鏌ヨtopic瀵瑰簲鐨刱ey鏃剁敤鍒扮殑key
-	Status         	int            		`json:"status"`         // 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202
-}
-
-type HeartBeatInfo struct {
-	Proc          	ProcInfo 			`json:"proc"`          // 杩涚▼鐨勪俊鎭�
-	HealthLevel   	string   			`json:"healthLevel"`   // 鍋ュ悍绛夌骇
-	Fps           	int      			`json:"fps"`           // 澶勭悊甯х巼(dec瑙g爜甯х巼銆乻dk澶勭悊甯х巼)
-	WarnInfo      	string   			`json:"warnInfo"`      // 鎶ヨ淇℃伅
-	ErrorInfo     	string   			`json:"errorInfo"`     // 閿欒淇℃伅
-	OtherInfo     	[]byte   			`json:"otherInfo"`     // 鍏朵粬鐗规湁淇℃伅锛屽鏈夐渶瑕佸氨鐢ㄨ繖涓�
-	OtherInfoSize 	int      			`json:"otherInfoSize"` // 鍏朵粬鐗规湁淇℃伅闀垮害
-}
-
-type HeartBeatReply struct {
-	Status 			int    				`json:"status"` 		// 璇锋眰鐘舵��,鐩墠鍙湁涓や釜,鎴愬姛杩斿洖200,澶辫触202
-	Desc   			string 				`json:"desc"`   		// 璇锋眰鐘舵�佺殑鎻忚堪,鎴愬姛"success",澶辫触杩斿洖澶辫触鍘熷洜,濡傚績璺虫湇鍔℃湭鍚姩
-}
-
-type MsgInfo struct {
-	SrcProc  		ProcInfo   			`json:"srcProc"`    	// 婧愯繘绋嬪熀鏈俊鎭�
-	MsgType   		string 				`json:"msgType"`        // 鏁版嵁绫诲瀷锛屽彲涓鸿姹傘�佸彂甯冦�佽闃呫�佸簲绛旂瓑
-	Topic 			string				`json:"topic"`			// 璇锋眰鐨勫嚱鏁�,骞朵笉瀵瑰簲浠讳綍鐨剆hmKey,涓氬姟灞傜殑topic
-	Body 			[]byte				`json:"body"`			// 璇锋眰鍐呭
+	Proc       		ProcInfo `json:"proc"`       // 杩涚▼鐨勪俊鎭�
+	Channel    		[]string `json:"channel"`    // 鏂板棰戦亾锛屽搴斾竴涓柊鐨勫叡浜唴瀛橀槦鍒�
+	PubTopic   		[]string `json:"pubTopic"`   // 杩涚▼瀵瑰鍙戝竷鐨勬湇鍔′富棰�
+	SubTopic   		[]string `json:"subTopic"`   // 杩涚▼璁㈤槄鐨勬湇鍔′富棰�
 }
 
 
diff --git a/micronode.go b/micronode.go
index 89e4088..a826412 100644
--- a/micronode.go
+++ b/micronode.go
@@ -1,7 +1,7 @@
 package bhomeclient
 
 import (
-	"basic.com/valib/bhomebus.git"
+	"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
 	"context"
 	"encoding/json"
 	"errors"
@@ -20,7 +20,7 @@
 	serverId 	string
 	fnLog 		func(...interface{})
 
-	SubCh 		chan *MsgInfo
+	SubCh 		chan *bhome_msg.MsgPublish
 
 	mtx 		sync.Mutex
 	started 	bool
@@ -39,7 +39,7 @@
 		reg:      reg,
 		procInfo: &reg.Proc,
 		fnLog:    fnLog,
-		SubCh:    make(chan *MsgInfo, 512),
+		SubCh:    make(chan *bhome_msg.MsgPublish, 512),
 	}
 
 	return mn, nil
@@ -65,14 +65,6 @@
 }
 
 func (ms *MicroNode) startHeartbeat() {
-	hbi := &HeartBeatInfo{
-		HealthLevel: "health",
-		Fps:         12,
-		WarnInfo:    "warn",
-		ErrorInfo:   "error",
-		Proc:    *ms.procInfo,
-	}
-
 	t := time.NewTicker(1 * time.Second)
 	defer t.Stop()
 
@@ -81,7 +73,9 @@
 		case <-ms.ctx.Done():
 			return
 		case <-t.C:
-			ms.handle.HeartBeat(hbi)
+			ms.handle.HeartBeat()
+		default:
+			time.Sleep(500 * time.Millisecond)
 		}
 	}
 }
@@ -110,52 +104,18 @@
 			select {
 			case <- ms.ctx.Done():
 				return
+			case msgR := <-ms.handle.ChReply: //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+				go ms.serve(ms.handle.ctx, &msgR)
+			case msgS := <-ms.handle.ChSub:
+				ms.printLog("Recv Sub Message:", string(msgS.Data))
+				ms.SubCh <- &msgS
 			default:
-				msgS, msgR, keyR := ms.handle.GetMsg()
-				if msgS != nil {
-					//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-					ms.printLog("Recv Sub Message:", string(msgS.Body))
-					ms.SubCh <- msgS
-				}
-				if msgR != nil {
-					//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-					go ms.serve(msgR, keyR)
-				}
-
 				time.Sleep(50 * time.Millisecond)
 			}
 		}
-
-		//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
-		//go ms.startRecvSubMsg()
-		//浣滀负server鍚姩
-		//ms.serve()
 	}
 	ms.mtx.Unlock()
 }
-
-//寮�濮嬫帴鏀惰闃呮秷鎭�
-//func (ms *MicroNode) startRecvSubMsg() {
-//	for {
-//		select {
-//		case <- ms.ctx.Done():
-//			return
-//		default:
-//			msgS, msgR, keyR := ms.handle.GetMsg()
-//			if msgS != nil {
-//				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-//				ms.printLog("Recv Sub Message:", string(msgS.Body))
-//				ms.SubCh <- msgS
-//			}
-//			if msgR != nil {
-//				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-//				go ms.serve(msgR, keyR)
-//			}
-//
-//			time.Sleep(50 * time.Millisecond)
-//		}
-//	}
-//}
 
 func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
@@ -163,9 +123,9 @@
 	ms.printLog("1:", time.Since(t))
 	t = time.Now()
 	rb, _ := json.Marshal(request)
-	msgR := &MsgInfo {
-		Topic: request.Path,
-		Body: rb,
+	msgR := &bhome_msg.MsgRequestTopic{
+		Topic: []byte(request.Path),
+		Data: rb,
 	}
 	ms.printLog("2:", time.Since(t))
 	return ms.handle.Request(serverId, msgR, milliSecs)
@@ -173,20 +133,20 @@
 
 func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
 	rb, _ := json.Marshal(request)
-	msgR := &MsgInfo{
-		Topic: request.Path,
-		Body: rb,
+	msgR := &bhome_msg.MsgRequestTopic{
+		Topic: []byte(request.Path),
+		Data: rb,
 	}
 
 	return ms.handle.Request(serverId, msgR, milliSecs)
 }
 
-func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
-	return ms.handle.RequestOnly(rData, nodes)
+func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) {
+	return ms.handle.RequestOnly(req, dest)
 }
 
 //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key  锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress {
 	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
 	if err != nil {
 		ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
@@ -196,7 +156,7 @@
 }
 
 //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛�   锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
 	netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
 	if err != nil {
 		return nil
@@ -205,11 +165,7 @@
 }
 
 func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
-	r := MsgInfo{
-		SrcProc: *ms.procInfo,
-		MsgType: MesgType_ReqRep,
-		Topic: TOPIC_QUERYPROC,
-	}
+	r := bhome_msg.MsgRequestTopic{}
 	cr, err := ms.handle.RequestCenter(&r)
 	if err != nil {
 		ms.printLog("requestCenter reply:", cr, "err:", err)
@@ -234,76 +190,37 @@
 	return nil, fmt.Errorf("GetRegisteredClient list failed")
 }
 
-//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
-//	ri := &Reply{}
-//	if ms.handlers == nil {
-//		ri.Msg = "send wrong addr, check yourself!!!"
-//	} else {
-//		var msgR MsgInfo
-//		err := json.Unmarshal(rdata, &msgR)
-//		if err != nil {
-//			ri.Msg = err.Error()
-//		} else {
-//			var reqBody Request
-//			err = json.Unmarshal(rdata, &msgR.Body)
-//			if err != nil {
-//				ri.Msg = err.Error()
-//			} else {
-//				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
-//				if f,ok := ms.handlers[reqBody.Path];ok {
-//					reqBody.SrcProc = msgR.SrcProc
-//					ri = f(&reqBody)
-//					ms.printLog("call funcMap f,reply:", *ri)
-//				} else {
-//					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
-//					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
-//				}
-//			}
-//		}
-//	}
-//	result, err := json.Marshal(*ri)
-//	if err != nil {
-//		sdata = nil
-//	} else {
-//		sdata = &result
-//	}
-//	return ri.Success
-//}
-
-//func (ms *MicroNode) serve() {
-//	if ms.handlers == nil {
-//		return
-//	}
-//	for i:=0;i<10;i++ {
-//		ms.handle.wg.Add(1)
-//		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
-//	}
-//}
-
-func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
 	if ms.handlers == nil {
 		return
 	}
 
 	var reqBody Request
 	var ri *Reply
-	err := json.Unmarshal(msgR.Body, &reqBody)
+	err := json.Unmarshal(msgR.Data, &reqBody)
 	if err != nil {
 		ms.printLog("serve unmarshal msgR.Body err:", err)
 		ri = &Reply {
 			Msg: err.Error(),
 		}
 	} else {
-		ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+		ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
 
 		if f,ok := ms.handlers[reqBody.Path];ok {
-			reqBody.SrcProc = msgR.SrcProc
+			reqBody.SrcProc = ProcInfo{
+				ID: msgR.ProcId,
+			}
 			h := WrapperHandler{
 				ms,
 				ms,
 			}
-			ri = f(&h, &reqBody)
-			ms.printLog("call funcMap f,reply.Success:", ri.Success)
+			select {
+			case <-ctx.Done():
+				ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
+			default:
+				ri = f(&h, &reqBody)
+				ms.printLog("call funcMap f,reply.Success:", ri.Success)
+			}
 		} else {
 			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
 			ri = &Reply{
@@ -314,7 +231,7 @@
 		}
 	}
 
-	retErr := ms.handle.Reply(p, ri)
+	retErr := ms.handle.Reply(msgR.Src, ri)
 	if retErr != nil {
 		ms.printLog("retErr:", retErr)
 	}
@@ -322,24 +239,22 @@
 
 //鍙戝竷鍒版湰鏈�
 func (ms *MicroNode) Publish(topic string,msg []byte) error {
-	nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: 8,
-	})
+	var nodes []bhome_msg.BHAddress
 	return ms.PublishNet(nodes, topic, msg)
 }
 
-func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
-	pi := &MsgInfo{
-		Topic: topic,
-		Body: msg,
+func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
+	pi := &bhome_msg.MsgPublish{
+		Topic: []byte(topic),
+		Data: data,
 	}
 	return ms.handle.Pub(nodes, pi)
 }
 
-func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
-	pi := &MsgInfo{
-		Topic: topic,
-		Body: msg,
+func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
+	pi := &bhome_msg.MsgPublish{
+		Topic: []byte(topic),
+		Data: data,
 	}
 	return ms.handle.PubTimeout(nodes, pi, timeout)
 }

--
Gitblit v1.8.0