lichao
2021-04-16 94a455aba299af5ffe476560d859dfd007cd5467
fix crash by using normal timeout; add sendq todo.
10个文件已修改
132 ■■■■ 已修改文件
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -9,3 +9,4 @@
box/bhshmqbox
box/bhshmq_center
box/help
utest/bhshmq_center
box/center.cpp
@@ -417,7 +417,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -496,7 +496,7 @@
SharedMemory &BHomeShm()
{
    static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
    static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
    return shm;
}
@@ -530,9 +530,6 @@
        sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
    }
}
BHCenter::BHCenter() :
    BHCenter(BHomeShm()) {}
bool BHCenter::Start()
{
box/center.h
@@ -34,7 +34,6 @@
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    BHCenter();
    ~BHCenter() { Stop(); }
    bool Start();
    bool Stop();
src/defs.cpp
@@ -20,7 +20,7 @@
{
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
struct LastError {
@@ -37,7 +37,7 @@
} // namespace
const MQId &BHTopicBusAddress() { return kBHTopicBus; }
const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
void SetLastError(const int ec, const std::string &msg)
src/sendq.cpp
@@ -19,6 +19,11 @@
#include "shm_queue.h"
#include <chrono>
//TODO change to save head, body, instead of MsgI.
// as MsgI which is in shm, but head, body are in current process.
// Then if node crashes, shm will not be affected by msgs in sendq.
// but pulishing ref-counted msg need some work.
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
src/sendq.h
@@ -55,7 +55,7 @@
    void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        using namespace std::chrono_literals;
        Append(addr, msg, Now() + 3s, onExpire);
        Append(addr, msg, Now() + 60s, onExpire);
    }
    bool TrySend(bhome_shm::ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
src/socket.h
@@ -66,32 +66,24 @@
    size_t Pending() const { return mq().Pending(); }
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
    {
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
            return SendImpl(valid_remote, msg);
        }
        return false;
    }
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
    {
        //TODO send_buffer_ need flag, and remove callback on expire.
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
            std::string msg_id(head.msg_id());
            if (!cb) {
                return SendImpl(valid_remote, msg);
            } else {
            per_msg_cbs_->Add(msg_id, cb);
            auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
                RecvCB cb_no_use;
                per_msg_cbs_->Find(msg_id, cb_no_use);
            };
            return SendImpl(valid_remote, msg, onExpireRemoveCB);
            }
        } else {
            printf("out of mem?, avail: %ld\n", shm().get_free_memory());
            SetLastError(ENOMEM, "Out of mem");
        }
        return false;
    }
src/topic_node.cpp
@@ -245,7 +245,10 @@
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
    if (!IsRegistered()) { return false; }
    if (!IsRegistered()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
    const std::string &msg_id(NewMsgId());
@@ -298,6 +301,7 @@
        return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
    } catch (...) {
        SetLastError(eError, "internal error.");
        return false;
    }
}
utest/api_test.cpp
@@ -25,12 +25,23 @@
{
typedef std::atomic<uint64_t> Number;
void Assign(Number &a, const Number &b) { a.store(b.load()); }
struct MsgStatus {
    Number nrequest_;
    Number nfailed_;
    Number nreply_;
    Number nserved_;
    MsgStatus() :
        nrequest_(0), nreply_(0), nserved_(0) {}
    MsgStatus &operator=(const MsgStatus &a)
    {
        Assign(nrequest_, a.nrequest_);
        Assign(nserved_, a.nserved_);
        Assign(nreply_, a.nreply_);
        Assign(nfailed_, a.nfailed_);
        return *this;
    }
};
MsgStatus &Status()
@@ -83,6 +94,48 @@
        ++Status().nreply_;
    }
    // printf("client Recv reply : %s\n", reply.data().c_str());
}
BOOST_AUTO_TEST_CASE(MutexTest)
{
    const std::string shm_name("ShmMutex");
    // ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 10);
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
    auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            if (TryLock()) {
                Unlock();
            }
            // Unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}
BOOST_AUTO_TEST_CASE(ApiTest)
@@ -166,36 +219,49 @@
            std::string s(req.SerializeAsString());
            void *msg_id = 0;
            int len = 0;
            // Sleep(10ms, false);
            bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
            DEFER1(BHFree(msg_id, len););
            if (r) {
                ++Status().nrequest_;
            } else {
                printf("request topic : %s\n", r ? "ok" : "failed");
                ++Status().nfailed_;
                static std::atomic<int64_t> last(0);
                auto now = NowSec();
                if (last.exchange(now) < now) {
                    int ec = 0;
                    std::string msg;
                    GetLastError(ec, msg);
                    printf("request topic error --------- : %s\n", msg.c_str());
                }
            }
        }
    };
    auto showStatus = [](std::atomic<bool> *run) {
        int64_t last = 0;
        MsgStatus last;
        while (*run) {
            auto &st = Status();
            std::this_thread::sleep_for(1s);
            int cur = st.nreply_.load();
            printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
            last = cur;
            printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
                   st.nrequest_.load(), st.nrequest_ - last.nrequest_,
                   st.nfailed_.load(),
                   st.nserved_.load(), st.nserved_ - last.nserved_,
                   st.nreply_.load(), st.nreply_ - last.nreply_);
            last = st;
        }
    };
    auto hb = [](std::atomic<bool> *run) {
        while (*run) {
            BHHeartBeatEasy(0);
            std::this_thread::sleep_for(1s);
            Sleep(1s, false);
            bool r = BHHeartBeatEasy(1000);
            printf("heartbeat: %s\n", r ? "ok" : "failed");
        }
    };
    std::atomic<bool> run(true);
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    threads.Launch(hb, &run);
    // threads.Launch(showStatus, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const uint64_t nreq = 1000 * 100;
    for (int i = 0; i < ncli; ++i) {
utest/util.h
@@ -39,9 +39,11 @@
using namespace std::chrono_literals;
template <class D>
inline void Sleep(D d)
inline void Sleep(D d, bool print = true)
{
    if (print) {
    printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
    }
    std::this_thread::sleep_for(d);
}