package nsq
|
|
import (
|
"apsClient/pkg/logx"
|
"encoding/json"
|
"fmt"
|
"time"
|
)
|
|
// Caller abstracts a request/response style exchange over the message queue.
//
// NOTE(review): DefaultCaller.Call in this file takes (input, output, timeout)
// and returns only an error, so *DefaultCaller does not satisfy this
// interface as declared — confirm which signature is the intended contract.
type Caller interface {
	// Call takes a raw message and a duration and returns raw bytes or an
	// error. Presumably msg is the request payload, duration bounds the wait,
	// and the returned bytes are the reply — confirm against implementers.
	Call(msg []byte, duration time.Duration) ([]byte, error)
}
|
|
// DefaultCaller publishes JSON-encoded messages to a request topic and, for
// Call, waits for a reply that arrives on a response topic.
type DefaultCaller struct {
	// NsqChannel is left empty by NewCaller and is not read by the methods
	// defined in this file.
	NsqChannel string

	// RequestTopic is the topic that Call and Send publish to.
	RequestTopic string

	// ResponseTopic is the topic Call matches incoming messages against when
	// waiting for a reply.
	ResponseTopic string
}
|
|
func NewCaller(requestTopic, responseTopic string) *DefaultCaller {
|
return &DefaultCaller{
|
NsqChannel: "",
|
RequestTopic: requestTopic,
|
ResponseTopic: responseTopic,
|
}
|
}
|
|
func (caller *DefaultCaller) Call(input interface{}, output interface{}, timeout time.Duration) error {
|
msg, err := json.Marshal(input)
|
if err != nil {
|
return err
|
}
|
producer := GetProducer()
|
err = producer.Publish(caller.RequestTopic, msg)
|
if err != nil {
|
return err
|
}
|
to := time.After(timeout)
|
for {
|
select {
|
case <-to:
|
logx.Errorf("message call failed due to timeout, request topic: %v, msg: %+v, responseTopic", caller.RequestTopic, input, caller.ResponseTopic)
|
return fmt.Errorf("wait response message timeout when request: %v", caller.ResponseTopic)
|
case data := <-ReceivedMessageChan:
|
if data.Topic == caller.ResponseTopic {
|
return json.Unmarshal(data.Message, &output)
|
} else {
|
//把消息放回去
|
ReceivedMessageChan <- data
|
}
|
}
|
}
|
}
|
|
func (caller *DefaultCaller) Send(input interface{}) error {
|
msg, err := json.Marshal(input)
|
if err != nil {
|
return err
|
}
|
producer := GetProducer()
|
err = producer.Publish(caller.RequestTopic, msg)
|
return err
|
|
}
|