package bhomeclient
|
|
import (
|
"basic.com/valib/bhomebus.git"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"os"
|
"time"
|
)
|
|
type MicroNode struct {
|
ctx context.Context
|
handle *BHBus
|
reg *RegisterInfo
|
procInfo *ProcInfo
|
handlers map[string]MicroFunc
|
serverId string
|
fnLog func(...interface{})
|
|
SubCh chan *MsgInfo
|
}
|
|
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
|
conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog)
|
handle, err := Register(ctx, q, conf, reg)
|
if err != nil {
|
return nil, err
|
}
|
mn := &MicroNode {
|
ctx: ctx,
|
serverId: serverId,
|
handle: handle,
|
reg: reg,
|
procInfo: ®.Proc,
|
fnLog: fnLog,
|
SubCh: make(chan *MsgInfo, 512),
|
}
|
|
return mn, nil
|
}
|
|
func (ms *MicroNode) printLog(v ...interface{}) {
|
if ms.fnLog != nil {
|
ms.fnLog(v...)
|
} else {
|
fmt.Println(v...)
|
}
|
}
|
|
func (ms *MicroNode) UpdateNodeTopics(ts []NodeInfo) {
|
ms.handle.UpdateNodeTopics(ts)
|
}
|
|
func (ms *MicroNode) DeRegister() error {
|
if ms.handle != nil {
|
return ms.handle.DeRegister(ms.reg)
|
}
|
return errors.New("ms.handle is nil")
|
}
|
|
func (ms *MicroNode) startHeartbeat() {
|
hbi := &HeartBeatInfo{
|
HealthLevel: "health",
|
Fps: 12,
|
WarnInfo: "warn",
|
ErrorInfo: "error",
|
Proc: *ms.procInfo,
|
}
|
|
t := time.NewTicker(1 * time.Second)
|
defer t.Stop()
|
|
for {
|
select {
|
case <-ms.ctx.Done():
|
return
|
case <-t.C:
|
ms.handle.HeartBeat(hbi)
|
}
|
}
|
}
|
|
func (ms *MicroNode) StartClient() {
|
go ms.startHeartbeat()
|
}
|
|
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
|
ms.handlers = funcMap
|
|
go ms.startHeartbeat()
|
|
for {
|
select {
|
case <- ms.ctx.Done():
|
return
|
default:
|
msgS, msgR, keyR := ms.handle.GetMsg()
|
if msgS != nil {
|
//收到其它进程的发布消息
|
ms.printLog("Recv Sub Message:", string(msgS.Body))
|
ms.SubCh <- msgS
|
}
|
if msgR != nil {
|
//收到其它进程的请求消息
|
go ms.serve(msgR, keyR)
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
}
|
}
|
}
|
|
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
|
t := time.Now()
|
|
ms.printLog("1:", time.Since(t))
|
t = time.Now()
|
rb, _ := json.Marshal(request)
|
msgR := &MsgInfo {
|
Topic: request.Path,
|
Body: rb,
|
}
|
ms.printLog("2:", time.Since(t))
|
return ms.handle.Request(serverId, msgR, milliSecs)
|
}
|
|
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
|
rb, _ := json.Marshal(request)
|
msgR := &MsgInfo{
|
Topic: request.Path,
|
Body: rb,
|
}
|
|
return ms.handle.Request(serverId, msgR, milliSecs)
|
}
|
|
func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
|
return ms.handle.RequestOnly(rData, nodes)
|
}
|
|
//获取本机中某一个主题的 key (结果只有一个元素)
|
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
|
netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
|
if err != nil {
|
ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
|
return nil
|
}
|
return netNodes
|
}
|
|
//获取集群中所有节点某个主题的key信息, (结果可能有多个)
|
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
|
netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
|
if err != nil {
|
return nil
|
}
|
return netNodes
|
}
|
|
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
|
r := MsgInfo{
|
SrcProc: *ms.procInfo,
|
MsgType: MesgType_ReqRep,
|
Topic: TOPIC_QUERYPROC,
|
}
|
cr, err := ms.handle.RequestCenter(&r)
|
if err != nil {
|
ms.printLog("requestCenter reply:", cr, "err:", err)
|
return nil, err
|
}
|
if cr.Success {
|
rd,err := json.Marshal(cr.Data)
|
if err == nil {
|
var list []RegisteredClient
|
err = json.Unmarshal(rd, &list)
|
if err == nil {
|
return list, nil
|
} else {
|
ms.printLog("unmarshal to RegisteredClient list err:", err)
|
}
|
} else {
|
return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
|
}
|
} else {
|
ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data)
|
}
|
return nil, fmt.Errorf("GetRegisteredClient list failed")
|
}
|
|
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
|
if ms.handlers == nil {
|
return
|
}
|
|
var reqBody Request
|
var ri *Reply
|
err := json.Unmarshal(msgR.Body, &reqBody)
|
if err != nil {
|
ms.printLog("serve unmarshal msgR.Body err:", err)
|
ri = &Reply {
|
Msg: err.Error(),
|
}
|
} else {
|
ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
|
|
if f,ok := ms.handlers[reqBody.Path];ok {
|
reqBody.SrcProc = msgR.SrcProc
|
ri = f(&reqBody)
|
ms.printLog("call funcMap f,reply:", *ri)
|
} else {
|
ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
|
ri = &Reply{
|
Success: false,
|
Msg: "请求的接口不存在,请检查url",
|
Data: "请求的接口不存在,请检查url",
|
}
|
}
|
}
|
|
retErr := ms.handle.Reply(p, ri)
|
if retErr != nil {
|
ms.printLog("retErr:", retErr)
|
}
|
}
|
|
//发布到本机
|
func (ms *MicroNode) Publish(topic string,msg []byte) error {
|
nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: 8,
|
})
|
return ms.PublishNet(nodes, topic, msg)
|
}
|
|
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
|
pi := &MsgInfo{
|
Topic: topic,
|
Body: msg,
|
}
|
return ms.handle.Pub(nodes, pi)
|
}
|
|
//订阅主题
|
func (ms *MicroNode) Subscribe(topics []string) {
|
ms.handle.Sub(topics)
|
for _,t := range topics {
|
if ms.reg.SubTopic == nil {
|
ms.reg.SubTopic = make([]string, 0)
|
}
|
found := false
|
for _,it := range ms.reg.SubTopic {
|
if it == t {
|
found = true
|
break
|
}
|
}
|
if !found {
|
ms.reg.SubTopic = append(ms.reg.SubTopic, t)
|
}
|
}
|
}
|
|
//取消订阅的主题
|
func (ms *MicroNode) DeSub(topics []string) {
|
ms.printLog("DeSub topics:", topics)
|
ms.handle.DeSub(topics)
|
if ms.reg.SubTopic != nil {
|
var leftTopics []string
|
for _,t := range ms.reg.SubTopic {
|
found := false
|
for _,it := range topics {
|
if it == t {
|
found = true
|
break
|
}
|
}
|
if !found {
|
leftTopics = append(leftTopics, t)
|
}
|
}
|
ms.reg.SubTopic = leftTopics
|
}
|
}
|
|
//free handle
|
func (ms *MicroNode) Free() {
|
if ms.handle != nil {
|
ms.handle.Free()
|
}
|
}
|