Commit 5938b9bd by shenjunjie

Merge branch 'feature' into 'release'

每月互动量更新策略调整

See merge request !563
parents 39666c50 a20dcc64
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.CustomInteractionUpdateRecord;
/**
* @ClassName: MonthlyInteractionUpdateRecordDao
* @Description MonthlyInteractionUpdateRecordDao
* @author: cjz
* @date: 2024-07-09 10:30
*/
public interface CustomInteractionUpdateRecordDao extends BaseMongoDao<CustomInteractionUpdateRecord> {
CustomInteractionUpdateRecord findLastRecord(String projectId);
}
package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.CustomInteractionUpdateRecordDao;
import com.zhiwei.brandkbs2.pojo.CustomInteractionUpdateRecord;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
/**
* @ClassName: MonthlyInteractionUpdateRecordDao
* @Description MonthlyInteractionUpdateRecordDao
* @author: cjz
* @date: 2024-07-09 10:30
*/
@Component("customInteractionUpdateRecordDao")
public class CustomInteractionUpdateRecordDaoImpl extends BaseMongoDaoImpl<CustomInteractionUpdateRecord> implements CustomInteractionUpdateRecordDao {
private static final String COLLECTION_NAME = "brandkbs_custom_interaction_update_record";
public CustomInteractionUpdateRecordDaoImpl() {
super(COLLECTION_NAME);
}
@Override
public CustomInteractionUpdateRecord findLastRecord(String projectId) {
return mongoTemplate.findOne(new Query().addCriteria(Criteria.where("projectId").is(projectId))
.with(Sort.by(Sort.Order.desc("cTime"))).limit(1), CustomInteractionUpdateRecord.class, COLLECTION_NAME);
}
}
package com.zhiwei.brandkbs2.pojo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
/**
* @ClassName: MonthlyInteractionUpdateRecord
* @Description 每月互动量更新记录
* @author: cjz
* @date: 2024-07-09 9:48
*/
@Getter
@Setter
@AllArgsConstructor
public class CustomInteractionUpdateRecord extends AbstractBaseMongo{
/**
* 拉取数据的开始时间
*/
private Long startTime;
/**
* 拉取数据的结束时间
*/
private Long endTime;
/**
* 项目id
*/
private String projectId;
/**
* 数据更新节点开始时间
*/
private Long dataStartTime;
/**
* 状态:完成|
*/
private String status;
private Long cTime;
private Long uTime;
}
...@@ -26,6 +26,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; ...@@ -26,6 +26,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -79,6 +80,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -79,6 +80,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "xiaohongshuRecordDao") @Resource(name = "xiaohongshuRecordDao")
private XiaohongshuRecordDao xiaohongshuRecordDao; private XiaohongshuRecordDao xiaohongshuRecordDao;
@Resource(name = "customInteractionUpdateRecordDao")
private CustomInteractionUpdateRecordDao customInteractionUpdateRecordDao;
@Resource(name = "brandkbsTaskServiceImpl") @Resource(name = "brandkbsTaskServiceImpl")
BrandkbsTaskService brandkbsTaskService; BrandkbsTaskService brandkbsTaskService;
...@@ -457,21 +461,21 @@ public class TaskServiceImpl implements TaskService { ...@@ -457,21 +461,21 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public void monthlyCustomXhsInteractionUpdate() { public void monthlyCustomXhsInteractionUpdate() {
// 近六个月 // // 近六个月
long endTime = System.currentTimeMillis(); // long endTime = System.currentTimeMillis();
long startTime = endTime - Constant.ONE_MONTH * 6; // long startTime = endTime - Constant.ONE_MONTH * 6;
// 去除近10天区间,近10天为1天/次 // // 去除近10天区间,近10天为1天/次
endTime = endTime - 10 * Constant.ONE_DAY; // endTime = endTime - 10 * Constant.ONE_DAY;
List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream() // List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream()
.filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList()); // .filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList());
for (Project project : projects) { // for (Project project : projects) {
try { // try {
List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4")); // List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res); // esClientDao.batchUpdate(res);
}catch (Exception e){ // }catch (Exception e){
log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}", project.getProjectName(), startTime, endTime); // log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}", project.getProjectName(), startTime, endTime);
} // }
} // }
} }
@Override @Override
...@@ -489,6 +493,84 @@ public class TaskServiceImpl implements TaskService { ...@@ -489,6 +493,84 @@ public class TaskServiceImpl implements TaskService {
log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}-", project.getProjectName(), startTime, endTime, e); log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}-", project.getProjectName(), startTime, endTime, e);
} }
} }
// 每月互动量更新
monthlyCustomXhsInteractionUpdateNew(projects);
}
private void monthlyCustomXhsInteractionUpdateNew(List<Project> projects) {
// 近六个月
long endTime = System.currentTimeMillis();
long startTime = endTime - Constant.ONE_MONTH * 6;
// 去除近10天区间,近10天为1天/次
endTime = endTime - 10 * Constant.ONE_DAY;
for (Project project : projects) {
// 寻找项目最新的更新记录
CustomInteractionUpdateRecord record = customInteractionUpdateRecordDao.findLastRecord(project.getId());
if (Objects.nonNull(record)) {
String recordTime = Constant.DF_yyyyMM.format(record.getCTime());
String nowTime = Constant.DF_yyyyMM.format(System.currentTimeMillis());
// 本月已更新完成
if (Objects.equals(record.getStatus(), "完成") && Objects.equals(recordTime, nowTime)) {
continue;
}
// 本月未更新
if (!Objects.equals(recordTime, nowTime)) {
CustomInteractionUpdateRecord updateRecord = new CustomInteractionUpdateRecord(startTime, endTime, project.getId(), startTime, "更新中", System.currentTimeMillis(), System.currentTimeMillis());
customInteractionUpdateRecordDao.insertOne(updateRecord);
try {
for (Long[] time : Tools.cutTimeRange(startTime, endTime, Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res);
updateStatus(updateRecord.getId(), "更新中", time[1]);
}
updateStatus(updateRecord.getId(), "完成", null);
}catch (Exception e){
log.error("项目:{},每月互动量更新出错-" ,project.getId(), e);
updateStatus(updateRecord.getId(), "出错", null);
}
}
// 本月更新开始但未完成/出错
if (Objects.equals(recordTime, nowTime) && (Objects.equals(record.getStatus(), "更新中") || Objects.equals(record.getStatus(), "出错"))) {
try {
for (Long[] time : Tools.cutTimeRange(record.getDataStartTime(), record.getEndTime(), Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res);
updateStatus(record.getId(), "更新中", time[1]);
}
updateStatus(record.getId(), "完成", null);
}catch (Exception e){
log.error("项目:{},每月互动量继续更新出错-" ,project.getId(), e);
updateStatus(record.getId(), "出错", null);
}
}
} else { // 初次更新
CustomInteractionUpdateRecord updateRecord = new CustomInteractionUpdateRecord(startTime, endTime, project.getId(), startTime, "更新中", System.currentTimeMillis(), System.currentTimeMillis());
customInteractionUpdateRecordDao.insertOne(updateRecord);
try {
for (Long[] time : Tools.cutTimeRange(startTime, endTime, Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res);
updateStatus(updateRecord.getId(), "更新中", time[1]);
}
updateStatus(updateRecord.getId(), "完成", null);
}catch (Exception e){
log.error("项目:{},每月互动量更新出错-" ,project.getId(), e);
updateStatus(updateRecord.getId(), "出错", null);
}
}
}
}
private void updateStatus(String recordId, String status, Long time){
Update update = new Update();
update.set("status", status);
if (Objects.nonNull(time)){
update.set("dataStartTime", time);
}
customInteractionUpdateRecordDao.updateOneByIdWithField(recordId, update);
} }
private List<Pair<String, Map<String, Object>>> customXhsInteractionUpdate(String projectId, Long startTime, Long endTime, List<String> platforms) throws IOException { private List<Pair<String, Map<String, Object>>> customXhsInteractionUpdate(String projectId, Long startTime, Long endTime, List<String> platforms) throws IOException {
......
...@@ -119,18 +119,18 @@ public class ControlCenter { ...@@ -119,18 +119,18 @@ public class ControlCenter {
} }
} }
@Async("scheduledExecutor") // @Async("scheduledExecutor")
@Scheduled(cron = "0 30 3 1 * ?") // @Scheduled(cron = "0 30 3 1 * ?")
public void monthlyCustomXhsInteractionUpdate() { // public void monthlyCustomXhsInteractionUpdate() {
log.info("每月互动量更新-启动"); // log.info("每月互动量更新-启动");
try { // try {
taskService.monthlyCustomXhsInteractionUpdate(); // taskService.monthlyCustomXhsInteractionUpdate();
} catch (Exception e) { // } catch (Exception e) {
log.error("每月互动量更新-出错", e); // log.error("每月互动量更新-出错", e);
} finally { // } finally {
log.info("每月互动量更新-结束"); // log.info("每月互动量更新-结束");
} // }
} // }
@Async("scheduledExecutor") @Async("scheduledExecutor")
@Scheduled(cron = "0 0 5 * * ?") @Scheduled(cron = "0 0 5 * * ?")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment