Optimize the code source.
| | |
| | | |
| | | static int gRun_stat = 0; |
| | | static void *gNetmod_socket = NULL; |
| | | static std::map<std::string, int> gRecvbuf; |
| | | |
| | | static pthread_mutex_t mutex; |
| | | |
| | |
| | | std::string str; |
| | | std::string MsgID; |
| | | int timeout_ms = 3000; |
| | | std::map<std::string, int>::iterator recvIter; |
| | | char data_buf[MAX_STR_LEN] = { 0x00 }; |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | char *topics_buf = NULL; |
| | |
| | | #endif |
| | | |
| | | str = buf_temp; |
| | | recvIter = gRecvbuf.find(str); |
| | | if(recvIter != gRecvbuf.end()) { |
| | | val = net_mod_socket_buf_data_get(gNetmod_socket, str); |
| | | if(val > 0) { |
| | | |
| | | rv = 0; |
| | | val = recvIter->second; |
| | | |
| | | } else { |
| | | rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | |
| | | val = atoi((char *)data_buf); |
| | | if (val > 0) { |
| | | str = buf_temp; |
| | | gRecvbuf.insert({str, val}); |
| | | net_mod_socket_buf_data_set(gNetmod_socket, str, val); |
| | | } |
| | | |
| | | free(buf); |
| | |
| | | net_mod_err_t *errarr; |
| | | int errarr_size = 0; |
| | | char *errString = NULL; |
| | | std::map<std::string, int>::iterator recvIter; |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | char *topics_buf = NULL; |
| | | |
| | |
| | | #endif |
| | | |
| | | str = buf_temp; |
| | | recvIter = gRecvbuf.find(str); |
| | | if(recvIter != gRecvbuf.end()) { |
| | | val = net_mod_socket_buf_data_get(gNetmod_socket, str); |
| | | if(val > 0) { |
| | | |
| | | rv = 0; |
| | | val = recvIter->second; |
| | | |
| | | } else { |
| | | rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | |
| | | val = atoi((char *)data_buf); |
| | | if (val > 0) { |
| | | str = buf_temp; |
| | | gRecvbuf.insert({str, val}); |
| | | net_mod_socket_buf_data_set(gNetmod_socket, str, val); |
| | | } |
| | | |
| | | free(buf); |
New file |
| | |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | #include <sys/sem.h> |
| | | #include <sys/shm.h> |
| | | #include <sys/types.h> |
| | | #include <sys/ipc.h> |
| | | #include <sys/msg.h> |
| | | #include <errno.h> |
| | | #include "msg_mgr.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | static int sem_msg_id; |
| | | static Msg_info gMsg_buf[MAX_LOCK]; |
| | | |
| | | static int rsv_msg_id; |
| | | |
| | | int msg_init(void) |
| | | { |
| | | key_t key = ftok(MSG_PATH, 0x01); |
| | | |
| | | memset(gMsg_buf, 0x00, sizeof(gMsg_buf)); |
| | | sem_msg_id = msgget((key_t)key, 0666 | IPC_CREAT); |
| | | if(sem_msg_id < 0) { |
| | | logger->error("msgget failed with error: %d\n", sem_msg_id); |
| | | |
| | | return -1; |
| | | } |
| | | |
| | | key_t key_rsv = ftok(MSG_RSV_PATH, 0x02); |
| | | rsv_msg_id = msgget((key_t)key_rsv, 0666 | IPC_CREAT); |
| | | if(rsv_msg_id == -1) { |
| | | logger->error("msgget failed with error: %d\n", rsv_msg_id); |
| | | |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | |
| | | } |
| | | |
| | | void msg_distrib(int msg_id, Msg_info *message) |
| | | { |
| | | int ret; |
| | | Msg_info* msg_obj = message; |
| | | |
| | | switch(msg_id) |
| | | { |
| | | case SEM_TYPE_ID: |
| | | msg_obj->mtype = MSG_TYPE_SEM; |
| | | ret = msgsnd(sem_msg_id, msg_obj, sizeof(Msg_info) - sizeof(long), IPC_NOWAIT); |
| | | if(ret < 0) { |
| | | logger->error("msg send fail: %d\n", ret); |
| | | |
| | | return; |
| | | |
| | | } |
| | | |
| | | break; |
| | | |
| | | case RSV_TYPE_ID: |
| | | msg_obj->mtype = MSG_TYPE_RSV; |
| | | ret = msgsnd(rsv_msg_id, msg_obj, sizeof(Msg_info), 0); |
| | | if(ret < 0) { |
| | | logger->error("msg send fail: %d\n", ret); |
| | | |
| | | return; |
| | | |
| | | } |
| | | |
| | | break; |
| | | |
| | | default: |
| | | logger->error("unknown msg type!\n"); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | int get_msg_info(int Msgid, Msg_info *message) |
| | | { |
| | | int ret = 0; |
| | | Msg_info msg_obj; |
| | | int msg_id; |
| | | |
| | | if (Msgid == SEM_TYPE_ID) { |
| | | msg_id = sem_msg_id; |
| | | } else if (Msgid == RSV_TYPE_ID) { |
| | | msg_id = rsv_msg_id; |
| | | } |
| | | |
| | | ret = msgrcv(msg_id, (void*)&msg_obj, sizeof(Msg_info) - sizeof(long), MSG_TYPE_SEM, 0); |
| | | if(ret < 0) { |
| | | |
| | | logger->error("msg recv fail: %d\n", ret); |
| | | |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | switch(Msgid) { |
| | | case SEM_TYPE_ID: |
| | | msg_info_set(SEM_TYPE_ID, msg_obj); |
| | | |
| | | break; |
| | | |
| | | case RSV_TYPE_ID: |
| | | msg_info_set(SEM_TYPE_ID, msg_obj); |
| | | |
| | | break; |
| | | |
| | | default: |
| | | logger->error("unknown msg type!\n"); |
| | | |
| | | } |
| | | |
| | | return 0; |
| | | |
| | | } |
| | | |
| | | void msg_info_set(int index, Msg_info msg_obj) |
| | | { |
| | | gMsg_buf[index].key = msg_obj.key; |
| | | gMsg_buf[index].id = msg_obj.id; |
| | | |
| | | if (msg_obj.act == SEM_POST) { |
| | | gMsg_buf[index].count++; |
| | | } else if (msg_obj.act == SEM_GET) { |
| | | gMsg_buf[index].count--; |
| | | } else if (msg_obj.act == SEM_RESET) { |
| | | gMsg_buf[index].count = 0; |
| | | } |
| | | |
| | | } |
| | | |
| | | void *sem_msg_handler(void *skptr) |
| | | { |
| | | int ret; |
| | | Msg_info msg_obj; |
| | | |
| | | msg_init(); |
| | | while(true) { |
| | | ret = get_msg_info(SEM_TYPE_ID, &msg_obj); |
| | | if (ret == 0) { |
| | | //process_msg(msg_obj); |
| | | } |
| | | } |
| | | } |
| | | |
New file |
| | |
| | | #ifndef __MSG_MGR_DEF_ |
| | | #define __MSG_MGR_DEF_ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | #define SEM_TYPE_ID 0 |
| | | #define RSV_TYPE_ID 1 |
| | | |
| | | #define MSG_TYPE_SEM 1 |
| | | #define MSG_TYPE_RSV 2 |
| | | |
| | | #define SEM_CREATE 0 |
| | | #define SEM_GET 1 |
| | | #define SEM_POST 2 |
| | | #define SEM_RM 3 |
| | | #define SEM_RESET 4 |
| | | |
| | | #define MSG_PATH "/tmp/msgqueue" |
| | | #define MSG_RSV_PATH "/tmp/msgqueue_rsv" |
| | | |
| | | #define QUERY_MUTEX_KEY 0x8700 |
| | | |
| | | #define MAX_LOCK 100 |
| | | #define MAX_LOCK_HOLD 10 |
| | | |
| | | #define SEM_WT_TIMEOUT 60 |
| | | |
| | | typedef struct _msg_info |
| | | { |
| | | long mtype; |
| | | int key; |
| | | int id; |
| | | int act; |
| | | int count; |
| | | |
| | | } Msg_info; |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | int msg_init(void); |
| | | void msg_distrib(int msg_id, Msg_info *message); |
| | | int get_msg_info(int msg_id, Msg_info *message); |
| | | void *sem_msg_handler(void *skptr); |
| | | void msg_info_set(int index, Msg_info msg_obj); |
| | | |
| | | #endif //end of file |
| | | |
| | | |
| | | |
| | | |
| | |
| | | |
| | | } |
| | | |
| | | void NetModSocket::buf_data_set(std::string str, int val) { |
| | | recvbuf.insert({str, val}); |
| | | } |
| | | |
| | | int NetModSocket::buf_data_get(std::string str) { |
| | | |
| | | int i; |
| | | int val = 0; |
| | | std::map<std::string, int>::iterator recvIter; |
| | | |
| | | recvIter = recvbuf.find(str); |
| | | if(recvIter != recvbuf.end()) { |
| | | |
| | | val = recvIter->second; |
| | | |
| | | } |
| | | |
| | | return val; |
| | | } |
| | | |
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | | |
| | |
| | | ShmModSocket shmModSocket; |
| | | int int_val; |
| | | int svr_val; |
| | | std::map<std::string, int> recvbuf; |
| | | // pthread_mutex_t sendMutex; |
| | | |
| | | // request header 编码为网络传输的字节 |
| | |
| | | int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ; |
| | | int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * recvandsend |
| | | */ |
| | |
| | | */ |
| | | static void free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size); |
| | | |
| | | |
| | | void buf_data_set(std::string str, int val); |
| | | int buf_data_get(std::string str); |
| | | }; |
| | | |
| | | #endif |
| | |
| | | return sockt->svr_get(); |
| | | } |
| | | |
| | | void net_mod_socket_buf_data_set(void * _socket, std::string str, int val) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | sockt->buf_data_set(str, val); |
| | | } |
| | | |
| | | int net_mod_socket_buf_data_get(void * _socket, std::string str) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->buf_data_get(str); |
| | | } |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | | * @timeout 等待时间,单位是千分之一秒 |
| | |
| | | void net_mod_socket_svr_set(void * _socket, int data); |
| | | int net_mod_socket_int_get(void * _socket); |
| | | int net_mod_socket_svr_get(void * _socket); |
| | | void net_mod_socket_buf_data_set(void * _socket, std::string str, int val); |
| | | int net_mod_socket_buf_data_get(void * _socket, std::string str); |
| | | |
| | | /** |
| | | * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回 |
| | |
| | | int data; |
| | | int data_fix; |
| | | int count; |
| | | |
| | | |
| | | _LinkNode *next; |
| | | } LinkNode; |
| | | |