package service
|
|
import (
|
"basic.com/pubsub/esutil.git"
|
"bytes"
|
"data_msg_push_server/config"
|
"data_msg_push_server/model"
|
"data_msg_push_server/util"
|
"encoding/json"
|
"fmt"
|
"io/ioutil"
|
"net/http"
|
)
|
|
func ConnectControl() bool {
|
url := config.ServUrls.ServerUrl
|
resp, err := http.Get(url)
|
if err != nil {
|
fmt.Println("连接失败", err)
|
return false
|
}
|
defer resp.Body.Close()
|
|
fmt.Println("连接成功!")
|
return true
|
}
|
|
func GetTotal() (total int, err error) {
|
url := "http://" + config.ServUrls.EsUrl + "/ai_ocean/_search"
|
queryDSL := `{
|
"size": "1",
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"isDelete": false
|
}
|
}
|
]
|
}
|
},
|
"sort": [
|
{
|
"updateTime": {
|
"order": "asc"
|
}
|
}
|
]
|
}`
|
buf, err := esutil.EsReq("POST", url, []byte(queryDSL))
|
if err != nil {
|
return 0, err
|
}
|
totals, err := util.SourceTotal(buf)
|
if err != nil {
|
return 0, err
|
}
|
if totals == 0 {
|
return 0, nil
|
}
|
return totals, nil
|
}
|
|
func GetData() (interface{}, error) {
|
//fmt.Println(config.ServUrls.EsUrl)
|
//fmt.Println(config.ServUrls.ServerUrl)
|
url := "http://" + config.ServUrls.EsUrl + "/ai_ocean/_search"
|
queryDSL := `{
|
"size": "1",
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"isDelete": false
|
}
|
}
|
]
|
}
|
},
|
"sort": [
|
{
|
"updateTime": {
|
"order": "asc"
|
}
|
}
|
]
|
}`
|
//fmt.Println(url)
|
//fmt.Println(queryDSL)
|
buf, err := esutil.EsReq("POST", url, []byte(queryDSL))
|
if err != nil {
|
return nil, err
|
}
|
source, err := util.Sourcelist(buf)
|
if err != nil {
|
return nil, err
|
}
|
if len(source) == 0 {
|
return nil, nil
|
}
|
picMaxImages := make([][]byte, 0) // 存储图片数据的数组
|
if source[0]["picMaxUrl"].([]interface{}) != nil{
|
//picMaxUrls := source[0]["picMaxUrl"].([]interface{})
|
if len(source[0]["picMaxUrl"].([]interface{})) > 0 {
|
for _, picMaxUrl := range source[0]["picMaxUrl"].([]interface{}) {
|
picMaxImageData, err := GetImageData("http://" + picMaxUrl.(string))
|
if err != nil {
|
fmt.Println("获取大图数据失败:", err)
|
continue
|
}
|
picMaxImages = append(picMaxImages, picMaxImageData)
|
}
|
fmt.Println("图片数据数组大小:", len(picMaxImages))
|
}
|
}
|
|
picSmImages := make([][]byte, 0) // 存储图片数据的数组
|
if source[0]["targetInfo"].(interface{}) != nil {
|
for _, target := range source[0]["targetInfo"].([]interface{}) {
|
info := target.(map[string]interface{})
|
picSmUrl := info["picSmUrl"].(string)
|
if picSmUrl == "" {
|
continue
|
}
|
picSmImageData, err := GetImageData("http://" + picSmUrl)
|
if err != nil {
|
fmt.Println("获取图片数据失败:", err)
|
continue
|
}
|
picSmImages = append(picSmImages, picSmImageData)
|
}
|
}
|
model.PushDataInfo.SourceData = source[0]
|
model.PushDataInfo.PicMaxImages = picMaxImages
|
model.PushDataInfo.PicSmImages = picSmImages
|
//fmt.Println(model.PushDataInfo.SourceData)
|
return model.PushDataInfo, nil
|
}
|
|
// SendData JSON-encodes pushDataInfo, POSTs it to url and returns the string
// found in the "data" field of the JSON response.
//
// It returns an empty id and a non-nil error when encoding, building or
// sending the request, or decoding the response fails, and also when the
// response has no "data" field or that field is not a string (previously a
// panic). The HTTP status code is not inspected.
func SendData(pushDataInfo interface{}, url string) (id string, err error) {
	payload, err := json.Marshal(pushDataInfo)
	if err != nil {
		return "", err
	}

	req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
	if err != nil {
		return "", err
	}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var responseData map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&responseData); err != nil {
		return "", err
	}

	// Checked assertion: the reply comes from a remote service and may omit
	// "data" or send a non-string value.
	id, ok := responseData["data"].(string)
	if !ok {
		return "", fmt.Errorf("push response field \"data\" is missing or not a string: %v", responseData["data"])
	}
	return id, nil
}
|
|
func DeleteData(id string) bool {
|
url := "http://" + config.ServUrls.EsUrl + "/ai_ocean/_delete_by_query?refresh=true"
|
deleteDSL := `{
|
"query":{
|
"bool":{
|
"filter":[{
|
"term":{
|
"id":"` + id + `"
|
}
|
}]
|
}
|
}
|
}`
|
buf, err := esutil.EsReq("POST", url, []byte(deleteDSL))
|
if err != nil {
|
fmt.Println(err)
|
}
|
deleted, err := util.SourceDeleted(buf)
|
if err != nil {
|
fmt.Println(err)
|
}
|
if deleted >= 1 {
|
return true
|
}
|
return false
|
}
|
|
func MarkData(id string) bool {
|
url := "http://" + config.ServUrls.EsUrl + "/ai_ocean/_update_by_query?refresh=true"
|
markDSL := `{
|
"script": {
|
"source": "ctx._source.isDelete=true"
|
},
|
"query": {
|
"term": {
|
"id": "` + id + `"
|
}
|
}
|
}`
|
buf, err := esutil.EsReq("POST", url, []byte(markDSL))
|
if err != nil {
|
fmt.Println(err)
|
return false
|
}
|
total, err := util.SourceUpdated(buf)
|
if err != nil {
|
fmt.Println(err)
|
return false
|
}
|
fmt.Println(total)
|
return true
|
}
|
|
// GetImageData downloads url with an HTTP GET and returns the response body.
//
// It returns a nil slice and a non-nil error when the request fails, when the
// server answers with a non-2xx status (so an error page is never mistaken
// for image bytes), or when reading the body fails.
func GetImageData(url string) ([]byte, error) {
	response, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode < 200 || response.StatusCode > 299 {
		return nil, fmt.Errorf("fetching image %q: unexpected HTTP status %d", url, response.StatusCode)
	}

	imageData, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}
	return imageData, nil
}
|