fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package websocketx
 
import (
    "errors"
    "github.com/gorilla/websocket"
    "sync"
    "time"
)
 
type Connection struct {
    wsConnect *websocket.Conn
    inChan    chan []byte
    outChan   chan []byte
    closeChan chan byte
 
    mutex    sync.Mutex // 对closeChan关闭上锁
    isClosed bool       // 防止closeChan被关闭多次
}
 
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
    conn = &Connection{
        wsConnect: wsConn,
        inChan:    make(chan []byte, 1000),
        outChan:   make(chan []byte, 1000),
        closeChan: make(chan byte, 1),
    }
    // 启动读协程
    go conn.readLoop()
    // 启动写协程
    go conn.writeLoop()
    return
}
 
func (conn *Connection) ReadMessage() (data []byte, err error) {
    select {
    case data = <-conn.inChan:
    case <-conn.closeChan:
        err = errors.New("connection is closeed")
    }
    return
}
 
func (conn *Connection) WriteMessage(data []byte) (err error) {
    select {
    case conn.outChan <- data:
    case <-conn.closeChan:
        err = errors.New("connection is closeed")
    }
    return
}
 
func (conn *Connection) Close() {
    // 线程安全,可多次调用
    conn.wsConnect.Close()
    // 利用标记,让closeChan只关闭一次
    conn.mutex.Lock()
    if !conn.isClosed {
        close(conn.closeChan)
        conn.isClosed = true
    }
    conn.mutex.Unlock()
}
 
// 内部实现
func (conn *Connection) readLoop() {
    var (
        data []byte
        err  error
    )
    for {
        _ = conn.wsConnect.SetReadDeadline(time.Now().Add(time.Minute))
        if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
            goto ERR
        }
        _ = conn.wsConnect.SetReadDeadline(time.Time{})
 
        //阻塞在这里,等待inChan有空闲位置
        select {
        case conn.inChan <- data:
        case <-conn.closeChan: // closeChan 感知 conn断开
            goto ERR
        }
    }
 
ERR:
    conn.Close()
}
 
func (conn *Connection) writeLoop() {
    var (
        data []byte
        err  error
    )
 
    for {
        select {
        case data = <-conn.outChan:
        case <-conn.closeChan:
            goto ERR
        }
        if err = conn.wsConnect.WriteMessage(websocket.BinaryMessage, data); err != nil {
            goto ERR
        }
    }
ERR:
    conn.Close()
}