Commit ed96a533 by shentao

2018/11/19 稳定版:追踪规则+消息流灵活上限1.0.0版

parent 22a3d9e4
...@@ -30,11 +30,15 @@ public class ES4RedisStart { ...@@ -30,11 +30,15 @@ public class ES4RedisStart {
@Autowired @Autowired
private ES4RedisTask es4RedisTask; private ES4RedisTask es4RedisTask;
@Autowired
private ESGetCommonId esGetCommonId;
/** /**
* 启动线程 * 启动线程
*/ */
public void startThread() throws JsonParseException, JsonMappingException, IOException, InterruptedException { public void startThread() throws JsonParseException, JsonMappingException, IOException, InterruptedException {
//更新commonid
esGetCommonId.getCommonId();
// 项目列表 // 项目列表
List<Project> projects = projectDao.getAllProjects(); List<Project> projects = projectDao.getAllProjects();
...@@ -44,7 +48,7 @@ public class ES4RedisStart { ...@@ -44,7 +48,7 @@ public class ES4RedisStart {
// 遍历项目 // 遍历项目
for (Project project : projects) { for (Project project : projects) {
// if(!project.getProjectName().equals("今日头条")) { // if(!project.getProjectName().equals("测试")) {
// continue; // continue;
// } // }
......
...@@ -97,29 +97,6 @@ public class ES4RedisTask { ...@@ -97,29 +97,6 @@ public class ES4RedisTask {
// 项目预警规则列表 // 项目预警规则列表
List<TrackRule> trackRules = trackRuleDao.getTrackRuleByProject(project.getProjectName()); List<TrackRule> trackRules = trackRuleDao.getTrackRuleByProject(project.getProjectName());
/*
* //测试预警规则 List<TrackRule> trackRules=new ArrayList<>(); TrackRule tr1=new
* TrackRule(); TrackRule tr2=new TrackRule(); TrackRule tr3=new TrackRule();
* Date date = new Date(); Long createAt=date.getTime();
* tr1.setCreateAt(createAt); tr1.setAndOr(""); tr1.setWarn(false);
* tr1.setRuleType("keyWords"); tr1.setSubmitter("虞诚毅");
* tr1.setKeyWordsInputOne("知乎"); tr1.setRuleName("测试规则1");
* tr1.setRuleExplain("测试说明1"); tr1.setEarlyWarningTime("三天");
* tr1.setProject("腾讯"); tr1.setEarlyWarning("email");
* tr2.setCreateAt(createAt); tr2.setAndOr("或"); tr2.setWarn(false);
* tr2.setRuleType("keyWords"); tr2.setSubmitter("虞诚毅");
* tr2.setKeyWordsInputOne("知乎"); tr2.setKeyWordsInputTwo("微博");
* tr2.setRuleName("测试规则2"); tr2.setRuleExplain("测试说明3");
* tr2.setEarlyWarningTime("三天"); tr2.setProject("腾讯");
* tr2.setEarlyWarning("email"); tr3.setCreateAt(createAt); tr3.setAndOr("且");
* tr3.setWarn(false); tr3.setRuleType("keyWords"); tr3.setSubmitter("虞诚毅");
* tr3.setKeyWordsInputOne("知乎"); tr3.setKeyWordsInputTwo("问题");
* tr3.setRuleName("测试规则2"); tr3.setRuleExplain("测试说明3");
* tr3.setEarlyWarningTime("三天"); tr3.setProject("腾讯");
* tr3.setEarlyWarning("wechat"); trackRules.add(tr1); trackRules.add(tr2);
* trackRules.add(tr3);
*/
// 遍历平台 // 遍历平台
for (PlatformNew platform : platformNames) { for (PlatformNew platform : platformNames) {
...@@ -242,7 +219,7 @@ public class ES4RedisTask { ...@@ -242,7 +219,7 @@ public class ES4RedisTask {
} // 遍历关键词组 } // 遍历关键词组
// 预警 // 预警
earlyWarningService.earlyWarningNew(trackRules, count, allstartrsid, allrsid, platform, earlyWarningService.earlyWarningNew(messages,trackRules, count, allstartrsid, allrsid, platform,
project.getProjectName()); project.getProjectName());
// 向redis库中存储新的rsid Map,覆盖原有数据 // 向redis库中存储新的rsid Map,覆盖原有数据
......
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -19,26 +20,27 @@ public class ESGetCommonId { ...@@ -19,26 +20,27 @@ public class ESGetCommonId {
@Autowired @Autowired
private EventService eventService; private EventService eventService;
public static int START_COMMONID; public static long START_COMMONID;
public static long TIME; public static long TIME;
public void getCommonId() { public void getCommonId() {
long nowtime = System.currentTimeMillis(); long nowtime = System.currentTimeMillis();
if (TIME < nowtime - 60 * 1000L) { if (TIME < nowtime - 60 * 60 * 1000L) {
List<Integer> cidList = new ArrayList<>(); List<Integer> cidList = new ArrayList<>();
int weibo = esDao.get24hoursFirstCommonid("weibo"); int weibo = esDao.get48hoursFirstCommonid("weibo");
cidList.add(weibo); cidList.add(weibo);
int media = esDao.get24hoursFirstCommonid("media"); int media = esDao.get48hoursFirstCommonid("media");
cidList.add(media); cidList.add(media);
int video = esDao.get24hoursFirstCommonid("video"); int video = esDao.get48hoursFirstCommonid("video");
cidList.add(video); cidList.add(video);
int zhihu = esDao.get24hoursFirstCommonid("zhihu"); int zhihu = esDao.get48hoursFirstCommonid("zhihu");
cidList.add(zhihu); cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList); System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
START_COMMONID = cidList.get(0); START_COMMONID = cidList.get(0);
TIME = nowtime; TIME = nowtime;
System.err.println("START_COMMONID:" + START_COMMONID + "\tNOW:" + new Date(TIME));
} }
} }
...@@ -55,7 +57,8 @@ public class ESGetCommonId { ...@@ -55,7 +57,8 @@ public class ESGetCommonId {
int zhihu = esDao.getCommonidByTime("zhihu", endtime); int zhihu = esDao.getCommonidByTime("zhihu", endtime);
cidList.add(zhihu); cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
// System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList); // System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" +
// cidList);
int commonid = cidList.get(0); int commonid = cidList.get(0);
return commonid; return commonid;
} }
......
...@@ -31,6 +31,8 @@ public class RedisConfig { ...@@ -31,6 +31,8 @@ public class RedisConfig {
public static final String EVENTLISTKEY = "Event:EventList"; public static final String EVENTLISTKEY = "Event:EventList";
/** 事件采集去重url集 **/ /** 事件采集去重url集 **/
public static final String EVENTHASHKEY = "Event:Hash:"; public static final String EVENTHASHKEY = "Event:Hash:";
/** 追踪规则已追踪集 **/
public static final String TRACKKEY = "TrackRule:";
private int keyMaxSize; private int keyMaxSize;
......
...@@ -101,7 +101,7 @@ public interface ESDao { ...@@ -101,7 +101,7 @@ public interface ESDao {
public SearchHits getDataFromEs(List<String> keywords, int count, long startid, long endid, PlatformNew platform, public SearchHits getDataFromEs(List<String> keywords, int count, long startid, long endid, PlatformNew platform,
String project, String matchFields); String project, String matchFields);
public int get24hoursFirstCommonid(String tableName); public int get48hoursFirstCommonid(String tableName);
public int getCommonidByTime(String tableName, long endtime); public int getCommonidByTime(String tableName, long endtime);
/** /**
......
...@@ -725,14 +725,17 @@ public class ESDaoImpl implements ESDao { ...@@ -725,14 +725,17 @@ public class ESDaoImpl implements ESDao {
} }
@Override @Override
public int get24hoursFirstCommonid(String tableName) { public int get48hoursFirstCommonid(String tableName) {
// 获取当天最早commonid // 获取当天最早commonid
int commonid = 0; int commonid = 0;
long now =System.currentTimeMillis();
long date = System.currentTimeMillis() - 1 * 24 * 60 * 60 * 1000L; long date = now - 1 * 48 * 60 * 60 * 1000L;
String weibotime = TimeUtil.formatDateToMinute(new Date(date)); // String wEtime = TimeUtil.formatDateToMinute(new Date(now));
String zhihutime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)) + "Z"; // String weibotime = TimeUtil.formatDateToMinute(new Date(date));
String vmtime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)); // String zEtime = TimeUtil.formatEsDate(new Date(now - 8 * 3600 * 1000L)) + "Z";
// String zhihutime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)) + "Z";
// String vmEtime = TimeUtil.formatEsDate(new Date(now - 8 * 3600 * 1000L));
// String vmtime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L));
// weibo // weibo
if (tableName.equals("weibo")) { if (tableName.equals("weibo")) {
List<String> list = indexes.getLastIndexes("network", 2); List<String> list = indexes.getLastIndexes("network", 2);
...@@ -741,7 +744,7 @@ public class ESDaoImpl implements ESDao { ...@@ -741,7 +744,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(weibotime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -754,7 +757,7 @@ public class ESDaoImpl implements ESDao { ...@@ -754,7 +757,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(vmtime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -767,7 +770,7 @@ public class ESDaoImpl implements ESDao { ...@@ -767,7 +770,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("time").from(vmtime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
...@@ -778,7 +781,7 @@ public class ESDaoImpl implements ESDao { ...@@ -778,7 +781,7 @@ public class ESDaoImpl implements ESDao {
.setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null) .setQuery(QueryBuilders.matchAllQuery()).setFetchSource(new String[] { "commonid" }, null)
.addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1); .addSort("commonid", SortOrder.ASC).setFrom(0).setSize(1);
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("insert_at").from(zhihutime)); searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("savetime").from(date).to(now));
SearchResponse response = searchRequestBuilder.execute().actionGet(); SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
......
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.es.service; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.es.service;
import java.util.List; import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule; import com.zhiwei.messageflow.mongo.bean.TrackRule;
...@@ -9,6 +10,7 @@ public interface EarlyWarningService { ...@@ -9,6 +10,7 @@ public interface EarlyWarningService {
/** /**
* 预警 * 预警
* @param messages
* *
* @param trackRules * @param trackRules
* 预警规则 * 预警规则
...@@ -23,9 +25,6 @@ public interface EarlyWarningService { ...@@ -23,9 +25,6 @@ public interface EarlyWarningService {
* @param projectName * @param projectName
* 项目名 * 项目名
*/ */
void earlyWarning(List<TrackRule> trackRules, int count, long startid, long endid, String platformName, void earlyWarningNew(List<JSONObject> messages, List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform,
String projectName);
void earlyWarningNew(List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform,
String projectName); String projectName);
} }
package com.zhiwei.messageflow.es.service; package com.zhiwei.messageflow.es.service;
import org.elasticsearch.search.SearchHits; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule; import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface TrackHitAndWarnService { public interface TrackHitAndWarnService {
/** /**
* 关键词预警数据获取 * 判断该条追踪规则是否处于预警状态
* * @Title: isWarnTrackrule
* @param truckRule * @Description: 判断该条追踪规则是否处于预警状态
* @param count * @param @param tr
* @param startid * @param @return 设定文件
* @param endid * @return boolean 返回类型
* @param platformName
* @return
*/
SearchHits keyWordsTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName);
/**
* 渠道预警数据获取
*
* @param truckRule
* @param count
* @param startid
* @param endid
* @param platformName
* @return
*/ */
SearchHits channelTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName); boolean isWarnTrackrule(TrackRule trackRule);
/** /**
* 相似新闻数预警数据获取 * 使用单条追踪规则
* * @Title: useTrackrule
* @param truckRule * @Description: 使用单条追踪规则
* @param count * @param @param trackrule
* @param startid * @param @return 设定文件
* @param endid * @return boolean 返回类型
* @param platformName
* @return
*/ */
SearchHits articleTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName); boolean endTrackrule(TrackRule trackrule);
/** /**
* 邮件预警 * 是否命中预警规则
*
* @param trackHits
* @param platformName * @param platformName
* @param trackRule * @Title: ishitWarnMsg
* @Description:是否命中预警规则
* @param @param msg
* @param @param tr 设定文件
* @return void 返回类型
*/ */
void WarnEmail(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName); boolean ishitWarnMsg(JSONObject msg, String platformName, TrackRule tr);
/** /**
* 微信预警 * 按不同规则预警
* * @Title: warnMsg
* @param trackHits * @Description: 按不同规则预警
* @param platformName * @param @param msg
* @param trackRule * @param @param tr 设定文件
* @return void 返回类型
*/ */
void WarnWechat(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName); void warnMsg(JSONObject msg, TrackRule tr);
SearchHits keyWordsTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
SearchHits channelTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
SearchHits articleTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
void WarnWechatNew(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
void WarnEmailNew(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
} }
...@@ -2,176 +2,37 @@ package com.zhiwei.messageflow.es.service.impl; ...@@ -2,176 +2,37 @@ package com.zhiwei.messageflow.es.service.impl;
import java.util.List; import java.util.List;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.es.service.EarlyWarningService; import com.zhiwei.messageflow.es.service.EarlyWarningService;
import com.zhiwei.messageflow.es.service.TrackHitAndWarnService; import com.zhiwei.messageflow.es.service.TrackHitAndWarnService;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule; import com.zhiwei.messageflow.mongo.bean.TrackRule;
import com.zhiwei.messageflow.mongo.dao.TrackRuleDao;
@Component @Component
public class EarlyWarningServiceImpl implements EarlyWarningService { public class EarlyWarningServiceImpl implements EarlyWarningService {
@Autowired @Autowired
private TrackRuleDao trackRuleDao;
@Autowired
private TrackHitAndWarnService trackHitAndWarnService; private TrackHitAndWarnService trackHitAndWarnService;
@Override @Override
public void earlyWarning(List<TrackRule> trackRules, int count, long startid, long endid, String platformName, public void earlyWarningNew(List<JSONObject> messages, List<TrackRule> trackRules, int count, long startid,
String projectName) { Long endid, PlatformNew platform, String projectName) {
for (TrackRule trackRule : trackRules) {
// 判断是否预警
if (trackRule.isWarn()) {
continue;
}
/**
* 判断是否过期
*/
long createAt = trackRule.getCreateAt();
long addDay = 0;
// if (trackRule.getEarlyWarningTime().equals("一天")) {
// addDay = 24 * 60 * 60 * 1000L;
// } else if (trackRule.getEarlyWarningTime().equals("二天")) {
// addDay = 2 * 24 * 60 * 60 * 1000L;
// } else if (trackRule.getEarlyWarningTime().equals("三天")) {
// addDay = 3 * 24 * 60 * 60 * 1000L;
// }
long confirm = System.currentTimeMillis()+1L;//createAt + addDay;
if (confirm <= System.currentTimeMillis()) {
// 过期
trackRule.setWarn(true);
trackRuleDao.updateTrackrule(trackRule.get_id());
} else {
// 判断规则
// 首先判断预警方式
if (trackRule.getEarlyWarning().equals("no")) {
} else if (trackRule.getEarlyWarning().equals("wechat")) {
// 微信预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHit(trackRule, count, startid, endid,
platformName);
}
trackHitAndWarnService.WarnWechat(TrackHit, platformName, trackRule, projectName);
} else if (trackRule.getEarlyWarning().equals("email")) {
// 邮箱预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHit(trackRule, count, startid, endid,
platformName);
}
trackHitAndWarnService.WarnEmail(TrackHit, platformName, trackRule, projectName);
}
}
}
}
@Override
public void earlyWarningNew(List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform,
String projectName) {
String platformName = platform.getPlatformName(); String platformName = platform.getPlatformName();
for (TrackRule trackRule : trackRules) { for (int i = 0; i < messages.size(); i++) {
JSONObject msg = messages.get(i);
for (TrackRule tr : trackRules) {
// 判断是否预警 // 判断是否预警
if (trackRule.isWarn()) { if (!trackHitAndWarnService.isWarnTrackrule(tr))
continue; continue;
} // 是否命中预警规则
boolean ishit = trackHitAndWarnService.ishitWarnMsg(msg, platformName, tr);
/** // 按不同预警规则预警
* 判断是否过期 if (ishit)
*/ trackHitAndWarnService.warnMsg(msg, tr);
long createAt = trackRule.getCreateAt();
long addDay = 0;
// if (trackRule.getEarlyWarningTime().equals("一天")) {
// addDay = 24 * 60 * 60 * 1000L;
// } else if (trackRule.getEarlyWarningTime().equals("二天")) {
// addDay = 2 * 24 * 60 * 60 * 1000L;
// } else if (trackRule.getEarlyWarningTime().equals("三天")) {
// addDay = 3 * 24 * 60 * 60 * 1000L;
// } else if (trackRule.getEarlyWarningTime().equals("一月")) {
// addDay = 30 * 24 * 60 * 60 * 1000L;
// }
long confirm = System.currentTimeMillis()+1L;//createAt + addDay;
if (confirm <= System.currentTimeMillis()) {
// 过期
trackRule.setWarn(true);
trackRuleDao.updateTrackrule(trackRule.get_id());
} else {
// 判断规则
// 首先判断预警方式
if (trackRule.getEarlyWarning().equals("no")) {
} else if (trackRule.getEarlyWarning().equals("wechat")) {
// 微信预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHitNew(trackRule, count, startid, endid,
platform);
}
trackHitAndWarnService.WarnWechatNew(TrackHit,platformName, trackRule, projectName);
} else if (trackRule.getEarlyWarning().equals("email")) {
// 邮箱预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHitNew(trackRule, count, startid, endid,
platform);
}
trackHitAndWarnService.WarnEmailNew(TrackHit,platformName, trackRule, projectName);
}
} }
} }
......
...@@ -420,7 +420,6 @@ public class HighLightFillingServiceImpl implements HighLightFillingService { ...@@ -420,7 +420,6 @@ public class HighLightFillingServiceImpl implements HighLightFillingService {
message = mapper.writeValueAsString(sourceHitMap); message = mapper.writeValueAsString(sourceHitMap);
res = JSONObject.parseObject(message); res = JSONObject.parseObject(message);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
String type = searchHit.getType(); String type = searchHit.getType();
......
...@@ -3,13 +3,9 @@ package com.zhiwei.messageflow.es.service.impl; ...@@ -3,13 +3,9 @@ package com.zhiwei.messageflow.es.service.impl;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
...@@ -18,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -18,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.DirectES4RedisThread;
import com.zhiwei.messageflow.bean.MediaMessage; import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
...@@ -28,7 +23,6 @@ import com.zhiwei.messageflow.es.service.NoiseProcessingService; ...@@ -28,7 +23,6 @@ import com.zhiwei.messageflow.es.service.NoiseProcessingService;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.dao.NoiseRuleDao; import com.zhiwei.messageflow.mongo.dao.NoiseRuleDao;
import com.zhiwei.messageflow.util.ESQueryUtil;
@Component @Component
public class NoiseProcessingServiceImpl implements NoiseProcessingService { public class NoiseProcessingServiceImpl implements NoiseProcessingService {
...@@ -651,7 +645,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -651,7 +645,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
try { try {
date = dateFormat.parse(time); date = dateFormat.parse(time);
if ((date.getTime() + (8 * 3600 * 1000L)) < zero) { if ((date.getTime() + (8 * 3600 * 1000L)) < zero) {
log.info(date.getTime() + (8 * 3600 * 1000L)+"|"+zero); // log.info(date.getTime() + (8 * 3600 * 1000L)+"|"+zero);
return true; return true;
} }
} catch (ParseException e) { } catch (ParseException e) {
...@@ -681,7 +675,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -681,7 +675,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
log.info("weibo:time:" + time +"\t"+ map.get("id").toString()); // log.info("weibo:time:" + time +"\t"+ map.get("id").toString());
return true; return true;
} }
String text = map.get("text") != null ? map.get("text").toString() : null; String text = map.get("text") != null ? map.get("text").toString() : null;
...@@ -786,7 +780,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -786,7 +780,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("created_at") != null ? map.get("created_at").toString() : null; String time = map.get("created_at") != null ? map.get("created_at").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
log.info("zhihu:time:" + time +"\t"+ map.get("id").toString()); // log.info("zhihu:time:" + time +"\t"+ map.get("id").toString());
return true; return true;
} }
String text = map.get("question_title") != null ? map.get("question_title").toString() : null; String text = map.get("question_title") != null ? map.get("question_title").toString() : null;
...@@ -917,7 +911,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -917,7 +911,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
log.info("video:time:" + time +"\t"+ map.get("id").toString()); // log.info("video:time:" + time +"\t"+ map.get("id").toString());
return true; return true;
} }
String text = map.get("title") != null ? map.get("title").toString() : null; String text = map.get("title") != null ? map.get("title").toString() : null;
...@@ -1047,7 +1041,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -1047,7 +1041,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
log.info("media:time:" + time +"\t"+ map.get("id").toString()); // log.info("media:time:" + time +"\t"+ map.get("id").toString());
return true; return true;
} }
String text = map.get("title") != null ? map.get("title").toString() : null; String text = map.get("title") != null ? map.get("title").toString() : null;
......
package com.zhiwei.messageflow.mongo.bean; package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import java.util.Map;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Document;
...@@ -10,25 +13,74 @@ import lombok.ToString; ...@@ -10,25 +13,74 @@ import lombok.ToString;
@ToString @ToString
@Document(collection = "qbjc_trackrule") @Document(collection = "qbjc_trackrule")
public class TrackRule { public class TrackRule {
/**
* id用户id
*/
@Id @Id
private long _id; private String _id;
/**
* ruleName 规则名
*/
private String ruleName; private String ruleName;
private String ruleExplain; /**
* ruleType 规则类型
*/
private String ruleType; private String ruleType;
private String KeyWordsInputOne; /**
private String andOr; * keyword 关键词:xxx xx,xx xxx
private String KeyWordsInputTwo; */
private String earlyWarning; private String keyword;
/**
* channel List<Map> 规则内容 渠道:平台 渠道名 主键
*/
private List<Map<String, Object>> channel;
/**
* warnRule 预警规则 (normal wechat)
*/
private String warnRule;
/**
* startTime 预警起始时间
*/
private Long startTime;
/**
* endTime long 预警结束时间
*/
private Long endTime;
/**
* highlighted String 高亮颜色
*/
private String highlighted; private String highlighted;
private String earlyWarningTime; /**
private boolean isWarn; * createAt long 创建时间
private long createAt; */
private Long createAt;
/**
* updateAt long 更新时间
*/
private Long updateAt;
/**
* submitter String 提交人
*/
private String submitter; private String submitter;
/**
* isUsed boolean 是否使用
*/
private Boolean isUsed;
/**
* project String 所属项目
*/
private String project; private String project;
private String channelPt; /**
private String channelQd; * warnLimit int 预警上限
private String articleTitle; */
private String articleNum; private Integer warnLimit;
private String timeRange; /**
* WarnNum int 本次预警条数
*/
private Integer warnNum;
/**
* hasTrackNum int 已追踪总条数
*/
private Integer hasTrack;
} }
...@@ -10,7 +10,7 @@ import lombok.ToString; ...@@ -10,7 +10,7 @@ import lombok.ToString;
@Data @Data
@ToString @ToString
@Document(collection = "wechatCode") @Document(collection = "wechatCodeNew")
public class WechatCode { public class WechatCode {
@Id @Id
private String _id; private String _id;
......
...@@ -2,6 +2,9 @@ package com.zhiwei.messageflow.mongo.dao; ...@@ -2,6 +2,9 @@ package com.zhiwei.messageflow.mongo.dao;
import java.util.List; import java.util.List;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import com.zhiwei.messageflow.mongo.bean.TrackRule; import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface TrackRuleDao { public interface TrackRuleDao {
...@@ -15,9 +18,22 @@ public interface TrackRuleDao { ...@@ -15,9 +18,22 @@ public interface TrackRuleDao {
List<TrackRule> getTrackRuleByProject(String projectName); List<TrackRule> getTrackRuleByProject(String projectName);
/** /**
* 更新项目预警情况 * 更新单条追踪规则按条件和更新条件
* * @Title: updateTrackrule
* @param trackRule_id * @Description: 更新单条追踪规则
* @param @param query
* @param @param update
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean updateTrackrule(Query query, Update update);
/**
* 按id去追踪规则
* @Title: getTrackRuleById
* @Description: 按id去追踪规则
* @param @param get_id
* @param @return 设定文件
* @return TrackRule 返回类型
*/ */
void updateTrackrule(Long trackRule_id); TrackRule getTrackRuleById(String id);
} }
...@@ -22,18 +22,25 @@ public class TrackRuleDaoImpl implements TrackRuleDao { ...@@ -22,18 +22,25 @@ public class TrackRuleDaoImpl implements TrackRuleDao {
@Override @Override
public List<TrackRule> getTrackRuleByProject(String projectName) { public List<TrackRule> getTrackRuleByProject(String projectName) {
Query query = new Query(); Query query = new Query(Criteria.where("project").is(projectName));
query.addCriteria(Criteria.where("project").is(projectName)); return primaryMongoTemplate.find(query, TrackRule.class);
primaryMongoTemplate.getCollectionName(TrackRule.class);
List<TrackRule> trackRules = primaryMongoTemplate.find(query, TrackRule.class);
return trackRules;
} }
@Override @Override
public void updateTrackrule(Long trackRule_id) { public boolean updateTrackrule(Query query, Update update) {
// System.out.println("预警状态更新"); TrackRule t = primaryMongoTemplate.findOne(query, TrackRule.class);
primaryMongoTemplate.updateFirst(Query.query(Criteria.where("_id").is(trackRule_id)), if (t != null) {
new Update().set("isWarn", true), TrackRule.class); primaryMongoTemplate.updateFirst(query, update, TrackRule.class);
} else {
return false;
}
return true;
}
@Override
public TrackRule getTrackRuleById(String id) {
Query query = new Query(Criteria.where("_id").is(id));
return primaryMongoTemplate.findOne(query, TrackRule.class);
} }
} }
...@@ -398,4 +398,31 @@ public class RedisPoolAndTools { ...@@ -398,4 +398,31 @@ public class RedisPoolAndTools {
return res; return res;
} }
public Set<String> zrevrange(String redisKey, int start, int end) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
Set<String> res = jedis.zrevrange(redisKey, start, end);
returnResource(jedis);
return res;
}
public void zremrangebyscore(String key, long start, long end) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.zremrangeByScore(key, start, end);
returnResource(jedis);
}
} }
\ No newline at end of file
...@@ -189,5 +189,14 @@ public interface RedisService { ...@@ -189,5 +189,14 @@ public interface RedisService {
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
boolean dropCollectionHash(String id); boolean dropCollectionHash(String id);
/**
* 按追踪id添加已追踪消息
* @Title: addWarnMsg2Set
* @Description: 按追踪id添加已追踪消息
* @param @param id
* @param @param msg 设定文件
* @return void 返回类型
*/
boolean addWarnMsg2Set(String id, JSONObject msg, int maxSize);
} }
package com.zhiwei.messageflow.redis.service.impl; package com.zhiwei.messageflow.redis.service.impl;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -11,6 +12,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -11,6 +12,7 @@ import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhiwei.messageflow.ESGetCommonId;
import com.zhiwei.messageflow.bean.MediaMessage; import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
...@@ -151,27 +153,45 @@ public class RedisServiceImpl implements RedisService { ...@@ -151,27 +153,45 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
removeOverLimit(redisKey, maxSize);
}
private void removeOverLimit(String redisKey, int maxSize) {
long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
// 判断zrevrange取maxSize条的commonid是否大于limitCommonid(小于或等于为以maxSize作为limit,大于以limitCommonid作为limit)
try {
Set<String> maxSizeSet = redisPoolAndTools.zrevrange(redisKey, maxSize - 1, maxSize - 1);
JSONObject object = JSONObject.parseObject((new ArrayList<>(maxSizeSet)).get(0));
long commonid = object.getLongValue("commonid");
if (commonid > ESGetCommonId.START_COMMONID) {
redisPoolAndTools.zremrangebyscore(redisKey, 0L,ESGetCommonId.START_COMMONID);
} else {
redisPoolAndTools.removeDataByName(redisKey, removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} catch (Exception e) {
e.printStackTrace();
redisPoolAndTools.removeDataByName(redisKey, removeIndex);
}
}
} }
@Override @Override
public String getDirectRedisKey(String projectName, String groupName, String pt) { public String getDirectRedisKey(String projectName, String groupName, String pt) {
return RedisConfig.DIRECTKEY+projectName+"-"+groupName+"-"+pt; return RedisConfig.DIRECTKEY + projectName + "-" + groupName + "-" + pt;
} }
@Override @Override
public String getDirectRsidMapKey(String projectName) { public String getDirectRsidMapKey(String projectName) {
return RedisConfig.DIRECTKEY+projectName; return RedisConfig.DIRECTKEY + projectName;
} }
@Override @Override
public boolean insertEvent(JSONObject ob) { public boolean insertEvent(JSONObject ob) {
String redisKey = RedisConfig.EVENTKEY+ob.getString("eventId").replace(":", ":"); String redisKey = RedisConfig.EVENTKEY + ob.getString("eventId").replace(":", ":");
try { try {
redisPoolAndTools.sortedSetZadd(redisKey, (double) ob.getLong("offset"), mapper.writeValueAsString(ob)); redisPoolAndTools.sortedSetZadd(redisKey, (double) ob.getLong("offset"), mapper.writeValueAsString(ob));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
...@@ -191,18 +211,18 @@ public class RedisServiceImpl implements RedisService { ...@@ -191,18 +211,18 @@ public class RedisServiceImpl implements RedisService {
@Override @Override
public List<String> getNeedAutoMark() { public List<String> getNeedAutoMark() {
String redisKey = RedisConfig.EVENTLISTKEY; String redisKey = RedisConfig.EVENTLISTKEY;
return redisPoolAndTools.Lrange(redisKey,0,9); return redisPoolAndTools.Lrange(redisKey, 0, 9);
} }
@Override @Override
public int countCollectionData(String id) { public int countCollectionData(String id) {
String redisKey = RedisConfig.EVENTKEY+id.replace(":", ":"); String redisKey = RedisConfig.EVENTKEY + id.replace(":", ":");
return Integer.parseInt(String.valueOf(redisPoolAndTools.getNowCount(redisKey))); return Integer.parseInt(String.valueOf(redisPoolAndTools.getNowCount(redisKey)));
} }
@Override @Override
public Set<String> getCollectionData(String id, int start, int end) { public Set<String> getCollectionData(String id, int start, int end) {
String redisKey = RedisConfig.EVENTKEY+id.replace(":", ":"); String redisKey = RedisConfig.EVENTKEY + id.replace(":", ":");
return redisPoolAndTools.zrange(redisKey, start, end); return redisPoolAndTools.zrange(redisKey, start, end);
} }
...@@ -215,26 +235,41 @@ public class RedisServiceImpl implements RedisService { ...@@ -215,26 +235,41 @@ public class RedisServiceImpl implements RedisService {
@Override @Override
public boolean existsCollectionHashByUrlkey(String url, String id) { public boolean existsCollectionHashByUrlkey(String url, String id) {
String redisKey = RedisConfig.EVENTHASHKEY+id; String redisKey = RedisConfig.EVENTHASHKEY + id;
if(redisPoolAndTools.exists(redisKey)) { if (redisPoolAndTools.exists(redisKey)) {
//判断是否有hash urlkey // 判断是否有hash urlkey
if(redisPoolAndTools.hexists(redisKey,url)) { if (redisPoolAndTools.hexists(redisKey, url)) {
return true; return true;
}else { } else {
redisPoolAndTools.hset(redisKey,url); redisPoolAndTools.hset(redisKey, url);
return false; return false;
} }
}else { } else {
//新建hash // 新建hash
redisPoolAndTools.hset(redisKey,url); redisPoolAndTools.hset(redisKey, url);
return false; return false;
} }
} }
@Override @Override
public boolean dropCollectionHash(String id) { public boolean dropCollectionHash(String id) {
String redisKey = RedisConfig.EVENTHASHKEY+id; String redisKey = RedisConfig.EVENTHASHKEY + id;
return redisPoolAndTools.del(redisKey); return redisPoolAndTools.del(redisKey);
} }
@Override
public boolean addWarnMsg2Set(String id, JSONObject jo, int maxSize) {
String redisKey = RedisConfig.TRACKKEY + id;
redisPoolAndTools.sortedSetZadd(redisKey, (double) jo.getLongValue("commonid"), jo.toJSONString());
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey, removeIndex);
}
return true;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment