wangzhengquan
2020-07-17 c1f1446058dbedd9be9b9561e6ba435e0cd15bbc
update
6个文件已修改
96 ■■■■ 已修改文件
queue/include/lock_free_queue.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/socket.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/socket.c 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/communication 补丁 | 查看 | 原始文档 | blame | 历史
test/communication.c 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/lock_free_queue.h
@@ -78,7 +78,7 @@
    int items;
   
public:
    int mutex;
    // int mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
@@ -151,7 +151,7 @@
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
    mutex = SemUtil::get(IPC_PRIVATE, 1);
    // mutex = SemUtil::get(IPC_PRIVATE, 1);
}
template <
queue/include/socket.h
@@ -28,8 +28,9 @@
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOCKET_CLOSE = 2,
    SHM_COMMON_MSG = 3
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
    
};
queue/libshm_queue.a
Binary files differ
queue/socket.c
@@ -6,7 +6,7 @@
void print_msg(char *head, shm_msg_t& msg) {
    err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
    //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
}
void * _server_run_msg_rev(void* _socket);
@@ -158,7 +158,9 @@
    shm_msg_t src;
    shm_socket_t *client_socket;
    std::map<int, shm_socket_t* >::iterator iter;
    while(socket->queue->pop(src)) {
        switch (src.type) {
            case SHM_SOCKET_OPEN : 
                socket->acceptQueue->push_timeout(src, &timeout);
@@ -167,10 +169,14 @@
                _server_close_conn_to_client(socket, src.port);
                break;
            case SHM_COMMON_MSG :
                iter = socket->clientSocketMap->find(src.port);
    print_msg("_server_run_msg_rev find before", src);
                if( iter !=  socket->clientSocketMap->end()) {
                    client_socket= iter->second;
    print_msg("_server_run_msg_rev push before", src);
                    client_socket->messageQueue->push_timeout(src, &timeout);
    print_msg("_server_run_msg_rev push after", src);
                }
                
                break;
@@ -186,6 +192,10 @@
/**
 * 接受客户端建立新连接的请求
 *
*/
shm_socket_t* shm_accept(shm_socket_t* socket) {
    hashtable_t *hashtable = mm_get_hashtable();
@@ -207,10 +217,29 @@
        socket->clientSocketMap->insert({client_port, client_socket});
        return client_socket;
        /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
        */
        //发送open_reply
        struct timespec timeout = {1, 0};
        shm_msg_t msg;
        msg.port = socket->port;
        msg.size = 0;
        msg.type = SHM_SOCKET_OPEN_REPLY;
        if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
        {
            return client_socket;
        } else {
            err_msg(0, "shm_accept: 发送open_reply失败");
            return NULL;
        }
    } else {
        err_exit(errno, "shm_accept");
    }
    return NULL;
    
}
@@ -230,17 +259,32 @@
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0);
    socket->remoteQueue = _attach_remote_queue(port);
    socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
    //发送open请求
    struct timespec timeout = {1, 0};
    shm_msg_t msg;
    msg.port = socket->port;
    msg.size = 0;
    msg.type=SHM_SOCKET_OPEN;
    socket->remoteQueue->push_timeout(msg, &timeout);
    shm_msg_t open_msg;
    open_msg.port = socket->port;
    open_msg.size = 0;
    open_msg.type=SHM_SOCKET_OPEN;
    socket->remoteQueue->push_timeout(open_msg, &timeout);
    //接受open reply
    if(socket->queue->pop(msg)) {
        // 在这里server端已经准备好接受客户端发送请求了
        if(msg.type == SHM_SOCKET_OPEN_REPLY) {
    pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
            pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
        } else {
            err_exit(0, "shm_connect: 不匹配的应答信息!");
        }
    } else {
        err_exit(0, "connect failted!");
    }
    return 0;
}
@@ -288,7 +332,14 @@
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, buf, size);
    // struct timeval time;
    // gettimeofday(&time, NULL);
//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
    if(socket->remoteQueue->push(dest)) {
        //gettimeofday(&time, NULL);
//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
        return 0;
    } else {
        err_msg(errno, "connection has been closed!");
@@ -300,7 +351,13 @@
int shm_recv(shm_socket_t* socket, void **buf, int *size) {
    shm_msg_t src;
//     struct timeval time;
//     gettimeofday(&time, NULL);
// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
    if (socket->messageQueue->pop(src)) {
// gettimeofday(&time, NULL);
// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
test/communication
Binary files differ
test/communication.c
@@ -27,7 +27,7 @@
    shm_socket_t *client_socket;
    while(true) {
        client_socket = shm_accept(socket);
printf("server messageQueue = %p\n", client_socket->messageQueue);
// printf("server messageQueue = %p\n", client_socket->messageQueue);
        pthread_create(&tid, NULL, precess_client , (void *)client_socket);
    }
@@ -56,6 +56,7 @@
    
    int size;
    void *recvbuf;
    printf("requst:%s\n", sendbuf);
    shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
    shm_recv(socket, &recvbuf, &size);
    printf("reply: %s\n", (char *)recvbuf);
@@ -103,10 +104,10 @@
    int scale = 100000;
    int i;
    shm_socket_t *socket = shm_open_socket();
    shm_connect(socket, port);
    for( i = 0; i<scale; i++) {
        sprintf(sendbuf, "processor(%d) %d", targ->id, i);
        sprintf(sendbuf, "thread(%d) %d", targ->id, i);
        client_send(socket, sendbuf);
    }
    shm_close_socket(socket);
@@ -115,7 +116,7 @@
void multyThreadClient(int port) {
    int status, i = 0, processors = 2;
    int status, i = 0, processors = 4;
    void *res[processors];
    Targ *targs= (Targ*)calloc(processors, sizeof(Targ));
    pthread_t tids[processors];
@@ -157,7 +158,8 @@
 if (strcmp("mclient", argv[1]) == 0)
     multyThreadClient(port);
 shm_destroy();
  shm_destroy();
 // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
  return 0;
}