| | |
| | | package nsq |
| | | |
| | | import ( |
| | | "apsClient/pkg/logx" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "time" |
| | | ) |
| | | |
| | |
| | | for { |
| | | select { |
| | | case <-to: |
| | | return errors.New("timeout") |
| | | logx.Errorf("message call failed due to timeout, request topic: %v, msg: %+v, responseTopic", caller.RequestTopic, input, caller.ResponseTopic) |
| | | return fmt.Errorf("wait response message timeout when request: %v", caller.ResponseTopic) |
| | | case data := <-ReceivedMessageChan: |
| | | if data.Topic == caller.ResponseTopic { |
| | | return json.Unmarshal(data.Message, &output) |
| | | } else { |
| | | //把消息放回去 |
| | | ReceivedMessageChan <- data |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (caller *DefaultCaller) Send(input interface{}) error { |
| | | msg, err := json.Marshal(input) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | producer := GetProducer() |
| | | err = producer.Publish(caller.RequestTopic, msg) |
| | | return err |
| | | |
| | | } |