change node socket to vector; try lock free queue.
| | |
| | | |
| | | message BHAddress { |
| | | bytes mq_id = 1; // mqid, uuid |
| | | // bytes ip = 2; // |
| | | // int32 port = 3; |
| | | bytes ip = 2; // |
| | | int32 port = 3; |
| | | } |
| | | |
| | | message ProcInfo |
| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered) |
| | | shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) |
| | | { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, kMqLen)); |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | SockClient().Start(default_ignore_msg); |
| | | SockServer().Start(default_ignore_msg); |
| | | SockSub().Start(default_ignore_msg); |
| | | for (auto &p : sockets_) { |
| | | p->Start(default_ignore_msg); |
| | | } |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (state() == eStateUnregistered) { |
| | | SockNode().Remove(); |
| | | SockClient().Remove(); |
| | | SockServer().Remove(); |
| | | SockSub().Remove(); |
| | | for (auto &p : sockets_) { p->Remove(); } |
| | | } |
| | | } |
| | | |
| | |
| | | } else if (nworker > 16) { |
| | | nworker = 16; |
| | | } |
| | | |
| | | SockNode().Start(); |
| | | ServerStart(server_cb, nworker); |
| | | SubscribeStartWorker(sub_cb, nworker); |
| | | ClientStartWorker(client_cb, nworker); |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | SockSub().Stop(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | for (auto &p : sockets_) { p->Stop(); } |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | #include "socket.h" |
| | | #include <atomic> |
| | | #include <memory> |
| | | #include <vector> |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | |
| | | }; |
| | | |
| | | // some sockets may be the same one, using functions make it easy to change. |
| | | enum { eSockStart, |
| | | eSockNode = eSockStart, |
| | | eSockPub = eSockNode, |
| | | eSockServer, |
| | | eSockClient, |
| | | eSockSub, |
| | | eSockEnd, |
| | | }; |
| | | std::vector<std::unique_ptr<ShmSocket>> sockets_; |
| | | |
| | | ShmSocket &SockNode() { return sock_node_; } |
| | | ShmSocket &SockPub() { return SockNode(); } |
| | | ShmSocket &SockSub() { return sock_sub_; } |
| | | ShmSocket &SockClient() { return sock_client_; } |
| | | ShmSocket &SockServer() { return sock_server_; } |
| | | ShmSocket &SockNode() { return *sockets_[eSockNode]; } |
| | | ShmSocket &SockPub() { return *sockets_[eSockPub]; } |
| | | ShmSocket &SockSub() { return *sockets_[eSockSub]; } |
| | | ShmSocket &SockClient() { return *sockets_[eSockClient]; } |
| | | ShmSocket &SockServer() { return *sockets_[eSockServer]; } |
| | | |
| | | ShmSocket sock_node_; |
| | | ShmSocket sock_client_; |
| | | ShmSocket sock_server_; |
| | | ShmSocket sock_sub_; |
| | | enum State { |
| | | eStateUnregistered, |
| | | eStateOnline, |
| | |
| | | printf("maxsec: %ld\n", CountSeconds(max_time)); |
| | | |
| | | bool reg = false; |
| | | for (int i = 0; i < 10 && !reg; ++i) { |
| | | for (int i = 0; i < 3 && !reg; ++i) { |
| | | ProcInfo proc; |
| | | proc.set_proc_id("demo_client"); |
| | | proc.set_public_info("public info of demo_client. etc..."); |
| | |
| | | |
| | | BHFree(reply, reply_len); |
| | | Sleep(1s); |
| | | } |
| | | if (!reg) { |
| | | return; |
| | | } |
| | | |
| | | const std::string topic_ = "topic_"; |
| | |
| | | for (int i = 0; i < 1; ++i) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic_ + std::to_string(i)); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1024, 'a')); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1, 'a')); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | // Sleep(1s); |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: lock_free_queue.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月21日 13时57分02秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "lock_free_queue.h" |
| | | #include "defs.h" |
| | | #include "util.h" |
| | | |
| | | BOOST_AUTO_TEST_CASE(LockFreeTest) |
| | | { |
| | | LockFreeQueue q(BHomeShm()); |
| | | for (int i = 0; i < 15; ++i) { |
| | | int r = q.Write(i); |
| | | printf("write %d %s\n", i, (r ? "ok" : "failed")); |
| | | } |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: lock_free_queue.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月21日 14时03分27秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | |
| | | #ifndef LOCK_FREE_QUEUE_KQWP70HT |
| | | #define LOCK_FREE_QUEUE_KQWP70HT |
| | | |
| | | #include "shm.h" |
| | | #include <boost/interprocess/offset_ptr.hpp> |
| | | #include <boost/lockfree/queue.hpp> |
| | | |
| | | using namespace bhome_shm; |
| | | |
| | | typedef int64_t Data; |
| | | const int kQLen = 10; |
| | | class LockFreeQueue : private boost::lockfree::queue<Data, |
| | | boost::lockfree::allocator<Allocator<Data>>, |
| | | boost::lockfree::capacity<kQLen>>, |
| | | private boost::noncopyable |
| | | { |
| | | typedef boost::lockfree::queue<Data, |
| | | boost::lockfree::allocator<Allocator<Data>>, |
| | | boost::lockfree::capacity<kQLen>> |
| | | Queue; |
| | | |
| | | public: |
| | | LockFreeQueue(SharedMemory &shm) : |
| | | Queue(shm.get_segment_manager()) {} |
| | | bool Read(Data &d) { return pop(d); } |
| | | bool Write(Data const &d) { return push(d); } |
| | | template <class Func> |
| | | bool Write(Data const &d, Func onWrite) |
| | | { |
| | | if (Write(d)) { |
| | | onWrite(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | }; |
| | | |
| | | #endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT |