From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 01 六月 2021 11:19:22 +0800 Subject: [PATCH] refactor, clean up useless code. --- src/robust.h | 187 ---------- /dev/null | 220 ------------ utest/simple_tests.cpp | 18 src/shm_msg_queue.h | 32 - utest/robust_test.cpp | 129 ------- utest/utest.cpp | 292 ----------------- src/robust.cpp | 71 ++-- src/shm_msg_queue.cpp | 24 - 8 files changed, 46 insertions(+), 927 deletions(-) diff --git a/src/robust.cpp b/src/robust.cpp index 4654652..015f97a 100644 --- a/src/robust.cpp +++ b/src/robust.cpp @@ -22,43 +22,46 @@ namespace robust { -namespace +bool AtomicReqRep::ClientRequest(const Data request, Data &reply) { -static_assert(sizeof(steady_clock::duration) == sizeof(int64_t)); - -auto Now() { return steady_clock::now().time_since_epoch(); } -void Yield() { std::this_thread::sleep_for(10us); } - -} // namespace - -void QuickSleep() { Yield(); } - -bool FMutex::try_lock() -{ - if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { - ++count_; - if (mtx_.try_lock()) { - return true; - } else { - if (--count_ == 0) { - flock(fd_, LOCK_UN); - } + auto end_time = now() + 3s; + do { + Data cur = data_.load(); + if (GetState(cur) == eStateFree && + DataCas(cur, Encode(request, eStateRequest))) { + do { + yield(); + cur = data_.load(); + if (GetState(cur) == eStateReply) { + DataCas(cur, Encode(0, eStateFree)); + reply = Decode(cur); + return true; + } + } while (now() < end_time); } + yield(); + } while (now() < end_time); + return false; +} + +bool AtomicReqRep::ServerProcess(Handler onReq) +{ + Data cur = data_.load(); + switch (GetState(cur)) { + case eStateRequest: + if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { + timestamp_ = now(); + return true; + } + break; + case eStateReply: + if (timestamp_.load() + 3s < now()) { + DataCas(cur, Encode(0, eStateFree)); + } + break; + case eStateFree: + default: break; } return false; } -void FMutex::lock() -{ - flock(fd_, LOCK_EX); - ++count_; - mtx_.lock(); -} -void FMutex::unlock() -{ - mtx_.unlock(); - if (--count_ == 0) { - flock(fd_, LOCK_UN); - } -} - } // namespace robust \ No newline at end of file diff --git a/src/robust.h b/src/robust.h index b3b99c4..1bbe8fc 100644 --- a/src/robust.h +++ b/src/robust.h @@ -31,6 +31,7 @@ #include <sys/sem.h> #include <sys/stat.h> #include <sys/types.h> +#include <thread> #include <unistd.h> namespace robust @@ -38,144 +39,6 @@ using namespace std::chrono; using namespace std::chrono_literals; -void QuickSleep(); - -class CasMutex -{ - typedef uint64_t locker_t; - static inline locker_t this_locker() { return pthread_self(); } - static const uint64_t kLockerMask = MaskBits(63); - -public: - CasMutex() : - meta_(0) {} - int try_lock() - { - auto old = meta_.load(); - int r = 0; - if (!Locked(old)) { - r = MetaCas(old, Meta(1, this_locker())); - } - return r; - } - int lock() - { - int r = 0; - do { - r = try_lock(); - } while (r == 0); - return r; - } - void unlock() - { - auto old = meta_.load(); - if (Locked(old) && Locker(old) == this_locker()) { - MetaCas(old, Meta(0, this_locker())); - } - } - -private: - std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) == 1; } - locker_t Locker(uint64_t meta) { return meta & kLockerMask; } - uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } - bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } -}; - -class NullMutex -{ -public: - template <class... T> - explicit NullMutex(T &&...t) {} // easy test. - bool try_lock() { return true; } - void lock() {} - void unlock() {} -}; - -// flock + mutex -class FMutex -{ -public: - typedef uint64_t id_t; - FMutex(id_t id) : - id_(id), fd_(Open(id_)), count_(0) - { - if (fd_ == -1) { throw "error create mutex!"; } - } - ~FMutex() { Close(fd_); } - bool try_lock(); - void lock(); - void unlock(); - -private: - static std::string GetPath(id_t id) - { - const std::string dir("/tmp/.bhome_mtx"); - mkdir(dir.c_str(), 0777); - return dir + "/fm_" + std::to_string(id); - } - static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } - static int Close(int fd) { return close(fd); } - id_t id_; - int fd_; - std::mutex mtx_; - std::atomic<int32_t> count_; -}; - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -class SemMutex -{ -public: - SemMutex(key_t key) : - key_(key), sem_id_(semget(key, 1, 0666)) - { - if (sem_id_ == -1) { throw "error create semaphore."; } - } - ~SemMutex() {} - - bool try_lock() - { - sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; - return semop(sem_id_, &op, 1) == 0; - } - - void lock() - { - sembuf op = {0, -1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - - void unlock() - { - sembuf op = {0, 1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - -private: - key_t key_; - int sem_id_; -}; - -template <class Lock> -class Guard -{ -public: - Guard(Lock &l) : - l_(l) { l_.lock(); } - ~Guard() { l_.unlock(); } - -private: - Guard(const Guard &); - Guard(Guard &&); - Lock &l_; -}; template <unsigned PowerSize = 4, class Int = int64_t> class AtomicQueue @@ -194,8 +57,6 @@ AtomicQueue() { memset(this, 0, sizeof(*this)); } size_type head() const { return head_.load(); } size_type tail() const { return tail_.load(); } - bool like_empty() const { return head() == tail() && Empty(buf[head()]); } - bool like_full() const { return head() == tail() && !Empty(buf[head()]); } bool push(const Data d, bool try_more = false) { bool r = false; @@ -276,48 +137,8 @@ public: typedef int64_t Data; typedef std::function<Data(const Data)> Handler; - bool ClientRequest(const Data request, Data &reply) - { - auto end_time = now() + 3s; - do { - Data cur = data_.load(); - if (GetState(cur) == eStateFree && - DataCas(cur, Encode(request, eStateRequest))) { - do { - yield(); - cur = data_.load(); - if (GetState(cur) == eStateReply) { - DataCas(cur, Encode(0, eStateFree)); - reply = Decode(cur); - return true; - } - } while (now() < end_time); - } - yield(); - } while (now() < end_time); - return false; - } - - bool ServerProcess(Handler onReq) - { - Data cur = data_.load(); - switch (GetState(cur)) { - case eStateRequest: - if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { - timestamp_ = now(); - return true; - } - break; - case eStateReply: - if (timestamp_.load() + 3s < now()) { - DataCas(cur, Encode(0, eStateFree)); - } - break; - case eStateFree: - default: break; - } - return false; - } + bool ClientRequest(const Data request, Data &reply); + bool ServerProcess(Handler onReq); private: enum State { @@ -328,7 +149,7 @@ static int GetState(Data d) { return d & MaskBits(3); } static Data Encode(Data d, State st) { return (d << 3) | st; } static Data Decode(Data d) { return d >> 3; } - static void yield() { QuickSleep(); } + static void yield() { std::this_thread::sleep_for(10us); } typedef steady_clock::duration Duration; Duration now() { return steady_clock::now().time_since_epoch(); } diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index be2d2a2..9db4c6b 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -53,22 +53,6 @@ ShmMsgQueue::~ShmMsgQueue() {} -#ifndef BH_USE_ATOMIC_Q -ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) -{ - static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; - - static std::mutex mtx; - std::lock_guard<std::mutex> lock(mtx); - auto pos = imm.find(id); - if (pos == imm.end()) { - pos = imm.emplace(id, new Mutex(id)).first; - // pos = imm.emplace(id, new Mutex()).first; - } - return *pos->second; -} -#endif - bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) { Queue *q = Find(shm, id); @@ -95,17 +79,9 @@ try { //TODO find from center, or use offset. ShmMsgQueue dest(remote.offset_, shm, remote.id_); -#ifndef BH_USE_ATOMIC_Q - Guard lock(GetMutex(remote_id)); -#endif return dest.queue().TryWrite(val); } catch (...) { // SetLastError(eNotFound, "remote not found"); return false; } } - -// Test shows that in the 2 cases: -// 1) build msg first, then find remote queue; -// 2) find remote queue first, then build msg; -// 1 is about 50% faster than 2, maybe cache related. diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index 6d922aa..23faa24 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -25,23 +25,11 @@ using namespace bhome_shm; using namespace bhome_msg; -#define BH_USE_ATOMIC_Q - class ShmMsgQueue { public: typedef int64_t RawData; - -#ifdef BH_USE_ATOMIC_Q typedef ShmObject<SharedQ63<0>> Shmq; -#else - typedef ShmObject<SharedQueue<RawData>> Shmq; - // typedef robust::FMutex Mutex; - // typedef robust::SemMutex Mutex; - typedef robust::NullMutex Mutex; - typedef robust::Guard<Mutex> Guard; -#endif - typedef Shmq::Data Queue; typedef Shmq::ShmType ShmType; typedef uint64_t MQId; @@ -55,21 +43,8 @@ ShmType &shm() const { return queue_.shm(); } int64_t AbsAddr() const { return queue_.offset(); } - bool Recv(RawData &val, const int timeout_ms) - { -#ifndef BH_USE_ATOMIC_Q - Guard lock(GetMutex(Id())); -#endif - return queue().Read(val, timeout_ms); - } - - bool TryRecv(RawData &val) - { -#ifndef BH_USE_ATOMIC_Q - Guard lock(GetMutex(Id())); -#endif - return queue().TryRead(val); - } + bool Recv(RawData &val, const int timeout_ms) { return queue().Read(val, timeout_ms); } + bool TryRecv(RawData &val) { return queue().TryRead(val); } bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } @@ -78,9 +53,6 @@ bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); } private: -#ifndef BH_USE_ATOMIC_Q - static Mutex &GetMutex(const MQId id); -#endif MQId id_; Queue &queue() { return *queue_.data(); } Shmq queue_; diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index ea6144c..3270481 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -20,7 +20,7 @@ { AtomicReqRep rr; auto client = [&]() { - for (int i = 0; i < 20; ++i) { + for (int i = 0; i < 5; ++i) { int64_t reply = 0; bool r = rr.ClientRequest(i, reply); printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply); @@ -196,130 +196,3 @@ printf("total: %ld, expected: %ld\n", total.load(), correct_total); BOOST_CHECK_EQUAL(total.load(), correct_total); } - -BOOST_AUTO_TEST_CASE(MutexTest) -{ - { - int sem_id = semget(100, 1, 0666 | IPC_CREAT); - auto P = [&]() { - sembuf op = {0, -1, SEM_UNDO}; - semop(sem_id, &op, 1); - }; - auto V = [&]() { - sembuf op = {0, 1, SEM_UNDO}; - semop(sem_id, &op, 1); - }; - for (int i = 0; i < 10; ++i) { - V(); - } - Sleep(10s); - - return; - } - - // typedef robust::MFMutex RobustMutex; - typedef robust::SemMutex RobustMutex; - - for (int i = 0; i < 20; ++i) { - int size = i; - int left = size & 7; - int rsize = size + ((8 - left) & 7); - printf("size: %3d, rsize: %3d\n", size, rsize); - } - SharedMemory &shm = TestShm(); - // shm.Remove(); - // return; - GlobalInit(shm); - - const std::string mtx_name("test_mutex"); - const std::string int_name("test_int"); - // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345); - RobustMutex rmtx(12345); - auto mtx = &rmtx; - auto pi = shm.FindOrCreate<int>(int_name, 100); - - std::mutex m; - typedef std::chrono::steady_clock Clock; - - if (pi) { - auto old = *pi; - printf("int : %d, add1: %d\n", old, ++*pi); - } - - auto LockSpeed = [](auto &mutex, const std::string &name) { - const int ntimes = 1000 * 1; - auto Lock = [&]() { - for (int i = 0; i < ntimes; ++i) { - mutex.lock(); - mutex.unlock(); - } - }; - printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes); - { - boost::timer::auto_cpu_timer timer; - printf("1 thread: "); - Lock(); - } - auto InThread = [&](int nthread) { - boost::timer::auto_cpu_timer timer; - printf("%d threads: ", nthread); - std::vector<std::thread> vt; - for (int i = 0; i < nthread; ++i) { - vt.emplace_back(Lock); - } - for (auto &t : vt) { - t.join(); - } - }; - InThread(4); - InThread(16); - InThread(100); - InThread(1000); - }; - NullMutex null_mtx; - std::mutex std_mtx; - CasMutex cas_mtx; - FMutex mfmtx(3); - boost::interprocess::interprocess_mutex ipc_mutex; - SemMutex sem_mtx(3); - LockSpeed(null_mtx, "null mutex"); - LockSpeed(std_mtx, "std::mutex"); - // LockSpeed(cas_mtx, "CAS mutex"); - LockSpeed(ipc_mutex, "boost ipc mutex"); - LockSpeed(mfmtx, "mutex+flock"); - LockSpeed(sem_mtx, "sem mutex"); - - auto TryLock = [&]() { - if (mtx->try_lock()) { - printf("try_lock ok\n"); - return true; - } else { - printf("try_lock failed\n"); - return false; - } - }; - auto Unlock = [&]() { - mtx->unlock(); - printf("unlocked\n"); - }; - - if (mtx) { - printf("mtx exists\n"); - if (TryLock()) { - // Sleep(10s); - auto op = [&]() { - if (TryLock()) { - Unlock(); - } - }; - op(); - std::thread t(op); - t.join(); - // Unlock(); - } else { - // mtx->unlock(); - } - } else { - printf("mtx not exists\n"); - } -} \ No newline at end of file diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index e9131b0..6d3c7a2 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -104,21 +104,6 @@ } } -BOOST_AUTO_TEST_CASE(TimedWaitTest) -{ - SharedMemory &shm = TestShm(); - GlobalInit(shm); - ShmMsgQueue q(shm, NewSession(), 64); - for (int i = 0; i < 2; ++i) { - int ms = i * 100; - printf("Timeout Test %4d: ", ms); - boost::timer::auto_cpu_timer timer; - MsgI msg(shm); - bool r = q.Recv(msg, ms); - BOOST_CHECK(!r); - } -} - BOOST_AUTO_TEST_CASE(RefCountTest) { SharedMemory &shm = TestShm(); @@ -126,7 +111,8 @@ GlobalInit(shm); Msg m0(1000, shm); - BOOST_CHECK(m0.valid()); + BOOST_CHECK(!m0.valid()); + m0.Make(100); BOOST_CHECK_EQUAL(m0.Count(), 1); Msg m1 = m0; BOOST_CHECK(m1.valid()); diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp deleted file mode 100644 index f33f0db..0000000 --- a/utest/speed_test.cpp +++ /dev/null @@ -1,220 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: speed_test.cpp - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�03鏈�30鏃� 11鏃�35鍒�21绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "robust.h" -#include "util.h" - -using namespace robust; - -BOOST_AUTO_TEST_CASE(SpeedTest) -{ - SharedMemory &shm = TestShm(); - GlobalInit(shm); - MQId server_id = NewSession(); - ShmMsgQueue server(server_id, shm, 1000); - - const int timeout = 1000; - const uint32_t data_size = 1001; - const std::string proc_id = "demo_proc"; - std::atomic<int64_t> nwrite(0); - std::atomic<int64_t> nread(0); - - std::string str(data_size, 'a'); - auto Writer = [&](int writer_id, uint64_t n) { - MQId cli_id = NewSession(); - - ShmMsgQueue mq(cli_id, shm, 64); - MsgI msg(shm); - MsgRequestTopic body; - body.set_topic("topic"); - body.set_data(str); - auto head(InitMsgHead(GetType(body), proc_id, mq.Id())); - msg.Make(head, body); - assert(msg.valid()); - DEFER1(msg.Release();); - - for (uint64_t i = 0; i < n; ++i) { - msg.AddRef(); - while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {} - ++nwrite; - } - }; - auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { - ShmMsgQueue &mq = server; - auto now = []() { return steady_clock::now(); }; - auto tm = now(); - while (*run) { - MsgI msg(shm); - BHMsgHead head; - if (mq.TryRecv(msg)) { - DEFER1(msg.Release()); - tm = now(); - ++nread; - } else if (isfork) { - if (now() > tm + 1s) { - exit(0); // for forked quit after 1s. - } - } - } - }; - auto State = [&](std::atomic<bool> *run) { - auto init = shm.get_free_memory(); - printf("shm init : %ld\n", init); - uint64_t last_read = 0; - while (*run) { - auto cur = shm.get_free_memory(); - auto cur_read = nread.load(); - printf("shm used : %8ld/%ld, write: %8ld, read: %8ld, speed: %8ld\n", init - cur, init, nwrite.load(), cur_read, cur_read - last_read); - last_read = cur_read; - std::this_thread::sleep_for(1s); - } - }; - - int nwriters[] = {1, 10, 100, 1000}; - int nreaders[] = {2}; - const int64_t total_msg = 1000 * 1000; - - auto Test = [&](auto &www, auto &rrr, bool isfork) { - for (auto nreader : nreaders) { - for (auto nwriter : nwriters) { - const uint64_t nmsg = total_msg / nwriter; - std::atomic<bool> run(true); - std::this_thread::sleep_for(10ms); - boost::timer::auto_cpu_timer timer; - for (int i = 0; i < nreader; ++i) { - rrr.Launch(Reader, i, &run, isfork); - } - for (int i = 0; i < nwriter; ++i) { - www.Launch(Writer, i, nmsg); - } - www.WaitAll(); - printf("writer finished\n"); - run.store(false); - rrr.WaitAll(); - printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); - } - } - }; - - std::atomic<bool> run(true); - ThreadManager state; - state.Launch(State, &run); - DEFER1(run.store(false);); - - // typedef ProcessManager Manager; - // typedef ThreadManager Manager; - // const bool isfork = IsSameType<Manager, ProcessManager>::value; - - if (0) { - ThreadManager tw, tr; - printf("---------------- Testing thread io: -------------------------------------------------------\n"); - Test(tw, tr, false); - } - - if (1) { - ProcessManager pw, pr; - printf("================ Testing process io: =======================================================\n"); - Test(pw, pr, true); - } -} - -// Send Recv Test -BOOST_AUTO_TEST_CASE(SRTest) -{ - const int qlen = 64; - const size_t msg_length = 100; - std::string msg_content(msg_length, 'a'); - msg_content[20] = '\0'; - const std::string client_proc_id = "client_proc"; - const std::string server_proc_id = "server_proc"; - - SharedMemory &shm = TestShm(); - // shm.Remove(); - // return; - GlobalInit(shm); - - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); - ShmSocket srv(shm, NewSession(), qlen); - ShmSocket cli(shm, NewSession(), qlen); - - int ncli = 1; - uint64_t nmsg = 1000 * 1000 * 1; - std::atomic<uint64_t> count(0); - - std::atomic<int64_t> last_time(NowSec() - 1); - std::atomic<uint64_t> last_count(0); - - auto PrintStatus = [&](int64_t cur) { - std::cout << "time: " << cur; - printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); - }; - auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { - ++count; - auto cur = NowSec(); - if (last_time.exchange(cur) < cur) { - PrintStatus(cur); - } - }; - cli.Start(onRecv, 2); - - auto Client = [&](int cli_id, int nmsg) { - for (int i = 0; i < nmsg; ++i) { - auto Req = [&]() { - MsgRequestTopic req_body; - req_body.set_topic("topic"); - req_body.set_data(msg_content); - auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id())); - auto route = req_head.add_route(); - route->set_mq_id(cli.id()); - route->set_abs_addr(cli.AbsAddr()); - return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body); - }; - - Req(); - } - }; - auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { - if (head.type() == kMsgTypeRequestTopic) { - MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()}; - - MsgRequestTopic reply_body; - reply_body.set_topic("topic"); - reply_body.set_data(msg_content); - auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id())); - srv.Send(src_mq, reply_head, reply_body); - } - }; - srv.Start(onRequest); - - boost::timer::auto_cpu_timer timer; - DEFER1(printf("Request Reply Test:");); - - ThreadManager clients; - - printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); - for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } - clients.WaitAll(); - printf("request ok: %ld\n", count.load()); - do { - std::this_thread::sleep_for(100ms); - } while (count.load() < ncli * nmsg); - PrintStatus(NowSec()); - srv.Stop(); - // BOOST_CHECK_THROW(reply.Count(), int); -} diff --git a/utest/utest.cpp b/utest/utest.cpp index 7cb9587..60b490f 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -56,294 +56,6 @@ } } printf("time: %ld ns\n", (tps.back() - tps.front()).count()); - return; - // sub topic partial match. - Topic topics[] = { - "", - ".", - "a", - "sp", - "sport", - "sport.", - "sport.a", - "sport.a.b.c", - "sport.ab.c", - "sport.basketball", - "sport.football", - }; - const char sep = '.'; - auto Adjust = [&](const std::string &user_topic) { - if (user_topic.empty() || user_topic.back() == sep) { - return user_topic; - } else { - return user_topic + sep; - } - }; - - for (auto &t : topics) { - const std::string &a = Adjust(t); - printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str()); - - size_t pos = 0; - while (true) { - auto &topic = t; - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - printf("'%s',", topic.substr(0, pos).c_str()); - } - } - printf("]\n"); - } -} - -BOOST_AUTO_TEST_CASE(PubSubTest) -{ - SharedMemory &shm = TestShm(); - GlobalInit(shm); - - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); - int *flag = shm.FindOrCreate<int>("flag", 123); - printf("flag = %d\n", *flag); - ++*flag; - const std::string sub_proc_id = "subscriber"; - const std::string pub_proc_id = "publisher"; - - BHCenter center(shm); - center.Start(); - - Sleep(100ms); - - std::atomic<uint64_t> total_count(0); - std::atomic<int64_t> last_time(NowSec() - 1); - std::atomic<uint64_t> last_count(0); - - const uint64_t nmsg = 100 * 2; - const int timeout = 1000; - auto Sub = [&](int id, const std::vector<std::string> &topics) { - DemoNode client("client_" + std::to_string(id), shm); - MsgTopicList tlist; - for (auto &t : topics) { - tlist.add_topic_list(t); - } - MsgCommonReply reply_body; - bool r = client.Subscribe(tlist, reply_body, timeout); - if (!r) { - printf("client subscribe failed.\n"); - } - std::mutex mutex; - std::condition_variable cv; - - std::atomic<uint64_t> n(0); - auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) { - ++total_count; - - auto cur = NowSec(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); - } - if (++n >= nmsg * topics.size()) { - cv.notify_one(); - } - // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - }; - client.SubscribeStartWorker(OnTopicData, 1); - - std::unique_lock<std::mutex> lk(mutex); - cv.wait(lk); - }; - - auto Pub = [&](const std::string &topic) { - DemoNode provider("server_" + topic, shm); - - for (unsigned i = 0; i < nmsg; ++i) { - std::string data = topic + std::to_string(i) + std::string(1000, '-'); - MsgPublish pub; - pub.set_topic(topic); - pub.set_data(data); - bool r = provider.Publish(pub, 0); - if (!r) { - static std::atomic<int> an(0); - int n = ++an; - printf("pub %d ret: %s\n", n, r ? "ok" : "fail"); - } - } - }; - ThreadManager threads; - typedef std::vector<Topic> Topics; - Topics topics; - for (int i = 0; i < 100; ++i) { - topics.push_back("t" + std::to_string(i)); - } - Topics part; - boost::timer::auto_cpu_timer pubsub_timer; - for (size_t i = 0; i < topics.size(); ++i) { - part.push_back(topics[i]); - threads.Launch(Sub, i, topics); - } - Sleep(100ms); - for (auto &topic : topics) { - threads.Launch(Pub, topic); - } - threads.Launch(Pub, "some_else"); - - threads.WaitAll(); - - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); -} - -namespace -{ -struct C { - C() { printf("+C\n"); } - C(const C &c) { printf("+C(const C&)\n"); } - void F() { printf("C::F()\n"); } - ~C() { printf("-C\n"); } - char arr[100]; -}; -int F(C &c) { return printf(":::::::::::::F()\n"); } -} // namespace - -BOOST_AUTO_TEST_CASE(ReqRepTest) -{ - SharedMemory &shm = TestShm(); - GlobalInit(shm); - - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); - int *flag = shm.FindOrCreate<int>("flag", 123); - printf("flag = %d\n", *flag); - ++*flag; - - const std::string client_proc_id = "client_proc_"; - const std::string server_proc_id = "server_proc_"; - - BHCenter center(shm); - center.Start(); - std::atomic<bool> run(true); - - auto Client = [&](const std::string &topic, const int nreq) { - DemoNode client(client_proc_id + topic, shm); - - std::atomic<int> count(0); - std::string reply; - auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) { - reply = msg.data(); - if (++count >= nreq) { - printf("count: %d\n", count.load()); - } - }; - MsgRequestTopic req; - req.set_topic(topic); - req.set_data("data " + std::string(100, 'a')); - - client.ClientStartWorker(onRecv, 2); - - boost::timer::auto_cpu_timer timer; - for (int i = 0; i < nreq; ++i) { - std::string msg_id; - if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) { - printf("client request failed\n"); - ++count; - } - - // std::string proc_id; - // MsgRequestTopicReply reply; - // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) { - // printf("client request failed\n"); - // } - // ++count; - } - do { - std::this_thread::sleep_for(100ms); - } while (count.load() < nreq); - client.Stop(); - printf("request %s %d done ", topic.c_str(), count.load()); - }; - - std::atomic_uint64_t server_msg_count(0); - auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { - DemoNode server(name, shm); - - auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { - ++server_msg_count; - reply.set_data(request.topic() + ':' + request.data()); - return true; - }; - auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) { - ++server_msg_count; - MsgRequestTopicReply reply; - reply.set_data(request.topic() + ':' + request.data()); - server.ServerSendReply(src, reply); - }; - server.ServerStart(onDataAsync); - - MsgTopicList rpc; - for (auto &topic : topics) { - rpc.add_topic_list(topic); - } - MsgCommonReply reply_body; - if (!server.ServerRegisterRPC(rpc, reply_body, 100)) { - printf("server register topic failed\n"); - return; - } - - while (run) { - std::this_thread::sleep_for(100ms); - } - }; - ThreadManager clients, servers; - std::vector<Topic> topics = {"topic1", "topic2"}; - servers.Launch(Server, "server", topics); - Sleep(100ms); - for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 100 * 2); - } - clients.WaitAll(); - printf("clients done, server replyed: %ld\n", server_msg_count.load()); - run = false; - servers.WaitAll(); -} - -BOOST_AUTO_TEST_CASE(HeartbeatTest) -{ - const std::string shm_name("ShmHeartbeat"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024 * 50); - - BHCenter center(shm); - center.Start(); - - { - - DemoNode node("demo_node", shm); - auto Check = [&]() { - bool r = node.Heartbeat(100); - printf("hearbeat ret : %s\n", r ? "ok" : "failed"); - }; - Check(); - for (int i = 0; i < 3; ++i) { - Sleep(1s); - Check(); - } - Sleep(4s); - for (int i = 0; i < 2; ++i) { - Sleep(1s); - Check(); - } - } - Sleep(8s); -} -inline int MyMin(int a, int b) -{ - printf("MyMin\n"); - return a < b ? a : b; } int test_main(int argc, char *argv[]) @@ -352,10 +64,6 @@ int a = 0; int b = 0; BOOST_CHECK_EQUAL(a, b); - int n = MyMin(4, 6); - for (int i = 0; i < n; ++i) { - printf("i = %d\n", i); - } return 0; } -- Gitblit v1.8.0