Commit 042fd7e6 by 303514581@qq.com

2019/2/26 backup定向平媒添加+事件规则过滤修改

parent f31477c4
...@@ -31,6 +31,9 @@ public class DES4RedisStart{ ...@@ -31,6 +31,9 @@ public class DES4RedisStart{
// 遍历项目 // 遍历项目
for (Project project : projects) { for (Project project : projects) {
// if(!project.getProjectName().equals("测试"))
// continue;
/** /**
* 项目是否开启定向监测,并获取定向渠道组 * 项目是否开启定向监测,并获取定向渠道组
......
...@@ -50,14 +50,10 @@ public class DirectES4RedisTask { ...@@ -50,14 +50,10 @@ public class DirectES4RedisTask {
* *
* @Title: directTask * @Title: directTask
* @Description: TODO(定向监测任务) * @Description: TODO(定向监测任务)
* @param @param * @param @param project
* project * @param @param dgList
* @param @param * @param @param count
* dgList * @param @return 设定文件
* @param @param
* count
* @param @return
* 设定文件
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -87,10 +83,10 @@ public class DirectES4RedisTask { ...@@ -87,10 +83,10 @@ public class DirectES4RedisTask {
List<KeywordNew> keywordNews = keywordNewDao.getDirectKeywordNewByProject(project.getProjectName()); List<KeywordNew> keywordNews = keywordNewDao.getDirectKeywordNewByProject(project.getProjectName());
for (DirectGroup directGroup : dgList) { for (DirectGroup directGroup : dgList) {
//判断渠道组是否需要读取 // 判断渠道组是否需要读取
if(!directGroup.getIsUsed()||(null==directGroup.getMemberList())) if (!directGroup.getIsUsed() || (null == directGroup.getMemberList()))
continue; continue;
if (directGroup.getMemberList().isEmpty()) if (directGroup.getMemberList().isEmpty())
continue; continue;
// 获取渠道组所包含渠道平台 // 获取渠道组所包含渠道平台
List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup); List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup);
...@@ -101,8 +97,10 @@ public class DirectES4RedisTask { ...@@ -101,8 +97,10 @@ public class DirectES4RedisTask {
directGroup.getName(), pt); directGroup.getName(), pt);
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点 // 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = rsidMap.get(directRedisKey) == null ? -1L Long startrsid = (rsidMap.get(directRedisKey) == null ? -1L
: Long.valueOf(rsidMap.get(directRedisKey)); : Long.valueOf(rsidMap.get(directRedisKey))) >= ESGetCommonId.START_BACKUPID
? Long.valueOf(rsidMap.get(directRedisKey))
: ESGetCommonId.START_BACKUPID;
// 用于存储数据获取后新的rsid // 用于存储数据获取后新的rsid
Long keyrsid = -1L; Long keyrsid = -1L;
......
...@@ -25,7 +25,7 @@ public class DirectES4RedisThread extends Thread { ...@@ -25,7 +25,7 @@ public class DirectES4RedisThread extends Thread {
private List<DirectGroup> dgList; private List<DirectGroup> dgList;
// 单个平台单个关键词组每次查询数量 // 单个平台单个关键词组每次查询数量
// private static final int count = 20; // private static final int count = 20;
private static final int count = 50; private static final int count = 300;
private DirectES4RedisTask directES4RedisTask; private DirectES4RedisTask directES4RedisTask;
......
...@@ -36,6 +36,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -36,6 +36,7 @@ public class ES4RedisRunner implements ApplicationRunner {
ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class); ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class);
esGetCommonId.getCommonId(); esGetCommonId.getCommonId();
esGetCommonId.getbackupStartId(); esGetCommonId.getbackupStartId();
// esGetCommonId.getStartId();
// 启动时更新事件等待采集列表中的前十个任务状态为采集完毕 // 启动时更新事件等待采集列表中的前十个任务状态为采集完毕
boolean isSuccess = esGetCommonId.updateTopTenCollection(); boolean isSuccess = esGetCommonId.updateTopTenCollection();
...@@ -45,7 +46,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -45,7 +46,7 @@ public class ES4RedisRunner implements ApplicationRunner {
*/ */
// 手动注入bean ES4RedisStart // 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class); ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
// DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class); DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class);
EVENTAutoMarkStart eventStart = ApplicationContextProvider.getBean("EVENTAutoMarkStart", EVENTAutoMarkStart eventStart = ApplicationContextProvider.getBean("EVENTAutoMarkStart",
EVENTAutoMarkStart.class); EVENTAutoMarkStart.class);
...@@ -57,8 +58,8 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -57,8 +58,8 @@ public class ES4RedisRunner implements ApplicationRunner {
public void run() { public void run() {
try { try {
start.startThread(); start.startThread();
// directstart.startThread(); directstart.startThread();
// eventStart.startThread(); eventStart.startThread();
} catch (Exception e) { } catch (Exception e) {
log.error("主定时器异常{}{}", e.getMessage(), e.getStackTrace()); log.error("主定时器异常{}{}", e.getMessage(), e.getStackTrace());
} }
......
...@@ -29,7 +29,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -29,7 +29,7 @@ public class ES4RedisThreadNew extends Thread {
private ES4RedisTask es4RedisTask; private ES4RedisTask es4RedisTask;
// 单个平台单个关键词组每次查询数量 // 单个平台单个关键词组每次查询数量
private static final int count = 50; private static final int count = 300;
// private static final int count = 10; // private static final int count = 10;
// private static final int max_Thread_num = 40; // private static final int max_Thread_num = 40;
......
...@@ -81,8 +81,22 @@ public class ESGetCommonId { ...@@ -81,8 +81,22 @@ public class ESGetCommonId {
int zhihu = esDao.getCommonidByTime("zhihu", nowtime); int zhihu = esDao.getCommonidByTime("zhihu", nowtime);
cidList.add(zhihu); cidList.add(zhihu);
Collections.sort(cidList); Collections.sort(cidList);
// System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + START_BACKUPID = cidList.get(0);
// cidList); System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
}
public void getStartId() {
long nowtime = System.currentTimeMillis() - 12 * 60 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo);
int media = esDao.getCommonidByTime("media", nowtime);
cidList.add(media);
int video = esDao.getCommonidByTime("video", nowtime);
cidList.add(video);
int zhihu = esDao.getCommonidByTime("zhihu", nowtime);
cidList.add(zhihu);
Collections.sort(cidList);
START_BACKUPID = cidList.get(0); START_BACKUPID = cidList.get(0);
System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime); System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
} }
......
...@@ -902,7 +902,7 @@ public class ESDaoImpl implements ESDao { ...@@ -902,7 +902,7 @@ public class ESDaoImpl implements ESDao {
sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null, "user_id"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null, "user_id");
} else { } else {
sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "source"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "source");
if (pt.equals("微信")) { if (pt.equals("微信")||pt.equals("平媒")) {
// tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, // tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder,
// specTypeList, null, "source"); // specTypeList, null, "source");
} else if (pt.equals("今日头条")) { } else if (pt.equals("今日头条")) {
......
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.mongo.dao.impl; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
...@@ -32,14 +33,15 @@ public class DirectGroupDaoImpl implements DirectGroupDao { ...@@ -32,14 +33,15 @@ public class DirectGroupDaoImpl implements DirectGroupDao {
List<String> res = new ArrayList<>(); List<String> res = new ArrayList<>();
try { try {
List<String> idlist = directGroup.getMemberList(); List<String> idlist = directGroup.getMemberList();
for (String id : idlist) { // for (String id : idlist) {
String pt = id.split("_")[0]; // String pt = id.split("_")[0];
if(res.isEmpty()) { // if(res.isEmpty()) {
res.add(pt); // res.add(pt);
}else if(!res.contains(pt)){ // }else if(!res.contains(pt)){
res.add(pt); // res.add(pt);
} // }
} // }
res = idlist.stream().map(id->id.split("_")[0]).distinct().collect(Collectors.toList());
} catch (Exception e) { } catch (Exception e) {
return null; return null;
} }
......
...@@ -4,6 +4,7 @@ import java.io.IOException; ...@@ -4,6 +4,7 @@ import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
...@@ -49,6 +50,11 @@ public class EventServiceImpl implements EventService { ...@@ -49,6 +50,11 @@ public class EventServiceImpl implements EventService {
private static AutomaticMarkClient client = AutomaticMarkClient.getClient(MiddlewareConfig.zookeeperIp); private static AutomaticMarkClient client = AutomaticMarkClient.getClient(MiddlewareConfig.zookeeperIp);
/** 自动标注每次进聚合条数 **/ /** 自动标注每次进聚合条数 **/
private static final int AUTOMARKPAGELIMIT = 500; private static final int AUTOMARKPAGELIMIT = 500;
/** 事件采集source过滤条件 **/
private static final List<String> SOURCENOTRULE = Arrays.asList("主页", "网页", "首页", "平台首页", "早报", "手机版", "null");
/** 事件采集url过滤条件 **/
private static final List<String> URLNOTRULE = Arrays.asList("zhihu.com", "ali.71zs.com", "v.qq.com", "v.youku.com",
"iqiyi.com", "v.pptv.com", "zuanke8.com", "weibo.com", "socialbeta.com", "www.baidu.com/link?url=", "null");
@Autowired @Autowired
@Qualifier(value = "primaryMongoTemplate") @Qualifier(value = "primaryMongoTemplate")
...@@ -75,7 +81,7 @@ public class EventServiceImpl implements EventService { ...@@ -75,7 +81,7 @@ public class EventServiceImpl implements EventService {
} }
} else { } else {
// 放入缓存 // 放入缓存
if (CanMsgInsert(msgJsonOb)) { if (canMsgInsert(msgJsonOb)) {
if (!isUrlIterate(msgJsonOb)) { if (!isUrlIterate(msgJsonOb)) {
// url不重复 // url不重复
msgJsonOb.put("offset", offset); msgJsonOb.put("offset", offset);
...@@ -91,10 +97,8 @@ public class EventServiceImpl implements EventService { ...@@ -91,10 +97,8 @@ public class EventServiceImpl implements EventService {
* *
* @Title: dropUrlIterateList * @Title: dropUrlIterateList
* @Description: 采集结束删除urlHash集 * @Description: 采集结束删除urlHash集
* @param @param * @param @param msgJsonOb
* msgJsonOb * @param @return 设定文件
* @param @return
* 设定文件
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
private boolean dropUrlIterateList(JSONObject msgJsonOb) { private boolean dropUrlIterateList(JSONObject msgJsonOb) {
...@@ -112,12 +116,9 @@ public class EventServiceImpl implements EventService { ...@@ -112,12 +116,9 @@ public class EventServiceImpl implements EventService {
* *
* @Title: isUrlIterate * @Title: isUrlIterate
* @Description: 判断重复url * @Description: 判断重复url
* @param @param * @param @param string
* string * @param @param id
* @param @param * @param @return 设定文件
* id
* @param @return
* 设定文件
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
private boolean isUrlIterate(JSONObject msgJsonOb) { private boolean isUrlIterate(JSONObject msgJsonOb) {
...@@ -133,7 +134,7 @@ public class EventServiceImpl implements EventService { ...@@ -133,7 +134,7 @@ public class EventServiceImpl implements EventService {
return false; return false;
} }
} catch (Exception e) { } catch (Exception e) {
log.error("判断重复urlError:",e); log.error("判断重复urlError:", e);
return false; return false;
} }
} }
...@@ -143,16 +144,14 @@ public class EventServiceImpl implements EventService { ...@@ -143,16 +144,14 @@ public class EventServiceImpl implements EventService {
* *
* @Title: formatUrl * @Title: formatUrl
* @Description: 格式化url * @Description: 格式化url
* @param @param * @param @param string
* string * @param @return 设定文件
* @param @return
* 设定文件
* @return String 返回类型 * @return String 返回类型
* @throws Exception * @throws Exception
*/ */
private String formatUrl(String urlStr) throws Exception { private String formatUrl(String urlStr) throws Exception {
URL url = new URL(urlStr); URL url = new URL(urlStr);
if("http".equals(url.getProtocol())) { if ("http".equals(url.getProtocol())) {
urlStr = urlStr.replaceFirst("http://", "https://"); urlStr = urlStr.replaceFirst("http://", "https://");
} }
return urlStr; return urlStr;
...@@ -163,22 +162,24 @@ public class EventServiceImpl implements EventService { ...@@ -163,22 +162,24 @@ public class EventServiceImpl implements EventService {
* *
* @Title: CanMsgInsert * @Title: CanMsgInsert
* @Description: 判断是否能进缓存 * @Description: 判断是否能进缓存
* @param @param * @param @param msgJsonOb
* msgJsonOb * @param @return 设定文件
* @param @return
* 设定文件
* @return boolean 返回类型 * @return boolean 返回类型
*/ */
private boolean CanMsgInsert(JSONObject msgJsonOb) { private boolean canMsgInsert(JSONObject msgJsonOb) {
if (msgJsonOb.containsKey("time")) { if (msgJsonOb.containsKey("time")) {
if (null == msgJsonOb.getLong("time") || -1 == msgJsonOb.getLong("time")) { if (null == msgJsonOb.getLong("time") || -1 == msgJsonOb.getLong("time")) {
return false; return false;
} }
} }
if (msgJsonOb.containsKey("source")) { if (msgJsonOb.containsKey("source")) {
if (null == msgJsonOb.getString("source") || "百度网页".equals(msgJsonOb.getString("source"))) { if (conformSourceNotRule(msgJsonOb)) {
log.info("MSG_source_Error:{}", msgJsonOb);
return false; return false;
} }
// if (null == msgJsonOb.getString("source") || "百度网页".equals(msgJsonOb.getString("source"))) {
// return false;
// }
} }
if (msgJsonOb.containsKey("title")) { if (msgJsonOb.containsKey("title")) {
if (null == msgJsonOb.getString("title")) { if (null == msgJsonOb.getString("title")) {
...@@ -193,8 +194,8 @@ public class EventServiceImpl implements EventService { ...@@ -193,8 +194,8 @@ public class EventServiceImpl implements EventService {
} else if (!isValidUrl(msgJsonOb.getString("url"))) { } else if (!isValidUrl(msgJsonOb.getString("url"))) {
log.info("MSG_url_Error_notValid:{}", msgJsonOb); log.info("MSG_url_Error_notValid:{}", msgJsonOb);
return false; return false;
} else if (Tools.approximateStringMatching(msgJsonOb.getString("url"),"www.baidu.com/link?url=")) { } else if (conformUrlNotRule(msgJsonOb)) {
log.info("MSG_url_Error_www.baidu.com/link?url=:{}", msgJsonOb); log.info("MSG_url_Error:{}", msgJsonOb);
return false; return false;
} }
} }
...@@ -207,6 +208,21 @@ public class EventServiceImpl implements EventService { ...@@ -207,6 +208,21 @@ public class EventServiceImpl implements EventService {
return true; return true;
} }
/**
 * Checks whether the message's "url" field matches one of the URL exclusion rules.
 *
 * <p>A rule matches when the url contains the rule text as a substring. A missing url is
 * treated as the literal string "null", the same mapping conformSourceNotRule applies to a
 * missing source, so this method no longer throws a NullPointerException in that case.
 *
 * @param msgJsonOb the message whose "url" field is inspected
 * @return true if the url contains any entry of URLNOTRULE, false otherwise
 */
private boolean conformUrlNotRule(JSONObject msgJsonOb) {
    // Read the field once rather than once per rule inside the lambda.
    String rawUrl = msgJsonOb.getString("url");
    String url = null == rawUrl ? "null" : rawUrl;
    return URLNOTRULE.stream().anyMatch(url::contains);
}
/**
 * Checks whether the message's "source" field matches one of the source exclusion rules.
 *
 * <p>A missing source is treated as the literal string "null". The source matches when it is
 * listed in SOURCENOTRULE, looks like a host name (starts with "www." or ends with ".com" or
 * ".cn"), or contains one of the known junk markers checked below.
 *
 * @param msgJsonOb the message whose "source" field is inspected
 * @return true if the source matches any exclusion rule, false otherwise
 */
private boolean conformSourceNotRule(JSONObject msgJsonOb) {
    String src = msgJsonOb.getString("source");
    if (null == src) {
        src = "null";
    }
    // Exact match against the configured list.
    if (SOURCENOTRULE.contains(src)) {
        return true;
    }
    // Sources that are really host names rather than publisher names.
    if (src.startsWith("www.") || src.endsWith(".com") || src.endsWith(".cn")) {
        return true;
    }
    // Leftover markup or aggregator tags embedded in the source text.
    return src.contains("<img class=") || src.contains("【快资讯】") || src.contains("「热点新闻」");
}
/** /**
* 校验url合法性 * 校验url合法性
* *
......
...@@ -895,6 +895,9 @@ public class ESQueryUtil { ...@@ -895,6 +895,9 @@ public class ESQueryUtil {
list.add("微信"); list.add("微信");
list.add("微信*"); list.add("微信*");
break; break;
case "平媒":
list.add("平媒");
break;
default: default:
list.add("网媒"); list.add("网媒");
list.add("*新闻"); list.add("*新闻");
......
...@@ -5,7 +5,8 @@ redis.testOnBorrow=true ...@@ -5,7 +5,8 @@ redis.testOnBorrow=true
redis.testOnReturn=true redis.testOnReturn=true
redis.ip = 192.168.0.202 redis.ip = 192.168.0.202
redis.port=6380 redis.port=6380
#redis.ip = 202.107.192.94 #redis.ip=202.107.192.94
#redis.port=6479
#redis.ip=127.0.0.1 #redis.ip=127.0.0.1
#redis.port=6379 #redis.port=6379
#redis.ip=192.168.1.74 #redis.ip=192.168.1.74
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment