package bhomeclient
|
|
import (
|
"basic.com/valib/bhomebus.git"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"os"
|
"time"
|
)
|
|
type MicroNode struct {
|
ctx context.Context
|
handle *BHBus
|
reg *RegisterInfo
|
procInfo *ProcInfo
|
handlers map[string]MicroFunc
|
serverId string
|
fnLog func(...interface{})
|
|
SubChM map[string]chan *MsgInfo //以订阅的主题为key
|
}
|
|
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
|
conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog)
|
handle, err := Register(ctx, q, conf, reg)
|
if err != nil {
|
return nil, err
|
}
|
mn := &MicroNode {
|
serverId: serverId,
|
handle: handle,
|
reg: reg,
|
procInfo: procInfo,
|
fnLog: fnLog,
|
SubChM: make(map[string]chan *MsgInfo),
|
}
|
for _,subTopic := range reg.SubTopic {
|
mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
|
}
|
|
return mn, nil
|
}
|
|
func (ms *MicroNode) printLog(v ...interface{}) {
|
if ms.fnLog != nil {
|
ms.fnLog(v...)
|
} else {
|
fmt.Println(v...)
|
}
|
}
|
|
func (ms *MicroNode) UpdateNodeTopics(ts []NodeInfo) {
|
ms.handle.UpdateNodeTopics(ts)
|
}
|
|
func (ms *MicroNode) DeRegister() error {
|
if ms.handle != nil {
|
return ms.handle.DeRegister(ms.reg)
|
}
|
return errors.New("ms.handle is nil")
|
}
|
|
func (ms *MicroNode) startHeartbeat() {
|
hbi := &HeartBeatInfo{
|
HealthLevel: "health",
|
Fps: 12,
|
WarnInfo: "warn",
|
ErrorInfo: "error",
|
Proc: *ms.procInfo,
|
}
|
|
t := time.NewTicker(time.Second)
|
defer t.Stop()
|
|
for {
|
select {
|
case <-ms.ctx.Done():
|
return
|
case <-t.C:
|
ms.handle.HeartBeat(hbi)
|
}
|
}
|
}
|
|
func (ms *MicroNode) StartClient() {
|
go ms.startHeartbeat()
|
}
|
|
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
|
ms.handlers = funcMap
|
|
go ms.startHeartbeat()
|
|
for {
|
select {
|
case <- ms.ctx.Done():
|
return
|
default:
|
msgS, msgR, keyR := ms.handle.GetMsg()
|
if msgS != nil {
|
//收到其它进程的发布消息
|
ms.printLog("Recv Sub Message:", string(msgS.Body))
|
if ch,ok := ms.SubChM[msgS.Topic];ok {
|
ch <- msgS
|
}
|
}
|
if msgR != nil {
|
//收到其它进程的请求消息
|
go ms.serve(msgR, keyR)
|
}
|
}
|
}
|
}
|
|
func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
|
t := time.Now()
|
topicName := request.Header("Servicename")
|
|
if topicName == "" {
|
return nil,errors.New("Servicename 不能为空")
|
}
|
ms.printLog("1:", time.Since(t))
|
t = time.Now()
|
rb, _ := json.Marshal(request)
|
msgR := &MsgInfo {
|
Topic: request.Path,
|
Body: rb,
|
}
|
ms.printLog("2:", time.Since(t))
|
t = time.Now()
|
mi,err := ms.handle.Request(serverId, msgR, 5000)
|
if mi == nil || err != nil {
|
return nil, err
|
}
|
ms.printLog("3:", time.Since(t))
|
t = time.Now()
|
ri := new(Reply)
|
err = json.Unmarshal(mi.Body, ri)
|
if err != nil {
|
ms.printLog("unmarshal mi.Body err:", err)
|
ri = &Reply{
|
Success: false,
|
Msg: "服务请求失败",
|
Data: "服务请求失败",
|
}
|
}
|
ms.printLog("4:", time.Since(t))
|
return ri, nil
|
}
|
|
func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
|
rb, _ := json.Marshal(request)
|
msgR := &MsgInfo{
|
Topic: request.Path,
|
Body: rb,
|
}
|
|
mi, err := ms.handle.Request(serverId, msgR, 5000)
|
if err != nil {
|
return nil, err
|
}
|
var ri *Reply
|
err = json.Unmarshal(mi.Body, ri)
|
if err != nil {
|
ri = &Reply{
|
Success: false,
|
Msg: "服务请求失败",
|
Data: "服务请求失败",
|
}
|
}
|
return ri, nil
|
}
|
|
//获取本机中某一个主题的 key (结果只有一个元素)
|
func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
|
netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
|
if err != nil {
|
return nil
|
}
|
return netNodes
|
}
|
|
//获取集群中所有节点某个主题的key信息, (结果可能有多个)
|
func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
|
netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
|
if err != nil {
|
return nil
|
}
|
return netNodes
|
}
|
|
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
|
var reqBody Request
|
err := json.Unmarshal(msgR.Body, &reqBody)
|
if err != nil {
|
ms.printLog("serve unmarshal msgR.Body err:", err)
|
}
|
|
ms.printLog("reqBody:", reqBody)
|
var ri *Reply
|
if f,ok := ms.handlers[reqBody.Path];ok {
|
ri = f(&reqBody)
|
ms.printLog("call funcMap f,reply:", *ri)
|
} else {
|
ms.printLog("ms.funcMap not eixst path")
|
ri = &Reply{
|
Success: false,
|
Msg: "请求的接口不存在,请检查url",
|
Data: "请求的接口不存在,请检查url",
|
}
|
}
|
rd,err := json.Marshal(*ri)
|
if err != nil {
|
ms.printLog("marshal *ri err:", err)
|
}
|
rMsg := MsgInfo{
|
Body: rd,
|
}
|
ms.handle.Reply(p, rMsg)
|
}
|
|
//发布到本机
|
func (ms *MicroNode) Publish(topic string,msg []byte) error {
|
nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
|
return ms.PublishNet(nodes, topic, msg)
|
}
|
|
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
|
pi := &MsgInfo{
|
Topic: topic,
|
Body: msg,
|
}
|
return ms.handle.Pub(nodes, pi)
|
}
|
|
func (ms *MicroNode) Subscribe(topics []string) chan []byte {
|
ch := make(chan []byte)
|
return ch
|
}
|
|
//free handle
|
func (ms *MicroNode) Free() {
|
if ms.handle != nil {
|
ms.handle.Free()
|
}
|
}
|