From 2d5c411a22a653eb7cbde621db4e89b07755a852 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 05 十二月 2023 15:46:46 +0800 Subject: [PATCH] remove sonic json dependency --- hbusc.go | 57 +++++++++++++++++++++++++++------------------------------ 1 files changed, 27 insertions(+), 30 deletions(-) diff --git a/hbusc.go b/hbusc.go index d14efc7..e2b1c65 100644 --- a/hbusc.go +++ b/hbusc.go @@ -14,29 +14,29 @@ ) type MsgReq struct { - ProcId string + ProcId string bhome_msg.MsgRequestTopic - Src unsafe.Pointer + Src unsafe.Pointer } type BHBus struct { - ctx context.Context + ctx context.Context - ri *RegisterInfo + ri *RegisterInfo - conf *Config + conf *Config - nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 - mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� + nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾 + mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿� - wg *sync.WaitGroup + wg *sync.WaitGroup ChSub chan bhome_msg.MsgPublish ChReply chan MsgReq } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� -func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) { +func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) { var procId string var msg bhome_msg.MsgRequestTopic var src unsafe.Pointer @@ -66,12 +66,12 @@ } //Register -func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { - handle := &BHBus { +func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) { + handle := &BHBus{ ctx: ctx, conf: config, ri: ri, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, ChSub: make(chan bhome_msg.MsgPublish, config.chSize), ChReply: make(chan MsgReq, config.chSize), } @@ -79,7 +79,7 @@ //濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐� procI := bhome_msg.ProcInfo{ ProcId: []byte(ri.Proc.ID), - Name: []byte(ri.Proc.Name), + Name: []byte(ri.Proc.Name), } var regReply bhome_msg.MsgCommonReply loop: @@ -87,7 +87,7 @@ select { case <-q: handle.printLog("register <-q") - return nil,errors.New("ctx is done") + return nil, errors.New("ctx is done") default: if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) { @@ -101,7 +101,7 @@ if ri.PubTopic != nil && len(ri.PubTopic) > 0 { topics := bhome_msg.MsgTopicList{} var regTopicReply bhome_msg.MsgCommonReply - for _,t := range ri.PubTopic { + for _, t := range ri.PubTopic { topics.TopicList = append(topics.TopicList, []byte(t)) } loopRT: @@ -124,13 +124,13 @@ go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog) } - handle.printLog("register done!" ) + handle.printLog("register done!") //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭� if len(ri.SubTopic) > 0 { handle.printLog("sub topics") var subList bhome_msg.MsgTopicList - for _,v := range ri.SubTopic { + for _, v := range ri.SubTopic { subList.TopicList = append(subList.TopicList, []byte(v)) } @@ -143,7 +143,7 @@ if len(ri.SubNetTopic) > 0 { handle.printLog("sub net topics") var subNetList bhome_msg.MsgTopicList - for _,v := range ri.SubNetTopic { + for _, v := range ri.SubNetTopic { subNetList.TopicList = append(subNetList.TopicList, []byte(v)) } var subNetReply bhome_msg.MsgCommonReply @@ -161,7 +161,7 @@ return handle, nil } -func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) { +func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) { var procId string var msg bhome_msg.MsgPublish for { @@ -190,7 +190,7 @@ h.printLog("DeRegister") req := bhome_msg.ProcInfo{ ProcId: []byte(h.ri.Proc.ID), - Name: []byte(h.ri.Proc.Name), + Name: []byte(h.ri.Proc.Name), } reply := bhome_msg.MsgCommonReply{} if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) { @@ -214,12 +214,11 @@ h.printLog("h.wg.Wait done") } - //HeartBeat send func (h *BHBus) HeartBeat() error { procI := bhome_msg.ProcInfo{ ProcId: []byte(h.ri.Proc.ID), - Name: []byte(h.ri.Proc.Name), + Name: []byte(h.ri.Proc.Name), } var ret bhome_msg.MsgCommonReply if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) { @@ -228,8 +227,6 @@ return errors.New("send heartBeat return false") } } - - //鏇存柊涓婚鍒楄〃 func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { @@ -241,7 +238,7 @@ //鑾峰彇topic瀵瑰簲鐨刱ey //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key -func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { +func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) { dest := bhome_msg.BHAddress{} reqTopic := bhome_msg.MsgQueryTopic{ Topic: []byte(topic), @@ -267,7 +264,7 @@ var reply Reply if err := json.Unmarshal(mrt.Data, &reply); err != nil { h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data)) - return nil,err + return nil, err } return &reply, nil @@ -297,13 +294,14 @@ } func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error { - data,err := json.Marshal(i) + data, err := json.Marshal(i) if err != nil { return err } rep := bhome_msg.MsgRequestTopicReply{ Data: data, } + if bhsgo.SendReply(src, &rep) { return nil } @@ -321,7 +319,6 @@ return nil, errors.New("QueryProcs ret flase") } } - //鍚戜富棰橀�氶亾涓彂甯冩秷鎭� func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error { @@ -341,7 +338,7 @@ //杩藉姞璁㈤槄鐨勪富棰樻秷鎭� func (h *BHBus) Sub(topics []string) { - if topics != nil && len(topics) >0 { + if topics != nil && len(topics) > 0 { var subList bhome_msg.MsgTopicList for _, v := range topics { subList.TopicList = append(subList.TopicList, []byte(v)) @@ -359,4 +356,4 @@ if topics != nil { } -} \ No newline at end of file +} -- Gitblit v1.8.0