wangzhengquan
2020-07-17 7032fedd41386f8a0b779d234620b473d978f889
req_rep finished
7个文件已添加
11个文件已修改
1600437 ■■■■■ 已修改文件
Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/mod_socket.h 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_mm.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_socket.h 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/mod_socket.c 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/sem_util.c 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_mm.c 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_socket.c 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/socket.c.bk 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/communication.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/log.txt 800032 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/req_rep 补丁 | 查看 | 原始文档 | blame | 历史
test2/req_rep.c 131 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test0.txt 200000 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test1.txt 200000 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test2.txt 200000 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/test3.txt 200000 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -1,4 +1,4 @@
DIRS = queue  test
DIRS = queue  test2
all:
    for i in $(DIRS); do \
queue/include/mod_socket.h
@@ -1,12 +1,12 @@
#ifndef __MOD_SOCKET_H__
#define __MOD_SOCKET_H__
#include "shm_socket.h"
#ifdef __cplusplus
extern "C" {
#endif
enum shm_mod_t
enum socket_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
@@ -18,7 +18,23 @@
};
void *mod_open_socket(int mod);
int mod_close_socket(void * _socket);
int mod_socket_bind(void * _socket, int port);
int mod_listen(void * _socket);
int mod_connect(void * _socket, int port);
int mod_send(void * _socket, void *buf, int size);
int mod_recv(void * _socket, void **buf, int *size) ;
void mod_free(void *buf);
#ifdef __cplusplus
}
queue/include/shm_mm.h
New file
@@ -0,0 +1,19 @@
#ifndef __SHM_MM_H__
#define __SHM_MM_H__
#ifdef __cplusplus
extern "C" {
#endif
void shm_init(int size);
void shm_destroy() ;
#ifdef __cplusplus
}
#endif
#endif
queue/include/shm_socket.h
@@ -23,6 +23,12 @@
    
};
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
    SHM_CONN_LISTEN=2,
    SHM_CONN_ESTABLISHED=3
};
typedef struct shm_msg_t {
    int port;
    shm_msg_type_t type;
@@ -35,6 +41,7 @@
typedef struct shm_socket_t {
    // 本地port
    int port;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
    LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
@@ -69,7 +76,7 @@
int shm_close_socket(shm_socket_t * socket) ;
int shm_bind(shm_socket_t * socket, int port) ;
int shm_soket_bind(shm_socket_t * socket, int port) ;
int shm_listen(shm_socket_t * socket) ;
queue/libshm_queue.a
Binary files differ
queue/mod_socket.c
@@ -1,2 +1,153 @@
#include "mod_socket.h"
#include "shm_socket.h"
#include "usg_common.h"
typedef struct mod_entry_t
{
    int size;
    void *buf;
    shm_socket_t *client_socket;
}mod_entry_t;
typedef struct mod_socket_t {
  socket_mod_t mod;
  shm_socket_t *shm_socket;
  shm_socket_t *client_socket;
  int is_server;
  LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue;
  int slots;
  int items;
} mod_socket_t;
/**
 *
 */
void *mod_open_socket(int mod) {
  mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
  socket->shm_socket=shm_open_socket();
  socket->is_server = 0;
  socket->mod = (socket_mod_t)mod;
  socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
  if (mod == REQ_REP) {
    socket->slots = SemUtil::get(IPC_PRIVATE, 1);
    socket->items = SemUtil::get(IPC_PRIVATE, 0);
  }
  return (void *)socket;
}
int mod_close_socket(void * _socket){
    mod_socket_t * socket = (mod_socket_t *) _socket;
    if (socket->mod == REQ_REP) {
        SemUtil::remove(socket->slots);
    SemUtil::remove(socket->items);
    }
    int rv = shm_close_socket(socket->shm_socket);
    free(_socket);
    return rv;
}
int mod_socket_bind(void * _socket, int port){
    mod_socket_t * socket = (mod_socket_t *) _socket;
    return  shm_soket_bind(socket->shm_socket, port);
}
void * run_server_recv_client_msg(void *_socket) {
    pthread_detach(pthread_self());
    mod_socket_t * socket = (mod_socket_t *) _socket;
    shm_socket_t * client_socket = socket->client_socket;
    mod_entry_t entry;
    entry.client_socket = client_socket;
    while (socket->shm_socket->status == SHM_CONN_LISTEN &&
        client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) {
        socket->recvQueue->push(entry);
        // shm_free(recvbuf);
    }
    free(_socket);
    shm_close_socket(client_socket);
    return NULL;
}
void *run_accept_connection(void * _socket) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    shm_socket_t *client_socket;
    pthread_t tid;
    while(socket->shm_socket->status == SHM_CONN_LISTEN) {
        client_socket = shm_accept(socket->shm_socket);
        mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
        memcpy(arg, _socket, sizeof(mod_socket_t));
        arg->client_socket = client_socket;
        pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg);
    }
    return NULL;
}
int mod_listen(void * _socket) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    pthread_t tid;
    socket->is_server = 1;
    int rv = shm_listen(socket->shm_socket);
    if(rv == 0) {
        pthread_create(&tid, NULL, run_accept_connection, _socket);
        return 0;
    }
    return -1;
}
int mod_connect(void * _socket, int port) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    return shm_connect(socket->shm_socket, port);
}
int mod_send(void * _socket, void *buf, int size) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    if(!socket->is_server ) {
        return shm_send(socket->shm_socket, buf, size);
    }
    else if(socket->mod == REQ_REP) {
        SemUtil::dec(socket->items);
        shm_send(socket->client_socket, buf, size);
        SemUtil::inc(socket->slots);
        return 0;
    }
    return -1;
}
int mod_recv(void * _socket, void **buf, int *size) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    mod_entry_t entry;
    if(!socket->is_server ) {
        return shm_recv(socket->shm_socket, buf, size);
    }
    else if(socket->mod == REQ_REP) {
        SemUtil::dec(socket->slots);
        socket->recvQueue->pop(entry);
        *buf = entry.buf;
        *size = entry.size;
        socket->client_socket = entry.client_socket;
        SemUtil::inc(socket->items);
        return 0;
    }
    return -1;
}
void mod_free(void *buf) {
    free(buf);
}
queue/sem_util.c
@@ -79,8 +79,10 @@
    sops.sem_flg =  0;
    while (semop(semId, &sops, 1) == -1)
        if (errno != EINTR )
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec");
            return -1;
        }
    return 0;
}
@@ -94,8 +96,10 @@
    sops.sem_flg =  IPC_NOWAIT;
    while (semop(semId, &sops, 1) == -1)
        if (errno != EINTR )
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec_nowait");
            return -1;
        }
    return 0;
}
@@ -109,8 +113,10 @@
    sops.sem_flg = 0;
    while ( semtimedop(semId, &sops, 1, timeout) == -1)
        if (errno != EINTR )
        if (errno != EINTR ) {
            err_msg(errno, "SemUtil::dec_timeout");
            return -1;
        }
    return 0;
}
@@ -126,7 +132,11 @@
    sops.sem_op = 1;
    sops.sem_flg = 0;
    return semop(semId, &sops, 1);
    int rv = semop(semId, &sops, 1);
    if(rv == -1) {
        err_msg(errno, "SemUtil::inc");
    }
    return rv;
}
void SemUtil::remove(int semid) {
queue/shm_mm.c
New file
@@ -0,0 +1,12 @@
#include "shm_mm.h"
#include "mem_pool.h"
void shm_init(int size) {
    mem_pool_init(size);
}
void shm_destroy() {
    mem_pool_destroy();
}
queue/shm_socket.c
@@ -16,29 +16,19 @@
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
void shm_init(int size) {
    mem_pool_init(size);
}
void shm_destroy() {
    mem_pool_destroy();
}
void shm_free(void *buf) {
    free(buf);
}
shm_socket_t *shm_open_socket() {
    shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
    
    socket->port = -1;
    socket->dispatch_thread = 0;
    socket->status=SHM_CONN_CLOSED;
    
    return socket;
}
int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
    socket->status = SHM_CONN_CLOSED;
    //给对方发送一个关闭连接的消息
    struct timespec timeout = {1, 0};
    shm_msg_t close_msg;
@@ -101,7 +91,7 @@
int shm_bind(shm_socket_t * socket, int port) {
int shm_soket_bind(shm_socket_t * socket, int port) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    _socket -> port = port;
    return 0;
@@ -126,6 +116,7 @@
    
    pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
    socket->status = SHM_CONN_LISTEN;
    return 0;
}
@@ -133,16 +124,6 @@
    shm_socket_t *client_socket;
    auto iter = socket->clientSocketMap->find(port);
    if( iter !=  socket->clientSocketMap->end() ) {
        // client_socket= iter->second;
        // if(client_socket->remoteQueue != NULL) {
        //     delete client_socket->remoteQueue;
        //     client_socket->remoteQueue = NULL;
        // }
        // if(client_socket->messageQueue != NULL) {
        //     delete client_socket->messageQueue;
        //     client_socket->messageQueue = NULL;
        // }
        socket->clientSocketMap->erase(iter);
    }
    //free((void *)client_socket);
@@ -172,12 +153,11 @@
            case SHM_COMMON_MSG :
                iter = socket->clientSocketMap->find(src.port);
    print_msg("_server_run_msg_rev find before", src);
                if( iter !=  socket->clientSocketMap->end()) {
                    client_socket= iter->second;
    print_msg("_server_run_msg_rev push before", src);
    // print_msg("_server_run_msg_rev push before", src);
                    client_socket->messageQueue->push_timeout(src, &timeout);
    print_msg("_server_run_msg_rev push after", src);
    // print_msg("_server_run_msg_rev push after", src);
                }
                
                break;
@@ -221,7 +201,7 @@
        /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
        */
        //发送open_reply
        //发送open_reply,回应客户端的connect请求
        struct timespec timeout = {1, 0};
        shm_msg_t msg;
        msg.port = socket->port;
@@ -230,6 +210,7 @@
        if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
        {
            client_socket->status = SHM_CONN_ESTABLISHED;
            return client_socket;
        } else {
            err_msg(0, "shm_accept: 发送open_reply失败");
@@ -274,9 +255,9 @@
    //接受open reply
    if(socket->queue->pop(msg)) {
        // 在这里server端已经准备好接受客户端发送请求了
        // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
        if(msg.type == SHM_SOCKET_OPEN_REPLY) {
            socket->status = SHM_CONN_ESTABLISHED;
            pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
        } else {
            err_exit(0, "shm_connect: 不匹配的应答信息!");
queue/socket.c.bk
@@ -80,7 +80,7 @@
    if (_socket->mod == REQ_REP) {
        SemUtil::remove(_socket->slots);
        SemUtil::remove(_socket->items);
    SemUtil::remove(_socket->items);
    }
    
    free(socket);
test/communication.c
@@ -22,7 +22,7 @@
void server(int port) {
    pthread_t tid;
    shm_socket_t *socket = shm_open_socket();
    shm_bind(socket, port);
    shm_socket_bind(socket, port);
    shm_listen(socket);
    shm_socket_t *client_socket;
    while(true) {
test2/log.txt
New file
Diff too large
test2/req_rep
Binary files differ
test2/req_rep.c
@@ -1,59 +1,120 @@
#include "socket.h"
#include "mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
typedef struct Targ {
    int port;
    int id;
}Targ;
void server(int port) {
    void *socket = shm_open_socket(REQ_REP);
    shm_bind(socket, port);
    shm_listen(socket);
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
        puts(sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_free(recvbuf);
    }
    shm_close_socket(socket);
  void *socket = mod_open_socket(REQ_REP);
  mod_socket_bind(socket, port);
  mod_listen(socket);
  int size;
  void *recvbuf;
  char sendbuf[512];
  while (mod_recv(socket, &recvbuf, &size) == 0) {
    sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
    puts(sendbuf);
    mod_send(socket, sendbuf, strlen(sendbuf) + 1);
    free(recvbuf);
  }
  mod_close_socket(socket);
}
void client(int port) {
    void *socket = shm_open_socket(REQ_REP);
    shm_connect(socket, port);
  void *socket = mod_open_socket(REQ_REP);
  mod_connect(socket, port);
  int size;
  void *recvbuf;
  char sendbuf[512];
  while (true) {
    printf("request: ");
    scanf("%s", sendbuf);
    mod_send(socket, sendbuf, strlen(sendbuf) + 1);
    mod_recv(socket, &recvbuf, &size);
    printf("reply: %s\n", (char *)recvbuf);
    free(recvbuf);
  }
  mod_close_socket(socket);
}
void *threadrun(void *arg) {
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  char sendbuf[512];
  int scale = 100000;
  int i;
  void *socket = mod_open_socket(REQ_REP);
  mod_connect(socket, port);
    char filename[512];
    sprintf(filename, "test%d.txt", targ->id);
    FILE *fp = NULL;
    fp = fopen(filename, "w+");
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        printf("request: ");
        scanf("%s", sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_recv(socket, &recvbuf, &size);
        printf("reply: %s\n", (char *)recvbuf);
        shm_free(recvbuf);
  for (i = 0; i < scale; i++) {
    sprintf(sendbuf, "thread(%d) %d", targ->id, i);
    }
    shm_close_socket(socket);
    fprintf(fp, "requst:%s\n", sendbuf);
        mod_send(socket, sendbuf, strlen(sendbuf)+1) ;
        mod_recv(socket, &recvbuf, &size);
        fprintf(fp, "reply: %s\n", (char *)recvbuf);
        free(recvbuf);
  }
  fclose(fp);
  mod_close_socket(socket);
  return (void *)i;
}
void multyThreadClient(int port) {
  int status, i = 0, processors = 4;
  void *res[processors];
  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  pthread_t tids[processors];
  char sendbuf[512];
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]);
  }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
      fprintf(stderr, "client(%d) 写入 %ld 条数据\n", i, (long)res[i]);
    }
  }
}
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  if (argc < 3) {
       fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
       return 1;
    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
    return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0 ) {
     server(port);
  if (strcmp("server", argv[1]) == 0) {
    server(port);
  }
  if (strcmp("client", argv[1]) == 0)
     client(port);
 shm_destroy();
 // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
    client(port);
  if (strcmp("mclient", argv[1]) == 0)
    multyThreadClient(port);
  shm_destroy();
  // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
  return 0;
}
test2/test0.txt
New file
Diff too large
test2/test1.txt
New file
Diff too large
test2/test2.txt
New file
Diff too large
test2/test3.txt
New file
Diff too large