Commit 46e516cf by shentao

主分支 提交 平台整合后版本

parent 0240116a
......@@ -115,6 +115,13 @@
<version>1.4.7</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.34</version>
</dependency>
</dependencies>
<build>
......
......@@ -33,7 +33,8 @@ public class ES4RedisRunner implements ApplicationRunner {
// */
// getSet gs = ApplicationContextProvider.getBean("getSet", getSet.class);
// gs.getDataByeRedis("", "", 0, 0, 0);
ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class);
esGetCommonId.getCommonId();
/**
* redis存入缓存
......
......@@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.PlatformDao;
import com.zhiwei.messageflow.mongo.dao.ProjectDao;
......@@ -38,7 +39,7 @@ public class ES4RedisStart {
List<Project> projects = projectDao.getAllProjects();
// 公共平台名列表
List<String> platformNames = platformDao.getPublicPlatformName();
// List<String> platformNames = platformDao.getPublicPlatformName();
// 遍历项目
for (Project project : projects) {
......@@ -46,14 +47,21 @@ public class ES4RedisStart {
/**
* 项目全部平台(公共+私有)
*/
List<String> allplatformNames = new ArrayList<>();
allplatformNames.addAll(platformNames);
allplatformNames.addAll(project.getDataPt());
List<PlatformNew> allplatformNames = platformDao.findPlatformByProject(project);
// System.out.println(allplatformNames);
// List<String> allplatformNames = new ArrayList<>();
// allplatformNames.addAll(platformNames);
// allplatformNames.addAll(project.getDataPt());
// 获取线程
ES4RedisThread es4RedisThread = ES4RedisThread.getThread(project.getProjectName(), project,
ES4RedisThreadNew es4RedisThread = ES4RedisThreadNew.getThread(project.getProjectName(), project,
allplatformNames, es4RedisTask);
// ES4RedisThread es4RedisThread = ES4RedisThread.getThread(project.getProjectName(), project,
// allplatformNames, es4RedisTask);
// 获取线程失败
if (es4RedisThread == null) {
log.warn("{}项目获取线程失败", project.getProjectName());
......
......@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -21,6 +22,7 @@ import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.es.service.EarlyWarningService;
import com.zhiwei.messageflow.mongo.bean.KeywordNew;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
import com.zhiwei.messageflow.mongo.dao.KeywordNewDao;
......@@ -59,6 +61,182 @@ public class ES4RedisTask {
private static final int keywordscount = 2000;
@SuppressWarnings("unchecked")
public boolean ES4RedisNew(Project project, List<PlatformNew> platformNames, int count)
throws JsonParseException, JsonMappingException, IOException {
try {
/**
* 获取项目对应的rsid Map
*/
String rsidjson = redisService.getRsid(project.getProjectName());
Map<String, Integer> rsidMap = new HashMap<>();
if (rsidjson != null) {
rsidMap = mapper.readValue(rsidjson, Map.class);
} else {
log.info("{}项目RSID列表过期", project.getProjectName());
}
// 更新rsid用的rsid Map
Map<String, Integer> newRsidMap = new HashMap<>();
newRsidMap.putAll(rsidMap);
// 统计项目消息总数
int num = 0;
// 项目关键词组列表
List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName());
// 项目噪音规则列表
List<NoiseRule> noiseRules = noiseRuleDao.getNoiseRuleByProject(project.getProjectName());
// 项目预警规则列表
List<TrackRule> trackRules = trackRuleDao.getTrackRuleByProject(project.getProjectName());
/*
* //测试预警规则 List<TrackRule> trackRules=new ArrayList<>(); TrackRule tr1=new
* TrackRule(); TrackRule tr2=new TrackRule(); TrackRule tr3=new TrackRule();
* Date date = new Date(); Long createAt=date.getTime();
* tr1.setCreateAt(createAt); tr1.setAndOr(""); tr1.setWarn(false);
* tr1.setRuleType("keyWords"); tr1.setSubmitter("虞诚毅");
* tr1.setKeyWordsInputOne("知乎"); tr1.setRuleName("测试规则1");
* tr1.setRuleExplain("测试说明1"); tr1.setEarlyWarningTime("三天");
* tr1.setProject("腾讯"); tr1.setEarlyWarning("email");
* tr2.setCreateAt(createAt); tr2.setAndOr("或"); tr2.setWarn(false);
* tr2.setRuleType("keyWords"); tr2.setSubmitter("虞诚毅");
* tr2.setKeyWordsInputOne("知乎"); tr2.setKeyWordsInputTwo("微博");
* tr2.setRuleName("测试规则2"); tr2.setRuleExplain("测试说明3");
* tr2.setEarlyWarningTime("三天"); tr2.setProject("腾讯");
* tr2.setEarlyWarning("email"); tr3.setCreateAt(createAt); tr3.setAndOr("且");
* tr3.setWarn(false); tr3.setRuleType("keyWords"); tr3.setSubmitter("虞诚毅");
* tr3.setKeyWordsInputOne("知乎"); tr3.setKeyWordsInputTwo("问题");
* tr3.setRuleName("测试规则2"); tr3.setRuleExplain("测试说明3");
* tr3.setEarlyWarningTime("三天"); tr3.setProject("腾讯");
* tr3.setEarlyWarning("wechat"); trackRules.add(tr1); trackRules.add(tr2);
* trackRules.add(tr3);
*/
// 遍历平台
for (PlatformNew platform : platformNames) {
String platformName = platform.getPlatformName();
/**
* 平台全关键词查询
*/
String allkeytitle = "全部";
// 全关键词redis库中的key
String allRedisKey = platformName + "-" + project.getProjectName() + "-" + allkeytitle;
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
long allstartrsid = rsidMap.get(allRedisKey) == null ? -1L : Long.valueOf(rsidMap.get(allRedisKey));
// 用于存储数据获取后新的rsid
Long allrsid = -1L;
/**
* 将项目下的所有关键词组合成全关键词组
*/
List<String> allkeywords = new ArrayList<>();
for (KeywordNew kwn : keywordNews) {
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {
continue;
}
allkeywords.addAll(kwn.getKeyWords());
}
// 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getFilteredMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platform, project.getProjectName(),project.getMatchFields());
// 获取查询到的信息
List<JSONObject> messages = ram.getJlist();
// 数据量为0
if (messages == null) {
// log.info("{}平台{}关键字词组无消息", platformName, allkeytitle);
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
// 记录新的rsid
allrsid = ram.getRsid();
num += messages.size();
// 向redis写入数据
redisService.setMessage2Redis(allRedisKey, messages, allkeywordcount);
newRsidMap.put(allRedisKey, Integer.valueOf(allrsid.toString()));
// 遍历关键词组
for (KeywordNew kwn : keywordNews) {
// 滤过空关键词组
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {
continue;
}
// 关键词组在redis库中的key
String redisKey = platformName + "-" + project.getProjectName() + "-" + kwn.getKeyTitle();
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = rsidMap.get(redisKey) == null ? -1L : Long.valueOf(rsidMap.get(redisKey));
// 用于存储数据获取后新的rsid
Long keyrsid = -1L;
// System.out.println(kwn.toString());
// System.out.println(l++ + ":" + platformName);
// System.out.println("=============这是分界线=============");
// 获取新的rsid和信息实体
RsidAndMessages ramkey = disposeMessageService.getFilteredMessage(noiseRules,
kwn.getKeyWords(), count, startrsid, -1L, platform, project.getProjectName(),project.getMatchFields());
// 获取查询到的信息
List<JSONObject> messageskey = ramkey.getJlist();
// 查询到数据量为0
if (messageskey == null) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messageskey.size();
// 记录新的rsid
keyrsid = ramkey.getRsid();
// 向redis写入数据
redisService.setMessage2Redis(redisKey, messageskey, keywordscount);
} // 遍历关键词组
// 预警
earlyWarningService.earlyWarningNew(trackRules, count, allstartrsid, allrsid, platform,
project.getProjectName());
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, project.getProjectName());
} // 遍历平台
log.info("{}项目本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) {
log.error("项目本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
return false;
}
return true;
}
@SuppressWarnings("unchecked")
public boolean ES4Redis(Project project, List<String> platformNames, int count)
throws JsonParseException, JsonMappingException, IOException {
......@@ -383,4 +561,5 @@ public class ES4RedisTask {
}
return true;
}
}
\ No newline at end of file
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
public class ES4RedisThreadNew extends Thread {
private final static Logger log = LoggerFactory.getLogger(ES4RedisThreadNew.class);
// 线程
private Thread t;
// 线程名
private String threadName;
// 项目
private Project project;
// 平台列表
private List<PlatformNew> platformNames;
private ES4RedisTask es4RedisTask;
// 单个平台单个关键词组每次查询数量
private static final int count = 300;
// private static final int max_Thread_num = 40;
// private static int Thread_num = 0;
// private static final int max_Running_num = 3;
// private static Integer Running_num = 0;
// private static List<String> ThreadList = new ArrayList<>();
public ES4RedisThreadNew(String name, Project project, List<PlatformNew> allplatformNames, ES4RedisTask es4RedisTask) {
threadName = name;
this.project = project;
this.platformNames = allplatformNames;
this.es4RedisTask = es4RedisTask;
}
public ES4RedisThreadNew() {
}
public static ES4RedisThreadNew getThread(String name, Project project, List<PlatformNew> allplatformNames,
ES4RedisTask es4RedisTask) {
ES4RedisThreadNew es4RedisThread = new ES4RedisThreadNew(name, project, allplatformNames, es4RedisTask);
return es4RedisThread;
}
public void start() {
// 线程开始
log.info("Starting {}", threadName);
if (t == null) {
t = new Thread(this, threadName);
// 通知执行run方法
t.start();
}
// 超时控制器
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
// 超时则线程中止
if (t.isAlive()) {
t.interrupt();
log.warn("{}项目超时线程状态:{}", project.getProjectName(),t.isInterrupted());
}
}
}, 49 * 1000L);
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.sleep(10L);
// 超时控制器
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
// // 超时则线程中止
// Thread.currentThread().interrupted();
// log.warn("{}项目超时", project.getProjectName());
// }
// }, 49 * 1000L);
// 程序运行
log.info("Running {}", threadName);
// 该项目执行消息流获取
boolean flag = es4RedisTask.ES4RedisNew(project, platformNames, count);
if (!flag) {
// 程序执行出现异常则线程中止
// timer.cancel();
Thread.currentThread().interrupted();
log.error("{}项目出现异常,线程状态:{}", project.getProjectName(),Thread.currentThread().isInterrupted());
}
// else
// // 程序正常执行完毕,关闭超时控制器
// timer.cancel();
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程退出
log.info("Thread {} exiting.", threadName);
}
}
}
\ No newline at end of file
package com.zhiwei.messageflow;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.dao.ESDao;
@Component
public class ESGetCommonId {
@Autowired
private ESDao esDao;
public static int START_COMMONID;
public static long TIME;
public void getCommonId() {
long nowtime = System.currentTimeMillis();
if (TIME < nowtime - 60 * 1000L) {
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.get24hoursFirstCommonid("weibo");
cidList.add(weibo);
int media = esDao.get24hoursFirstCommonid("media");
cidList.add(media);
int video = esDao.get24hoursFirstCommonid("video");
cidList.add(video);
int zhihu = esDao.get24hoursFirstCommonid("zhihu");
cidList.add(zhihu);
Collections.sort(cidList);
System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
START_COMMONID = cidList.get(0);
TIME = nowtime;
}
}
public int getCommonId(long limit) {
long nowtime = System.currentTimeMillis();
long endtime = nowtime - limit;
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", endtime);
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", endtime);
cidList.add(media);
int video = esDao.getCommonidByTime("video", endtime);
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", endtime);
cidList.add(zhihu);
Collections.sort(cidList);
// System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
int commonid = cidList.get(0);
return commonid;
}
}
......@@ -4,6 +4,8 @@ import java.util.List;
import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
//第一层 从ES获取原始数据
public interface ESDao {
......@@ -95,4 +97,11 @@ public interface ESDao {
public SearchHits getMediaDataycy(long date, List<String> keywords, int count, String platform, String project);
public SearchHits getDataFromEs(List<String> keywords, int count, long startid, long endid, PlatformNew platform,
String project, String matchFields);
public int get24hoursFirstCommonid(String tableName);
public int getCommonidByTime(String tableName, long endtime);
}
......@@ -2,6 +2,8 @@ package com.zhiwei.messageflow.es.dao;
import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
public interface TrackESDao {
/**
......@@ -210,4 +212,13 @@ public interface TrackESDao {
*/
public SearchHits getarticleTrackMediaFromEs(String articleTitle, String color, int count, long start, long end,
String platformName);
public SearchHits getkeyWordsTrackFromEs(String anyWord, String allWords, String color, int count, long startid,
Long endid, PlatformNew platform);
public SearchHits getchannelTrackFromEs(String channel, String color, int count, long startid, Long endid,
PlatformNew platform);
public SearchHits getarticleTrackFromEs(String articleTitle, String color, int count, long startid, Long endid,
PlatformNew platform);
}
......@@ -2,11 +2,13 @@ package com.zhiwei.messageflow.es.service;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
//第二层 将SearchHits封装成List<Message>
public interface ES4BeanService {
......@@ -90,4 +92,24 @@ public interface ES4BeanService {
*/
List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform,
String project);
/**
* 获取各平台消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @param matchFields
* @return
*/
List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid, long endid,
PlatformNew platform, String project, String matchFields);
}
......@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.es.service;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface EarlyWarningService {
......@@ -24,4 +25,7 @@ public interface EarlyWarningService {
*/
void earlyWarning(List<TrackRule> trackRules, int count, long startid, long endid, String platformName,
String projectName);
void earlyWarningNew(List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform,
String projectName);
}
......@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.es.service;
import org.elasticsearch.search.SearchHit;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
......@@ -40,4 +41,12 @@ public interface HighLightFillingService {
* @return
*/
MediaMessage getMediaBean(SearchHit searchHit);
/**
* 微博平台ES数据封装+高亮处理+渠道影响力
*
* @param searchHit
* @return
*/
JSONObject getBean(SearchHit searchHit);
}
......@@ -4,11 +4,13 @@ import java.util.List;
import org.elasticsearch.search.SearchHits;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
public interface NoiseProcessingService {
......@@ -19,4 +21,7 @@ public interface NoiseProcessingService {
List<VideoMessage> videoDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<MediaMessage> mediaDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform,
String project);
}
......@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.es.service;
import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface TrackHitAndWarnService {
......@@ -58,4 +59,14 @@ public interface TrackHitAndWarnService {
* @param trackRule
*/
void WarnWechat(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
SearchHits keyWordsTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
SearchHits channelTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
SearchHits articleTrackHitNew(TrackRule trackRule, int count, long startid, Long endid, PlatformNew platform);
void WarnWechatNew(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
void WarnEmailNew(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
}
......@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
......@@ -16,6 +17,7 @@ import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.es.service.ES4BeanService;
import com.zhiwei.messageflow.es.service.NoiseProcessingService;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
@Component
public class ES4BeanServiceImpl implements ES4BeanService {
......@@ -110,4 +112,28 @@ public class ES4BeanServiceImpl implements ES4BeanService {
return messages;
}
@Override
public List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, PlatformNew platform, String project,String matchFields) {
List<JSONObject> messages = null;
try {
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getDataFromEs(keywords, count, startid, endid, platform, project,matchFields);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.allDenoising(noiseRules, searchHits, platform, project);
} catch (Exception e) {
log.error(e.getStackTrace() + " " + e.getMessage());
}
return messages;
}
}
......@@ -8,6 +8,7 @@ import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.service.EarlyWarningService;
import com.zhiwei.messageflow.es.service.TrackHitAndWarnService;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
import com.zhiwei.messageflow.mongo.dao.TrackRuleDao;
......@@ -95,4 +96,85 @@ public class EarlyWarningServiceImpl implements EarlyWarningService {
}
}
@Override
public void earlyWarningNew(List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform,
String projectName) {
String platformName = platform.getPlatformName();
for (TrackRule trackRule : trackRules) {
// 判断是否预警
if (trackRule.isWarn()) {
continue;
}
/**
* 判断是否过期
*/
long createAt = trackRule.getCreateAt();
long addDay = 0;
if (trackRule.getEarlyWarningTime().equals("一天")) {
addDay = 24 * 60 * 60 * 1000L;
} else if (trackRule.getEarlyWarningTime().equals("二天")) {
addDay = 2 * 24 * 60 * 60 * 1000L;
} else if (trackRule.getEarlyWarningTime().equals("三天")) {
addDay = 3 * 24 * 60 * 60 * 1000L;
} else if (trackRule.getEarlyWarningTime().equals("一月")) {
addDay = 30 * 24 * 60 * 60 * 1000L;
}
long confirm = createAt + addDay;
if (confirm <= System.currentTimeMillis()) {
// 过期
trackRule.setWarn(true);
trackRuleDao.updateTrackrule(trackRule.get_id());
} else {
// 判断规则
// 首先判断预警方式
if (trackRule.getEarlyWarning().equals("no")) {
} else if (trackRule.getEarlyWarning().equals("wechat")) {
// 微信预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHitNew(trackRule, count, startid, endid,
platform);
}
trackHitAndWarnService.WarnWechatNew(TrackHit,platformName, trackRule, projectName);
} else if (trackRule.getEarlyWarning().equals("email")) {
// 邮箱预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHitNew(trackRule, count, startid, endid,
platform);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHitNew(trackRule, count, startid, endid,
platform);
}
trackHitAndWarnService.WarnEmailNew(TrackHit,platformName, trackRule, projectName);
}
}
}
}
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
/**
* 平台新表
* @ClassName: PlatformNew
* @Description: TODO(平台bean)
* @author shentao
* @date 2017年12月15日 上午10:08:35
*/
@Data
@ToString
@Document(collection = "qbjc_platformNew")
public class PlatformNew {
/**
* id 主键 project_platformName_createAt
*/
@Id
private String id;//id
/**
* platformName 平台名
*/
private String platformName;
/**
* pt 所需pt字段
*/
private List<String> pt;
/**
* noPt 无需pt字段
*/
private List<String> noPt;
/**
* type 所需type字段
*/
private List<String> type;
/**
* noType 无需type字段
*/
private List<String> noType;
/**
* source 所需source字段
*/
private List<String> source;
/**
* noSource 无需source字段
*/
private List<String> noSource;
/**
* specialRule 特殊规则(内写方法名
*/
private List<String> specialRule;
/**
* platformType 已包含ptype
*/
private List<String> platformType;
/**
* noPlatformType 无法包含的ptype
*/
private List<String> noPlatformType;
/**
* uper 所需uper字段
*/
private List<String> uper;
/**
* noUper 无需uper字段
*/
private List<String> noUper;
/**
* createAt 创建时间
*/
private Long createAt;
/**
* project 所属项目
*/
private String project;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
/**
* 平台type表
* @ClassName: PlatformType
* @Description: TODO(这里用一句话描述这个类的作用)
* @author shentao
* @date 2018年5月17日 下午5:52:41
*/
@Data
@ToString
@Document(collection = "qbjc_platformType")
public class PlatformType {
/**
* id 主键 platformName_typeName_createAt
*/
@Id
private String id;//id
/**
* typeName 平台名
*/
private String typeName;
/**
* pt 所需pt字段
*/
private List<String> pt;
/**
* noPt 无需pt字段
*/
private List<String> noPt;
/**
* type 所需type字段
*/
private List<String> type;
/**
* noType 无需type字段
*/
private List<String> noType;
/**
* source 所需source字段
*/
private List<String> source;
/**
* noSource 无需source字段
*/
private List<String> noSource;
/**
* uper 所需uper字段
*/
private List<String> uper;
/**
* noUper 无需uper字段
*/
private List<String> noUper;
/**
* specialRule 特殊规则(内写方法名
*/
private List<String> specialRule;
/**
* createAt 创建时间
*/
private Long createAt;
/**
* 所属平台Name
*/
private String platformName;
}
......@@ -38,5 +38,28 @@ public class Project {
private boolean weiboHotSearch;
private int gatherKeyWord;
private boolean download;
/**
* "platformList" 平台列表
*/
private List<String> platformList;
/**
* "unplatformList" 不可选取平台列表
*/
private List<String> unplatformList;
/**
* matchFields 匹配方式 (标题、全文、标题+全文
*/
private String matchFields;
/**
* isAutoMark 自动化标注
*/
private Boolean isAutoMark;
/**
* diyMonitor 定向监测
*/
private Boolean diyMonitor;
/**
* eventList 事件列表
*/
private Boolean eventList;
}
......@@ -2,6 +2,9 @@ package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
public interface PlatformDao {
/**
* 获取公共平台名列表
......@@ -16,4 +19,13 @@ public interface PlatformDao {
* @return
*/
List<String> getAllPlatformName();
/**
* 按项目获取平台
* @Title: findPlatformByProject
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param project
* @param @return 设定文件
* @return List<PlatformNew> 返回类型
*/
List<PlatformNew> findPlatformByProject(Project project);
}
......@@ -6,9 +6,13 @@ import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.Platform;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.PlatformDao;
@Component
......@@ -51,4 +55,13 @@ public class PlatformDaoImpl implements PlatformDao {
return platformNames;
}
@Override
public List<PlatformNew> findPlatformByProject(Project project) {
List<String> pidList = project.getPlatformList();
Query query = new Query();
query.addCriteria(Criteria.where("_id").in(pidList));
List<PlatformNew> res = primaryMongoTemplate.find(query, PlatformNew.class);
return res;
}
}
......@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.redis.bean;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
......@@ -30,6 +31,10 @@ public class RsidAndMessages {
*/
private List<MediaMessage> mlist;
/**
* 网媒消息列表
*/
private List<JSONObject> jlist;
/**
* 本次读取最后信息的rsid
*/
private long rsid;
......
......@@ -3,6 +3,7 @@ package com.zhiwei.messageflow.redis.service;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
......@@ -76,4 +77,19 @@ public interface RedisService {
* redis数据存储上限
*/
void setMediaMessageMessage(String redisKey, List<MediaMessage> messages, int maxSize);
/**
* 向redis写入数据
*
* @Title: setMessage2Redis
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* allRedisKey
* @param @param
* messages
* @param @param
* allkeywordcount 设定文件
* @return void 返回类型
*/
void setMessage2Redis(String allRedisKey, List<JSONObject> messages, int allkeywordcount);
}
......@@ -6,6 +6,7 @@ import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -136,4 +137,22 @@ public class RedisServiceImpl implements RedisService {
}
@Override
public void setMessage2Redis(String redisKey, List<JSONObject> messages, int maxSize) {
for (JSONObject jo : messages) {
// 写入数据
redisPoolAndTools.sortedSetZadd(redisKey, (double) jo.getLongValue("commonid"), jo.toJSONString());
}
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex);
}
}
}
......@@ -3,6 +3,7 @@ package com.zhiwei.messageflow.service;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
public interface DisposeMessageService {
......@@ -94,4 +95,27 @@ public interface DisposeMessageService {
*/
RsidAndMessages getFilteredMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, Long startid,
Long endid, String platform, String project);
/**
* 获取各平台RsidAndMessages
*
* @param noiseRules
* 去噪规则
* @param keywords
* 关键词组
* @param count
* 每次查询数量
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platform
* 平台
* @param matchFields
* @param project
* 项目
* @return
*/
RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, PlatformNew platform, String projectName, String matchFields);
}
......@@ -8,12 +8,15 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.es.service.ES4BeanService;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
import com.zhiwei.messageflow.service.DisposeMessageService;
......@@ -115,4 +118,30 @@ public class DisposeMessageServiceImpl implements DisposeMessageService {
return ram;
}
@Override
public RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, PlatformNew platform, String project, String matchFields) {
RsidAndMessages ram = new RsidAndMessages();
// 消息列表
List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project,matchFields);
/**
* 获取最后消息的rsid
*/
if (messages != null && !messages.isEmpty()) {
JSONObject mmf = messages.get(0);
JSONObject mml = messages.get(messages.size() - 1);
ram.setJlist(messages);
long fcomid = mmf.getLongValue("commonid");
long lcomid = mml.getLongValue("commonid");
ram.setRsid(fcomid > lcomid ? fcomid : lcomid);
}
return ram;
}
}
##服务端uri
##\u009C\u008D\u008A端uri
#spring.data.mongodb.uri=115.236.59.91:27017
#内置tomcat端口号
server.port=8092
#\u5185\u7F6Etomcat\u7AEF\u53E3\u53F7
server.port=8091
#服务端数据库
#\u009C\u008D\u008A端\u0095\u008D\u0093
spring.data.mongodb.primary.database=qbjcPhoenix
#服务ip
#\u009C\u008D\u008Aip
spring.data.mongodb.primary.host=192.168.0.101
#服务port
#\u009C\u008D\u008Aport
spring.data.mongodb.primary.port=27017
spring.data.mongodb.primary.username=stno
......@@ -17,11 +17,11 @@ spring.data.mongodb.primary.password=stno1q2w3e4r
spring.data.mongodb.primary.authenticationDatabase=admin
#服务端数据库
#\u009C\u008D\u008A端\u0095\u008D\u0093
spring.data.mongodb.secondary.database=eventMuseum
#服务ip
#\u009C\u008D\u008Aip
spring.data.mongodb.secondary.host=192.168.0.101
#服务port
#\u009C\u008D\u008Aport
spring.data.mongodb.secondary.port=27017
spring.data.mongodb.secondary.username=stno
......@@ -30,11 +30,11 @@ spring.data.mongodb.secondary.password=stno1q2w3e4r
spring.data.mongodb.secondary.authenticationDatabase=admin
#服务端数据库
#\u009C\u008D\u008A端\u0095\u008D\u0093
spring.data.mongodb.thirdary.database=WechatPublic
#服务ip
#\u009C\u008D\u008Aip
spring.data.mongodb.thirdary.host=192.168.0.101
#服务port
#\u009C\u008D\u008Aport
spring.data.mongodb.thirdary.port=27017
spring.data.mongodb.thirdary.username=stno
......@@ -64,21 +64,21 @@ spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.heartbeat-frequency=10000
#spring.data.mongodb.option.local-threshold=15
##本地ip
##\u009C\u009Cip
#spring.data.mongodb.host=192.168.0.241
##本地port
##\u009C\u009Cport
#spring.data.mongodb.port=27017
##本地数据库
##\u009C\u009C\u0095\u008D\u0093
#spring.data.mongodb.database=qbjcPhoenix
#其他数据库
#\u0085\u0096\u0095\u008D\u0093
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag用uri
#tag\u0094uri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag数据库
#tag\u0095\u008D\u0093
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
......
##服务端uri
#spring.data.mongodb.uri=115.236.59.91:27017
#内置tomcat端口号
server.port=8092
#服务端数据库
spring.data.mongodb.primary.database=qbjcPhoenix
#服务ip
spring.data.mongodb.primary.host=192.168.0.101
#服务port
spring.data.mongodb.primary.port=27017
spring.data.mongodb.primary.username=stno
spring.data.mongodb.primary.password=stno1q2w3e4r
spring.data.mongodb.primary.authenticationDatabase=admin
#服务端数据库
spring.data.mongodb.secondary.database=eventMuseum
#服务ip
spring.data.mongodb.secondary.host=192.168.0.101
#服务port
spring.data.mongodb.secondary.port=27017
spring.data.mongodb.secondary.username=stno
spring.data.mongodb.secondary.password=stno1q2w3e4r
spring.data.mongodb.secondary.authenticationDatabase=admin
#服务端数据库
spring.data.mongodb.thirdary.database=WechatPublic
#服务ip
spring.data.mongodb.thirdary.host=192.168.0.101
#服务port
spring.data.mongodb.thirdary.port=27017
spring.data.mongodb.thirdary.username=stno
spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.min-connection-per-host=0
#spring.data.mongodb.option.max-connection-per-host=100
#spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=5
#spring.data.mongodb.option.server-selection-timeout=30000
#spring.data.mongodb.option.max-wait-time=120000
#spring.data.mongodb.option.max-connection-idle-time=0
#spring.data.mongodb.option.max-connection-life-time=0
#spring.data.mongodb.option.connect-timeout=10000
#spring.data.mongodb.option.socket-timeout=0
#
#spring.data.mongodb.option.socket-keep-alive=false
#spring.data.mongodb.option.ssl-enabled=false
#spring.data.mongodb.option.ssl-invalid-host-name-allowed=false
#spring.data.mongodb.option.always-use-m-beans=false
#
#spring.data.mongodb.option.heartbeat-socket-timeout=20000
#spring.data.mongodb.option.heartbeat-connect-timeout=20000
#spring.data.mongodb.option.min-heartbeat-frequency=500
#spring.data.mongodb.option.heartbeat-frequency=10000
#spring.data.mongodb.option.local-threshold=15
##本地ip
#spring.data.mongodb.host=192.168.0.241
##本地port
#spring.data.mongodb.port=27017
##本地数据库
#spring.data.mongodb.database=qbjcPhoenix
#其他数据库
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag用uri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag数据库
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
#mongo.connectionsPerHost=200
#mongo.threadsAllowedToBlockForConnectionMultiplier=10
#
#mongo.connectTimeout=30000
#
#mongo.maxWaitTime=50000
#mongo.autoConnectRetry=true
#mongo.socketKeepAlive=true
#
#mongo.socketTimeout=120000
#mongo.slaveOk=true
\ No newline at end of file
......@@ -13,7 +13,7 @@ redis.port=6380
redis.keyMaxSize=5000
redis.selectDB=13
redis.selectDB=12
#redis.selectDB=2
redis.user_keyMaxSize=1000
redis.cacheSize=1000
......
redis.maxTotal=2048
redis.maxIdle=200
redis.maxWaitMillis=1000
redis.testOnBorrow=true
redis.testOnReturn=true
redis.ip = 192.168.0.202
#redis.ip = 202.107.192.94
#redis.ip=127.0.0.1
redis.port=6380
#redis.ip=192.168.1.74
#redis.port=6388
#redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf
redis.keyMaxSize=5000
redis.selectDB=13
#redis.selectDB=2
redis.user_keyMaxSize=1000
redis.cacheSize=1000
redis.intitCount=3000
\ No newline at end of file
......@@ -13,7 +13,7 @@ redis.port=6379
redis.keyMaxSize=5000
redis.selectDB=13
redis.selectDB=12
#redis.selectDB=2
redis.user_keyMaxSize=1000
redis.cacheSize=1000
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment