Commit 031034c1 by 303514581@qq.com

2019/5/14 监测系统消息流高亮读取,子词组合消息并全部词组消息

parent 901bb57b
...@@ -32,7 +32,7 @@ public class DES4RedisStart{ ...@@ -32,7 +32,7 @@ public class DES4RedisStart{
// 遍历项目 // 遍历项目
for (Project project : projects) { for (Project project : projects) {
// if(!project.getProjectName().equals("测试")) // if(!project.getProjectName().equals("证监会"))
// continue; // continue;
/** /**
......
...@@ -24,6 +24,7 @@ import com.zhiwei.messageflow.mongo.dao.KeywordNewDao; ...@@ -24,6 +24,7 @@ import com.zhiwei.messageflow.mongo.dao.KeywordNewDao;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages; import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
import com.zhiwei.messageflow.redis.service.RedisService; import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.DisposeMessageService; import com.zhiwei.messageflow.service.DisposeMessageService;
import com.zhiwei.messageflow.util.Tools;
@Component @Component
public class DirectES4RedisTask { public class DirectES4RedisTask {
...@@ -48,8 +49,8 @@ public class DirectES4RedisTask { ...@@ -48,8 +49,8 @@ public class DirectES4RedisTask {
/** /**
* 定向监测任务 * 定向监测任务
* *
* @Title: directTask * @Title: directTaskNew
* @Description: TODO(定向监测任务) * @Description: TODO(定向监测任务) 历史版
* @param @param project * @param @param project
* @param @param dgList * @param @param dgList
* @param @param count * @param @param count
...@@ -57,7 +58,7 @@ public class DirectES4RedisTask { ...@@ -57,7 +58,7 @@ public class DirectES4RedisTask {
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public boolean directTask(Project project, List<DirectGroup> dgList, int count) public boolean directTaskNew(Project project, List<DirectGroup> dgList, int count)
throws JsonParseException, JsonMappingException, IOException { throws JsonParseException, JsonMappingException, IOException {
try { try {
/** /**
...@@ -115,7 +116,7 @@ public class DirectES4RedisTask { ...@@ -115,7 +116,7 @@ public class DirectES4RedisTask {
} }
} }
// 根据不同平台获取数据(同一方法,统一封装为消息流实体 // 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getDirectMessage(allkeywords, count, startrsid, -1L, pt, RsidAndMessages ram = disposeMessageService.getDirectMessageNew(allkeywords, count, startrsid, -1L, pt,
project.getProjectName(), directGroup); project.getProjectName(), directGroup);
// 获取查询到的信息 // 获取查询到的信息
...@@ -150,4 +151,149 @@ public class DirectES4RedisTask { ...@@ -150,4 +151,149 @@ public class DirectES4RedisTask {
} }
return true; return true;
} }
/**
* 定向监测读取新 高亮+子词组合并
*
* @Title: directTask
* @Description: 定向监测读取新
* @param @param project
* @param @param dgList
* @param @param count
* @param @return
* @param @throws JsonParseException
* @param @throws JsonMappingException
* @param @throws IOException 设定文件
* @return boolean 返回类型
*/
public boolean directTask(Project project, List<DirectGroup> dgList, int count)
throws JsonParseException, JsonMappingException, IOException {
try {
/**
* 获取项目对应的rsid Map
*/
String directRsidMapKey = RedisConfig.DIRECTRSIDKEY + project.getProjectName();
String rsidjson = redisService.getRsid(directRsidMapKey);
Map<String, Integer> rsidMap = new HashMap<>();
if (rsidjson != null) {
rsidMap = mapper.readValue(rsidjson, Map.class);
} else {
log.info("{}项目RSID列表过期", project.getProjectName());
}
// 统计项目消息总数
int num = 0;
// 更新rsid用的rsid Map
Map<String, Integer> newRsidMap = new HashMap<>();
newRsidMap.putAll(rsidMap);
// 项目关键词组列表
List<KeywordNew> keywordNews = keywordNewDao.getDirectKeywordNewByProject(project.getProjectName());
for (DirectGroup directGroup : dgList) {
// 判断渠道组是否需要读取
if (!directGroup.getIsUsed() || (null == directGroup.getMemberList()))
continue;
if (directGroup.getMemberList().isEmpty())
continue;
// 获取渠道组所包含渠道平台
List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup);
// 遍历全平台获取对应渠道组id
for (String pt : ptlist) {
// 全关键词redis库中的key
String directRedisKey = redisService.getDirectRedisKeyNew(project.getProjectName(),
directGroup.getName(), pt);
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = (rsidMap.get(directRedisKey) == null ? -1L
: Long.valueOf(rsidMap.get(directRedisKey))) >= ESGetCommonId.START_BACKUPID
? Long.valueOf(rsidMap.get(directRedisKey))
: ESGetCommonId.START_BACKUPID;
// 用于存储数据获取后新的rsid
Long keyrsid = -1L;
// 获取该渠道组对应关键词组的关键词,如没有则按全量
List<KeywordNew> DkeywordNews = new ArrayList<>();
for (KeywordNew keywordNew : keywordNews) {
if (null != keywordNew.getQdList() && !keywordNew.getQdList().isEmpty()) {// 判断是否定向监测启用
if (keywordNew.getQdList().contains(directGroup.getName())) {// qdList中有渠道的name
DkeywordNews.add(keywordNew);
}
}
}
if (DkeywordNews.isEmpty()) {
// 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getDirectMessage(new ArrayList<>(), count,
startrsid, -1L, pt, project.getProjectName(), directGroup);
// 获取查询到的信息
List<JSONObject> messageskey = ram.getJlist();
// 查询到数据量为0
if (messageskey == null) {
newRsidMap.put(directRedisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messageskey.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setMessage2Redis(directRedisKey, messageskey, allkeywordcount);
newRsidMap.put(directRedisKey, Integer.valueOf(keyrsid.toString()));
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, directRsidMapKey);
} else {
// 子词组合并
List<JSONObject> messages = new ArrayList<>();
for (int i = 0; i < DkeywordNews.size(); i++) {
KeywordNew DkeywordNew = DkeywordNews.get(i);
// 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getDirectMessage(DkeywordNew.getKeyWords(),
count, startrsid, -1L, pt, project.getProjectName(), directGroup);
// 获取查询到的信息
List<JSONObject> messageskey = ram.getJlist();
// 查询到数据量为0
if (messageskey == null) {
newRsidMap.put(directRedisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
messages = disposeMessageService.accumulateMessage(messages,messageskey);
num += messageskey.size();
// 记录新的rsid
keyrsid = ram.getRsid()>keyrsid?ram.getRsid():keyrsid;
}
// 向redis写入数据
redisService.setMessage2Redis(directRedisKey, messages, allkeywordcount);
newRsidMap.put(directRedisKey, Integer.valueOf(keyrsid.toString()));
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, directRsidMapKey);
}
}
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, directRsidMapKey);
}
log.info("{}项目定向监测本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) {
log.error("项目定向监测本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
return false;
}
return true;
}
} }
...@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; ...@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.mongo.bean.DirectGroup; import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.Project; import com.zhiwei.messageflow.mongo.bean.Project;
...@@ -69,7 +70,7 @@ public class DirectES4RedisThread extends Thread { ...@@ -69,7 +70,7 @@ public class DirectES4RedisThread extends Thread {
log.warn("{}项目超时定向监测线程状态:{}", project.getProjectName(),t.isInterrupted()); log.warn("{}项目超时定向监测线程状态:{}", project.getProjectName(),t.isInterrupted());
} }
} }
}, 49 * 1000L); }, RedisConfig.READ_TIME-1000L);
} }
@SuppressWarnings("static-access") @SuppressWarnings("static-access")
......
...@@ -10,6 +10,7 @@ import org.springframework.boot.ApplicationArguments; ...@@ -10,6 +10,7 @@ import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.listener.ApplicationContextProvider; import com.zhiwei.messageflow.listener.ApplicationContextProvider;
import com.zhiwei.messageflow.test.GetMessage; import com.zhiwei.messageflow.test.GetMessage;
...@@ -35,8 +36,8 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -35,8 +36,8 @@ public class ES4RedisRunner implements ApplicationRunner {
// gs.getDataByeRedis("", "", 0, 0, 0); // gs.getDataByeRedis("", "", 0, 0, 0);
ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class); ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class);
esGetCommonId.getCommonId(); esGetCommonId.getCommonId();
// esGetCommonId.getbackupStartId(); esGetCommonId.getbackupStartId();
esGetCommonId.getStartId(); // esGetCommonId.getStartId();
// 启动时更新事件等待采集列表中的前十个任务状态为采集完毕 // 启动时更新事件等待采集列表中的前十个任务状态为采集完毕
boolean isSuccess = esGetCommonId.updateTopTenCollection(); boolean isSuccess = esGetCommonId.updateTopTenCollection();
...@@ -61,10 +62,10 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -61,10 +62,10 @@ public class ES4RedisRunner implements ApplicationRunner {
directstart.startThread(); directstart.startThread();
eventStart.startThread(); eventStart.startThread();
} catch (Exception e) { } catch (Exception e) {
log.error("主定时器异常{}{}", e.getMessage(), e.getStackTrace()); log.error("主定时器异常", e);
} }
} }
}, 100L, 50 * 1000L); }, 100L, RedisConfig.READ_TIME);
// /** // /**
// * ES库消息输出Excel并分析关键词重复数据 // * ES库消息输出Excel并分析关键词重复数据
......
...@@ -83,9 +83,9 @@ public class ES4RedisStart { ...@@ -83,9 +83,9 @@ public class ES4RedisStart {
pcount++; pcount++;
} }
// if(!project.getProjectName().equals("测试")) { // if (!project.getProjectName().equals("腾讯")) {
// continue; // continue;
// } // }
/** /**
* 项目全部平台(公共+私有) * 项目全部平台(公共+私有)
......
...@@ -19,6 +19,7 @@ import com.zhiwei.messageflow.bean.MediaMessage; ...@@ -19,6 +19,7 @@ import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage; import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.es.service.AutoMarkService; import com.zhiwei.messageflow.es.service.AutoMarkService;
import com.zhiwei.messageflow.es.service.EarlyWarningService; import com.zhiwei.messageflow.es.service.EarlyWarningService;
import com.zhiwei.messageflow.mongo.bean.KeywordNew; import com.zhiwei.messageflow.mongo.bean.KeywordNew;
...@@ -32,6 +33,7 @@ import com.zhiwei.messageflow.mongo.dao.TrackRuleDao; ...@@ -32,6 +33,7 @@ import com.zhiwei.messageflow.mongo.dao.TrackRuleDao;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages; import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
import com.zhiwei.messageflow.redis.service.RedisService; import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.DisposeMessageService; import com.zhiwei.messageflow.service.DisposeMessageService;
import com.zhiwei.messageflow.util.Tools;
@Component @Component
public class ES4RedisTask { public class ES4RedisTask {
...@@ -142,7 +144,7 @@ public class ES4RedisTask { ...@@ -142,7 +144,7 @@ public class ES4RedisTask {
// 根据不同平台获取数据(同一方法,统一封装为消息流实体 // 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getFilteredMessage(noiseRules, allkeywords, count, RsidAndMessages ram = disposeMessageService.getFilteredMessageNew(noiseRules, allkeywords, count,
allstartrsid, -1L, platform, project.getProjectName(), project.getMatchFields()); allstartrsid, -1L, platform, project.getProjectName(), project.getMatchFields());
// 获取查询到的信息 // 获取查询到的信息
...@@ -241,4 +243,150 @@ public class ES4RedisTask { ...@@ -241,4 +243,150 @@ public class ES4RedisTask {
return true; return true;
} }
public boolean ES4Redis(Project project, List<PlatformNew> platformNames, int count)
throws JsonParseException, JsonMappingException, IOException {
try {
/**
* 获取项目对应的rsid Map
*/
String rsidMapKey = RedisConfig.INFORSIDKEY + project.getProjectName();
String rsidjson = redisService.getRsid(rsidMapKey);
Map<String, Integer> rsidMap = new HashMap<>();
if (rsidjson != null) {
rsidMap = mapper.readValue(rsidjson, Map.class);
} else {
log.info("{}项目RSID列表过期", project.getProjectName());
}
// 更新rsid用的rsid Map
Map<String, Integer> newRsidMap = new HashMap<>();
newRsidMap.putAll(rsidMap);
// 统计项目消息总数
int num = 0;
// 项目关键词组列表
List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName());
// 项目噪音规则列表
List<NoiseRule> noiseRules = noiseRuleDao.getNoiseRuleByProject(project.getProjectName());
// 项目预警规则列表
List<TrackRule> trackRules = trackRuleDao.getTrackRuleByProject(project.getProjectName());
// 遍历平台
for (PlatformNew platform : platformNames) {
String platformName = platform.getPlatformName();
/**
* 平台全关键词查询
*/
String allkeytitle = "全部";
// 全关键词redis库中的key
String allRedisKey = RedisConfig.INFOKEY + project.getProjectName() + ":" + platformName + ":"
+ allkeytitle;
/**
* 将项目下的所有关键词组合成全关键词组
*/
List<KeywordNew> kws = new ArrayList<>();
for (KeywordNew kwn : keywordNews) {
if (null == kwn.getPtList()) {// 滤过非选配词组
continue;
}
if (kwn.getPtList().isEmpty() || !kwn.getPtList().contains(platformName)) {// 滤过非选配词组
continue;
}
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组
continue;
}
kws.add(kwn);
}
// 项目关键词为空
if (kws.isEmpty()) {
continue;
}
List<JSONObject> messages = new ArrayList<>();
// 遍历关键词组
for (KeywordNew kwn : kws) {
if (null == kwn.getPtList()) {// 滤过非选配词组
continue;
}
if (kwn.getPtList().isEmpty() || !kwn.getPtList().contains(platformName)) {// 滤过非选配词组
continue;
}
if (null == kwn.getKeyWords() || kwn.getKeyWords().isEmpty()) {// 滤过空词组
continue;
}
// 关键词组在redis库中的key
String redisKey = RedisConfig.INFOKEY + project.getProjectName() + ":" + platformName + ":"
+ kwn.getKeyTitle();
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = (rsidMap.get(redisKey) == null ? -1L
: Long.valueOf(rsidMap.get(redisKey))) >= ESGetCommonId.START_BACKUPID
? Long.valueOf(rsidMap.get(redisKey))
: ESGetCommonId.START_BACKUPID;
// 用于存储数据获取后新的rsid
Long keyrsid = -1L;
// 获取新的rsid和信息实体
RsidAndMessages ramkey = disposeMessageService.getFilteredMessage(noiseRules, kwn.getKeyWords(),
count, startrsid, -1L, platform, project.getProjectName(), project.getMatchFields());
// 获取查询到的信息
List<JSONObject> messageskey = ramkey.getJlist();
// 查询到数据量为0
if (Tools.isEmpty(messageskey)) {
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
//合并子词组
// log.info("in messages:{};messageskey{}", messages.size(), messageskey.size());
messages = disposeMessageService.accumulateMessage(messages,messageskey);
// log.info("out messages:{};messageskey{}", messages.size(), messageskey.size());
num += messageskey.size();
// 记录新的rsid
keyrsid = ramkey.getRsid();
// 向redis写入数据
redisService.setMessage2Redis(redisKey, messageskey, keywordscount);
newRsidMap.put(redisKey, Integer.valueOf(keyrsid.toString()));
} // 遍历关键词组
// 向redis写入数据
redisService.setMessage2Redis(allRedisKey, messages, allkeywordcount);
// 自动标注
autoMarkService.autoMarkMessages(messages, project);
// 预警
earlyWarningService.earlyWarning(messages, trackRules, count, platform,
project.getProjectName());
// 向redis库中存储新的rsid Map,覆盖原有数据
redisService.setRsid(newRsidMap, rsidMapKey);
} // 遍历平台
log.info("{}项目本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) {
log.error("项目本次获取获取出错或超时", e);
return false;
}
return true;
}
} }
\ No newline at end of file
...@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; ...@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project; import com.zhiwei.messageflow.mongo.bean.Project;
...@@ -78,7 +79,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -78,7 +79,7 @@ public class ES4RedisThreadNew extends Thread {
log.warn("{}项目超时线程状态:{}", project.getProjectName(), t.isInterrupted()); log.warn("{}项目超时线程状态:{}", project.getProjectName(), t.isInterrupted());
} }
} }
}, 49 * 1000L); }, RedisConfig.READ_TIME-1000L);
} }
@SuppressWarnings("static-access") @SuppressWarnings("static-access")
...@@ -103,7 +104,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -103,7 +104,7 @@ public class ES4RedisThreadNew extends Thread {
log.info("Running {}", threadName); log.info("Running {}", threadName);
// 该项目执行消息流获取 // 该项目执行消息流获取
boolean flag = es4RedisTask.ES4RedisNew(project, platformNames, count); boolean flag = es4RedisTask.ES4Redis(project, platformNames, count);
if (!flag) { if (!flag) {
// 程序执行出现异常则线程中止 // 程序执行出现异常则线程中止
......
...@@ -32,7 +32,7 @@ public class ESGetCommonId { ...@@ -32,7 +32,7 @@ public class ESGetCommonId {
public static long START_BACKUPID; public static long START_BACKUPID;
public static long TIME; public static long TIME;
@Async @Async
@Scheduled(cron = "0 0/30 * * * ?") @Scheduled(cron = "0 0/30 * * * ?")
public void updatebackupId() { public void updatebackupId() {
...@@ -46,16 +46,21 @@ public class ESGetCommonId { ...@@ -46,16 +46,21 @@ public class ESGetCommonId {
if (TIME < nowtime - 60 * 60 * 1000L) { if (TIME < nowtime - 60 * 60 * 1000L) {
List<Integer> cidList = new ArrayList<>(); List<Integer> cidList = new ArrayList<>();
int weibo = esDao.get48hoursFirstCommonid("weibo"); int weibo = esDao.get48hoursFirstCommonid("weibo");
cidList.add(weibo); if (weibo != 0)
cidList.add(weibo);
int media = esDao.get48hoursFirstCommonid("media"); int media = esDao.get48hoursFirstCommonid("media");
cidList.add(media); if (media != 0)
cidList.add(media);
int video = esDao.get48hoursFirstCommonid("video"); int video = esDao.get48hoursFirstCommonid("video");
cidList.add(video); if (video != 0)
cidList.add(video);
int zhihu = esDao.get48hoursFirstCommonid("zhihu"); int zhihu = esDao.get48hoursFirstCommonid("zhihu");
cidList.add(zhihu); if (zhihu != 0)
cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
log.info(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList); log.info(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
START_COMMONID = cidList.get(0); if (!cidList.isEmpty())
START_COMMONID = cidList.get(0);
TIME = nowtime; TIME = nowtime;
log.info("START_COMMONID:" + START_COMMONID + "\tNOW:" + new Date(TIME)); log.info("START_COMMONID:" + START_COMMONID + "\tNOW:" + new Date(TIME));
} }
...@@ -66,13 +71,17 @@ public class ESGetCommonId { ...@@ -66,13 +71,17 @@ public class ESGetCommonId {
long endtime = nowtime - limit; long endtime = nowtime - limit;
List<Integer> cidList = new ArrayList<>(); List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", endtime); int weibo = esDao.getCommonidByTime("weibo", endtime);
cidList.add(weibo); if (weibo != 0)
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", endtime); int media = esDao.getCommonidByTime("media", endtime);
cidList.add(media); if (media != 0)
cidList.add(media);
int video = esDao.getCommonidByTime("video", endtime); int video = esDao.getCommonidByTime("video", endtime);
cidList.add(video); if (video != 0)
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", endtime); int zhihu = esDao.getCommonidByTime("zhihu", endtime);
cidList.add(zhihu); if (zhihu != 0)
cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
int commonid = cidList.get(0); int commonid = cidList.get(0);
return commonid; return commonid;
...@@ -86,15 +95,20 @@ public class ESGetCommonId { ...@@ -86,15 +95,20 @@ public class ESGetCommonId {
long nowtime = System.currentTimeMillis() - 10 * 60 * 1000L; long nowtime = System.currentTimeMillis() - 10 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>(); List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime); int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo); if (weibo != 0)
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", nowtime); int media = esDao.getCommonidByTime("media", nowtime);
cidList.add(media); if (media != 0)
cidList.add(media);
int video = esDao.getCommonidByTime("video", nowtime); int video = esDao.getCommonidByTime("video", nowtime);
cidList.add(video); if (video != 0)
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", nowtime); int zhihu = esDao.getCommonidByTime("zhihu", nowtime);
cidList.add(zhihu); if (zhihu != 0)
cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
START_BACKUPID = cidList.get(0); if (!cidList.isEmpty())
START_BACKUPID = cidList.get(0);
log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime); log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
} }
...@@ -102,15 +116,20 @@ public class ESGetCommonId { ...@@ -102,15 +116,20 @@ public class ESGetCommonId {
long nowtime = System.currentTimeMillis() - 6 * 60 * 60 * 1000L; long nowtime = System.currentTimeMillis() - 6 * 60 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>(); List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime); int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo); if (weibo != 0)
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", nowtime); int media = esDao.getCommonidByTime("media", nowtime);
cidList.add(media); if (media != 0)
cidList.add(media);
int video = esDao.getCommonidByTime("video", nowtime); int video = esDao.getCommonidByTime("video", nowtime);
cidList.add(video); if (video != 0)
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", nowtime); int zhihu = esDao.getCommonidByTime("zhihu", nowtime);
cidList.add(zhihu); if (zhihu != 0)
cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
START_BACKUPID = cidList.get(0); if (!cidList.isEmpty())
START_BACKUPID = cidList.get(0);
log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime); log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
} }
......
...@@ -31,7 +31,7 @@ import com.zhiwei.messageflow.task.AsyncTask; ...@@ -31,7 +31,7 @@ import com.zhiwei.messageflow.task.AsyncTask;
* @author shentao * @author shentao
* @date 2019年4月18日 下午2:59:11 * @date 2019年4月18日 下午2:59:11
*/ */
@Component //@Component
public class WapRedisTask { public class WapRedisTask {
private static final Logger log = LogManager.getLogger(WapRedisTask.class); private static final Logger log = LogManager.getLogger(WapRedisTask.class);
......
...@@ -25,7 +25,11 @@ public class RedisConfig { ...@@ -25,7 +25,11 @@ public class RedisConfig {
private String ip; private String ip;
private int port; private int port;
private String password; private String password;
/** 定向消息流缓存集 **/
public static final String DIRECTKEY = "Direct:"; public static final String DIRECTKEY = "Direct:";
/** 定向消息流缓存集rsid记录集 **/
public static final String DIRECTRSIDKEY = "Direct:Rsid:";
/** 事件缓存列表 **/
public static final String EVENTKEY = "Event:"; public static final String EVENTKEY = "Event:";
/** 事件采集待处理列表 **/ /** 事件采集待处理列表 **/
public static final String EVENTLISTKEY = "Event:EventList"; public static final String EVENTLISTKEY = "Event:EventList";
...@@ -33,8 +37,14 @@ public class RedisConfig { ...@@ -33,8 +37,14 @@ public class RedisConfig {
public static final String EVENTHASHKEY = "Event:Hash:"; public static final String EVENTHASHKEY = "Event:Hash:";
/** 追踪规则已追踪集 **/ /** 追踪规则已追踪集 **/
public static final String TRACKKEY = "TrackRule:"; public static final String TRACKKEY = "TrackRule:";
/** 追踪规则已追踪集 **/ /** Wap缓存集 **/
public static final String WAPKEY = "Wap:"; public static final String WAPKEY = "Wap:";
/** 消息流缓存集rsid记录集 **/
public static final String INFORSIDKEY = "Info:Rsid:";
/** 消息流缓存集 **/
public static final String INFOKEY = "Info:";
/** 消息流缓存读取周期 **/
public static final Long READ_TIME = 90*1000L;
private int keyMaxSize; private int keyMaxSize;
......
...@@ -15,86 +15,6 @@ import com.zhiwei.messageflow.mongo.bean.PlatformNew; ...@@ -15,86 +15,6 @@ import com.zhiwei.messageflow.mongo.bean.PlatformNew;
public interface ES4BeanService { public interface ES4BeanService {
/** /**
* 获取微博消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, String platform, String project);
/**
* 获取知乎消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, String platform, String project);
/**
* 获取视频消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, String platform, String project);
/**
* 获取网媒消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, String platform, String project);
/**
* 获取各平台消息 * 获取各平台消息
* *
* @param keys * @param keys
...@@ -140,4 +60,10 @@ public interface ES4BeanService { ...@@ -140,4 +60,10 @@ public interface ES4BeanService {
*/ */
List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, String projectName, List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, String projectName,
DirectGroup directGroup); DirectGroup directGroup);
List<JSONObject> getMessageNew(List<NoiseRule> noiseRules, List<String> keywords, int count, long l, long m,
PlatformNew platform, String project, String matchFields);
List<JSONObject> getDirectMessageNew(List<String> keywords, int count, long l, long m, String pt,
String projectName, DirectGroup directGroup);
} }
...@@ -10,21 +10,19 @@ public interface EarlyWarningService { ...@@ -10,21 +10,19 @@ public interface EarlyWarningService {
/** /**
* 预警 * 预警
* @param messages
* *
* @param trackRules * @param messages
* 预警规则 *
* @param count * @param trackRules 预警规则
* 预警消息数 * @param count 预警消息数
* @param startid * @param startid 开始rsid
* 开始rsid * @param endid 结束rsid
* @param endid * @param platformName 平台名
* 结束rsid * @param projectName 项目名
* @param platformName
* 平台名
* @param projectName
* 项目名
*/ */
void earlyWarningNew(List<JSONObject> messages, List<TrackRule> trackRules, int count, long startid, Long endid, PlatformNew platform, void earlyWarningNew(List<JSONObject> messages, List<TrackRule> trackRules, int count, long startid, Long endid,
PlatformNew platform, String projectName);
void earlyWarning(List<JSONObject> messages, List<TrackRule> trackRules, int count, PlatformNew platform,
String projectName); String projectName);
} }
package com.zhiwei.messageflow.es.service; package com.zhiwei.messageflow.es.service;
import java.util.List;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
...@@ -49,4 +51,15 @@ public interface HighLightFillingService { ...@@ -49,4 +51,15 @@ public interface HighLightFillingService {
* @return * @return
*/ */
JSONObject getBean(SearchHit searchHit); JSONObject getBean(SearchHit searchHit);
/**
* ES数据封装+高亮处理+渠道影响力
* @param keywords
* @Title: getBeanNewHighLightCount
* @Description: ES数据封装+高亮处理+渠道影响力
* @param @param searchHit
* @param @return 设定文件
* @return JSONObject 返回类型
*/
JSONObject getBeanNewHighLightCount(SearchHit searchHit, List<String> keywords);
} }
...@@ -13,36 +13,61 @@ import com.zhiwei.messageflow.mongo.bean.NoiseRule; ...@@ -13,36 +13,61 @@ import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
public interface NoiseProcessingService { public interface NoiseProcessingService {
List<WeiboMessage> weiboDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<ZhihuMessage> zhihuDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<VideoMessage> videoDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<MediaMessage> mediaDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
/** /**
* 读取消息流去噪 * 读取消息流去噪
* @Title: allDenoising *
* @Description: TODO(读取消息流去噪) * @Title: allDenoising
* @Description: TODO(读取消息流去噪)
* @param @param noiseRules * @param @param noiseRules
* @param @param searchHits * @param @param searchHits
* @param @param platform * @param @param platform
* @param @param project * @param @param project
* @param @return 设定文件 * @param @return 设定文件
* @return List<JSONObject> 返回类型 * @return List<JSONObject> 返回类型
*/ */
List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform, List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform,
String project); String project);
/** /**
* 读取定向监测消息流去噪 * 读取定向监测消息流去噪
* @Title: directDenoising *
* @Description: TODO(这里用一句话描述这个方法的作用) * @Title: directDenoising
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param searchHits * @param @param searchHits
* @param @param pt * @param @param pt
* @param @param projectName * @param @param projectName
* @param @return 设定文件 * @param @return 设定文件
* @return List<JSONObject> 返回类型 * @return List<JSONObject> 返回类型
*/ */
List<JSONObject> directDenoising(SearchHits searchHits, String pt, String projectName); List<JSONObject> directDenoising(SearchHits searchHits, String pt, String projectName);
/**
* 读取消息流去噪 高亮读取
*
* @Title: allDenoising
* @Description: 读取消息流去噪 高亮读取
* @param @param noiseRules
* @param @param searchHits
* @param @param platform
* @param @param project
* @param @return 设定文件
* @return List<JSONObject> 返回类型
*/
List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform,
String project, List<String> keywords);
/**
* 读取定向监测消息流去噪 高亮读取
*
* @Title: directDenoising
* @Description: 读取定向监测消息流去噪 高亮读取
* @param @param searchHits
* @param @param pt
* @param @param projectName
* @param @param keywords
* @param @return 设定文件
* @return List<JSONObject> 返回类型
*/
List<JSONObject> directDenoising(SearchHits searchHits, String pt, String projectName, List<String> keywords);
} }
...@@ -49,7 +49,7 @@ public class AutoMarkServiceImpl implements AutoMarkService { ...@@ -49,7 +49,7 @@ public class AutoMarkServiceImpl implements AutoMarkService {
// System.err.println(dbObject.get("_id").toString()+"title:"+dbObject.get("title").toString()+"company"+dbObject.get("markGroup").toString()); // System.err.println(dbObject.get("_id").toString()+"title:"+dbObject.get("title").toString()+"company"+dbObject.get("markGroup").toString());
list.add(dbObject); list.add(dbObject);
} }
client.autoMark(list, "media"); // client.autoMark(list, "media");
} }
} }
......
...@@ -32,96 +32,65 @@ public class ES4BeanServiceImpl implements ES4BeanService { ...@@ -32,96 +32,65 @@ public class ES4BeanServiceImpl implements ES4BeanService {
private NoiseProcessingService noiseProcessingService; private NoiseProcessingService noiseProcessingService;
@Override @Override
public List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long startid, long endid, String platform, String project) { long endid, PlatformNew platform, String project, String matchFields) {
List<JSONObject> messages = null;
List<WeiboMessage> messages = null;
try { try {
// 查询数据库 获得searchHits // 查询数据库 获得searchHits
SearchHits searchHits = esDao.getWeiboDataFromEs(keywords, count, startid, endid, platform, project); SearchHits searchHits = esDao.getDataFromEs(keywords, count, startid, endid, platform, project,
matchFields);
if (searchHits == null) { if (searchHits == null) {
return null; return null;
} }
// 去噪并封装 // 去噪并封装
messages = noiseProcessingService.weiboDenoising(noiseRules, searchHits, platform, project); messages = noiseProcessingService.allDenoising(noiseRules, searchHits, platform, project, keywords);
} catch (Exception e) { } catch (Exception e) {
log.error(e.getStackTrace() + " " + e.getMessage()); log.error("error:", e);
} }
return messages; return messages;
} }
@Override @Override
public List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
long startid, long endid, String platform, String project) { String projectName, DirectGroup directGroup) {
List<JSONObject> messages = null;
List<ZhihuMessage> messages = null;
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getZhihuDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.zhihuDenoising(noiseRules, searchHits, platform, project);
return messages;
}
@Override
public List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
long startid, long endid, String platform, String project) {
List<VideoMessage> messages = null;
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getVideoDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.videoDenoising(noiseRules, searchHits, platform, project);
return messages; try {
}
@Override // 查询数据库 获得searchHits
public List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, SearchHits searchHits = esDao.getDirectDataFromEs(keywords, count, startid, endid, pt, projectName,
long startid, long endid, String platform, String project) { directGroup);
List<MediaMessage> messages = null; if (searchHits == null) {
return null;
}
// 查询数据库 获得searchHits // 去噪并封装
SearchHits searchHits = esDao.getMediaDataFromEs(keywords, count, startid, endid, platform, project); messages = noiseProcessingService.directDenoising(searchHits, pt, projectName,keywords);
if (searchHits == null) { } catch (Exception e) {
return null; log.error("error:", e);
} }
// 去噪并封装
messages = noiseProcessingService.mediaDenoising(noiseRules, searchHits, platform, project);
return messages; return messages;
} }
@Override @Override
public List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid, public List<JSONObject> getMessageNew(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
long endid, PlatformNew platform, String project,String matchFields) { long endid, PlatformNew platform, String project, String matchFields) {
List<JSONObject> messages = null; List<JSONObject> messages = null;
try { try {
// 查询数据库 获得searchHits // 查询数据库 获得searchHits
SearchHits searchHits = esDao.getDataFromEs(keywords, count, startid, endid, platform, project,matchFields); SearchHits searchHits = esDao.getDataFromEs(keywords, count, startid, endid, platform, project,
matchFields);
if (searchHits == null) { if (searchHits == null) {
return null; return null;
...@@ -131,31 +100,32 @@ public class ES4BeanServiceImpl implements ES4BeanService { ...@@ -131,31 +100,32 @@ public class ES4BeanServiceImpl implements ES4BeanService {
messages = noiseProcessingService.allDenoising(noiseRules, searchHits, platform, project); messages = noiseProcessingService.allDenoising(noiseRules, searchHits, platform, project);
} catch (Exception e) { } catch (Exception e) {
log.error("error:",e); log.error("error:", e);
} }
return messages; return messages;
} }
@Override @Override
public List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, public List<JSONObject> getDirectMessageNew(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup) { String projectName, DirectGroup directGroup) {
List<JSONObject> messages = null; List<JSONObject> messages = null;
try { try {
// 查询数据库 获得searchHits // 查询数据库 获得searchHits
SearchHits searchHits = esDao.getDirectDataFromEs(keywords, count, startid, endid, pt, projectName,directGroup); SearchHits searchHits = esDao.getDirectDataFromEs(keywords, count, startid, endid, pt, projectName,
directGroup);
if (searchHits == null) { if (searchHits == null) {
return null; return null;
} }
// 去噪并封装 // 去噪并封装
messages = noiseProcessingService.directDenoising( searchHits, pt, projectName); messages = noiseProcessingService.directDenoising(searchHits, pt, projectName);
} catch (Exception e) { } catch (Exception e) {
log.error("error:",e); log.error("error:", e);
} }
return messages; return messages;
......
...@@ -38,4 +38,25 @@ public class EarlyWarningServiceImpl implements EarlyWarningService { ...@@ -38,4 +38,25 @@ public class EarlyWarningServiceImpl implements EarlyWarningService {
} }
@Override
public void earlyWarning(List<JSONObject> messages, List<TrackRule> trackRules, int count, PlatformNew platform,
String projectName) {
String platformName = platform.getPlatformName();
for (int i = 0; i < messages.size(); i++) {
JSONObject msg = messages.get(i);
for (TrackRule tr : trackRules) {
// 判断是否预警
if (!trackHitAndWarnService.isWarnTrackrule(tr))
continue;
// 是否命中预警规则
boolean ishit = trackHitAndWarnService.ishitWarnMsg(msg, platformName, tr);
// 按不同预警规则预警
if (ishit)
trackHitAndWarnService.warnMsg(msg, tr);
}
}
}
} }
...@@ -236,5 +236,16 @@ public interface RedisService { ...@@ -236,5 +236,16 @@ public interface RedisService {
* @return void 返回类型 * @return void 返回类型
*/ */
void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap); void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap);
/**
* 获取新定向监测key
* @Title: getDirectRedisKeyNew
* @Description: 获取新定向监测key
* @param @param projectName
* @param @param name
* @param @param pt
* @param @return 设定文件
* @return String 返回类型
*/
String getDirectRedisKeyNew(String projectName, String name, String pt);
} }
...@@ -187,6 +187,11 @@ public class RedisServiceImpl implements RedisService { ...@@ -187,6 +187,11 @@ public class RedisServiceImpl implements RedisService {
public String getDirectRedisKey(String projectName, String groupName, String pt) { public String getDirectRedisKey(String projectName, String groupName, String pt) {
return RedisConfig.DIRECTKEY + projectName + "-" + groupName + "-" + pt; return RedisConfig.DIRECTKEY + projectName + "-" + groupName + "-" + pt;
} }
@Override
public String getDirectRedisKeyNew(String projectName, String groupName, String pt) {
return RedisConfig.DIRECTKEY + projectName + ":" + groupName + ":" + pt;
}
@Override @Override
public String getDirectRsidMapKey(String projectName) { public String getDirectRsidMapKey(String projectName) {
...@@ -301,7 +306,7 @@ public class RedisServiceImpl implements RedisService { ...@@ -301,7 +306,7 @@ public class RedisServiceImpl implements RedisService {
@Override @Override
public void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap) { public void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap) {
searchInfos.parallelStream().forEach(searchinfo -> { searchInfos.parallelStream().forEach(searchinfo -> {
long s = System.currentTimeMillis(); // long s = System.currentTimeMillis();
String time = searchinfo.get("time") + ""; String time = searchinfo.get("time") + "";
long timeL = 0L; long timeL = 0L;
try { try {
...@@ -331,7 +336,9 @@ public class RedisServiceImpl implements RedisService { ...@@ -331,7 +336,9 @@ public class RedisServiceImpl implements RedisService {
}); });
// log.info("标注日期:{},mid:{},标注人:{}", searchinfo.getString("markDate"), searchinfo.getString("mid"), // log.info("标注日期:{},mid:{},标注人:{}", searchinfo.getString("markDate"), searchinfo.getString("mid"),
// searchinfo.getString("markGroup")); // searchinfo.getString("markGroup"));
log.info("searchinfo处理,总耗时{}ms", (System.currentTimeMillis() - s)); // log.info("searchinfo处理,总耗时{}ms", (System.currentTimeMillis() - s));
}); });
} }
} }
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.service; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.service;
import java.util.List; import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.mongo.bean.DirectGroup; import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
...@@ -10,94 +11,6 @@ import com.zhiwei.messageflow.redis.bean.RsidAndMessages; ...@@ -10,94 +11,6 @@ import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
public interface DisposeMessageService { public interface DisposeMessageService {
/** /**
* 获取微博平台RsidAndMessages
*
* @param noiseRules
* 去噪规则
* @param keywords
* 关键词组
* @param count
* 每次查询数量
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platform
* 平台
* @param project
* 项目
* @return
*/
RsidAndMessages getFilteredWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, Long startid,
Long endid, String platform, String project);
/**
* 获取知乎平台RsidAndMessages
*
* @param noiseRules
* 去噪规则
* @param keywords
* 关键词组
* @param count
* 每次查询数量
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platform
* 平台
* @param project
* 项目
* @return
*/
RsidAndMessages getFilteredZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, Long startid,
Long endid, String platform, String project);
/**
* 获取视频平台RsidAndMessages
*
* @param noiseRules
* 去噪规则
* @param keywords
* 关键词组
* @param count
* 每次查询数量
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platform
* 平台
* @param project
* 项目
* @return
*/
RsidAndMessages getFilteredVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, Long startid,
Long endid, String platform, String project);
/**
* 获取网媒平台RsidAndMessages
*
* @param noiseRules
* 去噪规则
* @param keywords
* 关键词组
* @param count
* 每次查询数量
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platform
* 平台
* @param project
* 项目
* @return
*/
RsidAndMessages getFilteredMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, Long startid,
Long endid, String platform, String project);
/**
* 获取各平台RsidAndMessages * 获取各平台RsidAndMessages
* *
* @param noiseRules * @param noiseRules
...@@ -135,4 +48,20 @@ public interface DisposeMessageService { ...@@ -135,4 +48,20 @@ public interface DisposeMessageService {
*/ */
RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup); String projectName, DirectGroup directGroup);
/**
* 各词组消息累加为全部关键词
* @Title: accumulateMessage
* @Description: 各词组消息累加为全部关键词
* @param @param messages
* @param @param messageskey 设定文件
* @return void 返回类型
*/
List<JSONObject> accumulateMessage(List<JSONObject> messages, List<JSONObject> messageskey);
RsidAndMessages getFilteredMessageNew(List<NoiseRule> noiseRules, List<String> allkeywords, int count,
long allstartrsid, long l, PlatformNew platform, String projectName, String matchFields);
RsidAndMessages getDirectMessageNew(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup);
} }
package com.zhiwei.messageflow.service.impl; package com.zhiwei.messageflow.service.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.commons.collections.ListUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage; import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
...@@ -20,113 +23,124 @@ import com.zhiwei.messageflow.mongo.bean.NoiseRule; ...@@ -20,113 +23,124 @@ import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages; import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
import com.zhiwei.messageflow.service.DisposeMessageService; import com.zhiwei.messageflow.service.DisposeMessageService;
import com.zhiwei.messageflow.util.Tools;
@Component @Component
public class DisposeMessageServiceImpl implements DisposeMessageService { public class DisposeMessageServiceImpl implements DisposeMessageService {
public final static Logger log = LogManager.getLogger(DisposeMessageServiceImpl.class); public final static Logger log = LogManager.getLogger(DisposeMessageServiceImpl.class);
@Autowired @Autowired
private ES4BeanService es4BeanDao; private ES4BeanService es4BeanDao;
@Override @Override
public RsidAndMessages getFilteredWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, String platform, String project) { Long startid, Long endid, PlatformNew platform, String project, String matchFields) {
RsidAndMessages ram = new RsidAndMessages(); RsidAndMessages ram = new RsidAndMessages();
// 消息列表 // 消息列表
List<WeiboMessage> messages = new ArrayList<WeiboMessage>(); List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getWeiboMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project); messages = es4BeanDao.getMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project,
matchFields);
/** /**
* 获取最后消息的rsid * 获取最后消息的rsid
*/ */
if (messages != null && !messages.isEmpty()) { if (messages != null && !messages.isEmpty()) {
WeiboMessage wmf = messages.get(0); JSONObject mmf = messages.get(0);
WeiboMessage wml = messages.get(messages.size() - 1); JSONObject mml = messages.get(messages.size() - 1);
ram.setWlist(messages); ram.setJlist(messages);
ram.setRsid(wmf.getRstime() > wml.getRstime() ? wmf.getRstime() : wml.getRstime()); long fcomid = mmf.getLongValue("commonid");
} long lcomid = mml.getLongValue("commonid");
ram.setRsid(fcomid > lcomid ? fcomid : lcomid);
return ram;
}
@Override
public RsidAndMessages getFilteredZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, String platform, String project) {
RsidAndMessages ram = new RsidAndMessages();
// 消息列表
List<ZhihuMessage> messages = new ArrayList<ZhihuMessage>();
messages = es4BeanDao.getZhihuMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project);
/**
* 获取最后消息的rsid
*/
if (messages != null && !messages.isEmpty()) {
ZhihuMessage zmf = messages.get(0);
ZhihuMessage zml = messages.get(messages.size() - 1);
ram.setZlist(messages);
ram.setRsid(zmf.getRsid() > zml.getRsid() ? zmf.getRsid() : zml.getRsid());
} }
return ram; return ram;
} }
@Override @Override
public RsidAndMessages getFilteredVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
Long startid, Long endid, String platform, String project) { String projectName, DirectGroup directGroup) {
RsidAndMessages ram = new RsidAndMessages(); RsidAndMessages ram = new RsidAndMessages();
// 消息列表 // 消息列表
List<VideoMessage> messages = new ArrayList<VideoMessage>(); List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getVideoMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project); messages = es4BeanDao.getDirectMessage(keywords, count, startid + 1L, -1L, pt, projectName, directGroup);
/** /**
* 获取最后消息的rsid * 获取最后消息的rsid
*/ */
if (messages != null && !messages.isEmpty()) { if (messages != null && !messages.isEmpty()) {
VideoMessage vmf = messages.get(0); JSONObject mmf = messages.get(0);
VideoMessage vml = messages.get(messages.size() - 1); JSONObject mml = messages.get(messages.size() - 1);
ram.setVlist(messages); ram.setJlist(messages);
ram.setRsid(vmf.getRsid() > vml.getRsid() ? vmf.getRsid() : vml.getRsid()); long fcomid = mmf.getLongValue("commonid");
long lcomid = mml.getLongValue("commonid");
ram.setRsid(fcomid > lcomid ? fcomid : lcomid);
} }
return ram; return ram;
} }
@SuppressWarnings("unchecked")
@Override @Override
public RsidAndMessages getFilteredMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public List<JSONObject> accumulateMessage(List<JSONObject> messages, List<JSONObject> messageskey) {
Long startid, Long endid, String platform, String project) { if (Tools.isEmpty(messages) && !Tools.isEmpty(messageskey)) {
RsidAndMessages ram = new RsidAndMessages(); messages.addAll(messageskey);
return messages;
// 消息列表 } else if (!Tools.isEmpty(messageskey)) {
List<MediaMessage> messages = new ArrayList<MediaMessage>(); Map<String, JSONObject> allMsgs = new HashMap<>();
messages = es4BeanDao.getMediaMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project); messages.stream().forEach(msg -> {
allMsgs.put(msg.getLongValue("commonid") + "", msg);
/** });
* 获取最后消息的rsid Map<String, JSONObject> kwMsgs = new HashMap<>();
*/ messageskey.stream().forEach(msg -> {
if (messages != null && !messages.isEmpty()) { kwMsgs.put(msg.getLongValue("commonid") + "", msg);
MediaMessage mmf = messages.get(0); });
MediaMessage mml = messages.get(messages.size() - 1); List<String> retainCids = ListUtils.retainAll(allMsgs.keySet(), kwMsgs.keySet());
ram.setMlist(messages); // log.info("1 allMsgs:{},kwMsgs{},retainCids{}", allMsgs.keySet().size(), kwMsgs.keySet().size(),
ram.setRsid(mmf.getRsid() > mml.getRsid() ? mmf.getRsid() : mml.getRsid()); // retainCids.size());
List<JSONObject> messagesRetain = new ArrayList<>();
retainCids.stream().forEach(cid -> {
JSONObject allMsg = allMsgs.get(cid);
JSONObject kwMsg = kwMsgs.get(cid);
JSONObject retainMsg = JSONObject.parseObject(allMsg.toJSONString());
// 合并高亮词
Map<String, Integer> hLMapOne = kwMsg.getObject("hLMap", Map.class);
Map<String, Integer> hLMapTwo = allMsg.getObject("hLMap", Map.class);
hLMapOne.putAll(hLMapTwo);
Map<String, Integer> hLMap = hLMapOne.entrySet().stream()
.sorted((Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) -> o2.getValue()
- o1.getValue())
.skip(0).limit(5).collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue()));
retainMsg.put("hLMap", hLMap);
messagesRetain.add(retainMsg);
allMsgs.remove(cid);
kwMsgs.remove(cid);
});
// log.info("2 allMsgs:{},kwMsgs{},retainMsgs{}", allMsgs.keySet().size(), kwMsgs.keySet().size(),
// messagesRetain.size());
messagesRetain.addAll(allMsgs.entrySet().stream().map(a -> a.getValue()).collect(Collectors.toList()));
messagesRetain.addAll(kwMsgs.entrySet().stream().map(a -> a.getValue()).collect(Collectors.toList()));
// log.info("3 allMsgs:{},kwMsgs{},retainMsgs{}", allMsgs.keySet().size(), kwMsgs.keySet().size(),
// messagesRetain.size());
return messagesRetain;
} }
return messages;
return ram;
} }
@Override @Override
public RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, public RsidAndMessages getFilteredMessageNew(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, PlatformNew platform, String project, String matchFields) { long startid, long l, PlatformNew platform, String project, String matchFields) {
RsidAndMessages ram = new RsidAndMessages(); RsidAndMessages ram = new RsidAndMessages();
// 消息列表 // 消息列表
List<JSONObject> messages = new ArrayList<JSONObject>(); List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getMessage(noiseRules, keywords, count, startid + 1L, -1L, platform, project,matchFields); messages = es4BeanDao.getMessageNew(noiseRules, keywords, count, startid + 1L, -1L, platform, project,
matchFields);
/** /**
* 获取最后消息的rsid * 获取最后消息的rsid
...@@ -145,13 +159,13 @@ public class DisposeMessageServiceImpl implements DisposeMessageService { ...@@ -145,13 +159,13 @@ public class DisposeMessageServiceImpl implements DisposeMessageService {
} }
@Override @Override
public RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, public RsidAndMessages getDirectMessageNew(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup) { String projectName, DirectGroup directGroup) {
RsidAndMessages ram = new RsidAndMessages(); RsidAndMessages ram = new RsidAndMessages();
// 消息列表 // 消息列表
List<JSONObject> messages = new ArrayList<JSONObject>(); List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getDirectMessage(keywords, count, startid + 1L, -1L, pt, projectName,directGroup); messages = es4BeanDao.getDirectMessageNew(keywords, count, startid + 1L, -1L, pt, projectName, directGroup);
/** /**
* 获取最后消息的rsid * 获取最后消息的rsid
...@@ -169,5 +183,4 @@ public class DisposeMessageServiceImpl implements DisposeMessageService { ...@@ -169,5 +183,4 @@ public class DisposeMessageServiceImpl implements DisposeMessageService {
return ram; return ram;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment