From 40d864645ff608e3a81d115fef26bba87954be2c Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期一, 22 七月 2019 18:42:06 +0800
Subject: [PATCH] add log and fix pubusb
---
logger/logger.go | 2
test | 0
tasktag/tasktag.go | 8 +-
main.go | 31 +++----
sdk/sdk.go | 60 ++++++--------
util/sqlite.go | 36 +++-----
util/util.go | 8 +-
camera/camera.go | 55 +++++++------
8 files changed, 89 insertions(+), 111 deletions(-)
diff --git a/camera/camera.go b/camera/camera.go
index 5719fd4..d57ebd2 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -4,6 +4,7 @@
"errors"
"basic.com/valib/deliver.git"
+ "github.com/long/test/logger"
"github.com/long/test/sdk"
"github.com/long/test/util"
@@ -14,7 +15,6 @@
"fmt"
"sync"
//"time"
- "os"
)
@@ -27,10 +27,10 @@
}
func Init() {
- fmt.Println("============ camera info ====================")
+ logger.Info("============ camera info ====================")
for _, cd := range util.CameraIds {
- fmt.Println(cd)
- fmt.Println()
+ logger.Info(cd)
+ logger.Info()
}
// 鎽勫儚鏈哄垵濮嬪寲
@@ -41,7 +41,7 @@
// web绔垵濮嬪寲
CreateCamera("virtual-faceextract-sdk-pull_2" , "web")
-// go AutoDelCamera(util.Cameraflag)
+ go AutoDelCamera(util.Cameraflag)
}
// camera 鎺ュ彈鏁版嵁
@@ -51,7 +51,7 @@
socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
if err != nil {
- fmt.Println("create socket error")
+ logger.Error("create socket error")
return
}
@@ -63,7 +63,7 @@
func AutoDelCamera(cameraflag chan bool) {
for _ = range cameraflag {
- fmt.Println("test autodelcameraflag")
+ logger.Info("test autodelcameraflag")
var oldcamera []string
SocketManage.Range(func(k, v interface{}) bool {
@@ -74,16 +74,20 @@
})
var newcamera []string
- newcamera = append(newcamera, util.Sdklist...)
+
+ for _, camnew := range util.CameraIds {
+ newcamera = append(newcamera, camnew.Id)
+ }
+ newcamera = append(newcamera, "virtual-faceextract-sdk-pull_2")
+
cameraChanDel := util.Difference(oldcamera, newcamera)
- fmt.Println(cameraChanDel)
+ logger.Info(cameraChanDel)
for key, op := range cameraChanDel {
if op == "add" {
CreateCamera(key, "camera")
- fmt.Println("add new camera id=========================")
- os.Exit(1)
+ logger.Info("add new camera id=========================", key)
} else {
if sock, ok := SocketManage.Load(key); ok {
if socket, sok := sock.(SocketContext); sok {
@@ -91,7 +95,7 @@
SocketManage.Delete(key)
}
}
- fmt.Println("鍒犻櫎camera server : ", key)
+ logger.Info("鍒犻櫎camera server : ", key)
}
}
}
@@ -105,7 +109,7 @@
socket.Cancel = cancel
socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
- fmt.Println("new socket.Sock: ", socket.Sock)
+ logger.Info("new socket.Sock: ", socket.Sock)
if socket.Sock == nil {
return socket, errors.New("create listen error")
@@ -122,37 +126,34 @@
for {
select {
case <-socket.Context.Done():
- fmt.Println("listen recv quit")
+ logger.Error("listen recv quit")
return
default:
if recvmessage, err = socket.Sock.Recv(); err != nil {
- fmt.Println("err is: ", err)
+ //logger.Error("[camera] err is: ", err)
continue
}
unmsg, err := util.UnCompress(recvmessage)
if err != nil {
- fmt.Println(err)
+ logger.Error(err)
continue
}
if err := proto.Unmarshal(unmsg,&imagemsg); err != nil {
- fmt.Println("recv msg is not protomsgImage")
+ logger.Error("recv msg is not protomsgImage")
continue
}
- // fmt.Println("============== one msg input ==========")
- // fmt.Println(imagemsg.Cid)
switch remote {
case "camera":
- fmt.Printf("=== cid: has %d task\n", len(GetAlltask(imagemsg.Cid)))
for _, taskid := range GetAlltask(imagemsg.Cid) {
//time.Sleep(5 * time.Second)
- fmt.Println("id: ", imagemsg.Cid, " taskid: ", taskid)
+ logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskid)
Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
}
case "web":
- fmt.Println("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
+ logger.Debug("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
Taskdolist(imagemsg.Cid,"", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2", recvmessage)
}
}
@@ -175,20 +176,20 @@
func Taskdolist(cid string, caddr string, taskid string, data []byte) {
// 鏁版嵁鍔犲伐(鎵撴爣绛�)
- fmt.Printf("taskid %s: has %d data[]byte\n", taskid, len(data))
+ logger.Debug("taskid: ",taskid, "has ", len(data), "data[]byte")
sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
if sdkmsg.Tasklab == nil {
- fmt.Printf("cid:%s 娌℃湁浠诲姟%s\n", cid, taskid)
+ logger.Error(cid, " not have taskid: ", taskid )
return
}
// 璁$畻鍒嗗彂鐨勪富棰�
SendTopic := sdk.SdkSendTopic(sdkmsg)
- fmt.Println(SendTopic)
+ logger.Debug(SendTopic)
if _, ok := sdk.SdkMap[SendTopic]; ok {
sdk.SdkMap[SendTopic] <- sdkmsg
- fmt.Println("dispute sendtopic success", SendTopic)
+ logger.Debug("dispute sendtopic success", SendTopic)
} else {
- fmt.Println("鍒嗗彂鐨勪富棰樹笉瀛樺湪")
+ logger.Debug("鍒嗗彂鐨勪富棰樹笉瀛樺湪")
}
}
diff --git a/logger/logger.go b/logger/logger.go
index 92f2be2..65916aa 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -214,4 +214,4 @@
}
me.fileFd = nil
}
-}
\ No newline at end of file
+}
diff --git a/main.go b/main.go
index 9a31d0d..55345a9 100644
--- a/main.go
+++ b/main.go
@@ -1,8 +1,6 @@
package main
import (
- "fmt"
- "log"
"net/http"
"flag"
_ "net/http/pprof"
@@ -12,24 +10,21 @@
"github.com/long/test/sdk"
"github.com/long/test/tasktag"
"github.com/long/test/util"
- //"github.com/long/test/logger"
+ "github.com/long/test/logger"
- // "github.com/long/test/camera"
- // "github.com/long/test/sdk"
- // "github.com/long/test/tasktag"
)
var initchan = make(chan bool)
-//func init(){
-// var logFile = "./taskpubsub.log"
-// var logSaveDays = 15
-//
-// // 鏃ュ織鍒濆鍖�
-// logger.Config(logFile, logger.DebugLevel)
-// logger.SetSaveDays(logSaveDays)
-// logger.Info("loginit success !")
-//}
+func init(){
+ var logFile = "./taskpubsub.log"
+ var logSaveDays = 15
+
+ // 鏃ュ織鍒濆鍖�
+ logger.Config(logFile, logger.InfoLevel)
+ logger.SetSaveDays(logSaveDays)
+ logger.Info("loginit success !")
+}
func main() {
flag.Parse()
@@ -37,14 +32,12 @@
// pprof 鐢ㄤ簬鍒嗘瀽鎬ц兘
go func() {
- log.Println(http.ListenAndServe("0.0.0.0:6061", nil))
+ logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
}()
go util.Init(initchan)
- fmt.Println("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
- fmt.Println()
- fmt.Println()
+ logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
sdk.Init() // 鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
tasktag.Init() // 鑾峰彇鎵�鏈変换鍔★紝寤虹珛浠诲姟鏍囩锛� 鍦ㄦ暟鎹繘鍏ユ椂锛� 鎵撴爣绛�
camera.Init() //鑾峰彇cid, taskid, sdkid ,鍏崇郴
diff --git a/sdk/sdk.go b/sdk/sdk.go
index adee5ee..e8bce98 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -12,6 +12,7 @@
"basic.com/pubsub/protomsg.git"
"basic.com/valib/deliver.git"
+ "github.com/long/test/logger"
)
const (
@@ -30,18 +31,18 @@
func Init() {
- fmt.Println("============= init sdk info =====================")
+ logger.Info("============= init sdk info =====================")
for _, sdkid := range util.Sdklist { // 鍒涘缓sdk server
CreatesdkTopicandServer(sdkid)
- fmt.Println()
+ logger.Info()
}
// 鎵嬪姩杈撳叆鐨勪富棰�
SdkMap["es"] = make(chan protomsg.SdkMessage)
- fmt.Println("create es channel: ")
+ logger.Info("create es channel: ")
SdkMap["virtual-faceextract-sdk-pull"] = make(chan protomsg.SdkMessage)
- fmt.Println("create virtual-faceextract-sdk-pull")
+ logger.Info("create virtual-faceextract-sdk-pull")
Createwebserver("virtual-faceextract-sdk-pull")
@@ -52,13 +53,13 @@
func CreatesdkTopicandServer(sdkid string) {
SdkMap[sdkid] = make(chan protomsg.SdkMessage)
- fmt.Println("create sdk channel: ", sdkid)
+ logger.Info("create sdk channel: ", sdkid)
url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
if err != nil {
delete(SdkMap, sdkid)
- fmt.Println(sdkid, "create server error!")
+ logger.Error(sdkid, "create server error!")
return
}
@@ -69,7 +70,7 @@
socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
if err != nil {
delete(SdkMap, sdkid)
- fmt.Println(sdkid, "create dial error!")
+ logger.Error(sdkid, "create dial error!")
return
}
go Recv(socketdial)
@@ -81,7 +82,7 @@
url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull)
socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url)
if err != nil {
- fmt.Println(webid, "create server error!")
+ logger.Error(webid, "create server error!")
return
}
go Send(webid, socketser, SdkMap[webid])
@@ -90,11 +91,11 @@
func DeletesdkTopicandServer(sdkid string) {
close(SdkMap[sdkid])
delete(SdkMap, sdkid)
- fmt.Println("鍒犻櫎涓婚 sdk: ", sdkid)
+ logger.Info("鍒犻櫎涓婚 sdk: ", sdkid)
SocketManage[sdkid].Cancel()
delete(SocketManage, sdkid)
- fmt.Println("鍒犻櫎server sdk: ", sdkid)
+ logger.Info("鍒犻櫎server sdk: ", sdkid)
}
//鍗曠嫭澶勭悊 es 涓婚鐨勬儏鍐�
@@ -102,7 +103,7 @@
for {
select {
case <-SdkMap["es"]:
- fmt.Println("es finanl sdk!")
+ //logger.Info("es finanl sdk!")
}
}
}
@@ -111,7 +112,7 @@
func AutoDelSdk(sdkflag chan bool) {
for _ = range sdkflag {
- fmt.Println("test autodelsdk")
+ logger.Info("test autodelsdk")
var oldSdk []string
for key, _ := range SdkMap {
oldSdk = append(oldSdk, key)
@@ -119,20 +120,19 @@
util.Sdklist = append(util.Sdklist, "es")
util.Sdklist = append(util.Sdklist, "virtual-faceextract-sdk-pull")
sdkChanDel := util.Difference(oldSdk, util.Sdklist)
- fmt.Println(sdkChanDel)
+ logger.Info(sdkChanDel)
for key, op := range sdkChanDel {
if op == "add" {
CreatesdkTopicandServer(key)
} else {
DeletesdkTopicandServer(key)
- fmt.Println("鍒犻櫎涓婚 sdk: ", key)
+ logger.Info("鍒犻櫎涓婚 sdk: ", key)
}
}
}
}
-//涓婚
//sdk鏁版嵁 鍔犲伐鍣�
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
@@ -148,7 +148,6 @@
sdkmsg.Tasklab = val.(*protomsg.TaskLabel)
sdkmsg.Data = data
}
- //sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
return sdkmsg
}
@@ -159,18 +158,17 @@
} else {
sdksend = "es"
}
- fmt.Printf("鍒嗗彂鐨勪富棰樻槸锛�%s 浣嶇疆 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
+ logger.Debug("鍒嗗彂鐨勪富棰�: ", sdksend , "浣嶇疆锛�", int(sdkmsg.Tasklab.Index)+1,"/", len(sdkmsg.Tasklab.Sdkinfos))
return
}
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) {
- fmt.Println("url is: ", url)
+ logger.Info("url is: ", url)
ctx, cancel := context.WithCancel(context.Background())
socket.Context = ctx
socket.Cancel = cancel
-
socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
if socket.Sock == nil {
@@ -182,7 +180,7 @@
}
func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
- fmt.Println("url is: ", url)
+ logger.Info("url is: ", url)
ctx, cancel := context.WithCancel(context.Background())
socket.Context = ctx
@@ -203,17 +201,15 @@
for {
select {
case <-socket.Context.Done():
- fmt.Println("socket close")
+ logger.Info("socket close")
return
default:
if msg, err := socket.Sock.Recv(); err != nil {
- //fmt.Printf("%s ", err)
continue
} else {
err = proto.Unmarshal(msg, &repsdkmsg)
- fmt.Println("receive len: ", len(msg))
if err != nil {
- fmt.Println("unmarshal error: ", err)
+ logger.Error("unmarshal error: ", err)
continue
}
repsdkmsg.Tasklab.Index++
@@ -226,33 +222,27 @@
}
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
-// var v *protomsg.SdkMessage
-// var ok bool
for {
select {
case <-socket.Context.Done():
- fmt.Println("socket is close")
+ logger.Info("socket is close")
return
case v, ok := <-in:
if ok {
data, err :=v.Marshal()
if err != nil {
- fmt.Println("proto marshal error ", err)
+ logger.Error("proto marshal error ", err)
continue
}
- fmt.Printf("浠庣閬搒dkid=%s 鎺ュ彈鏁版嵁 %d\n", sdkid, len(data))
- fmt.Println()
-
if err := socket.Sock.Send(data); err != nil {
- fmt.Println(socket.Sock)
- fmt.Println("failed send")
+ logger.Error("failed send")
continue
}
- fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
+ logger.Debug(sdkid, " send success: ", len(data))
} else {
- fmt.Println(sdkid, " 涓婚鍏抽棴, 鍏抽棴send()")
+ logger.Debug(sdkid, " 涓婚鍏抽棴, 鍏抽棴send()")
return
}
}
diff --git a/tasktag/tasktag.go b/tasktag/tasktag.go
index d8a7540..9f54083 100644
--- a/tasktag/tasktag.go
+++ b/tasktag/tasktag.go
@@ -1,23 +1,23 @@
package tasktag
import (
- "fmt"
"sync"
"basic.com/pubsub/protomsg.git"
"github.com/long/test/util"
+ "github.com/long/test/logger"
)
var TaskMapLab sync.Map
func Init() {
- fmt.Println("============= init tasktag info =====================")
+ logger.Info("============= init tasktag info =====================")
GenTaskMap()
go func(taskflag chan bool) {
for _ = range taskflag {
GenTaskMap()
- fmt.Println("update task finished!")
+ logger.Info("update task finished!")
}
}(util.TaskSdkflag)
@@ -43,7 +43,7 @@
}
GenTasklab(tls)
TaskMapLab.Range(func(k, v interface{}) bool {
- fmt.Println(k, v)
+ logger.Info(k, v)
return true
})
}
diff --git a/test b/test
index b694585..2f485d4 100755
--- a/test
+++ b/test
Binary files differ
diff --git a/util/sqlite.go b/util/sqlite.go
index 958aff5..5b3c4d0 100644
--- a/util/sqlite.go
+++ b/util/sqlite.go
@@ -1,14 +1,13 @@
package util
import (
- "fmt"
"flag"
"basic.com/pubsub/protomsg.git"
"basic.com/dbapi.git"
"github.com/gogo/protobuf/proto"
"basic.com/valib/gopherdiscovery.git"
- "os"
+ "github.com/long/test/logger"
)
/*************************
@@ -32,7 +31,7 @@
var Sdkinfos []protomsg.Sdk
var urlServer = flag.String("urlServer","tcp://127.0.0.1:40007","heartbeat address of url server")
-var urlPubSub = flag.String("urlPubsub","tcp://127.0.0.1:5007", "heartbeat pubsub address of url server")
+var urlPubSub = flag.String("urlPubsub","tcp://127.0.0.1:50007", "heartbeat pubsub address of url server")
var dbip = flag.String("dbip","127.0.0.1","address of database ip")
var dbport = flag.Int("dbport", 8001, "port of database port")
@@ -48,9 +47,9 @@
func processinit(initchan chan bool) {
CameraIds = camval.FindAll()
- fmt.Println("==============camera camera with task ================")
+ logger.Info("==============camera camera with task ================")
CameraTasks = camval.FindAllCameraAndTask()
- fmt.Println(CameraTasks)
+ logger.Info(CameraTasks)
TaskSdks = taskapi.FindAllTaskSdkRun()
@@ -61,55 +60,50 @@
func Getdata(opt []byte) {
if err := proto.Unmarshal(opt, newsdkmsg); err != nil {
- fmt.Println("publichshMessage ", err)
+ logger.Error("publichshMessage ", err)
return
}
switch newsdkmsg.Table {
case protomsg.TableChanged_T_Camera:
- fmt.Println("update camera")
- os.Exit(1)
+ logger.Info("update camera")
CameraIds = camval.FindAll()
Cameraflag <- true
+ logger.Info("update camera finish.")
case protomsg.TableChanged_T_CameraTask:
- fmt.Println("update cameratask")
- os.Exit(1)
+ logger.Info("update cameratask")
CameraTasks = camval.FindAllCameraAndTask()
- fmt.Println("update cameratask finished!")
+ logger.Info("update cameratask finished!")
case protomsg.TableChanged_T_TaskSdk:
- fmt.Println("update tasksdk")
- os.Exit(1)
+ logger.Info("update tasksdk")
TaskSdks = taskapi.FindAllTaskSdkRun()
TaskSdkflag <- true
case protomsg.TableChanged_T_Sdk:
- fmt.Println("update sdk")
- os.Exit(1)
+ logger.Info("update sdk")
Sdklist = sdkapi.GetAllSdkIds()
Sdkinfos = sdkapi.FindAll("")
Sdkflag <- true
- fmt.Println("update finished!")
default:
- fmt.Println("unknow type operation")
- os.Exit(1)
+ logger.Info("unknow type operation")
}
- fmt.Println(newsdkmsg)
+ logger.Info(newsdkmsg)
}
func Init(initchan chan bool) {
dbapi.Init(*dbip, *dbport)
clientOne, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
recvinit := clientOne.HeartBeatMsg()
- fmt.Println(<-recvinit)
+ _ = <-recvinit
+
processinit(initchan)
peers, _ := clientOne.Peers()
for x := range peers {
Getdata(x)
- os.Exit(1)
}
}
diff --git a/util/util.go b/util/util.go
index fbddcdb..6f49777 100644
--- a/util/util.go
+++ b/util/util.go
@@ -2,7 +2,7 @@
import(
"github.com/pierrec/lz4"
- "fmt"
+ "github.com/long/test/logger"
)
@@ -44,7 +44,7 @@
out := make([]byte, 10*len(in))
n, err := lz4.UncompressBlock(in, out)
if err != nil {
- fmt.Println("uncompress error: ", err)
+ logger.Error("uncompress error: ", err)
return nil, err
}
out = out[:n] // uncompressed data
@@ -57,11 +57,11 @@
ht := make([]int, 64<<10) // buffer for the compression table
n, err := lz4.CompressBlock(in, out, ht)
if err != nil {
- fmt.Println("compress: ", err)
+ logger.Error("compress: ", err)
return nil, err
}
if n >= len(in) {
- fmt.Println("image is not compressible")
+ logger.Error("image is not compressible")
}
out = out[:n] // compressed data
return out, nil
--
Gitblit v1.8.0