Commit c15766a5 by 陈健智

searchRecord调整2

parent 454bccf4
......@@ -60,8 +60,6 @@ public class EsClientDao {
"brandkbs_cache_maps", "brandkbs_mark_cache_maps", "channel_influence", "ind_full_text", "url", "question_url", "answer_url"};
private static final Long ONE_HOUR = 60 * 60 * 1000L;
private static final Long HALF_HOUR = ONE_HOUR / 2;
// 滚动查询超时时间
private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8);
......@@ -105,34 +103,12 @@ public class EsClientDao {
long startTime = calendar.getTime().getTime();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR);
List<CompletableFuture<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>>> futures = new ArrayList<>(cutTimes.size());
try {
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> {
try {
return searchRecord(times[0], times[1]);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r1, e1) -> futures.forEach(f -> res.add(f.join()))).join();
}catch (Exception e){
log.error("error-searchRecordRecentDay-oneHour,跨度调整为半小时再重试", e);
List<Long[]> halfCutTimes = Tools.cutTimeRange(startTime, endTime, HALF_HOUR);
futures.clear();
res.clear();
try {
halfCutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> {
try {
return searchRecord(times[0], times[1]);
} catch (IOException e2) {
throw new RuntimeException(e2);
}
}, executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r2, e2) -> futures.forEach(f -> res.add(f.join()))).join();
log.error("跨度半小时重试成功");
}catch (Exception exception){
log.error("error-searchRecordRecentDay-oneHour,跨度半小时重试失败", exception);
}
}
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecord(times[0], times[1]), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> {
res.add(f.join());
});
}).join();
return res;
}
......@@ -194,10 +170,30 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder));
}
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) throws IOException {
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try {
List<JSONObject> results = new ArrayList<>();
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
List<JSONObject> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
log.error("searchRecord-搜索阶段出错-时间分段重试开始", e);
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
try {
QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(midTime);
results.addAll(searchScroll(queryBuilder1, 10000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e1){
log.error("searchRecord分段查询出错,时间范围:{}-{}", startTime, midTime, e1);
}
try {
QueryBuilder queryBuilder2 = QueryBuilders.rangeQuery("mtime").gte(midTime).lt(endTime);
results.addAll(searchScroll(queryBuilder2, 10000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e2){
log.error("searchRecord分段查询出错,时间范围:{}-{}", midTime, endTime, e2);
}
}
for (Map<String, Object> result : results) {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
res.compute(channelIndex, (k, v) -> {
......@@ -213,6 +209,9 @@ public class EsClientDao {
});
}
}
} catch (Exception e) {
log.error("searchRecord-", e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return Pair.of(new Long[]{startTime, endTime}, res);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment