/*
|
* =====================================================================================
|
*
|
* Filename: msg.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年03月24日 16时49分20秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef MSG_5BILLZET
|
#define MSG_5BILLZET
|
|
#include "bh_util.h"
|
#include "proto.h"
|
#include "shm.h"
|
#include <atomic>
|
#include <boost/interprocess/offset_ptr.hpp>
|
#include <boost/uuid/uuid_generators.hpp>
|
#include <functional>
|
#include <stdint.h>
|
|
namespace bhome_msg
|
{
|
using namespace bhome_shm;
|
|
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
|
// message content layout: (meta) / header_size + header + data_size + data
|
|
// MQId is an alias of boost::uuids::uuid (presumably a message-queue id -- confirm with users of the type).
using MQId = boost::uuids::uuid;
|
|
class ShmMsg
|
{
|
private:
|
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
|
class RefCount : private boost::noncopyable
|
{
|
std::atomic<int> num_;
|
|
public:
|
RefCount() :
|
num_(1) { static_assert(std::is_pod<decltype(num_)>::value); }
|
int Inc() { return ++num_; }
|
int Dec() { return --num_; }
|
int Get() { return num_.load(); }
|
};
|
typedef int64_t Offset;
|
static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
|
static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
|
static inline Offset BaseAddr()
|
{
|
static const Offset base = Addr(shm().get_address()); // cache value.
|
return base;
|
}
|
static inline SharedMemory &shm()
|
{
|
if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
|
return *pshm();
|
}
|
static inline SharedMemory *&pshm()
|
{
|
static SharedMemory *pshm = 0;
|
return pshm;
|
}
|
|
struct Meta {
|
RefCount count_;
|
};
|
Offset offset_;
|
void *Alloc(const size_t size)
|
{
|
void *p = shm().Alloc(sizeof(Meta) + size);
|
if (p) {
|
auto pmeta = new (p) Meta;
|
p = pmeta + 1;
|
}
|
return p;
|
}
|
void Free()
|
{
|
assert(valid());
|
shm().Dealloc(meta());
|
offset_ = 0;
|
assert(!valid());
|
}
|
Meta *meta() const { return get<Meta>() - 1; }
|
|
typedef std::function<void(void *p, int len)> ToArray;
|
void *Pack(const uint32_t head_len, const ToArray &headToArray,
|
const uint32_t body_len, const ToArray &bodyToArray)
|
{
|
void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
|
if (addr) {
|
auto p = static_cast<char *>(addr);
|
auto Pack1 = [&p](auto len, auto &writer) {
|
Put32(p, len);
|
p += sizeof(len);
|
writer(p, len);
|
p += len;
|
};
|
Pack1(head_len, headToArray);
|
Pack1(body_len, bodyToArray);
|
}
|
return addr;
|
}
|
|
template <class Body>
|
void *Pack(const BHMsgHead &head, const Body &body)
|
{
|
return Pack(
|
uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
|
uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
|
}
|
|
void *Pack(const std::string &content)
|
{
|
void *addr = Alloc(content.size());
|
if (addr) {
|
memcpy(addr, content.data(), content.size());
|
}
|
return addr;
|
}
|
|
bool Make(void *addr)
|
{
|
if (!addr) {
|
return false;
|
}
|
ShmMsg(addr).swap(*this);
|
return true;
|
}
|
ShmMsg(void *p) :
|
offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
|
|
template <class T = void>
|
T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
|
|
public:
|
static bool BindShm(SharedMemory &shm)
|
{
|
assert(!pshm());
|
pshm() = &shm;
|
return true;
|
}
|
|
ShmMsg() :
|
ShmMsg(nullptr) {}
|
explicit ShmMsg(const size_t size) :
|
ShmMsg(Alloc(size)) {}
|
void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
|
bool valid() const { return static_cast<bool>(offset_); }
|
|
// AddRef and Release works for both counted and not counted msg.
|
int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
|
int Release()
|
{
|
if (!valid()) {
|
return 0;
|
}
|
auto n = meta()->count_.Dec();
|
if (n == 0) {
|
Free();
|
}
|
return n;
|
}
|
int Count() const { return valid() ? meta()->count_.Get() : 1; }
|
|
template <class Body>
|
inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); }
|
inline bool Make(const std::string &content) { return Make(Pack(content)); }
|
template <class Body>
|
static inline std::string Serialize(const BHMsgHead &head, const Body &body)
|
{
|
uint32_t head_len = head.ByteSizeLong();
|
uint32_t body_len = body.ByteSizeLong();
|
std::string s(4 + head_len + 4 + body_len, '\0');
|
size_t pos = 0;
|
auto add1 = [&](auto &&msg, auto &&size) {
|
Put32(&s[pos], size);
|
pos += 4;
|
msg.SerializeToArray(&s[pos], size);
|
pos += size;
|
};
|
add1(head, head_len);
|
add1(body, body_len);
|
assert(pos == s.size());
|
return s;
|
}
|
|
bool ParseHead(BHMsgHead &head) const
|
{
|
auto p = get<char>();
|
assert(p);
|
uint32_t msg_size = Get32(p);
|
p += 4;
|
return head.ParseFromArray(p, msg_size);
|
}
|
template <class Body>
|
bool ParseBody(Body &body) const
|
{
|
auto p = get<char>();
|
assert(p);
|
uint32_t size = Get32(p);
|
p += 4;
|
p += size;
|
size = Get32(p);
|
p += 4;
|
return body.ParseFromArray(p, size);
|
}
|
};
|
|
// Non-member swap so that unqualified swap(a, b) finds ShmMsg's own swap.
// Marked noexcept: it forwards to ShmMsg::swap, which only exchanges the two offsets.
inline void swap(ShmMsg &m1, ShmMsg &m2) noexcept { m1.swap(m2); }
|
|
// MsgI is another name for ShmMsg.
using MsgI = ShmMsg;
|
|
} // namespace bhome_msg
|
|
#endif // end of include guard: MSG_5BILLZET
|