Commit 849ae8db by shenjunjie

新增事件导入更新及天级定时更新

parent 157c2cad
...@@ -124,7 +124,7 @@ public class EventController extends BaseController { ...@@ -124,7 +124,7 @@ public class EventController extends BaseController {
@ApiImplicitParam(name = "eventIds", value = "事件ID集合", paramType = "body", dataType = "list") @ApiImplicitParam(name = "eventIds", value = "事件ID集合", paramType = "body", dataType = "list")
@PostMapping("/analyze") @PostMapping("/analyze")
public ResponseResult analysisEvents(@RequestBody JSONObject info) { public ResponseResult analysisEvents(@RequestBody JSONObject info) {
eventService.analysisEvents(info.getJSONArray("eventIds").toJavaList(String.class)); eventService.analysisEventsAsync(info.getJSONArray("eventIds").toJavaList(String.class));
behaviorService.pushBehaviorOld(OPERATION, "批量更新事件", request); behaviorService.pushBehaviorOld(OPERATION, "批量更新事件", request);
return ResponseResult.success(); return ResponseResult.success();
} }
......
...@@ -190,6 +190,12 @@ public interface EventService { ...@@ -190,6 +190,12 @@ public interface EventService {
void analysisEvents(List<String> eventIds); void analysisEvents(List<String> eventIds);
/** /**
* 异步批量更新事件
* @param eventIds 事件ID集合
*/
void analysisEventsAsync(List<String> eventIds);
/**
* 搜索舆情全部事件标签 * 搜索舆情全部事件标签
* *
* @param linkedGroupId 关联项目组ID * @param linkedGroupId 关联项目组ID
...@@ -278,4 +284,11 @@ public interface EventService { ...@@ -278,4 +284,11 @@ public interface EventService {
*/ */
List<JSONObject> getLastEventTop(String keyword,int limit); List<JSONObject> getLastEventTop(String keyword,int limit);
/**
* 获取未结束的事件
* @param projectId
* @return
*/
List<Event> findNotEndEventByProjectId(String projectId);
} }
...@@ -33,4 +33,8 @@ public interface TaskService{ ...@@ -33,4 +33,8 @@ public interface TaskService{
*/ */
void cleanAggreeTask(); void cleanAggreeTask();
/**
* 事件相关更新
*/
void eventUpdate();
} }
...@@ -64,13 +64,19 @@ public class EventDataServiceImpl implements EventDataService { ...@@ -64,13 +64,19 @@ public class EventDataServiceImpl implements EventDataService {
@Override @Override
public void analysisEvent(Event event) { public void analysisEvent(Event event) {
long startTime = event.getStartTime(); long startTime = event.getStartTime();
long endTime = event.isEndStatus() ? event.getEndTime() : System.currentTimeMillis(); long now = System.currentTimeMillis();
Period periodDay = new Period(startTime, endTime, PeriodType.days()); // 时间异常
if(startTime >= now){
return;
}
long endTime = event.isEndStatus() ? event.getEndTime() : now;
//时间区间大于40天 就不更新 //时间区间大于40天 就不更新
if (periodDay.getDays() >= 40 || startTime > System.currentTimeMillis()) { if (new Period(startTime, endTime, PeriodType.days()).getDays() >= 40) {
event.setEndTime(endTime); if (!event.isEndStatus()) {
event.setEndStatus(true); event.setEndTime(endTime);
eventDao.updateOne(event); event.setEndStatus(true);
eventDao.updateOne(event);
}
log.info("analysisEvent-eventId:{},时间区间大于40天,不做更新", event.getId()); log.info("analysisEvent-eventId:{},时间区间大于40天,不做更新", event.getId());
return; return;
} }
......
...@@ -138,6 +138,7 @@ public class EventServiceImpl implements EventService { ...@@ -138,6 +138,7 @@ public class EventServiceImpl implements EventService {
AtomicInteger count = new AtomicInteger(); AtomicInteger count = new AtomicInteger();
//事件太多塞满队列了 //事件太多塞满队列了
ApplicationProjectListener.getThreadPool().execute(() -> { ApplicationProjectListener.getThreadPool().execute(() -> {
List<String> eventIds = new ArrayList<>();
yqEventList.forEach(yqEventDTO -> { yqEventList.forEach(yqEventDTO -> {
Event existEvent = eventDao.getEventByUniqueIds(yqEventDTO.getYqEventId(), projectId, contendId); Event existEvent = eventDao.getEventByUniqueIds(yqEventDTO.getYqEventId(), projectId, contendId);
Event event = Event.createFromYqEventDTO(yqEventDTO, collectionName, projectId, contendId); Event event = Event.createFromYqEventDTO(yqEventDTO, collectionName, projectId, contendId);
...@@ -149,12 +150,14 @@ public class EventServiceImpl implements EventService { ...@@ -149,12 +150,14 @@ public class EventServiceImpl implements EventService {
event.setCTime(new Date().getTime()); event.setCTime(new Date().getTime());
eventDao.insertOne(event); eventDao.insertOne(event);
} }
eventIds.add(event.getId());
if (count.incrementAndGet() == yqEventList.size()) { if (count.incrementAndGet() == yqEventList.size()) {
stringRedisTemplate.opsForValue().set(redisKey, String.valueOf(100), 1, TimeUnit.MINUTES); stringRedisTemplate.opsForValue().set(redisKey, String.valueOf(100), 1, TimeUnit.MINUTES);
} else { } else {
stringRedisTemplate.opsForValue().set(redisKey, String.valueOf((count.get()) * 100 / yqEventList.size())); stringRedisTemplate.opsForValue().set(redisKey, String.valueOf((count.get()) * 100 / yqEventList.size()));
} }
}); });
analysisEventsAsync(eventIds);
}); });
} }
...@@ -456,7 +459,10 @@ public class EventServiceImpl implements EventService { ...@@ -456,7 +459,10 @@ public class EventServiceImpl implements EventService {
@Override @Override
public void analysisEvents(List<String> eventIds) { public void analysisEvents(List<String> eventIds) {
ApplicationProjectListener.getThreadPool().execute(() -> eventIds.forEach(eventId -> { if (Tools.isEmpty(eventIds)) {
return;
}
eventIds.forEach(eventId -> {
String redisKey = null; String redisKey = null;
try { try {
Event event = getEventById(eventId); Event event = getEventById(eventId);
...@@ -469,7 +475,12 @@ public class EventServiceImpl implements EventService { ...@@ -469,7 +475,12 @@ public class EventServiceImpl implements EventService {
log.error("eventId:{}更新失败", eventId, e); log.error("eventId:{}更新失败", eventId, e);
redisUtil.set(redisKey, "-1"); redisUtil.set(redisKey, "-1");
} }
})); });
}
@Override
public void analysisEventsAsync(List<String> eventIds) {
ApplicationProjectListener.getThreadPool().execute(() -> analysisEvents(eventIds));
} }
@Override @Override
...@@ -666,6 +677,12 @@ public class EventServiceImpl implements EventService { ...@@ -666,6 +677,12 @@ public class EventServiceImpl implements EventService {
}).sorted((o1, o2) -> Long.compare((long) o2.get("total"), (long) o1.get("total"))).collect(Collectors.toList()); }).sorted((o1, o2) -> Long.compare((long) o2.get("total"), (long) o1.get("total"))).collect(Collectors.toList());
} }
@Override
public List<Event> findNotEndEventByProjectId(String projectId) {
Query query = Query.query(Criteria.where("projectId").is(projectId).and("endStatus").is(false));
return eventDao.findList(query);
}
/** /**
* 获取时间筛选条件 * 获取时间筛选条件
* *
......
...@@ -3,10 +3,7 @@ package com.zhiwei.brandkbs2.service.impl; ...@@ -3,10 +3,7 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo; import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.AggreeResultDao; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao; import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao; import com.zhiwei.brandkbs2.es.EsClientDao;
...@@ -75,6 +72,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -75,6 +72,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "commonServiceImpl") @Resource(name = "commonServiceImpl")
CommonService commonService; CommonService commonService;
@Resource(name = "eventServiceImpl")
EventService eventService;
@Resource(name = "taskServiceExecutor") @Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor; ThreadPoolTaskExecutor taskServiceExecutor;
...@@ -277,6 +277,19 @@ public class TaskServiceImpl implements TaskService { ...@@ -277,6 +277,19 @@ public class TaskServiceImpl implements TaskService {
log.info("本次清理过期聚合:{}条", deleteCount); log.info("本次清理过期聚合:{}条", deleteCount);
} }
@Override
public void eventUpdate() {
for (Project project : projectService.getAllProjectsWithStart()) {
List<Event> events = eventService.findNotEndEventByProjectId(project.getId());
log.info("{}项目,未结束事件{}个", project.getProjectName(), events.size());
if (Tools.isEmpty(events)) {
continue;
}
eventService.analysisEvents(events.stream().map(Event::getId).collect(Collectors.toList()));
log.info("{}项目,未结束事件已更新{}个", project.getProjectName(), events.size());
}
}
private boolean reportSendByProject(Project project) { private boolean reportSendByProject(Project project) {
boolean flag = false; boolean flag = false;
// 扫描setting信息 // 扫描setting信息
......
...@@ -77,4 +77,17 @@ public class ControlCenter { ...@@ -77,4 +77,17 @@ public class ControlCenter {
} }
} }
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 2 * * ?")
public void eventUpdate() {
log.info("定时按天更新事件任务-启动");
try {
taskService.eventUpdate();
} catch (Exception e) {
log.error("定时按天更新事件任务-出错", e);
} finally {
log.info("定时按天更新事件任务-结束");
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment