Commit 051521e4 by 陈健智

工具库-事件影响力更新调整

parent 773a8118
......@@ -60,6 +60,14 @@ public class BytedanceCustomEventUpdateTask extends AbstractBaseMongo{
*/
private Boolean cancel;
/**
* 已处理的数据量
*/
private int schedule;
/**
* 待处理的数据总量
*/
private int totalSchedule;
/**
* 项目id
*/
private String projectId;
......@@ -139,6 +147,8 @@ public class BytedanceCustomEventUpdateTask extends AbstractBaseMongo{
task.setEventInfo(new ArrayList<>());
task.setErrorData(new ArrayList<>());
task.setCancel(false);
task.setSchedule(0);
task.setTotalSchedule(0);
task.setProjectId(UserThreadLocal.getProjectId());
task.setUserId(UserThreadLocal.getUserId());
task.setNickName(UserThreadLocal.getNickname());
......@@ -157,6 +167,8 @@ public class BytedanceCustomEventUpdateTask extends AbstractBaseMongo{
task.setEventInfo(null);
task.setErrorData(null);
task.setCancel(false);
task.setSchedule(0);
task.setTotalSchedule(0);
task.setProjectId(UserThreadLocal.getProjectId());
task.setUserId(UserThreadLocal.getUserId());
task.setNickName(UserThreadLocal.getNickname());
......
......@@ -21,4 +21,8 @@ public class BytedanceCustomPlatformWeight extends AbstractBaseMongo{
* 权重
*/
private double weight;
/**
* 常量
*/
private double constant;
}
......@@ -264,6 +264,10 @@ public class ToolsetServiceImpl implements ToolsetService {
* @return
*/
private List<JSONObject> urlInteractionUpdate(List<String> urls){
return urlInteractionUpdate(urls, null);
}
private List<JSONObject> urlInteractionUpdate(List<String> urls, String taskId){
// 通过url获取域名进而获取任务类型
Map<String, String> map = new HashMap<>();
urls.forEach(url -> map.compute(url, (key, value) -> {
......@@ -287,6 +291,10 @@ public class ToolsetServiceImpl implements ToolsetService {
for (List<String> ids : ListUtils.partition(taskIdList, 50)) {
result.addAll(getInteractionResult(ids));
}
if (Objects.nonNull(taskId)) {
// 更新处理进度
addProcessSchedule(taskId, taskIdList.size());
}
}
return result;
}
......@@ -329,13 +337,18 @@ public class ToolsetServiceImpl implements ToolsetService {
try {
ResponseEntity<JSONObject> response = restTemplate.postForEntity(interactionResultUrl, request, JSONObject.class);
JSONObject body = response.getBody();
if (Objects.nonNull(body) && 200 == body.getIntValue("code") && !body.isEmpty() && !body.getJSONArray("data").toJavaList(JSONObject.class).isEmpty() &&
body.getJSONArray("data").toJavaList(JSONObject.class).stream().map(m -> m.getIntValue("code")).allMatch(code -> 200 == code)) {
if (Objects.nonNull(body) && !body.isEmpty() && 200 == body.getIntValue("code") && CollectionUtils.isNotEmpty(body.getJSONArray("data").toJavaList(JSONObject.class)) &&
body.getJSONArray("data").toJavaList(JSONObject.class).stream().map(m -> m.getIntValue("code")).allMatch(code -> 200 == code || 403 == code) &&
taskIds.size() == body.getJSONArray("data").toJavaList(JSONObject.class).size()) {
res = body.getJSONArray("data").toJavaList(JSONObject.class);
break;
} else {
Thread.sleep(10000L);
}
// 最后一轮即使不满足条件也该返回
if (29 == i && Objects.nonNull(body) && !body.isEmpty() && 200 == body.getIntValue("code") && CollectionUtils.isNotEmpty(body.getJSONArray("data").toJavaList(JSONObject.class))){
res = body.getJSONArray("data").toJavaList(JSONObject.class);
}
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "轮询互动量更新任务异常,taskId:" + taskIds, e);
}
......@@ -362,7 +375,8 @@ public class ToolsetServiceImpl implements ToolsetService {
readExcel.setClazz(UploadBytedanceEventDTO.class);
readExcel.setAnalysisEventListener(new BytedanceEventListener(task.getId(), data));
EasyExcelUtil.read(file, readExcel);
// 更新处理进度
resetProcessSchedule(task.getId(), data.size());
ApplicationProjectListener.getThreadPool().execute(() -> processEventUpdate(data, task, extraCompute));
}
......@@ -386,6 +400,10 @@ public class ToolsetServiceImpl implements ToolsetService {
// 将旧数据与新数据合并
List<BytedanceCustomEventUpdateTaskData> taskData = bytedanceCustomEventUpdateTaskDataDao.findList(new Query(Criteria.where("taskId").is(taskId)));
taskData.addAll(data);
// 清空错误旧数据
task.getErrorData().clear();
// 更新进度
resetProcessSchedule(task.getId(), taskData.size());
ApplicationProjectListener.getThreadPool().execute(() -> processEventUpdate(taskData, task, task.getExtraCompute()));
}
......@@ -405,6 +423,8 @@ public class ToolsetServiceImpl implements ToolsetService {
jsonObject.put("taskId", task.getId());
jsonObject.put("extraCompute", task.getExtraCompute());
jsonObject.put("cTime", task.getCTime());
jsonObject.put("schedule", task.getSchedule());
jsonObject.put("totalSchedule", task.getTotalSchedule());
if ((Objects.equals(task.getTaskStatus(), BytedanceCustomEventUpdateTask.TaskStatus.FINISH.getStatus()) &&
Objects.equals(task.getProcessStatus(), BytedanceCustomEventUpdateTask.ProcessStatus.FINISH.getStatus())) ||
Objects.equals(task.getTaskStatus(), BytedanceCustomEventUpdateTask.TaskStatus.ERROR.getStatus()) ||
......@@ -527,8 +547,7 @@ public class ToolsetServiceImpl implements ToolsetService {
String taskId = task.getId();
try {
List<BytedanceCustomEventUpdateTask.EventInfo> eventInfos = new ArrayList<>();
Map<String, List<BytedanceCustomEventUpdateTaskData>> map =
taskData.stream().collect(Collectors.groupingBy(BytedanceCustomEventUpdateTaskData::getEventName));
Map<String, List<BytedanceCustomEventUpdateTaskData>> map = taskData.stream().collect(Collectors.groupingBy(BytedanceCustomEventUpdateTaskData::getEventName));
for (Map.Entry<String, List<BytedanceCustomEventUpdateTaskData>> entry : map.entrySet()) {
List<BytedanceCustomEventUpdateTaskData> data = entry.getValue();
BytedanceCustomEventUpdateTask.EventInfo eventInfo = new BytedanceCustomEventUpdateTask.EventInfo();
......@@ -536,11 +555,17 @@ public class ToolsetServiceImpl implements ToolsetService {
// 计算影响力指数
eventInfo.setInfluence(computeInf(data));
eventInfos.add(eventInfo);
// 更新渠道匹配处理进度
addProcessSchedule(taskId, data.size());
}
// 若需要计算实时传播力与传播影响力
if (extraCompute) {
// 更新为互动量更新处理状态
updateStatus(taskId, BytedanceCustomEventUpdateTask.TaskStatus.CALCULATING, BytedanceCustomEventUpdateTask.ProcessStatus.INTERACTION_UPDATE);
long count = taskData.stream().filter(data -> Objects.isNull(data.getWechatRead()) && Objects.isNull(data.getWechatReading()) && Objects.isNull(data.getWeiboForward())
&& Objects.isNull(data.getWeiboComment()) && Objects.isNull(data.getWeiboLike())).count();
// 更新互动量更新处理进度
resetProcessSchedule(taskId, (int) count);
for (BytedanceCustomEventUpdateTask.EventInfo eventInfo : eventInfos) {
List<BytedanceCustomEventUpdateTaskData> data = map.get(eventInfo.getName());
// 计算实时传播力
......@@ -550,12 +575,6 @@ public class ToolsetServiceImpl implements ToolsetService {
}
updateStatus(taskId, BytedanceCustomEventUpdateTask.TaskStatus.CALCULATING, BytedanceCustomEventUpdateTask.ProcessStatus.INFLUENCE_COMPUTE);
}
// 移除数据中的错误数据
if (CollectionUtils.isNotEmpty(task.getErrorData())){
taskData.removeAll(task.getErrorData());
}
bytedanceCustomEventUpdateTaskDataDao.deleteOneByQuery(new Query(Criteria.where("taskId").is(taskId)));
bytedanceCustomEventUpdateTaskDataDao.insertMany(taskData);
task.setEventInfo(eventInfos);
updateStatus(task, BytedanceCustomEventUpdateTask.TaskStatus.FINISH, BytedanceCustomEventUpdateTask.ProcessStatus.FINISH);
}catch (Exception e){
......@@ -609,8 +628,10 @@ public class ToolsetServiceImpl implements ToolsetService {
}
}
updateStatus(taskId, null, BytedanceCustomEventUpdateTask.ProcessStatus.INFLUENCE_COMPUTE);
resetProcessSchedule(taskId, data.size());
// 计算影响力指数
task.setContendInfluence(computeInf(data));
addProcessSchedule(taskId, data.size());
updateStatus(task, BytedanceCustomEventUpdateTask.TaskStatus.FINISH, BytedanceCustomEventUpdateTask.ProcessStatus.FINISH);
}catch (Exception e){
log.info("竞品字节事件影响力补充计算出错-taskId:{}", taskId, e);
......@@ -656,8 +677,22 @@ public class ToolsetServiceImpl implements ToolsetService {
private void updateStatus(String taskId, BytedanceCustomEventUpdateTask.TaskStatus taskStatus){
BytedanceCustomEventUpdateTask task = bytedanceCustomEventUpdateTaskDao.findOneById(taskId);
updateStatus(task, taskStatus, null);
}
private void addProcessSchedule(String taskId, Integer schedule){
BytedanceCustomEventUpdateTask task = bytedanceCustomEventUpdateTaskDao.findOneById(taskId);
task.setUTime(System.currentTimeMillis());
task.setTaskStatus(taskStatus.getStatus());
int taskSchedule = task.getSchedule();
task.setSchedule(taskSchedule + schedule);
bytedanceCustomEventUpdateTaskDao.updateOne(task);
}
private void resetProcessSchedule(String taskId, Integer total){
BytedanceCustomEventUpdateTask task = bytedanceCustomEventUpdateTaskDao.findOneById(taskId);
task.setUTime(System.currentTimeMillis());
task.setSchedule(0);
task.setTotalSchedule(total);
bytedanceCustomEventUpdateTaskDao.updateOne(task);
}
......@@ -725,9 +760,9 @@ public class ToolsetServiceImpl implements ToolsetService {
// 微信文章数
int wechatArticle = (int) data.stream().filter(article -> Objects.equals("微信", article.getPlatform())).count();
// 网媒文章数
int media = (int) data.stream().filter(article -> Objects.equals("网媒", article.getPlatform())).count();
int normalMediaArticle = (int) data.stream().filter(article -> Objects.equals("网媒", article.getPlatform())).count();
// 其他平台文章数
int otherPlatform = (int) data.stream().filter(article -> !Objects.equals("网媒", article.getPlatform())
int otherPlatformArticle = (int) data.stream().filter(article -> !Objects.equals("网媒", article.getPlatform())
&& !Objects.equals("微博", article.getPlatform()) && !Objects.equals("微信", article.getPlatform())).count();
for (BytedanceCustomEventUpdateTaskData datum : data) {
// 选填字段全部未填,后续走互动量更新
......@@ -742,48 +777,94 @@ public class ToolsetServiceImpl implements ToolsetService {
wechatRead = wechatRead + datum.getWechatRead();
wechatReading = wechatReading + datum.getWechatReading();
}
data.removeAll(willBeInteractionUpdateData);
// 选填字段全部未填的链接互动量更新结果
try {
List<String> successUrl = new ArrayList<>();
List<String> urls = willBeInteractionUpdateData.stream().map(BytedanceCustomEventUpdateTaskData::getUrl).collect(Collectors.toList());
List<JSONObject> jsonObjects = urlInteractionUpdate(urls);
Map<String, BytedanceCustomEventUpdateTaskData> urlMap = willBeInteractionUpdateData.stream().collect(Collectors.toMap(BytedanceCustomEventUpdateTaskData::getUrl, o -> o, (v1, v2) -> v1));
// 互动量更新
List<JSONObject> jsonObjects = urlInteractionUpdate(urls, task.getId());
for (JSONObject jsonObject : jsonObjects) {
if (200 == jsonObject.getInteger("code")) {
// 成功的链接
String url = jsonObject.getString("url");
successUrl.add(url);
// 转发数
int repostCount = Objects.isNull(jsonObject.getInteger("repostCount")) ? 0 : jsonObject.getIntValue("repostCount");
urlMap.get(url).setWeiboForward(repostCount);
// 评论数
int commentCount = Objects.isNull(jsonObject.getInteger("commentCount")) ? 0 : jsonObject.getIntValue("commentCount");
urlMap.get(url).setWeiboComment(commentCount);
// 点赞数
int likeCount = Objects.isNull(jsonObject.getInteger("likeCount")) ? 0 : jsonObject.getIntValue("likeCount");
urlMap.get(url).setWeiboLike(likeCount);
// 阅读数
int readCount = Objects.isNull(jsonObject.getInteger("readCount")) ? 0 : jsonObject.getIntValue("readCount");
// 在看数
int kanCount = Objects.isNull(jsonObject.getInteger("kanCount")) ? 0 : jsonObject.getIntValue("kanCount");
urlMap.get(url).setWechatRead(readCount);
// 在看数,实际使用的是返回结果中的分享数
int shareCount = Objects.isNull(jsonObject.getInteger("shareCount")) ? 0 : jsonObject.getIntValue("shareCount");
urlMap.get(url).setWechatReading(shareCount);
// 累加
weiboForward = weiboForward + repostCount;
weiboComment = weiboComment + commentCount;
weiboLike = weiboLike + likeCount;
wechatRead = wechatRead + readCount;
wechatReading = wechatReading + kanCount;
}
wechatReading = wechatReading + shareCount;
}
}
// 报错数据
urls.removeAll(successUrl); // 去除成功数据,剩余即为失败数据
List<BytedanceCustomEventUpdateTaskData> errorData = urls.stream().map(urlMap::get).collect(Collectors.toList());
List<BytedanceCustomEventUpdateTaskData> error = task.getErrorData();
error.addAll(errorData);
task.setErrorData(error);
// 互动量更新成功数据
List<BytedanceCustomEventUpdateTaskData> successData = successUrl.stream().map(urlMap::get).collect(Collectors.toList());
data.addAll(successData);
}catch (Exception e){
// 报错数据:为互动量更新失败数据,更新失败数据不参与计算
log.info("字节事件影响力更新-互动量更新出错-id:{}", task.getId(), e);
List<BytedanceCustomEventUpdateTaskData> errorData = task.getErrorData();
errorData.addAll(willBeInteractionUpdateData);
task.setErrorData(errorData);
}
// 平台指标权重
Map<String, Double> weightMap = bytedanceCustomPlatformWeightDao.findList(new Query())
// 存储数据以用于补充计算
bytedanceCustomEventUpdateTaskDataDao.deleteOneByQuery(new Query(Criteria.where("taskId").is(task.getId())));
bytedanceCustomEventUpdateTaskDataDao.insertMany(data);
// 平台指标权重,常量
Map<String, BytedanceCustomPlatformWeight> weightMap = bytedanceCustomPlatformWeightDao.findList(new Query())
.stream()
.collect(Collectors.toMap(BytedanceCustomPlatformWeight::getType, BytedanceCustomPlatformWeight::getWeight));
// 各平台数据指标与权重相乘求和
return BigDecimal.valueOf(media).multiply(BigDecimal.valueOf(weightMap.get("网媒文章数")))
.add(BigDecimal.valueOf(otherPlatform).multiply(BigDecimal.valueOf(weightMap.get("其他平台文章数"))))
.add(BigDecimal.valueOf(wechatArticle).multiply(BigDecimal.valueOf(weightMap.get("微信文章数"))))
.add(BigDecimal.valueOf(wechatRead).multiply(BigDecimal.valueOf(weightMap.get("微信阅读数"))))
.add(BigDecimal.valueOf(wechatReading).multiply(BigDecimal.valueOf(weightMap.get("微信在看数"))))
.add(BigDecimal.valueOf(weiboArticle).multiply(BigDecimal.valueOf(weightMap.get("微博消息数"))))
.add(BigDecimal.valueOf(weiboForward).multiply(BigDecimal.valueOf(weightMap.get("微博转发数"))))
.add(BigDecimal.valueOf(weiboComment).multiply(BigDecimal.valueOf(weightMap.get("微博评论数"))))
.add(BigDecimal.valueOf(weiboLike).multiply(BigDecimal.valueOf(weightMap.get("微博点赞数"))))
.collect(Collectors.toMap(BytedanceCustomPlatformWeight::getType, o -> o));
BytedanceCustomPlatformWeight normalMediaArticleWeight = weightMap.get("网媒文章数");
BytedanceCustomPlatformWeight otherPlatformArticleWeight = weightMap.get("其他平台文章数");
BytedanceCustomPlatformWeight wechatArticleWeight = weightMap.get("微信文章数");
BytedanceCustomPlatformWeight wechatReadWeight = weightMap.get("微信阅读数");
BytedanceCustomPlatformWeight wechatReadingWeight = weightMap.get("微信在看数");
BytedanceCustomPlatformWeight weiboArticleWeight = weightMap.get("微博消息数");
BytedanceCustomPlatformWeight weiboForwardWeight = weightMap.get("微博转发数");
BytedanceCustomPlatformWeight weiboCommentWeight = weightMap.get("微博评论数");
BytedanceCustomPlatformWeight weiboLikeWeight = weightMap.get("微博点赞数");
// 标准值计算
BigDecimal bigDecimal100 = BigDecimal.valueOf(100);
BigDecimal NM = BigDecimal.valueOf(normalMediaArticle).divide(BigDecimal.valueOf(normalMediaArticleWeight.getConstant()), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal OP = BigDecimal.valueOf(otherPlatformArticle).divide(BigDecimal.valueOf(otherPlatformArticleWeight.getConstant()), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WC = BigDecimal.valueOf(wechatArticle).divide(BigDecimal.valueOf(wechatArticleWeight.getConstant()), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WCR = BigDecimal.valueOf(Math.log(wechatRead + 1)).divide(BigDecimal.valueOf(Math.log(wechatReadWeight.getConstant() + 1)), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WCL = BigDecimal.valueOf(Math.log(wechatReading + 1)).divide(BigDecimal.valueOf(Math.log(wechatReadingWeight.getConstant() + 1)), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WB = BigDecimal.valueOf(weiboArticle).divide(BigDecimal.valueOf(weiboArticleWeight.getConstant()), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WBF = BigDecimal.valueOf(Math.log(weiboForward + 1)).divide(BigDecimal.valueOf(Math.log(weiboForwardWeight.getConstant() + 1)), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WBC = BigDecimal.valueOf(Math.log(weiboComment + 1)).divide(BigDecimal.valueOf(Math.log(weiboCommentWeight.getConstant() + 1)), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
BigDecimal WBL = BigDecimal.valueOf(Math.log(weiboLike + 1)).divide(BigDecimal.valueOf(Math.log(weiboLikeWeight.getConstant() + 1)), 9, RoundingMode.HALF_UP).multiply(bigDecimal100);
// 标准值与权重计算得RMI
return NM.multiply(BigDecimal.valueOf(normalMediaArticleWeight.getWeight()))
.add(OP.multiply(BigDecimal.valueOf(otherPlatformArticleWeight.getWeight())))
.add(WC.multiply(BigDecimal.valueOf(wechatArticleWeight.getWeight())))
.add(WCR.multiply(BigDecimal.valueOf(wechatReadWeight.getWeight())))
.add(WCL.multiply(BigDecimal.valueOf(wechatReadingWeight.getWeight())))
.add(WB.multiply(BigDecimal.valueOf(weiboArticleWeight.getWeight())))
.add(WBF.multiply(BigDecimal.valueOf(weiboForwardWeight.getWeight())))
.add(WBC.multiply(BigDecimal.valueOf(weiboCommentWeight.getWeight())))
.add(WBL.multiply(BigDecimal.valueOf(weiboLikeWeight.getWeight())))
.setScale(7, RoundingMode.HALF_UP)
.doubleValue();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment