From 0c22e1e1b5c77fa5d09600200239bd3a0907fc78 Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期四, 22 八月 2019 14:53:29 +0800 Subject: [PATCH] mem leak bug --- shm.go | 110 +++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 75 insertions(+), 35 deletions(-) diff --git a/shm.go b/shm.go index 21f8167..6326e7f 100644 --- a/shm.go +++ b/shm.go @@ -5,48 +5,90 @@ "fmt" "io" + "time" + "github.com/tmthrgd/shm-go" ) // SHM share memory type SHM struct { - rw *shm.ReadWriteCloser - typ td + rw *shm.ReadWriteCloser + typ td + data []byte } // Send impl interface Diliver func (s *SHM) Send(data []byte) error { - if s.rw == nil { + if s == nil || s.rw == nil { return errors.New("please init shm producer first") } - n, err := s.rw.Write(data) - if n < 1 { - fmt.Println("recv data less than 1 length") + ch := make(chan int) + go func(){ + n, _ := s.rw.Write(data) + ch <- n + }() + select{ + case <-ch: + return nil + case <- time.After(3 * time.Second): + return errors.New("send time out") } - return err + return errors.New("send should't here") + + // n, err := s.rw.Write(data) + // if n < 1 { + // fmt.Println("recv data less than 1 length") + // } + + // return err } // Recv impl interface Diliver func (s *SHM) Recv() ([]byte, error) { - if s.rw == nil { + if s == nil || s.rw == nil { return nil, errors.New("please open shm consumer first") } - n, err := s.rw.Read(s.data) - if err == nil || err == io.EOF { - s.data = s.data[:n:n] - return s.data, nil + ch := make(chan []byte) + go func(){ + var data []byte + n, err := s.rw.Read(s.data) + if err == nil || err == io.EOF { + data = make([]byte, n) + copy(data, s.data) + // s.data = s.data[:n:n] + } + ch <- s.data + }() + select{ + case d := <-ch: + return d, nil + case <- time.After(3 * time.Second): + return nil, errors.New("recv time out") } - return nil, err + return nil, errors.New("recv should't here") + + + // data := make([]byte, maxRecvSize) + // n, err := s.rw.Read(data) + // if err == nil || err == io.EOF { + // data = data[:n:n] + // return data, nil + // } + + // return nil, err } // Close impl interface Deliver func (s *SHM) Close() { + if s == nil { + return + } if s.rw != nil { s.rw.Close() } @@ -55,56 +97,54 @@ } } -func shmServer(m Mode, url string, args ...interface{}) *SHM { +func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) { if m != Shm { - fmt.Println("this is not a shm mode: ", m) - return nil + return nil, errors.New("please use deliver.Shm mode") } var param []int for _, v := range args { - fmt.Println(v) switch v.(type) { case int: param = append(param, v.(int)) default: - fmt.Println("shmProducer recv error parameters") - return nil + return nil, errors.New("shmServer created recv error parameters") } } - if len(param) != 2 { - fmt.Println("shmProducer recv too much parameter: ", len(param)) - return nil + + blocks, size := 2, maxRecvSize + if len(param) == 2 { + blocks, size = param[0], param[1] + // return nil, errors.New("shmServer created recv too much parameters") } shm.Unlink(url) - if rw, err := shm.CreateSimplex(url, 0644, param[0], param[1]); err == nil { + + rw, err := shm.CreateSimplex(url, 0644, blocks, size) + if err == nil { fmt.Println(rw.Name()) return &SHM{ rw, agent, - nil, - } + make([]byte, maxRecvSize), + }, nil } - fmt.Println("create simple shm error") - return nil + return nil, err } -func shmClient(m Mode, url string, args ...interface{}) *SHM { - +func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) { if m != Shm { - fmt.Println("this is not a shm mode: ", m) - return nil + return nil, errors.New("please use deliver.Shm mode") } - if rw, err := shm.OpenSimplex(url); err == nil { + rw, err := shm.OpenSimplex(url) + if err == nil { return &SHM{ rw, coactee, make([]byte, maxRecvSize), - } + }, nil } - fmt.Println("shmConsumer open error") - return nil + return nil, err } -- Gitblit v1.8.0