| | |
| | | DIRS = queue test |
| | | DIRS = queue test2 |
| | | |
| | | all: |
| | | for i in $(DIRS); do \ |
| | |
| | | #ifndef __MOD_SOCKET_H__ |
| | | #define __MOD_SOCKET_H__ |
| | | |
| | | #include "shm_socket.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | enum shm_mod_t |
| | | enum socket_mod_t |
| | | { |
| | | PULL_PUSH = 1, |
| | | REQ_REP = 2, |
| | |
| | | }; |
| | | |
| | | |
| | | void *mod_open_socket(int mod); |
| | | |
| | | int mod_close_socket(void * _socket); |
| | | |
| | | int mod_socket_bind(void * _socket, int port); |
| | | |
| | | |
| | | int mod_listen(void * _socket); |
| | | |
| | | |
| | | int mod_connect(void * _socket, int port); |
| | | |
| | | int mod_send(void * _socket, void *buf, int size); |
| | | |
| | | int mod_recv(void * _socket, void **buf, int *size) ; |
| | | |
| | | void mod_free(void *buf); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
New file |
| | |
#ifndef __SHM_MM_H__
#define __SHM_MM_H__

#ifdef __cplusplus
extern "C" {
#endif

/**
 * Initialise the shared-memory layer.
 *
 * @param size  Size handed to the underlying memory pool. The unit is
 *              defined by the pool implementation, which is not visible
 *              from this header.
 */
void shm_init(int size);

/**
 * Release what shm_init() set up.
 *
 * Declared with an explicit (void) so that C callers get a real prototype;
 * in C an empty parameter list leaves the arguments unspecified.
 */
void shm_destroy(void);

#ifdef __cplusplus
}
#endif

#endif
| | | |
| | |
| | | |
| | | }; |
| | | |
/** Connection state stored in a shm socket's `status` field. */
enum shm_connection_status_t {
    SHM_CONN_CLOSED      = 1,  /* not connected / closed */
    SHM_CONN_LISTEN      = 2,  /* listening for incoming connections */
    SHM_CONN_ESTABLISHED = 3   /* connected to a peer */
};
| | | |
| | | typedef struct shm_msg_t { |
| | | int port; |
| | | shm_msg_type_t type; |
| | |
| | | typedef struct shm_socket_t { |
| | | // 本地port |
| | | int port; |
| | | shm_connection_status_t status; |
| | | SHMQueue<shm_msg_t> *queue; |
| | | SHMQueue<shm_msg_t> *remoteQueue; |
| | | LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue; |
| | |
| | | int shm_close_socket(shm_socket_t * socket) ; |
| | | |
| | | |
| | | int shm_bind(shm_socket_t * socket, int port) ; |
| | | int shm_soket_bind(shm_socket_t * socket, int port) ; |
| | | |
| | | int shm_listen(shm_socket_t * socket) ; |
| | | |
| | |
| | | #include "mod_socket.h" |
| | | #include "shm_socket.h" |
| | | #include "usg_common.h" |
| | | typedef struct mod_entry_t |
| | | { |
| | | int size; |
| | | void *buf; |
| | | shm_socket_t *client_socket; |
| | | }mod_entry_t; |
| | | |
| | | typedef struct mod_socket_t { |
| | | socket_mod_t mod; |
| | | shm_socket_t *shm_socket; |
| | | shm_socket_t *client_socket; |
| | | int is_server; |
| | | LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue; |
| | | int slots; |
| | | int items; |
| | | |
| | | |
| | | } mod_socket_t; |
| | | |
| | | /** |
| | | * |
| | | */ |
| | | void *mod_open_socket(int mod) { |
| | | mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t)); |
| | | socket->shm_socket=shm_open_socket(); |
| | | socket->is_server = 0; |
| | | socket->mod = (socket_mod_t)mod; |
| | | socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16); |
| | | if (mod == REQ_REP) { |
| | | socket->slots = SemUtil::get(IPC_PRIVATE, 1); |
| | | socket->items = SemUtil::get(IPC_PRIVATE, 0); |
| | | } |
| | | |
| | | return (void *)socket; |
| | | } |
| | | |
| | | |
| | | |
| | | int mod_close_socket(void * _socket){ |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | |
| | | if (socket->mod == REQ_REP) { |
| | | SemUtil::remove(socket->slots); |
| | | SemUtil::remove(socket->items); |
| | | } |
| | | |
| | | int rv = shm_close_socket(socket->shm_socket); |
| | | free(_socket); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | int mod_socket_bind(void * _socket, int port){ |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | return shm_soket_bind(socket->shm_socket, port); |
| | | } |
| | | |
| | | void * run_server_recv_client_msg(void *_socket) { |
| | | pthread_detach(pthread_self()); |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | shm_socket_t * client_socket = socket->client_socket; |
| | | |
| | | mod_entry_t entry; |
| | | entry.client_socket = client_socket; |
| | | while (socket->shm_socket->status == SHM_CONN_LISTEN && |
| | | client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) { |
| | | |
| | | socket->recvQueue->push(entry); |
| | | // shm_free(recvbuf); |
| | | } |
| | | free(_socket); |
| | | shm_close_socket(client_socket); |
| | | return NULL; |
| | | } |
| | | |
| | | void *run_accept_connection(void * _socket) { |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | shm_socket_t *client_socket; |
| | | pthread_t tid; |
| | | while(socket->shm_socket->status == SHM_CONN_LISTEN) { |
| | | client_socket = shm_accept(socket->shm_socket); |
| | | |
| | | mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t)); |
| | | memcpy(arg, _socket, sizeof(mod_socket_t)); |
| | | arg->client_socket = client_socket; |
| | | pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg); |
| | | } |
| | | return NULL; |
| | | } |
| | | |
| | | int mod_listen(void * _socket) { |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | pthread_t tid; |
| | | socket->is_server = 1; |
| | | int rv = shm_listen(socket->shm_socket); |
| | | if(rv == 0) { |
| | | pthread_create(&tid, NULL, run_accept_connection, _socket); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | |
| | | int mod_connect(void * _socket, int port) { |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | return shm_connect(socket->shm_socket, port); |
| | | |
| | | } |
| | | |
| | | int mod_send(void * _socket, void *buf, int size) { |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | if(!socket->is_server ) { |
| | | return shm_send(socket->shm_socket, buf, size); |
| | | } |
| | | else if(socket->mod == REQ_REP) { |
| | | SemUtil::dec(socket->items); |
| | | shm_send(socket->client_socket, buf, size); |
| | | SemUtil::inc(socket->slots); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | int mod_recv(void * _socket, void **buf, int *size) { |
| | | mod_socket_t * socket = (mod_socket_t *) _socket; |
| | | mod_entry_t entry; |
| | | |
| | | if(!socket->is_server ) { |
| | | return shm_recv(socket->shm_socket, buf, size); |
| | | } |
| | | else if(socket->mod == REQ_REP) { |
| | | SemUtil::dec(socket->slots); |
| | | socket->recvQueue->pop(entry); |
| | | *buf = entry.buf; |
| | | *size = entry.size; |
| | | socket->client_socket = entry.client_socket; |
| | | SemUtil::inc(socket->items); |
| | | return 0; |
| | | } |
| | | return -1; |
| | | |
| | | |
| | | } |
| | | |
| | | |
/**
 * Release a buffer with free().
 *
 * @param buf  Buffer to release; NULL is accepted, as with free().
 */
void mod_free(void *buf) {
    free(buf);
}
| | | |
| | |
| | | sops.sem_flg = 0; |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR ) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | |
| | | sops.sem_flg = IPC_NOWAIT; |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR ) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec_nowait"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | |
| | | sops.sem_flg = 0; |
| | | |
| | | while ( semtimedop(semId, &sops, 1, timeout) == -1) |
| | | if (errno != EINTR ) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec_timeout"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | |
| | | sops.sem_op = 1; |
| | | sops.sem_flg = 0; |
| | | |
| | | return semop(semId, &sops, 1); |
| | | int rv = semop(semId, &sops, 1); |
| | | if(rv == -1) { |
| | | err_msg(errno, "SemUtil::inc"); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | void SemUtil::remove(int semid) { |
New file |
| | |
| | | #include "shm_mm.h" |
| | | #include "mem_pool.h" |
| | | |
| | | void shm_init(int size) { |
| | | mem_pool_init(size); |
| | | } |
| | | |
| | | void shm_destroy() { |
| | | mem_pool_destroy(); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; |
| | | |
| | | void shm_init(int size) { |
| | | mem_pool_init(size); |
| | | } |
| | | |
| | | void shm_destroy() { |
| | | mem_pool_destroy(); |
| | | } |
| | | |
/**
 * Release a buffer with free().
 *
 * @param buf  Buffer to release; NULL is accepted, as with free().
 */
void shm_free(void *buf) {
    free(buf);
}
| | | |
| | | shm_socket_t *shm_open_socket() { |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | |
| | | socket->port = -1; |
| | | socket->dispatch_thread = 0; |
| | | socket->status=SHM_CONN_CLOSED; |
| | | |
| | | return socket; |
| | | } |
| | | |
| | | |
| | | int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { |
| | | socket->status = SHM_CONN_CLOSED; |
| | | //给对方发送一个关闭连接的消息 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t close_msg; |
| | |
| | | |
| | | |
| | | |
| | | int shm_bind(shm_socket_t * socket, int port) { |
| | | int shm_soket_bind(shm_socket_t * socket, int port) { |
| | | shm_socket_t * _socket = (shm_socket_t *) socket; |
| | | _socket -> port = port; |
| | | return 0; |
| | |
| | | |
| | | pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); |
| | | |
| | | socket->status = SHM_CONN_LISTEN; |
| | | return 0; |
| | | } |
| | | |
| | |
| | | shm_socket_t *client_socket; |
| | | auto iter = socket->clientSocketMap->find(port); |
| | | if( iter != socket->clientSocketMap->end() ) { |
| | | // client_socket= iter->second; |
| | | // if(client_socket->remoteQueue != NULL) { |
| | | // delete client_socket->remoteQueue; |
| | | // client_socket->remoteQueue = NULL; |
| | | // } |
| | | // if(client_socket->messageQueue != NULL) { |
| | | // delete client_socket->messageQueue; |
| | | // client_socket->messageQueue = NULL; |
| | | // } |
| | | |
| | | socket->clientSocketMap->erase(iter); |
| | | } |
| | | //free((void *)client_socket); |
| | |
| | | case SHM_COMMON_MSG : |
| | | |
| | | iter = socket->clientSocketMap->find(src.port); |
| | | print_msg("_server_run_msg_rev find before", src); |
| | | if( iter != socket->clientSocketMap->end()) { |
| | | client_socket= iter->second; |
| | | print_msg("_server_run_msg_rev push before", src); |
| | | // print_msg("_server_run_msg_rev push before", src); |
| | | client_socket->messageQueue->push_timeout(src, &timeout); |
| | | print_msg("_server_run_msg_rev push after", src); |
| | | // print_msg("_server_run_msg_rev push after", src); |
| | | } |
| | | |
| | | break; |
| | |
| | | /* |
| | | * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题 |
| | | */ |
| | | //发送open_reply |
| | | //发送open_reply,回应客户端的connect请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | |
| | | |
| | | if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) |
| | | { |
| | | client_socket->status = SHM_CONN_ESTABLISHED; |
| | | return client_socket; |
| | | } else { |
| | | err_msg(0, "shm_accept: 发送open_reply失败"); |
| | |
| | | |
| | | //接受open reply |
| | | if(socket->queue->pop(msg)) { |
| | | // 在这里server端已经准备好接受客户端发送请求了 |
| | | // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接 |
| | | if(msg.type == SHM_SOCKET_OPEN_REPLY) { |
| | | |
| | | socket->status = SHM_CONN_ESTABLISHED; |
| | | pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); |
| | | } else { |
| | | err_exit(0, "shm_connect: 不匹配的应答信息!"); |
| | |
| | | |
| | | if (_socket->mod == REQ_REP) { |
| | | SemUtil::remove(_socket->slots); |
| | | SemUtil::remove(_socket->items); |
| | | SemUtil::remove(_socket->items); |
| | | } |
| | | |
| | | free(socket); |
| | |
| | | void server(int port) { |
| | | pthread_t tid; |
| | | shm_socket_t *socket = shm_open_socket(); |
| | | shm_bind(socket, port); |
| | | shm_socket_bind(socket, port); |
| | | shm_listen(socket); |
| | | shm_socket_t *client_socket; |
| | | while(true) { |
| | |
| | | #include "socket.h" |
| | | #include "mod_socket.h" |
| | | #include "shm_mm.h" |
| | | #include "usg_common.h" |
/** Argument bundle handed to each client thread. */
struct Targ {
    int port;  /* server port to connect to */
    int id;    /* index of the client thread */
};
| | | |
| | | |
| | | void server(int port) { |
| | | void *socket = shm_open_socket(REQ_REP); |
| | | shm_bind(socket, port); |
| | | shm_listen(socket); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | while(true) { |
| | | shm_recv(socket, &recvbuf, &size); |
| | | sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); |
| | | puts(sendbuf); |
| | | shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | | shm_free(recvbuf); |
| | | |
| | | } |
| | | shm_close_socket(socket); |
| | | void *socket = mod_open_socket(REQ_REP); |
| | | mod_socket_bind(socket, port); |
| | | mod_listen(socket); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | while (mod_recv(socket, &recvbuf, &size) == 0) { |
| | | sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); |
| | | puts(sendbuf); |
| | | mod_send(socket, sendbuf, strlen(sendbuf) + 1); |
| | | free(recvbuf); |
| | | } |
| | | mod_close_socket(socket); |
| | | } |
| | | |
| | | void client(int port) { |
| | | void *socket = shm_open_socket(REQ_REP); |
| | | shm_connect(socket, port); |
| | | void *socket = mod_open_socket(REQ_REP); |
| | | mod_connect(socket, port); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | while (true) { |
| | | printf("request: "); |
| | | scanf("%s", sendbuf); |
| | | mod_send(socket, sendbuf, strlen(sendbuf) + 1); |
| | | mod_recv(socket, &recvbuf, &size); |
| | | printf("reply: %s\n", (char *)recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | mod_close_socket(socket); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | void *threadrun(void *arg) { |
| | | Targ *targ = (Targ *)arg; |
| | | int port = targ->port; |
| | | char sendbuf[512]; |
| | | int scale = 100000; |
| | | int i; |
| | | void *socket = mod_open_socket(REQ_REP); |
| | | mod_connect(socket, port); |
| | | |
| | | char filename[512]; |
| | | sprintf(filename, "test%d.txt", targ->id); |
| | | FILE *fp = NULL; |
| | | fp = fopen(filename, "w+"); |
| | | |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | while(true) { |
| | | printf("request: "); |
| | | scanf("%s", sendbuf); |
| | | shm_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | | shm_recv(socket, &recvbuf, &size); |
| | | printf("reply: %s\n", (char *)recvbuf); |
| | | shm_free(recvbuf); |
| | | for (i = 0; i < scale; i++) { |
| | | sprintf(sendbuf, "thread(%d) %d", targ->id, i); |
| | | |
| | | } |
| | | shm_close_socket(socket); |
| | | fprintf(fp, "requst:%s\n", sendbuf); |
| | | mod_send(socket, sendbuf, strlen(sendbuf)+1) ; |
| | | mod_recv(socket, &recvbuf, &size); |
| | | fprintf(fp, "reply: %s\n", (char *)recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | fclose(fp); |
| | | mod_close_socket(socket); |
| | | return (void *)i; |
| | | } |
| | | |
| | | void multyThreadClient(int port) { |
| | | |
| | | int status, i = 0, processors = 4; |
| | | void *res[processors]; |
| | | Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); |
| | | pthread_t tids[processors]; |
| | | char sendbuf[512]; |
| | | for (i = 0; i < processors; i++) { |
| | | targs[i].port = port; |
| | | targs[i].id = i; |
| | | pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]); |
| | | } |
| | | |
| | | for (i = 0; i < processors; i++) { |
| | | if (pthread_join(tids[i], &res[i]) != 0) { |
| | | perror("multyThreadClient pthread_join"); |
| | | } else { |
| | | fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]); |
| | | } |
| | | } |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | int port; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | | return 1; |
| | | fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | |
| | | if (strcmp("server", argv[1]) == 0 ) { |
| | | server(port); |
| | | if (strcmp("server", argv[1]) == 0) { |
| | | server(port); |
| | | } |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | shm_destroy(); |
| | | // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); |
| | | client(port); |
| | | |
| | | if (strcmp("mclient", argv[1]) == 0) |
| | | multyThreadClient(port); |
| | | shm_destroy(); |
| | | // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); |
| | | return 0; |
| | | } |