/*
|
* =====================================================================================
|
*
|
* Filename: msg.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年03月24日 16时49分20秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef MSG_5BILLZET
|
#define MSG_5BILLZET
|
|
#include "bh_util.h"
|
#include "proto.h"
|
#include "shm.h"
|
#include <atomic>
|
#include <stdint.h>
|
|
class ShmSocket;
|
namespace bhome_msg
|
{
|
using namespace bhome_shm;
|
|
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
|
// message content layout: (meta) / header_size + header + data_size + data
|
|
class ShmMsg
|
{
|
private:
|
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
|
class RefCount : private boost::noncopyable
|
{
|
std::atomic<int> num_;
|
|
public:
|
RefCount() :
|
num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
|
int Inc() { return ++num_; }
|
int Dec() { return --num_; }
|
int Get() { return num_.load(); }
|
};
|
|
typedef int64_t OffsetType;
|
static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
|
static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
|
OffsetType BaseAddr() const { return Addr(shm().get_address()); }
|
|
static const uint32_t kMsgTag = 0xf1e2d3c4;
|
struct Meta {
|
static int64_t NewId()
|
{
|
static std::atomic<int64_t> id(0);
|
return ++id;
|
}
|
|
RefCount count_;
|
const uint32_t tag_ = kMsgTag;
|
const uint32_t capacity_ = 0;
|
const int64_t id_ = 0;
|
std::atomic<int64_t> timestamp_;
|
bool managed_ = false;
|
uint32_t size_ = 0;
|
Meta(uint32_t size) :
|
capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
|
};
|
OffsetType offset_;
|
SharedMemory *pshm_;
|
|
void *Alloc(const size_t size, const void *src = nullptr)
|
{
|
void *p = shm().Alloc(sizeof(Meta) + size);
|
if (p) {
|
auto pmeta = new (p) Meta(size);
|
p = pmeta + 1;
|
if (src) {
|
memcpy(p, src, size);
|
}
|
}
|
return p;
|
}
|
|
private:
|
Meta *meta() const { return get<Meta>() - 1; }
|
|
template <class Body>
|
void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len)
|
{
|
void *addr = get();
|
if (addr) {
|
auto p = static_cast<char *>(addr);
|
auto Pack1 = [&p](auto len, auto &&writer) {
|
Put32(p, len);
|
p += sizeof(len);
|
writer(p, len);
|
p += len;
|
};
|
Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
|
Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
|
meta()->size_ = 4 + head_len + 4 + body_len;
|
}
|
return addr;
|
}
|
void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content)
|
{
|
void *addr = get();
|
if (addr) {
|
auto p = static_cast<char *>(addr);
|
auto Pack1 = [&p](uint32_t len, auto &&writer) {
|
Put32(p, len);
|
p += sizeof(len);
|
writer(p, len);
|
p += len;
|
};
|
Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
|
Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); });
|
meta()->size_ = 4 + head_len + 4 + body_content.size();
|
}
|
return addr;
|
}
|
|
void *Pack(const void *src, const size_t size)
|
{
|
void *addr = get();
|
if (addr && src) {
|
memcpy(addr, src, size);
|
meta()->size_ = size;
|
}
|
return addr;
|
}
|
|
void *Pack(const std::string &content) { return Pack(content.data(), content.size()); }
|
|
bool Make(void *addr)
|
{
|
if (!addr) {
|
return false;
|
}
|
offset_ = Addr(addr) - BaseAddr();
|
return true;
|
}
|
|
template <class T = void>
|
T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
|
|
public:
|
explicit ShmMsg(SharedMemory &shm) :
|
offset_(0), pshm_(&shm) {}
|
ShmMsg(const OffsetType offset, SharedMemory &shm) :
|
offset_(offset), pshm_(&shm) {}
|
OffsetType Offset() const { return offset_; }
|
OffsetType &OffsetRef() { return offset_; }
|
SharedMemory &shm() const { return *pshm_; }
|
|
void swap(ShmMsg &a)
|
{
|
std::swap(offset_, a.offset_);
|
std::swap(pshm_, a.pshm_);
|
}
|
bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
|
int64_t id() const { return valid() ? meta()->id_ : 0; }
|
int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
|
size_t Size() const { return valid() ? meta()->size_ : 0; }
|
int Count() const { return valid() ? meta()->count_.Get() : 1; }
|
int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
|
int Release();
|
void Free();
|
void reset_managed(const bool val) const
|
{
|
if (valid()) { meta()->managed_ = val; }
|
}
|
|
template <class Body>
|
inline bool Make(const BHMsgHead &head, const Body &body)
|
{
|
uint32_t head_len = head.ByteSizeLong();
|
uint32_t body_len = body.ByteSizeLong();
|
uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
|
return Make(size) && Pack(head, head_len, body, body_len);
|
}
|
inline bool Make(const BHMsgHead &head, const std::string &body_content)
|
{
|
uint32_t head_len = head.ByteSizeLong();
|
uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size();
|
return Make(size) && Pack(head, head_len, body_content);
|
}
|
|
template <class Body>
|
inline bool Fill(const BHMsgHead &head, const Body &body)
|
{
|
uint32_t head_len = head.ByteSizeLong();
|
uint32_t body_len = body.ByteSizeLong();
|
uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
|
return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
|
}
|
|
inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); }
|
inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); }
|
|
inline bool Make(const std::string &content) { return Make(content.data(), content.size()); }
|
inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); }
|
|
inline bool Make(const size_t size) { return Make(Alloc(size)); }
|
|
template <class Body>
|
static inline std::string Serialize(const BHMsgHead &head, const Body &body)
|
{
|
uint32_t head_len = head.ByteSizeLong();
|
uint32_t body_len = body.ByteSizeLong();
|
std::string s(4 + head_len + 4 + body_len, '\0');
|
size_t pos = 0;
|
auto add1 = [&](auto &&msg, auto &&size) {
|
Put32(&s[pos], size);
|
pos += 4;
|
msg.SerializeToArray(&s[pos], size);
|
pos += size;
|
};
|
add1(head, head_len);
|
add1(body, body_len);
|
assert(pos == s.size());
|
return s;
|
}
|
|
bool ParseHead(BHMsgHead &head) const
|
{
|
auto p = get<char>();
|
assert(p);
|
uint32_t msg_size = Get32(p);
|
p += 4;
|
return head.ParseFromArray(p, msg_size);
|
}
|
std::string content() const
|
{
|
auto p = get<char>();
|
return p ? std::string(p, meta()->size_) : std::string();
|
}
|
std::string body() const
|
{
|
auto p = get<char>();
|
assert(p);
|
uint32_t size = Get32(p);
|
p += 4;
|
p += size;
|
size = Get32(p);
|
p += 4;
|
return std::string(p, size);
|
}
|
template <class Body>
|
bool ParseBody(Body &body) const
|
{
|
auto p = get<char>();
|
assert(p);
|
uint32_t size = Get32(p);
|
p += 4;
|
p += size;
|
size = Get32(p);
|
p += 4;
|
return body.ParseFromArray(p, size);
|
}
|
};
|
|
inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); }
|
|
typedef ShmMsg MsgI;
|
|
constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; }
|
constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); }
|
constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; }
|
// int64_t pack format: cmd data ,3bit cmd, 1bit flag.
|
enum MsgCmd {
|
eCmdNodeInit = 0, // upto 56bit ssn id
|
eCmdNodeInitReply = 1, // 31bit proc index,
|
eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id
|
eCmdAllocReply0 = 3, // 31bit ptr, 28bit id,
|
eCmdFree = 4, // upto 59bit msg id,
|
};
|
|
} // namespace bhome_msg
|
|
#endif // end of include guard: MSG_5BILLZET
|