From 27a32410481fc10e789315b3a1dab88a33020270 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 16 六月 2020 15:45:37 +0800 Subject: [PATCH] finished bipc --- bipc/test_pullpush.sh | 4 bipc/test_reqrep.sh | 3 bipc/test_pair.sh | 4 bipc/bipc.h | 29 +++ bipc/test_reqrep.c | 92 +++++++++++ bipc/bipc.c | 14 + service/netdisk_service | 0 bipc/test_survey | 0 bipc/test_bus.c | 74 +++++++++ service/test_client | 0 device/libnetdisk.a | 0 bipc/test_pair.c | 93 +++++++++++ bipc/test_pullpush.c | 71 ++++++++ service/test_queue | 0 bipc/test_survey.c | 46 ++-- bipc/test_pair | 0 bipc/test_bus.sh | 6 bipc/test_pullpush | 0 bipc/Makefile | 10 + service/test_properties | 0 bipc/test_pubsub | 0 /dev/null | 0 bipc/test_reqrep | 0 bipc/test_bus | 0 service/test | 0 25 files changed, 419 insertions(+), 27 deletions(-) diff --git a/bipc/Makefile b/bipc/Makefile index db7b85b..d7e7051 100644 --- a/bipc/Makefile +++ b/bipc/Makefile @@ -16,7 +16,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = test_survey test_survey2 test_pubsub +PROGS = test_survey test_survey2 test_pubsub test_pullpush test_pair test_bus test_reqrep build: $(PROGS) @@ -26,6 +26,14 @@ test_pubsub: test_pubsub.c bipc.c +test_pullpush: test_pullpush.c bipc.c + +test_pair: test_pair.c bipc.c + +test_bus: test_bus.c bipc.c + +test_reqrep: test_reqrep.c bipc.c + clean: rm -f $(TEMPFILES) $(PROGS) diff --git a/bipc/bipc.c b/bipc/bipc.c index a1918c7..a7ae9e0 100644 --- a/bipc/bipc.c +++ b/bipc/bipc.c @@ -106,7 +106,7 @@ } -int bipc_recv(nng_socket *sock, void *data, size_t *sizep) { +int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep) { //int rv = nng_recv(*sock, data, sizep, 0); // char *buf = NULL; @@ -120,12 +120,24 @@ return rv; } +int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) { + const char *tmp_opt; + if(strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) { + tmp_opt = NNG_OPT_RECVTIMEO; + } + return nng_setopt(*s, tmp_opt, val, valsz); +} void bipc_free(void *ptr, size_t size) { nng_free(ptr, size); } +int bipc_close(bipc_socket_t *s){ + return nng_close(*s); +} + + const char * bipc_strerror(int error) { return nng_strerror(error); diff --git a/bipc/bipc.h b/bipc/bipc.h index 9dcdc7b..9ae0994 100644 --- a/bipc/bipc.h +++ b/bipc/bipc.h @@ -23,6 +23,27 @@ }; +#define BIPC_OPT_SOCKNAME "socket-name" +#define BIPC_OPT_RAW "raw" +#define BIPC_OPT_PROTO "protocol" +#define BIPC_OPT_PROTONAME "protocol-name" +#define BIPC_OPT_PEER "peer" +#define BIPC_OPT_PEERNAME "peer-name" +#define BIPC_OPT_RECVBUF "recv-buffer" +#define BIPC_OPT_SENDBUF "send-buffer" +#define BIPC_OPT_RECVFD "recv-fd" +#define BIPC_OPT_SENDFD "send-fd" +#define BIPC_OPT_RECVTIMEO "recv-timeout" +#define BIPC_OPT_SENDTIMEO "send-timeout" +#define BIPC_OPT_LOCADDR "local-address" +#define BIPC_OPT_REMADDR "remote-address" +#define BIPC_OPT_URL "url" +#define BIPC_OPT_MAXTTL "ttl-max" +#define BIPC_OPT_RECVMAXSZ "recv-size-max" +#define BIPC_OPT_RECONNMINT "reconnect-time-min" +#define BIPC_OPT_RECONNMAXT "reconnect-time-max" + + typedef nng_socket bipc_socket_t; int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod); @@ -31,8 +52,12 @@ int bipc_send(bipc_socket_t *sock, const void *data, size_t size); -int bipc_recv(nng_socket *sock, void *data, size_t *sizep); +int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep); -void bipc_free(void *ptr, size_t size) ; +int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz); + +void bipc_free(void *ptr, size_t size); + +int bipc_close(bipc_socket_t *s); const char * bipc_strerror(int error); \ No newline at end of file diff --git a/bipc/core b/bipc/core deleted file mode 100644 index 4042d75..0000000 --- a/bipc/core +++ /dev/null Binary files differ diff --git a/bipc/test_bus b/bipc/test_bus new file mode 100755 index 0000000..567d137 --- /dev/null +++ b/bipc/test_bus Binary files differ diff --git a/bipc/test_bus.c b/bipc/test_bus.c new file mode 100644 index 0000000..a6189b4 --- /dev/null +++ b/bipc/test_bus.c @@ -0,0 +1,74 @@ +#include "bipc.h" + + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); + exit(1); +} + +int +node(int argc, char **argv) +{ + bipc_socket_t sock; + int rv; + size_t sz; + + + if ((rv = bipc_listen(&sock, argv[2], BUS)) != 0) + { + fatal("nng_listen", rv); + } + + sleep(1); // wait for peers to bind + if (argc >= 3) + { + for (int x = 3; x < argc; x++) + { + if ((rv = bipc_connect(&sock, argv[x], BUS)) != 0) + { + fatal("nng_dial", rv); + } + } + } + + sleep(1); // wait for connects to establish + + // SEND + sz = strlen(argv[1]) + 1; // '\0' too + printf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]); + if ((rv = bipc_send(&sock, argv[1], sz)) != 0) + { + fatal("nng_send", rv); + } + + // RECV + for (;;) + { + char *buf = NULL; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) + { + if (rv == BIPC_ETIMEDOUT) + { + fatal("nng_recv", rv); + } + } + printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf); + bipc_free(buf, sz); + } + bipc_close(&sock); + return (0); +} + +int +main(int argc, char **argv) +{ + if (argc >= 3) + { + return (node(argc, argv)); + } + fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n"); + return 1; +} diff --git a/bipc/test_bus.sh b/bipc/test_bus.sh new file mode 100755 index 0000000..abbea42 --- /dev/null +++ b/bipc/test_bus.sh @@ -0,0 +1,6 @@ +./test_bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$! +./test_bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$! +./test_bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$! +./test_bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$! +sleep 5 +kill $node0 $node1 $node2 $node3 \ No newline at end of file diff --git a/bipc/test_pair b/bipc/test_pair new file mode 100755 index 0000000..2b3d903 --- /dev/null +++ b/bipc/test_pair Binary files differ diff --git a/bipc/test_pair.c b/bipc/test_pair.c new file mode 100644 index 0000000..8ed4a41 --- /dev/null +++ b/bipc/test_pair.c @@ -0,0 +1,93 @@ +#include "bipc.h" + +#define NODE0 "node0" +#define NODE1 "node1" + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); + exit(1); +} + +int +send_name(bipc_socket_t sock, const char *name) +{ + int rv; + printf("%s: SENDING \"%s\"\n", name, name); + if ((rv = bipc_send(&sock, name, strlen(name) + 1)) != 0) { + fatal("bipc_send", rv); + } + return (rv); +} + +int +recv_name(bipc_socket_t sock, const char *name) +{ + char *buf = NULL; + int rv; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) { + printf("%s: RECEIVED \"%s\"\n", name, buf); + nng_free(buf, sz); + } + return (rv); +} + +int +send_recv(bipc_socket_t sock, const char *name) +{ + int rv; + int timeout = 100; + // if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) { + if ((rv = bipc_setopt(&sock, BIPC_OPT_RECVTIMEO, &timeout, sizeof(int))) != 0) { + fatal("nng_setopt_ms", rv); + } + for (;;) { + recv_name(sock, name); + sleep(1); + send_name(sock, name); + } +} + +int +node0(const char *url) +{ + bipc_socket_t sock; + int rv; + // if ((rv = nng_pair0_open(&sock)) != 0) { + // fatal("nng_pair0_open", rv); + // } + if ((rv = bipc_listen(&sock, url, PAIR)) !=0) { + fatal("nng_listen", rv); + } + return (send_recv(sock, NODE0)); +} + +int +node1(const char *url) +{ + bipc_socket_t sock; + int rv; + sleep(1); + // if ((rv = nng_pair0_open(&sock)) != 0) { + // fatal("nng_pair0_open", rv); + // } + if ((rv = bipc_connect(&sock, url, PAIR)) != 0) { + fatal("nng_dial", rv); + } + return (send_recv(sock, NODE1)); +} + +int +main(int argc, char **argv) +{ + if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) + return (node0(argv[2])); + + if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0)) + return (node1(argv[2])); + + fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1); + return 1; +} \ No newline at end of file diff --git a/bipc/test_pair.sh b/bipc/test_pair.sh new file mode 100755 index 0000000..6238401 --- /dev/null +++ b/bipc/test_pair.sh @@ -0,0 +1,4 @@ +./test_pair node0 ipc:///tmp/test_pair.ipc & node0=$! +./test_pair node1 ipc:///tmp/test_pair.ipc & node1=$! +sleep 4 +kill $node0 $node1 \ No newline at end of file diff --git a/bipc/test_pubsub b/bipc/test_pubsub index eab45e6..19b5575 100755 --- a/bipc/test_pubsub +++ b/bipc/test_pubsub Binary files differ diff --git a/bipc/test_pullpush b/bipc/test_pullpush new file mode 100755 index 0000000..c9ac05d --- /dev/null +++ b/bipc/test_pullpush Binary files differ diff --git a/bipc/test_pullpush.c b/bipc/test_pullpush.c new file mode 100644 index 0000000..f8e44ff --- /dev/null +++ b/bipc/test_pullpush.c @@ -0,0 +1,71 @@ +#include "bipc.h" + +#define NODE0 "node0" +#define NODE1 "node1" + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); + exit(1); +} + +int +node0(const char *url) +{ + nng_socket sock; + int rv; + + // if ((rv = nng_pull0_open(&sock)) != 0) { + // fatal("nng_pull0_open", rv); + // } + if ((rv = bipc_listen(&sock, url, PULL_PUSH)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) { + fatal("nng_recv", rv); + } + printf("NODE0: RECEIVED \"%s\"\n", buf); + bipc_free(buf, sz); + } +} + +int +node1(const char *url, char *msg) +{ + int sz_msg = strlen(msg) + 1; // '\0' too + nng_socket sock; + int rv; + int bytes; + + // if ((rv = nng_push0_open(&sock)) != 0) { + // fatal("nng_push0_open", rv); + // } + if ((rv = bipc_connect(&sock, url, PULL_PUSH)) != 0) { + fatal("nng_dial", rv); + } + printf("NODE1: SENDING \"%s\"\n", msg); + if ((rv = bipc_send(&sock, msg, strlen(msg)+1)) != 0) { + fatal("nng_send", rv); + } + sleep(1); // wait for messages to flush before shutting down + bipc_close(&sock); + return (0); +} + +int +main(int argc, char **argv) +{ + if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) + return (node0(argv[2])); + + if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0)) + return (node1(argv[2], argv[3])); + + fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n", + NODE0, NODE1); + return (1); +} \ No newline at end of file diff --git a/bipc/test_pullpush.sh b/bipc/test_pullpush.sh new file mode 100755 index 0000000..3b0c6a2 --- /dev/null +++ b/bipc/test_pullpush.sh @@ -0,0 +1,4 @@ +./test_pullpush node0 ipc:///tmp/test_pullpush.ipc & node0=$! && sleep 1 +./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Hello, World!" +./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Goodbye." +kill $node0 \ No newline at end of file diff --git a/bipc/test_reqrep b/bipc/test_reqrep new file mode 100755 index 0000000..5ea08b5 --- /dev/null +++ b/bipc/test_reqrep Binary files differ diff --git a/bipc/test_reqrep.c b/bipc/test_reqrep.c new file mode 100644 index 0000000..0fe62e6 --- /dev/null +++ b/bipc/test_reqrep.c @@ -0,0 +1,92 @@ +#include "bipc.h" + +#define NODE0 "node0" +#define NODE1 "node1" +#define DATE "DATE" + +void +fatal(const char *func, int rv) +{ + fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv)); + exit(1); +} + +char * +date(void) +{ + time_t now = time(&now); + struct tm *info = localtime(&now); + char *text = asctime(info); + text[strlen(text)-1] = '\0'; // remove '\n' + return (text); +} + +int +node0(const char *url) +{ + bipc_socket_t sock; + int rv; + + // if ((rv = nng_rep0_open(&sock)) != 0) { + // fatal("nng_rep0_open", rv); + // } + if ((rv = bipc_listen(&sock, url, REQ_REP)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + char *buf = NULL; + size_t sz; + if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) { + fatal("nng_recv", rv); + } + if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) { + printf("NODE0: RECEIVED DATE REQUEST\n"); + char *d = date(); + printf("NODE0: SENDING DATE %s\n", d); + if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) { + fatal("nng_send", rv); + } + } + bipc_free(buf, sz); + } +} + +int +node1(const char *url) +{ + bipc_socket_t sock; + int rv; + size_t sz; + char *buf = NULL; + + // if ((rv = nng_req0_open(&sock)) != 0) { + // fatal("bipc_socket_t", rv); + // } + if ((rv = bipc_connect(&sock, url, REQ_REP)) != 0) { + fatal("nng_dial", rv); + } + printf("NODE1: SENDING DATE REQUEST %s\n", DATE); + if ((rv = bipc_send(&sock, DATE, strlen(DATE)+1)) != 0) { + fatal("nng_send", rv); + } + if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) { + fatal("nng_recv", rv); + } + printf("NODE1: RECEIVED DATE %s\n", buf); + bipc_free(buf, sz); + bipc_close(&sock); + return (0); +} + +int +main(const int argc, const char **argv) +{ + if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) + return (node0(argv[2])); + + if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0)) + return (node1(argv[2])); + + fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1); + return (1); +} \ No newline at end of file diff --git a/bipc/test_reqrep.sh b/bipc/test_reqrep.sh new file mode 100755 index 0000000..54c3226 --- /dev/null +++ b/bipc/test_reqrep.sh @@ -0,0 +1,3 @@ +./test_reqrep node0 ipc:///tmp/test_reqrep.ipc & node0=$! && sleep 1 +./test_reqrep node1 ipc:///tmp/test_reqrep.ipc +kill $node0 \ No newline at end of file diff --git a/bipc/test_survey b/bipc/test_survey index d577d43..95a37dc 100755 --- a/bipc/test_survey +++ b/bipc/test_survey Binary files differ diff --git a/bipc/test_survey.c b/bipc/test_survey.c index 40d6ad0..b4ee7e7 100644 --- a/bipc/test_survey.c +++ b/bipc/test_survey.c @@ -26,35 +26,35 @@ int server(const char *url) { - bipc_socket_t sock; - int rv; + bipc_socket_t sock; + int rv; - - if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) { - fatal("nng_listen", rv); + + if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) { + fatal("nng_listen", rv); + } + for (;;) { + printf("SERVER: SENDING DATE SURVEY REQUEST\n"); + if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) { + fatal("nng_send", rv); } + for (;;) { - printf("SERVER: SENDING DATE SURVEY REQUEST\n"); - if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) { - fatal("nng_send", rv); + char *buf = NULL; + size_t sz; + rv = bipc_recv(&sock, &buf, &sz); + if (rv == BIPC_ETIMEDOUT) { + break; } - - for (;;) { - char *buf = NULL; - size_t sz; - rv = bipc_recv(&sock, &buf, &sz); - if (rv == BIPC_ETIMEDOUT) { - break; - } - if (rv != 0) { - fatal("nng_recv", rv); - } - printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf); - bipc_free(buf, sz); + if (rv != 0) { + fatal("nng_recv", rv); } - - printf("SERVER: SURVEY COMPLETE\n"); + printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf); + bipc_free(buf, sz); } + + printf("SERVER: SURVEY COMPLETE\n"); + } } int diff --git a/device/libnetdisk.a b/device/libnetdisk.a index a9a36a5..d77f071 100644 --- a/device/libnetdisk.a +++ b/device/libnetdisk.a Binary files differ diff --git a/service/netdisk_service b/service/netdisk_service index fd9bbc1..51ffa2d 100755 --- a/service/netdisk_service +++ b/service/netdisk_service Binary files differ diff --git a/service/test b/service/test index f5f368d..a32becf 100755 --- a/service/test +++ b/service/test Binary files differ diff --git a/service/test_client b/service/test_client index baf8670..bf86e07 100755 --- a/service/test_client +++ b/service/test_client Binary files differ diff --git a/service/test_properties b/service/test_properties index 31584af..335c825 100755 --- a/service/test_properties +++ b/service/test_properties Binary files differ diff --git a/service/test_queue b/service/test_queue index e417957..9989048 100755 --- a/service/test_queue +++ b/service/test_queue Binary files differ -- Gitblit v1.8.0