From 27a32410481fc10e789315b3a1dab88a33020270 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 16 六月 2020 15:45:37 +0800
Subject: [PATCH] finished bipc

---
 bipc/test_pullpush.sh   |    4 
 bipc/test_reqrep.sh     |    3 
 bipc/test_pair.sh       |    4 
 bipc/bipc.h             |   29 +++
 bipc/test_reqrep.c      |   92 +++++++++++
 bipc/bipc.c             |   14 +
 service/netdisk_service |    0 
 bipc/test_survey        |    0 
 bipc/test_bus.c         |   74 +++++++++
 service/test_client     |    0 
 device/libnetdisk.a     |    0 
 bipc/test_pair.c        |   93 +++++++++++
 bipc/test_pullpush.c    |   71 ++++++++
 service/test_queue      |    0 
 bipc/test_survey.c      |   46 ++--
 bipc/test_pair          |    0 
 bipc/test_bus.sh        |    6 
 bipc/test_pullpush      |    0 
 bipc/Makefile           |   10 +
 service/test_properties |    0 
 bipc/test_pubsub        |    0 
 /dev/null               |    0 
 bipc/test_reqrep        |    0 
 bipc/test_bus           |    0 
 service/test            |    0 
 25 files changed, 419 insertions(+), 27 deletions(-)

diff --git a/bipc/Makefile b/bipc/Makefile
index db7b85b..d7e7051 100644
--- a/bipc/Makefile
+++ b/bipc/Makefile
@@ -16,7 +16,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
  
-PROGS = test_survey test_survey2 test_pubsub
+PROGS = test_survey test_survey2 test_pubsub test_pullpush test_pair test_bus test_reqrep
 
 
 build: $(PROGS)
@@ -26,6 +26,14 @@
  
 test_pubsub:  test_pubsub.c bipc.c
 
+test_pullpush:  test_pullpush.c bipc.c
+
+test_pair: test_pair.c bipc.c
+
+test_bus:  test_bus.c bipc.c
+
+test_reqrep: test_reqrep.c bipc.c
+
 clean:
 	rm -f $(TEMPFILES) $(PROGS)
 
diff --git a/bipc/bipc.c b/bipc/bipc.c
index a1918c7..a7ae9e0 100644
--- a/bipc/bipc.c
+++ b/bipc/bipc.c
@@ -106,7 +106,7 @@
 }
 
 
-int bipc_recv(nng_socket *sock,  void *data, size_t *sizep) {
+int bipc_recv(bipc_socket_t *sock,  void *data, size_t *sizep) {
 	
 	//int rv = nng_recv(*sock, data, sizep, 0);
 	// char *buf = NULL;
@@ -120,12 +120,24 @@
 	return rv;
 }
 
+int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) {
+	const char *tmp_opt;
+	if(strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) {
+		tmp_opt = NNG_OPT_RECVTIMEO;
+	}
+	return nng_setopt(*s, tmp_opt, val, valsz);
+}
 
 void bipc_free(void *ptr, size_t size) {
 	 nng_free(ptr, size);
 }
 
 
+int bipc_close(bipc_socket_t *s){
+	return nng_close(*s);
+}
+
+
 
 const char * bipc_strerror(int error) {
 	return nng_strerror(error);
diff --git a/bipc/bipc.h b/bipc/bipc.h
index 9dcdc7b..9ae0994 100644
--- a/bipc/bipc.h
+++ b/bipc/bipc.h
@@ -23,6 +23,27 @@
 };
 
 
+#define BIPC_OPT_SOCKNAME      "socket-name"
+#define BIPC_OPT_RAW           "raw"
+#define BIPC_OPT_PROTO         "protocol"
+#define BIPC_OPT_PROTONAME     "protocol-name"
+#define BIPC_OPT_PEER          "peer"
+#define BIPC_OPT_PEERNAME      "peer-name"
+#define BIPC_OPT_RECVBUF       "recv-buffer"
+#define BIPC_OPT_SENDBUF       "send-buffer"
+#define BIPC_OPT_RECVFD        "recv-fd"
+#define BIPC_OPT_SENDFD        "send-fd"
+#define BIPC_OPT_RECVTIMEO     "recv-timeout"
+#define BIPC_OPT_SENDTIMEO     "send-timeout"
+#define BIPC_OPT_LOCADDR       "local-address"
+#define BIPC_OPT_REMADDR       "remote-address"
+#define BIPC_OPT_URL           "url"
+#define BIPC_OPT_MAXTTL        "ttl-max"
+#define BIPC_OPT_RECVMAXSZ     "recv-size-max"
+#define BIPC_OPT_RECONNMINT    "reconnect-time-min"
+#define BIPC_OPT_RECONNMAXT    "reconnect-time-max"
+
+
 typedef nng_socket bipc_socket_t;
 
 int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod);
@@ -31,8 +52,12 @@
 
 int bipc_send(bipc_socket_t *sock,  const void *data, size_t size);
 
-int bipc_recv(nng_socket *sock,  void *data, size_t *sizep);
+int bipc_recv(bipc_socket_t *sock,  void *data, size_t *sizep);
 
-void bipc_free(void *ptr, size_t size) ;
+int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz);
+
+void bipc_free(void *ptr, size_t size);
+
+int bipc_close(bipc_socket_t *s);
 
 const char * bipc_strerror(int error);
\ No newline at end of file
diff --git a/bipc/core b/bipc/core
deleted file mode 100644
index 4042d75..0000000
--- a/bipc/core
+++ /dev/null
Binary files differ
diff --git a/bipc/test_bus b/bipc/test_bus
new file mode 100755
index 0000000..567d137
--- /dev/null
+++ b/bipc/test_bus
Binary files differ
diff --git a/bipc/test_bus.c b/bipc/test_bus.c
new file mode 100644
index 0000000..a6189b4
--- /dev/null
+++ b/bipc/test_bus.c
@@ -0,0 +1,74 @@
+#include "bipc.h"
+
+
+void
+fatal(const char *func, int rv)
+{
+  fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+  exit(1);
+}
+
+int
+node(int argc, char **argv)
+{
+  bipc_socket_t sock;
+  int rv;
+  size_t sz;
+
+
+  if ((rv = bipc_listen(&sock, argv[2], BUS)) != 0)
+  {
+    fatal("nng_listen", rv);
+  }
+
+  sleep(1); // wait for peers to bind
+  if (argc >= 3)
+  {
+    for (int x = 3; x < argc; x++)
+    {
+      if ((rv = bipc_connect(&sock, argv[x], BUS)) != 0)
+      {
+        fatal("nng_dial", rv);
+      }
+    }
+  }
+
+  sleep(1); // wait for connects to establish
+
+  // SEND
+  sz = strlen(argv[1]) + 1; // '\0' too
+  printf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);
+  if ((rv = bipc_send(&sock, argv[1], sz)) != 0)
+  {
+    fatal("nng_send", rv);
+  }
+
+  // RECV
+  for (;;)
+  {
+    char *buf = NULL;
+    size_t sz;
+    if ((rv = bipc_recv(&sock, &buf, &sz)) != 0)
+    {
+      if (rv == BIPC_ETIMEDOUT)
+      {
+        fatal("nng_recv", rv);
+      }
+    }
+    printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf);
+    bipc_free(buf, sz);
+  }
+  bipc_close(&sock);
+  return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+  if (argc >= 3)
+  {
+    return (node(argc, argv));
+  }
+  fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
+  return 1;
+}
diff --git a/bipc/test_bus.sh b/bipc/test_bus.sh
new file mode 100755
index 0000000..abbea42
--- /dev/null
+++ b/bipc/test_bus.sh
@@ -0,0 +1,6 @@
+./test_bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$!
+./test_bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$!
+./test_bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$!
+./test_bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$!
+sleep 5
+kill $node0 $node1 $node2 $node3
\ No newline at end of file
diff --git a/bipc/test_pair b/bipc/test_pair
new file mode 100755
index 0000000..2b3d903
--- /dev/null
+++ b/bipc/test_pair
Binary files differ
diff --git a/bipc/test_pair.c b/bipc/test_pair.c
new file mode 100644
index 0000000..8ed4a41
--- /dev/null
+++ b/bipc/test_pair.c
@@ -0,0 +1,93 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+void
+fatal(const char *func, int rv)
+{
+        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+        exit(1);
+}
+
+int
+send_name(bipc_socket_t sock, const char *name)
+{
+        int rv;
+        printf("%s: SENDING \"%s\"\n", name, name);
+        if ((rv = bipc_send(&sock, name, strlen(name) + 1)) != 0) {
+                fatal("bipc_send", rv);
+        }
+        return (rv);
+}
+
+int
+recv_name(bipc_socket_t sock, const char *name)
+{
+        char *buf = NULL;
+        int rv;
+        size_t sz;
+        if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) {
+                printf("%s: RECEIVED \"%s\"\n", name, buf); 
+                nng_free(buf, sz);
+        }
+        return (rv);
+}
+
+int
+send_recv(bipc_socket_t sock, const char *name)
+{
+    int rv;
+    int timeout = 100;
+    // if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
+    if ((rv = bipc_setopt(&sock, BIPC_OPT_RECVTIMEO, &timeout, sizeof(int))) != 0) {
+            fatal("nng_setopt_ms", rv);
+    }
+    for (;;) {
+            recv_name(sock, name);
+            sleep(1);
+            send_name(sock, name);
+    }
+}
+
+int
+node0(const char *url)
+{
+    bipc_socket_t sock;
+    int rv;
+    // if ((rv = nng_pair0_open(&sock)) != 0) {
+    //         fatal("nng_pair0_open", rv);
+    // }
+     if ((rv = bipc_listen(&sock, url, PAIR)) !=0) {
+            fatal("nng_listen", rv);
+    }
+    return (send_recv(sock, NODE0));
+}
+
+int
+node1(const char *url)
+{
+    bipc_socket_t sock;
+    int rv;
+    sleep(1);
+    // if ((rv = nng_pair0_open(&sock)) != 0) {
+    //         fatal("nng_pair0_open", rv);
+    // }
+    if ((rv = bipc_connect(&sock, url, PAIR)) != 0) {
+            fatal("nng_dial", rv);
+    }
+    return (send_recv(sock, NODE1));
+}
+
+int
+main(int argc, char **argv)
+{
+    if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+            return (node0(argv[2]));
+
+    if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
+            return (node1(argv[2]));
+
+    fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);
+    return 1;
+}
\ No newline at end of file
diff --git a/bipc/test_pair.sh b/bipc/test_pair.sh
new file mode 100755
index 0000000..6238401
--- /dev/null
+++ b/bipc/test_pair.sh
@@ -0,0 +1,4 @@
+./test_pair node0 ipc:///tmp/test_pair.ipc & node0=$!
+./test_pair node1 ipc:///tmp/test_pair.ipc & node1=$!
+sleep 4
+kill $node0 $node1
\ No newline at end of file
diff --git a/bipc/test_pubsub b/bipc/test_pubsub
index eab45e6..19b5575 100755
--- a/bipc/test_pubsub
+++ b/bipc/test_pubsub
Binary files differ
diff --git a/bipc/test_pullpush b/bipc/test_pullpush
new file mode 100755
index 0000000..c9ac05d
--- /dev/null
+++ b/bipc/test_pullpush
Binary files differ
diff --git a/bipc/test_pullpush.c b/bipc/test_pullpush.c
new file mode 100644
index 0000000..f8e44ff
--- /dev/null
+++ b/bipc/test_pullpush.c
@@ -0,0 +1,71 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+void
+fatal(const char *func, int rv)
+{
+        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+        exit(1);
+}
+
+int
+node0(const char *url)
+{
+        nng_socket sock;
+        int rv;
+
+        // if ((rv = nng_pull0_open(&sock)) != 0) {
+        //         fatal("nng_pull0_open", rv);
+        // }
+        if ((rv = bipc_listen(&sock, url, PULL_PUSH)) != 0) {
+                fatal("nng_listen", rv);
+        }
+        for (;;) {
+                char *buf = NULL;
+                size_t sz;
+                if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+                        fatal("nng_recv", rv);
+                }
+                printf("NODE0: RECEIVED \"%s\"\n", buf); 
+                bipc_free(buf, sz);
+        }
+}
+
+int
+node1(const char *url, char *msg)
+{
+        int sz_msg = strlen(msg) + 1; // '\0' too
+        nng_socket sock;
+        int rv;
+        int bytes;
+
+        // if ((rv = nng_push0_open(&sock)) != 0) {
+        //         fatal("nng_push0_open", rv);
+        // }
+        if ((rv = bipc_connect(&sock, url, PULL_PUSH)) != 0) {
+                fatal("nng_dial", rv);
+        }
+        printf("NODE1: SENDING \"%s\"\n", msg);
+        if ((rv = bipc_send(&sock, msg, strlen(msg)+1)) != 0) {
+                fatal("nng_send", rv);
+        }
+        sleep(1); // wait for messages to flush before shutting down
+        bipc_close(&sock);
+        return (0);
+}
+
+int
+main(int argc, char **argv)
+{
+        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+                return (node0(argv[2]));
+
+        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
+                return (node1(argv[2], argv[3]));
+
+        fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
+                NODE0, NODE1);
+        return (1);
+}
\ No newline at end of file
diff --git a/bipc/test_pullpush.sh b/bipc/test_pullpush.sh
new file mode 100755
index 0000000..3b0c6a2
--- /dev/null
+++ b/bipc/test_pullpush.sh
@@ -0,0 +1,4 @@
+./test_pullpush node0 ipc:///tmp/test_pullpush.ipc & node0=$! && sleep 1
+./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Hello, World!"
+./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Goodbye."
+kill $node0
\ No newline at end of file
diff --git a/bipc/test_reqrep b/bipc/test_reqrep
new file mode 100755
index 0000000..5ea08b5
--- /dev/null
+++ b/bipc/test_reqrep
Binary files differ
diff --git a/bipc/test_reqrep.c b/bipc/test_reqrep.c
new file mode 100644
index 0000000..0fe62e6
--- /dev/null
+++ b/bipc/test_reqrep.c
@@ -0,0 +1,92 @@
+#include "bipc.h"
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+#define DATE "DATE"
+
+void
+fatal(const char *func, int rv)
+{
+    fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
+    exit(1);
+}
+
+char *
+date(void)
+{
+    time_t now = time(&now);
+    struct tm *info = localtime(&now);
+    char *text = asctime(info);
+    text[strlen(text)-1] = '\0'; // remove '\n'
+    return (text);
+}
+
+int
+node0(const char *url)
+{
+    bipc_socket_t sock;
+    int rv;
+
+    // if ((rv = nng_rep0_open(&sock)) != 0) {
+    //         fatal("nng_rep0_open", rv);
+    // }
+    if ((rv = bipc_listen(&sock, url, REQ_REP)) != 0) {
+        fatal("nng_listen", rv);
+    }
+    for (;;) {
+        char *buf = NULL;
+        size_t sz;
+        if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+                fatal("nng_recv", rv);
+        }
+        if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {
+            printf("NODE0: RECEIVED DATE REQUEST\n");
+            char *d = date();
+            printf("NODE0: SENDING DATE %s\n", d);
+            if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) {
+                    fatal("nng_send", rv);
+            }
+        }
+        bipc_free(buf, sz);
+    }
+}
+
+int
+node1(const char *url)
+{
+    bipc_socket_t sock;
+    int rv;
+    size_t sz;
+    char *buf = NULL;
+
+    // if ((rv = nng_req0_open(&sock)) != 0) {
+    //         fatal("bipc_socket_t", rv);
+    // }
+    if ((rv = bipc_connect(&sock, url, REQ_REP)) != 0) {
+            fatal("nng_dial", rv);
+    }
+    printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
+    if ((rv = bipc_send(&sock, DATE, strlen(DATE)+1)) != 0) {
+            fatal("nng_send", rv);
+    }
+    if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
+            fatal("nng_recv", rv);
+    }
+    printf("NODE1: RECEIVED DATE %s\n", buf);  
+    bipc_free(buf, sz);
+    bipc_close(&sock);
+    return (0);
+}
+
+int
+main(const int argc, const char **argv)
+{
+    if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
+        return (node0(argv[2]));
+
+    if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
+        return (node1(argv[2]));
+
+    fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);
+    return (1);
+}
\ No newline at end of file
diff --git a/bipc/test_reqrep.sh b/bipc/test_reqrep.sh
new file mode 100755
index 0000000..54c3226
--- /dev/null
+++ b/bipc/test_reqrep.sh
@@ -0,0 +1,3 @@
+./test_reqrep node0 ipc:///tmp/test_reqrep.ipc & node0=$! && sleep 1
+./test_reqrep node1 ipc:///tmp/test_reqrep.ipc
+kill $node0
\ No newline at end of file
diff --git a/bipc/test_survey b/bipc/test_survey
index d577d43..95a37dc 100755
--- a/bipc/test_survey
+++ b/bipc/test_survey
Binary files differ
diff --git a/bipc/test_survey.c b/bipc/test_survey.c
index 40d6ad0..b4ee7e7 100644
--- a/bipc/test_survey.c
+++ b/bipc/test_survey.c
@@ -26,35 +26,35 @@
 int
 server(const char *url)
 {
-        bipc_socket_t sock;
-        int rv;
+    bipc_socket_t sock;
+    int rv;
 
-        
-        if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
-            fatal("nng_listen", rv);
+    
+    if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
+        fatal("nng_listen", rv);
+    }
+    for (;;) {
+        printf("SERVER: SENDING DATE SURVEY REQUEST\n");
+        if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
+            fatal("nng_send", rv);
         }
+
         for (;;) {
-            printf("SERVER: SENDING DATE SURVEY REQUEST\n");
-            if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
-                fatal("nng_send", rv);
+            char *buf = NULL;
+            size_t sz;
+            rv = bipc_recv(&sock, &buf, &sz);
+            if (rv == BIPC_ETIMEDOUT) {
+                break;
             }
-
-            for (;;) {
-                char *buf = NULL;
-                size_t sz;
-                rv = bipc_recv(&sock, &buf, &sz);
-                if (rv == BIPC_ETIMEDOUT) {
-                    break;
-                }
-                if (rv != 0) {
-                    fatal("nng_recv", rv);
-                }
-                printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
-                bipc_free(buf, sz);
+            if (rv != 0) {
+                fatal("nng_recv", rv);
             }
-
-            printf("SERVER: SURVEY COMPLETE\n");
+            printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
+            bipc_free(buf, sz);
         }
+
+        printf("SERVER: SURVEY COMPLETE\n");
+    }
 }
 
 int
diff --git a/device/libnetdisk.a b/device/libnetdisk.a
index a9a36a5..d77f071 100644
--- a/device/libnetdisk.a
+++ b/device/libnetdisk.a
Binary files differ
diff --git a/service/netdisk_service b/service/netdisk_service
index fd9bbc1..51ffa2d 100755
--- a/service/netdisk_service
+++ b/service/netdisk_service
Binary files differ
diff --git a/service/test b/service/test
index f5f368d..a32becf 100755
--- a/service/test
+++ b/service/test
Binary files differ
diff --git a/service/test_client b/service/test_client
index baf8670..bf86e07 100755
--- a/service/test_client
+++ b/service/test_client
Binary files differ
diff --git a/service/test_properties b/service/test_properties
index 31584af..335c825 100755
--- a/service/test_properties
+++ b/service/test_properties
Binary files differ
diff --git a/service/test_queue b/service/test_queue
index e417957..9989048 100755
--- a/service/test_queue
+++ b/service/test_queue
Binary files differ

--
Gitblit v1.8.0