Commit 08539bc8 by shenjunjie

渠道记录表重置标签方法实现

parent aa8a4748
......@@ -28,6 +28,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
......@@ -114,16 +115,46 @@ public class ChannelEsDao extends EsClientDao {
}
}
public void removeChannelRecordById(List<JSONObject> list) {
List<String> idList = list.stream().map(json -> json.getString("id")).collect(Collectors.toList());
public Integer removeChannelRecordBatch(List<JSONObject> list) {
AtomicInteger update = new AtomicInteger();
AtomicInteger total = new AtomicInteger(list.size());
List<CompletableFuture<Boolean>> futures = new ArrayList<>(list.size());
list.forEach(json -> futures.add(CompletableFuture.supplyAsync(() -> removeChannelRecordByCondition(json, total), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> {
if (f.join()) {
update.incrementAndGet();
}
});
}).join();
// list.forEach(json -> {
// removeChannelRecordByCondition(json, total);
// });
return update.get();
}
private boolean removeChannelRecordByCondition(JSONObject json, AtomicInteger total) {
String index = getChannelRecordIndexes().get(0);
AtomicInteger count = new AtomicInteger(idList.size());
idList.forEach(id -> {
String id = json.getString("id");
AtomicInteger searchCount = new AtomicInteger();
AtomicInteger updateCount = new AtomicInteger();
boolean update = false;
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("record.articles.id", id));
searchSourceBuilder.size(10000);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must(QueryBuilders.termQuery("record.articles.id", id));
if (json.containsKey("projectId")) {
boolQuery.must(QueryBuilders.termQuery("project_id.keyword", json.getString("projectId")));
}
if (json.containsKey("contendIds")) {
BoolQueryBuilder contendBoolQuery = QueryBuilders.boolQuery();
json.getJSONArray("contendIds").toJavaList(String.class).forEach(contendId -> {
contendBoolQuery.should(QueryBuilders.termQuery("contend_id.keyword", contendId));
});
boolQuery.must(contendBoolQuery);
}
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.size(1000);
SearchResponse response = retryTemplate.execute(context -> {
try {
return channelEsClient.search(new SearchRequest().indices(index).source(searchSourceBuilder), RequestOptions.DEFAULT);
......@@ -137,26 +168,32 @@ public class ChannelEsDao extends EsClientDao {
for (SearchHit hit : response.getHits().getHits()) {
JSONObject record = new JSONObject((Map<String, Object>) hit.getSourceAsMap().get("record"));
Long lastTime = record.getLong("last_time");
List<ChannelIndex.Article> articles = record.getJSONArray("articles").toJavaList(JSONObject.class).stream().map(json -> {
AtomicBoolean valid = new AtomicBoolean(false);
List<ChannelIndex.Article> articles = record.getJSONArray("articles").toJavaList(JSONObject.class).stream().map(article -> {
// 移除id相同的数据
if (json.getString("id").equals(id)) {
if (article.getString("id").equals(id)) {
valid.set(true);
return null;
}
return ChannelIndex.Article.fromRecordMap(json);
return ChannelIndex.Article.fromRecordMap(article);
}).filter(Objects::nonNull).collect(Collectors.toList());
// 有效更新
if (valid.get()) {
Map<String, Object> updateMap = new HashMap<>();
updateMap.put("record", new ChannelIndex.Record(lastTime, articles).toEsMap());
updateMap.put("article_count", articles.size());
try {
channelEsClient.update(new UpdateRequest().index(index).id(hit.getId()).doc(updateMap), RequestOptions.DEFAULT);
updateCount.getAndIncrement();
update = true;
} catch (IOException e) {
log.info("resetChannelRecordById:{},更新失败", id, e);
}
}
}
log.info("resetChannelRecordById:{},共查询到{}条,更新{}条,剩余id数:{}", id, searchCount.get(), updateCount.get(), count.decrementAndGet());
});
}
log.info("resetChannelRecordById:{},本次查询到{}条,更新{}条,剩余id数:{}", id, searchCount.get(), updateCount.get(), total.decrementAndGet());
return update;
}
private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) {
......
......@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.util.Tools;
......@@ -142,6 +143,83 @@ public class EsClientDao {
return res;
}
public List<JSONObject> searchRecordUnrelated(long startTime, long endTime, String mgroup) {
List<JSONObject> res = new ArrayList<>();
List<JSONObject> dataList = new ArrayList<>();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
List<CompletableFuture<List<JSONObject>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecordUnrelatedSingle(times[0], times[1], mgroup), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> dataList.addAll(f.join()));
}).join();
// 找到该mgroup下关联的项目品牌及竞品
Map<String, List<String>> relatedMap = new HashMap<>();
GlobalPojo.PROJECT_MAP.values().forEach(project -> {
if (project.isStart() && project.isShow()) {
List<String> hitList = new ArrayList<>();
if (mgroup.equals(project.getBrandLinkedGroup())) {
hitList.add(Constant.PRIMARY_CONTEND_ID);
}
if (null != project.getContendList()) {
project.getContendList().forEach(contend -> {
if (mgroup.equals(contend.getBrandLinkedGroup())) {
hitList.add(contend.getId());
}
});
}
if (!hitList.isEmpty()) {
relatedMap.put(project.getId(), hitList);
}
}
});
// 筛选没有关联关系需要重置的部分
dataList.forEach(data -> {
String id = data.getString("id");
List<Map<String, Object>> brandkbsCacheMaps = (List<Map<String, Object>>) data.get("brandkbs_cache_maps");
relatedMap.forEach((project, contends) -> {
List<String> unrelatedContends = new ArrayList<>();
contends.forEach(contend -> {
boolean hit = false;
for (Map<String, Object> map : brandkbsCacheMaps) {
String key = map.get("key") + "";
if (key.equals(project + "_" + contend)) {
hit = true;
break;
}
}
// 不满足关系则添加(说明需要重置)
if (!hit) {
unrelatedContends.add(contend);
}
});
if (!unrelatedContends.isEmpty()) {
JSONObject json = new JSONObject();
json.put("id", id);
json.put("projectId", project);
json.put("contendIds", unrelatedContends);
res.add(json);
}
});
});
return res;
}
public List<JSONObject> searchRecordUnrelatedSingle(long startTime, long endTime, String mgroup) {
List<JSONObject> res = new ArrayList<>();
try {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// TODO 是否需要设立标注字段限制(查询范围更小更精准)
boolQuery.must(QueryBuilders.termQuery("mgroup.keyword", mgroup));
boolQuery.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime));
List<JSONObject> results = searchScroll(boolQuery, 10000, new String[]{"id", "brandkbs_cache_maps"});
res.addAll(results);
} catch (IOException e) {
log.error("startTime:{},endTime:{},searchRecordUnrelatedSingle-", startTime, endTime, e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res;
}
//
// /**
// * 搜索符合事件数据
......@@ -208,18 +286,18 @@ public class EsClientDao {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
log.error("searchRecord-搜索阶段出错-时间分段重试开始", e);
log.info("searchRecord-搜索阶段出错-时间分段重试开始", e);
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
try {
QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(midTime);
results.addAll(searchScroll(queryBuilder1, 10000, CHANNEL_RECORD_FETCH_SOURCE));
results.addAll(searchScroll(queryBuilder1, 1000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e1){
log.error("searchRecord分段查询出错,时间范围:{}-{}", startTime, midTime, e1);
}
try {
QueryBuilder queryBuilder2 = QueryBuilders.rangeQuery("mtime").gte(midTime).lt(endTime);
results.addAll(searchScroll(queryBuilder2, 10000, CHANNEL_RECORD_FETCH_SOURCE));
results.addAll(searchScroll(queryBuilder2, 1000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e2){
log.error("searchRecord分段查询出错,时间范围:{}-{}", midTime, endTime, e2);
}
......
......@@ -278,13 +278,12 @@ public class TaskServiceImpl implements TaskService {
* 本地测试-刷新历史渠道记录用
* 不更新渠道表
*/
@Deprecated
public void messageFlowCount2(long startTime,long endTime,String mgroup) {
// 找到项目的空标签数据
List<JSONObject> list = esClientDao.searchRecordEmpty(startTime, endTime, mgroup);
public void messageFlowCountRefresh(long startTime, long endTime, String mgroup) {
// 找到mgroup下符合条件的数据
List<JSONObject> list = esClientDao.searchRecordUnrelated(startTime, endTime, mgroup);
// 移除对应数据记录
channelEsDao.removeChannelRecordById(list);
log.info("刷新历史渠道记录-统计结束");
Integer updateCount = channelEsDao.removeChannelRecordBatch(list);
log.info("刷新历史渠道记录-统计结束-受影响结果{}条,实际更新{}条", list.size(), updateCount);
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment