From 73866a4c527cbdf726c5fd824526d5657d0e15ee Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 15 六月 2020 19:47:13 +0800 Subject: [PATCH] update --- test/nng/pubsub.c | 6 bipc/test_pubsub.sh | 6 test/nanomsg/pubsub | 0 bipc/bipc.h | 38 +++ test/nanomsg/pubsub.sh | 6 bipc/test_survey2 | 0 bipc/Makefile | 33 +++ bipc/bipc.c | 132 +++++++++++++ bipc/core | 0 bipc/test_survey2.sh | 6 bipc/test_survey | 0 bipc/test_pubsub | 0 /dev/null | 0 test/Makefile | 2 bipc/test_survey.sh | 6 bipc/test_survey2.c | 112 +++++++++++ bipc/test_pubsub.c | 84 ++++++++ test/nng/pubsub.sh | 6 bipc/test_survey.c | 96 +++++++++ test/nanomsg/pubsub.c | 11 test/nng/pubsub | 0 21 files changed, 536 insertions(+), 8 deletions(-) diff --git a/bipc/Makefile b/bipc/Makefile new file mode 100644 index 0000000..db7b85b --- /dev/null +++ b/bipc/Makefile @@ -0,0 +1,33 @@ +# +# Makefile for common library. +# +ROOT=.. +# LDLIBS+=-Wl,-rpath= +# 娴峰悍鍖呰矾寰� +# 寮�婧愬伐鍏峰寘璺緞 +LDDIR +=-L$(ROOT)/lib/nng + +# 寮�婧愬伐鍏峰寘 +LDLIBS += -lnng -lpthread + +#LIB_NETDISK = $(ROOT)/libnetdisk.a +#DLIB_NETDISK = $(ROOT)/libnetdisk.so +PLATFORM=$(shell $(ROOT)/systype.sh) +include $(ROOT)/Make.defines.$(PLATFORM) + + +PROGS = test_survey test_survey2 test_pubsub + + +build: $(PROGS) + + +test_survey: test_survey.c bipc.c + +test_pubsub: test_pubsub.c bipc.c + +clean: + rm -f $(TEMPFILES) $(PROGS) + + +include $(ROOT)/Make.common.inc diff --git a/bipc/bipc.c b/bipc/bipc.c new file mode 100644 index 0000000..a1918c7 --- /dev/null +++ b/bipc/bipc.c @@ -0,0 +1,132 @@ +#include "bipc.h" +#include <nng/protocol/pipeline0/pull.h> +#include <nng/protocol/pipeline0/push.h> +#include <nng/protocol/survey0/survey.h> +#include <nng/protocol/survey0/respond.h> +#include <nng/protocol/pubsub0/pub.h> +#include <nng/protocol/pubsub0/sub.h> +#include <nng/protocol/pair0/pair.h> +#include <nng/protocol/bus0/bus.h> +#include <nng/protocol/reqrep0/rep.h> +#include <nng/protocol/reqrep0/req.h> + +void +static fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); + exit(1); +} + +int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod) { + + int rv; + switch(mod) { + case PULL_PUSH: + rv = nng_pull0_open(sock); + break; + case PAIR: + rv = nng_pair0_open(sock); + break; + case BUS: + rv = nng_bus0_open(sock); + break; + case REQ_REP: + rv = nng_rep0_open(sock); + break; + case SURVEY: + rv = nng_surveyor0_open(sock); + break; + case PUB_SUB: + rv = nng_pub0_open(sock); + break; + default: + fprintf(stderr, "鏃犳硶璇嗗埆鐨勬ā寮�"); + return -1; + } + + if (rv != 0) { + // fatal("open", rv); + return rv; + } + + if ((rv = nng_listen(*sock, url, NULL, 0)) != 0) { + // fatal("nng_listen", rv); + return rv; + } + return 0; + +} + +int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod) { + int rv = 0 ; + switch(mod) { + case PULL_PUSH: + rv = nng_push0_open(sock); + break; + case PAIR: + rv = nng_pair0_open(sock); + break; + case BUS: + break; + case REQ_REP: + rv = nng_req0_open(sock); + break; + case SURVEY: + rv = nng_respondent0_open(sock); + break; + case PUB_SUB: + if ((rv = nng_sub0_open(sock)) != 0) { + //fatal("sub0_open", rv); + return rv; + } + if ((rv = nng_setopt(*sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + //fatal("nng_setopt", rv); + return rv; + } + break; + default: + fprintf(stderr, "鏃犳硶璇嗗埆鐨勬ā寮�"); + return rv; + } + + if (rv != 0) { + fatal("bipc_connect open socket", rv); + return rv; + } + + if ((rv = nng_dial(*sock, url, NULL, 0)) != 0) { + // fatal("dial", rv); + return rv; + } + return 0; +} + +int bipc_send(bipc_socket_t *sock, const void *data, size_t size) { + return nng_send(*sock, const_cast<void *>(data), size, 0); +} + + +int bipc_recv(nng_socket *sock, void *data, size_t *sizep) { + + //int rv = nng_recv(*sock, data, sizep, 0); + // char *buf = NULL; + // int rv = nng_recv(*sock, &buf, sizep, NNG_FLAG_ALLOC); + // memcpy(data, buf, *sizep); + // nng_free(buf, *sizep); + int rv = nng_recv(*sock, data, sizep, NNG_FLAG_ALLOC); + if (rv == NNG_ETIMEDOUT) + return BIPC_ETIMEDOUT ; + + return rv; +} + + +void bipc_free(void *ptr, size_t size) { + nng_free(ptr, size); +} + + + +const char * bipc_strerror(int error) { + return nng_strerror(error); +} \ No newline at end of file diff --git a/bipc/bipc.h b/bipc/bipc.h new file mode 100644 index 0000000..9dcdc7b --- /dev/null +++ b/bipc/bipc.h @@ -0,0 +1,38 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include <nng/nng.h> + + +enum bipc_mod_t +{ + PULL_PUSH = 1, + REQ_REP = 2, + PAIR = 3, + PUB_SUB = 4, + SURVEY = 5, + BUS = 6 + +}; + +enum bipc_err { + BIPC_ETIMEDOUT = 5 +}; + + +typedef nng_socket bipc_socket_t; + +int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod); + +int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod); + +int bipc_send(bipc_socket_t *sock, const void *data, size_t size); + +int bipc_recv(nng_socket *sock, void *data, size_t *sizep); + +void bipc_free(void *ptr, size_t size) ; + +const char * bipc_strerror(int error); \ No newline at end of file diff --git a/bipc/core b/bipc/core new file mode 100644 index 0000000..4042d75 --- /dev/null +++ b/bipc/core Binary files differ diff --git a/bipc/test_pubsub b/bipc/test_pubsub new file mode 100755 index 0000000..eab45e6 --- /dev/null +++ b/bipc/test_pubsub Binary files differ diff --git a/bipc/test_pubsub.c b/bipc/test_pubsub.c new file mode 100644 index 0000000..4f57719 --- /dev/null +++ b/bipc/test_pubsub.c @@ -0,0 +1,84 @@ +#include "bipc.h" + +#define SERVER "server" +#define CLIENT "client" + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); +} + +char * +date(void) +{ + time_t now = time(&now); + struct tm *info = localtime(&now); + char *text = asctime(info); + text[strlen(text)-1] = '\0'; // remove '\n' + return (text); +} + +int +server(const char *url) +{ + bipc_socket_t sock; + int rv; + + // if ((rv = nng_pub0_open(&sock)) != 0) { + // fatal("nng_pub0_open", rv); + // } + if ((rv = bipc_listen(&sock, url, PUB_SUB)) < 0) { + fatal("nng_listen", rv); + } + for (;;) { + char *d = date(); + printf("SERVER: PUBLISHING DATE %s\n", d); + if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) { + fatal("nng_send", rv); + } + sleep(1); + } +} + +int +client(const char *url, const char *name) +{ + bipc_socket_t sock; + int rv; + + // if ((rv = nng_sub0_open(&sock)) != 0) { + // fatal("nng_sub0_open", rv); + // } + + // // subscribe to everything (empty means all topics) + // if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + // fatal("nng_setopt", rv); + // } + if ((rv = bipc_connect(&sock, url, PUB_SUB)) != 0) { + fatal("nng_dial", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) { + fatal("nng_recv", rv); + } + printf("CLIENT (%s): RECEIVED %s\n", name, buf); + bipc_free(buf, sz); + } +} + +int +main(const int argc, const char **argv) +{ + if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0)) + return (server(argv[2])); + + if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0)) + return (client (argv[2], argv[3])); + + fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n", + SERVER, CLIENT); + return 1; +} \ No newline at end of file diff --git a/bipc/test_pubsub.sh b/bipc/test_pubsub.sh new file mode 100644 index 0000000..ab8134c --- /dev/null +++ b/bipc/test_pubsub.sh @@ -0,0 +1,6 @@ +./test_pubsub server ipc:///tmp/test_pubsub.ipc & server=$! && sleep 1 +./test_pubsub client ipc:///tmp/test_pubsub.ipc client0 & client0=$! +./test_pubsub client ipc:///tmp/test_pubsub.ipc client1 & client1=$! +./test_pubsub client ipc:///tmp/test_pubsub.ipc client2 & client2=$! +sleep 5 +kill $server $client0 $client1 $client2 \ No newline at end of file diff --git a/bipc/test_survey b/bipc/test_survey new file mode 100755 index 0000000..d577d43 --- /dev/null +++ b/bipc/test_survey Binary files differ diff --git a/bipc/test_survey.c b/bipc/test_survey.c new file mode 100644 index 0000000..40d6ad0 --- /dev/null +++ b/bipc/test_survey.c @@ -0,0 +1,96 @@ +#include "bipc.h" + +#define SERVER "server" +#define CLIENT "client" +#define DATE "DATE" + + + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); + exit(1); +} + +char * +date(void) +{ + time_t now = time(&now); + struct tm *info = localtime(&now); + char *text = asctime(info); + text[strlen(text)-1] = '\0'; // remove '\n' + return (text); +} + +int +server(const char *url) +{ + bipc_socket_t sock; + int rv; + + + if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + printf("SERVER: SENDING DATE SURVEY REQUEST\n"); + if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) { + fatal("nng_send", rv); + } + + for (;;) { + char *buf = NULL; + size_t sz; + rv = bipc_recv(&sock, &buf, &sz); + if (rv == BIPC_ETIMEDOUT) { + break; + } + if (rv != 0) { + fatal("nng_recv", rv); + } + printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf); + bipc_free(buf, sz); + } + + printf("SERVER: SURVEY COMPLETE\n"); + } +} + +int +client(const char *url, const char *name) +{ + bipc_socket_t sock; + int rv; + + if ((rv = bipc_connect(&sock, url, SURVEY)) != 0) { + fatal("nng_dial", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) { + printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST, length=%d\n", name, buf, sz); + bipc_free(buf, sz); + char *d = date(); + printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", name); + if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) { + fatal("nng_send", rv); + } + } + } +} + +int +main(const int argc, const char **argv) +{ + if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0)) + return (server(argv[2])); + + if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0)) + return (client(argv[2], argv[3])); + + fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n", + SERVER, CLIENT); + return 1; +} \ No newline at end of file diff --git a/bipc/test_survey.sh b/bipc/test_survey.sh new file mode 100755 index 0000000..58358c4 --- /dev/null +++ b/bipc/test_survey.sh @@ -0,0 +1,6 @@ +./test_survey server ipc:///tmp/test_survey.ipc & server=$! +./test_survey client ipc:///tmp/test_survey.ipc client0 & client0=$! +./test_survey client ipc:///tmp/test_survey.ipc client1 & client1=$! +./test_survey client ipc:///tmp/test_survey.ipc client2 & client2=$! +sleep 3 +kill $server $client0 $client1 $client2 \ No newline at end of file diff --git a/bipc/test_survey2 b/bipc/test_survey2 new file mode 100755 index 0000000..b5439a7 --- /dev/null +++ b/bipc/test_survey2 Binary files differ diff --git a/bipc/test_survey2.c b/bipc/test_survey2.c new file mode 100644 index 0000000..7c142a2 --- /dev/null +++ b/bipc/test_survey2.c @@ -0,0 +1,112 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include <nng/nng.h> +#include <nng/protocol/survey0/survey.h> +#include <nng/protocol/survey0/respond.h> + +#define SERVER "server" +#define CLIENT "client" +char * DATE = "DATE"; + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); + exit(1); +} + +char * +date(void) +{ + time_t now = time(&now); + struct tm *info = localtime(&now); + char *text = asctime(info); + text[strlen(text)-1] = '\0'; // remove '\n' + return (text); +} + +int +server(const char *url) +{ + nng_socket sock; + int rv; + + if ((rv = nng_surveyor0_open(&sock)) != 0) { + fatal("nng_surveyor0_open", rv); + } + if ((rv = nng_listen(sock, url, NULL, 0)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + printf("SERVER: SENDING DATE SURVEY REQUEST\n"); + if ((rv = nng_send(sock, DATE, strlen(DATE) + 1, 0)) != 0) { + fatal("nng_send", rv); + } + + for (;;) { + char *buf = NULL; + size_t sz; + rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC); + if (rv == NNG_ETIMEDOUT) { + break; + } + if (rv != 0) { + fatal("nng_recv", rv); + } + printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", + buf); + nng_free(buf, sz); + } + + printf("SERVER: SURVEY COMPLETE\n"); + } +} + +int +client(const char *url, const char *name) +{ + nng_socket sock; + int rv; + + if ((rv = nng_respondent0_open(&sock)) != 0) { + fatal("nng_respondent0_open", rv); + } + if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) { + fatal("nng_dial", rv); + } + for (;;) { + char *buf = NULL; + //char buf[1024]; + size_t sz; + if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) { + // if ((rv = nng_recv(sock, buf, &sz, 0)) == 0) { + printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", + name, buf); + // nng_free(buf, sz); + char *d = date(); + printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", + name); + if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) { + fatal("nng_send", rv); + } + } + } +} + +int +main(const int argc, const char **argv) +{ + if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0)) + return (server(argv[2])); + + if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0)) + return (client(argv[2], argv[3])); + + fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n", + SERVER, CLIENT); + return 1; +} \ No newline at end of file diff --git a/bipc/test_survey2.sh b/bipc/test_survey2.sh new file mode 100755 index 0000000..618d19f --- /dev/null +++ b/bipc/test_survey2.sh @@ -0,0 +1,6 @@ +./test_survey2 server ipc:///tmp/test_survey2.ipc & server=$! +./test_survey2 client ipc:///tmp/test_survey2.ipc client0 & client0=$! +./test_survey2 client ipc:///tmp/test_survey2.ipc client1 & client1=$! +./test_survey2 client ipc:///tmp/test_survey2.ipc client2 & client2=$! +sleep 3 +kill $server $client0 $client1 $client2 \ No newline at end of file diff --git a/test/Makefile b/test/Makefile index a002692..7dafde0 100755 --- a/test/Makefile +++ b/test/Makefile @@ -25,7 +25,7 @@ clean: - rm $(TEMPFILES) + rm -f $(TEMPFILES) include $(ROOT)/Make.common.inc diff --git a/test/core b/test/core deleted file mode 100644 index fe39cd2..0000000 --- a/test/core +++ /dev/null Binary files differ diff --git a/test/nanomsg/pubsub b/test/nanomsg/pubsub index 11b7616..8b10b19 100755 --- a/test/nanomsg/pubsub +++ b/test/nanomsg/pubsub Binary files differ diff --git a/test/nanomsg/pubsub.c b/test/nanomsg/pubsub.c index a8f2702..048d221 100644 --- a/test/nanomsg/pubsub.c +++ b/test/nanomsg/pubsub.c @@ -6,6 +6,7 @@ #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> +#include <nanomsg/reqrep.h> #define SERVER "server" #define CLIENT "client" @@ -31,7 +32,7 @@ { int sock; - if ((sock = nn_socket(AF_SP, NN_PUB)) < 0) { + if ((sock = nn_socket(AF_SP, NN_REQ )) < 0) { fatal("nn_socket"); } if (nn_bind(sock, url) < 0) { @@ -54,14 +55,14 @@ { int sock; - if ((sock = nn_socket(AF_SP, NN_SUB)) < 0) { + if ((sock = nn_socket(AF_SP, NN_REP)) < 0) { fatal("nn_socket"); } // subscribe to everything ("" means all topics) - if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { - fatal("nn_setsockopt"); - } + // if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "sjhdfjsdfh", 0) < 0) { + // fatal("nn_setsockopt"); + // } if (nn_connect(sock, url) < 0) { fatal("nn_connet"); } diff --git a/test/nanomsg/pubsub.sh b/test/nanomsg/pubsub.sh new file mode 100755 index 0000000..c3f1c59 --- /dev/null +++ b/test/nanomsg/pubsub.sh @@ -0,0 +1,6 @@ +./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1 +./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$! +./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$! +./pubsub client ipc:///tmp/pubsub.ipc client2 & client2=$! +sleep 5 +kill $server $client0 $client1 $client2 diff --git a/test/nng/pubsub b/test/nng/pubsub new file mode 100755 index 0000000..76667af --- /dev/null +++ b/test/nng/pubsub Binary files differ diff --git a/test/nng/pubsub.c b/test/nng/pubsub.c index 794cd6c..c45476b 100644 --- a/test/nng/pubsub.c +++ b/test/nng/pubsub.c @@ -32,6 +32,7 @@ { nng_socket sock; int rv; + char buf[1024]; if ((rv = nng_pub0_open(&sock)) != 0) { fatal("nng_pub0_open", rv); @@ -41,8 +42,9 @@ } for (;;) { char *d = date(); + snprintf(buf, 1024, "time:%s", d); printf("SERVER: PUBLISHING DATE %s\n", d); - if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) { + if ((rv = nng_send(sock, buf, strlen(buf) + 1, 0)) != 0) { fatal("nng_send", rv); } sleep(1); @@ -60,7 +62,7 @@ } // subscribe to everything (empty means all topics) - if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { + if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "nnnnnnnnn", 0)) != 0) { fatal("nng_setopt", rv); } if ((rv = nng_dial(sock, url, NULL, 0)) != 0) { diff --git a/test/nng/pubsub.sh b/test/nng/pubsub.sh new file mode 100755 index 0000000..569d384 --- /dev/null +++ b/test/nng/pubsub.sh @@ -0,0 +1,6 @@ +./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1 +./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$! +./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$! +./pubsub client ipc:///tmp/pubsub.ipc client2 & client2=$! +sleep 5 +kill $server $client0 $client1 $client2 \ No newline at end of file -- Gitblit v1.8.0