package bhomeclient
|
|
import (
|
"basic.com/valib/c_bhomebus.git/api/bhsgo"
|
"basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"os"
|
"sync"
|
"time"
|
"unsafe"
|
)
|
|
type MsgReq struct {
|
ProcId string
|
bhome_msg.MsgRequestTopic
|
Src unsafe.Pointer
|
}
|
|
type BHBus struct {
|
ctx context.Context
|
|
ri *RegisterInfo
|
|
conf *Config
|
|
nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
|
mtxNode sync.Mutex //访问节点主题表时,需要加锁
|
|
wg *sync.WaitGroup
|
|
ChSub chan bhome_msg.MsgPublish
|
ChReply chan MsgReq
|
}
|
|
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
|
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
|
var procId string
|
var msg bhome_msg.MsgRequestTopic
|
var src unsafe.Pointer
|
for {
|
select {
|
case <-ctx.Done():
|
logFn("recvRoutine ctx.Done")
|
wg.Done()
|
return
|
default:
|
if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
|
ch <- MsgReq{
|
procId,
|
msg,
|
src,
|
}
|
|
logFn("ReadRequest topic:", string(msg.Topic), " data:", string(msg.Data))
|
procId = ""
|
msg.Reset()
|
src = unsafe.Pointer(nil)
|
} else {
|
time.Sleep(100 * time.Millisecond)
|
}
|
}
|
}
|
}
|
|
//Register
|
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
|
handle := &BHBus {
|
ctx: ctx,
|
conf: config,
|
ri: ri,
|
wg: &sync.WaitGroup{},
|
ChSub: make(chan bhome_msg.MsgPublish, config.chSize),
|
ChReply: make(chan MsgReq, config.chSize),
|
}
|
|
//如果注册失败,就会一直尝试注册
|
procI := bhome_msg.ProcInfo{
|
ProcId: []byte(ri.Proc.ID),
|
Name: []byte(ri.Proc.Name),
|
}
|
var regReply bhome_msg.MsgCommonReply
|
loop:
|
for {
|
select {
|
case <-q:
|
handle.printLog("register <-q")
|
return nil,errors.New("ctx is done")
|
default:
|
|
if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) {
|
break loop
|
} else {
|
time.Sleep(time.Second)
|
}
|
}
|
}
|
|
if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
|
topics := bhome_msg.MsgTopicList{}
|
var regTopicReply bhome_msg.MsgCommonReply
|
for _,t := range ri.PubTopic {
|
topics.TopicList = append(topics.TopicList, []byte(t))
|
}
|
loopRT:
|
for {
|
select {
|
case <-q:
|
handle.printLog("RegisterTopics recv quit signal")
|
return nil, errors.New("RegisterTopics recv quit signal")
|
default:
|
if bhsgo.RegisterTopics(&topics, ®TopicReply, handle.conf.sendTimeOut) {
|
handle.printLog("bhsgo.RegisterTopics success!!")
|
break loopRT
|
} else {
|
time.Sleep(time.Second)
|
}
|
}
|
}
|
|
handle.wg.Add(1)
|
go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
|
}
|
|
handle.printLog("register done!" )
|
|
//有订阅消息才需要启动协程接收消息
|
if len(ri.SubTopic) > 0 {
|
handle.printLog("sub topics")
|
var subList bhome_msg.MsgTopicList
|
for _,v := range ri.SubTopic {
|
subList.TopicList = append(subList.TopicList, []byte(v))
|
}
|
|
var subReply bhome_msg.MsgCommonReply
|
if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
|
handle.printLog("bhsgo.Subscribe ret false")
|
}
|
}
|
|
if len(ri.SubNetTopic) > 0 {
|
handle.printLog("sub net topics")
|
var subNetList bhome_msg.MsgTopicList
|
for _,v := range ri.SubNetTopic {
|
subNetList.TopicList = append(subNetList.TopicList, []byte(v))
|
}
|
var subNetReply bhome_msg.MsgCommonReply
|
if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) {
|
handle.printLog("bhsgo.SubscribeNet ret false")
|
}
|
}
|
|
if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 {
|
//启动订阅信息接收
|
handle.wg.Add(1)
|
go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
|
}
|
|
return handle, nil
|
}
|
|
func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
|
var procId string
|
var msg bhome_msg.MsgPublish
|
for {
|
select {
|
case <-ctx.Done():
|
logFn("recvSubRoutine ctx.Done")
|
wg.Done()
|
return
|
default:
|
if bhsgo.ReadSub(&procId, &msg, 100) {
|
ch <- msg
|
logFn("ReadSub topic:", string(msg.Topic), " len(data):", len(msg.Data))
|
|
procId = ""
|
msg.Reset()
|
} else {
|
//time.Sleep(100 * time.Millisecond)
|
}
|
}
|
}
|
}
|
|
//DeRegister
|
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
|
|
h.printLog("DeRegister")
|
req := bhome_msg.ProcInfo{
|
ProcId: []byte(h.ri.Proc.ID),
|
Name: []byte(h.ri.Proc.Name),
|
}
|
reply := bhome_msg.MsgCommonReply{}
|
if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
|
h.printLog("Unregister false! ")
|
return errors.New("Unregister false! ")
|
}
|
return nil
|
}
|
|
func (h *BHBus) printLog(v ...interface{}) {
|
if h.conf.fnLog != nil {
|
h.conf.fnLog(v...)
|
}
|
}
|
|
//Release
|
func (h *BHBus) Free() {
|
h.printLog("call BHBus free")
|
h.wg.Wait()
|
bhsgo.Cleanup()
|
h.printLog("h.wg.Wait done")
|
}
|
|
|
//HeartBeat send
|
func (h *BHBus) HeartBeat() error {
|
procI := bhome_msg.ProcInfo{
|
ProcId: []byte(h.ri.Proc.ID),
|
Name: []byte(h.ri.Proc.Name),
|
}
|
var ret bhome_msg.MsgCommonReply
|
if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
|
return nil
|
} else {
|
return errors.New("send heartBeat return false")
|
}
|
}
|
|
|
|
//更新主题列表
|
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
|
h.mtxNode.Lock()
|
defer h.mtxNode.Unlock()
|
h.nodes = arr
|
}
|
|
//获取topic对应的key
|
//如果传了serverId不为空,则获取指定机器上的topic-key
|
//如果server为空,则获取所有节点上topic-key
|
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
|
dest := bhome_msg.BHAddress{}
|
reqTopic := bhome_msg.MsgQueryTopic{
|
Topic: []byte(topic),
|
}
|
rep := bhome_msg.MsgQueryTopicReply{}
|
if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) {
|
return rep.NodeAddress, nil
|
}
|
if rep.Errmsg != nil {
|
h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString))
|
return nil, errors.New(string(rep.Errmsg.ErrString))
|
}
|
return nil, errors.New("bhsgo.QueryTopicAddress ret false")
|
}
|
|
func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
|
//1.首先需要通过topic拿到本机对应的NetNode
|
//2.将请求返送到对应的server,并等待返回值
|
pid := ""
|
mrt := bhome_msg.MsgRequestTopicReply{}
|
dest := bhome_msg.BHAddress{}
|
if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
|
var reply Reply
|
if err := json.Unmarshal(mrt.Data, &reply); err != nil {
|
h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
|
return nil,err
|
}
|
|
return &reply, nil
|
} else {
|
i, s := bhsgo.GetLastError()
|
h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic))
|
return nil, errors.New("request ")
|
}
|
}
|
|
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
|
dest := bhome_msg.BHAddress{}
|
if destArr != nil && len(destArr) > 0 {
|
if destArr[0].Addr != nil {
|
dest = *(destArr[0].Addr)
|
}
|
}
|
pid := ""
|
r := bhome_msg.MsgRequestTopicReply{}
|
if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
|
return r.Data, nil
|
} else {
|
i, s := bhsgo.GetLastError()
|
h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest)
|
return nil, errors.New("bhsgo.Request return false")
|
}
|
}
|
|
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
|
data,err := json.Marshal(*i)
|
if err != nil {
|
return err
|
}
|
rep := bhome_msg.MsgRequestTopicReply{
|
Data: data,
|
}
|
if bhsgo.SendReply(src, &rep) {
|
return nil
|
}
|
|
return errors.New("reply return false")
|
}
|
|
func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
|
dest := &bhome_msg.BHAddress{}
|
topic := &bhome_msg.MsgQueryProc{}
|
rep := &bhome_msg.MsgQueryProcReply{}
|
if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) {
|
return rep.ProcList, nil
|
} else {
|
return nil, errors.New("QueryProcs ret flase")
|
}
|
}
|
|
|
//向主题通道中发布消息
|
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
|
if bhsgo.Publish(msg, h.conf.pubTimeOut) {
|
return nil
|
} else {
|
return fmt.Errorf("pub err ")
|
}
|
}
|
|
func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
|
if bhsgo.Publish(msg, timeout) {
|
return 1
|
}
|
return -1
|
}
|
|
//追加订阅的主题消息
|
func (h *BHBus) Sub(topics []string) {
|
if topics != nil && len(topics) >0 {
|
var subList bhome_msg.MsgTopicList
|
for _, v := range topics {
|
subList.TopicList = append(subList.TopicList, []byte(v))
|
}
|
|
var subReply bhome_msg.MsgCommonReply
|
if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
|
h.printLog("sub topics")
|
}
|
}
|
}
|
|
//注销订阅的主题
|
func (h *BHBus) DeSub(topics []string) {
|
if topics != nil {
|
|
}
|
}
|