lichao
2021-06-01 365c864a587365fe443b11cc0cd7cfc8f8f8eb81
refactor, clean up useless code.
1个文件已删除
7个文件已修改
973 ■■■■■ 已修改文件
src/robust.cpp 71 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 187 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 129 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 220 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 292 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp
@@ -22,43 +22,46 @@
namespace robust
{
namespace
bool AtomicReqRep::ClientRequest(const Data request, Data &reply)
{
static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
auto Now() { return steady_clock::now().time_since_epoch(); }
void Yield() { std::this_thread::sleep_for(10us); }
} // namespace
void QuickSleep() { Yield(); }
bool FMutex::try_lock()
{
    if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
        ++count_;
        if (mtx_.try_lock()) {
            return true;
        } else {
            if (--count_ == 0) {
                flock(fd_, LOCK_UN);
            }
    auto end_time = now() + 3s;
    do {
        Data cur = data_.load();
        if (GetState(cur) == eStateFree &&
            DataCas(cur, Encode(request, eStateRequest))) {
            do {
                yield();
                cur = data_.load();
                if (GetState(cur) == eStateReply) {
                    DataCas(cur, Encode(0, eStateFree));
                    reply = Decode(cur);
                    return true;
                }
            } while (now() < end_time);
        }
        yield();
    } while (now() < end_time);
    return false;
}
bool AtomicReqRep::ServerProcess(Handler onReq)
{
    Data cur = data_.load();
    switch (GetState(cur)) {
    case eStateRequest:
        if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
            timestamp_ = now();
            return true;
        }
        break;
    case eStateReply:
        if (timestamp_.load() + 3s < now()) {
            DataCas(cur, Encode(0, eStateFree));
        }
        break;
    case eStateFree:
    default: break;
    }
    return false;
}
void FMutex::lock()
{
    flock(fd_, LOCK_EX);
    ++count_;
    mtx_.lock();
}
void FMutex::unlock()
{
    mtx_.unlock();
    if (--count_ == 0) {
        flock(fd_, LOCK_UN);
    }
}
} // namespace robust
src/robust.h
@@ -31,6 +31,7 @@
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
namespace robust
@@ -38,144 +39,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
void QuickSleep();
class CasMutex
{
    typedef uint64_t locker_t;
    static inline locker_t this_locker() { return pthread_self(); }
    static const uint64_t kLockerMask = MaskBits(63);
public:
    CasMutex() :
        meta_(0) {}
    int try_lock()
    {
        auto old = meta_.load();
        int r = 0;
        if (!Locked(old)) {
            r = MetaCas(old, Meta(1, this_locker()));
        }
        return r;
    }
    int lock()
    {
        int r = 0;
        do {
            r = try_lock();
        } while (r == 0);
        return r;
    }
    void unlock()
    {
        auto old = meta_.load();
        if (Locked(old) && Locker(old) == this_locker()) {
            MetaCas(old, Meta(0, this_locker()));
        }
    }
private:
    std::atomic<uint64_t> meta_;
    bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
    locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
    uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
    bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
};
class NullMutex
{
public:
    template <class... T>
    explicit NullMutex(T &&...t) {} // easy test.
    bool try_lock() { return true; }
    void lock() {}
    void unlock() {}
};
// flock + mutex
class FMutex
{
public:
    typedef uint64_t id_t;
    FMutex(id_t id) :
        id_(id), fd_(Open(id_)), count_(0)
    {
        if (fd_ == -1) { throw "error create mutex!"; }
    }
    ~FMutex() { Close(fd_); }
    bool try_lock();
    void lock();
    void unlock();
private:
    static std::string GetPath(id_t id)
    {
        const std::string dir("/tmp/.bhome_mtx");
        mkdir(dir.c_str(), 0777);
        return dir + "/fm_" + std::to_string(id);
    }
    static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
    static int Close(int fd) { return close(fd); }
    id_t id_;
    int fd_;
    std::mutex mtx_;
    std::atomic<int32_t> count_;
};
union semun {
    int val;               /* Value for SETVAL */
    struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
    unsigned short *array; /* Array for GETALL, SETALL */
    struct seminfo *__buf; /* Buffer for IPC_INFO
                                           (Linux-specific) */
};
class SemMutex
{
public:
    SemMutex(key_t key) :
        key_(key), sem_id_(semget(key, 1, 0666))
    {
        if (sem_id_ == -1) { throw "error create semaphore."; }
    }
    ~SemMutex() {}
    bool try_lock()
    {
        sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
        return semop(sem_id_, &op, 1) == 0;
    }
    void lock()
    {
        sembuf op = {0, -1, SEM_UNDO};
        semop(sem_id_, &op, 1);
    }
    void unlock()
    {
        sembuf op = {0, 1, SEM_UNDO};
        semop(sem_id_, &op, 1);
    }
private:
    key_t key_;
    int sem_id_;
};
template <class Lock>
class Guard
{
public:
    Guard(Lock &l) :
        l_(l) { l_.lock(); }
    ~Guard() { l_.unlock(); }
private:
    Guard(const Guard &);
    Guard(Guard &&);
    Lock &l_;
};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
@@ -194,8 +57,6 @@
    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    size_type head() const { return head_.load(); }
    size_type tail() const { return tail_.load(); }
    bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
    bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
    bool push(const Data d, bool try_more = false)
    {
        bool r = false;
@@ -276,48 +137,8 @@
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply)
    {
        auto end_time = now() + 3s;
        do {
            Data cur = data_.load();
            if (GetState(cur) == eStateFree &&
                DataCas(cur, Encode(request, eStateRequest))) {
                do {
                    yield();
                    cur = data_.load();
                    if (GetState(cur) == eStateReply) {
                        DataCas(cur, Encode(0, eStateFree));
                        reply = Decode(cur);
                        return true;
                    }
                } while (now() < end_time);
            }
            yield();
        } while (now() < end_time);
        return false;
    }
    bool ServerProcess(Handler onReq)
    {
        Data cur = data_.load();
        switch (GetState(cur)) {
        case eStateRequest:
            if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
                timestamp_ = now();
                return true;
            }
            break;
        case eStateReply:
            if (timestamp_.load() + 3s < now()) {
                DataCas(cur, Encode(0, eStateFree));
            }
            break;
        case eStateFree:
        default: break;
        }
        return false;
    }
    bool ClientRequest(const Data request, Data &reply);
    bool ServerProcess(Handler onReq);
private:
    enum State {
@@ -328,7 +149,7 @@
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    static void yield() { QuickSleep(); }
    static void yield() { std::this_thread::sleep_for(10us); }
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }
src/shm_msg_queue.cpp
@@ -53,22 +53,6 @@
ShmMsgQueue::~ShmMsgQueue() {}
#ifndef BH_USE_ATOMIC_Q
ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
{
    static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
    static std::mutex mtx;
    std::lock_guard<std::mutex> lock(mtx);
    auto pos = imm.find(id);
    if (pos == imm.end()) {
        pos = imm.emplace(id, new Mutex(id)).first;
        // pos = imm.emplace(id, new Mutex()).first;
    }
    return *pos->second;
}
#endif
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
@@ -95,17 +79,9 @@
    try {
        //TODO find from center, or use offset.
        ShmMsgQueue dest(remote.offset_, shm, remote.id_);
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(remote_id));
#endif
        return dest.queue().TryWrite(val);
    } catch (...) {
        // SetLastError(eNotFound, "remote not found");
        return false;
    }
}
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
src/shm_msg_queue.h
@@ -25,23 +25,11 @@
using namespace bhome_shm;
using namespace bhome_msg;
#define BH_USE_ATOMIC_Q
class ShmMsgQueue
{
public:
    typedef int64_t RawData;
#ifdef BH_USE_ATOMIC_Q
    typedef ShmObject<SharedQ63<0>> Shmq;
#else
    typedef ShmObject<SharedQueue<RawData>> Shmq;
    // typedef robust::FMutex Mutex;
    // typedef robust::SemMutex Mutex;
    typedef robust::NullMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
#endif
    typedef Shmq::Data Queue;
    typedef Shmq::ShmType ShmType;
    typedef uint64_t MQId;
@@ -55,21 +43,8 @@
    ShmType &shm() const { return queue_.shm(); }
    int64_t AbsAddr() const { return queue_.offset(); }
    bool Recv(RawData &val, const int timeout_ms)
    {
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(Id()));
#endif
        return queue().Read(val, timeout_ms);
    }
    bool TryRecv(RawData &val)
    {
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(Id()));
#endif
        return queue().TryRead(val);
    }
    bool Recv(RawData &val, const int timeout_ms) { return queue().Read(val, timeout_ms); }
    bool TryRecv(RawData &val) { return queue().TryRead(val); }
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
@@ -78,9 +53,6 @@
    bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
#ifndef BH_USE_ATOMIC_Q
    static Mutex &GetMutex(const MQId id);
#endif
    MQId id_;
    Queue &queue() { return *queue_.data(); }
    Shmq queue_;
utest/robust_test.cpp
@@ -20,7 +20,7 @@
{
    AtomicReqRep rr;
    auto client = [&]() {
        for (int i = 0; i < 20; ++i) {
        for (int i = 0; i < 5; ++i) {
            int64_t reply = 0;
            bool r = rr.ClientRequest(i, reply);
            printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply);
@@ -196,130 +196,3 @@
    printf("total: %ld, expected: %ld\n", total.load(), correct_total);
    BOOST_CHECK_EQUAL(total.load(), correct_total);
}
BOOST_AUTO_TEST_CASE(MutexTest)
{
    {
        int sem_id = semget(100, 1, 0666 | IPC_CREAT);
        auto P = [&]() {
            sembuf op = {0, -1, SEM_UNDO};
            semop(sem_id, &op, 1);
        };
        auto V = [&]() {
            sembuf op = {0, 1, SEM_UNDO};
            semop(sem_id, &op, 1);
        };
        for (int i = 0; i < 10; ++i) {
            V();
        }
        Sleep(10s);
        return;
    }
    // typedef robust::MFMutex RobustMutex;
    typedef robust::SemMutex RobustMutex;
    for (int i = 0; i < 20; ++i) {
        int size = i;
        int left = size & 7;
        int rsize = size + ((8 - left) & 7);
        printf("size: %3d, rsize: %3d\n", size, rsize);
    }
    SharedMemory &shm = TestShm();
    // shm.Remove();
    // return;
    GlobalInit(shm);
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
    RobustMutex rmtx(12345);
    auto mtx = &rmtx;
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    std::mutex m;
    typedef std::chrono::steady_clock Clock;
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
    }
    auto LockSpeed = [](auto &mutex, const std::string &name) {
        const int ntimes = 1000 * 1;
        auto Lock = [&]() {
            for (int i = 0; i < ntimes; ++i) {
                mutex.lock();
                mutex.unlock();
            }
        };
        printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
        {
            boost::timer::auto_cpu_timer timer;
            printf("1 thread: ");
            Lock();
        }
        auto InThread = [&](int nthread) {
            boost::timer::auto_cpu_timer timer;
            printf("%d threads: ", nthread);
            std::vector<std::thread> vt;
            for (int i = 0; i < nthread; ++i) {
                vt.emplace_back(Lock);
            }
            for (auto &t : vt) {
                t.join();
            }
        };
        InThread(4);
        InThread(16);
        InThread(100);
        InThread(1000);
    };
    NullMutex null_mtx;
    std::mutex std_mtx;
    CasMutex cas_mtx;
    FMutex mfmtx(3);
    boost::interprocess::interprocess_mutex ipc_mutex;
    SemMutex sem_mtx(3);
    LockSpeed(null_mtx, "null mutex");
    LockSpeed(std_mtx, "std::mutex");
    // LockSpeed(cas_mtx, "CAS mutex");
    LockSpeed(ipc_mutex, "boost ipc mutex");
    LockSpeed(mfmtx, "mutex+flock");
    LockSpeed(sem_mtx, "sem mutex");
    auto TryLock = [&]() {
        if (mtx->try_lock()) {
            printf("try_lock ok\n");
            return true;
        } else {
            printf("try_lock failed\n");
            return false;
        }
    };
    auto Unlock = [&]() {
        mtx->unlock();
        printf("unlocked\n");
    };
    if (mtx) {
        printf("mtx exists\n");
        if (TryLock()) {
            // Sleep(10s);
            auto op = [&]() {
                if (TryLock()) {
                    Unlock();
                }
            };
            op();
            std::thread t(op);
            t.join();
            // Unlock();
        } else {
            // mtx->unlock();
        }
    } else {
        printf("mtx not exists\n");
    }
}
utest/simple_tests.cpp
@@ -104,21 +104,6 @@
    }
}
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    ShmMsgQueue q(shm, NewSession(), 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        MsgI msg(shm);
        bool r = q.Recv(msg, ms);
        BOOST_CHECK(!r);
    }
}
BOOST_AUTO_TEST_CASE(RefCountTest)
{
    SharedMemory &shm = TestShm();
@@ -126,7 +111,8 @@
    GlobalInit(shm);
    Msg m0(1000, shm);
    BOOST_CHECK(m0.valid());
    BOOST_CHECK(!m0.valid());
    m0.Make(100);
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    Msg m1 = m0;
    BOOST_CHECK(m1.valid());
utest/speed_test.cpp
File was deleted
utest/utest.cpp
@@ -56,294 +56,6 @@
        }
    }
    printf("time: %ld ns\n", (tps.back() - tps.front()).count());
    return;
    // sub topic partial match.
    Topic topics[] = {
        "",
        ".",
        "a",
        "sp",
        "sport",
        "sport.",
        "sport.a",
        "sport.a.b.c",
        "sport.ab.c",
        "sport.basketball",
        "sport.football",
    };
    const char sep = '.';
    auto Adjust = [&](const std::string &user_topic) {
        if (user_topic.empty() || user_topic.back() == sep) {
            return user_topic;
        } else {
            return user_topic + sep;
        }
    };
    for (auto &t : topics) {
        const std::string &a = Adjust(t);
        printf("orig: %20s   adjusted: %20s   parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
        size_t pos = 0;
        while (true) {
            auto &topic = t;
            pos = topic.find(kTopicSep, pos);
            if (pos == topic.npos || ++pos == topic.size()) {
                // Find1(std::string()); // sub all.
                break;
            } else {
                printf("'%s',", topic.substr(0, pos).c_str());
            }
        }
        printf("]\n");
    }
}
BOOST_AUTO_TEST_CASE(PubSubTest)
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.FindOrCreate<int>("flag", 123);
    printf("flag = %d\n", *flag);
    ++*flag;
    const std::string sub_proc_id = "subscriber";
    const std::string pub_proc_id = "publisher";
    BHCenter center(shm);
    center.Start();
    Sleep(100ms);
    std::atomic<uint64_t> total_count(0);
    std::atomic<int64_t> last_time(NowSec() - 1);
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 100 * 2;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        DemoNode client("client_" + std::to_string(id), shm);
        MsgTopicList tlist;
        for (auto &t : topics) {
            tlist.add_topic_list(t);
        }
        MsgCommonReply reply_body;
        bool r = client.Subscribe(tlist, reply_body, timeout);
        if (!r) {
            printf("client subscribe failed.\n");
        }
        std::mutex mutex;
        std::condition_variable cv;
        std::atomic<uint64_t> n(0);
        auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) {
            ++total_count;
            auto cur = NowSec();
            if (last_time.exchange(cur) < cur) {
                std::cout << "time: " << cur;
                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
            }
            if (++n >= nmsg * topics.size()) {
                cv.notify_one();
            }
            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
        };
        client.SubscribeStartWorker(OnTopicData, 1);
        std::unique_lock<std::mutex> lk(mutex);
        cv.wait(lk);
    };
    auto Pub = [&](const std::string &topic) {
        DemoNode provider("server_" + topic, shm);
        for (unsigned i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            MsgPublish pub;
            pub.set_topic(topic);
            pub.set_data(data);
            bool r = provider.Publish(pub, 0);
            if (!r) {
                static std::atomic<int> an(0);
                int n = ++an;
                printf("pub %d ret: %s\n", n, r ? "ok" : "fail");
            }
        }
    };
    ThreadManager threads;
    typedef std::vector<Topic> Topics;
    Topics topics;
    for (int i = 0; i < 100; ++i) {
        topics.push_back("t" + std::to_string(i));
    }
    Topics part;
    boost::timer::auto_cpu_timer pubsub_timer;
    for (size_t i = 0; i < topics.size(); ++i) {
        part.push_back(topics[i]);
        threads.Launch(Sub, i, topics);
    }
    Sleep(100ms);
    for (auto &topic : topics) {
        threads.Launch(Pub, topic);
    }
    threads.Launch(Pub, "some_else");
    threads.WaitAll();
    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
           total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
}
namespace
{
struct C {
    C() { printf("+C\n"); }
    C(const C &c) { printf("+C(const C&)\n"); }
    void F() { printf("C::F()\n"); }
    ~C() { printf("-C\n"); }
    char arr[100];
};
int F(C &c) { return printf(":::::::::::::F()\n"); }
} // namespace
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.FindOrCreate<int>("flag", 123);
    printf("flag = %d\n", *flag);
    ++*flag;
    const std::string client_proc_id = "client_proc_";
    const std::string server_proc_id = "server_proc_";
    BHCenter center(shm);
    center.Start();
    std::atomic<bool> run(true);
    auto Client = [&](const std::string &topic, const int nreq) {
        DemoNode client(client_proc_id + topic, shm);
        std::atomic<int> count(0);
        std::string reply;
        auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) {
            reply = msg.data();
            if (++count >= nreq) {
                printf("count: %d\n", count.load());
            }
        };
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data("data " + std::string(100, 'a'));
        client.ClientStartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            std::string msg_id;
            if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) {
                printf("client request failed\n");
                ++count;
            }
            // std::string proc_id;
            // MsgRequestTopicReply reply;
            // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) {
            //     printf("client request failed\n");
            // }
            // ++count;
        }
        do {
            std::this_thread::sleep_for(100ms);
        } while (count.load() < nreq);
        client.Stop();
        printf("request %s %d done ", topic.c_str(), count.load());
    };
    std::atomic_uint64_t server_msg_count(0);
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        DemoNode server(name, shm);
        auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            ++server_msg_count;
            reply.set_data(request.topic() + ':' + request.data());
            return true;
        };
        auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) {
            ++server_msg_count;
            MsgRequestTopicReply reply;
            reply.set_data(request.topic() + ':' + request.data());
            server.ServerSendReply(src, reply);
        };
        server.ServerStart(onDataAsync);
        MsgTopicList rpc;
        for (auto &topic : topics) {
            rpc.add_topic_list(topic);
        }
        MsgCommonReply reply_body;
        if (!server.ServerRegisterRPC(rpc, reply_body, 100)) {
            printf("server register topic failed\n");
            return;
        }
        while (run) {
            std::this_thread::sleep_for(100ms);
        }
    };
    ThreadManager clients, servers;
    std::vector<Topic> topics = {"topic1", "topic2"};
    servers.Launch(Server, "server", topics);
    Sleep(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 100 * 2);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %ld\n", server_msg_count.load());
    run = false;
    servers.WaitAll();
}
BOOST_AUTO_TEST_CASE(HeartbeatTest)
{
    const std::string shm_name("ShmHeartbeat");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    BHCenter center(shm);
    center.Start();
    {
        DemoNode node("demo_node", shm);
        auto Check = [&]() {
            bool r = node.Heartbeat(100);
            printf("hearbeat ret : %s\n", r ? "ok" : "failed");
        };
        Check();
        for (int i = 0; i < 3; ++i) {
            Sleep(1s);
            Check();
        }
        Sleep(4s);
        for (int i = 0; i < 2; ++i) {
            Sleep(1s);
            Check();
        }
    }
    Sleep(8s);
}
inline int MyMin(int a, int b)
{
    printf("MyMin\n");
    return a < b ? a : b;
}
int test_main(int argc, char *argv[])
@@ -352,10 +64,6 @@
    int a = 0;
    int b = 0;
    BOOST_CHECK_EQUAL(a, b);
    int n = MyMin(4, 6);
    for (int i = 0; i < n; ++i) {
        printf("i = %d\n", i);
    }
    return 0;
}