Commit face4cbb by shenjunjie

Merge branch 'release' into 'master'

Release

See merge request !108
parents 5b74c000 46b4691f
......@@ -124,7 +124,7 @@ public class EventController extends BaseController {
@ApiImplicitParam(name = "eventIds", value = "事件ID集合", paramType = "body", dataType = "list")
@PostMapping("/analyze")
public ResponseResult analysisEvents(@RequestBody JSONObject info) {
eventService.analysisEvents(info.getJSONArray("eventIds").toJavaList(String.class));
eventService.analysisEventsAsync(info.getJSONArray("eventIds").toJavaList(String.class));
behaviorService.pushBehaviorOld(OPERATION, "批量更新事件", request);
return ResponseResult.success();
}
......
......@@ -190,6 +190,12 @@ public interface EventService {
void analysisEvents(List<String> eventIds);
/**
* 异步批量更新事件
* @param eventIds 事件ID集合
*/
void analysisEventsAsync(List<String> eventIds);
/**
* 搜索舆情全部事件标签
*
* @param linkedGroupId 关联项目组ID
......@@ -278,4 +284,11 @@ public interface EventService {
*/
List<JSONObject> getLastEventTop(String keyword,int limit);
/**
* 获取未结束的事件
* @param projectId
* @return
*/
List<Event> findNotEndEventByProjectId(String projectId);
}
......@@ -33,4 +33,8 @@ public interface TaskService{
*/
void cleanAggreeTask();
/**
* 事件相关更新
*/
void eventUpdate();
}
......@@ -11,6 +11,7 @@ import com.zhiwei.brandkbs2.pojo.vo.BehaviorVO;
import com.zhiwei.brandkbs2.pojo.vo.PageVO;
import com.zhiwei.brandkbs2.service.BehaviorService;
import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
......@@ -77,8 +78,9 @@ public class BehaviorServiceImpl implements BehaviorService {
query.addCriteria(Criteria.where("cTime").gte(startTime).lt(endTime).and("backstage").is(backstage));
behaviorDao.addSort(query, "{\"cTime\":\"descend\"}");
if (StringUtils.isNotEmpty(nickname)) {
userDao.addKeywordFuzz(query, nickname, "nickname");
List<User> userList = userDao.findList(query);
Query userQuery = new Query();
userDao.addKeywordFuzz(userQuery, nickname, "nickname");
List<User> userList = userDao.findList(userQuery);
// 根据nickname未查询
if (CollectionUtils.isEmpty(userList)) {
return PageVO.createPageVo(0, page, 0, size, Collections.emptyList());
......@@ -87,14 +89,29 @@ public class BehaviorServiceImpl implements BehaviorService {
query.addCriteria(Criteria.where("userId").in(userList.stream().map(User::getId).collect(Collectors.toList())));
}
long count = behaviorDao.count(query, collectionNames);
mongoUtil.start(page, size, query);
List<Behavior> behaviorList = behaviorDao.findList(query, collectionNames);
// 保证数据分表时正常分页
int skipCount = (page - 1) * size;
List<Behavior> behaviorList = new ArrayList<>();
int pageSizeFlag = size;
if (0 != collectionNames.length) {
for (int i = collectionNames.length - 1; i >= 0; i--) {
if (behaviorList.size() >= size) {
break;
} else {
query.limit(pageSizeFlag);
query.skip(skipCount);
List<Behavior> list = behaviorDao.findList(query, collectionNames[i]);
behaviorList.addAll(list);
}
skipCount = 0;
pageSizeFlag = size - behaviorList.size();
}
}
List<BehaviorVO> resList = behaviorList.stream().map(behavior -> {
Integer roleId = getRoleId(behavior.getUserId(), behavior.getProjectId());
return BehaviorVO.createFromBehavior(behavior, roleId);
}).collect(Collectors.toList());
MongoUtil.PageHelper<BehaviorVO> pageHelper = mongoUtil.pageHelperT(count, resList);
return PageVO.createPageVo(pageHelper, resList);
return PageVO.createPageVo(count, page, size, resList);
}
@Override
......
......@@ -64,13 +64,19 @@ public class EventDataServiceImpl implements EventDataService {
@Override
public void analysisEvent(Event event) {
long startTime = event.getStartTime();
long endTime = event.isEndStatus() ? event.getEndTime() : System.currentTimeMillis();
Period periodDay = new Period(startTime, endTime, PeriodType.days());
long now = System.currentTimeMillis();
// 时间异常
if(startTime >= now){
return;
}
long endTime = event.isEndStatus() ? event.getEndTime() : now;
//时间区间大于40天 就不更新
if (periodDay.getDays() >= 40 || startTime > System.currentTimeMillis()) {
if (new Period(startTime, endTime, PeriodType.days()).getDays() >= 40) {
if (!event.isEndStatus()) {
event.setEndTime(endTime);
event.setEndStatus(true);
eventDao.updateOne(event);
}
log.info("analysisEvent-eventId:{},时间区间大于40天,不做更新", event.getId());
return;
}
......
......@@ -138,6 +138,7 @@ public class EventServiceImpl implements EventService {
AtomicInteger count = new AtomicInteger();
//事件太多塞满队列了
ApplicationProjectListener.getThreadPool().execute(() -> {
List<String> eventIds = new ArrayList<>();
yqEventList.forEach(yqEventDTO -> {
Event existEvent = eventDao.getEventByUniqueIds(yqEventDTO.getYqEventId(), projectId, contendId);
Event event = Event.createFromYqEventDTO(yqEventDTO, collectionName, projectId, contendId);
......@@ -149,12 +150,14 @@ public class EventServiceImpl implements EventService {
event.setCTime(new Date().getTime());
eventDao.insertOne(event);
}
eventIds.add(event.getId());
if (count.incrementAndGet() == yqEventList.size()) {
stringRedisTemplate.opsForValue().set(redisKey, String.valueOf(100), 1, TimeUnit.MINUTES);
} else {
stringRedisTemplate.opsForValue().set(redisKey, String.valueOf((count.get()) * 100 / yqEventList.size()));
}
});
analysisEventsAsync(eventIds);
});
}
......@@ -456,7 +459,10 @@ public class EventServiceImpl implements EventService {
@Override
public void analysisEvents(List<String> eventIds) {
ApplicationProjectListener.getThreadPool().execute(() -> eventIds.forEach(eventId -> {
if (Tools.isEmpty(eventIds)) {
return;
}
eventIds.forEach(eventId -> {
String redisKey = null;
try {
Event event = getEventById(eventId);
......@@ -469,7 +475,12 @@ public class EventServiceImpl implements EventService {
log.error("eventId:{}更新失败", eventId, e);
redisUtil.set(redisKey, "-1");
}
}));
});
}
@Override
public void analysisEventsAsync(List<String> eventIds) {
ApplicationProjectListener.getThreadPool().execute(() -> analysisEvents(eventIds));
}
@Override
......@@ -666,6 +677,12 @@ public class EventServiceImpl implements EventService {
}).sorted((o1, o2) -> Long.compare((long) o2.get("total"), (long) o1.get("total"))).collect(Collectors.toList());
}
@Override
public List<Event> findNotEndEventByProjectId(String projectId) {
Query query = Query.query(Criteria.where("projectId").is(projectId).and("endStatus").is(false));
return eventDao.findList(query);
}
/**
* 获取时间筛选条件
*
......
......@@ -140,6 +140,7 @@ public class MarkFlowServiceImpl implements MarkFlowService {
if (null != brandkbsHitJson.get("channel_emotion")) {
sourceDetails.put("channelEmotion", ChannelEmotion.getNameFromState(brandkbsHitJson.getIntValue("channel_emotion")));
}
sourceDetails.put("channelId", brandkbsHitJson.getString("channel_id"));
double channelValue = tJson.getDoubleValue(GenericAttribute.ES_CHANNEL_INFLUENCE);
if (channelValue > 0) {
// 保留两位小数
......
......@@ -3,10 +3,7 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
......@@ -75,6 +72,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "commonServiceImpl")
CommonService commonService;
@Resource(name = "eventServiceImpl")
EventService eventService;
@Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor;
......@@ -277,6 +277,19 @@ public class TaskServiceImpl implements TaskService {
log.info("本次清理过期聚合:{}条", deleteCount);
}
@Override
public void eventUpdate() {
for (Project project : projectService.getAllProjectsWithStart()) {
List<Event> events = eventService.findNotEndEventByProjectId(project.getId());
log.info("{}项目,未结束事件{}个", project.getProjectName(), events.size());
if (Tools.isEmpty(events)) {
continue;
}
eventService.analysisEvents(events.stream().map(Event::getId).collect(Collectors.toList()));
log.info("{}项目,未结束事件已更新{}个", project.getProjectName(), events.size());
}
}
private boolean reportSendByProject(Project project) {
boolean flag = false;
// 扫描setting信息
......
......@@ -77,4 +77,17 @@ public class ControlCenter {
}
}
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 2 * * ?")
public void eventUpdate() {
log.info("定时按天更新事件任务-启动");
try {
taskService.eventUpdate();
} catch (Exception e) {
log.error("定时按天更新事件任务-出错", e);
} finally {
log.info("定时按天更新事件任务-结束");
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment