Commit cfaf3818 by shentao

2018/8/2 平台选配调整

parent 51394b16
...@@ -144,7 +144,10 @@ public class ES4RedisTask { ...@@ -144,7 +144,10 @@ public class ES4RedisTask {
*/ */
List<String> allkeywords = new ArrayList<>(); List<String> allkeywords = new ArrayList<>();
for (KeywordNew kwn : keywordNews) { for (KeywordNew kwn : keywordNews) {
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) { if (!kwn.getPtList().contains(platformName)) {// 滤过非选配词组
continue;
}
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组
continue; continue;
} }
allkeywords.addAll(kwn.getKeyWords()); allkeywords.addAll(kwn.getKeyWords());
...@@ -186,9 +189,11 @@ public class ES4RedisTask { ...@@ -186,9 +189,11 @@ public class ES4RedisTask {
// 遍历关键词组 // 遍历关键词组
for (KeywordNew kwn : keywordNews) { for (KeywordNew kwn : keywordNews) {
// 滤过空关键词组 if(!kwn.getPtList().contains(platformName)) {// 滤过非选配词组
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) { continue;
}
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空关键词组
continue; continue;
} }
...@@ -247,330 +252,4 @@ public class ES4RedisTask { ...@@ -247,330 +252,4 @@ public class ES4RedisTask {
return true; return true;
} }
@SuppressWarnings("unchecked")
public boolean ES4Redis(Project project, List<String> platformNames, int count)
throws JsonParseException, JsonMappingException, IOException {
try {
/**
* 获取项目对应的rsid Map
*/
String rsidjson = redisService.getRsid(project.getProjectName());
Map<String, Integer> rsidMap = new HashMap<>();
if (rsidjson != null) {
rsidMap = mapper.readValue(rsidjson, Map.class);
} else {
log.info("{}项目RSID列表过期", project.getProjectName());
}
// 更新rsid用的rsid Map
Map<String, Integer> newRsidMap = new HashMap<>();
newRsidMap.putAll(rsidMap);
// 统计项目消息总数
int num = 0;
// 项目关键词组列表
List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName());
// 项目噪音规则列表
List<NoiseRule> noiseRules = noiseRuleDao.getNoiseRuleByProject(project.getProjectName());
// 项目预警规则列表
List<TrackRule> trackRules = trackRuleDao.getTrackRuleByProject(project.getProjectName());
/*
* //测试预警规则 List<TrackRule> trackRules=new ArrayList<>(); TrackRule tr1=new
* TrackRule(); TrackRule tr2=new TrackRule(); TrackRule tr3=new TrackRule();
* Date date = new Date(); Long createAt=date.getTime();
* tr1.setCreateAt(createAt); tr1.setAndOr(""); tr1.setWarn(false);
* tr1.setRuleType("keyWords"); tr1.setSubmitter("虞诚毅");
* tr1.setKeyWordsInputOne("知乎"); tr1.setRuleName("测试规则1");
* tr1.setRuleExplain("测试说明1"); tr1.setEarlyWarningTime("三天");
* tr1.setProject("腾讯"); tr1.setEarlyWarning("email");
* tr2.setCreateAt(createAt); tr2.setAndOr("或"); tr2.setWarn(false);
* tr2.setRuleType("keyWords"); tr2.setSubmitter("虞诚毅");
* tr2.setKeyWordsInputOne("知乎"); tr2.setKeyWordsInputTwo("微博");
* tr2.setRuleName("测试规则2"); tr2.setRuleExplain("测试说明3");
* tr2.setEarlyWarningTime("三天"); tr2.setProject("腾讯");
* tr2.setEarlyWarning("email"); tr3.setCreateAt(createAt); tr3.setAndOr("且");
* tr3.setWarn(false); tr3.setRuleType("keyWords"); tr3.setSubmitter("虞诚毅");
* tr3.setKeyWordsInputOne("知乎"); tr3.setKeyWordsInputTwo("问题");
* tr3.setRuleName("测试规则2"); tr3.setRuleExplain("测试说明3");
* tr3.setEarlyWarningTime("三天"); tr3.setProject("腾讯");
* tr3.setEarlyWarning("wechat"); trackRules.add(tr1); trackRules.add(tr2);
* trackRules.add(tr3);
*/
// 遍历平台
for (String platformName : platformNames) {
/**
* 平台全关键词查询
*/
String allkeytitle = "全部";
// 全关键词redis库中的key
String allRedisKey = platformName + "-" + project.getProjectName() + "-" + allkeytitle;
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
long allstartrsid = rsidMap.get(allRedisKey) == null ? -1L : Long.valueOf(rsidMap.get(allRedisKey));
// 用于存储数据获取后新的rsid
Long allrsid = -1L;
/**
* 将项目下的所有关键词组合成全关键词组
*/
List<String> allkeywords = new ArrayList<>();
for (KeywordNew kwn : keywordNews) {
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {
continue;
}
allkeywords.addAll(kwn.getKeyWords());
}
// 根据不同平台获取数据
if (platformName.equals("微博")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredWeiboMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<WeiboMessage> messages = ram.getWlist();
// 数据量为0
if (messages == null) {
// log.info("{}平台{}关键字词组无消息", platformName, allkeytitle);
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
// 记录新的rsid
allrsid = ram.getRsid();
num += messages.size();
// 向redis写入数据
redisService.setWeiboMessageMessage(allRedisKey, messages, allkeywordcount);
} else if (platformName.equals("知乎")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredZhihuMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<ZhihuMessage> messages = ram.getZlist();
// 查询到数据量为0
if (messages == null) {
// log.info("{}平台{}关键字词组无消息", platformName, allkeytitle);
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
// 记录新的rsid
allrsid = ram.getRsid();
num += messages.size();
// 向redis写入数据
redisService.setZhihuMessageMessage(allRedisKey, messages, allkeywordcount);
} else if (platformName.equals("视频")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredVideoMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<VideoMessage> messages = ram.getVlist();
// 查询到数据量为0
if (messages == null) {
// log.info("{}平台{}关键字词组无消息", platformName, allkeytitle);
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
// 记录新的rsid
allrsid = ram.getRsid();
num += messages.size();
// 向redis写入数据
redisService.setVideoMessageMessage(allRedisKey, messages, allkeywordcount);
} else {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredMediaMessage(noiseRules, allkeywords, count,
allstartrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<MediaMessage> messages = ram.getMlist();
// 查询到数据量为0
if (messages == null) {
// log.info("{}平台{}关键字词组无消息", platformName, allkeytitle);
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, "全部", messages.size());
// 记录新的rsid
allrsid = ram.getRsid();
num += messages.size();
// 向redis写入数据
redisService.setMediaMessageMessage(allRedisKey, messages, allkeywordcount);
}
newRsidMap.put(allRedisKey, Integer.valueOf(allrsid.toString()));
// 遍历关键词组
for (KeywordNew kwn : keywordNews) {
// 滤过空关键词组
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {
continue;
}
// 关键词组在redis库中的key
String redisKey = platformName + "-" + project.getProjectName() + "-" + kwn.getKeyTitle();
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = rsidMap.get(redisKey) == null ? -1L : Long.valueOf(rsidMap.get(redisKey));
// 用于存储数据获取后新的rsid
Long keyrsid = -1L;
// System.out.println(kwn.toString());
// System.out.println(l++ + ":" + platformName);
// System.out.println("=============这是分界线=============");
// 根据平台查询数据
if (platformName.equals("微博")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredWeiboMessage(noiseRules,
kwn.getKeyWords(), count, startrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<WeiboMessage> messages = ram.getWlist();
// 查询到数据量为0
if (messages == null) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messages.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setWeiboMessageMessage(redisKey, messages, keywordscount);
} else if (platformName.equals("知乎")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredZhihuMessage(noiseRules,
kwn.getKeyWords(), count, startrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<ZhihuMessage> messages = ram.getZlist();
// 查询到数据量为0
if (messages == null) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messages.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setZhihuMessageMessage(redisKey, messages, keywordscount);
} else if (platformName.equals("视频")) {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredVideoMessage(noiseRules,
kwn.getKeyWords(), count, startrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<VideoMessage> messages = ram.getVlist();
// 查询到数据量为0
if (messages == null) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messages.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setVideoMessageMessage(redisKey, messages, keywordscount);
} else {
// 获取新的rsid和信息实体
RsidAndMessages ram = disposeMessageService.getFilteredMediaMessage(noiseRules,
kwn.getKeyWords(), count, startrsid, -1L, platformName, project.getProjectName());
// 获取查询到的信息
List<MediaMessage> messages = ram.getMlist();
// 查询到数据量为0
if (messages == null) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messages.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setMediaMessageMessage(redisKey, messages, keywordscount);
} // 数据写入完成
// 预警
earlyWarningService.earlyWarning(trackRules, count, startrsid, keyrsid, platformName,
project.getProjectName());
// 向更新rsid用的rsid Map中存入rsid
newRsidMap.put(redisKey, Integer.valueOf(keyrsid.toString()));
} // 遍历关键词组
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, project.getProjectName());
} // 遍历平台
log.info("{}项目本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) {
log.error("项目本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
return false;
}
return true;
}
} }
\ No newline at end of file
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.Project;
public class ES4RedisThread extends Thread {
private final static Logger log = LoggerFactory.getLogger(ES4RedisThread.class);
// 线程
private Thread t;
// 线程名
private String threadName;
// 项目
private Project project;
// 平台列表
private List<String> platformNames;
private ES4RedisTask es4RedisTask;
// 单个平台单个关键词组每次查询数量
private static final int count = 300;
// private static final int max_Thread_num = 40;
// private static int Thread_num = 0;
// private static final int max_Running_num = 3;
// private static Integer Running_num = 0;
// private static List<String> ThreadList = new ArrayList<>();
public ES4RedisThread(String name, Project project, List<String> platformNames, ES4RedisTask es4RedisTask) {
threadName = name;
this.project = project;
this.platformNames = platformNames;
this.es4RedisTask = es4RedisTask;
}
public ES4RedisThread() {
}
public static ES4RedisThread getThread(String name, Project project, List<String> platformNames,
ES4RedisTask es4RedisTask) {
ES4RedisThread es4RedisThread = new ES4RedisThread(name, project, platformNames, es4RedisTask);
return es4RedisThread;
}
public void start() {
// 线程开始
log.info("Starting {}", threadName);
if (t == null) {
t = new Thread(this, threadName);
// 通知执行run方法
t.start();
}
// 超时控制器
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
// 超时则线程中止
if (t.isAlive()) {
t.interrupt();
log.warn("{}项目超时线程状态:{}", project.getProjectName(),t.isInterrupted());
}
}
}, 49 * 1000L);
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.sleep(10L);
// 超时控制器
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
// // 超时则线程中止
// Thread.currentThread().interrupted();
// log.warn("{}项目超时", project.getProjectName());
// }
// }, 49 * 1000L);
// 程序运行
log.info("Running {}", threadName);
// 该项目执行消息流获取
boolean flag = es4RedisTask.ES4Redis(project, platformNames, count);
if (!flag) {
// 程序执行出现异常则线程中止
// timer.cancel();
Thread.currentThread().interrupted();
log.error("{}项目出现异常,线程状态:{}", project.getProjectName(),Thread.currentThread().isInterrupted());
}
// else
// // 程序正常执行完毕,关闭超时控制器
// timer.cancel();
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程退出
log.info("Thread {} exiting.", threadName);
}
}
}
\ No newline at end of file
package com.zhiwei.messageflow.config;
import java.util.Arrays;
import java.util.List;
public class BossZhipingChannelList {
public static final List<String> BOSSZPLIST = Arrays.asList("人民日报", "人民网", "新华社", "中新社", "光明日报", "光明网", "中央人民广播电台",
"36氪", "Techweb网", "中国网", "新京报网", "新京报", "法晚网", "法制晚报", "中国青年报", "中青在线", "经济生活网", "经济日报", "中国经济网", "央视网",
"国际在线", "京报网", "北晨网", "北京日报", "北京青年报", "北青网", "北京晚报", "北京晨报", "上观新闻", "解放日报", "文汇报", "新民晚报", "新民网", "文汇网",
"界面新闻", "澎湃新闻", "新闻晨报网", "新闻晨报", "南方网", "奥一网", "南方日报", "南方都市报", "21世纪经济报道", "南方周末", "金羊网", "羊城晚报", "四川在线",
"华西都市网", "封面新闻", "四川日报网", "四川日报", "成都商报", "成都晚报", "红星新闻", "四川新闻网", "现代快报网", "现代快报", "晚报", "金陵晚报网", "扬子晚报网",
"扬子晚报", "今晚网", "今晚报", "三秦都市报", "三秦网", "钱江晚报", "浙江24小时", "腾讯科技", "搜狐科技", "凤凰科技", "网易科技", "新浪科技", "好奇心日报",
"36Kr", "虎嗅", "钛媒体", "AI财经社", "腾讯财经", "搜狐财经", "凤凰财经", "网易财经", "新浪财经", "财经网", "财新网", "Techweb", "DoNews",
"亿欧网", "投资界", "ifanr", "中国政府网", "人社部", "民政部", "司法部", "商务部", "国家广电总局", "国家市场监督管理总局", "中国人民银行", "教育部", "科技部", "工信部",
"公安部", "国家知识产权局", "首都之窗", "天津政务网", "中国上海", "江苏省政府", "四川省政府", "广东省政府", "浙江省政府", "重庆市政府");
}
...@@ -636,7 +636,11 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -636,7 +636,11 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String text = map.get("text") != null ? map.get("text").toString() : null; String text = map.get("text") != null ? map.get("text").toString() : null;
String roottext = map.get("roottext") != null ? map.get("roottext").toString() : null; String roottext = map.get("roottext") != null ? map.get("roottext").toString() : null;
String username = map.get("username") != null ? map.get("username").toString() : null;
boolean isnoise = false; boolean isnoise = false;
if (null == username) {
return isnoise;
}
if (noiseRules != null) { if (noiseRules != null) {
for (NoiseRule n : noiseRules) { for (NoiseRule n : noiseRules) {
/** /**
...@@ -983,8 +987,11 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -983,8 +987,11 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String text = map.get("title") != null ? map.get("title").toString() : null; String text = map.get("title") != null ? map.get("title").toString() : null;
String roottext = map.get("content") != null ? map.get("content").toString() : null; String roottext = map.get("content") != null ? map.get("content").toString() : null;
String username = map.get("source") != null ? map.get("source").toString() : null;
boolean isnoise = false; boolean isnoise = false;
if (null == username) {
return isnoise;
}
if (noiseRules != null) { if (noiseRules != null) {
for (NoiseRule n : noiseRules) { for (NoiseRule n : noiseRules) {
/** /**
......
...@@ -21,4 +21,5 @@ public class KeywordNew { ...@@ -21,4 +21,5 @@ public class KeywordNew {
private String submitter; private String submitter;
private long createAt; private long createAt;
private Boolean isUsed; private Boolean isUsed;
private List<String> ptList;//平台列表
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment