lichao
2021-04-26 1b167ec5ad101ac44451381e26cc73ab5d67d2a1
fix socket busy loop; del locked readall; refactor.
4个文件已添加
12个文件已修改
556 ■■■■■ 已修改文件
box/center.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_alloc_queue.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_alloc_queue.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 88 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -133,12 +133,12 @@
                UpdateRegInfo(node);
                nodes_[ssn] = node;
                printf("new ssn %ld\n", ssn);
                printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
                auto old = online_node_addr_map_.find(head.proc_id());
                if (old != online_node_addr_map_.end()) { // old session
                    auto &old_ssn = old->second;
                    nodes_[old_ssn]->state_.PutOffline(offline_time_);
                    printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
                    printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
                    old_ssn = ssn;
                } else {
                    online_node_addr_map_.emplace(head.proc_id(), ssn);
@@ -201,6 +201,10 @@
                for (auto &topic : topics) {
                    service_map_[topic].insert(dest);
                }
                printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size());
                for (auto &topic : topics) {
                    printf("\t %s\n", topic.c_str());
                }
                return MakeReply(eSuccess);
            });
    }
box/center_main.cc
@@ -85,15 +85,15 @@
} // namespace
int center_main(int argc, const char *argv[])
{
    auto &shm = BHomeShm();
    GlobalInit(shm);
    AppArg args(argc, argv);
    if (args.Has("remove")) {
        shm.Remove();
        SharedMemory::Remove(BHomeShmName());
        return 0;
    }
    auto &shm = BHomeShm();
    GlobalInit(shm);
    InstanceFlag inst(shm, kCenterRunningFlag);
    if (!inst.TryStartAsFirstInstance()) {
        printf("another instance is running, exit.\n");
src/defs.cpp
@@ -17,7 +17,7 @@
 */
#include "defs.h"
#include "msg.h"
#include "shm_queue.h"
#include "shm_msg_queue.h"
namespace
{
@@ -35,9 +35,13 @@
} // namespace
std::string BHomeShmName()
{
    return "bhome_default_shm_v0";
}
bhome_shm::SharedMemory &BHomeShm()
{
    static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
    static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
    return shm;
}
src/defs.h
@@ -37,6 +37,7 @@
class SharedMemory;
} // namespace bhome_shm
std::string BHomeShmName();
bhome_shm::SharedMemory &BHomeShm();
bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
src/sendq.cpp
@@ -16,10 +16,12 @@
 * =====================================================================================
 */
#include "sendq.h"
#include "shm_queue.h"
#include "shm_msg_queue.h"
#include <chrono>
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
using namespace bhome_shm;
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -65,7 +67,7 @@
    return nprocessed;
}
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
    int nsend = 0;
    auto AllSent = [&](Array &arr) {
@@ -76,7 +78,7 @@
    return nsend;
}
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
bool SendQ::TrySend(ShmMsgQueue &mq)
{
    std::unique_lock<std::mutex> lock(mutex_out_);
    size_t nsend = 0;
src/sendq.h
@@ -29,10 +29,7 @@
#include <string>
#include <unordered_map>
namespace bhome_shm
{
class ShmMsgQueue;
} // namespace bhome_shm
class SendQ
{
@@ -65,7 +62,7 @@
    {
        AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
    }
    bool TrySend(bhome_shm::ShmMsgQueue &mq);
    bool TrySend(ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
private:
@@ -88,8 +85,8 @@
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
    std::mutex mutex_in_;
    std::mutex mutex_out_;
src/shm_alloc_queue.cpp
New file
@@ -0,0 +1,19 @@
/*
 * =====================================================================================
 *
 *       Filename:  shm_alloc_queue.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月26日 16时24分25秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "shm_alloc_queue.h"
src/shm_alloc_queue.h
New file
@@ -0,0 +1,23 @@
/*
 * =====================================================================================
 *
 *       Filename:  shm_alloc_queue.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月26日 16时24分40秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef SHM_ALLOC_QUEUE_EQBLM9FZ
#define SHM_ALLOC_QUEUE_EQBLM9FZ
#endif // end of include guard: SHM_ALLOC_QUEUE_EQBLM9FZ
src/shm_msg_queue.cpp
New file
@@ -0,0 +1,105 @@
/*
 * =====================================================================================
 *
 *       Filename:  shm_msg_queue.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月26日 16时25分05秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "shm_msg_queue.h"
using namespace bhome_msg;
using namespace boost::interprocess;
namespace
{
std::string MsgQIdToName(const ShmMsgQueue::MQId id)
{
    char buf[40] = "mqOx";
    int n = sprintf(buf + 4, "%lx", id);
    return std::string(buf, n + 4);
}
const int AdjustMQLength(const int len)
{
    const int kMaxLength = 10000;
    const int kDefaultLen = 12;
    if (len <= 0) {
        return kDefaultLen;
    } else if (len < kMaxLength) {
        return len;
    } else {
        return kMaxLength;
    }
}
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    return ++id;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    id_(NewId()),
    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
    if (!queue_.IsOk()) {
        throw("error create msgq " + std::to_string(id_));
    }
}
ShmMsgQueue::~ShmMsgQueue() {}
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg)) {
            msg.Release();
        }
    }
    return Shmq::Remove(shm, MsgQIdToName(id));
}
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
{
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = Find(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
        }
    } else {
        // SetLestError(eNotFound);
        return false;
    }
}
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
src/shm_msg_queue.h
New file
@@ -0,0 +1,77 @@
/*
 * =====================================================================================
 *
 *       Filename:  shm_msg_queue.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月26日 16时25分21秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef SHM_MSG_QUEUE_D847TQXH
#define SHM_MSG_QUEUE_D847TQXH
#include "msg.h"
#include "shm_queue.h"
using namespace bhome_shm;
using namespace bhome_msg;
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQueue<MsgI>> Shmq;
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
public:
    typedef uint64_t MQId;
    static MQId NewId();
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId id);
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = Find(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
            } else {
                return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
            }
        } else {
            // SetLestError(eNotFound);
            return 0;
        }
    }
    template <class... Rest>
    bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
private:
    MQId id_;
    Shmq &queue() { return queue_; }
    Shmq queue_;
};
#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH
src/shm_queue.cpp
@@ -21,91 +21,4 @@
namespace bhome_shm
{
using namespace bhome_msg;
using namespace boost::interprocess;
namespace
{
std::string MsgQIdToName(const ShmMsgQueue::MQId id)
{
    char buf[40] = "mqOx";
    int n = sprintf(buf + 4, "%lx", id);
    return std::string(buf, n + 4);
}
const int AdjustMQLength(const int len)
{
    const int kMaxLength = 10000;
    const int kDefaultLen = 12;
    if (len <= 0) {
        return kDefaultLen;
    } else if (len < kMaxLength) {
        return len;
    } else {
        return kMaxLength;
    }
}
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    return ++id;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    id_(NewId()),
    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
{
    if (!queue_.IsOk()) {
        throw("error create msgq " + std::to_string(id_));
    }
}
ShmMsgQueue::~ShmMsgQueue() {}
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg)) {
            msg.Release();
        }
    }
    return Shmq::Remove(shm, MsgQIdToName(id));
}
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
{
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = Find(shm, remote_id);
    if (remote) {
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
        }
    } else {
        // SetLestError(eNotFound);
        return false;
    }
}
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
} // namespace bhome_shm
src/shm_queue.h
@@ -19,7 +19,6 @@
#ifndef SHM_QUEUE_JE0OEUP3
#define SHM_QUEUE_JE0OEUP3
#include "msg.h"
#include "shm.h"
#include <atomic>
#include <boost/circular_buffer.hpp>
@@ -59,30 +58,21 @@
        return [this](Guard &lock) { return !this->empty(); };
    }
    template <class Pred, class OnData>
    int ReadAllOnCond(Pred const &pred, OnData const &onData)
    {
        Guard lock(this->mutex());
        int n = 0;
        while (pred(lock)) {
            ++n;
            onData(this->front());
            this->pop_front();
            this->cond_write_.notify_one();
        }
        return n;
    }
    template <class Pred>
    bool ReadOnCond(D &buf, Pred const &pred)
    {
        int flag = 0;
        auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); };
        auto onData = [&buf](D &d) {
            using std::swap;
            swap(buf, d);
        auto Read = [&]() {
            Guard lock(this->mutex());
            if (pred(lock)) {
                using std::swap;
                swap(buf, Super::front());
                Super::pop_front();
                return true;
            } else {
                return false;
            }
        };
        return ReadAllOnCond(only_once, onData);
        return Read() ? (this->cond_write_.notify_one(), true) : false;
    }
    template <class Iter, class Pred, class OnWrite>
@@ -94,7 +84,7 @@
        Guard lock(mutex());
        while (pred(lock)) {
            onWrite(*begin);
            this->push_back(*begin);
            Super::push_back(*begin);
            ++n;
            cond_read_.notify_one();
            if (++begin == end) {
@@ -130,60 +120,6 @@
    bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
    bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
};
using namespace bhome_msg;
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQueue<MsgI>> Shmq;
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
public:
    typedef uint64_t MQId;
    static MQId NewId();
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId id);
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
    template <class OnData>
    int TryRecvAll(OnData const &onData) { return queue_.data()->TryReadAll(onData); }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = Find(shm, remote_id);
        if (remote) {
            if (onsend) {
                return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
            } else {
                return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
            }
        } else {
            // SetLestError(eNotFound);
            return 0;
        }
    }
    template <class... Rest>
    bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
private:
    MQId id_;
    Shmq &queue() { return queue_; }
    Shmq queue_;
};
} // namespace bhome_shm
src/socket.cpp
@@ -65,7 +65,8 @@
                    onRecvWithPerMsgCB(*this, imsg, head);
                }
            };
            return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
            MsgI imsg;
            return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
        };
        try {
@@ -74,6 +75,8 @@
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                std::this_thread::yield();
                using namespace std::chrono_literals;
                std::this_thread::sleep_for(10000ns);
            }
        } catch (...) {
        }
src/socket.h
@@ -22,7 +22,7 @@
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
#include "shm_queue.h"
#include "shm_msg_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
@@ -37,7 +37,7 @@
{
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
    typedef ShmMsgQueue Queue;
public:
    typedef ShmMsgQueue::MQId MQId;
src/topic_node.cpp
@@ -389,7 +389,7 @@
        BHAddress addr;
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
            // printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
            BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
            AddRoute(head, sock.id());
            head.set_topic(request.topic());
utest/api_test.cpp
@@ -18,6 +18,7 @@
#include "bh_api.h"
#include "util.h"
#include <atomic>
#include <boost/lockfree/queue.hpp>
using namespace bhome_msg;
@@ -49,7 +50,6 @@
    static MsgStatus st;
    return st;
}
} // namespace
void SubRecvProc(const void *proc_id,
                 const int proc_id_len,
@@ -59,7 +59,7 @@
    std::string proc((const char *) proc_id, proc_id_len);
    MsgPublish pub;
    pub.ParseFromArray(data, data_len);
    // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
    printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
}
void ServerProc(const void *proc_id,
@@ -98,8 +98,8 @@
class TLMutex
{
    // typedef boost::interprocess::interprocess_mutex MutexT;
    typedef CasMutex MutexT;
    typedef boost::interprocess::interprocess_mutex MutexT;
    // typedef CasMutex MutexT;
    // typedef std::mutex MutexT;
    typedef std::chrono::steady_clock Clock;
    typedef Clock::duration Duration;
@@ -108,6 +108,7 @@
    const Duration limit_;
    std::atomic<Duration> last_lock_time_;
    MutexT mutex_;
    bool Expired(const Duration diff) { return diff > limit_; }
public:
    struct Status {
@@ -127,16 +128,18 @@
    {
        if (mutex_.try_lock()) {
            auto old_time = last_lock_time_.load();
            if (Now() - old_time > limit_) {
                return last_lock_time_.compare_exchange_strong(old_time, Now());
            auto cur = Now();
            if (Expired(cur - old_time)) {
                return last_lock_time_.compare_exchange_strong(old_time, cur);
            } else {
                last_lock_time_.store(Now());
                return true;
            }
        } else {
            auto old_time = last_lock_time_.load();
            if (Now() - old_time > limit_) {
                return last_lock_time_.compare_exchange_strong(old_time, Now());
            auto cur = Now();
            if (Expired(cur - old_time)) {
                return last_lock_time_.compare_exchange_strong(old_time, cur);
            } else {
                return false;
            }
@@ -154,55 +157,88 @@
    void unlock()
    {
        auto old_time = last_lock_time_.load();
        if (Now() - old_time > limit_) {
        } else {
            if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
        auto cur = Now();
        if (!Expired(cur - old_time)) {
            if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
                mutex_.unlock();
            }
        }
    }
};
namespace
{
typedef int64_t Offset;
Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
} // namespace
//robust attr does NOT work, maybe os does not support it.
class RobustMutex
{
public:
    RobustMutex()
    {
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setrobust(&attr, 1);
        pthread_mutex_init(mtx(), &attr);
        if (!valid()) {
        pthread_mutexattr_t mutex_attr;
        auto attr = [&]() { return &mutex_attr; };
        int r = pthread_mutexattr_init(attr());
        r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
        r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
        r |= pthread_mutex_init(mtx(), attr());
        int rob = 0;
        pthread_mutexattr_getrobust_np(attr(), &rob);
        int shared = 0;
        pthread_mutexattr_getpshared(attr(), &shared);
        printf("robust : %d, shared : %d\n", rob, shared);
        r |= pthread_mutexattr_destroy(attr());
        if (r) {
            throw("init mutex error.");
        }
    }
    ~RobustMutex()
    {
        pthread_mutex_destroy(mtx());
    }
public:
    void lock() { Lock(); }
    bool try_lock()
    {
        int r = TryLock();
        printf("TryLock ret: %d\n", r);
        return r == 0;
    }
    void unlock() { Unlock(); }
    // private:
    int TryLock() { return pthread_mutex_trylock(mtx()); }
    int Lock() { return pthread_mutex_lock(mtx()); }
    int Unlock() { return pthread_mutex_unlock(mtx()); }
    bool valid() const { return false; }
private:
    pthread_mutex_t *mtx() { return &mutex_; }
    pthread_mutex_t mutex_;
};
class LockFreeQueue
{
    typedef int64_t Data;
    typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
    void push_back(Data d) { queue_.push(d); }
private:
    LFQueue queue_;
};
} // namespace
BOOST_AUTO_TEST_CASE(MutexTest)
{
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
    auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
@@ -334,7 +370,6 @@
        printf("subscribe topic : %s\n", r ? "ok" : "failed");
    }
    // BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
    auto ServerLoop = [&](std::atomic<bool> *run) {
        while (*run) {
            void *proc_id = 0;
@@ -446,27 +481,20 @@
    std::atomic<bool> run(true);
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    threads.Launch(hb, &run);
    threads.Launch(ServerLoop, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const uint64_t nreq = 1000 * 1;
    const uint64_t nreq = 1000 * 10;
    for (int i = 0; i < ncli; ++i) {
        // threads.Launch(asyncRequest, nreq);
        threads.Launch(asyncRequest, nreq);
    }
    for (int i = 0; i < 10; ++i) {
        SyncRequest(i);
    }
    // run.store(false);
    // server_thread.join();
    // return;
    int same = 0;
    int64_t last = 0;
    while (last < nreq * ncli && same < 1) {
    while (last < nreq * ncli && same < 2) {
        Sleep(1s, false);
        auto cur = Status().nreply_.load();
        if (last == cur) {