Commit 454bccf4 by 陈健智

searchRecord调整

parent a20dcc64
...@@ -60,6 +60,8 @@ public class EsClientDao { ...@@ -60,6 +60,8 @@ public class EsClientDao {
"brandkbs_cache_maps", "brandkbs_mark_cache_maps", "channel_influence", "ind_full_text", "url", "question_url", "answer_url"}; "brandkbs_cache_maps", "brandkbs_mark_cache_maps", "channel_influence", "ind_full_text", "url", "question_url", "answer_url"};
private static final Long ONE_HOUR = 60 * 60 * 1000L; private static final Long ONE_HOUR = 60 * 60 * 1000L;
private static final Long HALF_HOUR = ONE_HOUR / 2;
// 滚动查询超时时间 // 滚动查询超时时间
private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8); private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8);
...@@ -103,12 +105,34 @@ public class EsClientDao { ...@@ -103,12 +105,34 @@ public class EsClientDao {
long startTime = calendar.getTime().getTime(); long startTime = calendar.getTime().getTime();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR); List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR);
List<CompletableFuture<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>>> futures = new ArrayList<>(cutTimes.size()); List<CompletableFuture<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecord(times[0], times[1]), executor))); try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> { cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> {
futures.forEach(f -> { try {
res.add(f.join()); return searchRecord(times[0], times[1]);
}); } catch (IOException e) {
}).join(); throw new RuntimeException(e);
}
}, executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r1, e1) -> futures.forEach(f -> res.add(f.join()))).join();
}catch (Exception e){
log.error("error-searchRecordRecentDay-oneHour,跨度调整为半小时再重试", e);
List<Long[]> halfCutTimes = Tools.cutTimeRange(startTime, endTime, HALF_HOUR);
futures.clear();
res.clear();
try {
halfCutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> {
try {
return searchRecord(times[0], times[1]);
} catch (IOException e2) {
throw new RuntimeException(e2);
}
}, executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r2, e2) -> futures.forEach(f -> res.add(f.join()))).join();
log.error("跨度半小时重试成功");
}catch (Exception exception){
log.error("error-searchRecordRecentDay-oneHour,跨度半小时重试失败", exception);
}
}
return res; return res;
} }
...@@ -170,28 +194,24 @@ public class EsClientDao { ...@@ -170,28 +194,24 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder)); return retryTemplate.execute(context -> searchScroll(sourceBuilder));
} }
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) { private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) throws IOException {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>(); Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try { QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime); List<JSONObject> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
List<JSONObject> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE); for (Map<String, Object> result : results) {
for (Map<String, Object> result : results) { for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) { res.compute(channelIndex, (k, v) -> {
res.compute(channelIndex, (k, v) -> { if (null == v) {
if (null == v) { v = new ChannelIndex.Record();
v = new ChannelIndex.Record(); }
} try {
try { return v.mergeRecord(new ChannelIndex.Record(result));
return v.mergeRecord(new ChannelIndex.Record(result)); } catch (Exception e) {
} catch (Exception e) { log.error("searchRecord-error-id:{}", result.get("id"), e);
log.error("searchRecord-error-id:{}", result.get("id"), e); return null;
return null; }
} });
});
}
} }
} catch (IOException e) {
log.error("searchRecord-", e);
} }
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size()); log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return Pair.of(new Long[]{startTime, endTime}, res); return Pair.of(new Long[]{startTime, endTime}, res);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment