Commit c38ba017 by shenjunjie

Merge branch 'release' into 'master'

Release

See merge request !566
parents 37557b4e 56e10078
package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.ChannelRecord;
......@@ -10,23 +11,25 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
......@@ -111,6 +114,51 @@ public class ChannelEsDao extends EsClientDao {
}
}
/**
 * Removes the given articles from every channel record that references them.
 *
 * <p>For each distinct, non-null {@code "id"} taken from {@code list}, the first index returned by
 * {@code getChannelRecordIndexes()} is searched for documents whose {@code record.articles.id}
 * equals that id (at most 10000 hits per id). Each hit is then partially updated with the article
 * removed and {@code article_count} reset to the number of remaining articles.
 *
 * <p>Failures are handled per id / per document: a failed search or update is logged and the
 * loop continues with the next item.
 *
 * @param list JSON objects carrying the article id under the key {@code "id"}; {@code null} or
 *             empty means there is nothing to do
 */
public void removeChannelRecordById(List<JSONObject> list) {
    if (null == list || list.isEmpty()) {
        return;
    }
    // Null ids cannot be used in a term query and duplicates would only repeat the same work.
    List<String> idList = list.stream()
            .map(json -> json.getString("id"))
            .filter(Objects::nonNull)
            .distinct()
            .collect(Collectors.toList());
    if (idList.isEmpty()) {
        return;
    }
    String index = getChannelRecordIndexes().get(0);
    int remaining = idList.size();
    for (String id : idList) {
        int searchCount = 0;
        int updateCount = 0;
        SearchResponse response = searchRecordsByArticleId(index, id);
        if (null != response) {
            SearchHit[] hits = response.getHits().getHits();
            searchCount = hits.length;
            for (SearchHit hit : hits) {
                if (removeArticleFromHit(index, hit, id)) {
                    updateCount++;
                }
            }
        }
        remaining--;
        log.info("resetChannelRecordById:{},共查询到{}条,更新{}条,剩余id数:{}", id, searchCount, updateCount, remaining);
    }
}

/**
 * Searches {@code index} for channel records containing the article {@code id}.
 *
 * <p>The search runs inside {@code retryTemplate}. The {@link IOException} is rethrown from the
 * callback so that the template actually sees the failure and can retry; swallowing it and
 * returning {@code null} (as before) ended the retry loop after the first attempt.
 * NOTE(review): assumes {@code retryTemplate} is Spring Retry's {@code RetryTemplate}, which
 * retries on thrown exceptions and rethrows the last one once exhausted - confirm.
 *
 * @return the search response, or {@code null} when every attempt failed with an I/O error
 */
private SearchResponse searchRecordsByArticleId(String index, String id) {
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.termQuery("record.articles.id", id));
    searchSourceBuilder.size(10000);
    SearchRequest request = new SearchRequest().indices(index).source(searchSourceBuilder);
    try {
        return retryTemplate.execute(context -> {
            try {
                return channelEsClient.search(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                log.warn("resetChannelRecordById:{},查询失败,尝试重试第{}次-", id, context.getRetryCount() + 1, e);
                throw e;
            }
        });
    } catch (IOException e) {
        // Retries exhausted: give up on this id only, the caller moves on to the next one.
        log.error("resetChannelRecordById:{},查询失败,重试已耗尽", id, e);
        return null;
    }
}

/**
 * Rewrites one channel-record document so that it no longer contains the article {@code id}.
 *
 * @return {@code true} when the update request was sent without an I/O error
 */
private boolean removeArticleFromHit(String index, SearchHit hit, String id) {
    @SuppressWarnings("unchecked")
    Map<String, Object> recordSource = (Map<String, Object>) hit.getSourceAsMap().get("record");
    JSONObject record = new JSONObject(recordSource);
    Long lastTime = record.getLong("last_time");
    List<ChannelIndex.Article> articles = record.getJSONArray("articles").toJavaList(JSONObject.class).stream()
            // Drop the article being removed; id is non-null here, so this is safe even when
            // a stored article has no "id" field.
            .filter(json -> !id.equals(json.getString("id")))
            .map(json -> ChannelIndex.Article.fromRecordMap(json))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    Map<String, Object> updateMap = new HashMap<>();
    updateMap.put("record", new ChannelIndex.Record(lastTime, articles).toEsMap());
    updateMap.put("article_count", articles.size());
    try {
        channelEsClient.update(new UpdateRequest().index(index).id(hit.getId()).doc(updateMap), RequestOptions.DEFAULT);
        return true;
    } catch (IOException e) {
        log.error("resetChannelRecordById:{},更新失败", id, e);
        return false;
    }
}
private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) {
AtomicBoolean res = new AtomicBoolean(true);
BulkResponse bulkResponse = null;
......
......@@ -112,6 +112,36 @@ public class EsClientDao {
return res;
}
/**
 * Collects the results of {@code searchRecordEmptySingle} over the whole
 * {@code [startTime, endTime)} range by splitting it into slices and querying them in parallel.
 *
 * <p>The range is cut with {@code Tools.cutTimeRange} using a step of {@code ONE_HOUR * 24}
 * (presumably one day - confirm the unit of {@code ONE_HOUR}). One asynchronous task per slice
 * is submitted to {@code executor}; this method blocks until all of them have finished.
 *
 * @param startTime lower bound of the time range
 * @param endTime   upper bound of the time range
 * @param mgroup    group value passed through to every slice query
 * @return the concatenated results of all slices, in slice order
 */
public List<JSONObject> searchRecordEmpty(long startTime, long endTime, String mgroup) {
    List<Long[]> slices = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
    List<CompletableFuture<List<JSONObject>>> pending = new ArrayList<>(slices.size());
    for (Long[] slice : slices) {
        pending.add(CompletableFuture.supplyAsync(() -> searchRecordEmptySingle(slice[0], slice[1], mgroup), executor));
    }
    // Wait for every slice first; a failed slice surfaces here as a CompletionException.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    List<JSONObject> merged = new ArrayList<>();
    for (CompletableFuture<List<JSONObject>> done : pending) {
        merged.addAll(done.join());
    }
    return merged;
}
/**
 * Scroll-searches one time slice for documents of {@code mgroup} that have no
 * {@code mark_cache_maps} field, fetching only the {@code id} source field.
 *
 * <p>An {@link IOException} from the scroll search is logged and results in an empty list for
 * this slice rather than a failure.
 *
 * @param startTime inclusive lower bound on the {@code time} field
 * @param endTime   exclusive upper bound on the {@code time} field
 * @param mgroup    value matched against {@code mgroup.keyword}
 * @return the matching documents, or an empty list when the search failed
 */
private List<JSONObject> searchRecordEmptySingle(long startTime, long endTime, String mgroup) {
    BoolQueryBuilder emptyMarkQuery = QueryBuilders.boolQuery()
            // the mark_cache_maps field is absent
            .mustNot(QueryBuilders.existsQuery("mark_cache_maps"))
            .must(QueryBuilders.termQuery("mgroup.keyword", mgroup))
            .must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime));
    List<JSONObject> found = new ArrayList<>();
    try {
        found.addAll(searchScroll(emptyMarkQuery, 10000, new String[]{"id"}));
    } catch (IOException e) {
        log.error("searchRecordEmptySingle-", e);
    }
    log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), found.size());
    return found;
}
//
// /**
// * 搜索符合事件数据
......@@ -173,8 +203,27 @@ public class EsClientDao {
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
List<JSONObject> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
List<JSONObject> results = new ArrayList<>();
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
log.error("searchRecord-搜索阶段出错-时间分段重试开始", e);
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
try {
QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(midTime);
results.addAll(searchScroll(queryBuilder1, 10000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e1){
log.error("searchRecord分段查询出错,时间范围:{}-{}", startTime, midTime, e1);
}
try {
QueryBuilder queryBuilder2 = QueryBuilders.rangeQuery("mtime").gte(midTime).lt(endTime);
results.addAll(searchScroll(queryBuilder2, 10000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e2){
log.error("searchRecord分段查询出错,时间范围:{}-{}", midTime, endTime, e2);
}
}
for (Map<String, Object> result : results) {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
res.compute(channelIndex, (k, v) -> {
......@@ -190,7 +239,7 @@ public class EsClientDao {
});
}
}
} catch (IOException e) {
} catch (Exception e) {
log.error("searchRecord-", e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
......
......@@ -529,11 +529,14 @@ public class ChannelServiceImpl implements ChannelService {
return null;
}
articles.sort(Comparator.comparingLong(ChannelIndex.Article::getTime).reversed());
// 重置符合条件的文章及数目
channelRecord.getRecord().setArticles(articles);
channelRecord.setArticleCount(articles.size());
return channelRecord;
}).filter(Objects::nonNull).collect(Collectors.groupingBy(ChannelRecord::getPlatform));
for (String platformName : PLATFORMS) {
List<ChannelRecord> channelRecordList = channelRecords.getOrDefault(platformName, Collections.emptyList()).stream().limit(size).collect(Collectors.toList());
List<ChannelRecord> channelRecordList =
channelRecords.getOrDefault(platformName, Collections.emptyList()).stream().sorted((x, y) -> Long.compare(y.getArticleCount(), x.getArticleCount())).limit(size).collect(Collectors.toList());
List<ChannelListVO> list = new ArrayList<>(size);
Map<String, ChannelRecord> map = Maps.uniqueIndex(channelRecordList, ChannelRecord::getChannelFid);
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(map.keySet());
......@@ -545,14 +548,14 @@ public class ChannelServiceImpl implements ChannelService {
list.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size()));
}
});
List<ChannelListVO> resList;
// 排序
if (Objects.nonNull(sorter) && sorter.contains("index")){
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList());
}else {
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList());
}
res.put(platformName, resList);
// List<ChannelListVO> resList;
// // 排序
// if (Objects.nonNull(sorter) && sorter.contains("index")){
// resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList());
// }else {
// resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList());
// }
res.put(platformName, list);
}
return res;
}
......
......@@ -274,43 +274,17 @@ public class TaskServiceImpl implements TaskService {
return Pair.of(insertList, updateList);
}
/**
* 本地测试-刷新历史渠道记录用
* 不更新渠道表
*/
@Deprecated
public void messageFlowCount2(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum();
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total);
// 结果合并
List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList());
// 合并渠道记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList);
// 获得单位时间内最小最大时间戳
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-渠道记录-统计结束");
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
long handleSize = 0;
List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在
Channel channel = channelDao.queryUnique(entry.getKey());
if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
insertList.add(channel);
} else {
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
// 设置查询数值
entry.getKey().setChannelInfo(channel);
if (++handleSize % 10000 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
}
}
log.info("渠道统计-渠道总计-查询更新结束,开始批量入库");
ListUtils.partition(insertList, 1000).forEach(list -> channelDao.insertMany(list));
log.info("渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
public void messageFlowCount2(long startTime,long endTime,String mgroup) {
// 找到项目的空标签数据
List<JSONObject> list = esClientDao.searchRecordEmpty(startTime, endTime, mgroup);
// 移除对应数据记录
channelEsDao.removeChannelRecordById(list);
log.info("刷新历史渠道记录-统计结束");
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment