Commit 832c8657 by shentao

2018/10/12 新版采集消息流线上版1.0.0

parent 01a0668c
...@@ -200,6 +200,14 @@ ...@@ -200,6 +200,14 @@
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.4.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
......
...@@ -10,12 +10,15 @@ import java.util.Locale; ...@@ -10,12 +10,15 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.TimeZone; import java.util.TimeZone;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.DirectES4RedisThread;
import com.zhiwei.messageflow.bean.MediaMessage; import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
...@@ -29,6 +32,7 @@ import com.zhiwei.messageflow.util.ESQueryUtil; ...@@ -29,6 +32,7 @@ import com.zhiwei.messageflow.util.ESQueryUtil;
@Component @Component
public class NoiseProcessingServiceImpl implements NoiseProcessingService { public class NoiseProcessingServiceImpl implements NoiseProcessingService {
private final static Logger log = LogManager.getLogger(NoiseProcessingServiceImpl.class);
@Autowired @Autowired
private HighLightFillingService highLightFillingService; private HighLightFillingService highLightFillingService;
...@@ -637,7 +641,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -637,7 +641,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
private boolean isTimeOver(String time) { private boolean isTimeOver(String time) {
long current=System.currentTimeMillis();//当前时间毫秒数 long current=System.currentTimeMillis();//当前时间毫秒数
long zero=current/(1000*3600*24)*(1000*3600*24)-TimeZone.getDefault().getRawOffset();//今天零点零分零秒的毫秒数 long zero=current-(24*3600*1000L);//前一天毫秒数
Date date = new Date(); Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
// SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss 'CST' // SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss 'CST'
...@@ -647,7 +651,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -647,7 +651,7 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
try { try {
date = dateFormat.parse(time); date = dateFormat.parse(time);
if ((date.getTime() + (8 * 3600 * 1000L)) < zero) { if ((date.getTime() + (8 * 3600 * 1000L)) < zero) {
System.err.println(date.getTime() + (8 * 3600 * 1000L)+"|"+zero); log.info(date.getTime() + (8 * 3600 * 1000L)+"|"+zero);
return true; return true;
} }
} catch (ParseException e) { } catch (ParseException e) {
...@@ -677,15 +681,15 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -677,15 +681,15 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
System.err.println("weibo:time:" + time + map.get("id").toString()); log.info("weibo:time:" + time +"\t"+ map.get("id").toString());
return isnoise; return true;
} }
String text = map.get("text") != null ? map.get("text").toString() : null; String text = map.get("text") != null ? map.get("text").toString() : null;
String roottext = map.get("roottext") != null ? map.get("roottext").toString() : null; String roottext = map.get("roottext") != null ? map.get("roottext").toString() : null;
String username = map.get("username") != null ? map.get("username").toString() : null; String username = map.get("username") != null ? map.get("username").toString() : null;
if (null == username) { if (null == username) {
return isnoise; return true;
} }
if (noiseRules != null) { if (noiseRules != null) {
for (NoiseRule n : noiseRules) { for (NoiseRule n : noiseRules) {
...@@ -782,8 +786,8 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -782,8 +786,8 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("created_at") != null ? map.get("created_at").toString() : null; String time = map.get("created_at") != null ? map.get("created_at").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
System.err.println("zhihu:time:" + time + map.get("id").toString()); log.info("zhihu:time:" + time +"\t"+ map.get("id").toString());
return isnoise; return true;
} }
String text = map.get("question_title") != null ? map.get("question_title").toString() : null; String text = map.get("question_title") != null ? map.get("question_title").toString() : null;
String roottext = map.get("answer_content") != null ? map.get("answer_content").toString() : null; String roottext = map.get("answer_content") != null ? map.get("answer_content").toString() : null;
...@@ -913,8 +917,8 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -913,8 +917,8 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
System.err.println("video:time:" + time + map.get("id").toString()); log.info("video:time:" + time +"\t"+ map.get("id").toString());
return isnoise; return true;
} }
String text = map.get("title") != null ? map.get("title").toString() : null; String text = map.get("title") != null ? map.get("title").toString() : null;
String roottext = null; String roottext = null;
...@@ -1043,15 +1047,15 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -1043,15 +1047,15 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
String platform = platformNew.getPlatformName(); String platform = platformNew.getPlatformName();
String time = map.get("time") != null ? map.get("time").toString() : null; String time = map.get("time") != null ? map.get("time").toString() : null;
if (isTimeOver(time)) { if (isTimeOver(time)) {
System.err.println("media:time:" + time + map.get("id").toString()); log.info("media:time:" + time +"\t"+ map.get("id").toString());
return isnoise; return true;
} }
String text = map.get("title") != null ? map.get("title").toString() : null; String text = map.get("title") != null ? map.get("title").toString() : null;
String roottext = map.get("content") != null ? map.get("content").toString() : null; String roottext = map.get("content") != null ? map.get("content").toString() : null;
String username = map.get("source") != null ? map.get("source").toString() : null; String username = map.get("source") != null ? map.get("source").toString() : null;
if (null == username) { if (null == username) {
return isnoise; return true;
} }
if (noiseRules != null) { if (noiseRules != null) {
for (NoiseRule n : noiseRules) { for (NoiseRule n : noiseRules) {
......
...@@ -136,7 +136,7 @@ public class RedisPoolAndTools { ...@@ -136,7 +136,7 @@ public class RedisPoolAndTools {
} }
/** /**
* 删除超出上限的数据 * 删除超出上限的数据(弃用
* *
* @param key * @param key
* @param removeIndex * @param removeIndex
...@@ -153,9 +153,26 @@ public class RedisPoolAndTools { ...@@ -153,9 +153,26 @@ public class RedisPoolAndTools {
jedis.zremrangeByRank(key, 0, removeIndex); jedis.zremrangeByRank(key, 0, removeIndex);
returnResource(jedis); returnResource(jedis);
} }
/** /**
* 获取有序集合消息数量 * 删除超出上限的数据
*
* @param key
* @param removeIndex
*/
public void removeDataByName(String key, int removeIndex) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.zremrangeByRank(key, 0, removeIndex);
returnResource(jedis);
}
/**
* 获取有序集合消息数量(弃用
* *
* @param key * @param key
* @return * @return
...@@ -173,6 +190,25 @@ public class RedisPoolAndTools { ...@@ -173,6 +190,25 @@ public class RedisPoolAndTools {
returnResource(jedis); returnResource(jedis);
return nowCount; return nowCount;
} }
/**
* 获取有序集合消息数量
*
* @param key
* @return
*/
public Long getNowCount(String key) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
Long nowCount = jedis.zcard(key);
returnResource(jedis);
return nowCount;
}
/** /**
* 分页获取redis数据 * 分页获取redis数据
......
...@@ -61,10 +61,10 @@ public class RedisServiceImpl implements RedisService { ...@@ -61,10 +61,10 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} }
...@@ -84,10 +84,10 @@ public class RedisServiceImpl implements RedisService { ...@@ -84,10 +84,10 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} }
...@@ -107,10 +107,10 @@ public class RedisServiceImpl implements RedisService { ...@@ -107,10 +107,10 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} }
...@@ -130,10 +130,10 @@ public class RedisServiceImpl implements RedisService { ...@@ -130,10 +130,10 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} }
...@@ -148,10 +148,10 @@ public class RedisServiceImpl implements RedisService { ...@@ -148,10 +148,10 @@ public class RedisServiceImpl implements RedisService {
/** /**
* 删除超出存储上限的数据 * 删除超出存储上限的数据
*/ */
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量 long nowCount = redisPoolAndTools.getNowCount(redisKey);// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量 int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) { if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex); redisPoolAndTools.removeDataByName(redisKey, removeIndex);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment