zhangzengfei
2023-10-19 bf2b61519fd0d79ddb19f0469749fbbe1d6c4ad8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package nsq
 
import (
    "apsClient/pkg/logx"
    "encoding/json"
    "fmt"
    "time"
)
 
type Caller interface {
    Call(msg []byte, duration time.Duration) ([]byte, error)
}
 
type DefaultCaller struct {
    NsqChannel    string
    RequestTopic  string
    ResponseTopic string
}
 
func NewCaller(requestTopic, responseTopic string) *DefaultCaller {
    return &DefaultCaller{
        NsqChannel:    "",
        RequestTopic:  requestTopic,
        ResponseTopic: responseTopic,
    }
}
 
func (caller *DefaultCaller) Call(input interface{}, output interface{}, timeout time.Duration) error {
    msg, err := json.Marshal(input)
    if err != nil {
        return err
    }
    producer := GetProducer()
    err = producer.Publish(caller.RequestTopic, msg)
    if err != nil {
        return err
    }
    to := time.After(timeout)
    for {
        select {
        case <-to:
            logx.Errorf("message call failed due to timeout, request topic: %v, msg: %+v, responseTopic", caller.RequestTopic, input, caller.ResponseTopic)
            return fmt.Errorf("wait response message timeout when request: %v", caller.ResponseTopic)
        case data := <-ReceivedMessageChan:
            if data.Topic == caller.ResponseTopic {
                return json.Unmarshal(data.Message, &output)
            } else {
                //把消息放回去
                ReceivedMessageChan <- data
            }
        }
    }
}
 
func (caller *DefaultCaller) Send(input interface{}) error {
    msg, err := json.Marshal(input)
    if err != nil {
        return err
    }
    producer := GetProducer()
    err = producer.Publish(caller.RequestTopic, msg)
    return err
 
}