lichao
2021-04-21 3931f83205f153f2bc7fc36d1a894cdc3f14b4db
change node socket to vector; try lock free queue.
2个文件已添加
4个文件已修改
144 ■■■■ 已修改文件
proto/source/bhome_msg_api.proto 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.h 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto
@@ -9,8 +9,8 @@
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    // bytes ip = 2;   //
    // int32 port = 3;
    bytes ip = 2;   //
    int32 port = 3;
}
message ProcInfo
src/topic_node.cpp
@@ -37,14 +37,16 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
{
    for (int i = eSockStart; i < eSockEnd; ++i) {
        sockets_[i].reset(new ShmSocket(shm_, kMqLen));
    }
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
    SockNode().Start(default_ignore_msg);
    SockClient().Start(default_ignore_msg);
    SockServer().Start(default_ignore_msg);
    SockSub().Start(default_ignore_msg);
    for (auto &p : sockets_) {
        p->Start(default_ignore_msg);
    }
}
TopicNode::~TopicNode()
@@ -52,10 +54,7 @@
    Stop();
    SockNode().Stop();
    if (state() == eStateUnregistered) {
        SockNode().Remove();
        SockClient().Remove();
        SockServer().Remove();
        SockSub().Remove();
        for (auto &p : sockets_) { p->Remove(); }
    }
}
@@ -66,16 +65,14 @@
    } else if (nworker > 16) {
        nworker = 16;
    }
    SockNode().Start();
    ServerStart(server_cb, nworker);
    SubscribeStartWorker(sub_cb, nworker);
    ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
    SockSub().Stop();
    SockServer().Stop();
    SockClient().Stop();
    for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
src/topic_node.h
@@ -22,6 +22,7 @@
#include "socket.h"
#include <atomic>
#include <memory>
#include <vector>
using namespace bhome_shm;
using namespace bhome_msg;
@@ -107,17 +108,22 @@
    };
    // some sockets may be the same one, using functions make it easy to change.
    enum { eSockStart,
           eSockNode = eSockStart,
           eSockPub = eSockNode,
           eSockServer,
           eSockClient,
           eSockSub,
           eSockEnd,
    };
    std::vector<std::unique_ptr<ShmSocket>> sockets_;
    ShmSocket &SockNode() { return sock_node_; }
    ShmSocket &SockPub() { return SockNode(); }
    ShmSocket &SockSub() { return sock_sub_; }
    ShmSocket &SockClient() { return sock_client_; }
    ShmSocket &SockServer() { return sock_server_; }
    ShmSocket &SockNode() { return *sockets_[eSockNode]; }
    ShmSocket &SockPub() { return *sockets_[eSockPub]; }
    ShmSocket &SockSub() { return *sockets_[eSockSub]; }
    ShmSocket &SockClient() { return *sockets_[eSockClient]; }
    ShmSocket &SockServer() { return *sockets_[eSockServer]; }
    ShmSocket sock_node_;
    ShmSocket sock_client_;
    ShmSocket sock_server_;
    ShmSocket sock_sub_;
    enum State {
        eStateUnregistered,
        eStateOnline,
utest/api_test.cpp
@@ -155,7 +155,7 @@
    printf("maxsec: %ld\n", CountSeconds(max_time));
    bool reg = false;
    for (int i = 0; i < 10 && !reg; ++i) {
    for (int i = 0; i < 3 && !reg; ++i) {
        ProcInfo proc;
        proc.set_proc_id("demo_client");
        proc.set_public_info("public info of demo_client. etc...");
@@ -167,6 +167,9 @@
        BHFree(reply, reply_len);
        Sleep(1s);
    }
    if (!reg) {
        return;
    }
    const std::string topic_ = "topic_";
@@ -204,7 +207,7 @@
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
            pub.set_topic(topic_ + std::to_string(i));
            pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
            pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
            std::string s(pub.SerializeAsString());
            BHPublish(s.data(), s.size(), 0);
            // Sleep(1s);
utest/lock_free_queue.cpp
New file
@@ -0,0 +1,29 @@
/*
 * =====================================================================================
 *
 *       Filename:  lock_free_queue.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月21日 13时57分02秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "lock_free_queue.h"
#include "defs.h"
#include "util.h"
BOOST_AUTO_TEST_CASE(LockFreeTest)
{
    LockFreeQueue q(BHomeShm());
    for (int i = 0; i < 15; ++i) {
        int r = q.Write(i);
        printf("write %d %s\n", i, (r ? "ok" : "failed"));
    }
}
utest/lock_free_queue.h
New file
@@ -0,0 +1,57 @@
/*
 * =====================================================================================
 *
 *       Filename:  lock_free_queue.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月21日 14时03分27秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef LOCK_FREE_QUEUE_KQWP70HT
#define LOCK_FREE_QUEUE_KQWP70HT
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/lockfree/queue.hpp>
using namespace bhome_shm;
typedef int64_t Data;
const int kQLen = 10;
class LockFreeQueue : private boost::lockfree::queue<Data,
                                                     boost::lockfree::allocator<Allocator<Data>>,
                                                     boost::lockfree::capacity<kQLen>>,
                      private boost::noncopyable
{
    typedef boost::lockfree::queue<Data,
                                   boost::lockfree::allocator<Allocator<Data>>,
                                   boost::lockfree::capacity<kQLen>>
        Queue;
public:
    LockFreeQueue(SharedMemory &shm) :
        Queue(shm.get_segment_manager()) {}
    bool Read(Data &d) { return pop(d); }
    bool Write(Data const &d) { return push(d); }
    template <class Func>
    bool Write(Data const &d, Func onWrite)
    {
        if (Write(d)) {
            onWrite(d);
            return true;
        } else {
            return false;
        }
    }
};
#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT