package websocketx
|
|
import (
|
"errors"
|
"github.com/gorilla/websocket"
|
"sync"
|
"time"
|
)
|
|
type Connection struct {
|
wsConnect *websocket.Conn
|
inChan chan []byte
|
outChan chan []byte
|
closeChan chan byte
|
|
mutex sync.Mutex // 对closeChan关闭上锁
|
isClosed bool // 防止closeChan被关闭多次
|
}
|
|
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
|
conn = &Connection{
|
wsConnect: wsConn,
|
inChan: make(chan []byte, 1000),
|
outChan: make(chan []byte, 1000),
|
closeChan: make(chan byte, 1),
|
}
|
// 启动读协程
|
go conn.readLoop()
|
// 启动写协程
|
go conn.writeLoop()
|
return
|
}
|
|
func (conn *Connection) ReadMessage() (data []byte, err error) {
|
select {
|
case data = <-conn.inChan:
|
case <-conn.closeChan:
|
err = errors.New("connection is closeed")
|
}
|
return
|
}
|
|
func (conn *Connection) WriteMessage(data []byte) (err error) {
|
select {
|
case conn.outChan <- data:
|
case <-conn.closeChan:
|
err = errors.New("connection is closeed")
|
}
|
return
|
}
|
|
func (conn *Connection) Close() {
|
// 线程安全,可多次调用
|
conn.wsConnect.Close()
|
// 利用标记,让closeChan只关闭一次
|
conn.mutex.Lock()
|
if !conn.isClosed {
|
close(conn.closeChan)
|
conn.isClosed = true
|
}
|
conn.mutex.Unlock()
|
}
|
|
// Internal implementation.
|
func (conn *Connection) readLoop() {
|
var (
|
data []byte
|
err error
|
)
|
for {
|
_ = conn.wsConnect.SetReadDeadline(time.Now().Add(time.Minute))
|
if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
|
goto ERR
|
}
|
_ = conn.wsConnect.SetReadDeadline(time.Time{})
|
|
//阻塞在这里,等待inChan有空闲位置
|
select {
|
case conn.inChan <- data:
|
case <-conn.closeChan: // closeChan 感知 conn断开
|
goto ERR
|
}
|
}
|
|
ERR:
|
conn.Close()
|
}
|
|
func (conn *Connection) writeLoop() {
|
var (
|
data []byte
|
err error
|
)
|
|
for {
|
select {
|
case data = <-conn.outChan:
|
case <-conn.closeChan:
|
goto ERR
|
}
|
if err = conn.wsConnect.WriteMessage(websocket.BinaryMessage, data); err != nil {
|
goto ERR
|
}
|
}
|
ERR:
|
conn.Close()
|
}
|