Commit f31477c4 by 303514581@qq.com

2019/1/28 backup优化启动时效率

parent cae0ba33
...@@ -35,17 +35,19 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -35,17 +35,19 @@ public class ES4RedisRunner implements ApplicationRunner {
// gs.getDataByeRedis("", "", 0, 0, 0); // gs.getDataByeRedis("", "", 0, 0, 0);
ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class); ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class);
esGetCommonId.getCommonId(); esGetCommonId.getCommonId();
esGetCommonId.getbackupStartId();
//启动时更新事件等待采集列表中的前十个任务状态为采集完毕 // 启动时更新事件等待采集列表中的前十个任务状态为采集完毕
boolean isSuccess=esGetCommonId.updateTopTenCollection(); boolean isSuccess = esGetCommonId.updateTopTenCollection();
log.info("前十个任务状态更新为采集完毕是否成功:{}",isSuccess); log.info("前十个任务状态更新为采集完毕是否成功:{}", isSuccess);
/** /**
* redis存入缓存 * redis存入缓存
*/ */
// 手动注入bean ES4RedisStart // 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class); ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class); // DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class);
EVENTAutoMarkStart eventStart = ApplicationContextProvider.getBean("EVENTAutoMarkStart", EVENTAutoMarkStart.class); EVENTAutoMarkStart eventStart = ApplicationContextProvider.getBean("EVENTAutoMarkStart",
EVENTAutoMarkStart.class);
// 定时器 // 定时器
Timer timer = new Timer(); Timer timer = new Timer();
...@@ -55,16 +57,14 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -55,16 +57,14 @@ public class ES4RedisRunner implements ApplicationRunner {
public void run() { public void run() {
try { try {
start.startThread(); start.startThread();
directstart.startThread(); // directstart.startThread();
eventStart.startThread(); // eventStart.startThread();
} catch (Exception e) { } catch (Exception e) {
log.error("主定时器异常{}{}",e.getMessage(),e.getStackTrace()); log.error("主定时器异常{}{}", e.getMessage(), e.getStackTrace());
} }
} }
}, 100L, 50 * 1000L); }, 100L, 50 * 1000L);
// /** // /**
// * ES库消息输出Excel并分析关键词重复数据 // * ES库消息输出Excel并分析关键词重复数据
// */ // */
......
...@@ -38,7 +38,7 @@ public class ES4RedisStart { ...@@ -38,7 +38,7 @@ public class ES4RedisStart {
private Map<String, Boolean> PROJECTMAP = new HashMap<>(); private Map<String, Boolean> PROJECTMAP = new HashMap<>();
private static final int PROJECTCOUNT = 6; private static final int PROJECTCOUNT = 10;
/** /**
* 启动线程 * 启动线程
...@@ -51,17 +51,17 @@ public class ES4RedisStart { ...@@ -51,17 +51,17 @@ public class ES4RedisStart {
// 本轮项目计数 // 本轮项目计数
int pcount = 0; int pcount = 0;
//new Map // new Map
if (Tools.isEmpty(PROJECTMAP.keySet())||isProjectMapFull()) { if (Tools.isEmpty(PROJECTMAP.keySet()) || isProjectMapFull()) {
projects.forEach(p -> { projects.forEach(p -> {
PROJECTMAP.put(p.getProjectName(), false); PROJECTMAP.put(p.getProjectName(), false);
}); });
PROJECTMAP.keySet().forEach(p -> { PROJECTMAP.keySet().forEach(p -> {
System.out.println(p+"\t"+PROJECTMAP.get(p)); System.out.println(p + "\t" + PROJECTMAP.get(p));
}); });
}else { } else {
PROJECTMAP.keySet().forEach(p -> { PROJECTMAP.keySet().forEach(p -> {
System.out.println(p+"\t"+PROJECTMAP.get(p)); System.out.println(p + "\t" + PROJECTMAP.get(p));
}); });
} }
...@@ -111,8 +111,10 @@ public class ES4RedisStart { ...@@ -111,8 +111,10 @@ public class ES4RedisStart {
es4RedisThread.start(); es4RedisThread.start();
} }
} }
/** /**
* 判断项目map是否一轮结束 * 判断项目map是否一轮结束
*
* @param projects * @param projects
* @Title: isProjectMapFull * @Title: isProjectMapFull
* @Description: 判断项目map是否一轮结束 * @Description: 判断项目map是否一轮结束
...@@ -133,9 +135,9 @@ public class ES4RedisStart { ...@@ -133,9 +135,9 @@ public class ES4RedisStart {
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
private boolean canProjectRead(Project project) { private boolean canProjectRead(Project project) {
if(PROJECTMAP.get(project.getProjectName())) { if (PROJECTMAP.get(project.getProjectName())) {
return false; return false;
}else { } else {
PROJECTMAP.put(project.getProjectName(), true); PROJECTMAP.put(project.getProjectName(), true);
return true; return true;
} }
......
...@@ -111,7 +111,10 @@ public class ES4RedisTask { ...@@ -111,7 +111,10 @@ public class ES4RedisTask {
String allRedisKey = platformName + "-" + project.getProjectName() + "-" + allkeytitle; String allRedisKey = platformName + "-" + project.getProjectName() + "-" + allkeytitle;
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点 // 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
long allstartrsid = rsidMap.get(allRedisKey) == null ? -1L : Long.valueOf(rsidMap.get(allRedisKey)); long allstartrsid = (rsidMap.get(allRedisKey) == null ? -1L
: Long.valueOf(rsidMap.get(allRedisKey))) >= ESGetCommonId.START_BACKUPID
? Long.valueOf(rsidMap.get(allRedisKey))
: ESGetCommonId.START_BACKUPID;
// 用于存储数据获取后新的rsid // 用于存储数据获取后新的rsid
Long allrsid = -1L; Long allrsid = -1L;
...@@ -121,10 +124,10 @@ public class ES4RedisTask { ...@@ -121,10 +124,10 @@ public class ES4RedisTask {
*/ */
List<String> allkeywords = new ArrayList<>(); List<String> allkeywords = new ArrayList<>();
for (KeywordNew kwn : keywordNews) { for (KeywordNew kwn : keywordNews) {
if (null==kwn.getPtList()) {// 滤过非选配词组 if (null == kwn.getPtList()) {// 滤过非选配词组
continue; continue;
} }
if (kwn.getPtList().isEmpty()||!kwn.getPtList().contains(platformName)) {// 滤过非选配词组 if (kwn.getPtList().isEmpty() || !kwn.getPtList().contains(platformName)) {// 滤过非选配词组
continue; continue;
} }
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组 if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组
...@@ -132,15 +135,15 @@ public class ES4RedisTask { ...@@ -132,15 +135,15 @@ public class ES4RedisTask {
} }
allkeywords.addAll(kwn.getKeyWords()); allkeywords.addAll(kwn.getKeyWords());
} }
//项目关键词为空 // 项目关键词为空
if(allkeywords.isEmpty()) { if (allkeywords.isEmpty()) {
continue; continue;
} }
// 根据不同平台获取数据(同一方法,统一封装为消息流实体 // 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getFilteredMessage(noiseRules, allkeywords, count, RsidAndMessages ram = disposeMessageService.getFilteredMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platform, project.getProjectName(),project.getMatchFields()); allstartrsid, -1L, platform, project.getProjectName(), project.getMatchFields());
// 获取查询到的信息 // 获取查询到的信息
List<JSONObject> messages = ram.getJlist(); List<JSONObject> messages = ram.getJlist();
...@@ -151,8 +154,8 @@ public class ES4RedisTask { ...@@ -151,8 +154,8 @@ public class ES4RedisTask {
continue; continue;
} }
//自动标注 // 自动标注
autoMarkService.autoMarkMessages(messages,project); autoMarkService.autoMarkMessages(messages, project);
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size()); // log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
...@@ -164,15 +167,14 @@ public class ES4RedisTask { ...@@ -164,15 +167,14 @@ public class ES4RedisTask {
// 向redis写入数据 // 向redis写入数据
redisService.setMessage2Redis(allRedisKey, messages, allkeywordcount); redisService.setMessage2Redis(allRedisKey, messages, allkeywordcount);
newRsidMap.put(allRedisKey, Integer.valueOf(allrsid.toString())); newRsidMap.put(allRedisKey, Integer.valueOf(allrsid.toString()));
// 遍历关键词组 // 遍历关键词组
for (KeywordNew kwn : keywordNews) { for (KeywordNew kwn : keywordNews) {
if (null==kwn.getPtList()) {// 滤过非选配词组 if (null == kwn.getPtList()) {// 滤过非选配词组
continue; continue;
} }
if (kwn.getPtList().isEmpty()||!kwn.getPtList().contains(platformName)) {// 滤过非选配词组 if (kwn.getPtList().isEmpty() || !kwn.getPtList().contains(platformName)) {// 滤过非选配词组
continue; continue;
} }
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组 if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组
...@@ -183,8 +185,12 @@ public class ES4RedisTask { ...@@ -183,8 +185,12 @@ public class ES4RedisTask {
String redisKey = platformName + "-" + project.getProjectName() + "-" + kwn.getKeyTitle(); String redisKey = platformName + "-" + project.getProjectName() + "-" + kwn.getKeyTitle();
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点 // 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = rsidMap.get(redisKey) == null ? -1L : Long.valueOf(rsidMap.get(redisKey)); Long startrsid = (rsidMap.get(redisKey) == null ? -1L
: Long.valueOf(rsidMap.get(redisKey))) >= ESGetCommonId.START_BACKUPID
? Long.valueOf(rsidMap.get(redisKey))
: ESGetCommonId.START_BACKUPID;
// System.err.println(redisKey+"\tstartrsid:"+startrsid);
// 用于存储数据获取后新的rsid // 用于存储数据获取后新的rsid
Long keyrsid = -1L; Long keyrsid = -1L;
...@@ -193,8 +199,8 @@ public class ES4RedisTask { ...@@ -193,8 +199,8 @@ public class ES4RedisTask {
// System.out.println("=============这是分界线============="); // System.out.println("=============这是分界线=============");
// 获取新的rsid和信息实体 // 获取新的rsid和信息实体
RsidAndMessages ramkey = disposeMessageService.getFilteredMessage(noiseRules, RsidAndMessages ramkey = disposeMessageService.getFilteredMessage(noiseRules, kwn.getKeyWords(),
kwn.getKeyWords(), count, startrsid, -1L, platform, project.getProjectName(),project.getMatchFields()); count, startrsid, -1L, platform, project.getProjectName(), project.getMatchFields());
// 获取查询到的信息 // 获取查询到的信息
List<JSONObject> messageskey = ramkey.getJlist(); List<JSONObject> messageskey = ramkey.getJlist();
...@@ -219,7 +225,7 @@ public class ES4RedisTask { ...@@ -219,7 +225,7 @@ public class ES4RedisTask {
} // 遍历关键词组 } // 遍历关键词组
// 预警 // 预警
earlyWarningService.earlyWarningNew(messages,trackRules, count, allstartrsid, allrsid, platform, earlyWarningService.earlyWarningNew(messages, trackRules, count, allstartrsid, allrsid, platform,
project.getProjectName()); project.getProjectName());
// 向redis库中存储新的rsid Map,覆盖原有数据 // 向redis库中存储新的rsid Map,覆盖原有数据
......
...@@ -39,7 +39,8 @@ public class ES4RedisThreadNew extends Thread { ...@@ -39,7 +39,8 @@ public class ES4RedisThreadNew extends Thread {
// private static List<String> ThreadList = new ArrayList<>(); // private static List<String> ThreadList = new ArrayList<>();
public ES4RedisThreadNew(String name, Project project, List<PlatformNew> allplatformNames, ES4RedisTask es4RedisTask) { public ES4RedisThreadNew(String name, Project project, List<PlatformNew> allplatformNames,
ES4RedisTask es4RedisTask) {
threadName = name; threadName = name;
this.project = project; this.project = project;
this.platformNames = allplatformNames; this.platformNames = allplatformNames;
...@@ -74,7 +75,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -74,7 +75,7 @@ public class ES4RedisThreadNew extends Thread {
// 超时则线程中止 // 超时则线程中止
if (t.isAlive()) { if (t.isAlive()) {
t.interrupt(); t.interrupt();
log.warn("{}项目超时线程状态:{}", project.getProjectName(),t.isInterrupted()); log.warn("{}项目超时线程状态:{}", project.getProjectName(), t.isInterrupted());
} }
} }
}, 49 * 1000L); }, 49 * 1000L);
...@@ -108,7 +109,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -108,7 +109,7 @@ public class ES4RedisThreadNew extends Thread {
// 程序执行出现异常则线程中止 // 程序执行出现异常则线程中止
// timer.cancel(); // timer.cancel();
Thread.currentThread().interrupted(); Thread.currentThread().interrupted();
log.error("{}项目出现异常,线程状态:{}", project.getProjectName(),Thread.currentThread().isInterrupted()); log.error("{}项目出现异常,线程状态:{}", project.getProjectName(), Thread.currentThread().isInterrupted());
} }
// else // else
// // 程序正常执行完毕,关闭超时控制器 // // 程序正常执行完毕,关闭超时控制器
......
...@@ -22,6 +22,8 @@ public class ESGetCommonId { ...@@ -22,6 +22,8 @@ public class ESGetCommonId {
public static long START_COMMONID; public static long START_COMMONID;
public static long START_BACKUPID;
public static long TIME; public static long TIME;
public void getCommonId() { public void getCommonId() {
...@@ -67,4 +69,22 @@ public class ESGetCommonId { ...@@ -67,4 +69,22 @@ public class ESGetCommonId {
return eventService.updateTopTenCollection(); return eventService.updateTopTenCollection();
} }
public void getbackupStartId() {
long nowtime = System.currentTimeMillis() - 20 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", nowtime);
cidList.add(media);
int video = esDao.getCommonidByTime("video", nowtime);
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", nowtime);
cidList.add(zhihu);
Collections.sort(cidList);
// System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" +
// cidList);
START_BACKUPID = cidList.get(0);
System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
}
} }
...@@ -797,10 +797,11 @@ public class ESDaoImpl implements ESDao { ...@@ -797,10 +797,11 @@ public class ESDaoImpl implements ESDao {
// 获取当天最早commonid // 获取当天最早commonid
int commonid = 0; int commonid = 0;
long now =System.currentTimeMillis();
long date = endtime; long date = endtime;
String weibotime = TimeUtil.formatDateToMinute(new Date(date)); // String weibotime = TimeUtil.formatDateToMinute(new Date(date));
String zhihutime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)) + "Z"; // String zhihutime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)) + "Z";
String vmtime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)); // String vmtime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L));
// weibo // weibo
if (tableName.equals("weibo")) { if (tableName.equals("weibo")) {
List<String> list = indexes.getLastIndexes("network", 2); List<String> list = indexes.getLastIndexes("network", 2);
...@@ -809,7 +810,8 @@ public class ESDaoImpl implements ESDao { ...@@ -809,7 +810,8 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(weibotime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
// searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(weibotime));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -822,7 +824,7 @@ public class ESDaoImpl implements ESDao { ...@@ -822,7 +824,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(vmtime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -835,7 +837,7 @@ public class ESDaoImpl implements ESDao { ...@@ -835,7 +837,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(vmtime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -846,7 +848,7 @@ public class ESDaoImpl implements ESDao { ...@@ -846,7 +848,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("insert_at").from(zhihutime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment