Commit 520c6fd1 by shentao

2018/10/20 消息流事件采集自动标注聚合模块添加

parent 832c8657
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -71,6 +72,11 @@ ...@@ -71,6 +72,11 @@
<version>${springboot.version}</version> <version>${springboot.version}</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${springboot.version}</version>
</dependency>
<!-- es --> <!-- es -->
<dependency> <dependency>
...@@ -189,9 +195,8 @@ ...@@ -189,9 +195,8 @@
<dependency> <dependency>
<groupId>com.zhiwei.middleware</groupId> <groupId>com.zhiwei.middleware</groupId>
<artifactId>automaticmark-client</artifactId> <artifactId>automaticmark-client</artifactId>
<version>1.0.1-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -36,12 +36,15 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -36,12 +36,15 @@ public class ES4RedisRunner implements ApplicationRunner {
ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class); ESGetCommonId esGetCommonId = ApplicationContextProvider.getBean("ESGetCommonId", ESGetCommonId.class);
esGetCommonId.getCommonId(); esGetCommonId.getCommonId();
//启动时更新事件等待采集列表中的前十个任务状态为采集完毕
/** /**
* redis存入缓存 * redis存入缓存
*/ */
// 手动注入bean ES4RedisStart // 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class); ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class); DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class);
EVENT4RedisStart eventStart = ApplicationContextProvider.getBean("EVENT4RedisStart", EVENT4RedisStart.class);
// 定时器 // 定时器
Timer timer = new Timer(); Timer timer = new Timer();
...@@ -52,6 +55,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -52,6 +55,7 @@ public class ES4RedisRunner implements ApplicationRunner {
try { try {
start.startThread(); start.startThread();
directstart.startThread(); directstart.startThread();
eventStart.startThread();
} catch (Exception e) { } catch (Exception e) {
log.error("主定时器异常{}{}",e.getMessage(),e.getStackTrace()); log.error("主定时器异常{}{}",e.getMessage(),e.getStackTrace());
} }
......
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.bean.Event;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.EventService;
public class EVENT4RedisStart {
private final static Logger log = LogManager.getLogger(EVENT4RedisStart.class);
@Autowired
protected RedisService redisService;
@Autowired
private EventService eventService;
private EVENTAutoMarkTask eventAutoMark;
public void startThread() throws JsonParseException, JsonMappingException, IOException, InterruptedException {
List<String> idList = redisService.getNeedAutoMark();
List<Event> events = null!=eventService.findEventsbyIds(idList)?eventService.findEventsbyIds(idList):new ArrayList<>();
for (int i = 0; i < events.size(); i++) {
Event eventCollection = events.get(i);
if(eventCollection.getStatus().equals("采集完毕")) {
//优先更新状态为自动标注
eventService.updateEventStatusAutoMark(eventCollection.getId());
// 获取线程
EVENTAutoMarkThread eventAutoMarkThread = EVENTAutoMarkThread.getThread(eventCollection,eventAutoMark);
// ES4RedisThread es4RedisThread = ES4RedisThread.getThread(project.getProjectName(), project,
// allplatformNames, es4RedisTask);
// 获取线程失败
if (eventAutoMarkThread == null) {
log.warn("{}事件采集自动标注获取线程失败", eventCollection.getId());
continue;
}
// 线程启动
eventAutoMarkThread.start();
}
}
}
}
package com.zhiwei.messageflow;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.bean.Event;
import com.zhiwei.messageflow.service.EventService;
public class EVENTAutoMarkTask {
@Autowired
private EventService eventService;
public boolean eventCollectionAutoMark(Event eventCollection) throws JsonParseException, JsonMappingException, IOException {
return eventService.autoMark(eventCollection);
}
}
package com.zhiwei.messageflow;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.bean.Event;
public class EVENTAutoMarkThread extends Thread{
private final static Logger log = LogManager.getLogger(EVENTAutoMarkThread.class);
// 线程
private Thread t;
// 线程名
private String threadName;
private Event eventCollection;
private EVENTAutoMarkTask eventAutoMark;
public EVENTAutoMarkThread(Event eventCollection, EVENTAutoMarkTask eventAutoMark2) {
this.threadName = eventCollection.getId();
this.eventCollection = eventCollection;
this.eventAutoMark = eventAutoMark;
}
public static EVENTAutoMarkThread getThread(Event eventCollection,EVENTAutoMarkTask eventAutoMark) {
EVENTAutoMarkThread es4RedisThread = new EVENTAutoMarkThread(eventCollection,eventAutoMark);
return es4RedisThread;
}
public void start() {
// 线程开始
log.info("Starting 事件采集进入自动标注 {}", threadName);
if (t == null) {
t = new Thread(this, threadName);
// 通知执行run方法
t.start();
}
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.sleep(10L);
// 程序运行
log.info("Running 事件采集进入自动标注 {}", threadName);
// 该项目执行事件采集存储
boolean flag = eventAutoMark.eventCollectionAutoMark(eventCollection);
if (!flag) {
Thread.currentThread().interrupted();
log.error("{}事件采集进入自动标注出现异常,线程状态:{}",threadName,Thread.currentThread().isInterrupted());
}
}catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程退出
log.info("Thread 事件采集进入自动标注 {} exiting.", threadName);
}
}
}
package com.zhiwei.messageflow.bean;
import java.util.HashMap;
import java.util.List;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
/**
* 事件采集Bean
* @ClassName: Event
* @Description: 事件采集Bean
* @author shentao
* @date 2018年10月19日 上午11:18:59
*/
@Data
@ToString
@Document(collection = "qbjc_eventCollection")
public class Event {
/**
* 事件采集id
*/
private String id;
/**
* 事件采集Name
*/
private String name;
/**
* 事件采集开始时间
*/
private Long startTime;
/**
* 事件采集结束时间
*/
private Long endTime;
/**
* 事件采集关键词
*/
private String keyword;
/**
* 事件采集提交人
*/
private String submitter;
/**
* 事件采集项目
*/
private String project;
/**
* 事件采集创建时间
*/
private Long createAt;
/**
* 事件采集更新时间
*/
private Long updateAt;
/**
* 事件采集状态status
*/
private String status;
/**
* 事件采集细节结果detials
*/
private String detials;
/**
* 事件采集结果下载地址fileAddress
*/
private String fileAddress;
}
package com.zhiwei.messageflow.bean;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class EventMsg {
/**
* 消息url
*/
private String url;
/**
* 消息标题title
*/
private String title;
/**
* 消息内容
*/
private String content;
/**
* 消息类型
*/
private String type;
/**
* 消息id ,所属事件id
*/
private String id;//id 事件id,eventId
/**
* 消息来源
*/
private String source;
/**
* 消息时间
*/
private Long time;
/**
* 消息采集状态
*/
private String result;//事件采集状态
}
...@@ -16,6 +16,7 @@ import lombok.ToString; ...@@ -16,6 +16,7 @@ import lombok.ToString;
@ConfigurationProperties(prefix = "redis") @ConfigurationProperties(prefix = "redis")
@PropertySource(value = "classpath:redis.properties") @PropertySource(value = "classpath:redis.properties")
public class RedisConfig { public class RedisConfig {
private int maxTotal; private int maxTotal;
private int maxIdle; private int maxIdle;
private int maxWaitMillis; private int maxWaitMillis;
...@@ -24,7 +25,9 @@ public class RedisConfig { ...@@ -24,7 +25,9 @@ public class RedisConfig {
private String ip; private String ip;
private int port; private int port;
private String password; private String password;
public static String DIRECTKEY = "Direct:"; public static final String DIRECTKEY = "Direct:";
public static final String EVENTKEY = "Event:";
public static final String EVENTLISTKEY = "Event:EventList";
private int keyMaxSize; private int keyMaxSize;
......
package com.zhiwei.messageflow.kafka;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.service.EventService;
@Component
public class KafkaConsumer {
private static final Logger log = LogManager.getLogger(KafkaConsumer.class);
@Autowired
private EventService eventService;
// @KafkaListener(topics = {"qbjc_event_topic"})
// public void listen(ConsumerRecord<?, ?> record) {
// Optional<?> kafkaMessage = Optional.ofNullable(record.value());
// if (kafkaMessage.isPresent()) {
// Object message = kafkaMessage.get();
// eventService.saveEventMsg(message);
// L.i("record =" + record);
// L.i("message =" + message);
// }
// }
@KafkaListener(topics = { "qbjc_event_topic" }, containerFactory = "kafkaListenerContainerFactory")
public void consumerMsg(List<ConsumerRecord> records, Acknowledgment ack) {
try {
// eventService.saveRecords(records);
for (ConsumerRecord record : records) {
log.info("record = {}" , record);
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
eventService.saveEventMsg(message,record.offset());
}
}
} catch (Exception e) {
log.error("KafkaConsumer:{}",e);
} finally {
ack.acknowledge();//手动提交偏移量
}
}
}
package com.zhiwei.messageflow.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Value("${kafka.consumer.maxPollRecordsConfig}")
private int maxPollRecordsConfig;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.setBatchListener(true);//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE );//设置提交偏移量的方式
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(8);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);//每个批次获取数
return propsMap;
}
}
package com.zhiwei.messageflow.redis; package com.zhiwei.messageflow.redis;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
...@@ -232,5 +233,92 @@ public class RedisPoolAndTools { ...@@ -232,5 +233,92 @@ public class RedisPoolAndTools {
returnResource(jedis); returnResource(jedis);
return set; return set;
} }
/**
* 向列表中添加单个元素
* @Title: Rpush
* @Description: 向列表中添加单个元素
* @param @param redisKey
* @param @param id 设定文件
* @return void 返回类型
*/
public void Rpush(String redisKey, String id) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.rpush(redisKey, id);
returnResource(jedis);
}
/**
* 从列表中获取范围的元素
* @Title: Lrange
* @Description: 从列表中获取范围的元素
* @param @param key
* @param @param start
* @param @param end
* @param @return 设定文件
* @return List<String> 返回类型
*/
public List<String> Lrange(String key,int start, int end) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
List<String> idlist=jedis.lrange(key, start, end);
returnResource(jedis);
return idlist;
}
/**
* 从有序set中获取范围的元素
* @Title: zrange
* @Description: 从有序set中获取范围的元素
* @param @param redisKey
* @param @param start
* @param @param end
* @param @return 设定文件
* @return Set<String> 返回类型
*/
public Set<String> zrange(String redisKey, int start, int end) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
Set<String> res=jedis.zrange(redisKey, start, end);
returnResource(jedis);
return res;
}
/**
* 从列表中移除全部指定值
* @Title: Lrem
* @Description: 从列表中移除全部指定值
* @param @param redisKey
* @param @param id 设定文件
* @return void 返回类型
*/
public void Lrem(String key, String id) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.lrem(key, 0, id);
returnResource(jedis);
}
} }
\ No newline at end of file
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.redis.service; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.redis.service;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.messageflow.bean.MediaMessage; import com.zhiwei.messageflow.bean.MediaMessage;
...@@ -9,6 +10,8 @@ import com.zhiwei.messageflow.bean.VideoMessage; ...@@ -9,6 +10,8 @@ import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage; import com.zhiwei.messageflow.bean.ZhihuMessage;
import redis.clients.jedis.Tuple;
public interface RedisService { public interface RedisService {
/** /**
...@@ -106,10 +109,66 @@ public interface RedisService { ...@@ -106,10 +109,66 @@ public interface RedisService {
/** /**
* 获取定向监测缓存rsidmapKey * 获取定向监测缓存rsidmapKey
* @Title: getDirectRsidMapKey * @Title: getDirectRsidMapKey
* @Description: TODO(这里用一句话描述这个方法的作用) * @Description: 获取定向监测缓存rsidmapKey
* @param @param projectName * @param @param projectName
* @param @return 设定文件 * @param @return 设定文件
* @return String 返回类型 * @return String 返回类型
*/ */
String getDirectRsidMapKey(String projectName); String getDirectRsidMapKey(String projectName);
/**
* 将事件存入缓存
* @Title: insertEvent
* @Description: 将事件存入缓存
* @param @param ob
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean insertEvent(JSONObject ob);
/**
* 向事件待处理队列中加入任务
* @Title: addEventAutoMarkList
* @Description: 向事件待处理队列中加入任务
* @param @param id
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean addEventAutoMarkList(String id);
/**
* 从事件待处理队列中获取前十个任务id
* @Title: getNeedAutoMark
* @Description: 从事件待处理队列中获取前十个任务id
* @param @return 设定文件
* @return List<String> 返回类型
*/
List<String> getNeedAutoMark();
/**
* 取事件采集缓存消息数量
* @Title: countCollectionData
* @Description: 取事件采集缓存消息数量
* @param @param id
* @param @return 设定文件
* @return int 返回类型
*/
int countCollectionData(String id);
/**
* 按页数去事件采集中消息
* @Title: getCollectionData
* @Description: 按页数去事件采集中消息
* @param @param id
* @param @param start
* @param @param end
* @param @return 设定文件
* @return Set<String> 返回类型
*/
Set<String> getCollectionData(String id, int start, int end);
/**
* 向事件待处理队列中移除任务
* @Title: removeEventAutoMarkList
* @Description: 向事件待处理队列中移除任务
* @param @param id
* @param @return 设定文件
* @return boolean 返回类型
*/
boolean removeEventAutoMarkList(String id);
} }
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.redis.service.impl; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.redis.service.impl;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -18,6 +19,8 @@ import com.zhiwei.messageflow.config.RedisConfig; ...@@ -18,6 +19,8 @@ import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.redis.RedisPoolAndTools; import com.zhiwei.messageflow.redis.RedisPoolAndTools;
import com.zhiwei.messageflow.redis.service.RedisService; import com.zhiwei.messageflow.redis.service.RedisService;
import redis.clients.jedis.Tuple;
@Component @Component
public class RedisServiceImpl implements RedisService { public class RedisServiceImpl implements RedisService {
...@@ -166,4 +169,48 @@ public class RedisServiceImpl implements RedisService { ...@@ -166,4 +169,48 @@ public class RedisServiceImpl implements RedisService {
return RedisConfig.DIRECTKEY+projectName; return RedisConfig.DIRECTKEY+projectName;
} }
@Override
public boolean insertEvent(JSONObject ob) {
String redisKey = RedisConfig.EVENTKEY+ob.getString("eventId").replace(":", ":");
try {
redisPoolAndTools.sortedSetZadd(redisKey, (double) ob.getLong("offset"), mapper.writeValueAsString(ob));
} catch (JsonProcessingException e) {
e.printStackTrace();
return false;
}
return true;
}
@Override
public boolean addEventAutoMarkList(String id) {
String redisKey = RedisConfig.EVENTLISTKEY;
redisPoolAndTools.Rpush(redisKey, id);
return true;
}
@Override
public List<String> getNeedAutoMark() {
String redisKey = RedisConfig.EVENTLISTKEY;
return redisPoolAndTools.Lrange(redisKey,0,9);
}
@Override
public int countCollectionData(String id) {
String redisKey = RedisConfig.EVENTKEY+id.replace(":", ":");
return Integer.parseInt(String.valueOf(redisPoolAndTools.getNowCount(redisKey)));
}
@Override
public Set<String> getCollectionData(String id, int start, int end) {
String redisKey = RedisConfig.EVENTKEY+id.replace(":", ":");
return redisPoolAndTools.zrange(redisKey, start, end);
}
@Override
public boolean removeEventAutoMarkList(String id) {
String redisKey = RedisConfig.EVENTLISTKEY;
redisPoolAndTools.Lrem(redisKey, id);
return true;
}
} }
package com.zhiwei.messageflow.service;
import java.io.IOException;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.bean.Event;
public interface EventService {
/**
* 添加单条事件
* @param offset
* @Title: saveEventMsg
* @Description: 添加单条事件
* @param @param message 设定文件
* @return void 返回类型
*/
void saveEventMsg(Object message, long offset);
/**
* 批量缓存事件
* @Title: saveRecords
* @Description: 批量缓存事件
* @param @param records 设定文件
* @return void 返回类型
*/
void saveRecords(List<ConsumerRecord> records);
/**
* 终止事件获取
* @Title: overEvent
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param string 设定文件
* @return void 返回类型
*/
void overEvent(String eventId);
/**
* 获取事件采集s按ids
* @Title: findEventsbyIds
* @Description: 获取事件采集s按ids
* @param @param idList
* @param @return 设定文件
* @return List<Event> 返回类型
*/
List<Event> findEventsbyIds(List<String> idList);
/**
* 更新事件采集状态到“自动标注”
* @Title: updateEventStatusAutoMark
* @Description: 更新事件采集状态到“自动标注”
* @param @param id 设定文件
* @return void 返回类型
*/
void updateEventStatusAutoMark(String id);
/**
* 事件采集数据进入自动标注聚合
* @Title: autoMark
* @Description: 事件采集数据进入自动标注聚合
* @param @param eventCollection
* @param @return 设定文件
* @return boolean 返回类型
* @throws IOException
* @throws JsonMappingException
* @throws JsonParseException
*/
boolean autoMark(Event eventCollection) throws JsonParseException, JsonMappingException, IOException;
}
package com.zhiwei.messageflow.service.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.zhiwei.messageflow.bean.Event;
import com.zhiwei.messageflow.config.MiddlewareConfig;
import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.EventService;
import com.zhiwei.middleware.automaticmark.Service.AutomaticMarkClient;
import redis.clients.jedis.Tuple;
@Component
public class EventServiceImpl implements EventService {
private static final Logger log = LogManager.getLogger(EventServiceImpl.class);
private static AutomaticMarkClient client = AutomaticMarkClient.getClient(MiddlewareConfig.zookeeperIp);
private static final int AUTOMARKPAGELIMIT = 500;
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Autowired
protected RedisService redisService;
@Autowired
private ObjectMapper mapper;
@Override
public void saveEventMsg(Object message, long offset) {
// JSONObject.toJSONString(message);
// System.out.println(JSONObject.toJSONString(message));
JSONObject msgJsonOb = JSONObject.parseObject(message.toString());
if ("over".equals(msgJsonOb.getString("result"))) {
System.err.println(msgJsonOb.getString("eventId"));
// TODO 更新事件状态
overEvent(msgJsonOb.getString("eventId"));
try {
Thread.sleep(4000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
if (msgJsonOb.containsKey("time")) {
if (null != msgJsonOb.getLong("time")) {
msgJsonOb.put("offset", offset);
// 放入缓存
redisService.insertEvent(msgJsonOb);
}
}
}
}
@Override
public void saveRecords(List<ConsumerRecord> records) {
// List<JSONObject> msgs = new ArrayList<>();
// for (ConsumerRecord record : records) {
// Optional<?> kafkaMessage = Optional.ofNullable(record.value());
// if (kafkaMessage.isPresent()) {
// Object message = kafkaMessage.get();
// JSONObject msgJsonOb = JSONObject.parseObject(message.toString());
// if ("over".equals(msgJsonOb.getString("result"))) {
// System.err.println(msgJsonOb.getString("eventId"));
// // TODO 更新事件状态
// try {
// Thread.sleep(4000L);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// } else {
// msgJsonOb.put("offset", record.offset());
// // 放入缓存list
// msgs.add(msgJsonOb);
// }
// }
// }
//
}
@Override
public void overEvent(String id) {
Update over = new Update();
over.update("status", "采集完毕");
primaryMongoTemplate.upsert(new Query(Criteria.where("_id").is(id)), over, Event.class);
redisService.addEventAutoMarkList(id);
}
@Override
public List<Event> findEventsbyIds(List<String> idList) {
return primaryMongoTemplate.find(new Query(Criteria.where("_id").in(idList)), Event.class);
}
@Override
public void updateEventStatusAutoMark(String id) {
Update over = new Update();
over.update("status", "自动标注");
primaryMongoTemplate.upsert(new Query(Criteria.where("_id").is(id)), over, Event.class);
}
@Override
public boolean autoMark(Event eventCollection) throws JsonParseException, JsonMappingException, IOException {
String id = eventCollection.getId();
// 1,清理标注聚合结果集
String group = eventCollection.getProject();
log.info("group:{},id:{}", group, id);
client.cleanMarkAggreData(group, id);
// 从缓存获取事件采集数据
int count = redisService.countCollectionData(id);
// (总记录数+每页行数-1)/每页行数
int totalpage = (int) (count + AUTOMARKPAGELIMIT - 1) / AUTOMARKPAGELIMIT;
for (int i = 0; i < totalpage; i++) {
int start = i * AUTOMARKPAGELIMIT;
int end = start + AUTOMARKPAGELIMIT - 1;
List<String> eventList = new ArrayList<>();
if (i != (totalpage - 1)) {
eventList = new ArrayList<>(redisService.getCollectionData(id, start, end));
} else {
eventList = new ArrayList<>(redisService.getCollectionData(id, start, -1));
}
List<DBObject> list = new ArrayList<>();
for (int j = 0; j < eventList.size(); j++) {
DBObject dbObject = new BasicDBObject();
String msgStr = eventList.get(j);
Map<String, Object> map = mapper.readValue(msgStr, Map.class);
dbObject.put("_id", map.get("url") + "");
dbObject.put("url", map.get("url") + "");
dbObject.put("title", map.get("title") + "");
dbObject.put("source", map.get("source") + "");
dbObject.put("time", new Date(Long.parseLong(map.get("time") + "")));
dbObject.put("markGroup", group);
list.add(dbObject);
}
// 2,增加标注聚合结果集
client.addMarkAggreSourceList(group, id, list);
}
// 3,启动聚合
client.startAggre(group, id);
//移除等待事件采集聚合list
redisService.removeEventAutoMarkList(id);
return true;
}
}
...@@ -43,53 +43,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r ...@@ -43,53 +43,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.min-connection-per-host=0 #kafka
#spring.data.mongodb.option.max-connection-per-host=100 spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
#spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=5 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.server-selection-timeout=30000 spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.max-wait-time=120000 #=============== consumer =======================
#spring.data.mongodb.option.max-connection-idle-time=0 # group id
#spring.data.mongodb.option.max-connection-life-time=0 #spring.kafka.consumer.group-id=group1
#spring.data.mongodb.option.connect-timeout=10000 ##
#spring.data.mongodb.option.socket-timeout=0 #spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# #
#spring.data.mongodb.option.socket-keep-alive=false ## consumer deserializer
#spring.data.mongodb.option.ssl-enabled=false #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.ssl-invalid-host-name-allowed=false #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.always-use-m-beans=false
# kafka.consumer.servers=kafka1.irybd.com:9092
#spring.data.mongodb.option.heartbeat-socket-timeout=20000 kafka.consumer.enable.auto.commit=false
#spring.data.mongodb.option.heartbeat-connect-timeout=20000 kafka.consumer.session.timeout=15000
#spring.data.mongodb.option.min-heartbeat-frequency=500 kafka.consumer.auto.commit.interval=100
#spring.data.mongodb.option.heartbeat-frequency=10000 kafka.consumer.auto.offset.reset=earliest
#spring.data.mongodb.option.local-threshold=15 kafka.consumer.group.id=group
kafka.consumer.concurrency=10
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFDip kafka.consumer.maxPollRecordsConfig=100
#spring.data.mongodb.host=192.168.0.241 \ No newline at end of file
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFDport
#spring.data.mongodb.port=27017
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFD\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=qbjcPhoenix
#\uFFFD\u0085\uFFFD\uFFFD\u0096\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag\uFFFD\u0094\uFFFDuri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
#mongo.connectionsPerHost=200
#mongo.threadsAllowedToBlockForConnectionMultiplier=10
#
#mongo.connectTimeout=30000
#
#mongo.maxWaitTime=50000
#mongo.autoConnectRetry=true
#mongo.socketKeepAlive=true
#
#mongo.socketTimeout=120000
#mongo.slaveOk=true
\ No newline at end of file
...@@ -39,53 +39,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r ...@@ -39,53 +39,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.min-connection-per-host=0 #kafka
#spring.data.mongodb.option.max-connection-per-host=100 spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
#spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=5 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.server-selection-timeout=30000 spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.max-wait-time=120000 #=============== consumer =======================
#spring.data.mongodb.option.max-connection-idle-time=0 # group id
#spring.data.mongodb.option.max-connection-life-time=0 #spring.kafka.consumer.group-id=group1
#spring.data.mongodb.option.connect-timeout=10000 ##
#spring.data.mongodb.option.socket-timeout=0 #spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# #
#spring.data.mongodb.option.socket-keep-alive=false ## consumer deserializer
#spring.data.mongodb.option.ssl-enabled=false #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.ssl-invalid-host-name-allowed=false #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.always-use-m-beans=false
# kafka.consumer.servers=kafka1.irybd.com:9092
#spring.data.mongodb.option.heartbeat-socket-timeout=20000 kafka.consumer.enable.auto.commit=false
#spring.data.mongodb.option.heartbeat-connect-timeout=20000 kafka.consumer.session.timeout=15000
#spring.data.mongodb.option.min-heartbeat-frequency=500 kafka.consumer.auto.commit.interval=100
#spring.data.mongodb.option.heartbeat-frequency=10000 kafka.consumer.auto.offset.reset=earliest
#spring.data.mongodb.option.local-threshold=15 kafka.consumer.group.id=group
kafka.consumer.concurrency=10
##本地ip kafka.consumer.maxPollRecordsConfig=100
#spring.data.mongodb.host=192.168.0.241 \ No newline at end of file
##本地port
#spring.data.mongodb.port=27017
##本地数据库
#spring.data.mongodb.database=qbjcPhoenix
#其他数据库
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag用uri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag数据库
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
#mongo.connectionsPerHost=200
#mongo.threadsAllowedToBlockForConnectionMultiplier=10
#
#mongo.connectTimeout=30000
#
#mongo.maxWaitTime=50000
#mongo.autoConnectRetry=true
#mongo.socketKeepAlive=true
#
#mongo.socketTimeout=120000
#mongo.slaveOk=true
\ No newline at end of file
...@@ -39,53 +39,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r ...@@ -39,53 +39,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.min-connection-per-host=0 #kafka
#spring.data.mongodb.option.max-connection-per-host=100 spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
#spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=5 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.server-selection-timeout=30000 spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.max-wait-time=120000 #=============== consumer =======================
#spring.data.mongodb.option.max-connection-idle-time=0 # group id
#spring.data.mongodb.option.max-connection-life-time=0 #spring.kafka.consumer.group-id=group1
#spring.data.mongodb.option.connect-timeout=10000 ##
#spring.data.mongodb.option.socket-timeout=0 #spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# #
#spring.data.mongodb.option.socket-keep-alive=false ## consumer deserializer
#spring.data.mongodb.option.ssl-enabled=false #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.ssl-invalid-host-name-allowed=false #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.always-use-m-beans=false
# kafka.consumer.servers=kafka1.irybd.com:9092
#spring.data.mongodb.option.heartbeat-socket-timeout=20000 kafka.consumer.enable.auto.commit=false
#spring.data.mongodb.option.heartbeat-connect-timeout=20000 kafka.consumer.session.timeout=15000
#spring.data.mongodb.option.min-heartbeat-frequency=500 kafka.consumer.auto.commit.interval=100
#spring.data.mongodb.option.heartbeat-frequency=10000 kafka.consumer.auto.offset.reset=earliest
#spring.data.mongodb.option.local-threshold=15 kafka.consumer.group.id=group
kafka.consumer.concurrency=10
##本地ip kafka.consumer.maxPollRecordsConfig=100
#spring.data.mongodb.host=192.168.0.241 \ No newline at end of file
##本地port
#spring.data.mongodb.port=27017
##本地数据库
#spring.data.mongodb.database=qbjcPhoenix
#其他数据库
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag用uri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag数据库
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
#mongo.connectionsPerHost=200
#mongo.threadsAllowedToBlockForConnectionMultiplier=10
#
#mongo.connectTimeout=30000
#
#mongo.maxWaitTime=50000
#mongo.autoConnectRetry=true
#mongo.socketKeepAlive=true
#
#mongo.socketTimeout=120000
#mongo.slaveOk=true
\ No newline at end of file
...@@ -43,53 +43,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r ...@@ -43,53 +43,27 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin spring.data.mongodb.thirdary.authenticationDatabase=admin
#spring.data.mongodb.option.min-connection-per-host=0 #kafka
#spring.data.mongodb.option.max-connection-per-host=100 spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
#spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=5 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.server-selection-timeout=30000 spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#spring.data.mongodb.option.max-wait-time=120000 #=============== consumer =======================
#spring.data.mongodb.option.max-connection-idle-time=0 # group id
#spring.data.mongodb.option.max-connection-life-time=0 #spring.kafka.consumer.group-id=group1
#spring.data.mongodb.option.connect-timeout=10000 ##
#spring.data.mongodb.option.socket-timeout=0 #spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# #
#spring.data.mongodb.option.socket-keep-alive=false ## consumer deserializer
#spring.data.mongodb.option.ssl-enabled=false #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.ssl-invalid-host-name-allowed=false #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.data.mongodb.option.always-use-m-beans=false
# kafka.consumer.servers=kafka1.irybd.com:9092
#spring.data.mongodb.option.heartbeat-socket-timeout=20000 kafka.consumer.enable.auto.commit=false
#spring.data.mongodb.option.heartbeat-connect-timeout=20000 kafka.consumer.session.timeout=15000
#spring.data.mongodb.option.min-heartbeat-frequency=500 kafka.consumer.auto.commit.interval=100
#spring.data.mongodb.option.heartbeat-frequency=10000 kafka.consumer.auto.offset.reset=earliest
#spring.data.mongodb.option.local-threshold=15 kafka.consumer.group.id=group
kafka.consumer.concurrency=10
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFDip kafka.consumer.maxPollRecordsConfig=100
#spring.data.mongodb.host=192.168.0.241 \ No newline at end of file
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFDport
#spring.data.mongodb.port=27017
##\uFFFD\u009C\uFFFD\uFFFD\u009C\uFFFD\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=qbjcPhoenix
#\uFFFD\u0085\uFFFD\uFFFD\u0096\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=eventMuseum
#spring.data.mongodb.database=WechatPublic
#tag\uFFFD\u0094\uFFFDuri
#spring.data.mongodb.uri=1.119.44.206:30000
#spring.data.mongodb.uri=192.168.0.245:27017
#tag\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
#spring.data.mongodb.database=Testqbjc
#spring.data.mongodb.database=weibotag
#mongo.connectionsPerHost=200
#mongo.threadsAllowedToBlockForConnectionMultiplier=10
#
#mongo.connectTimeout=30000
#
#mongo.maxWaitTime=50000
#mongo.autoConnectRetry=true
#mongo.socketKeepAlive=true
#
#mongo.socketTimeout=120000
#mongo.slaveOk=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment