From ad4f3dcedab29a690c5eedbb08ba1b393917db0b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 17:39:34 +0800
Subject: [PATCH] update go api.

---
 src/socket.h |   61 +++++++++++++-----------------
 1 files changed, 26 insertions(+), 35 deletions(-)

diff --git a/src/socket.h b/src/socket.h
index 96af6e7..1ba10cb 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,11 +36,10 @@
 
 class ShmSocket : private boost::noncopyable
 {
-	bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+	template <class... T>
+	bool SendImpl(const void *valid_remote, T &&...rest)
 	{
-		// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
-		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
-		// }
+		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
 		return true;
 	}
 
@@ -57,6 +56,7 @@
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
 	static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
+	bool Remove() { return Remove(shm(), id()); }
 	const MQId &id() const { return mq().Id(); }
 	// start recv.
 	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
@@ -66,34 +66,24 @@
 	size_t Pending() const { return mq().Pending(); }
 
 	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
+	bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
 	{
-		MsgI msg;
-		if (msg.Make(shm(), head, body)) {
-			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
-			return SendImpl(valid_remote, msg);
+		try {
+			if (!cb) {
+				return SendImpl(valid_remote, MsgI::Serialize(head, body));
+			} else {
+				std::string msg_id(head.msg_id());
+				per_msg_cbs_->Store(msg_id, std::move(cb));
+				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+					RecvCB cb_no_use;
+					per_msg_cbs_->Pick(msg_id, cb_no_use);
+				};
+				return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
+			}
+		} catch (...) {
+			SetLastError(eError, "Send internal error.");
+			return false;
 		}
-		return false;
-	}
-
-	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
-	{
-		//TODO send_buffer_ need flag, and remove callback on expire.
-		MsgI msg;
-		if (msg.Make(shm(), head, body)) {
-			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
-			std::string msg_id(head.msg_id());
-			per_msg_cbs_->Add(msg_id, cb);
-			auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
-				RecvCB cb_no_use;
-				per_msg_cbs_->Find(msg_id, cb_no_use);
-			};
-			return SendImpl(valid_remote, msg, onExpireRemoveCB);
-		} else {
-			printf("out of mem?, avail: %ld\n", shm().get_free_memory());
-		}
-		return false;
 	}
 
 	bool Send(const void *valid_remote, const MsgI &imsg)
@@ -101,10 +91,10 @@
 		return SendImpl(valid_remote, imsg);
 	}
 
-	bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
+	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
 	{
 		struct State {
 			std::mutex mutex;
@@ -127,7 +117,7 @@
 			};
 
 			std::unique_lock<std::mutex> lk(st->mutex);
-			bool sendok = Send(remote, head, body, OnRecv);
+			bool sendok = Send(remote, head, body, std::move(OnRecv));
 			if (!sendok) {
 				printf("send timeout\n");
 			}
@@ -135,6 +125,7 @@
 				return true;
 			} else {
 				st->canceled = true;
+				SetLastError(ETIMEDOUT, "timeout");
 				return false;
 			}
 		} catch (...) {
@@ -164,8 +155,8 @@
 
 	public:
 		bool empty() const { return store_.empty(); }
-		bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
-		bool Find(const std::string &id, RecvCB &cb)
+		bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+		bool Pick(const std::string &id, RecvCB &cb)
 		{
 			auto pos = store_.find(id);
 			if (pos != store_.end()) {

--
Gitblit v1.8.0