From 27a32410481fc10e789315b3a1dab88a33020270 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 16 六月 2020 15:45:37 +0800
Subject: [PATCH] finished bipc
---
bipc/test_pullpush.sh | 4
bipc/test_reqrep.sh | 3
bipc/test_pair.sh | 4
bipc/bipc.h | 29 +++
bipc/test_reqrep.c | 92 +++++++++++
bipc/bipc.c | 14 +
service/netdisk_service | 0
bipc/test_survey | 0
bipc/test_bus.c | 74 +++++++++
service/test_client | 0
device/libnetdisk.a | 0
bipc/test_pair.c | 93 +++++++++++
bipc/test_pullpush.c | 71 ++++++++
service/test_queue | 0
bipc/test_survey.c | 46 ++--
bipc/test_pair | 0
bipc/test_bus.sh | 6
bipc/test_pullpush | 0
bipc/Makefile | 10 +
service/test_properties | 0
bipc/test_pubsub | 0
/dev/null | 0
bipc/test_reqrep | 0
bipc/test_bus | 0
service/test | 0
25 files changed, 419 insertions(+), 27 deletions(-)
diff --git a/bipc/Makefile b/bipc/Makefile
index db7b85b..d7e7051 100644
--- a/bipc/Makefile
+++ b/bipc/Makefile
@@ -16,7 +16,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = test_survey test_survey2 test_pubsub
+PROGS = test_survey test_survey2 test_pubsub test_pullpush test_pair test_bus test_reqrep
build: $(PROGS)
@@ -26,6 +26,14 @@
test_pubsub: test_pubsub.c bipc.c
+test_pullpush: test_pullpush.c bipc.c
+
+test_pair: test_pair.c bipc.c
+
+test_bus: test_bus.c bipc.c
+
+test_reqrep: test_reqrep.c bipc.c
+
clean:
rm -f $(TEMPFILES) $(PROGS)
diff --git a/bipc/bipc.c b/bipc/bipc.c
index a1918c7..a7ae9e0 100644
--- a/bipc/bipc.c
+++ b/bipc/bipc.c
@@ -106,7 +106,7 @@
}
-int bipc_recv(nng_socket *sock, void *data, size_t *sizep) {
+int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep) {
//int rv = nng_recv(*sock, data, sizep, 0);
// char *buf = NULL;
@@ -120,12 +120,24 @@
return rv;
}
+int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) {
+ const char *tmp_opt;
+ if(strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) {
+ tmp_opt = NNG_OPT_RECVTIMEO;
+ }
+ return nng_setopt(*s, tmp_opt, val, valsz);
+}
void bipc_free(void *ptr, size_t size) {
nng_free(ptr, size);
}
+int bipc_close(bipc_socket_t *s){
+ return nng_close(*s);
+}
+
+
const char * bipc_strerror(int error) {
return nng_strerror(error);
diff --git a/bipc/bipc.h b/bipc/bipc.h
index 9dcdc7b..9ae0994 100644
--- a/bipc/bipc.h
+++ b/bipc/bipc.h
@@ -23,6 +23,27 @@
};
+#define BIPC_OPT_SOCKNAME "socket-name"
+#define BIPC_OPT_RAW "raw"
+#define BIPC_OPT_PROTO "protocol"
+#define BIPC_OPT_PROTONAME "protocol-name"
+#define BIPC_OPT_PEER "peer"
+#define BIPC_OPT_PEERNAME "peer-name"
+#define BIPC_OPT_RECVBUF "recv-buffer"
+#define BIPC_OPT_SENDBUF "send-buffer"
+#define BIPC_OPT_RECVFD "recv-fd"
+#define BIPC_OPT_SENDFD "send-fd"
+#define BIPC_OPT_RECVTIMEO "recv-timeout"
+#define BIPC_OPT_SENDTIMEO "send-timeout"
+#define BIPC_OPT_LOCADDR "local-address"
+#define BIPC_OPT_REMADDR "remote-address"
+#define BIPC_OPT_URL "url"
+#define BIPC_OPT_MAXTTL "ttl-max"
+#define BIPC_OPT_RECVMAXSZ "recv-size-max"
+#define BIPC_OPT_RECONNMINT "reconnect-time-min"
+#define BIPC_OPT_RECONNMAXT "reconnect-time-max"
+
+
typedef nng_socket bipc_socket_t;
int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod);
@@ -31,8 +52,12 @@
int bipc_send(bipc_socket_t *sock, const void *data, size_t size);
-int bipc_recv(nng_socket *sock, void *data, size_t *sizep);
+int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep);
-void bipc_free(void *ptr, size_t size) ;
+int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz);
+
+void bipc_free(void *ptr, size_t size);
+
+int bipc_close(bipc_socket_t *s);
const char * bipc_strerror(int error);
\ No newline at end of file
diff --git a/bipc/core b/bipc/core
deleted file mode 100644
index 4042d75..0000000
--- a/bipc/core
+++ /dev/null
Binary files differ
diff --git a/bipc/test_bus b/bipc/test_bus
new file mode 100755
index 0000000..567d137
--- /dev/null
+++ b/bipc/test_bus
Binary files differ
diff --git a/bipc/test_bus.c b/bipc/test_bus.c
new file mode 100644
index 0000000..a6189b4
--- /dev/null
+++ b/bipc/test_bus.c
@@ -0,0 +1,74 @@
+#include "bipc.h"
+
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+ exit(1);
+}
+
+int
+node(int argc, char **argv)
+{
+ bipc_socket_t sock;
+ int rv;
+ size_t sz;
+
+
+ if ((rv = bipc_listen(&sock, argv[2], BUS)) != 0)
+ {
+ fatal("nng_listen", rv);
+ }
+
+ sleep(1); // wait for peers to bind
+ if (argc >= 3)
+ {
+ for (int x = 3; x < argc; x++)
+ {
+ if ((rv = bipc_connect(&sock, argv[x], BUS)) != 0)
+ {
+ fatal("nng_dial", rv);
+ }
+ }
+ }
+
+ sleep(1); // wait for connects to establish
+
+ // SEND
+ sz = strlen(argv[1]) + 1; // '\0' too
+ printf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);
+ if ((rv = bipc_send(&sock, argv[1], sz)) != 0)
+ {
+ fatal("nng_send", rv);
+ }
+
+ // RECV
+ for (;;)
+ {
+ char *buf = NULL;
+ size_t sz;
+ if ((rv = bipc_recv(&sock, &buf, &sz)) != 0)
+ {
+ if (rv == BIPC_ETIMEDOUT)
+ {
+ fatal("nng_recv", rv);
+ }
+ }
+ printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf);
+ bipc_free(buf, sz);
+ }
+ bipc_close(&sock);
+ return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+ if (argc >= 3)
+ {
+ return (node(argc, argv));
+ }
+ fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
+ return 1;
+}
diff --git a/bipc/test_bus.sh b/bipc/test_bus.sh
new file mode 100755
index 0000000..abbea42
--- /dev/null
+++ b/bipc/test_bus.sh
@@ -0,0 +1,6 @@
+./test_bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$!
+./test_bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$!
+./test_bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$!
+./test_bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$!
+sleep 5
+kill $node0 $node1 $node2 $node3
\ No newline at end of file
diff --git a/bipc/test_pair b/bipc/test_pair
new file mode 100755
index 0000000..2b3d903
--- /dev/null
+++ b/bipc/test_pair
Binary files differ
diff --git a/bipc/test_pair.c b/bipc/test_pair.c
new file mode 100644
index 0000000..8ed4a41
--- /dev/null
+++ b/bipc/test_pair.c
@@ -0,0 +1,93 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+ exit(1);
+}
+
+int
+send_name(bipc_socket_t sock, const char *name)
+{
+ int rv;
+ printf("%s: SENDING \"%s\"\n", name, name);
+ if ((rv = bipc_send(&sock, name, strlen(name) + 1)) != 0) {
+ fatal("bipc_send", rv);
+ }
+ return (rv);
+}
+
+int
+recv_name(bipc_socket_t sock, const char *name)
+{
+ char *buf = NULL;
+ int rv;
+ size_t sz;
+ if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) {
+ printf("%s: RECEIVED \"%s\"\n", name, buf);
+ nng_free(buf, sz);
+ }
+ return (rv);
+}
+
+int
+send_recv(bipc_socket_t sock, const char *name)
+{
+ int rv;
+ int timeout = 100;
+ // if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
+ if ((rv = bipc_setopt(&sock, BIPC_OPT_RECVTIMEO, &timeout, sizeof(int))) != 0) {
+ fatal("nng_setopt_ms", rv);
+ }
+ for (;;) {
+ recv_name(sock, name);
+ sleep(1);
+ send_name(sock, name);
+ }
+}
+
+int
+node0(const char *url)
+{
+ bipc_socket_t sock;
+ int rv;
+ // if ((rv = nng_pair0_open(&sock)) != 0) {
+ // fatal("nng_pair0_open", rv);
+ // }
+ if ((rv = bipc_listen(&sock, url, PAIR)) !=0) {
+ fatal("nng_listen", rv);
+ }
+ return (send_recv(sock, NODE0));
+}
+
+int
+node1(const char *url)
+{
+ bipc_socket_t sock;
+ int rv;
+ sleep(1);
+ // if ((rv = nng_pair0_open(&sock)) != 0) {
+ // fatal("nng_pair0_open", rv);
+ // }
+ if ((rv = bipc_connect(&sock, url, PAIR)) != 0) {
+ fatal("nng_dial", rv);
+ }
+ return (send_recv(sock, NODE1));
+}
+
+int
+main(int argc, char **argv)
+{
+ if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+ return (node0(argv[2]));
+
+ if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
+ return (node1(argv[2]));
+
+ fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);
+ return 1;
+}
\ No newline at end of file
diff --git a/bipc/test_pair.sh b/bipc/test_pair.sh
new file mode 100755
index 0000000..6238401
--- /dev/null
+++ b/bipc/test_pair.sh
@@ -0,0 +1,4 @@
+./test_pair node0 ipc:///tmp/test_pair.ipc & node0=$!
+./test_pair node1 ipc:///tmp/test_pair.ipc & node1=$!
+sleep 4
+kill $node0 $node1
\ No newline at end of file
diff --git a/bipc/test_pubsub b/bipc/test_pubsub
index eab45e6..19b5575 100755
--- a/bipc/test_pubsub
+++ b/bipc/test_pubsub
Binary files differ
diff --git a/bipc/test_pullpush b/bipc/test_pullpush
new file mode 100755
index 0000000..c9ac05d
--- /dev/null
+++ b/bipc/test_pullpush
Binary files differ
diff --git a/bipc/test_pullpush.c b/bipc/test_pullpush.c
new file mode 100644
index 0000000..f8e44ff
--- /dev/null
+++ b/bipc/test_pullpush.c
@@ -0,0 +1,71 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+ exit(1);
+}
+
+int
+node0(const char *url)
+{
+ nng_socket sock;
+ int rv;
+
+ // if ((rv = nng_pull0_open(&sock)) != 0) {
+ // fatal("nng_pull0_open", rv);
+ // }
+ if ((rv = bipc_listen(&sock, url, PULL_PUSH)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ for (;;) {
+ char *buf = NULL;
+ size_t sz;
+ if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+ fatal("nng_recv", rv);
+ }
+ printf("NODE0: RECEIVED \"%s\"\n", buf);
+ bipc_free(buf, sz);
+ }
+}
+
+int
+node1(const char *url, char *msg)
+{
+ int sz_msg = strlen(msg) + 1; // '\0' too
+ nng_socket sock;
+ int rv;
+ int bytes;
+
+ // if ((rv = nng_push0_open(&sock)) != 0) {
+ // fatal("nng_push0_open", rv);
+ // }
+ if ((rv = bipc_connect(&sock, url, PULL_PUSH)) != 0) {
+ fatal("nng_dial", rv);
+ }
+ printf("NODE1: SENDING \"%s\"\n", msg);
+ if ((rv = bipc_send(&sock, msg, strlen(msg)+1)) != 0) {
+ fatal("nng_send", rv);
+ }
+ sleep(1); // wait for messages to flush before shutting down
+ bipc_close(&sock);
+ return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+ if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+ return (node0(argv[2]));
+
+ if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
+ return (node1(argv[2], argv[3]));
+
+ fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
+ NODE0, NODE1);
+ return (1);
+}
\ No newline at end of file
diff --git a/bipc/test_pullpush.sh b/bipc/test_pullpush.sh
new file mode 100755
index 0000000..3b0c6a2
--- /dev/null
+++ b/bipc/test_pullpush.sh
@@ -0,0 +1,4 @@
+./test_pullpush node0 ipc:///tmp/test_pullpush.ipc & node0=$! && sleep 1
+./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Hello, World!"
+./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Goodbye."
+kill $node0
\ No newline at end of file
diff --git a/bipc/test_reqrep b/bipc/test_reqrep
new file mode 100755
index 0000000..5ea08b5
--- /dev/null
+++ b/bipc/test_reqrep
Binary files differ
diff --git a/bipc/test_reqrep.c b/bipc/test_reqrep.c
new file mode 100644
index 0000000..0fe62e6
--- /dev/null
+++ b/bipc/test_reqrep.c
@@ -0,0 +1,92 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+#define DATE "DATE"
+
+void
+fatal(const char *func, int rv)
+{
+ fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+ exit(1);
+}
+
+char *
+date(void)
+{
+ time_t now = time(&now);
+ struct tm *info = localtime(&now);
+ char *text = asctime(info);
+ text[strlen(text)-1] = '\0'; // remove '\n'
+ return (text);
+}
+
+int
+node0(const char *url)
+{
+ bipc_socket_t sock;
+ int rv;
+
+ // if ((rv = nng_rep0_open(&sock)) != 0) {
+ // fatal("nng_rep0_open", rv);
+ // }
+ if ((rv = bipc_listen(&sock, url, REQ_REP)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ for (;;) {
+ char *buf = NULL;
+ size_t sz;
+ if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+ fatal("nng_recv", rv);
+ }
+ if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {
+ printf("NODE0: RECEIVED DATE REQUEST\n");
+ char *d = date();
+ printf("NODE0: SENDING DATE %s\n", d);
+ if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) {
+ fatal("nng_send", rv);
+ }
+ }
+ bipc_free(buf, sz);
+ }
+}
+
+int
+node1(const char *url)
+{
+ bipc_socket_t sock;
+ int rv;
+ size_t sz;
+ char *buf = NULL;
+
+ // if ((rv = nng_req0_open(&sock)) != 0) {
+ // fatal("bipc_socket_t", rv);
+ // }
+ if ((rv = bipc_connect(&sock, url, REQ_REP)) != 0) {
+ fatal("nng_dial", rv);
+ }
+ printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
+ if ((rv = bipc_send(&sock, DATE, strlen(DATE)+1)) != 0) {
+ fatal("nng_send", rv);
+ }
+ if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+ fatal("nng_recv", rv);
+ }
+ printf("NODE1: RECEIVED DATE %s\n", buf);
+ bipc_free(buf, sz);
+ bipc_close(&sock);
+ return (0);
+}
+
+int
+main(const int argc, const char **argv)
+{
+ if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+ return (node0(argv[2]));
+
+ if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
+ return (node1(argv[2]));
+
+ fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);
+ return (1);
+}
\ No newline at end of file
diff --git a/bipc/test_reqrep.sh b/bipc/test_reqrep.sh
new file mode 100755
index 0000000..54c3226
--- /dev/null
+++ b/bipc/test_reqrep.sh
@@ -0,0 +1,3 @@
+./test_reqrep node0 ipc:///tmp/test_reqrep.ipc & node0=$! && sleep 1
+./test_reqrep node1 ipc:///tmp/test_reqrep.ipc
+kill $node0
\ No newline at end of file
diff --git a/bipc/test_survey b/bipc/test_survey
index d577d43..95a37dc 100755
--- a/bipc/test_survey
+++ b/bipc/test_survey
Binary files differ
diff --git a/bipc/test_survey.c b/bipc/test_survey.c
index 40d6ad0..b4ee7e7 100644
--- a/bipc/test_survey.c
+++ b/bipc/test_survey.c
@@ -26,35 +26,35 @@
int
server(const char *url)
{
- bipc_socket_t sock;
- int rv;
+ bipc_socket_t sock;
+ int rv;
-
- if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
- fatal("nng_listen", rv);
+
+ if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
+ fatal("nng_listen", rv);
+ }
+ for (;;) {
+ printf("SERVER: SENDING DATE SURVEY REQUEST\n");
+ if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
+ fatal("nng_send", rv);
}
+
for (;;) {
- printf("SERVER: SENDING DATE SURVEY REQUEST\n");
- if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
- fatal("nng_send", rv);
+ char *buf = NULL;
+ size_t sz;
+ rv = bipc_recv(&sock, &buf, &sz);
+ if (rv == BIPC_ETIMEDOUT) {
+ break;
}
-
- for (;;) {
- char *buf = NULL;
- size_t sz;
- rv = bipc_recv(&sock, &buf, &sz);
- if (rv == BIPC_ETIMEDOUT) {
- break;
- }
- if (rv != 0) {
- fatal("nng_recv", rv);
- }
- printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
- bipc_free(buf, sz);
+ if (rv != 0) {
+ fatal("nng_recv", rv);
}
-
- printf("SERVER: SURVEY COMPLETE\n");
+ printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
+ bipc_free(buf, sz);
}
+
+ printf("SERVER: SURVEY COMPLETE\n");
+ }
}
int
diff --git a/device/libnetdisk.a b/device/libnetdisk.a
index a9a36a5..d77f071 100644
--- a/device/libnetdisk.a
+++ b/device/libnetdisk.a
Binary files differ
diff --git a/service/netdisk_service b/service/netdisk_service
index fd9bbc1..51ffa2d 100755
--- a/service/netdisk_service
+++ b/service/netdisk_service
Binary files differ
diff --git a/service/test b/service/test
index f5f368d..a32becf 100755
--- a/service/test
+++ b/service/test
Binary files differ
diff --git a/service/test_client b/service/test_client
index baf8670..bf86e07 100755
--- a/service/test_client
+++ b/service/test_client
Binary files differ
diff --git a/service/test_properties b/service/test_properties
index 31584af..335c825 100755
--- a/service/test_properties
+++ b/service/test_properties
Binary files differ
diff --git a/service/test_queue b/service/test_queue
index e417957..9989048 100755
--- a/service/test_queue
+++ b/service/test_queue
Binary files differ
--
Gitblit v1.8.0