lichao
2021-06-03 8967e7f2f8b94dc032135707e16c8a9f233d0db6
rafactor, remove old todo, add some err msg.
7个文件已修改
23 ■■■■■ 已修改文件
box/center.cpp 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -33,8 +33,6 @@
namespace
{
//TODO check proc_id
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
box/center_main.cc
@@ -50,7 +50,7 @@
    GlobalInit(shm);
    if (args.Has("daemon") || args.Has("d")) {
        int r = daemon(0, 0); // TODO center control msg to close itself.
        int r = daemon(0, 0); // maybe add center control msg to close itself.
    }
    BHCenter center(shm);
box/node_center.h
@@ -149,8 +149,8 @@
                    return op(node);
                }
            }
        } catch (...) {
            //TODO error log
        } catch (std::exception &e) {
            LOG_ERROR() << "handle msg exception: " << e.what();
            return MakeReply<Reply>(eError, "internal error.");
        }
    }
src/defs.h
@@ -61,7 +61,6 @@
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
const MQInfo &BHTopicCenterAddress(SharedMemory &shm);
const MQInfo &BHTopicBusAddress(SharedMemory &shm);
src/shm.h
@@ -115,7 +115,7 @@
        default: break;
        }
        if (!IsOk()) {
            throw("Error: shm can not create/open \"" + name_ + "\"");
            throw std::runtime_error("Error: shm can not create/open \"" + name_ + "\"");
        }
    }
src/shm_msg_queue.cpp
@@ -37,10 +37,7 @@
{
}
ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) :
    id_(id), queue_(abs_addr, segment, MsgQIdToName(id_))
{
    //TODO check some tag.
}
    id_(id), queue_(abs_addr, segment, MsgQIdToName(id_)) {}
ShmMsgQueue::~ShmMsgQueue() {}
@@ -68,7 +65,6 @@
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val)
{
    try {
        //TODO find from center, or use offset.
        ShmMsgQueue dest(remote.offset_, shm, remote.id_);
        return dest.queue().TryWrite(val);
    } catch (...) {
src/topic_node.cpp
@@ -569,7 +569,8 @@
            reply_head.mutable_proc_id()->swap(out_proc_id);
            return true;
        }
    } catch (...) {
    } catch (std::exception &e) {
        LOG_ERROR() << __func__ << " exception: " << e.what();
        SetLastError(eError, __func__ + std::string(" internal errer."));
    }
    return false;
@@ -672,7 +673,6 @@
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
        }
        // TODO wait for result?
    } catch (...) {
        return false;
    }
@@ -718,12 +718,12 @@
            return false;
        }
    }
    //TODO error msg.
    if (head.type() == kMsgTypePublish) {
        if (pub.ParseFromString(body)) {
            head.mutable_proc_id()->swap(proc_id);
            return true;
        }
    }
    SetLastError(eError, "invalid subcribe msg received.");
    return false;
}