| | |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "strconv" |
| | | "sync" |
| | | "time" |
| | | ) |
| | |
| | | //mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的 |
| | | |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | | } |
| | | |
| | | // Receives messages that other processes send to this socket; these may be subscription (sub) messages or reply messages. |
| | |
| | | } |
| | | } |
| | | |
| | | func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logFn("recvandsendRoutine ctx.Done") |
| | | wg.Done() |
| | | return |
| | | default: |
| | | n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时 |
| | | if n != 0 { |
| | | logFn("RecvandsendTimeout success") |
| | | } else { |
| | | //time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | //Register |
| | | func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { |
| | | handle := &BHBus{ |
| | | handle := &BHBus { |
| | | ctx: ctx, |
| | | conf: config, |
| | | m: make(map[string]*sockServer), |
| | | chSub: make(chan TransInfo, config.chSize), |
| | | chReply: make(chan TransInfo, config.chSize), |
| | | } |
| | | |
| | | var err error |
| | |
| | | sockReply := bhomebus.OpenSocket() |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.printLog("after pubTopic forceBind") |
| | | handle.wg.Add(1) |
| | | //handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | |
| | | return nil, fmt.Errorf("request err") |
| | | } |
| | | |
| | | func (h *BHBus) Reply(replyKey int, i *Reply) error { |
| | | data,err := json.Marshal(*i) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) |
| | | h.printLog("reply to key:", replyKey, " n:",n) |
| | | if n != 0 { |
| | | return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) |
| | | } |
| | | return nil |
| | | } |
| | | //func (h *BHBus) Reply(replyKey int, i *Reply) error { |
| | | // data,err := json.Marshal(*i) |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // |
| | | // n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) |
| | | // h.printLog("reply to key:", replyKey, " n:",n) |
| | | // if n != 0 { |
| | | // return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) |
| | | // } |
| | | // return nil |
| | | //} |
| | | |
| | | |
| | | // Sends a request only; no reply is expected. |
| | |
| | | |
| | | |
| | | //获取sub 或者需要reply的消息 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | | if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil,nil, -1 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo) { |
| | | if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil |
| | | } |
| | | if len(h.chSub) >0 { |
| | | m := <-h.chSub |
| | | subMsg = m.info |
| | | } |
| | | if len(h.chReply) >0 { |
| | | m := <-h.chReply |
| | | replyMsg = m.info |
| | | replyKey = m.port |
| | | } |
| | | return |
| | | } |
| | |
| | | ms.handlers = funcMap |
| | | |
| | | go ms.startHeartbeat() |
| | | //接收订阅到的消息 |
| | | go ms.startRecvSubMsg() |
| | | //作为server启动 |
| | | ms.serve() |
| | | } |
| | | |
| | | //开始接收订阅消息 |
| | | func (ms *MicroNode) startRecvSubMsg() { |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | return |
| | | default: |
| | | msgS, msgR, keyR := ms.handle.GetMsg() |
| | | msgS := ms.handle.GetMsg() |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | | go ms.serve(msgR, keyR) |
| | | } |
| | | |
| | | time.Sleep(50 * time.Millisecond) |
| | |
| | | return nil, fmt.Errorf("GetRegisteredClient list failed") |
| | | } |
| | | |
| | | func (ms *MicroNode) serve(msgR *MsgInfo, p int) { |
| | | func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { |
| | | ri := &Reply{} |
| | | if ms.handlers == nil { |
| | | return |
| | | } |
| | | |
| | | var reqBody Request |
| | | var ri *Reply |
| | | err := json.Unmarshal(msgR.Body, &reqBody) |
| | | if err != nil { |
| | | ms.printLog("serve unmarshal msgR.Body err:", err) |
| | | ri = &Reply { |
| | | Msg: err.Error(), |
| | | } |
| | | ri.Msg = "send wrong addr, check yourself!!!" |
| | | } else { |
| | | ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) |
| | | |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | reqBody.SrcProc = msgR.SrcProc |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | var msgR MsgInfo |
| | | err := json.Unmarshal(rdata, &msgR) |
| | | if err != nil { |
| | | ri.Msg = err.Error() |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "请求的接口不存在,请检查url", |
| | | Data: "请求的接口不存在,请检查url", |
| | | var reqBody Request |
| | | err = json.Unmarshal(rdata, &msgR.Body) |
| | | if err != nil { |
| | | ri.Msg = err.Error() |
| | | } else { |
| | | ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | reqBody.SrcProc = msgR.SrcProc |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | ri.Msg = "请求的接口不存在,请检查url" |
| | | } |
| | | } |
| | | } |
| | | } |
| | | result, err := json.Marshal(*ri) |
| | | if err != nil { |
| | | sdata = nil |
| | | } else { |
| | | sdata = &result |
| | | } |
| | | return ri.Success |
| | | } |
| | | |
| | | retErr := ms.handle.Reply(p, ri) |
| | | if retErr != nil { |
| | | ms.printLog("retErr:", retErr) |
| | | func (ms *MicroNode) serve() { |
| | | if ms.handlers == nil { |
| | | return |
| | | } |
| | | for i:=0;i<10;i++ { |
| | | ms.handle.wg.Add(1) |
| | | go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) |
| | | } |
| | | } |
| | | |