From 2d390df9ede39c9d7c34bd8190b9329cfc371325 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 27 九月 2019 16:19:05 +0800
Subject: [PATCH] remove exist ipc

---
 nng.go |  224 ++++++++++++++++++++++++-------------------------------
 1 files changed, 97 insertions(+), 127 deletions(-)

diff --git a/nng.go b/nng.go
index 77619e3..812cc37 100644
--- a/nng.go
+++ b/nng.go
@@ -24,156 +24,122 @@
 // NNG mangos wrap
 type NNG struct {
 	sock mangos.Socket
+	typ  td
+	mode Mode
+	url  string
+
+	arguments []interface{}
 }
 
 // Send impl interface Diliver
 func (n *NNG) Send(data []byte) error {
-	if n.sock == nil {
+	if n == nil {
 		return errors.New("please init NNG first")
 	}
-
-	switch n.sock.GetProtocol().Number() {
-	case mangos.ProtoSurveyor:
-		time.Sleep(surveyorTime * 2)
-	default:
-	}
-	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
-		msg := mangos.NewMessage(len(data))
-		msg.Body = data
-		return n.sock.SendMsg(msg)
+	var err error
+	if n.sock == nil {
+		n.sock, err = n.makeNNG(agent)
+		if err != nil {
+			fmt.Println("create nng sender error")
+			return err
+		}
 	}
 
-	return n.sock.Send(data)
+	if surveyorTime > 0 {
+		time.Sleep(time.Duration(surveyorTime*2) * time.Second)
+	}
+
+	msg := mangos.NewMessage(1)
+	msg.Body = data
+	return n.sock.SendMsg(msg)
 }
 
 // Recv impl interface Diliver
 func (n *NNG) Recv() ([]byte, error) {
-	if n.sock == nil {
+	if n == nil {
 		return nil, errors.New("please init NNG first")
 	}
-	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
-		if msg, err := n.sock.RecvMsg(); err == nil {
-			return msg.Body, err
+
+	var err error
+
+	if n.sock == nil {
+		n.sock, err = n.makeNNG(coactee)
+		if err != nil {
+			fmt.Println("create nng reciever error")
+			return nil, err
 		}
+	}
+
+	var msg *mangos.Message
+	if msg, err = n.sock.RecvMsg(); err != nil {
 		return nil, err
 	}
-	return n.sock.Recv()
+	return msg.Body, nil
+
+}
+
+// Recv2 impl interface
+func (n *NNG) Recv2(data []byte) (l int, err error) {
+	data, err = n.Recv()
+	l = len(data)
+	return l, err
 }
 
 // Close impl interface Deliver
 func (n *NNG) Close() {
-	if n.sock != nil {
+	if n != nil && n.sock != nil {
 		n.sock.Close()
+		n.sock = nil
 	}
 }
 
-// nngProducer create from deliver Mode
-func nngProducer(m Mode, url string, args ...interface{}) *NNG {
+func nngServer(m Mode, url string, args ...interface{}) *NNG {
 
 	rmExistedIpcName(url)
-	if sock, err := newSocket(protoProducer(m)); err == nil {
-		if err = setSocketOptions(sock, args); err != nil {
-			return nil
-		}
-		if err = sock.Listen(url); err != nil {
-			sock.Close()
-			return nil
-		}
-		return &NNG{
-			sock,
-		}
-	}
 
-	return nil
+	return &NNG{
+		typ:       agent,
+		mode:      m,
+		url:       url,
+		arguments: args,
+	}
 }
 
-// nngConsumer create from deliver Mode
-func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
+func nngClient(m Mode, url string, args ...interface{}) *NNG {
 
-	if sock, err := newSocket(protoConsumer(m)); err == nil {
-		if err = setSocketOptions(sock, args); err != nil {
-			return nil
-		}
-
-		if err = sock.Dial(url); err != nil {
-			sock.Close()
-			return nil
-		}
-
-		return &NNG{
-			sock,
-		}
+	return &NNG{
+		typ:       coactee,
+		mode:      m,
+		url:       url,
+		arguments: args,
 	}
 
-	return nil
 }
 
-// maxRecvSize max recv size
-var (
-	maxRecvSize  = 33 * 1024 * 1024
-	surveyorTime = time.Second / 2
-)
+func proto(typ td, m Mode) protocol {
+	if typ == agent {
+		return protoAgent(m)
+	} else if typ == coactee {
+		return protoCoactee(m)
+	}
+	return NONE
+}
 
-func defualtSocketOptions(sock mangos.Socket) error {
+func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
+
+	var sock mangos.Socket
 	var err error
-	if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
-		sock.Close()
-		return err
-	}
 
-	return nil
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
-	err := defualtSocketOptions(sock)
-	if err != nil {
-		return err
-	}
-	switch sock.GetProtocol().Number() {
-	case mangos.ProtoSub:
-		for _, arg := range args {
-			switch arg.(type) {
-			case string:
-				err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
-			default:
-				err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
-			}
-		}
-	case mangos.ProtoSurveyor:
-		for _, arg := range args {
-			switch arg.(type) {
-			case int:
-				surveyorTime = time.Duration(arg.(int)/2) * time.Second
-			default:
-			}
-			err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
-		}
+	switch n.mode {
+	case Bus:
+		sock, err = n.busMakeNNG(typ)
+	case ReqRep, SurvResp:
+		sock, err = n.rrMakeNNG(typ)
 	default:
-		fmt.Println("no additional args")
+		sock, err = n.ppMakeNNG(typ)
 	}
 
-	return nil
+	return sock, err
 }
 
 func rmExistedIpcName(url string) {
@@ -181,6 +147,8 @@
 
 	if s[0] == "ipc" {
 		if _, err := os.Stat(s[1]); err == nil {
+			os.Remove(s[1])
+		} else if !os.IsNotExist(err) {
 			os.Remove(s[1])
 		}
 	}
@@ -190,6 +158,9 @@
 // access the underlying library.
 func newSocket(p protocol) (mangos.Socket, error) {
 
+	if p == NONE {
+		return nil, errors.New("new socket protocol none")
+	}
 	var s mangos.Socket
 	var err error
 
@@ -223,8 +194,6 @@
 	}
 
 	all.AddTransports(s)
-	// s.AddTransport(ipc.NewTransport())
-	// s.AddTransport(tcp.NewTransport())
 
 	return s, nil
 }
@@ -240,6 +209,7 @@
 
 // Constants for protocols.
 const (
+	NONE       = -1
 	PUSH       = protocol(mangos.ProtoPush)
 	PULL       = protocol(mangos.ProtoPull)
 	PUB        = protocol(mangos.ProtoPub)
@@ -252,38 +222,38 @@
 	PAIR       = protocol(mangos.ProtoPair)
 )
 
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
 	switch m {
 	case PushPull:
 		return PUSH
 	case PubSub:
 		return PUB
-	case ReqRep:
-		return REP
-	case SurvResp:
-		return SURVEYOR
-	case Bus:
-		return BUS
 	case Pair:
 		return PAIR
+	case SurvResp:
+		return SURVEYOR
+	case ReqRep:
+		return REQ
+	case Bus:
+		return BUS
 	}
-	return PUSH
+	return NONE
 }
 
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
 	switch m {
 	case PushPull:
 		return PULL
 	case PubSub:
 		return SUB
-	case ReqRep:
-		return REQ
-	case SurvResp:
-		return RESPONDENT
-	case Bus:
-		return BUS
 	case Pair:
 		return PAIR
+	case SurvResp:
+		return RESPONDENT
+	case ReqRep:
+		return REP
+	case Bus:
+		return BUS
 	}
-	return PULL
+	return NONE
 }

--
Gitblit v1.8.0