| | |
| | | package util |
| | | |
| | | import( |
| | | "github.com/pierrec/lz4" |
| | | "github.com/long/taskpubsub/logger" |
| | | ) |
| | | import ( |
| | | "context" |
| | | "errors" |
| | | |
| | | "basic.com/valib/deliver.git" |
| | | |
| | | "basic.com/valib/logger.git" |
| | | ) |
| | | |
// Failure thresholds for send/receive channels.
const (
	// ShmMaxTryCount is the number of failed shared-memory sends/receives
	// after which recreating the channel is attempted.
	ShmMaxTryCount = 7

	// tooLongFailed is the maximum number of failed sends/receives, for
	// shared memory and mangos alike, after which the current channel is
	// considered no longer in use and is closed.
	// Known issue: a camera that reconnects after a long network outage
	// finds its channel already closed.
	tooLongFailed = 300
)

// Identifier prefixes for image, video and audio files (judging by the
// constant names; their users are not visible in this part of the file).
const (
	File_Img_Id_Pre   = "img_"
	File_Video_Id_Pre = "video_"
	File_Audio_Id_Pre = "audio_"
)
| | | |
| | | type SocketContext struct { |
| | | Sock deliver.Deliver |
| | | Context context.Context |
| | | Cancel context.CancelFunc |
| | | |
| | | Mode int |
| | | URL string |
| | | UseSHM bool |
| | | } |
| | | |
// Rules for comparing oldstring with new:
//  1. an element of oldstring that is not in new is abandoned (deleted);
//  2. an element of new that is not in oldstring is added.
| | |
| | | |
| | | // UnCompress uncompress |
| | | func UnCompress(in []byte) ([]byte, error) { |
| | | out := make([]byte, 10*len(in)) |
| | | n, err := lz4.UncompressBlock(in, out) |
| | | if err != nil { |
| | | logger.Error("uncompress error: ", err) |
| | | return nil, err |
| | | } |
| | | out = out[:n] // uncompressed data |
| | | return out, nil |
| | | return in, nil |
| | | |
| | | // out := make([]byte, 3*len(in)) |
| | | // n, err := lz4.UncompressBlock(in, out) |
| | | // if err != nil { |
| | | // logger.Error("uncompress error: ", err) |
| | | // return nil, err |
| | | // } |
| | | // out = out[:n] // uncompressed data |
| | | // return out, nil |
| | | } |
| | | |
| | | // Compress compress |
| | | func Compress(in []byte) ([]byte, error) { |
| | | out := make([]byte, len(in)) |
| | | ht := make([]int, 64<<10) // buffer for the compression table |
| | | n, err := lz4.CompressBlock(in, out, ht) |
| | | if err != nil { |
| | | logger.Error("compress: ", err) |
| | | return nil, err |
| | | } |
| | | if n >= len(in) { |
| | | logger.Error("image is not compressible") |
| | | } |
| | | out = out[:n] // compressed data |
| | | return out, nil |
| | | return in, nil |
| | | |
| | | // out := make([]byte, len(in)) |
| | | // ht := make([]int, 64<<10) // buffer for the compression table |
| | | // n, err := lz4.CompressBlock(in, out, ht) |
| | | // if err != nil { |
| | | // logger.Error("compress: ", err) |
| | | // return nil, err |
| | | // } |
| | | // if n >= len(in) { |
| | | // logger.Error("image is not compressible") |
| | | // } |
| | | // out = out[:n] // compressed data |
| | | // return out, nil |
| | | } |
| | | |
| | | // create server |
| | | func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) { |
| | | logger.Info("url is: ", url) |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | socket.Context = ctx |
| | | socket.Cancel = cancel |
| | | |
| | | socket.Mode = mode |
| | | socket.URL = url |
| | | socket.UseSHM = shm |
| | | |
| | | if shm { |
| | | var err error |
| | | socket.Sock, err = deliver.NewServerWithError(deliver.Mode(mode), url) |
| | | if err != nil { |
| | | logger.Info("TASK CREATE SERVER SHM ERROR: ", err) |
| | | } |
| | | } else { |
| | | socket.Sock = deliver.NewServer(deliver.Mode(mode), url) |
| | | } |
| | | |
| | | if socket.Sock == nil { |
| | | return socket, errors.New("create listen error") |
| | | } |
| | | |
| | | return socket, nil |
| | | } |
| | | |
| | | func NewSocketDial(mode int, url string) (socket SocketContext, err error) { |
| | | logger.Info("url is: ", url) |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | socket.Context = ctx |
| | | socket.Cancel = cancel |
| | | |
| | | socket.Sock = deliver.NewClient(deliver.Mode(mode), url) |
| | | |
| | | if socket.Sock == nil { |
| | | return socket, errors.New("create listen error") |
| | | } |
| | | |
| | | return socket, nil |
| | | } |
| | | |
| | | func MaybeRestartSocket(socket SocketContext, tryCnt *int) SocketContext { |
| | | if socket.UseSHM { |
| | | if *tryCnt > ShmMaxTryCount { |
| | | logger.Info("SDK SEND SHM TRY :", ShmMaxTryCount, " RESTART IT") |
| | | |
| | | socket.Sock.Close() |
| | | newSocket, err := NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) |
| | | if err != nil { |
| | | return socket |
| | | } |
| | | |
| | | *tryCnt = 0 |
| | | |
| | | return newSocket |
| | | } |
| | | } |
| | | |
| | | return socket |
| | | } |