Commit 0044078e by shentao

2018/10/20 预防消息流重启导致事件采集中断,加入自动恢复机制

parent 520c6fd1
...@@ -37,7 +37,8 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -37,7 +37,8 @@ public class ES4RedisRunner implements ApplicationRunner {
esGetCommonId.getCommonId(); esGetCommonId.getCommonId();
//启动时更新事件等待采集列表中的前十个任务状态为采集完毕 //启动时更新事件等待采集列表中的前十个任务状态为采集完毕
boolean isSuccess=esGetCommonId.updateTopTenCollection();
log.info("前十个任务状态更新为采集完毕是否成功:{}",isSuccess);
/** /**
* redis存入缓存 * redis存入缓存
*/ */
......
...@@ -8,12 +8,16 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -8,12 +8,16 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.dao.ESDao; import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.service.EventService;
@Component @Component
public class ESGetCommonId { public class ESGetCommonId {
@Autowired @Autowired
private ESDao esDao; private ESDao esDao;
@Autowired
private EventService eventService;
public static int START_COMMONID; public static int START_COMMONID;
...@@ -56,4 +60,8 @@ public class ESGetCommonId { ...@@ -56,4 +60,8 @@ public class ESGetCommonId {
return commonid; return commonid;
} }
public boolean updateTopTenCollection() {
return eventService.updateTopTenCollection();
}
} }
...@@ -64,5 +64,13 @@ public interface EventService { ...@@ -64,5 +64,13 @@ public interface EventService {
* @throws JsonParseException * @throws JsonParseException
*/ */
boolean autoMark(Event eventCollection) throws JsonParseException, JsonMappingException, IOException; boolean autoMark(Event eventCollection) throws JsonParseException, JsonMappingException, IOException;
/**
* 更新事件等待采集列表中的前十个任务状态为采集完毕
* @Title: updateTopTenCollection
* @Description: 更新事件等待采集列表中的前十个任务状态为采集完毕
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean updateTopTenCollection();
} }
...@@ -171,4 +171,19 @@ public class EventServiceImpl implements EventService { ...@@ -171,4 +171,19 @@ public class EventServiceImpl implements EventService {
return true; return true;
} }
@Override
public boolean updateTopTenCollection() {
List<String> idList = redisService.getNeedAutoMark();
List<Event> events = null!=findEventsbyIds(idList)?findEventsbyIds(idList):new ArrayList<>();
for (int i = 0; i < events.size(); i++) {
Event eventCollection = events.get(i);
if(!eventCollection.getStatus().equals("采集完毕")) {
Update over = new Update();
over.update("status", "采集完毕");
primaryMongoTemplate.upsert(new Query(Criteria.where("_id").is(eventCollection.getId())), over, Event.class);
}
}
return true;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment