package com.basic.analy.service.impl;
|
|
import java.io.StringReader;
|
import java.text.SimpleDateFormat;
|
import java.util.ArrayList;
|
|
import java.util.List;
|
import java.util.Map;
|
|
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.Future;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONReader;
|
import com.basic.analy.service.SearchManService;
|
import com.basic.analy.utils.EsThreadUtils;
|
import com.cloud.common.utils.RestTemplateUtil;
|
import org.apache.log4j.Logger;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.http.MediaType;
|
import org.springframework.stereotype.Service;
|
import com.alibaba.fastjson.JSONObject;
|
|
|
@Service
|
public class SearchServiceImpl implements SearchManService {
|
|
@Autowired
|
private RestTemplateUtil restTemplateUtil;
|
|
private static final String SEARCH_INDEX = "videopersons";
|
|
private static final int SCROLL_SIZE = 8000;
|
|
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
private String ipField = "picIp";
|
|
@Value("${es.es_network_host}")
|
private String es_network_host; //es检索ip
|
@Value("${es.es_network_http_port}")
|
private String es_network_http_port; //es检索端口
|
@Value("${es_index_persons}")
|
private String PERSON_PIC_INDEX = "";
|
|
private Logger log = Logger.getLogger(SearchServiceImpl.class);
|
|
/**
|
* @param index
|
* @param fields
|
* @param startDate
|
* @param endDate
|
* @param dateField
|
* @return
|
*/
|
public List<Map<String,Object>> searchScrollData(String index,String fields,String startDate,String endDate, String dateField, int liker, byte[] faceData ) {
|
String url = es_network_host+":"+es_network_http_port+"/"+index+"/_search?scroll=1m";
|
String json = "";
|
// log.info("检索路径为"+url);
|
//Post方式提交Json字符串,按照指定币种,指定时间点查询
|
if(startDate!=null && endDate!=null){
|
json = "{\"size\": "+ SCROLL_SIZE +",\"query\":{\"bool\": {\"must\": ["
|
+"{\"range\": {\""+dateField+"\": "
|
+ "{\"gte\": \""+startDate+"\",\"lte\": \""+endDate+"\"}}}]}}}";
|
}else{
|
json = "{\"size\": "+ SCROLL_SIZE+",\"query\": {\"bool\": {}}}";
|
}
|
// log.info("ScrollData service=======>startDate:"+startDate+",endDate:"+endDate+",dateField:"+dateField);
|
long start = System.currentTimeMillis();
|
log.info("scroll 查询 json:"+json);
|
JSONObject jsonObject = JSONObject.parseObject(json);
|
String resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false);
|
|
log.info("第一次scroll查询耗时:"+ (System.currentTimeMillis()-start));
|
List<Map<String,Object>> jsonParseListData = compareEsDataStr(resultJson, liker, faceData);
|
|
long end = System.currentTimeMillis();
|
log.info("结束时间包含转换: "+end+"\n用时:"+(end-start));
|
return jsonParseListData;
|
}
|
|
private List<JSONObject> fastjsonStr2List(String resultJson){
|
List<JSONObject> rsList = new ArrayList<>();
|
|
JSONReader reader = new JSONReader(new StringReader(resultJson));//以流的形式处理
|
reader.startArray();
|
JSONObject jsonObject = null;
|
while (reader.hasNext()){
|
rsList.add(reader.readObject(JSONObject.class));
|
// jsonObject = new JSONObject();
|
// while (reader.hasNext()){
|
// String arrListItemKey = reader.readString();
|
// String arrListItemValue = reader.readObject().toString();
|
// jsonObject.put(arrListItemKey, arrListItemValue);
|
// }
|
// rsList.add(jsonObject);
|
// reader.endObject();
|
}
|
reader.endArray();
|
return rsList;
|
}
|
|
// 利用fork比对ES每次scroll查询到的数据
|
public List<Map<String, Object>> compareEsDataStr(String resultJson, int liker, byte[] faceData) {
|
List<Map<String, Object>> resultList = new ArrayList<>();
|
|
ForkJoinPool forkJoinPool = new ForkJoinPool();
|
String url = es_network_host+":"+es_network_http_port+"/_search/scroll";
|
|
JSONObject jsonEsData = JSONObject.parseObject(resultJson);
|
String scrollId = jsonEsData.getString("_scroll_id");
|
JSONObject jsonHits = jsonEsData.getJSONObject("hits");
|
int total = Integer.valueOf(jsonHits.getString("total"));
|
long parseTimeStart = System.currentTimeMillis();
|
// List<JSONObject> hitArray = fastjsonStr2List(jsonHits.getString("hits"));
|
List<JSONObject> hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class);//第一次scroll取到的数据
|
log.info("hits中数组转List<JSONObject>耗时:"+(System.currentTimeMillis() - parseTimeStart));
|
// String scrollId[] = returnTwoBySplit(resultJson);
|
log.info("ES返回total:"+ total);
|
if(total > 0){
|
// times = total / SCROLL_SIZE;
|
// if(total % SCROLL_SIZE > 0)
|
// times++;
|
|
// ArrayList<EsThreadUtils> subTasks = new ArrayList<EsThreadUtils>();
|
do {
|
// EsThreadUtils task = new EsThreadUtils(resultJson,null);
|
// subTasks.add(task);
|
// task.fork();
|
if(hitArray.size() > 0){
|
Future<List<Map<String, Object>>> future = forkJoinPool.submit(new EsThreadUtils(hitArray, liker, faceData));
|
|
try {
|
List<Map<String, Object>> futureMaps = future.get();
|
if(futureMaps !=null && futureMaps.size()>0){
|
for(Map futureMap : futureMaps){
|
resultList.add(futureMap);
|
}
|
}
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
} catch (ExecutionException e) {
|
e.printStackTrace();
|
}
|
|
|
String json = "{\"scroll\":\"1m\",\"scroll_id\" :\""+scrollId+"\"}";
|
long start = System.currentTimeMillis();
|
JSONObject jsonObject = JSONObject.parseObject(json);
|
// log.info("scroll请求路径:"+jsonObject);
|
resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false);
|
jsonEsData = JSONObject.parseObject(resultJson);
|
scrollId = jsonEsData.getString("_scroll_id");
|
jsonHits = jsonEsData.getJSONObject("hits");
|
hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class);
|
// hitArray = fastjsonStr2List(jsonHits.getString("hits"));
|
long end = System.currentTimeMillis();
|
log.info("scroll中请求ES+解析成array耗时:"+(end-start));
|
}
|
//当searchHits的数组为空的时候结束循环,至此数据全部读取完毕 &×++<50
|
} while(hitArray.size() == SCROLL_SIZE);
|
}
|
return resultList;
|
}
|
|
// private static String[] returnTwoBySplit(String resultJson){
|
// String[] substring = new String[2];
|
// String subStr = "";
|
// if(resultJson.startsWith("{\"_scroll_id\":")){
|
// substring[0] = resultJson.substring(resultJson.indexOf("\"", 14),
|
// resultJson.indexOf("\"",20)).substring(1);
|
// subStr = resultJson.split("\"")[22];
|
// substring[1] = subStr.substring(1,subStr.indexOf(",")); //:5000,
|
// }
|
// return substring;
|
// }
|
|
}
|