liuxiaolong
2019-05-06 c15226e1b58f255dbebf1bdca8d4e53b9277249c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package com.basic.analy.service.impl;
 
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
 
import java.util.List;
import java.util.Map;
 
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONReader;
import com.basic.analy.service.SearchManService;
import com.basic.analy.utils.EsThreadUtils;
import com.cloud.common.utils.RestTemplateUtil;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
 
 
@Service
public class SearchServiceImpl implements SearchManService {
 
    @Autowired
    private RestTemplateUtil restTemplateUtil;
 
    private static final String SEARCH_INDEX = "videopersons";
 
    private static final int SCROLL_SIZE = 8000;
    
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
    private String ipField = "picIp";
 
    @Value("${es.es_network_host}")
    private String es_network_host;    //es检索ip
    @Value("${es.es_network_http_port}")
    private String es_network_http_port;    //es检索端口
    @Value("${es_index_persons}")
    private String  PERSON_PIC_INDEX = "";
 
    private Logger log = Logger.getLogger(SearchServiceImpl.class);
 
    /**
     * @param index
     * @param fields
     * @param startDate
     * @param endDate
     * @param dateField
     * @return
     */
    public List<Map<String,Object>> searchScrollData(String index,String fields,String startDate,String endDate, String dateField, int liker, byte[] faceData ) {
        String url = es_network_host+":"+es_network_http_port+"/"+index+"/_search?scroll=1m";
        String json = "";
//            log.info("检索路径为"+url);
        //Post方式提交Json字符串,按照指定币种,指定时间点查询
        if(startDate!=null && endDate!=null){
         json = "{\"size\": "+ SCROLL_SIZE +",\"query\":{\"bool\": {\"must\": ["
                 +"{\"range\": {\""+dateField+"\": "
                + "{\"gte\": \""+startDate+"\",\"lte\": \""+endDate+"\"}}}]}}}";
        }else{
            json = "{\"size\": "+ SCROLL_SIZE+",\"query\": {\"bool\": {}}}";
        }
//            log.info("ScrollData service=======>startDate:"+startDate+",endDate:"+endDate+",dateField:"+dateField);
        long start = System.currentTimeMillis();
        log.info("scroll 查询 json:"+json);
        JSONObject jsonObject = JSONObject.parseObject(json);
        String resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false);
 
        log.info("第一次scroll查询耗时:"+ (System.currentTimeMillis()-start));
        List<Map<String,Object>> jsonParseListData = compareEsDataStr(resultJson, liker, faceData);
 
        long end = System.currentTimeMillis();
        log.info("结束时间包含转换: "+end+"\n用时:"+(end-start));
        return jsonParseListData;
    }
 
    private List<JSONObject> fastjsonStr2List(String resultJson){
        List<JSONObject> rsList = new ArrayList<>();
 
        JSONReader reader = new JSONReader(new StringReader(resultJson));//以流的形式处理
        reader.startArray();
        JSONObject jsonObject = null;
        while (reader.hasNext()){
            rsList.add(reader.readObject(JSONObject.class));
//            jsonObject = new JSONObject();
//            while (reader.hasNext()){
//                String arrListItemKey = reader.readString();
//                String arrListItemValue = reader.readObject().toString();
//                jsonObject.put(arrListItemKey, arrListItemValue);
//            }
//            rsList.add(jsonObject);
//            reader.endObject();
        }
        reader.endArray();
        return rsList;
    }
 
    //  利用fork比对ES每次scroll查询到的数据
    public  List<Map<String, Object>> compareEsDataStr(String resultJson, int liker, byte[] faceData) {
        List<Map<String, Object>> resultList = new ArrayList<>();
 
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        String url = es_network_host+":"+es_network_http_port+"/_search/scroll";
 
        JSONObject jsonEsData = JSONObject.parseObject(resultJson);
        String scrollId = jsonEsData.getString("_scroll_id");
        JSONObject jsonHits = jsonEsData.getJSONObject("hits");
        int total = Integer.valueOf(jsonHits.getString("total"));
        long parseTimeStart = System.currentTimeMillis();
//        List<JSONObject> hitArray = fastjsonStr2List(jsonHits.getString("hits"));
        List<JSONObject> hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class);//第一次scroll取到的数据
        log.info("hits中数组转List<JSONObject>耗时:"+(System.currentTimeMillis() - parseTimeStart));
//        String scrollId[] = returnTwoBySplit(resultJson);
        log.info("ES返回total:"+ total);
        if(total > 0){
//            times = total / SCROLL_SIZE;
//            if(total % SCROLL_SIZE > 0)
//                times++;
 
//            ArrayList<EsThreadUtils> subTasks = new ArrayList<EsThreadUtils>();
            do {
//                EsThreadUtils task = new EsThreadUtils(resultJson,null);
//                subTasks.add(task);
//                task.fork();
                if(hitArray.size() > 0){
                    Future<List<Map<String, Object>>> future = forkJoinPool.submit(new EsThreadUtils(hitArray, liker, faceData));
 
                    try {
                        List<Map<String, Object>> futureMaps = future.get();
                        if(futureMaps !=null && futureMaps.size()>0){
                            for(Map futureMap : futureMaps){
                                resultList.add(futureMap);
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
 
 
                    String json = "{\"scroll\":\"1m\",\"scroll_id\" :\""+scrollId+"\"}";
                    long start = System.currentTimeMillis();
                    JSONObject jsonObject = JSONObject.parseObject(json);
//                    log.info("scroll请求路径:"+jsonObject);
                    resultJson = restTemplateUtil.post(url, jsonObject, MediaType.APPLICATION_JSON_UTF8, false);
                    jsonEsData = JSONObject.parseObject(resultJson);
                    scrollId = jsonEsData.getString("_scroll_id");
                    jsonHits = jsonEsData.getJSONObject("hits");
                    hitArray = JSONArray.parseArray(jsonHits.getString("hits"), JSONObject.class);
//                    hitArray = fastjsonStr2List(jsonHits.getString("hits"));
                    long end = System.currentTimeMillis();
                    log.info("scroll中请求ES+解析成array耗时:"+(end-start));
                }
                //当searchHits的数组为空的时候结束循环,至此数据全部读取完毕    &&times++<50
            } while(hitArray.size() == SCROLL_SIZE);
        }
        return resultList;
    }
 
//    private static String[] returnTwoBySplit(String resultJson){
//        String[] substring = new String[2];
//        String subStr = "";
//        if(resultJson.startsWith("{\"_scroll_id\":")){
//            substring[0] = resultJson.substring(resultJson.indexOf("\"", 14),
//                    resultJson.indexOf("\"",20)).substring(1);
//            subStr =  resultJson.split("\"")[22];
//            substring[1] = subStr.substring(1,subStr.indexOf(","));          //:5000,
//        }
//        return substring;
//    }
    
}