| | |
| | | cmake-build-debug/ |
| | | *.tmp |
| | | core |
| | | tags |
| | |
| | | ./socket/shm_socket.cpp |
| | | ./socket/shm_mod_socket.cpp |
| | | ./time_util.cpp |
| | | ./psem.cpp |
| | | ./bus_error.cpp |
| | | ./futex_sem.cpp |
| | | ./svsem.cpp |
| | |
| | | #define _PSEM_H_ |
| | | |
| | | #include "usg_common.h" |
| | | #include <semaphore.h> |
| | | #include "time_util.h" |
| | | |
| | | int psem_wait(sem_t *sem) ; |
| | | inline int psem_timedwait(sem_t *sem, const struct timespec *ts) { // Timed wait on sem; hands back sem_timedwait()'s return value unchanged.
| | | struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); // presumably turns a relative timeout into an absolute deadline -- confirm in TimeUtil
| | | return sem_timedwait(sem, &abs_timeout); // no retry: a -1 result (including errno == EINTR) goes straight back to the caller
| | | // Disabled alternative, kept for reference: retry the wait while errno == EINTR.
| | | // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
| | | // if(errno == EINTR)
| | | // continue;
| | | // else {
| | | // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
| | | // return -1;
| | | // }
| | | // }
| | | // return 0;
| | | }
| | | |
| | | int psem_timedwait(sem_t *sem, const struct timespec *ts); |
| | | |
| | | int psem_trywait(sem_t *sem) ; |
| | | inline int psem_wait(sem_t *sem) { // Blocking wait on sem; hands back sem_wait()'s return value unchanged.
| | | return sem_wait(sem); // no retry: a -1 result (including errno == EINTR) goes straight back to the caller
| | | // Disabled alternative, kept for reference: retry the wait while errno == EINTR.
| | | // while ( (rv = sem_wait(sem)) == -1) {
| | | // if(errno == EINTR)
| | | // continue;
| | | // else {
| | | // return -1;
| | | // }
| | | // }
| | | // return 0;
| | | }
| | | |
| | | int psem_post(sem_t *sem); |
| | | inline int psem_trywait(sem_t *sem) { // Thin wrapper: forwards sem to sem_trywait().
| | | return sem_trywait(sem); // result is returned to the caller as-is
| | | }
| | | |
| | | inline int psem_post(sem_t *sem) { // Thin wrapper: forwards sem to sem_post().
| | | return sem_post(sem); // result is returned to the caller as-is
| | | }
| | | |
| | | |
| | | #endif |
| | |
| | | #include "psem.h" |
| | | #include "bus_error.h" |
| | | #include "bus_def.h" |
| | | #include "read_write_lock.h" |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | |
| | | |
| | | public: |
| | | sem_t mutex; |
| | | |
| | | |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | LABEL_FAILTURE: |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { |
| | | return Allocator::deallocate(p); |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * _que = (LockFreeQueue<ELEM_T, Allocator, Q_TYPE> * )p; |
| | | Allocator::deallocate(p); |
| | | } |
| | | |
| | | // include implementation files |
New file |
| | |
| | | #ifndef _CLOSE_LOCK_H_ |
| | | #define _CLOSE_LOCK_H_ |
| | | |
| | | #include "usg_common.h" |
| | | #include "psem.h" |
| | | // Readers-writer lock built from two POSIX semaphores used as binary locks.
| | | // countMutex guards readCount. writeMutex is held either by one writer or by
| | | // the whole group of readers, from the first lockRead() until the last
| | | // unlockRead(). Readers that arrive while readCount > 0 never touch
| | | // writeMutex, so a continuous stream of readers can keep writers waiting.
| | | class ReadWriteLock {
| | | private:
| | | unsigned int readCount = 0;
| | | sem_t countMutex;
| | | sem_t writeMutex;
| | | 
| | | 
| | | public:
| | | // Initializes both semaphores to 1 (unlocked). Waiting on or posting to a
| | | // sem_t that was never passed to sem_init() is undefined, so this must
| | | // happen before any lock/unlock call. pshared is non-zero so the lock
| | | // keeps working if the object is placed in memory shared between
| | | // processes; it behaves the same for threads of a single process.
| | | ReadWriteLock() {
| | | sem_init(&countMutex, 1, 1);
| | | sem_init(&writeMutex, 1, 1);
| | | }
| | | 
| | | // Acquires the lock for reading. Blocks while a writer holds writeMutex.
| | | void lockRead() {
| | | // readCount is shared state, so a lock is needed to serialize access to it
| | | //synchronized(ReadWriteLock.class){}
| | | psem_wait(&countMutex);
| | | // Only the first reader takes the write lock; later readers go straight on
| | | if(readCount == 0){
| | | psem_wait(&writeMutex);
| | | 
| | | }
| | | ++readCount;
| | | psem_post(&countMutex);
| | | }
| | | 
| | | 
| | | // Releases one read hold. Must be paired with an earlier lockRead();
| | | // readCount is unsigned and is decremented unconditionally.
| | | void unlockRead(){
| | | 
| | | psem_wait(&countMutex);
| | | readCount--;
| | | // Writers may proceed only once every reader has finished
| | | if(readCount == 0){
| | | psem_post(&writeMutex);
| | | }
| | | psem_post(&countMutex);
| | | }
| | | 
| | | 
| | | // Acquires the lock for writing; blocks while readers or another writer hold it.
| | | void lockWrite(){
| | | psem_wait(&writeMutex);
| | | }
| | | 
| | | // Releases the write lock taken by lockWrite().
| | | void unlockWrite(){
| | | psem_post(&writeMutex);
| | | }
| | | };
| | | |
| | | #endif |
| | |
| | | |
| | | // use thread local |
| | | static int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | const int send_size, const int key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | | int rv; |
| | | int tryn = 0; |
| | | |
| | | |
| | | |
| | | int rv, tryn = 3; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<int, shm_packet_t>::iterator recvbufIter; |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | |
| | |
| | | exit(1); |
| | | } |
| | | } |
| | | // int rv; |
| | | // int tryn = 0; |
| | | // int recv_key; |
| | | // rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags); |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | // if (() == 0) { |
| | | |
| | | while(tryn < 3) { |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | // while(tryn < 3) { |
| | | // tryn++; |
| | | // rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | // if(rv != 0) { |
| | | // logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | // return rv; |
| | | // } |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | // exit(1); |
| | | continue; |
| | | // return EBUS_RECVFROM_WRONG_END; |
| | | // // 超时导致接发送对象,与返回对象不对应的情况 |
| | | // if(send_key != recv_key) { |
| | | // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | // // exit(1); |
| | | // continue; |
| | | // // return EBUS_RECVFROM_WRONG_END; |
| | | // } |
| | | |
| | | // return 0; |
| | | // } |
| | | |
| | | // return EBUS_RECVFROM_WRONG_END; |
| | | // } |
| | | |
| | | |
| | | |
| | | sendpak.key = tmp_socket->key; |
| | | sendpak.size = send_size; |
| | | if(send_buf != NULL) { |
| | | sendpak.buf = mm_malloc(send_size); |
| | | memcpy(sendpak.buf, send_buf, send_size); |
| | | } |
| | | rv = shm_sendpakto(tmp_socket, &sendpak, key, timeout, flags); |
| | | |
| | | if(rv != 0) { |
| | | return rv; |
| | | } |
| | | |
| | | if(rv != 0) { |
| | | return rv; |
| | | } |
| | | |
| | | while(tryn > 0) { |
| | | tryn--; |
| | | recvbufIter = tmp_socket->recvbuf2.find(key); |
| | | if(recvbufIter != tmp_socket->recvbuf2.end()) { |
| | | // 在缓存里查到了UUID匹配成功的 |
| | | // logger->debug("get from recvbuf: %s", uuid.c_str()); |
| | | recvpak = recvbufIter->second; |
| | | sockt->recvbuf2.erase(recvbufIter); |
| | | goto LABLE_SUC; |
| | | } |
| | | |
| | | rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); |
| | | |
| | | if (rv != 0) { |
| | | |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | return rv; |
| | | if (key == recvpak.key) { |
| | | // 发送与接受的UUID匹配成功 |
| | | goto LABLE_SUC; |
| | | } else { |
| | | // 答非所问,放到缓存里 |
| | | tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | LABLE_FAIL: |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | |
| | | LABLE_SUC: |
| | | if(recv_buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *recv_buf = _buf; |
| | | } |
| | | |
| | | if(recv_size != NULL) |
| | | *recv_size = recvpak.size; |
| | | |
| | | mm_free(recvpak.buf); |
| | | return 0; |
| | | } |
| | | |
| | | static int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | |
| | | LockFreeQueue<shm_packet_t> *queue; //self queue |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; // peer queue |
| | | std::map<std::string, shm_packet_t> recvbuf; |
| | | |
| | | std::map<std::string, shm_packet_t> recvbuf; // for uuid |
| | | std::map<int, shm_packet_t> recvbuf2; //for thread local |
| | | |
| | | } shm_socket_t; |
| | | |
New file |
| | |
| | | // Guard is specific to this header. It previously reused _CLOSE_LOCK_H_, the
| | | // same macro that guards the ReadWriteLock header, so including both headers
| | | // in one translation unit silently dropped whichever came second.
| | | #ifndef _SV_READ_WRITE_LOCK_H_
| | | #define _SV_READ_WRITE_LOCK_H_
| | | 
| | | #include "usg_common.h"
| | | #include "svsem.h"
| | | // Readers-writer lock with the same structure as ReadWriteLock, but using the
| | | // project's svsem_* helpers in place of POSIX semaphores. countMutex guards
| | | // readCount. writeMutex is held either by one writer or by the whole group of
| | | // readers, from the first lockRead() until the last unlockRead().
| | | class SvReadWriteLock {
| | | private:
| | | unsigned int readCount = 0;
| | | int countMutex;
| | | int writeMutex;
| | | 
| | | 
| | | public:
| | | // Obtains two private semaphores through svsem_get().
| | | // presumably the second argument is the initial value (1 == unlocked) -- confirm in svsem.h
| | | // NOTE(review): nothing in this class releases these semaphores; confirm cleanup happens elsewhere.
| | | SvReadWriteLock() {
| | | countMutex = svsem_get(IPC_PRIVATE, 1);
| | | writeMutex = svsem_get(IPC_PRIVATE, 1);
| | | }
| | | 
| | | // Acquires the lock for reading. Blocks while a writer holds writeMutex.
| | | // NOTE(review): the svsem_* calls below receive the address of the int id; confirm this matches the signatures in svsem.h.
| | | void lockRead() {
| | | // readCount is shared state, so a lock is needed to serialize access to it
| | | //synchronized(ReadWriteLock.class){}
| | | svsem_wait(&countMutex);
| | | // Only the first reader takes the write lock; later readers go straight on
| | | if(readCount == 0){
| | | svsem_wait(&writeMutex);
| | | 
| | | }
| | | ++readCount;
| | | svsem_post(&countMutex);
| | | }
| | | 
| | | 
| | | // Releases one read hold. Must be paired with an earlier lockRead();
| | | // readCount is unsigned and is decremented unconditionally.
| | | void unlockRead(){
| | | 
| | | svsem_wait(&countMutex);
| | | readCount--;
| | | // Writers may proceed only once every reader has finished
| | | if(readCount == 0){
| | | svsem_post(&writeMutex);
| | | }
| | | svsem_post(&countMutex);
| | | }
| | | 
| | | 
| | | // Acquires the lock for writing; blocks while readers or another writer hold it.
| | | void lockWrite(){
| | | svsem_wait(&writeMutex);
| | | }
| | | 
| | | // Releases the write lock taken by lockWrite().
| | | void unlockWrite(){
| | | svsem_post(&writeMutex);
| | | }
| | | };
| | | 
| | | #endif
| | |
| | | # add command |
| | | add_custom_command( |
| | | OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh |
| | | COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh |
| | | OUTPUT ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh |
| | | COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh |
| | | DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/net_mod_socket.sh |
| | | ) |
| | | |
| | | |
| | | # add the executable |
| | | add_executable(test_net_mod_socket test_net_mod_socket.cpp ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh) |
| | | add_executable(test_net_mod_socket test_net_mod_socket.cpp ${PROJECT_BINARY_DIR}/bin/net_mod_socket.sh) |
| | | target_link_libraries(test_net_mod_socket PRIVATE shm_queue ${EXTRA_LIBS} ) |
| | | |
| | | |
| | |
| | | double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); |
| | | long diffsec = (long) (difftime/1000000); |
| | | long diffusec = difftime - diffsec*1000000; |
| | | fprintf(stderr,"发送数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", total, diffsec, diffusec, difftime/total ); |
| | | fprintf(stderr,"发送数目:%ld, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n", |
| | | SCALE*node_arr_size, total, diffsec, diffusec, difftime/total ); |
| | | // fflush(stdout); |
| | | |
| | | } |