package main import ( "flag" "fmt" "net" "strconv" "time" "andriodServer/esutil" "andriodServer/extend/config" "andriodServer/log" "github.com/streadway/amqp" ) var addr = flag.String("addr", "0.0.0.0", "The address to listen to") var Eurl = flag.String("eurl", "http://192.168.1.182:9200/", "The port to listen on") var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url") var port = flag.Int("port", 6000, "The port to listen on") var sec = flag.Int("sec", 10, "the second for query data") var Level = flag.String("level","ErrorLevel","log level") var IsHub = flag.String("hub", "hub", "hub is personIsHub=1") var Size = flag.Int("size", 100, "size default is 100") var env = flag.String("env", "config", "env set") var mqIp = flag.String("mqIp", "172.17.50.245", "default mq ip") var mqPort = flag.Int("mqPort", 5672, "default mq port") var mqUser = flag.String("mqUser", "basic", "default rabbitmq user") var mqPass = flag.String("mqPass", "basic", "default rabbitmq pass") func main() { flag.Parse() log.SetLogLevel(*Level) config.Init(*env) fmt.Println(*port) //src := *addr + ":" + strconv.Itoa(*port) //listener, err := net.Listen("tcp", src) //if err != nil { // log.Log.Errorln(err) // return //} //log.Log.Infof("Listening on %s.\n", src) //fmt.Println("starting server success.") //defer listener.Close() //connArr:=make([]net.Conn,0) //for { // conn, err := listener.Accept()// // // connArr = append(connArr,conn) // if err != nil { // log.Log.Infoln("some connecion error: ", err) // } // go handleConnection(conn,connArr) //} mqAddr := "amqp://" + *mqUser + ":" + *mqPass + "@" + *mqIp + ":" + strconv.Itoa(*mqPort)+"/" conn, err := amqp.Dial(mqAddr) if err != nil { log.Log.Infof("Failed to connect to RabbitMQ,err:",err) return } defer conn.Close() ch, err := conn.Channel() if err !=nil { log.Log.Infof("Failed to open a channel,err:",err) return } defer ch.Close() q, err := ch.QueueDeclare( "alarm2Android", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err !=nil { log.Log.Infof("Failed to declare a queue,err:",err) return } handleAlarmData2MQ(q, ch) } func handleAlarmData2MQ(queue amqp.Queue,ch *amqp.Channel) { tick := time.NewTicker(3 * time.Second) lastTime := time.Now() for { select { case <-tick.C: curTime := time.Now() alarmData := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastTime, curTime) if alarmData != nil { err := ch.Publish( "", queue.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: alarmData, }) if err !=nil { log.Log.Infof("send to mq err:",err) } else { log.Log.Infof("send to mq success,len(body):",len(alarmData)) } } lastTime = curTime } } } func handleConnection(conn net.Conn, connArr []net.Conn) { remoteAddr := conn.RemoteAddr().String() log.Log.Infoln("Client connected from ", remoteAddr) ech := make(chan error) go func(conn net.Conn, ech chan error) { buf := make([]byte, 10) readMsg, err := conn.Read(buf) log.Log.Infoln("Read completed,readMsg:",readMsg,",err:",err) ech <- err }(conn, ech) tick := time.NewTicker(3 * time.Second) lastTime := time.Now() for { select { case <-tick.C: curTime := time.Now() if !handleMessage(conn, connArr, lastTime, curTime){ conn.Close() return } lastTime = curTime case err := <-ech: log.Log.Infoln(err, "remoteAddr ", remoteAddr, " close") conn.Close() return } } log.Log.Infoln("Client at " + remoteAddr + " disconnected.") } func handleMessage(conn net.Conn, connArr []net.Conn,lastT time.Time, curTime time.Time) bool { jsonstring := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastT, curTime) if jsonstring == nil { log.Log.Infoln("the data is nil,remoteArr:",conn.RemoteAddr()) if _, err := conn.Write([]byte("\000"));err !=nil { log.Log.Infoln("conn.WriteErr:",err) return false } else { return true } } jsonstring = append(jsonstring, []byte("\000")...) log.Log.Infoln("jsonstring len: ", len(jsonstring), "\000 data: ", len("\000")) _, err := conn.Write(jsonstring) if err !=nil{ log.Log.Infoln("conn.WriteErr:",err) return false } return true }