Commit 790d1431 by shenjunjie

调整入库统计

parent 426ad205
...@@ -79,4 +79,22 @@ public class TaskPoolConfig { ...@@ -79,4 +79,22 @@ public class TaskPoolConfig {
return executor; return executor;
} }
@Bean
public ThreadPoolTaskExecutor taskServiceExecutor() {
log.info("start taskServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(8);
// 配置最大线程数
executor.setMaxPoolSize(16);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("taskServiceExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
} }
...@@ -197,8 +197,8 @@ public class AppChannelController extends BaseController { ...@@ -197,8 +197,8 @@ public class AppChannelController extends BaseController {
@ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"), @ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "page", value = "页码", defaultValue = "1", paramType = "query", dataType = "int"), @ApiImplicitParam(name = "page", value = "页码", defaultValue = "1", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "pageSize", value = "页码大小", defaultValue = "10", paramType = "query", dataType = "int"), @ApiImplicitParam(name = "pageSize", value = "页码大小", defaultValue = "10", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "int"), @ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "int") @ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "string")
}) })
@GetMapping("/events") @GetMapping("/events")
public ResponseResult getEvents(@RequestParam(value = "startTime") Long startTime, public ResponseResult getEvents(@RequestParam(value = "startTime") Long startTime,
...@@ -214,13 +214,15 @@ public class AppChannelController extends BaseController { ...@@ -214,13 +214,15 @@ public class AppChannelController extends BaseController {
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "startTime", value = "开始时间", required = true, paramType = "query", dataType = "long"), @ApiImplicitParam(name = "startTime", value = "开始时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"), @ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string") @ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "string")
}) })
@GetMapping("/events/download") @GetMapping("/events/download")
public ResponseResult downloadEvents(@RequestParam(value = "startTime") long startTime, public ResponseResult downloadEvents(@RequestParam(value = "startTime") long startTime,
@RequestParam(value = "endTime") long endTime, @RequestParam(value = "endTime") long endTime,
@RequestParam("channelId") String channelId) { @RequestParam("channelId") String channelId,
List<ExportAppChannelEventDTO> exportAppChannelEventDTOS = channelService.downloadEventsByTime(startTime, endTime, channelId); @RequestParam(value = "contendId", defaultValue = "0") String contendId) {
List<ExportAppChannelEventDTO> exportAppChannelEventDTOS = channelService.downloadEventsByTime(startTime, endTime, channelId, contendId);
EasyExcelUtil.download(channelId + "渠道列表数据", "sheet1", ExportAppChannelEventDTO.class, exportAppChannelEventDTOS, response); EasyExcelUtil.download(channelId + "渠道列表数据", "sheet1", ExportAppChannelEventDTO.class, exportAppChannelEventDTOS, response);
return ResponseResult.success(); return ResponseResult.success();
} }
......
...@@ -68,23 +68,42 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao { ...@@ -68,23 +68,42 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
@Override @Override
public long getEventCount(ChannelIndex channelIndex, List<String> eventEmotions, String emotion) { public long getEventCount(ChannelIndex channelIndex, List<String> eventEmotions, String emotion) {
Criteria criteria = Criteria.where("emotion").in(eventEmotions). String primaryCollection = getAggreeCollection();
and("projectId").is(channelIndex.getProjectId()).and("contendId").is(channelIndex.getContendId()); Criteria criteria = Criteria.where("channelFid").is(channelIndex.getFid());
if (null != emotion) {
criteria.and("emotion").is(emotion);
}
String aliasName = "events";
Criteria lookUpCriteria = Criteria.where(aliasName + ".emotion").in(eventEmotions);
List<AggregationOperation> operations = Arrays.asList(Aggregation.match(criteria),
Aggregation.lookup(COLLECTION_NAME, "eventId", "_id", aliasName), Aggregation.match(lookUpCriteria),
Aggregation.project("events._id"));
Aggregation aggregation = Aggregation.newAggregation(operations);
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(aggregation, primaryCollection, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).distinct().count();
}
/**
* 以事件为主表联合查询 应用于getEventCount,已被替代
* @param channelIndex
* @param eventEmotions
* @param emotion
* @return
*/
@Deprecated
private Aggregation eventCountAggregationPrimaryEvent(ChannelIndex channelIndex, List<String> eventEmotions, String emotion) {
Criteria criteria = Criteria.where("emotion").in(eventEmotions).and("projectId").is(channelIndex.getProjectId()).and("contendId").is(channelIndex.getContendId());
String aliasName = "articles"; String aliasName = "articles";
Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid()); Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid());
if (null != emotion) { if (null != emotion) {
lookUpCriteria.and(aliasName.concat(".emotion")).is(emotion); lookUpCriteria.and(aliasName.concat(".emotion")).is(emotion);
} }
// long count = mongoTemplate.count(Query.query(criteria), COLLECTION_NAME); // long count = mongoTemplate.count(Query.query(criteria), COLLECTION_NAME);
List<AggregationOperation> operations = Arrays.asList( List<AggregationOperation> operations = Arrays.asList(Aggregation.match(criteria),
Aggregation.match(criteria),
// aoc -> new Document("$addFields", new Document("_id", new Document("$toString", "$_id"))), 该方式mongo版本不支持 // aoc -> new Document("$addFields", new Document("_id", new Document("$toString", "$_id"))), 该方式mongo版本不支持
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.match(lookUpCriteria));
Aggregation.match(lookUpCriteria) return Aggregation.newAggregation(operations);
);
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(Aggregation.newAggregation(operations), COLLECTION_NAME, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.size();
} }
private Map<Long, List<Event>> getEventTimePattern(ChannelIndex channelIndex, Long startTime, Long endTime, int nrOfChars) { private Map<Long, List<Event>> getEventTimePattern(ChannelIndex channelIndex, Long startTime, Long endTime, int nrOfChars) {
...@@ -95,13 +114,10 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao { ...@@ -95,13 +114,10 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
String aliasName = "articles"; String aliasName = "articles";
Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid()); Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid());
// 分组 // 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.project("startTime", "_id", "emotion", "influence", "title", "endTime", "eventTag"),
Aggregation.project("startTime", "_id", "emotion", "influence", "title", "endTime", "eventTag"),
// 想通过截取的方式来分组,但是会有8小时误差问题未解决,故舍弃 // 想通过截取的方式来分组,但是会有8小时误差问题未解决,故舍弃
// .andExpression("add(new java.util.Date(8),startTime)").substring(0, nrOfChars).as("patternDate"), // .andExpression("add(new java.util.Date(8),startTime)").substring(0, nrOfChars).as("patternDate"),
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.match(lookUpCriteria));
Aggregation.match(lookUpCriteria)
);
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, COLLECTION_NAME, JSONObject.class); AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, COLLECTION_NAME, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults(); List<JSONObject> mappedResults = aggregate.getMappedResults();
for (JSONObject mappedResult : mappedResults) { for (JSONObject mappedResult : mappedResults) {
......
...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.es; ...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.ChannelIndex; import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter; import lombok.Getter;
...@@ -89,6 +90,8 @@ public class EsClientDao { ...@@ -89,6 +90,8 @@ public class EsClientDao {
public List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> searchRecordRecentDay(int day) { public List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> searchRecordRecentDay(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> res = new ArrayList<>(); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> res = new ArrayList<>();
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
// 设置为当天0:00
calendar.setTime(Tools.truncDate(calendar.getTime(), Constant.DAY_PATTERN));
long endTime = calendar.getTime().getTime(); long endTime = calendar.getTime().getTime();
calendar.add(Calendar.DAY_OF_MONTH, -day); calendar.add(Calendar.DAY_OF_MONTH, -day);
long startTime = calendar.getTime().getTime(); long startTime = calendar.getTime().getTime();
......
...@@ -62,6 +62,10 @@ public class ChannelIndex extends AbstractBaseMongo { ...@@ -62,6 +62,10 @@ public class ChannelIndex extends AbstractBaseMongo {
*/ */
private Double emotionIndex; private Double emotionIndex;
public ChannelIndex(Channel channel, String contendId) {
this(channel.getProjectId(), contendId, channel.getPlatform(), channel.getRealSource(), channel.getSource());
}
public ChannelIndex(Channel channel) { public ChannelIndex(Channel channel) {
this(channel.getProjectId(), channel.getContendId(), channel.getPlatform(), channel.getRealSource(), channel.getSource()); this(channel.getProjectId(), channel.getContendId(), channel.getPlatform(), channel.getRealSource(), channel.getSource());
} }
......
...@@ -243,7 +243,7 @@ public interface ChannelService { ...@@ -243,7 +243,7 @@ public interface ChannelService {
* @param channelId 渠道ID * @param channelId 渠道ID
* @return 事件信息 * @return 事件信息
*/ */
List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId); List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId,String contendId);
/** /**
* 计算渠道倾向及指数 * 计算渠道倾向及指数
......
...@@ -74,12 +74,14 @@ public interface ProjectService { ...@@ -74,12 +74,14 @@ public interface ProjectService {
/** /**
* 获取当前用户拥有的所有项目 * 获取当前用户拥有的所有项目
*
* @return * @return
*/ */
List<JSONObject> getUserAllProjects(); List<JSONObject> getUserAllProjects();
/** /**
* 获取当前用户拥有的所有项目(包括已过期) * 获取当前用户拥有的所有项目(包括已过期)
*
* @return * @return
*/ */
List<JSONObject> getLoginUserAllProjects(); List<JSONObject> getLoginUserAllProjects();
...@@ -90,7 +92,7 @@ public interface ProjectService { ...@@ -90,7 +92,7 @@ public interface ProjectService {
* @param hasPrimary 是否要主品牌 * @param hasPrimary 是否要主品牌
* @return 品牌筛选列表 * @return 品牌筛选列表
*/ */
List<JSONObject> getBrands(String projectId,boolean hasPrimary); List<JSONObject> getBrands(String projectId, boolean hasPrimary);
/** /**
* 根据关联项目组ID获取Project * 根据关联项目组ID获取Project
...@@ -106,10 +108,11 @@ public interface ProjectService { ...@@ -106,10 +108,11 @@ public interface ProjectService {
* @param contendId 品牌id * @param contendId 品牌id
* @return project对象 * @return project对象
*/ */
AbstractProject getProjectByContendId(String contendId); AbstractProject getProjectByContendId(String projectId, String contendId);
/** /**
* 获取所有启动状态下的Project * 获取所有启动状态下的Project
*
* @return * @return
*/ */
List<Project> getAllProjectsWithStart(); List<Project> getAllProjectsWithStart();
......
...@@ -460,7 +460,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -460,7 +460,7 @@ public class ChannelServiceImpl implements ChannelService {
@Override @Override
public List<JSONObject> getCollectList(String contendId) { public List<JSONObject> getCollectList(String contendId) {
String projectId = UserThreadLocal.getProjectId(); String projectId = UserThreadLocal.getProjectId();
String linkedGroupId = projectService.getProjectByContendId(contendId).getBrandLinkedGroupId(); String linkedGroupId = projectService.getProjectByContendId(projectId, contendId).getBrandLinkedGroupId();
Query query = new Query(); Query query = new Query();
query.addCriteria(Criteria.where("projectId").is(projectId).and("linkedGroupId").is(linkedGroupId).and("isCollect").is(true)); query.addCriteria(Criteria.where("projectId").is(projectId).and("linkedGroupId").is(linkedGroupId).and("isCollect").is(true));
channelDao.addSort(query, "{\"collectTime\":\"descend\"}"); channelDao.addSort(query, "{\"collectTime\":\"descend\"}");
...@@ -523,7 +523,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -523,7 +523,7 @@ public class ChannelServiceImpl implements ChannelService {
json.put("spreadType", "month"); json.put("spreadType", "month");
} }
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
json.put("spreadingTend", spreadingTendEvent(startTime, endTime, channel, UserThreadLocal.getProjectId(), contend, timePattern)); json.put("spreadingTend", spreadingTendEvent(startTime, endTime, channel, contend, timePattern));
if ("0".equals(contend)) { if ("0".equals(contend)) {
spreadDatas.add(0, json); spreadDatas.add(0, json);
continue; continue;
...@@ -571,7 +571,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -571,7 +571,7 @@ public class ChannelServiceImpl implements ChannelService {
for (Map.Entry<String, List<ChannelIndex.Article>> entry : contendMap.entrySet()) { for (Map.Entry<String, List<ChannelIndex.Article>> entry : contendMap.entrySet()) {
String contendId = entry.getKey(); String contendId = entry.getKey();
Pair<Long, JSONObject> dataCount = getDataCount(entry.getValue()); Pair<Long, JSONObject> dataCount = getDataCount(entry.getValue());
Pair<Long, JSONObject> eventCount = getEventCount(startTime, endTime, channel); Pair<Long, JSONObject> eventCount = getEventCount(startTime, endTime, channel, contendId);
articleTotal += dataCount.getLeft(); articleTotal += dataCount.getLeft();
eventTotal += eventCount.getLeft(); eventTotal += eventCount.getLeft();
// 合并统计 // 合并统计
...@@ -693,7 +693,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -693,7 +693,7 @@ public class ChannelServiceImpl implements ChannelService {
JSONObject res = new JSONObject(); JSONObject res = new JSONObject();
List<JSONObject> dayList = new ArrayList<>(); List<JSONObject> dayList = new ArrayList<>();
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
Map<Long, List<Event>> eventDay = eventDao.getEventDay(new ChannelIndex(channel), startTime, endTime); Map<Long, List<Event>> eventDay = eventDao.getEventDay(new ChannelIndex(channel, contendId), startTime, endTime);
eventDay = Tools.sortTimeKeyMap(eventDay, true); eventDay = Tools.sortTimeKeyMap(eventDay, true);
eventDay.forEach((time, list) -> dayList.add(getDayResultWithEvent(list, page, pageSize, time))); eventDay.forEach((time, list) -> dayList.add(getDayResultWithEvent(list, page, pageSize, time)));
res.put("list", dayList); res.put("list", dayList);
...@@ -701,10 +701,10 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -701,10 +701,10 @@ public class ChannelServiceImpl implements ChannelService {
} }
@Override @Override
public List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId) { public List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId, String contendId) {
List<ExportAppChannelEventDTO> res = new ArrayList<>(); List<ExportAppChannelEventDTO> res = new ArrayList<>();
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
Map<Long, List<Event>> eventDay = eventDao.getEventDay(new ChannelIndex(channel), startTime, endTime); Map<Long, List<Event>> eventDay = eventDao.getEventDay(new ChannelIndex(channel, contendId), startTime, endTime);
Tools.sortTimeKeyMap(eventDay, true).values().forEach(events -> { Tools.sortTimeKeyMap(eventDay, true).values().forEach(events -> {
events.forEach(event -> { events.forEach(event -> {
res.add(ExportAppChannelEventDTO.createFromEvent(event)); res.add(ExportAppChannelEventDTO.createFromEvent(event));
...@@ -734,10 +734,8 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -734,10 +734,8 @@ public class ChannelServiceImpl implements ChannelService {
//是否友好渠道 //是否友好渠道
boolean isPositive = false; boolean isPositive = false;
boolean isNegative = false; boolean isNegative = false;
long specNegativeCount = eventDao.getEventCount(new ChannelIndex(channel), Arrays.asList(EmotionEnum.POSITIVE.getName(), long specNegativeCount = eventDao.getEventCount(new ChannelIndex(channel), Arrays.asList(EmotionEnum.POSITIVE.getName(), EmotionEnum.NEUTRAL.getName()), EmotionEnum.NEGATIVE.getName());
EmotionEnum.NEUTRAL.getName()), EmotionEnum.NEGATIVE.getName()); long specPositiveCount = eventDao.getEventCount(new ChannelIndex(channel), Arrays.asList(EmotionEnum.NEGATIVE.getName(), EmotionEnum.POSITIVE.getName()), EmotionEnum.POSITIVE.getName());
long specPositiveCount = eventDao.getEventCount(new ChannelIndex(channel), Arrays.asList(EmotionEnum.NEGATIVE.getName(),
EmotionEnum.POSITIVE.getName()), EmotionEnum.POSITIVE.getName());
// 特殊情况:若皆有发布过反常稿件 // 特殊情况:若皆有发布过反常稿件
if (specNegativeCount > 0 && specPositiveCount > 0) { if (specNegativeCount > 0 && specPositiveCount > 0) {
//正面稿件数>负面稿件数,为正面 //正面稿件数>负面稿件数,为正面
...@@ -754,8 +752,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -754,8 +752,7 @@ public class ChannelServiceImpl implements ChannelService {
//如果有在正面事件中发布负面稿件 //如果有在正面事件中发布负面稿件
} else if (specNegativeCount > 0) { } else if (specNegativeCount > 0) {
// 或满足三者复合条件:发布正面稿件>负面稿件&&正面稿件>=中性稿件&&月均参与正面事件数≥4.64 // 或满足三者复合条件:发布正面稿件>负面稿件&&正面稿件>=中性稿件&&月均参与正面事件数≥4.64
if (positiveCount > negativeCount && positiveCount >= neutralCount if (positiveCount > negativeCount && positiveCount >= neutralCount && (this.inEventCountMonthAverage(channel, EmotionEnum.POSITIVE.getName()) >= 4.64)) {
&& (this.inEventCountMonthAverage(channel, EmotionEnum.POSITIVE.getName()) >= 4.64)) {
isPositive = true; isPositive = true;
} else { } else {
isNegative = true; isNegative = true;
...@@ -765,15 +762,13 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -765,15 +762,13 @@ public class ChannelServiceImpl implements ChannelService {
if (specPositiveCount > 0) { if (specPositiveCount > 0) {
isPositive = true; isPositive = true;
// 或满足三者复合条件:发布正面稿件>负面稿件&&正面稿件>=中性稿件&&月均参与正面事件数≥4.64 // 或满足三者复合条件:发布正面稿件>负面稿件&&正面稿件>=中性稿件&&月均参与正面事件数≥4.64
} else if (positiveCount > negativeCount && positiveCount >= neutralCount } else if (positiveCount > negativeCount && positiveCount >= neutralCount && (this.inEventCountMonthAverage(channel, EmotionEnum.POSITIVE.getName()) >= 4.64)) {
&& (this.inEventCountMonthAverage(channel, EmotionEnum.POSITIVE.getName()) >= 4.64)) {
isPositive = true; isPositive = true;
} }
} }
} }
//更新渠道指数,渠道等级,情感倾向并记录变化 //更新渠道指数,渠道等级,情感倾向并记录变化
this.updateChannel(channel, project, positiveCount, neutralCount, negativeCount, isPositive, isNegative, specPositiveCount, this.updateChannel(channel, project, positiveCount, neutralCount, negativeCount, isPositive, isNegative, specPositiveCount, specNegativeCount);
specNegativeCount);
} catch (Exception e) { } catch (Exception e) {
log.error("calculateChannelEmotionIndex-", e); log.error("calculateChannelEmotionIndex-", e);
} }
...@@ -789,8 +784,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -789,8 +784,7 @@ public class ChannelServiceImpl implements ChannelService {
* @param neutralCount 中性稿件数 * @param neutralCount 中性稿件数
* @param negativeCount 负面稿件数 * @param negativeCount 负面稿件数
*/ */
private void updateChannel(Channel channel, Project project, long positiveCount, long neutralCount, long negativeCount, boolean isPositive, private void updateChannel(Channel channel, Project project, long positiveCount, long neutralCount, long negativeCount, boolean isPositive, boolean isNegative, long specPositiveCount, long specNegativeCount) {
boolean isNegative, long specPositiveCount, long specNegativeCount) {
double index; double index;
int emotion; int emotion;
// 负面渠道走负面渠道指数计算规则并更新 // 负面渠道走负面渠道指数计算规则并更新
...@@ -826,8 +820,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -826,8 +820,7 @@ public class ChannelServiceImpl implements ChannelService {
* @param positiveChannelMap 模块配置的正面渠道指数计算比例 * @param positiveChannelMap 模块配置的正面渠道指数计算比例
* @return 计算指数 * @return 计算指数
*/ */
private double positiveChannelIndexRule(Channel channel, long positiveCount, long neutralCount, long negativeCount, long specPositiveCount, private double positiveChannelIndexRule(Channel channel, long positiveCount, long neutralCount, long negativeCount, long specPositiveCount, Map<String, Double> positiveChannelMap) {
Map<String, Double> positiveChannelMap) {
//正面稿件数-中性稿件数 //正面稿件数-中性稿件数
long value = positiveCount - neutralCount; long value = positiveCount - neutralCount;
//正面-中性得分 //正面-中性得分
...@@ -1184,13 +1177,13 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -1184,13 +1177,13 @@ public class ChannelServiceImpl implements ChannelService {
return Pair.of(total, res); return Pair.of(total, res);
} }
private Pair<Long, JSONObject> getEventCount(Long startTime, Long endTime, Channel channel) { private Pair<Long, JSONObject> getEventCount(Long startTime, Long endTime, Channel channel, String contendId) {
JSONObject res = new JSONObject(); JSONObject res = new JSONObject();
Map<Long, List<Event>> eventCount; Map<Long, List<Event>> eventCount;
if (endTime - startTime > Constant.ONE_MONTH) { if (endTime - startTime > Constant.ONE_MONTH) {
eventCount = eventDao.getEventMonth(new ChannelIndex(channel), startTime, endTime); eventCount = eventDao.getEventMonth(new ChannelIndex(channel, contendId), startTime, endTime);
} else { } else {
eventCount = eventDao.getEventDay(new ChannelIndex(channel), startTime, endTime); eventCount = eventDao.getEventDay(new ChannelIndex(channel, contendId), startTime, endTime);
} }
// 事件部分 // 事件部分
long positiveEventCount = 0; long positiveEventCount = 0;
...@@ -1224,12 +1217,12 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -1224,12 +1217,12 @@ public class ChannelServiceImpl implements ChannelService {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
private List<JSONObject> spreadingTendEvent(Long startTime, Long endTime, Channel channel, String projectId, String contendId, String timePattern) { private List<JSONObject> spreadingTendEvent(Long startTime, Long endTime, Channel channel, String contendId, String timePattern) {
Map<Long, List<Event>> eventCount = completeTimes(startTime, endTime, timePattern); Map<Long, List<Event>> eventCount = completeTimes(startTime, endTime, timePattern);
if (Constant.MONTH_PATTERN.equals(timePattern)) { if (Constant.MONTH_PATTERN.equals(timePattern)) {
eventCount.putAll(eventDao.getEventMonth(new ChannelIndex(channel), startTime, endTime)); eventCount.putAll(eventDao.getEventMonth(new ChannelIndex(channel, contendId), startTime, endTime));
} else { } else {
eventCount.putAll(eventDao.getEventDay(new ChannelIndex(channel), startTime, endTime)); eventCount.putAll(eventDao.getEventDay(new ChannelIndex(channel, contendId), startTime, endTime));
} }
return eventCount.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getKey)).map(e -> { return eventCount.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getKey)).map(e -> {
JSONObject spreadJson = new JSONObject(); JSONObject spreadJson = new JSONObject();
...@@ -1369,7 +1362,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -1369,7 +1362,7 @@ public class ChannelServiceImpl implements ChannelService {
if ("0".equals(channel.getContendId())) { if ("0".equals(channel.getContendId())) {
return true; return true;
} }
AbstractProject project = projectService.getProjectByContendId(channel.getContendId()); AbstractProject project = projectService.getProjectByContendId(channel.getProjectId(), channel.getContendId());
if (project instanceof Contend) { if (project instanceof Contend) {
//有情感倾向更新渠道指数 //有情感倾向更新渠道指数
return ((Contend) project).isHasEmotion(); return ((Contend) project).isHasEmotion();
......
...@@ -227,8 +227,7 @@ public class ProjectServiceImpl implements ProjectService { ...@@ -227,8 +227,7 @@ public class ProjectServiceImpl implements ProjectService {
} }
@Override @Override
public AbstractProject getProjectByContendId(String contendId) { public AbstractProject getProjectByContendId(String projectId, String contendId) {
String projectId = UserThreadLocal.getProjectId();
Project project = projectDao.findOneById(projectId); Project project = projectDao.findOneById(projectId);
if ("0".equals(contendId)) { if ("0".equals(contendId)) {
return project; return project;
......
package com.zhiwei.brandkbs2.service.impl; package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao; import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao; import com.zhiwei.brandkbs2.es.EsClientDao;
...@@ -13,14 +16,16 @@ import org.apache.commons.collections4.ListUtils; ...@@ -13,14 +16,16 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -65,6 +70,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -65,6 +70,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "reportServiceImpl") @Resource(name = "reportServiceImpl")
ReportService reportService; ReportService reportService;
@Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor;
@Override @Override
public void messageFlowCount(int day) { public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
...@@ -77,7 +85,7 @@ public class TaskServiceImpl implements TaskService { ...@@ -77,7 +85,7 @@ public class TaskServiceImpl implements TaskService {
List<Channel> insertList = new ArrayList<>(); List<Channel> insertList = new ArrayList<>();
List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>(); List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>();
// 新recordMap // 新recordMap
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new HashMap<>(); Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new ConcurrentHashMap<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) { for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
batchList.add(entry); batchList.add(entry);
// 每100条做一次清算 // 每100条做一次清算
...@@ -85,7 +93,7 @@ public class TaskServiceImpl implements TaskService { ...@@ -85,7 +93,7 @@ public class TaskServiceImpl implements TaskService {
insertList.addAll(batchHandle(batchList, newRecordMap)); insertList.addAll(batchHandle(batchList, newRecordMap));
batchList = new ArrayList<>(); batchList = new ArrayList<>();
} }
if (handleSize % 10000 == 0) { if (handleSize % 100 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size()); log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
} }
} }
...@@ -103,10 +111,10 @@ public class TaskServiceImpl implements TaskService { ...@@ -103,10 +111,10 @@ public class TaskServiceImpl implements TaskService {
} }
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) { private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = new ArrayList<>(); List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList()); List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids); Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids);
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) { CompletableFuture.allOf(batchList.stream().map(entry -> CompletableFuture.supplyAsync(() -> {
String fid = entry.getKey().getFid(); String fid = entry.getKey().getFid();
Channel channel = fidChannel.get(fid); Channel channel = fidChannel.get(fid);
if (null == channel) { if (null == channel) {
...@@ -119,7 +127,22 @@ public class TaskServiceImpl implements TaskService { ...@@ -119,7 +127,22 @@ public class TaskServiceImpl implements TaskService {
// 设置查询数值 // 设置查询数值
entry.getKey().setChannelInfo(channel); entry.getKey().setChannelInfo(channel);
newRecordMap.put(entry.getKey(), entry.getValue()); newRecordMap.put(entry.getKey(), entry.getValue());
} return null;
}, taskServiceExecutor)).toArray(CompletableFuture[]::new)).join();
// for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) {
// String fid = entry.getKey().getFid();
// Channel channel = fidChannel.get(fid);
// if (null == channel) {
// channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
// insertList.add(channelService.calculateChannelEmotionIndex(channel));
// } else {
// channel.setRecord(entry.getValue());
// channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
// }
// // 设置查询数值
// entry.getKey().setChannelInfo(channel);
// newRecordMap.put(entry.getKey(), entry.getValue());
// }
return insertList; return insertList;
} }
......
...@@ -3,9 +3,10 @@ package com.zhiwei.brandkbs2; ...@@ -3,9 +3,10 @@ package com.zhiwei.brandkbs2;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GlobalPojo; import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.controller.app.AppArticleController; import com.zhiwei.brandkbs2.controller.app.AppArticleController;
import com.zhiwei.brandkbs2.dao.ChannelLabelDao; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.dao.EventDataDao; import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.dao.ReportDao; import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.EventService;
import com.zhiwei.brandkbs2.service.ReportService; import com.zhiwei.brandkbs2.service.ReportService;
import com.zhiwei.brandkbs2.service.TaskService; import com.zhiwei.brandkbs2.service.TaskService;
import com.zhiwei.brandkbs2.util.TextUtil; import com.zhiwei.brandkbs2.util.TextUtil;
...@@ -38,6 +39,12 @@ public class TestRunWith { ...@@ -38,6 +39,12 @@ public class TestRunWith {
ReportService reportService; ReportService reportService;
@Autowired @Autowired
EventService eventService;
@Autowired
EventDao eventDao;
@Autowired
EventDataDao eventDataDao; EventDataDao eventDataDao;
@Autowired @Autowired
...@@ -55,6 +62,12 @@ public class TestRunWith { ...@@ -55,6 +62,12 @@ public class TestRunWith {
@Autowired @Autowired
GlobalPojo globalPojo; GlobalPojo globalPojo;
@Autowired
ChannelDao channelDao;
@Autowired
ChannelService channelService;
@Test @Test
public void test1() { public void test1() {
// UserInfo userInfo = new UserInfo(); // UserInfo userInfo = new UserInfo();
...@@ -62,8 +75,16 @@ public class TestRunWith { ...@@ -62,8 +75,16 @@ public class TestRunWith {
// UserThreadLocal.set(userInfo); // UserThreadLocal.set(userInfo);
// ResponseResult result = appArticleController.getMarkSpread(1657468800000L, 1657555200000L); // ResponseResult result = appArticleController.getMarkSpread(1657468800000L, 1657555200000L);
// System.out.println(JSONObject.toJSONString(result)); // System.out.println(JSONObject.toJSONString(result));
taskService.messageFlowCount(2); // taskService.messageFlowCount(2);
// eventService.analysisEvents(Arrays.asList("62e9eb772a41a57ed6c6ef92"));
// ChannelIndex channelIndex = new ChannelIndex("62beadd1bbf8eb20f96d2f2f","0","网媒","网媒","中工网");
// long total = eventDao.judgeSpecEventCount(channelIndex, Collections.singletonList("正面"), "中性");
// eventDao.getEventDay(channelIndex,1659283200000L,1660924800000L);
long start = System.currentTimeMillis();
Channel channel = channelDao.findOneById("62f5482d3b00de5932ba9593");
channelService.calculateChannelEmotionIndex(channel);
System.out.println(System.currentTimeMillis()-start);
// reportService.getReportsAggCount(); // reportService.getReportsAggCount();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment