554325746@qq.com
2019-08-23 93f640f882412de099ea62743a28798094bdcd70
shm.go
@@ -25,11 +25,11 @@
   signal chan []byte
}
func (s *SHM) routine(fn func([]byte)(int, error)){
func (s *SHM) sendRoutine(){
   for{
      select{
      case d := <-s.signal:
         n, err := fn(d)
         n, err := s.rw.Write(d)
         if err != nil{
            n = -1
         }
@@ -37,44 +37,24 @@
      }
   }
}
func (s *SHM) client() error{
   var err error
   if s.rw == nil{
      s.rw, err = shm.OpenSimplex(s.url)
   }
   return err
}
func (s *SHM) clearTimeout(){
   if s.typ == coactee{
      s.Close()
   }
   close(s.signal)
   s.signal = nil
}
func (s *SHM) initTimeout(fn func([]byte)(int, error)){
   if s.signal == nil{
      s.signal = make(chan []byte)
      go s.routine(fn)
   }
}
// Send impl interface Diliver
func (s *SHM) Send(data []byte) error {
   if s == nil {
      return errors.New("please init shm producer first")
   }
   if err := s.client(); err != nil{
      return err
   if s.rw == nil{
      var err error
      s.rw, err = shm.OpenSimplex(s.url)
      if err != nil{
         return err
      }
   }
   s.initTimeout(s.rw.Write)
   if s.signal == nil{
      s.signal = make(chan []byte)
      go s.sendRoutine()
   }
   s.signal <-data
@@ -85,13 +65,29 @@
      }
      return errors.New("shm send error")
   case <- time.After(3 * time.Second):
      s.clearTimeout()
      if s.typ == coactee{
         s.Close()
      }
      close(s.signal)
      s.signal = nil
      return errors.New("shm send timeout")
   }
   return errors.New("send should't here")
}
func (s *SHM) recvRoutine(){
   for{
      select{
      case <- s.signal:
         n, err := s.rw.Read(s.data)
         if err != nil{
            n = -1
         }
         s.res <- n
      }
   }
}
// Recv impl interface Diliver
func (s *SHM) Recv() ([]byte, error) {
@@ -99,17 +95,22 @@
      return nil, errors.New("please open shm consumer first")
   }
   if err := s.client(); err != nil{
      return nil, err
   if s.rw == nil{
      var err error
      s.rw, err = shm.OpenSimplex(s.url)
      if err != nil{
         return nil, err
      }
   }
   if s.data == nil{
   if s.signal == nil{
      s.signal = make(chan []byte)
      s.data = make([]byte, maxRecvSize)
      go s.recvRoutine()
   }
   
   s.initTimeout(s.rw.Read)
   s.signal <- s.data
   s.signal <- []byte{}
   select{
   case d := <-s.res:
@@ -120,7 +121,11 @@
      }
      return nil, nil
   case <- time.After(3 * time.Second):
      s.clearTimeout()
      if s.typ == coactee{
         s.Close()
      }
      close(s.signal)
      s.signal = nil
      return nil, errors.New("recv time out")
   }