如果没有pubTopics或者subTopic,则不启动server接收
| | |
| | | } |
| | | } |
| | | |
| | | handle.wg = &sync.WaitGroup{} |
| | | |
| | | sockReply := bhomebus.OpenSocket() |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | | } |
| | | |
| | | //维持心跳的socket |
| | | sockHB := bhomebus.OpenSocket() |
| | | handle.sockHB = &sockClient{ |
| | | sock: sockHB, |
| | | peer: int(regR.HeartbeatKey), |
| | | } |
| | | |
| | | handle.wg = &sync.WaitGroup{} |
| | | |
| | | if ri.PubTopic != nil && len(ri.PubTopic) > 0 { |
| | | sockReply := bhomebus.OpenSocket() |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | | } |
| | | } |
| | | |
| | | |
| | | //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key |
| | | sockPub := bhomebus.OpenSocket() |
| | |
| | | peer: -1, |
| | | } |
| | | |
| | | //订阅消息的socket |
| | | sockSub := bhomebus.OpenSocket() |
| | | //订阅所有主题 |
| | | for _,v := range ri.SubTopic { |
| | | sockSub.Sub(v) |
| | | } |
| | | //有订阅消息才需要启动协程接收消息 |
| | | if ri.SubTopic != nil && len(ri.SubTopic) > 0 { |
| | | //订阅消息的socket |
| | | sockSub := bhomebus.OpenSocket() |
| | | //订阅所有主题 |
| | | for _,v := range ri.SubTopic { |
| | | sockSub.Sub(v) |
| | | } |
| | | |
| | | //启动订阅信息接收 |
| | | handle.wg.Add(1) |
| | | go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) |
| | | handle.sockSub = &sockClient{ |
| | | sock: sockSub, |
| | | peer: -1, |
| | | //启动订阅信息接收 |
| | | handle.wg.Add(1) |
| | | go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) |
| | | handle.sockSub = &sockClient{ |
| | | sock: sockSub, |
| | | peer: -1, |
| | | } |
| | | } |
| | | |
| | | sockWorker := bhomebus.OpenSocket() |
| | |
| | | SubCh chan *MsgInfo |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := NewConfig(KEY_REGISTER,512,5,100,100,100, fnLog) |
| | | handle, err := Register(ctx, q, conf, reg) |
| | | if err != nil { |
| | |
| | | serverId: serverId, |
| | | handle: handle, |
| | | reg: reg, |
| | | procInfo: procInfo, |
| | | procInfo: &reg.Proc, |
| | | fnLog: fnLog, |
| | | SubCh: make(chan *MsgInfo, 512), |
| | | } |