From 2d5c411a22a653eb7cbde621db4e89b07755a852 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 05 十二月 2023 15:46:46 +0800
Subject: [PATCH] remove sonic json dependency
---
hbusc.go | 57 +++++++++++++++++++++++++++------------------------------
1 files changed, 27 insertions(+), 30 deletions(-)
diff --git a/hbusc.go b/hbusc.go
index d14efc7..e2b1c65 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -14,29 +14,29 @@
)
type MsgReq struct {
- ProcId string
+ ProcId string
bhome_msg.MsgRequestTopic
- Src unsafe.Pointer
+ Src unsafe.Pointer
}
type BHBus struct {
- ctx context.Context
+ ctx context.Context
- ri *RegisterInfo
+ ri *RegisterInfo
- conf *Config
+ conf *Config
- nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
- mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
+ nodes []NodeInfo //闆嗙兢涓妭鐐圭姸鎬佷俊鎭紝浠ュ強姣忎釜鑺傜偣涓婄殑topic淇℃伅銆傞泦缇ょ姸鎬佷笅闇�瑕佸�熷姪serf杩涜鍚屾
+ mtxNode sync.Mutex //璁块棶鑺傜偣涓婚琛ㄦ椂锛岄渶瑕佸姞閿�
- wg *sync.WaitGroup
+ wg *sync.WaitGroup
ChSub chan bhome_msg.MsgPublish
ChReply chan MsgReq
}
//鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
-func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
+func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) {
var procId string
var msg bhome_msg.MsgRequestTopic
var src unsafe.Pointer
@@ -66,12 +66,12 @@
}
//Register
-func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
- handle := &BHBus {
+func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) {
+ handle := &BHBus{
ctx: ctx,
conf: config,
ri: ri,
- wg: &sync.WaitGroup{},
+ wg: &sync.WaitGroup{},
ChSub: make(chan bhome_msg.MsgPublish, config.chSize),
ChReply: make(chan MsgReq, config.chSize),
}
@@ -79,7 +79,7 @@
//濡傛灉娉ㄥ唽澶辫触锛屽氨浼氫竴鐩村皾璇曟敞鍐�
procI := bhome_msg.ProcInfo{
ProcId: []byte(ri.Proc.ID),
- Name: []byte(ri.Proc.Name),
+ Name: []byte(ri.Proc.Name),
}
var regReply bhome_msg.MsgCommonReply
loop:
@@ -87,7 +87,7 @@
select {
case <-q:
handle.printLog("register <-q")
- return nil,errors.New("ctx is done")
+ return nil, errors.New("ctx is done")
default:
if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) {
@@ -101,7 +101,7 @@
if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
topics := bhome_msg.MsgTopicList{}
var regTopicReply bhome_msg.MsgCommonReply
- for _,t := range ri.PubTopic {
+ for _, t := range ri.PubTopic {
topics.TopicList = append(topics.TopicList, []byte(t))
}
loopRT:
@@ -124,13 +124,13 @@
go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
}
- handle.printLog("register done!" )
+ handle.printLog("register done!")
//鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭�
if len(ri.SubTopic) > 0 {
handle.printLog("sub topics")
var subList bhome_msg.MsgTopicList
- for _,v := range ri.SubTopic {
+ for _, v := range ri.SubTopic {
subList.TopicList = append(subList.TopicList, []byte(v))
}
@@ -143,7 +143,7 @@
if len(ri.SubNetTopic) > 0 {
handle.printLog("sub net topics")
var subNetList bhome_msg.MsgTopicList
- for _,v := range ri.SubNetTopic {
+ for _, v := range ri.SubNetTopic {
subNetList.TopicList = append(subNetList.TopicList, []byte(v))
}
var subNetReply bhome_msg.MsgCommonReply
@@ -161,7 +161,7 @@
return handle, nil
}
-func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
+func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) {
var procId string
var msg bhome_msg.MsgPublish
for {
@@ -190,7 +190,7 @@
h.printLog("DeRegister")
req := bhome_msg.ProcInfo{
ProcId: []byte(h.ri.Proc.ID),
- Name: []byte(h.ri.Proc.Name),
+ Name: []byte(h.ri.Proc.Name),
}
reply := bhome_msg.MsgCommonReply{}
if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
@@ -214,12 +214,11 @@
h.printLog("h.wg.Wait done")
}
-
//HeartBeat send
func (h *BHBus) HeartBeat() error {
procI := bhome_msg.ProcInfo{
ProcId: []byte(h.ri.Proc.ID),
- Name: []byte(h.ri.Proc.Name),
+ Name: []byte(h.ri.Proc.Name),
}
var ret bhome_msg.MsgCommonReply
if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
@@ -228,8 +227,6 @@
return errors.New("send heartBeat return false")
}
}
-
-
//鏇存柊涓婚鍒楄〃
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
@@ -241,7 +238,7 @@
//鑾峰彇topic瀵瑰簲鐨刱ey
//濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
//濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
+func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) {
dest := bhome_msg.BHAddress{}
reqTopic := bhome_msg.MsgQueryTopic{
Topic: []byte(topic),
@@ -267,7 +264,7 @@
var reply Reply
if err := json.Unmarshal(mrt.Data, &reply); err != nil {
h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
- return nil,err
+ return nil, err
}
return &reply, nil
@@ -297,13 +294,14 @@
}
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
- data,err := json.Marshal(i)
+ data, err := json.Marshal(i)
if err != nil {
return err
}
rep := bhome_msg.MsgRequestTopicReply{
Data: data,
}
+
if bhsgo.SendReply(src, &rep) {
return nil
}
@@ -321,7 +319,6 @@
return nil, errors.New("QueryProcs ret flase")
}
}
-
//鍚戜富棰橀�氶亾涓彂甯冩秷鎭�
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
@@ -341,7 +338,7 @@
//杩藉姞璁㈤槄鐨勪富棰樻秷鎭�
func (h *BHBus) Sub(topics []string) {
- if topics != nil && len(topics) >0 {
+ if topics != nil && len(topics) > 0 {
var subList bhome_msg.MsgTopicList
for _, v := range topics {
subList.TopicList = append(subList.TopicList, []byte(v))
@@ -359,4 +356,4 @@
if topics != nil {
}
-}
\ No newline at end of file
+}
--
Gitblit v1.8.0