change node init: no shm lock anymore.

// center name, not relative to shm.
const std::string &id() const { return id_; }

int64_t OnNodeInit(ShmSocket &socket, const int64_t val)
{
    LOG_FUNCTION;
    SharedMemory &shm = socket.shm();
    MQId ssn = (val >> 4) & MaskBits(56);
    int64_t reply = EncodeCmd(eCmdNodeInitReply);

    if (nodes_.find(ssn) != nodes_.end()) {
        return reply; // ignore if exists.
    }

    auto UpdateRegInfo = [&](Node &node) {
        node->state_.timestamp_ = NowSec() - offline_time_;
        node->state_.UpdateState(NowSec(), offline_time_, kill_time_);

        // create sockets.
        try {
            ShmSocket tmp(shm, true, ssn, 16);
            node->addrs_.emplace(ssn, tmp.AbsAddr());
            return true;
        } catch (...) {
            ShmSocket::Remove(shm, ssn);
            return false;
        }
    };

    auto PrepareProcInit = [&](Node &node) {
        ShmMsg init_msg;
        DEFER1(init_msg.Release());
        MsgProcInit body;
        auto head = InitMsgHead(GetType(body), id(), ssn);
        return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
               init_msg.Fill(ShmMsg::Serialize(head, body)) &&
               SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
    };

    Node node(new NodeInfo);
    if (UpdateRegInfo(node) && PrepareProcInit(node)) {
        reply |= (node->addrs_[ssn] << 4);
        nodes_[ssn] = node;
        LOG_INFO() << "new node ssn (" << ssn << ") init";
    } else {
        ShmSocket::Remove(shm, ssn);
    }
    return reply;
}
void RecordMsg(const MsgI &msg)
{
    msg.reset_managed(true);
    msgs_.RecordMsg(msg);
}

bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{

    assert(IsCmd(val));
    int cmd = DecodeCmd(val);
    switch (cmd) {
    case eCmdNodeInit: OnNodeInit(socket, val); break;
    case eCmdAllocRequest0: OnAlloc(socket, val); break;
    case eCmdFree: OnFree(socket, val); break;
    default: return false;
MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
{
    LOG_DEBUG() << "center got proc init.";
    auto pos = nodes_.find(head.ssn_id());
    if (pos == nodes_.end()) {
        return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
    }
    auto index = procs_.Put(head.proc_id(), head.ssn_id());
    auto reply(MakeReply<MsgProcInitReply>(eSuccess));
    reply.set_proc_index(index);

    auto &node = pos->second;
    try {
        for (int i = 0; i < msg.extra_mq_num(); ++i) {
            ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
            node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
            auto addr = reply.add_extra_mqs();
            addr->set_mq_id(tmp.id());
            addr->set_abs_addr(tmp.AbsAddr());
        }
        return reply;
    } catch (...) {
        LOG_ERROR() << "proc init create mq error";
        return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
    }
}

MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)

// now we can talk.
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
    auto &center = *center_ptr;
    auto onInit = [&](const int64_t request) {
        return center->OnNodeInit(socket, request);
    };
    BHCenterHandleInit(onInit);
    center->OnTimer();
};

    MsgTopicList topics = 1;
}

message MsgProcInit {
    int32 extra_mq_num = 1;
} // proc_id is in header.

message MsgProcInitReply {
    ErrorMsg errmsg = 1;
    int32 proc_index = 2;
    repeated BHAddress extra_mqs = 3;
}
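For reference, a minimal sketch of the proc-init round trip through these messages, using the generated C++ protobuf accessors that the code above already relies on (set_extra_mq_num, set_proc_index, add_extra_mqs, set_mq_id, set_abs_addr); all values here are made up:

    // node side: ask the center for extra queues.
    MsgProcInit req;
    req.set_extra_mq_num(3);

    // center side: answer with the proc index and the newly created queues.
    MsgProcInitReply rep;
    rep.set_proc_index(7);
    auto *mq = rep.add_extra_mqs();
    mq->set_mq_id(100001);     // hypothetical mq id
    mq->set_abs_addr(0x12340); // hypothetical absolute address in shm

    // node side again: read the reply.
    for (auto &addr : rep.extra_mqs()) {
        // open a socket at addr.abs_addr() with id addr.mq_id()
    }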

service TopicRPC {

InitMQ(info.mq_sender_, NextId());
InitMQ(info.mq_center_, NextId());
InitMQ(info.mq_bus_, NextId());
InitMQ(info.mq_init_, NextId());

pmeta->tag_ = kCenterInfoTag;
return true;

const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
bool BHNodeInit(const int64_t request, int64_t &reply)
{
    return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
{
    GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
}
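A minimal sketch of the handshake these two wrappers carry: the node blocks in ClientRequest until it times out, while the center services the shared slot from its idle loop. Here center and socket stand for whatever the idle loop captures (as in OnCenterIdle above), and the cmd encoding is the one from the MsgCmd enum:

    // node side (client)
    int64_t request = (ssn << 4) | EncodeCmd(eCmdNodeInit);
    int64_t reply = 0;
    if (BHNodeInit(request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
        int64_t abs_addr = reply >> 4; // absolute address of the node's queue, packed by the center
    }

    // center side (server), called once per idle tick
    BHCenterHandleInit([&](const int64_t req) {
        return center->OnNodeInit(socket, req); // returns (abs_addr << 4) | eCmdNodeInitReply
    });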

int64_t CalcAllocIndex(int64_t size)
{

#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0

#include "robust.h"
#include <atomic>
#include <string>

struct CenterInfo {
    MQInfo mq_center_;
    MQInfo mq_bus_;
    MQInfo mq_init_;
    MQInfo mq_sender_;
    robust::AtomicReqRep init_rr_;
    std::atomic<MQId> mqid_;
    CenterInfo() :
        mqid_(100000) {}

const MQInfo &BHGlobalSenderAddress();
const MQInfo &BHTopicCenterAddress();
const MQInfo &BHTopicBusAddress();
const MQInfo &BHCenterReplyAddress();
bool BHNodeInit(const int64_t request, int64_t &reply);
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq);

#endif // end of include guard: DEFS_KP8LKGD0

}
auto n = meta()->count_.Dec();
if (n == 0) {
    if (meta()->managed_) {
        int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
        Sender().Send(BHTopicCenterAddress(), free_cmd);
    } else {
        Free();
    }
} else if (n < 0) {
    LOG_FATAL() << "error double release data.";
    throw std::runtime_error("double release msg.");

class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
public:
    static inline SharedMemory &shm() { return GetData(); }

private:
    static ShmSocket &Sender();

    // store ref count; msgs sharing the same data should also hold a pointer to the same RefCount object.

    const uint32_t size_ = 0;
    const int64_t id_ = 0;
    std::atomic<int64_t> timestamp_;
    bool managed_ = false;
    Meta(uint32_t size) :
        size_(size), id_(NewId()), timestamp_(NowSec()) {}
};

int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
int Release();
void Free();
void reset_managed(const bool val) const
{
    if (valid()) { meta()->managed_ = val; }
}

template <class Body>
inline bool Make(const BHMsgHead &head, const Body &body)
constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; }
// int64_t pack format: cmd data, 3-bit cmd, 1-bit flag.
enum MsgCmd {
    eCmdNodeInit = 0,      // up to 56-bit ssn id
    eCmdNodeInitReply = 1, // 31-bit proc index,
    eCmdAllocRequest0 = 2, // 8-bit size, 4-bit socket index, 16-bit proc index, 28-bit id
    eCmdAllocReply0 = 3,   // 31-bit ptr, 28-bit id,

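To make the packing concrete, a small sketch of how a node-init request and its reply are built and taken apart; EncodeCmd/DecodeCmd/MaskBits are the helpers from this header, and ssn/abs_addr are placeholder values:

    MQId ssn = 123450;         // placeholder session id
    int64_t abs_addr = 0x8000; // placeholder queue offset

    // request: 56-bit ssn above the 4-bit cmd+flag field.
    int64_t request = (ssn << 4) | EncodeCmd(eCmdNodeInit);
    assert(IsCmd(request));
    MQId ssn_back = (request >> 4) & MaskBits(56); // == ssn

    // reply: the center packs the queue's absolute address the same way.
    int64_t reply = (abs_addr << 4) | EncodeCmd(eCmdNodeInitReply);
    int64_t addr_back = reply >> 4; // == abs_addr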
    Lock &l_;
};

template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
    typedef uint32_t size_type;
    typedef uint32_t count_type;
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }

public:
    typedef D Data;

    CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
    {
        if (!buf) {
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        } else {
            memset(&buf[0], 0, sizeof(D) * capacity_);
        }
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }

    bool push_back(const Data d)
    {
        auto old = mtail();
        auto pos = Pos(old);
        auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
        if (!full) {
            buf[pos] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        }
        return false;
    }
    bool pop_front(Data &d)
    {
        auto old = mhead();
        auto pos = Pos(old);
        if (!(pos == tail())) {
            d = buf[pos];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }

private:
    CircularBuffer(const CircularBuffer &);
    CircularBuffer(CircularBuffer &&);
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;

    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }
    // data
    const size_type capacity_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
    typename Alloc::pointer buf = nullptr;
};
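A minimal usage sketch, assuming this class sits in the robust namespace and is instantiated with the default std::allocator; the 32-bit count packed above the position keeps the compare-exchange from succeeding on a wrapped-around head or tail (ABA):

    robust::CircularBuffer<int64_t> q(8); // room for 8 elements (9 slots internally)

    // producer: push_back returns false when the buffer is full
    // or when another producer won the CAS on the tail.
    bool pushed = q.push_back(42);

    // consumer: pop_front returns false when empty or when it lost the CAS on the head.
    int64_t v = 0;
    bool popped = q.pop_front(v);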

template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{

    AData buf;
};

class AtomicReqRep
{
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply)
    {
        auto end_time = now() + 3s;
        do {
            Data cur = data_.load();
            if (GetState(cur) == eStateFree &&
                DataCas(cur, Encode(request, eStateRequest))) {
                do {
                    yield();
                    cur = data_.load();
                    if (GetState(cur) == eStateReply) {
                        DataCas(cur, Encode(0, eStateFree));
                        reply = Decode(cur);
                        return true;
                    }
                } while (now() < end_time);
            }
            yield();
        } while (now() < end_time);
        return false;
    }

    bool ServerProcess(Handler onReq)
    {
        Data cur = data_.load();
        switch (GetState(cur)) {
        case eStateRequest:
            if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
                timestamp_ = now();
                return true;
            }
            break;
        case eStateReply:
            if (timestamp_.load() + 3s < now()) {
                DataCas(cur, Encode(0, eStateFree));
            }
            break;
        case eStateFree:
        default: break;
        }
        return false;
    }

private:
    enum State {
        eStateFree,
        eStateRequest,
        eStateReply
    };
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    static void yield() { QuickSleep(); }
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }

    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
    std::atomic<Duration> timestamp_;
};

} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU


template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
// using Circular = robust::CircularBuffer<D, Allocator<D>>;

template <class D>
class SharedQueue
{
    size_t size = content.size();
    auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
        if (!msg.Fill(content)) { return false; }

        try {
            if (!cb) {

            };
            Send(remote, msg, onExpireRemoveCB);
            return true;
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    };

#if 0
    // self alloc
    MsgI msg;
    if (msg.Make(size)) {
        DEFER1(msg.Release());
        return OnResult(msg);
    }
#else
    // center alloc
    return RequestAlloc(size, OnResult);
#endif
}

bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)

}
LOG_DEBUG() << "Node Init, id " << ssn_id_;
auto NodeInit = [&]() {
    int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
    int64_t reply = 0;
    if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
        int64_t abs_addr = reply >> 4;
        sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_));
        LOG_DEBUG() << "node init ok";
    } else {
        LOG_ERROR() << "Node Init Error";
    }
};
if (sockets_.empty()) {
    NodeInit();
}
if (!sockets_.empty()) {
    auto onMsg = [this](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        LOG_DEBUG() << "node recv type: " << head.type();
        switch (head.type()) {
        case kMsgTypeProcInit: {
            // reuse msg to send proc init.
            MsgProcInit body;
            body.set_extra_mq_num(eSockEnd - eSockStart - 1);
            auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
            AddRoute(head, socket);
            if (imsg.Fill(head, body)) {
                socket.Send(BHTopicCenterAddress(), imsg);
            }
        } break;
        case kMsgTypeProcInitReply: {
            LOG_DEBUG() << "got proc init reply";
            MsgProcInitReply reply;
            if (imsg.ParseBody(reply) && IsSuccess(reply.errmsg().errcode())) {
                for (auto &addr : reply.extra_mqs()) {
                    LOG_DEBUG() << "add socket " << addr.abs_addr() << ", id:" << addr.mq_id();
                    sockets_.emplace_back(new ShmSocket(addr.abs_addr(), shm(), addr.mq_id()));
                }
                SetProcIndex(reply.proc_index());
                this->state_ = eStateUnregistered;
            }
        } break;
        default: break;
        }
        return true;
    };
    SockNode().Start(1, onMsg);
    return true;
}
return false;


/////////////////////////////////////////////////////////////////////////////////////////

BOOST_AUTO_TEST_CASE(InitTest)
{
    AtomicReqRep rr;
    auto client = [&]() {
        for (int i = 0; i < 20; ++i) {
            int64_t reply = 0;
            bool r = rr.ClientRequest(i, reply);
            printf("init request %d, %s, reply %lld\n", i, (r ? "ok" : "failed"), (long long)reply);
        }
    };

    std::atomic<bool> run(true);
    auto server = [&]() {
        auto onReq = [](int64_t req) { return req + 100; };
        while (run) {
            rr.ServerProcess(onReq);
        }
    };

    ThreadManager clients, servers;
    servers.Launch(server);
    for (int i = 0; i < 2; ++i) {
        clients.Launch(client);
    }
    clients.WaitAll();
    run = false;
    servers.WaitAll();
}

BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;