wangzhengquan
2020-06-15 73866a4c527cbdf726c5fd824526d5657d0e15ee
update
1个文件已删除
16个文件已添加
4个文件已修改
544 ■■■■■ 已修改文件
bipc/Makefile 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/bipc.c 132 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/bipc.h 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/core 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pubsub 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pubsub.c 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pubsub.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey.c 96 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey2 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey2.c 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey2.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/core 补丁 | 查看 | 原始文档 | blame | 历史
test/nanomsg/pubsub 补丁 | 查看 | 原始文档 | blame | 历史
test/nanomsg/pubsub.c 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/nanomsg/pubsub.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/nng/pubsub 补丁 | 查看 | 原始文档 | blame | 历史
test/nng/pubsub.c 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/nng/pubsub.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/Makefile
New file
@@ -0,0 +1,33 @@
#
# Makefile for common library.
#
ROOT=..
# LDLIBS+=-Wl,-rpath=
# 海康包路径
# 开源工具包路径
LDDIR +=-L$(ROOT)/lib/nng
# 开源工具包
LDLIBS += -lnng -lpthread
#LIB_NETDISK = $(ROOT)/libnetdisk.a
#DLIB_NETDISK = $(ROOT)/libnetdisk.so
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS = test_survey test_survey2 test_pubsub
build: $(PROGS)
test_survey: test_survey.c bipc.c
test_pubsub:  test_pubsub.c bipc.c
clean:
    rm -f $(TEMPFILES) $(PROGS)
include $(ROOT)/Make.common.inc
bipc/bipc.c
New file
@@ -0,0 +1,132 @@
#include "bipc.h"
#include <nng/protocol/pipeline0/pull.h>
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/protocol/bus0/bus.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
void
static fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}
int  bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod) {
    int rv;
    switch(mod) {
        case PULL_PUSH:
            rv = nng_pull0_open(sock);
             break;
        case PAIR:
            rv = nng_pair0_open(sock);
            break;
        case BUS:
            rv = nng_bus0_open(sock);
            break;
        case REQ_REP:
            rv = nng_rep0_open(sock);
            break;
        case SURVEY:
            rv = nng_surveyor0_open(sock);
            break;
        case PUB_SUB:
            rv = nng_pub0_open(sock);
            break;
        default:
            fprintf(stderr, "无法识别的模式");
            return -1;
    }
    if (rv != 0) {
       // fatal("open", rv);
        return rv;
    }
    if ((rv = nng_listen(*sock, url, NULL, 0)) != 0) {
       // fatal("nng_listen", rv);
        return rv;
    }
    return 0;
}
int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod) {
    int rv  = 0 ;
    switch(mod) {
        case PULL_PUSH:
            rv = nng_push0_open(sock);
            break;
        case PAIR:
            rv = nng_pair0_open(sock);
            break;
        case BUS:
            break;
        case REQ_REP:
            rv = nng_req0_open(sock);
            break;
        case SURVEY:
            rv = nng_respondent0_open(sock);
            break;
        case PUB_SUB:
            if ((rv = nng_sub0_open(sock)) != 0) {
                //fatal("sub0_open", rv);
                return rv;
            }
            if ((rv = nng_setopt(*sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
                //fatal("nng_setopt", rv);
                return rv;
            }
            break;
        default:
            fprintf(stderr, "无法识别的模式");
            return rv;
    }
    if (rv != 0) {
        fatal("bipc_connect open socket", rv);
        return rv;
    }
    if ((rv = nng_dial(*sock, url, NULL, 0)) != 0) {
       // fatal("dial", rv);
        return rv;
    }
    return 0;
}
int bipc_send(bipc_socket_t *sock,  const void *data, size_t size) {
    return nng_send(*sock, const_cast<void *>(data), size, 0);
}
int bipc_recv(nng_socket *sock,  void *data, size_t *sizep) {
    //int rv = nng_recv(*sock, data, sizep, 0);
    // char *buf = NULL;
    // int rv = nng_recv(*sock, &buf, sizep, NNG_FLAG_ALLOC);
    // memcpy(data, buf, *sizep);
    // nng_free(buf, *sizep);
    int rv = nng_recv(*sock, data, sizep, NNG_FLAG_ALLOC);
    if (rv == NNG_ETIMEDOUT)
        return BIPC_ETIMEDOUT ;
    return rv;
}
void bipc_free(void *ptr, size_t size) {
     nng_free(ptr, size);
}
const char * bipc_strerror(int error) {
    return nng_strerror(error);
}
bipc/bipc.h
New file
@@ -0,0 +1,38 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <nng/nng.h>
enum bipc_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
};
enum bipc_err {
    BIPC_ETIMEDOUT = 5
};
typedef nng_socket bipc_socket_t;
int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod);
int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod);
int bipc_send(bipc_socket_t *sock,  const void *data, size_t size);
int bipc_recv(nng_socket *sock,  void *data, size_t *sizep);
void bipc_free(void *ptr, size_t size) ;
const char * bipc_strerror(int error);
bipc/core
Binary files differ
bipc/test_pubsub
Binary files differ
bipc/test_pubsub.c
New file
@@ -0,0 +1,84 @@
#include "bipc.h"
#define SERVER "server"
#define CLIENT "client"
void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
}
char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}
int
server(const char *url)
{
        bipc_socket_t sock;
        int rv;
        // if ((rv = nng_pub0_open(&sock)) != 0) {
        //         fatal("nng_pub0_open", rv);
        // }
        if ((rv = bipc_listen(&sock, url, PUB_SUB)) < 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *d = date();
                printf("SERVER: PUBLISHING DATE %s\n", d);
                if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) {
                        fatal("nng_send", rv);
                }
                sleep(1);
        }
}
int
client(const char *url, const char *name)
{
        bipc_socket_t sock;
        int rv;
        // if ((rv = nng_sub0_open(&sock)) != 0) {
        //         fatal("nng_sub0_open", rv);
        // }
        // // subscribe to everything (empty means all topics)
        // if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
        //         fatal("nng_setopt", rv);
        // }
        if ((rv = bipc_connect(&sock, url, PUB_SUB)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("CLIENT (%s): RECEIVED %s\n", name, buf);
                bipc_free(buf, sz);
        }
}
int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));
          if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client (argv[2], argv[3]));
        fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}
bipc/test_pubsub.sh
New file
@@ -0,0 +1,6 @@
./test_pubsub server ipc:///tmp/test_pubsub.ipc & server=$! && sleep 1
./test_pubsub client ipc:///tmp/test_pubsub.ipc client0 & client0=$!
./test_pubsub client ipc:///tmp/test_pubsub.ipc client1 & client1=$!
./test_pubsub client ipc:///tmp/test_pubsub.ipc client2 & client2=$!
sleep 5
kill $server $client0 $client1 $client2
bipc/test_survey
Binary files differ
bipc/test_survey.c
New file
@@ -0,0 +1,96 @@
#include "bipc.h"
#define SERVER "server"
#define CLIENT "client"
#define DATE   "DATE"
void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
        exit(1);
}
char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}
int
server(const char *url)
{
        bipc_socket_t sock;
        int rv;
        if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
            fatal("nng_listen", rv);
        }
        for (;;) {
            printf("SERVER: SENDING DATE SURVEY REQUEST\n");
            if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
                fatal("nng_send", rv);
            }
            for (;;) {
                char *buf = NULL;
                size_t sz;
                rv = bipc_recv(&sock, &buf, &sz);
                if (rv == BIPC_ETIMEDOUT) {
                    break;
                }
                if (rv != 0) {
                    fatal("nng_recv", rv);
                }
                printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
                bipc_free(buf, sz);
            }
            printf("SERVER: SURVEY COMPLETE\n");
        }
}
int
client(const char *url, const char *name)
{
        bipc_socket_t sock;
        int rv;
        if ((rv = bipc_connect(&sock, url, SURVEY)) != 0) {
            fatal("nng_dial", rv);
        }
        for (;;) {
            char *buf = NULL;
            size_t sz;
            if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) {
                printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST, length=%d\n", name, buf, sz);
                bipc_free(buf, sz);
                char *d = date();
                printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", name);
                if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) {
                    fatal("nng_send", rv);
                }
            }
        }
}
int
main(const int argc, const char **argv)
{
    if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
        return (server(argv[2]));
    if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
        return (client(argv[2], argv[3]));
    fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",
        SERVER, CLIENT);
    return 1;
}
bipc/test_survey.sh
New file
@@ -0,0 +1,6 @@
./test_survey server ipc:///tmp/test_survey.ipc & server=$!
./test_survey client ipc:///tmp/test_survey.ipc client0 & client0=$!
./test_survey client ipc:///tmp/test_survey.ipc client1 & client1=$!
./test_survey client ipc:///tmp/test_survey.ipc client2 & client2=$!
sleep 3
kill $server $client0 $client1 $client2
bipc/test_survey2
Binary files differ
bipc/test_survey2.c
New file
@@ -0,0 +1,112 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <nng/nng.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>
#define SERVER "server"
#define CLIENT "client"
char * DATE  = "DATE";
void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
        exit(1);
}
char *
date(void)
{
        time_t now = time(&now);
        struct tm *info = localtime(&now);
        char *text = asctime(info);
        text[strlen(text)-1] = '\0'; // remove '\n'
        return (text);
}
int
server(const char *url)
{
        nng_socket sock;
        int rv;
        if ((rv = nng_surveyor0_open(&sock)) != 0) {
                fatal("nng_surveyor0_open", rv);
        }
        if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                printf("SERVER: SENDING DATE SURVEY REQUEST\n");
                if ((rv = nng_send(sock, DATE, strlen(DATE) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }
                for (;;) {
                        char *buf = NULL;
                        size_t sz;
                        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
                        if (rv == NNG_ETIMEDOUT) {
                                break;
                        }
                        if (rv != 0) {
                                fatal("nng_recv", rv);
                        }
                        printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n",
                            buf);
                        nng_free(buf, sz);
                }
                printf("SERVER: SURVEY COMPLETE\n");
        }
}
int
client(const char *url, const char *name)
{
        nng_socket sock;
        int rv;
        if ((rv = nng_respondent0_open(&sock)) != 0) {
                fatal("nng_respondent0_open", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                //char buf[1024];
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
               // if ((rv = nng_recv(sock, buf, &sz, 0)) == 0) {
                        printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n",
                            name, buf);
                       // nng_free(buf, sz);
                        char *d = date();
                        printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n",
                           name);
                        if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                                fatal("nng_send", rv);
                        }
                }
        }
}
int
main(const int argc, const char **argv)
{
        if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))
                return (server(argv[2]));
        if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))
                return (client(argv[2], argv[3]));
        fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",
            SERVER, CLIENT);
        return 1;
}
bipc/test_survey2.sh
New file
@@ -0,0 +1,6 @@
./test_survey2 server ipc:///tmp/test_survey2.ipc & server=$!
./test_survey2 client ipc:///tmp/test_survey2.ipc client0 & client0=$!
./test_survey2 client ipc:///tmp/test_survey2.ipc client1 & client1=$!
./test_survey2 client ipc:///tmp/test_survey2.ipc client2 & client2=$!
sleep 3
kill $server $client0 $client1 $client2
test/Makefile
@@ -25,7 +25,7 @@
clean:
    rm $(TEMPFILES)
    rm -f $(TEMPFILES)
include $(ROOT)/Make.common.inc
test/core
Binary files differ
test/nanomsg/pubsub
Binary files differ
test/nanomsg/pubsub.c
@@ -6,6 +6,7 @@
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <nanomsg/reqrep.h>
#define SERVER "server"
#define CLIENT "client"
@@ -31,7 +32,7 @@
{
        int sock;
        if ((sock = nn_socket(AF_SP, NN_PUB)) < 0) {
        if ((sock = nn_socket(AF_SP, NN_REQ )) < 0) {
                fatal("nn_socket");
        }
          if (nn_bind(sock, url) < 0) {
@@ -54,14 +55,14 @@
{
        int sock;
        if ((sock = nn_socket(AF_SP, NN_SUB)) < 0) {
        if ((sock = nn_socket(AF_SP, NN_REP)) < 0) {
                fatal("nn_socket");
        }
        // subscribe to everything ("" means all topics)
        if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
                fatal("nn_setsockopt");
        }
        // if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "sjhdfjsdfh", 0) < 0) {
        //         fatal("nn_setsockopt");
        // }
        if (nn_connect(sock, url) < 0) {
                fatal("nn_connet");
        }
test/nanomsg/pubsub.sh
New file
@@ -0,0 +1,6 @@
./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1
./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$!
./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$!
./pubsub client ipc:///tmp/pubsub.ipc client2 & client2=$!
sleep 5
kill $server $client0 $client1 $client2
test/nng/pubsub
Binary files differ
test/nng/pubsub.c
@@ -32,6 +32,7 @@
{
        nng_socket sock;
        int rv;
        char buf[1024];
        if ((rv = nng_pub0_open(&sock)) != 0) {
                fatal("nng_pub0_open", rv);
@@ -41,8 +42,9 @@
        }
        for (;;) {
                char *d = date();
                snprintf(buf, 1024, "time:%s", d);
                printf("SERVER: PUBLISHING DATE %s\n", d);
                if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {
                if ((rv = nng_send(sock, buf, strlen(buf) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }
                sleep(1);
@@ -60,7 +62,7 @@
        }
        // subscribe to everything (empty means all topics)
        if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
        if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "nnnnnnnnn", 0)) != 0) {
                fatal("nng_setopt", rv);
        }
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
test/nng/pubsub.sh
New file
@@ -0,0 +1,6 @@
./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1
./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$!
./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$!
./pubsub client ipc:///tmp/pubsub.ipc client2 & client2=$!
sleep 5
kill $server $client0 $client1 $client2