fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
nsq/caller.go
@@ -1,8 +1,9 @@
package nsq
import (
   "apsClient/pkg/logx"
   "encoding/json"
   "errors"
   "fmt"
   "time"
)
@@ -38,11 +39,26 @@
   for {
      select {
      case <-to:
         return errors.New("timeout")
         logx.Errorf("message call failed due to timeout, request topic: %v, msg: %+v, responseTopic", caller.RequestTopic, input, caller.ResponseTopic)
         return fmt.Errorf("wait response message timeout when request: %v", caller.ResponseTopic)
      case data := <-ReceivedMessageChan:
         if data.Topic == caller.ResponseTopic {
            return json.Unmarshal(data.Message, &output)
         } else {
            //把消息放回去
            ReceivedMessageChan <- data
         }
      }
   }
}
func (caller *DefaultCaller) Send(input interface{}) error {
   msg, err := json.Marshal(input)
   if err != nil {
      return err
   }
   producer := GetProducer()
   err = producer.Publish(caller.RequestTopic, msg)
   return err
}