Commit 83e66793 by 陈健智

事件标题聚合缓存修复

parent 2a75c54e
......@@ -45,6 +45,7 @@ import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -871,8 +872,14 @@ public class MarkDataServiceImpl implements MarkDataService {
if (null != emotion && !Objects.equals(emotion, EmotionEnum.ALL.getName())) {
query.must(QueryBuilders.termQuery("brandkbs_mark_cache_maps.name.keyword", emotion));
}
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
aggregationBuilder.subAggregation(sourceAggregationBuilder), null, null, 0, 0, null);
SearchResponse searchResponse;
// 单独处理buckets超过上限异常,出错时按时间分段查询
try {
searchResponse = esClientDao.searchResponse(indexes, null, query,
aggregationBuilder.subAggregation(sourceAggregationBuilder), null, null, 0, 0, null);
}catch (ElasticsearchStatusException e){
return getMarkTopTitleDivided(query, indexes, aggregationBuilder, startTime, endTime, size);
}
List<JSONObject> res = new ArrayList<>();
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms teamAgg = (ParsedStringTerms) aggMap.get("titles");
......@@ -883,7 +890,6 @@ public class MarkDataServiceImpl implements MarkDataService {
if ("分享一篇文章".equals(title)) {
continue;
}
// result.merge(title, num, Integer::sum);
jsonObject.put("title", title);
jsonObject.put("num", bucket.getDocCount());
Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();
......@@ -899,6 +905,59 @@ public class MarkDataServiceImpl implements MarkDataService {
return res;
}
/**
* getMarkTopTitle按时间分段查询
* @param query
* @param indexes
* @param aggregationBuilder
* @param startTime
* @param endTime
* @param size
* @return
* @throws IOException
*/
private List<JSONObject> getMarkTopTitleDivided(BoolQueryBuilder query, String[] indexes, TermsAggregationBuilder aggregationBuilder, Long startTime, Long endTime, int size) throws IOException {
List<Long[]> timeRange = Tools.cutTimeRange(startTime, endTime, Constant.ONE_DAY);
Map<String, JSONObject> map = new HashMap<>();
for (Long[] time : timeRange) {
TermsAggregationBuilder sourceAggregationBuilder = AggregationBuilders.terms(Tools.concat(time[0], time[1])).field("source").size(10000);
query.must(QueryBuilders.rangeQuery("time").gte(time[0]).lt(time[1]));
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
aggregationBuilder.subAggregation(sourceAggregationBuilder), null, null, 0, 0, null);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms teamAgg = (ParsedStringTerms) aggMap.get("titles");
for (Terms.Bucket bucket : teamAgg.getBuckets()) {
JSONObject jsonObject = new JSONObject();
String title = bucket.getKeyAsString();
// 过滤 “分享一篇文章” 的标题
if ("分享一篇文章".equals(title)) {
continue;
}
jsonObject.put("title", title);
jsonObject.put("num", bucket.getDocCount());
Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();
ParsedStringTerms sourceAgg = (ParsedStringTerms) aggregationMap.get(Tools.concat(time[0], time[1]));
jsonObject.put("sourceCount", sourceAgg.getBuckets().size());
List<String> sources = new ArrayList<>();
for (Terms.Bucket sourceBucket : sourceAgg.getBuckets()) {
sources.add(sourceBucket.getKeyAsString());
}
jsonObject.put("sources", sources);
// 合并map
map.merge(title, jsonObject, (v1, v2) -> {
v1.put("num", v1.getLongValue("num") + v2.getLongValue("num"));
v1.put("sourceCount", v1.getIntValue("sourceCount") + v2.getIntValue("sourceCount"));
v1.put("sources", v1.getJSONArray("sources").addAll(v2.getJSONArray("sources")));
return v1;
});
}
}
return map.values().stream()
.filter(Objects::nonNull)
.sorted(Comparator.comparingLong((JSONObject json) -> json.getLongValue("num")).reversed())
.limit(size + 1).collect(Collectors.toList());
}
@Override
public BaseMap getFirstArticle(Long startTime, Long endTime, String aggTitle, String projectId, String contendId) throws IOException {
return getFirstArticle(startTime, endTime, aggTitle, projectId, contendId, true);
......@@ -2742,8 +2801,8 @@ public class MarkDataServiceImpl implements MarkDataService {
@Override
public List<JSONObject> getLastNews(Long startTime, Long endTime, String planId, int size, boolean include) {
String projectId = UserThreadLocal.getProjectId();
try {
String projectId = UserThreadLocal.getProjectId();
List<JSONObject> markTopTitleList = getMarkTopTitle(startTime, endTime, null, projectId, Constant.PRIMARY_CONTEND_ID, planId, size);
CompletableFuture.allOf(markTopTitleList.stream().map(json -> CompletableFuture.supplyAsync(() -> {
try {
......@@ -2759,7 +2818,7 @@ public class MarkDataServiceImpl implements MarkDataService {
}, executor)).toArray(CompletableFuture[]::new)).join();
return markTopTitleList.stream().filter(Objects::nonNull).limit(size).collect(Collectors.toList());
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getLastNews异常-", e);
log.error("新舆情分析getLastNews异常-projectId:{}", projectId,e);
}
return Collections.emptyList();
}
......
......@@ -319,6 +319,10 @@ public class Tools {
public static List<Long[]> cutTimeRange(long startTime, long endTime, long range) {
List<Long[]> res = new ArrayList<>();
if (endTime - startTime <= range){
res.add(new Long[]{startTime, endTime});
return res;
}
while (startTime < endTime) {
long rangeEnd = Math.min(startTime + range, endTime);
res.add(new Long[]{startTime, rangeEnd});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment