Commit 939698ae by 陈健智

对舆情系统回传数据进行渠道库更新

parent d448155c
...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.common; ...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.common;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.ChannelRecordRefreshTaskDao;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.service.ChannelService; import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.SystemInfoService; import com.zhiwei.brandkbs2.service.SystemInfoService;
...@@ -10,11 +11,13 @@ import com.zhiwei.middleware.automaticmark.graphs.Graphs; ...@@ -10,11 +11,13 @@ import com.zhiwei.middleware.automaticmark.graphs.Graphs;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform; import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import com.zhiwei.qbjc.bean.pojo.common.Tag; import com.zhiwei.qbjc.bean.pojo.common.Tag;
import com.zhiwei.brandkbs2.dao.HighlightWordDao; import com.zhiwei.brandkbs2.dao.HighlightWordDao;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -45,6 +48,9 @@ public class GlobalPojo { ...@@ -45,6 +48,9 @@ public class GlobalPojo {
@Resource(name = "highlightWordDao") @Resource(name = "highlightWordDao")
private HighlightWordDao highlightWordDao; private HighlightWordDao highlightWordDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "channelServiceImpl") @Resource(name = "channelServiceImpl")
private ChannelService channelService; private ChannelService channelService;
...@@ -143,6 +149,7 @@ public class GlobalPojo { ...@@ -143,6 +149,7 @@ public class GlobalPojo {
COMMON_SENSITIVE_CHANNEL = systemInfoService.getCommonSensitiveChannel(); COMMON_SENSITIVE_CHANNEL = systemInfoService.getCommonSensitiveChannel();
BYTEDANCE_CHANNEL_INFLUENCE = systemInfoService.getByteDanceChannelInfluence(); BYTEDANCE_CHANNEL_INFLUENCE = systemInfoService.getByteDanceChannelInfluence();
updateHighlightGraphs(); updateHighlightGraphs();
updateIncompleteChannelRecordTask();
log.info("{}-获取PLATFORMS-size:{},TAGS-size:{},LINKED_GROUP_ID_TAGS:{},CHANNEL_TAGS:{},MEDIA_TYPE:{},PROJECT_MAP:{},YUQING-PROJECTS-size:{},PROJECT_EMOTION_CHANNEL_DATA-size:{},PROJECT_SENSITIVE_CHANNEL-size:{}, COMMON_SENSITIVE_CHANNEL-size:{},BYTEDANCE_CHANNEL_INFLUENCE-size:{}", logMsg, PLATFORMS.size(), TAGS.size(), log.info("{}-获取PLATFORMS-size:{},TAGS-size:{},LINKED_GROUP_ID_TAGS:{},CHANNEL_TAGS:{},MEDIA_TYPE:{},PROJECT_MAP:{},YUQING-PROJECTS-size:{},PROJECT_EMOTION_CHANNEL_DATA-size:{},PROJECT_SENSITIVE_CHANNEL-size:{}, COMMON_SENSITIVE_CHANNEL-size:{},BYTEDANCE_CHANNEL_INFLUENCE-size:{}", logMsg, PLATFORMS.size(), TAGS.size(),
LINKED_GROUP_ID_TAGS.size(), CHANNEL_TAGS.size(), MEDIA_TYPE.size(), PROJECT_MAP.size(), YU_QING_PROJECTS.size(), PROJECT_EMOTION_CHANNEL_DATA.size(), PROJECT_SENSITIVE_CHANNEL.size(), COMMON_SENSITIVE_CHANNEL.size(), BYTEDANCE_CHANNEL_INFLUENCE.size()); LINKED_GROUP_ID_TAGS.size(), CHANNEL_TAGS.size(), MEDIA_TYPE.size(), PROJECT_MAP.size(), YU_QING_PROJECTS.size(), PROJECT_EMOTION_CHANNEL_DATA.size(), PROJECT_SENSITIVE_CHANNEL.size(), COMMON_SENSITIVE_CHANNEL.size(), BYTEDANCE_CHANNEL_INFLUENCE.size());
} catch (Exception e) { } catch (Exception e) {
...@@ -150,6 +157,19 @@ public class GlobalPojo { ...@@ -150,6 +157,19 @@ public class GlobalPojo {
} }
} }
private void updateIncompleteChannelRecordTask(){
List<ChannelRecordRefreshTask> tasks = channelRecordRefreshTaskDao.findList(new Query(Criteria.where("status").is(ChannelRecordRefreshTask.TaskStatus.UPDATING.getStatus())));
if (CollectionUtils.isEmpty(tasks)){
return;
}
for (ChannelRecordRefreshTask task : tasks) {
Update update = new Update();
update.set("status", ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus());
update.set("uTime", System.currentTimeMillis());
channelRecordRefreshTaskDao.updateOneByIdWithField(task.getId(), update);
}
}
private void updateHighlightGraphs() { private void updateHighlightGraphs() {
PROJECT_MAP.forEach((key, project) -> { PROJECT_MAP.forEach((key, project) -> {
String id = project.getId(); String id = project.getId();
......
...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.model.ResponseResult; import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.external.*; import com.zhiwei.brandkbs2.pojo.external.*;
import com.zhiwei.brandkbs2.pojo.vo.CrisisCaseWarnVO; import com.zhiwei.brandkbs2.pojo.vo.CrisisCaseWarnVO;
import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.ProjectService; import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.service.ProjectWarnService; import com.zhiwei.brandkbs2.service.ProjectWarnService;
import com.zhiwei.brandkbs2.util.TextUtil; import com.zhiwei.brandkbs2.util.TextUtil;
...@@ -32,6 +33,9 @@ public class InterfaceController { ...@@ -32,6 +33,9 @@ public class InterfaceController {
@Resource(name = "projectWarnServiceImpl") @Resource(name = "projectWarnServiceImpl")
private ProjectWarnService projectWarnService; private ProjectWarnService projectWarnService;
@Resource(name = "channelServiceImpl")
private ChannelService channelService;
@Autowired @Autowired
TextUtil textUtil; TextUtil textUtil;
...@@ -125,4 +129,16 @@ public class InterfaceController { ...@@ -125,4 +129,16 @@ public class InterfaceController {
public ResponseResult getUserProjectList(String userId) { public ResponseResult getUserProjectList(String userId) {
return projectService.getUserProject(userId); return projectService.getUserProject(userId);
} }
@ApiOperation("数据回传标注库品见数据更新-生成刷新任务")
@PostMapping("/channel/refresh-task")
public ResponseResult createChannelRecordRefreshTask(@RequestBody JSONObject json) {
long startTime = json.getLongValue("startTime");
long endTime = json.getLongValue("endTime");
List<JSONObject> brandkbsInfos = json.getJSONArray("brandkbsInfo").toJavaList(JSONObject.class);
String mgroupId = json.getString("mgroupId");
String submitter = json.getString("submitter");
channelService.createChannelRecordRefreshTask(startTime, endTime, brandkbsInfos, mgroupId, submitter);
return ResponseResult.success();
}
} }
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.ChannelRecordRefreshTask;
import org.springframework.data.mongodb.core.query.Query;
/**
* @ClassName: ChannelRecordRefreshTaskDao
* @Description 渠道记录更新任务dao
* @author: cjz
* @date: 2024-07-22 11:37
*/
public interface ChannelRecordRefreshTaskDao extends BaseMongoDao<ChannelRecordRefreshTask>{
ChannelRecordRefreshTask findOne(Query query);
}
package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.ChannelRecordRefreshTaskDao;
import com.zhiwei.brandkbs2.pojo.ChannelRecordRefreshTask;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
/**
* @ClassName: ChannelRecordRefreshTaskDaoImpl
* @Description 渠道记录更新任务dao
* @author: cjz
* @date: 2024-07-22 11:37
*/
@Component("channelRecordRefreshTaskDao")
public class ChannelRecordRefreshTaskDaoImpl extends BaseMongoDaoImpl<ChannelRecordRefreshTask> implements ChannelRecordRefreshTaskDao {
private static final String COLLECTION_NAME = "brandkbs_channel_record_refresh_task";
public ChannelRecordRefreshTaskDaoImpl() {
super(COLLECTION_NAME);
}
@Override
public ChannelRecordRefreshTask findOne(Query query) {
return mongoTemplate.findOne(query, ChannelRecordRefreshTask.class, COLLECTION_NAME);
}
}
...@@ -186,7 +186,7 @@ public class ChannelEsDao extends EsClientDao { ...@@ -186,7 +186,7 @@ public class ChannelEsDao extends EsClientDao {
channelEsClient.update(new UpdateRequest().index(index).id(hit.getId()).doc(updateMap), RequestOptions.DEFAULT); channelEsClient.update(new UpdateRequest().index(index).id(hit.getId()).doc(updateMap), RequestOptions.DEFAULT);
updateCount.getAndIncrement(); updateCount.getAndIncrement();
update = true; update = true;
} catch (IOException e) { } catch (Exception e) {
log.info("resetChannelRecordById:{},更新失败", id, e); log.info("resetChannelRecordById:{},更新失败", id, e);
} }
} }
......
...@@ -2,7 +2,6 @@ package com.zhiwei.brandkbs2.es; ...@@ -2,7 +2,6 @@ package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.ChannelIndex; import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
...@@ -143,35 +142,25 @@ public class EsClientDao { ...@@ -143,35 +142,25 @@ public class EsClientDao {
return res; return res;
} }
public List<JSONObject> searchRecordUnrelated(long startTime, long endTime, String mgroup) { public List<JSONObject> searchRecordUnrelated(long startTime, long endTime, String mgroupId, List<JSONObject> brandkbsInfo) {
List<JSONObject> res = new ArrayList<>(); List<JSONObject> res = new ArrayList<>();
List<JSONObject> dataList = new ArrayList<>(); List<JSONObject> dataList = new ArrayList<>();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24); List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
List<CompletableFuture<List<JSONObject>>> futures = new ArrayList<>(cutTimes.size()); List<CompletableFuture<List<JSONObject>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecordUnrelatedSingle(times[0], times[1], mgroup), executor))); cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecordUnrelatedSingle(times[0], times[1], mgroupId), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> dataList.addAll(f.join())); futures.forEach(f -> dataList.addAll(f.join()));
}).join(); }).join();
// 找到该mgroup下关联的项目品牌及竞品 // 找到该mgroup下关联的项目品牌及竞品
Map<String, List<String>> relatedMap = new HashMap<>(); Map<String, List<String>> relatedMap = brandkbsInfo.stream().collect(Collectors.toMap(json -> json.getString("projectId"), json -> {
GlobalPojo.PROJECT_MAP.values().forEach(project -> { List<String> brandIds = new ArrayList<>();
if (project.isStart() && project.isShow()) { String brandId = Objects.equals(json.getString("brandId"), json.getString("projectId")) ? Constant.PRIMARY_CONTEND_ID : json.getString("brandId");
List<String> hitList = new ArrayList<>(); brandIds.add(brandId);
if (mgroup.equals(project.getBrandLinkedGroup())) { return brandIds;
hitList.add(Constant.PRIMARY_CONTEND_ID); }, (List<String> ls1, List<String> ls2) -> {
} ls1.addAll(ls2);
if (null != project.getContendList()) { return ls1;
project.getContendList().forEach(contend -> { }));
if (mgroup.equals(contend.getBrandLinkedGroup())) {
hitList.add(contend.getId());
}
});
}
if (!hitList.isEmpty()) {
relatedMap.put(project.getId(), hitList);
}
}
});
// 筛选没有关联关系需要重置的部分 // 筛选没有关联关系需要重置的部分
dataList.forEach(data -> { dataList.forEach(data -> {
String id = data.getString("id"); String id = data.getString("id");
...@@ -204,17 +193,17 @@ public class EsClientDao { ...@@ -204,17 +193,17 @@ public class EsClientDao {
return res; return res;
} }
public List<JSONObject> searchRecordUnrelatedSingle(long startTime, long endTime, String mgroup) { public List<JSONObject> searchRecordUnrelatedSingle(long startTime, long endTime, String mgroupId) {
List<JSONObject> res = new ArrayList<>(); List<JSONObject> res = new ArrayList<>();
try { try {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// TODO 是否需要设立标注字段限制(查询范围更小更精准) // TODO 是否需要设立标注字段限制(查询范围更小更精准)
boolQuery.must(QueryBuilders.termQuery("mgroup.keyword", mgroup)); boolQuery.must(QueryBuilders.termQuery("mgroup_id.keyword", mgroupId));
boolQuery.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime)); boolQuery.must(QueryBuilders.rangeQuery("time").gte(startTime).lte(endTime));
List<JSONObject> results = searchScroll(boolQuery, 10000, new String[]{"id", "brandkbs_cache_maps"}); List<JSONObject> results = searchScroll(boolQuery, 10000, new String[]{"id", "brandkbs_cache_maps"});
res.addAll(results); res.addAll(results);
} catch (IOException e) { } catch (IOException e) {
log.error("startTime:{},endTime:{},searchRecordUnrelatedSingle-", startTime, endTime, e); log.error("startTime:{},endTime:{},mgroupId:{},searchRecordUnrelatedSingle-", startTime, endTime, mgroupId, e);
} }
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size()); log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res; return res;
...@@ -278,6 +267,24 @@ public class EsClientDao { ...@@ -278,6 +267,24 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder)); return retryTemplate.execute(context -> searchScroll(sourceBuilder));
} }
private List<JSONObject> searchRecordEs(long startTime, long endTime){
List<JSONObject> results = new ArrayList<>();
if (endTime - startTime <= 10 * 60 * 1000L){
log.error("searchRecord分段查询至最小分割仍未满足-时间范围:{}-{}", startTime, endTime);
return Collections.emptyList();
}
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
results.addAll(searchRecordEs(startTime, midTime));
results.addAll(searchRecordEs(midTime, endTime));
}
return results;
}
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) { private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>(); Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try { try {
......
package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @ClassName: ChannelRecordRefreshTask
* @Description 渠道记录更新任务
* @author: cjz
* @date: 2024-07-22 10:37
*/
@Getter
@Setter
@AllArgsConstructor
public class ChannelRecordRefreshTask extends AbstractBaseMongo{
/**
* 受影响数据最小时间
*/
private Long startTime;
/**
* 受影响数据最大时间
*/
private Long endTime;
/**
* 品见信息:[{projectId:,brandId:}]
*/
List<JSONObject> brandkbsInfo;
/**
* mgroupId
*/
private String mgroupId;
/**
* 任务状态
*/
private String status;
/**
* 创建者
*/
private String submitter;
/**
* 创建时间
*/
private Long cTime;
/**
* 修改时间
*/
private Long uTime;
@Getter
public enum TaskStatus {
NOT_START("未开始"), UPDATING("进行中"), ERROR("出错"), COMPLETED("已完成");
private String status;
TaskStatus(String status) {
this.status = status;
}
}
}
...@@ -333,4 +333,13 @@ public interface ChannelService { ...@@ -333,4 +333,13 @@ public interface ChannelService {
* @return * @return
*/ */
JSONObject getImportantChannelListDetail(String type, String name, String keyword); JSONObject getImportantChannelListDetail(String type, String name, String keyword);
/**
* 生成渠道记录更新任务
* @param startTime
* @param endTime
* @param yuqingProjectId
* @param submitter
*/
void createChannelRecordRefreshTask(Long startTime, Long endTime, List<JSONObject> brandkbsInfos, String yuqingProjectId, String submitter);
} }
...@@ -64,12 +64,12 @@ public interface TaskService{ ...@@ -64,12 +64,12 @@ public interface TaskService{
void generateDailyReport(); void generateDailyReport();
/** /**
* 每月互动量更新 时间范围近六个月 * 每日互动量更新 时间范围近10天
*/ */
void monthlyCustomXhsInteractionUpdate(); void dailyCustomXhsInteractionUpdate();
/** /**
* 每日互动量更新 时间范围近10天 * 定时拉取并进行渠道库更新任务
*/ */
void dailyCustomXhsInteractionUpdate(); void refreshChannelRecord();
} }
...@@ -127,6 +127,9 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -127,6 +127,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "importantChannelOverviewDao") @Resource(name = "importantChannelOverviewDao")
private ImportantChannelOverviewDao importantChannelOverviewDao; private ImportantChannelOverviewDao importantChannelOverviewDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "mongoUtil") @Resource(name = "mongoUtil")
MongoUtil mongoUtil; MongoUtil mongoUtil;
...@@ -820,6 +823,12 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -820,6 +823,12 @@ public class ChannelServiceImpl implements ChannelService {
} }
} }
@Override
public void createChannelRecordRefreshTask(Long startTime, Long endTime, List<JSONObject> brandkbsInfos, String yuqingProjectId, String submitter) {
channelRecordRefreshTaskDao.insertOne(new ChannelRecordRefreshTask(startTime, endTime, brandkbsInfos, yuqingProjectId,
ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus(), submitter, System.currentTimeMillis(), System.currentTimeMillis()));
}
private JSONObject provinceDataProcess(List<ImportantChannel> sortList){ private JSONObject provinceDataProcess(List<ImportantChannel> sortList){
List<JSONObject> resList = new ArrayList<>(); List<JSONObject> resList = new ArrayList<>();
JSONObject jsonObject = new JSONObject(new LinkedHashMap<>()); JSONObject jsonObject = new JSONObject(new LinkedHashMap<>());
......
...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo; import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.common.RedisKeyPrefix;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
...@@ -15,6 +16,7 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener; ...@@ -15,6 +16,7 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.service.*; import com.zhiwei.brandkbs2.service.*;
import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils; import org.apache.commons.collections4.ListUtils;
...@@ -24,6 +26,7 @@ import org.apache.logging.log4j.Logger; ...@@ -24,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.Update;
...@@ -83,6 +86,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -83,6 +86,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "customInteractionUpdateRecordDao") @Resource(name = "customInteractionUpdateRecordDao")
private CustomInteractionUpdateRecordDao customInteractionUpdateRecordDao; private CustomInteractionUpdateRecordDao customInteractionUpdateRecordDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "brandkbsTaskServiceImpl") @Resource(name = "brandkbsTaskServiceImpl")
BrandkbsTaskService brandkbsTaskService; BrandkbsTaskService brandkbsTaskService;
...@@ -116,6 +122,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -116,6 +122,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "cacheServiceExecutor") @Resource(name = "cacheServiceExecutor")
ThreadPoolTaskExecutor cacheServiceExecutor; ThreadPoolTaskExecutor cacheServiceExecutor;
@Resource(name = "redisUtil")
RedisUtil redisUtil;
@Override @Override
public void messageFlowCount(int day) { public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
...@@ -278,9 +287,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -278,9 +287,9 @@ public class TaskServiceImpl implements TaskService {
* 本地测试-刷新历史渠道记录用 * 本地测试-刷新历史渠道记录用
* 不更新渠道表 * 不更新渠道表
*/ */
public void messageFlowCountRefresh(long startTime, long endTime, String mgroup) { public void messageFlowCountRefresh(long startTime, long endTime, String mgroupId, List<JSONObject> brandkbsInfo) {
// 找到mgroup下符合条件的数据 // 找到mgroup下符合条件的数据
List<JSONObject> list = esClientDao.searchRecordUnrelated(startTime, endTime, mgroup); List<JSONObject> list = esClientDao.searchRecordUnrelated(startTime, endTime, mgroupId, brandkbsInfo);
// 移除对应数据记录 // 移除对应数据记录
Integer updateCount = channelEsDao.removeChannelRecordBatch(list); Integer updateCount = channelEsDao.removeChannelRecordBatch(list);
log.info("刷新历史渠道记录-统计结束-受影响结果{}条,实际更新{}条", list.size(), updateCount); log.info("刷新历史渠道记录-统计结束-受影响结果{}条,实际更新{}条", list.size(), updateCount);
...@@ -433,25 +442,6 @@ public class TaskServiceImpl implements TaskService { ...@@ -433,25 +442,6 @@ public class TaskServiceImpl implements TaskService {
} }
@Override @Override
public void monthlyCustomXhsInteractionUpdate() {
// // 近六个月
// long endTime = System.currentTimeMillis();
// long startTime = endTime - Constant.ONE_MONTH * 6;
// // 去除近10天区间,近10天为1天/次
// endTime = endTime - 10 * Constant.ONE_DAY;
// List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream()
// .filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList());
// for (Project project : projects) {
// try {
// List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4"));
// esClientDao.batchUpdate(res);
// }catch (Exception e){
// log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}", project.getProjectName(), startTime, endTime);
// }
// }
}
@Override
public void dailyCustomXhsInteractionUpdate() { public void dailyCustomXhsInteractionUpdate() {
// 近10天 // 近10天
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
...@@ -470,6 +460,39 @@ public class TaskServiceImpl implements TaskService { ...@@ -470,6 +460,39 @@ public class TaskServiceImpl implements TaskService {
monthlyCustomXhsInteractionUpdateNew(projects); monthlyCustomXhsInteractionUpdateNew(projects);
} }
@Override
public void refreshChannelRecord() {
ChannelRecordRefreshTask task = channelRecordRefreshTaskDao.findOne(new Query(Criteria.where("status")
.is(ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus())).with(Sort.by(Sort.Order.desc("cTime"))));
if (Objects.isNull(task)){
return;
}
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.UPDATING.getStatus());
try {
messageFlowCountRefresh(task.getStartTime(), task.getEndTime(), task.getMgroupId(), task.getBrandkbsInfo());
}catch (Exception e){
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.ERROR.getStatus());
log.error("更新渠道库记录异常-taskId:{}-", task.getId(), e);
return;
}
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.COMPLETED.getStatus());
// 删除缓存
task.getBrandkbsInfo().forEach(info -> {
String projectId = info.getString("projectId");
String brandId = Objects.equals(info.getString("brandId"), projectId) ? Constant.PRIMARY_CONTEND_ID : info.getString("brandId");
Set<String> keys = redisUtil.keys(RedisKeyPrefix.CHANNEL_RECORD_LIST + Tools.concat(projectId, brandId) + "*");
redisUtil.delete(keys);
});
log.info("更新渠道库记录完成-taskId:{}", task.getId());
}
private void updateRefreshTask(String id, String status){
Update update = new Update();
update.set("status", status);
update.set("uTime", System.currentTimeMillis());
channelRecordRefreshTaskDao.updateOneByIdWithField(id, update);
}
private void monthlyCustomXhsInteractionUpdateNew(List<Project> projects) { private void monthlyCustomXhsInteractionUpdateNew(List<Project> projects) {
// 近六个月 // 近六个月
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
......
...@@ -119,19 +119,6 @@ public class ControlCenter { ...@@ -119,19 +119,6 @@ public class ControlCenter {
} }
} }
// @Async("scheduledExecutor")
// @Scheduled(cron = "0 30 3 1 * ?")
// public void monthlyCustomXhsInteractionUpdate() {
// log.info("每月互动量更新-启动");
// try {
// taskService.monthlyCustomXhsInteractionUpdate();
// } catch (Exception e) {
// log.error("每月互动量更新-出错", e);
// } finally {
// log.info("每月互动量更新-结束");
// }
// }
@Async("scheduledExecutor") @Async("scheduledExecutor")
@Scheduled(cron = "0 0 5 * * ?") @Scheduled(cron = "0 0 5 * * ?")
public void dailyCustomXhsInteractionUpdate() { public void dailyCustomXhsInteractionUpdate() {
...@@ -144,4 +131,17 @@ public class ControlCenter { ...@@ -144,4 +131,17 @@ public class ControlCenter {
log.info("每日互动量更新-结束"); log.info("每日互动量更新-结束");
} }
} }
@Async("scheduledExecutor")
@Scheduled(cron = "0 0/10 * * * ?")
public void refreshChannelRecord() {
log.info("每十分钟拉取并进行渠道库更新任务-启动");
try {
taskService.refreshChannelRecord();
} catch (Exception e) {
log.error("每十分钟拉取并进行渠道库更新任务-出错", e);
} finally {
log.info("每十分钟拉取并进行渠道库更新任务-结束");
}
}
} }
...@@ -6,6 +6,7 @@ import org.springframework.data.redis.core.HashOperations; ...@@ -6,6 +6,7 @@ import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
...@@ -186,4 +187,8 @@ public class RedisUtil { ...@@ -186,4 +187,8 @@ public class RedisUtil {
public void remove(String key) { public void remove(String key) {
setExpire(key, "-1", 1, TimeUnit.SECONDS); setExpire(key, "-1", 1, TimeUnit.SECONDS);
} }
public void delete(Collection<String> key) {
stringRedisTemplate.delete(key);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment