From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 19 五月 2021 19:14:13 +0800
Subject: [PATCH] add api BHQueryProcs.

---
 src/socket.h |   51 ++++++++++++++-------------------------------------
 1 files changed, 14 insertions(+), 37 deletions(-)

diff --git a/src/socket.h b/src/socket.h
index d69b8d4..7557034 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -49,16 +49,16 @@
 
 	ShmSocket(Shm &shm, const MQId id, const int len);
 	ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
-	ShmSocket(Shm &shm, const int len = 12);
+	ShmSocket(int64_t offset, Shm &shm, const MQId id);
 	~ShmSocket();
 	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
 	bool Remove() { return Remove(shm(), id()); }
 	MQId id() const { return mq().Id(); }
+	int64_t AbsAddr() const { return mq().AbsAddr(); }
 	void SetNodeProc(const int proc_index, const int socket_index)
 	{
 		node_proc_index_ = proc_index;
 		socket_index_ = socket_index;
-		LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
 	}
 	// start recv.
 	bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -67,7 +67,7 @@
 	bool Stop();
 
 	template <class Body>
-	bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
+	bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
 	{
 		try {
 			//TODO alloc outsiez and use send.
@@ -85,47 +85,23 @@
 	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
 	template <class Body>
-	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
 	{
-		std::string msg_id(head.msg_id());
-		std::string content(MsgI::Serialize(head, body));
-		size_t size = content.size();
-		auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
-			if (!msg.Fill(content)) { return; }
-
-			try {
-				if (!cb) {
-					Send(remote, msg);
-				} else {
-					per_msg_cbs_->Store(msg_id, std::move(cb));
-					auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
-						RecvCB cb_no_use;
-						per_msg_cbs_->Pick(msg_id, cb_no_use);
-					};
-					Send(remote, msg, onExpireRemoveCB);
-				}
-			} catch (...) {
-				SetLastError(eError, "Send internal error.");
-			}
-		};
-
-		return RequestAlloc(size, OnResult);
+		return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
 	}
 	template <class... T>
-	bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+	bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
 	{
 		return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
 	}
 	template <class... T>
-	bool Send(const MQId remote, const int64_t cmd, T &&...t)
+	bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
 	{
 		return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
 	}
-	bool SyncRecv(int64_t &cmd, const int timeout_ms);
-	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
 	{
 		struct State {
 			std::mutex mutex;
@@ -135,6 +111,7 @@
 
 		try {
 			std::shared_ptr<State> st(new State);
+
 			auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
 			auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -175,12 +152,12 @@
 	bool StopNoLock();
 	bool RunningNoLock() { return !workers_.empty(); }
 
+	bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
+
 	template <class... Rest>
-	bool SendImpl(const MQId remote, Rest &&...rest)
+	bool SendImpl(const MQInfo &remote, Rest &&...rest)
 	{
-		// TODO send alloc request, and pack later, higher bit means alloc?
-		send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
-		return true;
+		return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
 	}
 
 	std::vector<std::thread> workers_;
@@ -211,8 +188,8 @@
 
 	Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
 	Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
-
 	SendQ send_buffer_;
+
 	// node request center alloc memory.
 	int node_proc_index_ = -1;
 	int socket_index_ = -1;

--
Gitblit v1.8.0