package nsq import ( "apsClient/pkg/logx" "encoding/json" "fmt" "time" ) type Caller interface { Call(msg []byte, duration time.Duration) ([]byte, error) } type DefaultCaller struct { NsqChannel string RequestTopic string ResponseTopic string } func NewCaller(requestTopic, responseTopic string) *DefaultCaller { return &DefaultCaller{ NsqChannel: "", RequestTopic: requestTopic, ResponseTopic: responseTopic, } } func (caller *DefaultCaller) Call(input interface{}, output interface{}, timeout time.Duration) error { msg, err := json.Marshal(input) if err != nil { return err } producer := GetProducer() err = producer.Publish(caller.RequestTopic, msg) if err != nil { return err } to := time.After(timeout) for { select { case <-to: logx.Errorf("message call failed due to timeout, request topic: %v, msg: %+v, responseTopic", caller.RequestTopic, input, caller.ResponseTopic) return fmt.Errorf("wait response message timeout when request: %v", caller.ResponseTopic) case data := <-ReceivedMessageChan: if data.Topic == caller.ResponseTopic { return json.Unmarshal(data.Message, &output) } else { //把消息放回去 ReceivedMessageChan <- data } } } } func (caller *DefaultCaller) Send(input interface{}) error { msg, err := json.Marshal(input) if err != nil { return err } producer := GetProducer() err = producer.Publish(caller.RequestTopic, msg) return err }