lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
 * =====================================================================================
 *
 *       Filename:  topic_reply.cpp
 *
 *    Description:  SocketReply implementation: register/heartbeat, request workers, and receive/reply.
 *
 *        Version:  1.0
 *        Created:  2021-04-06 14:40:52
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), 
 *   Organization:  
 *
 * =====================================================================================
 */
#include "topic_reply.h"
#include <chrono>
#include <list>
 
using namespace bhome_msg;
using namespace std::chrono;
using namespace std::chrono_literals;
 
namespace
{
// Information about a received request, retained so a reply can be sent later.
// RecvRequest() heap-allocates one and hands it out as an opaque void*;
// SendReply() casts it back and deletes it.
struct SrcInfo {
    // Copy of the request's route entries, in the order they appeared in the message.
    std::vector<BHAddress> route;
    // msg_id of the request; passed to MakeReply() when the reply is built.
    std::string msg_id;
};
 
class FailedQ
{
    struct FailedMsg {
        steady_clock::time_point xpr;
        std::string remote_;
        BHMsg msg_;
        FailedMsg(const std::string &addr, BHMsg &&msg) :
            xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
        bool Expired() { return steady_clock::now() > xpr; }
    };
    typedef std::list<FailedMsg> Queue;
    Synced<Queue> queue_;
 
public:
    void Push(const std::string &remote, BHMsg &&msg)
    {
        queue_->emplace_back(remote, std::move(msg));
    }
    void TrySend(ShmSocket &socket, const int timeout_ms = 0)
    {
        queue_.Apply([&](Queue &q) {
            if (!q.empty()) {
                auto it = q.begin();
                do {
                    if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
                        it = q.erase(it);
                    } else {
                        ++it;
                    }
                } while (it != q.end());
            }
        });
    }
};
 
} // namespace
 
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
    auto failed_q = std::make_shared<FailedQ>();
 
    auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
 
    auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
        if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
            MsgRequestTopic req;
            if (req.ParseFromString(msg.body())) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
                    for (int i = 0; i < msg.route_size() - 1; ++i) {
                        msg.add_route()->Swap(msg.mutable_route(i));
                    }
                    if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
                        failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
                    }
                }
            }
        } else {
            // ignored, or dropped
        }
 
        onIdle(*this);
    };
 
    return rcb && Start(onRecv, onIdle, nworker);
}
 
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
        MsgRequestTopic request;
        if (request.ParseFromString(msg.body())) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            SrcInfo *p = new SrcInfo;
            p->route.assign(msg.route().begin(), msg.route().end());
            p->msg_id = msg.msg_id();
            src_info = p;
            return true;
        }
    }
    return false;
}
 
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
        return false;
    }
 
    BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        msg.add_route()->Swap(&p->route[i]);
    }
 
    return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}