Commit 101c2756 by 303514581@qq.com

2019/5/5 添加wap缓存机制+更新起始commonid读取位置。

parent 34436537
......@@ -38,7 +38,7 @@ public class ES4RedisStart {
private Map<String, Boolean> PROJECTMAP = new HashMap<>();
// private static final int PROJECTCOUNT = 10;
// private static final int PROJECTCOUNT = 5;
private static final int PROJECTCOUNT = 40;
/**
......
......@@ -5,7 +5,13 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.dao.ESDao;
......@@ -13,6 +19,7 @@ import com.zhiwei.messageflow.service.EventService;
@Component
public class ESGetCommonId {
private static final Logger log = LogManager.getLogger(ESGetCommonId.class);
@Autowired
private ESDao esDao;
......@@ -26,6 +33,14 @@ public class ESGetCommonId {
public static long TIME;
@Async
@Scheduled(cron = "0 0/30 * * * ?")
public void updatebackupId() {
getStartId();
// getbackupStartId();
log.info("读取起始id更新完毕");
}
public void getCommonId() {
long nowtime = System.currentTimeMillis();
if (TIME < nowtime - 60 * 60 * 1000L) {
......@@ -39,10 +54,10 @@ public class ESGetCommonId {
int zhihu = esDao.get48hoursFirstCommonid("zhihu");
cidList.add(zhihu);
Collections.sort(cidList);
System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
log.info(weibo + " " + media + " " + video + " " + zhihu + "\n" + cidList);
START_COMMONID = cidList.get(0);
TIME = nowtime;
System.err.println("START_COMMONID:" + START_COMMONID + "\tNOW:" + new Date(TIME));
log.info("START_COMMONID:" + START_COMMONID + "\tNOW:" + new Date(TIME));
}
}
......@@ -59,8 +74,6 @@ public class ESGetCommonId {
int zhihu = esDao.getCommonidByTime("zhihu", endtime);
cidList.add(zhihu);
Collections.sort(cidList);
// System.err.println(weibo + " " + media + " " + video + " " + zhihu + "\n" +
// cidList);
int commonid = cidList.get(0);
return commonid;
}
......@@ -70,7 +83,7 @@ public class ESGetCommonId {
}
public void getbackupStartId() {
long nowtime = System.currentTimeMillis() - 20 * 60 * 1000L;
long nowtime = System.currentTimeMillis() - 10 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo);
......@@ -82,11 +95,11 @@ public class ESGetCommonId {
cidList.add(zhihu);
Collections.sort(cidList);
START_BACKUPID = cidList.get(0);
System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
}
public void getStartId() {
long nowtime = System.currentTimeMillis() - 12 * 60 * 60 * 1000L;
long nowtime = System.currentTimeMillis() - 6 * 60 * 60 * 1000L;
List<Integer> cidList = new ArrayList<>();
int weibo = esDao.getCommonidByTime("weibo", nowtime);
cidList.add(weibo);
......@@ -98,7 +111,7 @@ public class ESGetCommonId {
cidList.add(zhihu);
Collections.sort(cidList);
START_BACKUPID = cidList.get(0);
System.err.println("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
log.info("START_BACKUPID:" + START_BACKUPID + "\tNOW:" + nowtime);
}
}
......@@ -5,9 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(exclude = {MongoAutoConfiguration.class,MongoDataAutoConfiguration.class})
@EnableKafka
@EnableScheduling
@EnableAsync
public class MessageflowApplication {
public static void main(String[] args) {
......
package com.zhiwei.messageflow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.mongo.bean.WapProject;
import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.WapService;
import com.zhiwei.messageflow.task.AsyncTask;
/**
* Wap写入缓存定时器
*
* @ClassName: WapRedisTask
* @Description: Wap写入缓存定时器
* @author shentao
* @date 2019年4月18日 下午2:59:11
*/
@Component
public class WapRedisTask {
private static final Logger log = LogManager.getLogger(WapRedisTask.class);
@Autowired
private AsyncTask task;
@Autowired
private WapService wapService;
@Autowired
private RedisService redisService;
// 缓存Key Wap:时间:项目_栏目_分类
// String key = GenericAttribute.KEY_WAP + time + ":" + project + "_"
// + classification + "_" + columnId;
private List<String> WAP_SEVEN_DAY_KEY;
/** 分类 **/
private List<String> WAP_CF = Arrays.asList("全部", "微博", "微信", "网媒", "今日头条");
/** wap项目 **/
private List<WapProject> WAP_Projects;
/** 系统项目 **/
private List<String> WAP_sProjects;
/** 栏目对应关系 **/
private List<Map<String, Object>> WAP_columnIdMap;
/**
* 启动时获取当前7天消息列表key
*
* @Title: createSevenDayTaskKeys
* @Description: 启动时获取当前7天消息列表key
* @param 设定文件
* @return void 返回类型
*/
@PostConstruct
public void createSevenDayTaskKeys() {
// 获取project
WAP_SEVEN_DAY_KEY = new ArrayList<>();
log.info("===启动时获取当前7天消息列表key===");
WAP_Projects = wapService.getWapAllProject();
List<String> systemProjects = new ArrayList<>();
WAP_Projects.forEach(a -> {
systemProjects.addAll(a.getProjects());
});
WAP_sProjects = systemProjects.stream().distinct().collect(Collectors.toList());
// 获取 项目对应columnId列表
WAP_columnIdMap = wapService.getWapAllColumnIdByProject(WAP_sProjects, WAP_Projects);
// 获取当前7天时间戳列表
List<Long> sevenDay = wapService.getWapSevenDay();
WAP_columnIdMap.stream().forEach(a -> {
String project = a.get("wapProject") + "";
String columnId = a.get("columnId") + "";
for (int i = 0; i < WAP_CF.size(); i++) {
String classification = WAP_CF.get(i);
for (int j = 0; j < sevenDay.size(); j++) {
long time = sevenDay.get(j);
WAP_SEVEN_DAY_KEY
.add(RedisConfig.WAPKEY + time + ":" + project + "_" + classification + "_" + columnId);
}
}
});
// 创建有效期7d的key,有key 更新时间
redisService.createSevenDayKey(WAP_SEVEN_DAY_KEY);
// 创建7天缓存数据
task.doTaskCreate7dWapInfo(WAP_columnIdMap,WAP_sProjects);
}
/**
* 启动后每天1点更新当前7天消息列表key
*
* @Title: updateSevenDayTaskKeysByDay
* @Description: 启动后每天1点更新当前7天消息列表key
* @param 设定文件
* @return void 返回类型
*/
@Async
@Scheduled(cron = "0 0 1 1/1 * ?")
public void updateSevenDayTaskKeysByDay() {
// 获取project
WAP_SEVEN_DAY_KEY = new ArrayList<>();
log.info("====每天1点更新当前7天消息列表key===");
WAP_Projects = wapService.getWapAllProject();
List<String> systemProjects = new ArrayList<>();
WAP_Projects.forEach(a -> {
systemProjects.addAll(a.getProjects());
});
WAP_sProjects = systemProjects.stream().distinct().collect(Collectors.toList());
// 获取 项目对应columnId列表
WAP_columnIdMap = wapService.getWapAllColumnIdByProject(WAP_sProjects, WAP_Projects);
// 获取当前7天时间戳列表
List<Long> sevenDay = wapService.getWapSevenDay();
WAP_columnIdMap.stream().forEach(a -> {
System.err.println(JSONObject.toJSONString(a));
String project = a.get("wapProject") + "";
String columnId = a.get("columnId") + "";
for (int i = 0; i < WAP_CF.size(); i++) {
String classification = WAP_CF.get(i);
for (int j = 0; j < sevenDay.size(); j++) {
long time = sevenDay.get(j);
WAP_SEVEN_DAY_KEY
.add(RedisConfig.WAPKEY + time + ":" + project + "_" + classification + "_" + columnId);
}
}
});
// 创建有效期7d的key,有key 更新时间
redisService.createSevenDayKey(WAP_SEVEN_DAY_KEY);
}
/**
* 每5分钟更新近7天缓存消息,并完成新缓存任务
*
* @Title: createSevenDayTask
* @Description: 每分钟更新近7天缓存消息,并完成新缓存任务
* @param 设定文件
* @return void 返回类型
*/
@Async
@Scheduled(cron = "0 0/1 * * * ?")
public void updateSevenDayTask5min() {
// 获取
log.info("5分钟更新近7天缓存消息");
// 7天内数据更新
try {
task.doTaskUpdate7dWapInfo(WAP_columnIdMap,WAP_sProjects);
} catch (Exception e) {
log.error("5分钟更新近7天缓存消息task出错Exception", e);
}
}
}
......@@ -33,6 +33,8 @@ public class RedisConfig {
public static final String EVENTHASHKEY = "Event:Hash:";
/** 追踪规则已追踪集 **/
public static final String TRACKKEY = "TrackRule:";
/** 追踪规则已追踪集 **/
public static final String WAPKEY = "Wap:";
private int keyMaxSize;
......
......@@ -4,6 +4,7 @@ import java.util.List;
import org.elasticsearch.search.SearchHits;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
......@@ -13,18 +14,12 @@ public interface ESDao {
/**
* 获取微博消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @param keywords 关键词组
* @param count 消息数量
* @param start 开始rsid
* @param end 结束rsid
* @param platform 平台
* @param project 平台
* @return
*/
public SearchHits getWeiboDataFromEs(List<String> keywords, int count, long start, long end, String platform,
......@@ -33,18 +28,12 @@ public interface ESDao {
/**
* 获取知乎消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @param keywords 关键词组
* @param count 消息数量
* @param start 开始rsid
* @param end 结束rsid
* @param platform 平台
* @param project 平台
* @return
*/
public SearchHits getZhihuDataFromEs(List<String> keywords, int count, long start, long end, String platform,
......@@ -53,18 +42,12 @@ public interface ESDao {
/**
* 获取视频消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @param keywords 关键词组
* @param count 消息数量
* @param start 开始rsid
* @param end 结束rsid
* @param platform 平台
* @param project 平台
* @return
*/
public SearchHits getVideoDataFromEs(List<String> keywords, int count, long start, long end, String platform,
......@@ -73,18 +56,12 @@ public interface ESDao {
/**
* 获取网媒消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @param keywords 关键词组
* @param count 消息数量
* @param start 开始rsid
* @param end 结束rsid
* @param platform 平台
* @param project 平台
* @return
*/
public SearchHits getMediaDataFromEs(List<String> keywords, int count, long start, long end, String platform,
......@@ -104,8 +81,10 @@ public interface ESDao {
public int get48hoursFirstCommonid(String tableName);
public int getCommonidByTime(String tableName, long endtime);
/**
* 获取定向监测ESmessage
*
* @Title: getDirectDataFromEs
* @Description: TODO(获取定向监测ESmessage)
* @param @param keywords
......@@ -121,4 +100,18 @@ public interface ESDao {
public SearchHits getDirectDataFromEs(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup);
/**
* 获取Wap数据按项目s和时间
* @param timeType
*
* @Title: getAllDayMark
* @Description: 获取Wap数据按项目s和时间
* @param @param start
* @param @param end
* @param @param sProjects
* @param @return 设定文件
* @return List<JSONObject> 返回类型
*/
public List<JSONObject> getAllDayMark(long start, long end, List<String> sProjects, String timeType);
}
......@@ -7,13 +7,16 @@ import java.util.List;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.es.util.ESIndexesUtil;
import com.zhiwei.messageflow.ESGetCommonId;
import com.zhiwei.messageflow.config.ESConfig;
......@@ -22,6 +25,7 @@ import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.util.ESQueryUtil;
import com.zhiwei.messageflow.util.MatchingInfoUtil;
import com.zhiwei.messageflow.util.TimeUtil;
@Component
......@@ -730,7 +734,7 @@ public class ESDaoImpl implements ESDao {
public int get48hoursFirstCommonid(String tableName) {
// 获取当天最早commonid
int commonid = 0;
long now =System.currentTimeMillis();
long now = System.currentTimeMillis();
long date = now - 1 * 48 * 60 * 60 * 1000L;
// String wEtime = TimeUtil.formatDateToMinute(new Date(now));
// String weibotime = TimeUtil.formatDateToMinute(new Date(date));
......@@ -797,7 +801,7 @@ public class ESDaoImpl implements ESDao {
// 获取当天最早commonid
int commonid = 0;
long now =System.currentTimeMillis();
long now = System.currentTimeMillis();
long date = endtime;
// String weibotime = TimeUtil.formatDateToMinute(new Date(date));
// String zhihutime = TimeUtil.formatEsDate(new Date(date - 8 * 3600 * 1000L)) + "Z";
......@@ -896,13 +900,13 @@ public class ESDaoImpl implements ESDao {
specTypeList.add(memberSpec);
}
}
BoolQueryBuilder sonQueryBuilder =QueryBuilders.boolQuery();
BoolQueryBuilder sonQueryBuilder = QueryBuilders.boolQuery();
if (pt.equals("微博")) {
sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "username");
sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null, "user_id");
} else {
sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "source");
if (pt.equals("微信")||pt.equals("平媒")) {
if (pt.equals("微信") || pt.equals("平媒")) {
// tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder,
// specTypeList, null, "source");
} else if (pt.equals("今日头条")) {
......@@ -944,4 +948,60 @@ public class ESDaoImpl implements ESDao {
return searchHits;
}
@Override
public List<JSONObject> getAllDayMark(long start, long end, List<String> sProjects, String timeType) {
List<JSONObject> list = new ArrayList<>();
// 组装查询语句
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
// 获取所需数据projectQuery
BoolQueryBuilder projectQuery = ESQueryUtil.termQueryWords(QueryBuilders.boolQuery(), "should", "markGroup",
sProjects);
// 获取所需数据typeQuery
List<String> typeList = ESQueryUtil.WAP_ALL_TYPE;
List<String> noTypeList = new ArrayList<>();
BoolQueryBuilder typeQuery = ESQueryUtil.assembleShouldNotFieldsQuery(QueryBuilders.boolQuery(), typeList,
noTypeList, "type");
queryBuilder.must(typeQuery);
queryBuilder.must(projectQuery);
// 组配查询
// TODO
String startTime = TimeUtil.formatDateToMinute(new Date(start));
String endTime = TimeUtil.formatDateToMinute(new Date(end));
SearchResponse scrollResponse = transportClient.prepareSearch("weibotag", "mediatag")
.setTypes("weibotag", "mediatag").setQuery(queryBuilder)
.setPostFilter(QueryBuilders.rangeQuery(timeType).from(startTime).to(endTime)).setSize(10000).execute()
.actionGet();
long count = scrollResponse.getHits().getTotalHits();
if (count == 0) {
return list;
} else if (count <= 10000) {
for (SearchHit s : scrollResponse.getHits()) {
JSONObject hit = MatchingInfoUtil.getMarkBean(s);
list.add(hit);
}
return list;
} else {
// 重写查询方法
scrollResponse = transportClient.prepareSearch("weibotag", "mediatag")
.setTypes("weibotag", "mediatag").setQuery(queryBuilder)
.setPostFilter(QueryBuilders.rangeQuery(timeType).from(startTime).to(endTime)).setSize(10000)
.setSize(1000).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet();
count = scrollResponse.getHits().getTotalHits();
int page = (int) count / 1000;
for (SearchHit s : scrollResponse.getHits()) {
JSONObject hit = MatchingInfoUtil.getMarkBean(s);
list.add(hit);
}
for (int i = 0; i < page; i++) {
scrollResponse = transportClient.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet();
for (SearchHit s : scrollResponse.getHits()) {
JSONObject hit = MatchingInfoUtil.getMarkBean(s);
list.add(hit);
}
}
return list;
}
}
}
\ No newline at end of file
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import java.util.Map;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_taggroup")
public class TagGroup {
/**
* id = tag中groupid
*/
private Long id;// id = tag中groupid
/*
* tagName 标签组名
*/
private String tagName;// tagName 标签组名
/*
* isShow 是否呈现在情报监测页面
*/
private boolean isShow;// isShow 是否呈现在情报监测页面
/*
* createAt 标签创建时间
*/
private Long createAt;// createAt 标签创建时间
/*
* submitter
*/
private String submitter;// submitter
/*
* project
*/
private String project;// project
/*
* details Object=>Tag String =>name(标签)
*/
private List<Map<String, Object>> details;// details Object=>Tag String =>name(标签)
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_wap_project")
public class WapProject {
/**
* 项目名
*/
@Id
private String id;
/**
* 对应系统项目
*/
private List<String> projects;
}
package com.zhiwei.messageflow.redis;
import java.util.Date;
import java.util.List;
import java.util.Set;
......@@ -36,7 +37,8 @@ public class RedisPoolAndTools {
config.setMaxWaitMillis(redisConfig.getMaxWaitMillis());
config.setTestOnBorrow(redisConfig.isTestOnBorrow());
config.setTestOnReturn(redisConfig.isTestOnReturn());
pool = new JedisPool(config, redisConfig.getIp(), redisConfig.getPort(), 0, redisConfig.getPassword(), redisConfig.getSelectDB());
pool = new JedisPool(config, redisConfig.getIp(), redisConfig.getPort(), 0, redisConfig.getPassword(),
redisConfig.getSelectDB());
}
/**
......@@ -79,8 +81,7 @@ public class RedisPoolAndTools {
/**
* 获取RSID
*
* @param rsidkey
* 键值为platform-project-keywords
* @param rsidkey 键值为platform-project-keywords
* @return
*/
public String getRSID(String rsidkey) {
......@@ -154,6 +155,7 @@ public class RedisPoolAndTools {
jedis.zremrangeByRank(key, 0, removeIndex);
returnResource(jedis);
}
/**
* 删除超出上限的数据
*
......@@ -172,6 +174,7 @@ public class RedisPoolAndTools {
jedis.zremrangeByRank(key, 0, removeIndex);
returnResource(jedis);
}
/**
* 获取有序集合消息数量(弃用
*
......@@ -191,6 +194,7 @@ public class RedisPoolAndTools {
returnResource(jedis);
return nowCount;
}
/**
* 获取有序集合消息数量
*
......@@ -233,8 +237,10 @@ public class RedisPoolAndTools {
returnResource(jedis);
return set;
}
/**
* 向列表中添加单个元素
*
* @Title: Rpush
* @Description: 向列表中添加单个元素
* @param @param redisKey
......@@ -253,8 +259,10 @@ public class RedisPoolAndTools {
jedis.rpush(redisKey, id);
returnResource(jedis);
}
/**
* 从列表中获取范围的元素
*
* @Title: Lrange
* @Description: 从列表中获取范围的元素
* @param @param key
......@@ -263,7 +271,7 @@ public class RedisPoolAndTools {
* @param @return 设定文件
* @return List<String> 返回类型
*/
public List<String> Lrange(String key,int start, int end) {
public List<String> Lrange(String key, int start, int end) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
......@@ -272,12 +280,14 @@ public class RedisPoolAndTools {
jedis = getJedis();
}
}
List<String> idlist=jedis.lrange(key, start, end);
List<String> idlist = jedis.lrange(key, start, end);
returnResource(jedis);
return idlist;
}
/**
* 从有序set中获取范围的元素
*
* @Title: zrange
* @Description: 从有序set中获取范围的元素
* @param @param redisKey
......@@ -295,13 +305,15 @@ public class RedisPoolAndTools {
jedis = getJedis();
}
}
Set<String> res=jedis.zrange(redisKey, start, end);
Set<String> res = jedis.zrange(redisKey, start, end);
returnResource(jedis);
return res;
}
/**
* 从列表中移除全部指定值
*
* @Title: Lrem
* @Description: 从列表中移除全部指定值
* @param @param redisKey
......@@ -320,8 +332,10 @@ public class RedisPoolAndTools {
jedis.lrem(key, 0, id);
returnResource(jedis);
}
/**
* 判断key是否存在
*
* @Title: exists
* @Description: 判断key是否存在
* @param @param key
......@@ -337,12 +351,14 @@ public class RedisPoolAndTools {
jedis = getJedis();
}
}
boolean res=jedis.exists(key);
boolean res = jedis.exists(key);
returnResource(jedis);
return res;
}
/**
* 判断hash中key是否存在
*
* @Title: hexists
* @Description: 判断hash中key是否存在
* @param @param redisKey
......@@ -359,12 +375,14 @@ public class RedisPoolAndTools {
jedis = getJedis();
}
}
boolean res=jedis.hexists(key, url);
boolean res = jedis.hexists(key, url);
returnResource(jedis);
return res;
}
/**
* 如果哈希表不存在,一个新的哈希表被创建并进行 HSET 操作。如果字段已经存在于哈希表中,旧值将被覆盖。
*
* @Title: hset
* @Description: 如果哈希表不存在,一个新的哈希表被创建并进行 HSET 操作。如果字段已经存在于哈希表中,旧值将被覆盖。
* @param @param redisKey
......@@ -393,7 +411,7 @@ public class RedisPoolAndTools {
jedis = getJedis();
}
}
boolean res= 1==jedis.del(key)?true:false;
boolean res = 1 == jedis.del(key) ? true : false;
returnResource(jedis);
return res;
}
......@@ -425,4 +443,54 @@ public class RedisPoolAndTools {
returnResource(jedis);
}
/**
* 如果key不存在,一个新的key被创建并进行Expire 操作。
*
* @Title: setAndExpireKey
* @Description:如果key不存在,一个新的key被创建并进行Expire 操作。
* @param @param key
* @param @param l 设定文件
* @return void 返回类型
*/
public void setAndExpireKey(String key, long milliseconds) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
if (!jedis.exists(key)) {
jedis.zadd(key, -1.0, "建SortSet时间" + new Date(System.currentTimeMillis()));
jedis.pexpire(key, milliseconds);
} else {
jedis.pexpire(key, milliseconds);
}
returnResource(jedis);
}
/**
* 按Score值移除数据
*
* @Title: zremrangeByScore
* @Description: 按Score值移除数据
* @param @param key
* @param @param sScore
* @param @param eScore 设定文件
* @return void 返回类型
*/
public void zremrangeByScore(String key, Double sScore, Double eScore) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.zremrangeByScore(key, sScore, eScore);
returnResource(jedis);
}
}
\ No newline at end of file
......@@ -17,8 +17,7 @@ public interface RedisService {
/**
* 获取项目Rsid列表
*
* @param project
* 项目名
* @param project 项目名
* @return
*/
String getRsid(String project);
......@@ -26,58 +25,44 @@ public interface RedisService {
/**
* 更新项目Rsid列表
*
* @param newRsidMap
* 存入redis的数据
* @param project
* 项目名
* @param newRsidMap 存入redis的数据
* @param project 项目名
*/
void setRsid(Map<String, Integer> newRsidMap, String project);
/**
* 向redis写入微博数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
* @param redisKey 数据键
* @param messages 数据列表
* @param maxSize redis数据存储上限
*/
void setWeiboMessageMessage(String redisKey, List<WeiboMessage> messages, int maxSize);
/**
* 向redis写入知乎数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
* @param redisKey 数据键
* @param messages 数据列表
* @param maxSize redis数据存储上限
*/
void setZhihuMessageMessage(String redisKey, List<ZhihuMessage> messages, int maxSize);
/**
* 向redis写入视频数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
* @param redisKey 数据键
* @param messages 数据列表
* @param maxSize redis数据存储上限
*/
void setVideoMessageMessage(String redisKey, List<VideoMessage> messages, int maxSize);
/**
* 向redis写入网媒数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
* @param redisKey 数据键
* @param messages 数据列表
* @param maxSize redis数据存储上限
*/
void setMediaMessageMessage(String redisKey, List<MediaMessage> messages, int maxSize);
......@@ -86,17 +71,16 @@ public interface RedisService {
*
* @Title: setMessage2Redis
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* allRedisKey
* @param @param
* messages
* @param @param
* allkeywordcount 设定文件
* @param @param allRedisKey
* @param @param messages
* @param @param allkeywordcount 设定文件
* @return void 返回类型
*/
void setMessage2Redis(String allRedisKey, List<JSONObject> messages, int allkeywordcount);
/**
* 获取定向监测缓存Key
*
* @Title: getDirectRedisKey
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param projectName
......@@ -106,8 +90,10 @@ public interface RedisService {
* @return String 返回类型
*/
String getDirectRedisKey(String projectName, String groupName, String pt);
/**
* 获取定向监测缓存rsidmapKey
*
* @Title: getDirectRsidMapKey
* @Description: 获取定向监测缓存rsidmapKey
* @param @param projectName
......@@ -115,8 +101,10 @@ public interface RedisService {
* @return String 返回类型
*/
String getDirectRsidMapKey(String projectName);
/**
* 将事件存入缓存
*
* @Title: insertEvent
* @Description: 将事件存入缓存
* @param @param ob
......@@ -124,8 +112,10 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean insertEvent(JSONObject ob);
/**
* 向事件待处理队列中加入任务
*
* @Title: addEventAutoMarkList
* @Description: 向事件待处理队列中加入任务
* @param @param id
......@@ -133,16 +123,20 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean addEventAutoMarkList(String id);
/**
* 从事件待处理队列中获取前十个任务id
*
* @Title: getNeedAutoMark
* @Description: 从事件待处理队列中获取前十个任务id
* @param @return 设定文件
* @return List<String> 返回类型
*/
List<String> getNeedAutoMark();
/**
* 取事件采集缓存消息数量
*
* @Title: countCollectionData
* @Description: 取事件采集缓存消息数量
* @param @param id
......@@ -150,8 +144,10 @@ public interface RedisService {
* @return int 返回类型
*/
int countCollectionData(String id);
/**
* 按页数去事件采集中消息
*
* @Title: getCollectionData
* @Description: 按页数去事件采集中消息
* @param @param id
......@@ -161,8 +157,10 @@ public interface RedisService {
* @return Set<String> 返回类型
*/
Set<String> getCollectionData(String id, int start, int end);
/**
* 向事件待处理队列中移除任务
*
* @Title: removeEventAutoMarkList
* @Description: 向事件待处理队列中移除任务
* @param @param id
......@@ -170,8 +168,10 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean removeEventAutoMarkList(String id);
/**
* 判断url是否存在
*
* @param id
* @Title: existsCollectionHashByUrlkey
* @Description: 判断url是否存在
......@@ -180,8 +180,10 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean existsCollectionHashByUrlkey(String url, String id);
/**
* 按事件采集id删除事件采集urlHash
*
* @Title: dropCollectionHash
* @Description: 按事件采集id删除事件采集urlHash
* @param @param id
......@@ -189,8 +191,10 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean dropCollectionHash(String id);
/**
* 按追踪id添加已追踪消息
*
* @Title: addWarnMsg2Set
* @Description: 按追踪id添加已追踪消息
* @param @param id
......@@ -199,4 +203,38 @@ public interface RedisService {
*/
boolean addWarnMsg2Set(String id, JSONObject msg, int maxSize);
/**
* 创建7天redisKey值
*
* @Title: createSevenDayKey
* @Description: 创建7天redisKey值
* @param @param wAP_SEVEN_DAY_KEY
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean createSevenDayKey(List<String> wapKeys);
/**
* 归类并插入MarkInfos到指定key
*
* @param columnIdMap
* @param start
* @Title: setMarkInfos
* @Description: 归类并插入MarkInfos到指定key
* @param @param searchInfos 设定文件
* @return void 返回类型
*/
void setMarkInfos(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap, long start);
/**
* 更新MarkInfos到指定key,并移除旧key
*
* @Title: setMarkInfosUpdate
* @Description: 更新MarkInfos到指定key,并移除旧key
* @param @param searchInfos
* @param @param columnIdMap
* @return void 返回类型
*/
void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap);
}
package com.zhiwei.messageflow.redis.service.impl;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -20,11 +23,12 @@ import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.redis.RedisPoolAndTools;
import com.zhiwei.messageflow.redis.service.RedisService;
import redis.clients.jedis.Tuple;
import com.zhiwei.messageflow.util.MatchingInfoUtil;
import com.zhiwei.messageflow.util.TimeUtil;
@Component
public class RedisServiceImpl implements RedisService {
private static final Logger log = LogManager.getLogger(RedisServiceImpl.class);
@Autowired
private RedisPoolAndTools redisPoolAndTools;
......@@ -167,7 +171,7 @@ public class RedisServiceImpl implements RedisService {
JSONObject object = JSONObject.parseObject((new ArrayList<>(maxSizeSet)).get(0));
long commonid = object.getLongValue("commonid");
if (commonid > ESGetCommonId.START_COMMONID) {
redisPoolAndTools.zremrangebyscore(redisKey, 0L,ESGetCommonId.START_COMMONID);
redisPoolAndTools.zremrangebyscore(redisKey, 0L, ESGetCommonId.START_COMMONID);
} else {
redisPoolAndTools.removeDataByName(redisKey, removeIndex);
}
......@@ -272,4 +276,62 @@ public class RedisServiceImpl implements RedisService {
return true;
}
@Override
public boolean createSevenDayKey(List<String> wapKeys) {
wapKeys.stream().forEach(key -> {
redisPoolAndTools.setAndExpireKey(key, (26 * 3600 * 1000L));
});
return true;
}
@Override
public void setMarkInfos(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap, long start) {
searchInfos.parallelStream().forEach(searchinfo -> {
// long s = System.currentTimeMillis();
// 获取存放key
List<String> keys = MatchingInfoUtil.getWapKeys(searchinfo, columnIdMap, start);
keys.stream().forEach(key -> {
Double score = MatchingInfoUtil.getWapSortSetScore(searchinfo);
redisPoolAndTools.sortedSetZadd(key, score, searchinfo.toJSONString());
});
// log.info("searchinfo处理,总耗时{}ms", (System.currentTimeMillis() - s));
});
}
@Override
public void setMarkInfosUpdate(List<JSONObject> searchInfos, List<Map<String, Object>> columnIdMap) {
searchInfos.parallelStream().forEach(searchinfo -> {
long s = System.currentTimeMillis();
String time = searchinfo.get("time") + "";
long timeL = 0L;
try {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm:ss");
timeL = markDate.getTime();
} catch (Exception e) {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm");
timeL = markDate.getTime();
}
long start = TimeUtil.getDayZeroTime(timeL);
// 获取移除key
List<String> removekeys = MatchingInfoUtil.getWapRemoveKeys(searchinfo, columnIdMap, start);
removekeys.stream().forEach(key -> {
Double sScore = MatchingInfoUtil.getWapSortSetStartScore(searchinfo);
Double eScore = MatchingInfoUtil.getWapSortSetEndScore(searchinfo);
if (redisPoolAndTools.exists(key)) {
redisPoolAndTools.zremrangeByScore(key, sScore, eScore);
}
});
// 获取存放key
List<String> keys = MatchingInfoUtil.getWapKeys(searchinfo, columnIdMap, start);
keys.stream().forEach(key -> {
Double score = MatchingInfoUtil.getWapSortSetScore(searchinfo);
if (redisPoolAndTools.exists(key)) {
redisPoolAndTools.sortedSetZadd(key, score, searchinfo.toJSONString());
}
});
// log.info("标注日期:{},mid:{},标注人:{}", searchinfo.getString("markDate"), searchinfo.getString("mid"),
// searchinfo.getString("markGroup"));
log.info("searchinfo处理,总耗时{}ms", (System.currentTimeMillis() - s));
});
}
}
package com.zhiwei.messageflow.service;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.mongo.bean.WapProject;
public interface WapService {
/**
* 获取全wapproject
*
* @Title: getWapAllProject
* @Description: 获取全wapproject
* @param @return 设定文件
* @return List<String> 返回类型
*/
List<WapProject> getWapAllProject();
/**
* 按系统项目获取全部栏目
*
* @param wapProjects
*
* @Title: getWapAllColumnIdByProject
* @Description: 按系统项目获取全部栏目
* @param @param projects
* @param @return 设定文件
* @return JSONObject 返回类型
*/
List<Map<String, Object>> getWapAllColumnIdByProject(List<String> projects, List<WapProject> wapProjects);
/**
* 获取7天时间
*
* @Title: getWapSevenDay
* @Description:获取7天时间
* @param @return 设定文件
* @return List<Long> 返回类型
*/
List<Long> getWapSevenDay();
/**
* 创建7天缓存
*
* @Title: Create7dWapInfo
* @Description: 创建7天缓存
* @param @param columnIdMap
* @param @param sProjects 设定文件
* @return void 返回类型
*/
void Create7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects);
/**
* 周期性更新缓存
*
* @Title: Update7dWapInfo
* @Description: 周期性更新缓存
* @param @param columnIdMap
* @param @param sProjects 设定文件
* @return void 返回类型
*/
void Update7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects);
}
package com.zhiwei.messageflow.service.impl;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.mongo.bean.TagGroup;
import com.zhiwei.messageflow.mongo.bean.WapProject;
import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.WapService;
import com.zhiwei.messageflow.task.AsyncTask;
import com.zhiwei.messageflow.util.TimeUtil;
import com.zhiwei.messageflow.util.Tools;
@Component
public class WapServiceImpl implements WapService {
private static final Logger log = LogManager.getLogger(WapServiceImpl.class);
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Autowired
private ESDao esDao;
@Autowired
private RedisService redisService;
@Override
public List<WapProject> getWapAllProject() {
return primaryMongoTemplate.findAll(WapProject.class);
}
@Override
public List<Map<String, Object>> getWapAllColumnIdByProject(List<String> projects, List<WapProject> wapProjects) {
List<Map<String, Object>> cInfos = new ArrayList<>();
List<TagGroup> groups = primaryMongoTemplate
.find(new Query(Criteria.where("project").in(projects).and("tagName").is("情感倾向")), TagGroup.class);
for (int i = 0; i < wapProjects.size(); i++) {
Map<String, String> column = new HashMap<>();
WapProject wapProject = wapProjects.get(i);
for (int j = 0; j < wapProject.getProjects().size(); j++) {
String project = wapProject.getProjects().get(j);
TagGroup group = groups.stream().filter(p -> p.getProject().equals(project)).findFirst().get();
List<Map<String, Object>> tagdetials = group.getDetails();
for (Map<String, Object> tagDetail : tagdetials) {
Map<String, Object> td = new HashMap<String, Object>();
@SuppressWarnings({ "unchecked", "rawtypes" })
String name = new ArrayList(tagDetail.keySet()).get(0) + "";
td = (Map<String, Object>) tagDetail.get(name);
if (column.containsKey(name)) {
String columnId = column.get(name);
columnId += ("," + group.getId() + "=" + td.get("_id"));
column.put(name, columnId);
} else {
column.put(name, ("," + group.getId() + "=" + td.get("_id")));
}
}
}
cInfos.addAll(column.entrySet().stream().map(e -> {
Map<String, Object> cInfo = new HashMap<>();
cInfo.put("wapProject", wapProject.getId());
cInfo.put("name", e.getKey());
cInfo.put("columnId", Tools.sortTag(e.getValue()));
return cInfo;
}).collect(Collectors.toList()));
}
return cInfos;
}
@Override
public List<Long> getWapSevenDay() {
List<Long> res = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
calendar.add(Calendar.DATE, -6);
Date start = calendar.getTime();
Long startTime = start.getTime();
for (int i = 0; i < 7; i++) {
res.add(startTime + (i * (24 * 3600 * 1000L)));
}
return res.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
}
@Override
public void Create7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects) {
List<Long> sevenDay = getWapSevenDay();
// 获取SearchHits
for (int i = 0; i < sevenDay.size(); i++) {
long start = sevenDay.get(i);
long end = start + (24 * 3600 * 1000L);
log.info("第{}次数据存储,当前存储日期为{}", i, TimeUtil.formatDate(new Date(start)));
long s = System.currentTimeMillis();
// 获取存放的数据
List<JSONObject> searchInfos = esDao.getAllDayMark(start, end, sProjects, "time");
log.info("本次读取Wap数据量{},总耗时{}ms", searchInfos.size(), (System.currentTimeMillis() - s));
// 存储
redisService.setMarkInfos(searchInfos, columnIdMap, start);
log.info("第{}次数据存储结束,总耗时{}ms", i, (System.currentTimeMillis() - s));
}
}
@Override
public void Update7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects) {
long s = System.currentTimeMillis();
// 获取存放的数据
long end = System.currentTimeMillis();
long start = System.currentTimeMillis() - (20 * 60 * 1000L);
List<JSONObject> searchInfos = esDao.getAllDayMark(start, end, sProjects, "markDate");
log.info("本次读取Wap数据量{},总耗时{}ms", searchInfos.size(), (System.currentTimeMillis() - s));
// 存储
redisService.setMarkInfosUpdate(searchInfos, columnIdMap);
}
}
package com.zhiwei.messageflow.task;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.service.WapService;
import com.zhiwei.messageflow.util.Tools;
@Component
public class AsyncTask {
private static final Logger log = LogManager.getLogger(AsyncTask.class);
@Autowired
private WapService wapService;
@Async("asyncServiceExecutor")
public void doTaskUpdate7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects) throws Exception {
log.info("Wap Task started.");
long s = System.currentTimeMillis();
wapService.Update7dWapInfo(columnIdMap,sProjects);
Tools.sleep(10);
log.info("Wap Task finished, time elapsed: {} ms.", (System.currentTimeMillis() - s));
}
@Async("asyncServiceExecutor")
public void doTaskCreate7dWapInfo(List<Map<String, Object>> columnIdMap, List<String> sProjects) {
wapService.Create7dWapInfo(columnIdMap,sProjects);
log.info("7天存储完毕");
}
}
package com.zhiwei.messageflow.task;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class TaskPoolConfig {
private static final Logger log = LogManager.getLogger(TaskPoolConfig.class);
@Bean
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(5);
// 配置最大线程数
executor.setMaxPoolSize(5);
// 配置队列大小
executor.setQueueCapacity(99999);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-service-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
......@@ -14,6 +14,7 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.text.Text;
......@@ -26,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhiwei.messageflow.config.RedisConfig;
import redis.clients.jedis.Tuple;
......@@ -34,16 +36,44 @@ public class MatchingInfoUtil {
private static ObjectMapper mapper = new ObjectMapper();
/**
* SearchHIT转jsonobject Mark
*
* @Title: getMarkBean
* @Description: SearchHIT转jsonobject
* @param @param s
* @param @return 设定文件
* @return JSONObject 返回类型
*/
public static JSONObject getMarkBean(SearchHit searchHit) {
/**
* ES数据读取
*/
Map<String, Object> sourceHitMap = searchHit.getSource();
String message;
JSONObject res = new JSONObject();
try {
message = mapper.writeValueAsString(sourceHitMap);
res = JSONObject.parseObject(message);
// if(res.getString("type").indexOf("微博")==-1) {
// System.out.println(res.toJSONString());
// }
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return res;
}
/**
* SearchHIT转jsonobject
*
* @Title: getBean
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* searchHit
* @param @return
* 设定文件
* @param @param searchHit
* @param @return 设定文件
* @return JSONObject 返回类型
*/
public static JSONObject getBean(SearchHit searchHit) {
......@@ -389,16 +419,11 @@ public class MatchingInfoUtil {
*
* @Title: mediaPt
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* type
* @param @param
* mid
* @param @param
* source
* @param @param
* pt
* @param @return
* 设定文件
* @param @param type
* @param @param mid
* @param @param source
* @param @param pt
* @param @return 设定文件
* @return boolean 返回类型
*/
public static boolean mediaPt(String type, String mid, String source, String pt) {
......@@ -454,14 +479,10 @@ public class MatchingInfoUtil {
*
* @Title: getMediaPt
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* type
* @param @param
* mid
* @param @param
* source
* @param @return
* 设定文件
* @param @param type
* @param @param mid
* @param @param source
* @param @return 设定文件
* @return String 返回类型
*/
public static String getMediaPt(String type, String mid, String source) {
......@@ -494,14 +515,10 @@ public class MatchingInfoUtil {
*
* @Title: getBriefMediaPt
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* type
* @param @param
* mid
* @param @param
* source
* @param @return
* 设定文件
* @param @param type
* @param @param mid
* @param @param source
* @param @return 设定文件
* @return String 返回类型
*/
public static String getBriefMediaPt(String type, String mid, String source) {
......@@ -547,4 +564,143 @@ public class MatchingInfoUtil {
return res;
}
public static List<String> getWapRemoveKeys(JSONObject searchinfo, List<Map<String, Object>> columnIdMap,
long time) {
List<String> res = new ArrayList<>();
String markTag = searchinfo.getString("markTag");
if (!Tools.isEmpty(markTag) && !Tools.isEmpty(searchinfo.getString("type"))
&& !Tools.isEmpty(searchinfo.getString("mid"))) {
String classification = getWapClassification(searchinfo.getString("type"), searchinfo.getString("mid"));
columnIdMap.stream().forEach(map -> {
List<String> tags = getMarkTags(markTag);
String project = map.get("wapProject") + "";
String columnId = map.get("columnId") + "";
for (String tag : tags) {
String key = RedisConfig.WAPKEY + time + ":" + project + "_" + classification + "_" + columnId;
String allkey = RedisConfig.WAPKEY + time + ":" + project + "_全部_" + columnId;
res.add(key);
res.add(allkey);
}
});
}
return res;
}
public static List<String> getWapKeys(JSONObject searchinfo, List<Map<String, Object>> columnIdMap, long time) {
List<String> res = new ArrayList<>();
String markTag = searchinfo.getString("markTag");
if (!Tools.isEmpty(markTag) && !Tools.isEmpty(searchinfo.getString("type"))
&& !Tools.isEmpty(searchinfo.getString("mid"))) {
String classification = getWapClassification(searchinfo.getString("type"), searchinfo.getString("mid"));
columnIdMap.stream().forEach(map -> {
List<String> tags = getMarkTags(markTag);
String project = map.get("wapProject") + "";
String columnId = map.get("columnId") + "";
for (String tag : tags) {
if ((columnId).contains(tag)) {
String key = RedisConfig.WAPKEY + time + ":" + project + "_" + classification + "_" + columnId;
String allkey = RedisConfig.WAPKEY + time + ":" + project + "_全部_" + columnId;
res.add(key);
res.add(allkey);
}
}
});
}
return res;
}
private static String getWapClassification(String type, String url) {
if (type.indexOf("微博") != -1) {
return "微博";
} else if (url.indexOf("mp.weixin.qq.com") != -1 || url.indexOf("weixin.sogou.com") != -1) {
return "微信";
} else if (url.indexOf("toutiao.com") != -1) {
return "今日头条";
} else {
return "网媒";
}
}
/**
* 一组tag String转List
*
* @Title: getMarkTags
* @Description: 一组tag String转List
* @param @param MarkTag
* @param @return 设定文件
* @return List<String> 返回类型
*/
public static List<String> getMarkTags(String MarkTag) {
Map<String, String> result = new HashMap<String, String>();
String regex = ",";
String regexTwo = "=";
String[] sArray = MarkTag.split(regex);
for (String i : sArray) {
if (!i.equals("") && (i.indexOf(regexTwo) != -1)) {
String[] iArray = i.split(regexTwo);
String groupid = iArray[0];
String tagid = iArray[1];
result.put(groupid, tagid);
}
}
return result.entrySet().stream().map(e -> {
return "," + e.getKey() + "=" + e.getValue();
}).collect(Collectors.toList());
}
/**
* Wap 获取score值
*
* @Title: getWapSortSetScore
* @Description: Wap 获取score值
* @param @param searchinfo
* @param @return 设定文件
* @return Double 返回类型
*/
public static Double getWapSortSetScore(JSONObject searchinfo) {
String time = searchinfo.getString("time");
String markrsid = searchinfo.getString("markrsid");
try {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm:ss");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
} catch (Exception e) {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
}
}
public static Double getWapSortSetStartScore(JSONObject searchinfo) {
String time = searchinfo.getString("time");
String markrsid = searchinfo.getString("markrsid")+"0";
long mrLong = Long.parseLong(markrsid)-1L;
markrsid = String.valueOf(mrLong);
try {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm:ss");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
} catch (Exception e) {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
}
}
public static Double getWapSortSetEndScore(JSONObject searchinfo) {
String time = searchinfo.getString("time");
String markrsid = searchinfo.getString("markrsid")+"0";
long mrLong = Long.parseLong(markrsid)+1L;
markrsid = String.valueOf(mrLong);
try {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm:ss");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
} catch (Exception e) {
Date markDate = TimeUtil.parseTime(time, "yyyy-MM-dd HH:mm");
long timeL = markDate.getTime() / 1000 / 60;
return Double.valueOf(timeL + "." + markrsid);
}
}
}
......@@ -3,6 +3,10 @@ package com.zhiwei.messageflow.util;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
......@@ -51,8 +55,7 @@ public class TimeUtil {
/**
* 判断是否是日期格式(yyyy-mm-dd)
*
* @param input
* 日期字符串
* @param input 日期字符串
* @return
*/
public static boolean isDate(String input) {
......@@ -78,8 +81,7 @@ public class TimeUtil {
/**
* 判断是否是时间格式(yy-MM-dd HH:mm:ss)
*
* @param input
* 时间字符串
* @param input 时间字符串
* @return
*/
public static boolean isDatetime(String input) {
......@@ -269,8 +271,7 @@ public class TimeUtil {
*
* @Title: getTimePointToNowOrEnd
* @Description: 获取开始到结束时间的时间点,按小时
* @param @return
* 设定文件
* @param @return 设定文件
* @return List<String> 返回类型
* @throws ParseException
*/
......@@ -315,10 +316,8 @@ public class TimeUtil {
/**
* 根据格式化字符串,格式化日期
*
* @param Date
* 时间
* @param partern
* 格式
* @param Date 时间
* @param partern 格式
* @return
*/
public static String formatDate(Date date, String pattern) {
......@@ -353,10 +352,8 @@ public class TimeUtil {
/**
* 根据格式化字符串,格式化日期
*
* @param Date
* 时间
* @param partern
* 格式
* @param Date 时间
* @param partern 格式
* @return
*/
public static synchronized String formatEsDate(Date date) {
......@@ -450,8 +447,7 @@ public class TimeUtil {
/**
* 当前日期偏移运算(增、减几日)
*
* @param skipDate
* 年偏移量
* @param skipDate 年偏移量
* @return Date 偏移日期
*/
public static Date getSkipYear(Date date, int skipYear) {
......@@ -464,8 +460,7 @@ public class TimeUtil {
/**
* 当前日期偏移运算(增、减几日)
*
* @param skipDate
* 日偏移量
* @param skipDate 日偏移量
* @return Date 偏移日期
*/
public static Date getSkipTime(int skipDay) {
......@@ -476,8 +471,7 @@ public class TimeUtil {
* 某一时间的偏移运算(增、减几日)
*
* @param Date
* @param skipDate
* 日偏移量
* @param skipDate 日偏移量
* @return Date 偏移日期
*/
public static synchronized Date getSkipTime(Date date, int skipDay) {
......@@ -490,14 +484,10 @@ public class TimeUtil {
/**
* 当前时间的偏移运算(增、减几日、几小时、几分、几秒)
*
* @param skipDate
* 日偏移量
* @param skipHour
* 偏移时
* @param skipMinute
* 偏移分
* @param skipSecond
* 偏移秒
* @param skipDate 日偏移量
* @param skipHour 偏移时
* @param skipMinute 偏移分
* @param skipSecond 偏移秒
* @return Date 偏移日期
*/
public static Date getSkipTime(int skipDay, int skipHour, int skipMinute, int skipSecond) {
......@@ -507,16 +497,11 @@ public class TimeUtil {
/**
* 某一时间的偏移运算(增、减几日、几小时、几分)
*
* @param date
* 原时间
* @param skipDate
* 日偏移量
* @param skipHour
* 偏移时
* @param skipMinute
* 偏移分
* @param skipSecond
* 偏移秒
* @param date 原时间
* @param skipDate 日偏移量
* @param skipHour 偏移时
* @param skipMinute 偏移分
* @param skipSecond 偏移秒
* @return Date 偏移时间
*/
public static Date getSkipTime(Date date, int skipDay, int skipHour, int skipMinute, int skipSecond) {
......@@ -530,18 +515,12 @@ public class TimeUtil {
/**
* 某一时间的偏移运算
*
* @param date
* 原时间
* @param skipYear
* 年偏移量
* @param skipMonth
* 月偏移量
* @param skipDay
* 日偏移量
* @param skipHour
* 小时偏移量
* @param skipMinute
* 分偏移量
* @param date 原时间
* @param skipYear 年偏移量
* @param skipMonth 月偏移量
* @param skipDay 日偏移量
* @param skipHour 小时偏移量
* @param skipMinute 分偏移量
* @return 偏移时间
*/
public static synchronized Date getSkipTime(Date date, int skipYear, int skipMonth, int skipDay, int skipHour,
......@@ -568,10 +547,8 @@ public class TimeUtil {
/**
* 计算日期相差小时数
*
* @param base
* 比较基准时间
* @param compare
* 比较时间
* @param base 比较基准时间
* @param compare 比较时间
* @return long 时偏差
*/
public static long getSubhour(Date base, Date compare) {// subtrahend
......@@ -585,10 +562,8 @@ public class TimeUtil {
/**
* 计算日期相差几天
*
* @param base
* 比较基准时间
* @param compare
* 比较时间
* @param base 比较基准时间
* @param compare 比较时间
* @return long 日偏差
*/
public static long getSubday(Date base, Date compare) {// subtrahend
......@@ -610,8 +585,7 @@ public class TimeUtil {
/**
* 返回日期属性字段值,字段定义见 com.util.Calendar,
*
* @param date
* 当前日期
* @param date 当前日期
* @param field
* @see com.util.Calendar
* @return
......@@ -661,8 +635,7 @@ public class TimeUtil {
/**
* 爬虫专用处理(xxxx年xx月xx日 xx:xx:xx 、xx月xx日 xx:xx:xx、xx小时前、xx分钟前、xx秒前等不规则时间)
*
* @param String
* date
* @param String date
* @return Date
*/
public static String conversionTime(String time) {
......@@ -730,10 +703,8 @@ public class TimeUtil {
/**
* @Title: isNum
* @Description: 验证是否为时间戳
* @param @param
* time
* @param @return
* 设定文件
* @param @param time
* @param @return 设定文件
* @return boolean 返回类型
*/
public static boolean isNum(String time) {
......@@ -749,10 +720,8 @@ public class TimeUtil {
/**
* @Title: FormatDate
* @Description: 处理时间格式
* @param @param
* dateStr
* @param @return
* 设定文件
* @param @param dateStr
* @param @return 设定文件
* @return String 返回类型
*/
@SuppressWarnings("finally")
......@@ -807,4 +776,19 @@ public class TimeUtil {
}
}
/**
* 获取当天零点时间
*
* @Title: getDayZeroTime
* @Description: 获取当天零点时间
* @param @param time
* @param @return 设定文件
* @return long 返回类型
*/
public static long getDayZeroTime(long time) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault());
LocalDateTime startOfDay = localDateTime.with(LocalTime.MIN);
return startOfDay.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}
}
......@@ -20,6 +20,8 @@ import java.security.MessageDigest;
import java.security.SecureRandom;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
......@@ -157,8 +159,7 @@ public class Tools {
* @Title: getFileType
* @Description: TODO(判断文件类型)
* @param type
* @param @return
* 设定文件ccc
* @param @return 设定文件ccc
* @return String 返回类型
*/
public static String getFileType(String path) {
......@@ -181,8 +182,7 @@ public class Tools {
* @Title: getMIME
* @Description: TODO(获取MIME类型)
* @param type
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String getMIME(String type) {
......@@ -216,8 +216,7 @@ public class Tools {
* @Description: TODO(将对象保存到文件)
* @param path
* @param obj
* @param @return
* 设定文件
* @param @return 设定文件
* @return void 返回类型
*/
public static void saveObjectToFile(String path, Object obj) {
......@@ -237,8 +236,7 @@ public class Tools {
* @Title: readObjectInFile
* @Description: TODO(读取文件中存储的对象)
* @param path
* @param @return
* 设定文件
* @param @return 设定文件
* @return Object 返回类型
*/
public static Object readObjectInFile(String path) {
......@@ -259,8 +257,7 @@ public class Tools {
* @Title: getEncode
* @Description: TODO(判断字符串编码)
* @param path
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String getEncode(String path) {
......@@ -302,8 +299,7 @@ public class Tools {
* @Title: validKey
* @Description: TODO(验证下载校验码)
* @param type
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static boolean validKey(String id, String token) {
......@@ -336,8 +332,7 @@ public class Tools {
* @Title: generateKey
* @Description: TODO(生成下载校验码)
* @param type
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String generateKey(String id) {
......@@ -452,8 +447,7 @@ public class Tools {
* @Title: md5
* @Description: 计算字符串Md5
* @param str
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String md5(String str) {
......@@ -507,8 +501,7 @@ public class Tools {
* @Title: replaceBlank
* @Description: TODO(去除特殊字符)
* @param str
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String replaceBlank(Object str) {
......@@ -526,8 +519,7 @@ public class Tools {
* @Title: decodeUnicode
* @Description: TODO(unicode 转换成 中文)
* @param theString
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String decodeUnicode(String theString) {
......@@ -599,8 +591,7 @@ public class Tools {
* @Title: percentMap
* @Description: TODO(计算百分比,保留两位小数)
* @param map
* @param @return
* 设定文件
* @param @return 设定文件
* @return Map<String,String> 返回类型
*/
public static Map<String, String> percentMap(Map<String, Integer> map) {
......@@ -646,8 +637,7 @@ public class Tools {
* @Description: TODO(根据文件名和路径获取文件)
* @param path
* @param fileName
* @param @return
* 设定文件
* @param @return 设定文件
* @return File 返回类型
*/
public static File getFile(String path, String fileName) {
......@@ -694,10 +684,8 @@ public class Tools {
*
* @Title: deleteFile
* @Description:删除文件
* @param @param
* path
* @param @return
* 设定文件
* @param @param path
* @param @return 设定文件
* @return Boolean 返回类型
*/
public static Boolean deleteFile(String path) {
......@@ -750,8 +738,7 @@ public class Tools {
* @Title: changeVtype
* @Description: TODO(微博认证类型转换)
* @param vtype
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String changeVtype(int vtype) {
......@@ -805,8 +792,7 @@ public class Tools {
* @Title: getWechatTsn
* @Description: TODO(更具相差多少天匹配微信采集参数)
* @param day
* @param @return
* 设定文件
* @param @return 设定文件
* @return int 返回类型
*/
public static int getWechatTsn(int day) {
......@@ -825,8 +811,7 @@ public class Tools {
* @Title: getWorksByFile
* @Description: TODO(通过文件名获取文件中的关键词)
* @param file
* @param @return
* 设定文件
* @param @return 设定文件
* @return List<String> 返回类型
*/
public static List<String> getWorksByFile(File file) {
......@@ -860,8 +845,7 @@ public class Tools {
* @Title: filterSpecialCharacter
* @Description: TODO(过滤特殊字符)
* @param str
* @param @return
* 设定文件
* @param @return 设定文件
* @return String 返回类型
*/
public static String filterSpecialCharacter(String str) {
......@@ -881,12 +865,9 @@ public class Tools {
*
* @Title: approximateStringMatching
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* beMatching
* @param @param
* mathingWord
* @param @return
* 设定文件
* @param @param beMatching
* @param @param mathingWord
* @param @return 设定文件
* @return boolean 返回类型
*/
public static boolean approximateStringMatching(String beMatching, String mathingWord) {
......@@ -912,10 +893,8 @@ public class Tools {
*
* @Title: convertPercent
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param
* str
* @param @return
* 设定文件
* @param @param str
* @param @return 设定文件
* @return String 返回类型
*/
public static String convertPercent(String str) {
......@@ -968,8 +947,7 @@ public class Tools {
/**
* @Title: isIncludeDay
* @Description: 判断截止时间是否在当天
* @param @param
* endtime(yyyy-mm-dd )
* @param @param endtime(yyyy-mm-dd )
* @return @return boolean
*/
public static boolean isIncludeDay(String endtime) {
......@@ -989,12 +967,9 @@ public class Tools {
*
* @Title: eventIsHitkeyword
* @Description: "xxx xx,xx xxx"内容判断是否命中关键词
* @param @param
* content
* @param @param
* keyword
* @param @return
* 设定文件
* @param @param content
* @param @param keyword
* @param @return 设定文件
* @return boolean 返回类型
*/
public static boolean eventIsHitkeyword(String content, String keyword) {
......@@ -1026,14 +1001,10 @@ public class Tools {
*
* @Title: getSchedule
* @Description: 计算进度
* @param @param
* part
* @param @param
* all
* @param @param
* max
* @param @return
* 设定文件
* @param @param part
* @param @param all
* @param @param max
* @param @return 设定文件
* @return int 返回类型
*/
public static int getSchedule(int part, int all, int max) {
......@@ -1046,10 +1017,8 @@ public class Tools {
*
* @Title: isEmpty
* @Description: 非空判断
* @param @param
* object
* @param @return
* 设定文件
* @param @param object
* @param @return 设定文件
* @return boolean 返回类型
*/
@SuppressWarnings("rawtypes")
......@@ -1066,8 +1035,10 @@ public class Tools {
return false;
}
/**
* 去除高亮<font style='font-weight:bold;' color='#ff6c60'></font>
*
* @Title: replaceHighLight
* @Description: 去除高亮<font style='font-weight:bold;' color='#ff6c60'></font>
* @param @param res
......@@ -1193,4 +1164,20 @@ public class Tools {
return ret;
}
/**
* 排序多个标签,防止相同标签不同顺序,被认为是两种标签
*
* @param tag
* @return
*/
public static String sortTag(String tag) {
String tags[] = tag.split(",");
if (null != tags && tags.length > 2) {
List<String> tempTagList = Arrays.asList(tags).subList(1, tags.length);
tempTagList.sort(Comparator.naturalOrder());
return "," + StringUtils.join(tempTagList, ",");
}
return tag;
}
}
......@@ -7,6 +7,8 @@ redis.ip = 192.168.0.202
redis.port=6380
#redis.ip=202.107.192.94
#redis.port=6479
#redis.ip=192.168.0.35
#redis.port=7370
#redis.ip=127.0.0.1
#redis.port=6379
#redis.ip=192.168.1.74
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment