| | |
| | | int shmqueue_push(void * _shmqueue, void *src, int size); |
| | | |
| | | /** |
| | | * 入队, 队列满时立即返回. |
| | | * 入队, 立刻返回 |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; |
| | |
| | | int shmqueue_pop(void * _shmqueue, void **dest, int *size); |
| | | |
| | | /** |
| | | * 出队, 队列空时立即返回 |
| | | * 出队, 立刻返回 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; |
| | |
| | | return shm_socket_force_bind(socket->shm_socket, port); |
| | | } |
| | | |
| | | |
| | | |
| | | int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return shm_sendto(socket->shm_socket, buf, size, port); |
| | | return shm_sendto(socket->shm_socket, buf, size, port, NULL, 0); |
| | | |
| | | } |
| | | |
| | | int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { |
| | | int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shm_sendto(socket->shm_socket, buf, size, port, &timeout, 0); |
| | | |
| | | } |
| | | |
| | | int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return shm_sendto(socket->shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | |
| | | } |
| | | |
| | | static inline int _dgram_mod_recvfrom_(void *_socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { |
| | | |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | if(socket->mod == BUS) { |
| | | err_exit(0, "Can not use method recvfrom in a Bus"); |
| | | } |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(socket->shm_socket, buf, size, port); |
| | | int rv = shm_recvfrom(socket->shm_socket, buf, size, port, timeout, flags); |
| | | // printf("dgram_mod_recvfrom after\n"); |
| | | return rv; |
| | | } |
| | | |
| | | int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { |
| | | return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, 0); |
| | | } |
| | | |
| | | int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec) { |
| | | struct timespec timeout = {sec, nsec}; |
| | | return _dgram_mod_recvfrom_(_socket, buf, size, port, &timeout, 0); |
| | | } |
| | | |
| | | int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port) { |
| | | return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) { |
| | | |
| | | int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, |
| | | void **recv_buf, int *recv_size) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size); |
| | | return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | int dgram_mod_get_port(void * _socket) { |
| | | int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, |
| | | void **recv_buf, int *recv_size, int sec, int nsec) { |
| | | struct timespec timeout = {sec, nsec}; |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return socket->shm_socket->port; |
| | | return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, &timeout, 0); |
| | | |
| | | } |
| | | |
| | | int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, |
| | | void **recv_buf, int *recv_size) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | |
| | | } |
| | | |
| | | |
| | | void dgram_mod_free(void *buf) { |
| | | free(buf); |
| | | } |
| | | // =================bus======================== |
| | | |
| | | int dgram_mod_start_bus(void * _socket) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | |
| | | /** |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port) { |
| | | static int _dgram_mod_sub_(void * _socket, void *topic, int size, int port, |
| | | struct timespec *timeout, int flags) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | char buf[8192]; |
| | | snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); |
| | | return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port); |
| | | return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port, timeout, flags); |
| | | } |
| | | |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port ) { |
| | | return _dgram_mod_sub_(_socket, topic, size, port, NULL, 0); |
| | | } |
| | | |
| | | int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec) { |
| | | struct timespec timeout = {sec, nsec}; |
| | | return _dgram_mod_sub_(_socket, topic, size, port, &timeout, 0); |
| | | } |
| | | |
| | | int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port) { |
| | | return _dgram_mod_sub_(_socket, topic, size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | /** |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { |
| | | static int _dgram_mod_pub_(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, |
| | | struct timespec *timeout, int flags) { |
| | | |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | int head_len; |
| | |
| | | snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); |
| | | head_len = strlen(buf); |
| | | memcpy(buf+head_len, content, content_size); |
| | | return shm_sendto(socket->shm_socket, buf, head_len+content_size, port); |
| | | return shm_sendto(socket->shm_socket, buf, head_len+content_size, port, timeout, flags); |
| | | |
| | | } |
| | | |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { |
| | | return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, 0); |
| | | } |
| | | |
| | | int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec) { |
| | | struct timespec timeout = {sec, nsec}; |
| | | return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, &timeout, 0); |
| | | } |
| | | |
| | | int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { |
| | | return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | |
| | | int dgram_mod_get_port(void * _socket) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | return socket->shm_socket->port; |
| | | } |
| | | |
| | | |
| | | void dgram_mod_free(void *buf) { |
| | | free(buf); |
| | | } |
| | | |
| | | //========================================================================================================================== |
| | | |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); |
| | | |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec); |
| | | // 发送信息立刻返回。 |
| | | int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port); |
| | | |
| | | /** |
| | | * 接收信息 |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec); |
| | | int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port); |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ; |
| | | int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | /** |
| | |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); |
| | | int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port); |
| | | |
| | | |
| | | |
| | | /** |
| | | * 发布主题 |
| | |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec); |
| | | int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | |
| | | /** |
| | |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | |
| | | enum shm_msg_type_t |
| | | { |
| | |
| | | SHM_SOCKET_CLOSE = 3, |
| | | SHM_COMMON_MSG = 4 |
| | | |
| | | }; |
| | | |
| | | enum shm_socket_flag_t |
| | | { |
| | | SHM_MSG_TIMEOUT = 1, |
| | | SHM_MSG_NOWAIT = 2 |
| | | }; |
| | | |
| | | enum shm_socket_type_t |
| | |
| | | |
| | | int shm_send(shm_socket_t * socket, const void *buf, const int size) ; |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size); |
| | | |
| | | int shm_recv(shm_socket_t * socket, void **buf, int *size) ; |
| | | |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL); |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0); |
| | | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, |
| | | struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
| | |
| | | |
| | | static Logger logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | |
| | | void print_msg(char *head, shm_msg_t &msg) { |
| | | // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); |
| | | } |
| | | |
| | | void *_server_run_msg_rev(void *_socket); |
| | | static void *_server_run_msg_rev(void *_socket); |
| | | |
| | | void *_client_run_msg_rev(void *_socket); |
| | | static void *_client_run_msg_rev(void *_socket); |
| | | |
| | | int _shm_close_dgram_socket(shm_socket_t *socket); |
| | | static int _shm_close_dgram_socket(shm_socket_t *socket); |
| | | |
| | | int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); |
| | | static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); |
| | | |
| | | static inline int _shm_socket_check_key(shm_socket_t *socket) { |
| | | void *tmp_ptr = mm_get_by_key(socket->port); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | // 短连接方式发送 |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, |
| | | const int port, const struct timespec *timeout) { |
| | | const int port, const struct timespec *timeout, const int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " |
| | | "not a SHM_SOCKET_DGRAM socket ", |
| | |
| | | |
| | | SHMQueue<shm_msg_t> *remoteQueue; |
| | | if ((remoteQueue = _attach_remote_queue(port)) == NULL) { |
| | | err_msg(0, "shm_sendto failed, then other end has been closed!"); |
| | | err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!"); |
| | | return -1; |
| | | } |
| | | // printf("shm_sendto push before\n"); |
| | | bool rv; |
| | | if(timeout != NULL) { |
| | | if(flags & SHM_MSG_NOWAIT != 0) { |
| | | rv = remoteQueue->push_nowait(dest); |
| | | } else if(timeout != NULL) { |
| | | rv = remoteQueue->push_timeout(dest, timeout); |
| | | } else { |
| | | rv = remoteQueue->push(dest); |
| | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port) { |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " |
| | | "is not a SHM_SOCKET_DGRAM socket ", |
| | |
| | | |
| | | shm_msg_t src; |
| | | // printf("shm_recvfrom pop before\n"); |
| | | if (socket->queue->pop(src)) { |
| | | bool rv; |
| | | if(flags & SHM_MSG_NOWAIT != 0) { |
| | | rv = socket->queue->pop_nowait(src); |
| | | } else if(timeout != NULL) { |
| | | rv = socket->queue->pop_timeout(src, timeout); |
| | | } else { |
| | | rv = socket->queue->pop(src); |
| | | } |
| | | |
| | | if (rv) { |
| | | void *_buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_port, void **recv_buf, |
| | | int *recv_size) { |
| | | int *recv_size, struct timespec *timeout, int flags) { |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | |
| | | int rv; |
| | | |
| | | shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port); |
| | | if (shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags); |
| | | shm_close_socket(tmp_socket); |
| | | return rv; |
| | | } |
| | |
| | | |
| | | /* Reserve semaphore - decrement it by 1 */ |
| | | int SemUtil::dec(int semId) { |
| | | logger.debug("%d: SemUtil::dec\n", semId); |
| | | // logger.debug("%d: SemUtil::dec\n", semId); |
| | | struct sembuf sops; |
| | | |
| | | sops.sem_num = 0; |
| | |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec"); |
| | | // err_msg(errno, "SemUtil::dec"); |
| | | return -1; |
| | | } |
| | | |
| | |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec_nowait"); |
| | | // err_msg(errno, "SemUtil::dec_nowait"); |
| | | return -1; |
| | | } |
| | | |
| | |
| | | |
| | | while (semtimedop(semId, &sops, 1, timeout) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec_timeout"); |
| | | //err_msg(errno, "SemUtil::dec_timeout"); |
| | | return -1; |
| | | } |
| | | |
| | |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = dgram_mod_bus dgram_mod_survey dgram_mod_req_rep |
| | | PROGS = dgram_mod_bus dgram_mod_survey dgram_mod_req_rep test_timeout |
| | | |
| | | |
| | | build: $(PROGS) |
| | |
| | | if(strcmp(action, "sub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | dgram_mod_sub(socket, topic, strlen(topic), port); |
| | | if (dgram_mod_sub(socket, topic, strlen(topic), port) == 0) { |
| | | printf("Sub success!\n"); |
| | | } else { |
| | | printf("Sub failture!\n"); |
| | | exit(0); |
| | | } |
| | | |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | // printf("%s %s %s\n", action, topic, content); |
| | | printf("Please input topic and content\n"); |
| | | scanf("%s %s", topic, content); |
| | | dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); |
| | | if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port) == 0){ |
| | | printf("Pub success!\n"); |
| | | } else { |
| | | printf("Pub failture!\n"); |
| | | } |
| | | |
| | | } else if(strcmp(action, "quit") == 0) { |
| | | break; |
| | | } else { |
New file |
| | |
| | | #include "dgram_mod_socket.h" |
| | | #include "shm_mm.h" |
| | | #include "usg_common.h" |
| | | |
| | | void server(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | dgram_mod_bind(socket, port); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int remote_port; |
| | | if ( (rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 5, 0) ) == 0) { |
| | | printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); |
| | | free(recvbuf); |
| | | } else { |
| | | printf("RECEIVED failture, timeout\n"); |
| | | } |
| | | sleep(100); |
| | | dgram_mod_close_socket(socket); |
| | | } |
| | | |
| | | void client(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | int size; |
| | | char sendbuf[512]; |
| | | long i = 0; |
| | | while (true) { |
| | | sprintf(sendbuf, "%d", i); |
| | | |
| | | if(dgram_mod_sendto_timeout(socket, sendbuf, strlen(sendbuf) + 1, port, 2, 0) == 0) { |
| | | printf("SEND HEART:%s\n", sendbuf); |
| | | printf("send success\n"); |
| | | } else { |
| | | printf("send failture, timeout\n"); |
| | | } |
| | | i++; |
| | | } |
| | | dgram_mod_close_socket(socket); |
| | | } |
| | | |
| | | |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | int port; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | |
| | | if (strcmp("server", argv[1]) == 0) { |
| | | server(port); |
| | | } |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | |
| | | |
| | | return 0; |
| | | } |