| | |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "sync" |
| | | "time" |
| | | ) |
| | | |
| | |
| | | fnLog func(...interface{}) |
| | | |
| | | SubCh chan *MsgInfo |
| | | |
| | | mtx sync.Mutex |
| | | started bool |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog) |
| | | conf := NewConfig(KEY_REGISTER,512,5,5000,5000,2000, fnLog) |
| | | handle, err := Register(ctx, q, conf, reg) |
| | | if err != nil { |
| | | return nil, err |
| | |
| | | } |
| | | |
| | | func (ms *MicroNode) StartClient() { |
| | | go ms.startHeartbeat() |
| | | ms.mtx.Lock() |
| | | defer ms.mtx.Unlock() |
| | | if !ms.started { |
| | | ms.started = true |
| | | |
| | | go ms.startHeartbeat() |
| | | } |
| | | } |
| | | |
| | | func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) { |
| | | ms.handlers = funcMap |
| | | ms.mtx.Lock() |
| | | if !ms.started { |
| | | ms.started = true |
| | | ms.mtx.Unlock() |
| | | |
| | | go ms.startHeartbeat() |
| | | ms.handlers = funcMap |
| | | |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | return |
| | | default: |
| | | msgS, msgR, keyR := ms.handle.GetMsg() |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | | go ms.serve(msgR, keyR) |
| | | go ms.startHeartbeat() |
| | | |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | return |
| | | default: |
| | | msgS, msgR, keyR := ms.handle.GetMsg() |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | | go ms.serve(msgR, keyR) |
| | | } |
| | | |
| | | time.Sleep(50 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | //接收订阅到的消息 |
| | | //go ms.startRecvSubMsg() |
| | | //作为server启动 |
| | | //ms.serve() |
| | | } |
| | | ms.mtx.Unlock() |
| | | } |
| | | |
| | | // Start receiving subscribed messages. |
| | | //func (ms *MicroNode) startRecvSubMsg() { |
| | | // for { |
| | | // select { |
| | | // case <- ms.ctx.Done(): |
| | | // return |
| | | // default: |
| | | // msgS, msgR, keyR := ms.handle.GetMsg() |
| | | // if msgS != nil { |
| | | // //收到其它进程的发布消息 |
| | | // ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | // ms.SubCh <- msgS |
| | | // } |
| | | // if msgR != nil { |
| | | // //收到其它进程的请求消息 |
| | | // go ms.serve(msgR, keyR) |
| | | // } |
| | | // |
| | | // time.Sleep(50 * time.Millisecond) |
| | | // } |
| | | // } |
| | | //} |
| | | |
| | | func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | | t := time.Now() |
| | |
| | | return nil, fmt.Errorf("GetRegisteredClient list failed") |
| | | } |
| | | |
| | | //func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { |
| | | // ri := &Reply{} |
| | | // if ms.handlers == nil { |
| | | // ri.Msg = "send wrong addr, check yourself!!!" |
| | | // } else { |
| | | // var msgR MsgInfo |
| | | // err := json.Unmarshal(rdata, &msgR) |
| | | // if err != nil { |
| | | // ri.Msg = err.Error() |
| | | // } else { |
| | | // var reqBody Request |
| | | // err = json.Unmarshal(rdata, &msgR.Body) |
| | | // if err != nil { |
| | | // ri.Msg = err.Error() |
| | | // } else { |
| | | // ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) |
| | | // if f,ok := ms.handlers[reqBody.Path];ok { |
| | | // reqBody.SrcProc = msgR.SrcProc |
| | | // ri = f(&reqBody) |
| | | // ms.printLog("call funcMap f,reply:", *ri) |
| | | // } else { |
| | | // ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | // ri.Msg = "请求的接口不存在,请检查url" |
| | | // } |
| | | // } |
| | | // } |
| | | // } |
| | | // result, err := json.Marshal(*ri) |
| | | // if err != nil { |
| | | // sdata = nil |
| | | // } else { |
| | | // sdata = &result |
| | | // } |
| | | // return ri.Success |
| | | //} |
| | | |
| | | //func (ms *MicroNode) serve() { |
| | | // if ms.handlers == nil { |
| | | // return |
| | | // } |
| | | // for i:=0;i<10;i++ { |
| | | // ms.handle.wg.Add(1) |
| | | // go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) |
| | | // } |
| | | //} |
| | | |
| | | func (ms *MicroNode) serve(msgR *MsgInfo, p int) { |
| | | if ms.handlers == nil { |
| | | return |
| | |
| | | Msg: err.Error(), |
| | | } |
| | | } else { |
| | | ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) |
| | | ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) |
| | | |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | reqBody.SrcProc = msgR.SrcProc |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | ctx := Context{ |
| | | ms, |
| | | ms, |
| | | } |
| | | ri = f(&ctx, &reqBody) |
| | | ms.printLog("call funcMap f,reply.Success:", ri.Success) |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | ri = &Reply{ |
| | |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | | |
| | | func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int { |
| | | pi := &MsgInfo{ |
| | | Topic: topic, |
| | | Body: msg, |
| | | } |
| | | return ms.handle.PubTimeout(nodes, pi, timeout) |
| | | } |
| | | |
| | | // Subscribe to topics. |
| | | func (ms *MicroNode) Subscribe(topics []string) { |
| | | ms.handle.Sub(topics) |