CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
shm_util/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
shm_util/shm_util.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/logger_factory.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/net/net_mod_socket_wrapper.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/shm_socket.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/shm_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test/thread_set.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_net_socket/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
CMakeLists.txt
@@ -30,4 +30,5 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) endif() shm_util/CMakeLists.txt
New file @@ -0,0 +1,6 @@ add_executable(shm_util shm_util.cpp ) target_link_libraries(shm_util PRIVATE shm_queue ${EXTRA_LIBS} ) target_include_directories(shm_util PRIVATE "${PROJECT_BINARY_DIR}" ${EXTRA_INCLUDES} ) shm_util/shm_util.cpp
New file @@ -0,0 +1,63 @@ #include <assert.h> #include "net_mod_server_socket_wrapper.h" #include "net_mod_socket_wrapper.h" #include "bus_server_socket_wrapper.h" #include "shm_mm_wrapper.h" #include "usg_common.h" #include <getopt.h> #include "logger_factory.h" static void usage(const char *name) { printf("Usage: %s {list}\n", name); } void list () { hashtable_t *hashtable = mm_get_hashtable(); hashtable_foreach(hashtable, [&](int key, void * value){ printf("%d\n", key); }); } void remove(int key) { hashtable_t *hashtable = mm_get_hashtable(); LockFreeQueue<shm_packet_t> * mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key); if(mqueue != NULL) { delete mqueue; hashtable_remove(hashtable, key); } } int main(int argc, char *argv[]) { shm_mm_wrapper_init(512); int key; int i; if(argc < 2) { usage(argv[0]); return 1; } if(strcmp(argv[1], "list") == 0) { list(); } else if(strcmp(argv[1], "rm") == 0) { if(argc < 3) { usage(argv[0]); return 1; } for(i = 2; i < argc; i++) { key = atoi(argv[i]); remove(key); } } shm_mm_wrapper_destroy(); } src/logger_factory.cpp
@@ -18,7 +18,7 @@ config.logFile = logFile; #ifdef BUILD_Debug config.level = Logger::DEBUG; config.level = Logger::INFO; config.console = 1; #else config.level = Logger::ERROR; src/net/net_mod_socket_wrapper.cpp
@@ -16,8 +16,8 @@ * 关闭 */ void net_mod_socket_close(void *_socket) { NetModSocket *sockt = (NetModSocket *)_socket; delete sockt; // NetModSocket *sockt = (NetModSocket *)_socket; // delete sockt; } int net_mod_socket_stop(void *_socket) { src/socket/shm_socket.cpp
@@ -27,7 +27,7 @@ const int key, const struct timespec *timeout, const int flag); static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf, static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags); @@ -74,53 +74,6 @@ return queue; } //删除包含在keys内的queue // size_t shm_socket_remove_keys(int keys[], size_t length) { // hashtable_t *hashtable = mm_get_hashtable(); // LockFreeQueue<shm_packet_t> *mqueue; // size_t count = 0; // for(int i = 0; i< length; i++) { // // 销毁共享内存的queue // mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); // delete mqueue; // hashtable_remove(hashtable, keys[i]); // count++; // } // return count; // } // // 删除不在keys内的queue // size_t shm_socket_remove_keys_exclude(int keys[], size_t length) { // hashtable_t *hashtable = mm_get_hashtable(); // std::set<int> *keyset = hashtable_keyset(hashtable); // std::set<int>::iterator keyItr; // LockFreeQueue<shm_packet_t> *mqueue; // bool found; // size_t count = 0; // for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { // found = false; // for (size_t i = 0; i < length; i++) { // if (*keyItr == keys[i]) { // found = true; // break; // } // } // // 100内的是bus内部自己用的 // if (!found && *keyItr > 100) { // // 销毁共享内存的queue // mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); // delete mqueue; // hashtable_remove(hashtable, *keyItr); // count++; // } // } // delete keyset; // return count; // } shm_socket_t *shm_socket_open(shm_socket_type_t socket_type) { int s, type; pthread_mutexattr_t mtxAttr; @@ -151,11 +104,8 @@ return sockt; } int shm_socket_close(shm_socket_t *sockt) { } int _shm_socket_close_(shm_socket_t *sockt) { int rv; logger->debug("shm_socket_close\n"); @@ -221,6 +171,7 @@ int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { // return _shm_sendandrecv_uuid(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags); return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, key, recv_buf, recv_size, timeout, flags); } @@ -348,7 +299,7 @@ } static int _shm_sendandrecv_use_uuid(shm_socket_t *sockt, const void *send_buf, static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { @@ -411,7 +362,6 @@ LABLE_FAIL: return EBUS_RECVFROM_WRONG_END; // return rv; LABLE_SUC: if(recv_buf != NULL) { @@ -424,12 +374,7 @@ if(recv_size != NULL) *recv_size = recvpak.size; return 0; } // use thread local @@ -438,7 +383,7 @@ int *recv_size, const struct timespec *timeout, int flags) { int rv, tryn = 3; int rv, tryn = 6; shm_packet_t sendpak; shm_packet_t recvpak; std::map<int, shm_packet_t>::iterator recvbufIter; @@ -480,18 +425,14 @@ return rv; } if(rv != 0) { return rv; } while(tryn > 0) { tryn--; recvbufIter = tmp_socket->recvbuf2.find(key); if(recvbufIter != tmp_socket->recvbuf2.end()) { // 在缓存里查到了UUID匹配成功的 // logger->debug("get from recvbuf: %s", uuid.c_str()); // 在缓存里查到了key匹配成功的 // logger->info("get from recvbuf: %d", key); recvpak = recvbufIter->second; sockt->recvbuf2.erase(recvbufIter); tmp_socket->recvbuf2.erase(recvbufIter); goto LABLE_SUC; } src/socket/shm_socket.h
@@ -51,9 +51,9 @@ shm_socket_t *shm_socket_open(shm_socket_type_t socket_type); int shm_socket_close(shm_socket_t * socket) ; int shm_socket_close(shm_socket_t * sockt) ; int shm_socket_stop(shm_socket_t *sockt); int shm_socket_stop(shm_socket_t * sockt); int shm_socket_bind(shm_socket_t * socket, int key) ; test/CMakeLists.txt
@@ -64,4 +64,9 @@ ${EXTRA_INCLUDES} ) add_executable(thread_set thread_set.cpp ) target_link_libraries(thread_set PRIVATE ${EXTRA_LIBS} ) target_include_directories(thread_set PRIVATE "${PROJECT_BINARY_DIR}" ${EXTRA_INCLUDES} ) test/thread_set.cpp
New file @@ -0,0 +1,53 @@ #include "usg_common.h" #define THREADS 4 typedef struct shm_packet_t { int key; size_t size; void * buf; char uuid[64]; int action; } shm_packet_t; void *_run_(void *arg) { std::map<int, shm_packet_t> * amap = (std::map<int, shm_packet_t> *)arg; std::map<int, shm_packet_t>::iterator iter; iter = amap->find(2); while( iter != amap->end() ) { amap->erase(iter); iter = amap->find(2); sleep(1); } } int main() { int i; pthread_t tids[THREADS]; std::map<int, shm_packet_t> * amap = new std::map<int, shm_packet_t>; for (i = 0; i < THREADS; i++) { pthread_create(&tids[i], NULL, _run_, (void *)&amap); } shm_packet_t pack; pack.key = 2; while(true) { sleep(1); amap->insert({2, pack}); } for (i = 0; i < THREADS; i++) { if (pthread_join(tids[i], NULL) != 0) { perror(" pthread_join"); } } } test_net_socket/CMakeLists.txt
@@ -9,6 +9,11 @@ # add the executable add_executable(test_net_mod_socket test_net_mod_socket.cpp ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh) target_link_libraries(test_net_mod_socket PRIVATE shm_queue ${EXTRA_LIBS} ) target_include_directories(test_net_mod_socket PRIVATE "${PROJECT_BINARY_DIR}" ${EXTRA_INCLUDES} ) add_executable(test_bus_stop test_bus_stop.cpp) @@ -24,10 +29,6 @@ ) target_include_directories(test_net_mod_socket PRIVATE "${PROJECT_BINARY_DIR}" ${EXTRA_INCLUDES} ) # add the install targets install(TARGETS test_net_mod_socket DESTINATION bin)