package bhomeclient
|
|
import (
|
"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"os"
|
"sync"
|
"time"
|
)
|
|
type MicroNode struct {
|
ctx context.Context
|
handle *BHBus
|
reg *RegisterInfo
|
procInfo *ProcInfo
|
handlers map[string]MicroFunc
|
serverId string
|
fnLog func(...interface{})
|
|
SubCh chan *bhome_msg.MsgPublish
|
|
mtx sync.Mutex
|
started bool
|
}
|
|
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
|
conf := NewConfig(KEY_REGISTER,512,5,5000,5000,2000, fnLog)
|
handle, err := Register(ctx, q, conf, reg)
|
if err != nil {
|
return nil, err
|
}
|
mn := &MicroNode {
|
ctx: ctx,
|
serverId: serverId,
|
handle: handle,
|
reg: reg,
|
procInfo: ®.Proc,
|
fnLog: fnLog,
|
SubCh: make(chan *bhome_msg.MsgPublish, 512),
|
}
|
|
go startHeartbeat(ctx, handle)
|
|
return mn, nil
|
}
|
|
func (ms *MicroNode) printLog(v ...interface{}) {
|
if ms.fnLog != nil {
|
ms.fnLog(v...)
|
} else {
|
fmt.Println(v...)
|
}
|
}
|
|
func (ms *MicroNode) UpdateNodeTopics(ts []NodeInfo) {
|
ms.handle.UpdateNodeTopics(ts)
|
}
|
|
func (ms *MicroNode) DeRegister() error {
|
if ms.handle != nil {
|
return ms.handle.DeRegister(ms.reg)
|
}
|
return errors.New("ms.handle is nil")
|
}
|
|
func startHeartbeat(ctx context.Context, h *BHBus) {
|
t := time.NewTicker(1 * time.Second)
|
defer t.Stop()
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case <-t.C:
|
h.HeartBeat()
|
default:
|
time.Sleep(500 * time.Millisecond)
|
}
|
}
|
}
|
|
func (ms *MicroNode) StartClient() {
|
|
}
|
|
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
|
ms.mtx.Lock()
|
if !ms.started {
|
ms.started = true
|
ms.mtx.Unlock()
|
|
ms.handlers = funcMap
|
|
for {
|
select {
|
case <- ms.ctx.Done():
|
return
|
case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息
|
go ms.serve(ms.handle.ctx, &msgR)
|
case msgS := <-ms.handle.ChSub:
|
ms.printLog("Recv Sub Message:", string(msgS.Data))
|
ms.SubCh <- &msgS
|
default:
|
time.Sleep(50 * time.Millisecond)
|
}
|
}
|
}
|
ms.mtx.Unlock()
|
}
|
|
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
|
t := time.Now()
|
|
ms.printLog("1:", time.Since(t))
|
t = time.Now()
|
rb, _ := json.Marshal(request)
|
msgR := &bhome_msg.MsgRequestTopic{
|
Topic: []byte(request.Path),
|
Data: rb,
|
}
|
ms.printLog("2:", time.Since(t))
|
return ms.handle.Request(serverId, msgR, milliSecs)
|
}
|
|
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
|
rb, _ := json.Marshal(request)
|
msgR := &bhome_msg.MsgRequestTopic{
|
Topic: []byte(request.Path),
|
Data: rb,
|
}
|
|
return ms.handle.Request(serverId, msgR, milliSecs)
|
}
|
|
func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
|
return ms.handle.RequestOnly(req, dest)
|
}
|
|
//获取本机中某一个主题的 key (结果只有一个元素)
|
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []*bhome_msg.MsgQueryTopicReply_BHNodeAddress {
|
netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
|
if err != nil {
|
ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
|
return nil
|
}
|
return netNodes
|
}
|
|
// Get the key information of a topic from all nodes in the cluster (the result may contain multiple entries).
|
//func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
|
// netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
|
// if err != nil {
|
// return nil
|
// }
|
// return netNodes
|
//}
|
|
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info,error) {
|
return ms.handle.RequestCenter()
|
}
|
|
func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
|
if ms.handlers == nil {
|
return
|
}
|
|
var reqBody Request
|
var ri *Reply
|
err := json.Unmarshal(msgR.Data, &reqBody)
|
if err != nil {
|
ms.printLog("serve unmarshal msgR.Body err:", err)
|
ri = &Reply {
|
Msg: err.Error(),
|
}
|
} else {
|
ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
|
|
if f,ok := ms.handlers[reqBody.Path];ok {
|
reqBody.SrcProc = ProcInfo{
|
ID: msgR.ProcId,
|
}
|
h := WrapperHandler{
|
ms,
|
ms,
|
}
|
select {
|
case <-ctx.Done():
|
ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
|
default:
|
ri = f(&h, &reqBody)
|
ms.printLog("call funcMap f,reply.Success:", ri.Success)
|
}
|
} else {
|
ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
|
ri = &Reply{
|
Success: false,
|
Msg: "请求的接口不存在,请检查url",
|
Data: "请求的接口不存在,请检查url",
|
}
|
}
|
}
|
|
retErr := ms.handle.Reply(msgR.Src, ri)
|
if retErr != nil {
|
ms.printLog("retErr:", retErr)
|
}
|
}
|
|
//发布到本机
|
func (ms *MicroNode) Publish(topic string,msg []byte) error {
|
var nodes []bhome_msg.BHAddress
|
return ms.PublishNet(nodes, topic, msg)
|
}
|
|
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
|
pi := &bhome_msg.MsgPublish{
|
Topic: []byte(topic),
|
Data: data,
|
}
|
return ms.handle.Pub(nodes, pi)
|
}
|
|
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
|
pi := &bhome_msg.MsgPublish{
|
Topic: []byte(topic),
|
Data: data,
|
}
|
return ms.handle.PubTimeout(nodes, pi, timeout)
|
}
|
|
//订阅主题
|
func (ms *MicroNode) Subscribe(topics []string) {
|
ms.handle.Sub(topics)
|
for _,t := range topics {
|
if ms.reg.SubTopic == nil {
|
ms.reg.SubTopic = make([]string, 0)
|
}
|
found := false
|
for _,it := range ms.reg.SubTopic {
|
if it == t {
|
found = true
|
break
|
}
|
}
|
if !found {
|
ms.reg.SubTopic = append(ms.reg.SubTopic, t)
|
}
|
}
|
}
|
|
//取消订阅的主题
|
func (ms *MicroNode) DeSub(topics []string) {
|
ms.printLog("DeSub topics:", topics)
|
ms.handle.DeSub(topics)
|
if ms.reg.SubTopic != nil {
|
var leftTopics []string
|
for _,t := range ms.reg.SubTopic {
|
found := false
|
for _,it := range topics {
|
if it == t {
|
found = true
|
break
|
}
|
}
|
if !found {
|
leftTopics = append(leftTopics, t)
|
}
|
}
|
ms.reg.SubTopic = leftTopics
|
}
|
}
|
|
//free handle
|
func (ms *MicroNode) Free() {
|
if ms.handle != nil {
|
ms.handle.Free()
|
}
|
}
|