package com.basic.analy.service.impl; import java.io.StringReader; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONReader; import com.basic.analy.service.SearchManService; import com.basic.analy.utils.EsThreadUtils; import com.cloud.common.utils.RestTemplateUtil; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; @Service public class SearchServiceImpl implements SearchManService { @Autowired private RestTemplateUtil restTemplateUtil; private static final String SEARCH_INDEX = "videopersons"; private static final int SCROLL_SIZE = 8000; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private String ipField = "picIp"; @Value("${es.es_network_host}") private String es_network_host; //es检索ip @Value("${es.es_network_http_port}") private String es_network_http_port; //es检索端口 @Value("${es_index_persons}") private String PERSON_PIC_INDEX = ""; private Logger log = Logger.getLogger(SearchServiceImpl.class); /** * @param index * @param fields * @param startDate * @param endDate * @param dateField * @return */ public List> searchScrollData(String index,String fields,String startDate,String endDate, String dateField, int liker, byte[] faceData ) { String url = es_network_host+":"+es_network_http_port+"/"+index+"/_search?scroll=1m"; String json = ""; // log.info("检索路径为"+url); //Post方式提交Json字符串,按照指定币种,指定时间点查询 if(startDate!=null && endDate!=null){ json = "{\"size\": "+ SCROLL_SIZE +",\"query\":{\"bool\": {\"must\": [" +"{\"range\": {\""+dateField+"\": " + "{\"gte\": \""+startDate+"\",\"lte\": \""+endDate+"\"}}}]}}}"; }else{ json = "{\"size\": "+ SCROLL_SIZE+",\"query\": {\"bool\": {}}}"; } // log.info("ScrollData service=======>startDate:"+startDate+",endDate:"+endDate+",dateField:"+dateField); long start = System.currentTimeMillis(); log.info("scroll 查询 json:"+json); JSONObject jsonObject = JSONObject.parseObject(json); String resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false); log.info("第一次scroll查询耗时:"+ (System.currentTimeMillis()-start)); List> jsonParseListData = compareEsDataStr(resultJson, liker, faceData); long end = System.currentTimeMillis(); log.info("结束时间包含转换: "+end+"\n用时:"+(end-start)); return jsonParseListData; } private List fastjsonStr2List(String resultJson){ List rsList = new ArrayList<>(); JSONReader reader = new JSONReader(new StringReader(resultJson));//以流的形式处理 reader.startArray(); JSONObject jsonObject = null; while (reader.hasNext()){ rsList.add(reader.readObject(JSONObject.class)); // jsonObject = new JSONObject(); // while (reader.hasNext()){ // String arrListItemKey = reader.readString(); // String arrListItemValue = reader.readObject().toString(); // jsonObject.put(arrListItemKey, arrListItemValue); // } // rsList.add(jsonObject); // reader.endObject(); } reader.endArray(); return rsList; } // 利用fork比对ES每次scroll查询到的数据 public List> compareEsDataStr(String resultJson, int liker, byte[] faceData) { List> resultList = new ArrayList<>(); ForkJoinPool forkJoinPool = new ForkJoinPool(); String url = es_network_host+":"+es_network_http_port+"/_search/scroll"; JSONObject jsonEsData = JSONObject.parseObject(resultJson); String scrollId = jsonEsData.getString("_scroll_id"); JSONObject jsonHits = jsonEsData.getJSONObject("hits"); int total = Integer.valueOf(jsonHits.getString("total")); long parseTimeStart = System.currentTimeMillis(); // List hitArray = fastjsonStr2List(jsonHits.getString("hits")); List hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class);//第一次scroll取到的数据 log.info("hits中数组转List耗时:"+(System.currentTimeMillis() - parseTimeStart)); // String scrollId[] = returnTwoBySplit(resultJson); log.info("ES返回total:"+ total); if(total > 0){ // times = total / SCROLL_SIZE; // if(total % SCROLL_SIZE > 0) // times++; // ArrayList subTasks = new ArrayList(); do { // EsThreadUtils task = new EsThreadUtils(resultJson,null); // subTasks.add(task); // task.fork(); if(hitArray.size() > 0){ Future>> future = forkJoinPool.submit(new EsThreadUtils(hitArray, liker, faceData)); try { List> futureMaps = future.get(); if(futureMaps !=null && futureMaps.size()>0){ for(Map futureMap : futureMaps){ resultList.add(futureMap); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } String json = "{\"scroll\":\"1m\",\"scroll_id\" :\""+scrollId+"\"}"; long start = System.currentTimeMillis(); JSONObject jsonObject = JSONObject.parseObject(json); // log.info("scroll请求路径:"+jsonObject); resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false); jsonEsData = JSONObject.parseObject(resultJson); scrollId = jsonEsData.getString("_scroll_id"); jsonHits = jsonEsData.getJSONObject("hits"); hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class); // hitArray = fastjsonStr2List(jsonHits.getString("hits")); long end = System.currentTimeMillis(); log.info("scroll中请求ES+解析成array耗时:"+(end-start)); } //当searchHits的数组为空的时候结束循环,至此数据全部读取完毕 &×++<50 } while(hitArray.size() == SCROLL_SIZE); } return resultList; } // private static String[] returnTwoBySplit(String resultJson){ // String[] substring = new String[2]; // String subStr = ""; // if(resultJson.startsWith("{\"_scroll_id\":")){ // substring[0] = resultJson.substring(resultJson.indexOf("\"", 14), // resultJson.indexOf("\"",20)).substring(1); // subStr = resultJson.split("\"")[22]; // substring[1] = subStr.substring(1,subStr.indexOf(",")); //:5000, // } // return substring; // } }