Commit 3594cf63 by win7

监测系统消息流模块

parents
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/
\ No newline at end of file
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.3/apache-maven-3.5.3-bin.zip
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhiwei.messageflow</groupId>
<artifactId>messageflow</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>messageflow</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- es -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version><!--$NO-MVN-MAN-VER$ -->
</dependency>
<dependency>
<groupId>com.zhiwei</groupId>
<artifactId>es-client</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version><!--$NO-MVN-MAN-VER$ -->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version><!--$NO-MVN-MAN-VER$ -->
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- net.sf.json -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- POI -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.9</version>
</dependency>
<!-- mongodb-plus -->
<dependency>
<groupId>com.spring4all</groupId>
<artifactId>mongodb-plus-spring-boot-starter</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- mail-send -->
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
<!-- solr -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.listener.ApplicationContextProvider;
/**
* 消息流模块入口
*/
@Component
public class ES4RedisRunner implements ApplicationRunner {
public final static Logger log = LoggerFactory.getLogger(ES4RedisRunner.class);
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
public void init() {
try {
// /**
// * redis消息获取测试方法
// */
// getSet gs = ApplicationContextProvider.getBean("getSet", getSet.class);
// gs.getDataByeRedis("", "", 0, 0, 0);
// 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
// 定时器 项目启动后100ms运行 每次间隔8min
Timer timer = new Timer();
timer.schedule(
new TimerTask() {
public void run() {
try {
start.startThread();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}, 100L, 30 * 1000L);
// /**
// * ES库消息输出Excel并分析关键词重复数据
// */
// GetMessage gm = ApplicationContextProvider.getBean("getMessage",
// GetMessage.class);
// gm.getSame();
// gm.getmessage();
} catch (Exception e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.PlatformDao;
import com.zhiwei.messageflow.mongo.dao.ProjectDao;
@Component
public class ES4RedisStart {
private final static Logger log = LoggerFactory.getLogger(ES4RedisStart.class);
@Autowired
private ProjectDao projectDao;
@Autowired
private PlatformDao platformDao;
@Autowired
private ES4RedisTask es4RedisTask;
public void startThread() throws JsonParseException, JsonMappingException, IOException, InterruptedException {
// 项目列表
List<Project> projects = projectDao.getAllProjects();
// 公共平台名列表
List<String> platformNames = platformDao.getPublicPlatformName();
// 遍历项目
for (Project project : projects) {
/**
* 项目全部平台(公共+私有)
*/
List<String> allplatformNames = new ArrayList<>();
allplatformNames.addAll(platformNames);
allplatformNames.addAll(project.getDataPt());
// 获取线程
ES4RedisThread es4RedisThread = ES4RedisThread.getThread(project.getProjectName(), project,
allplatformNames, es4RedisTask);
// 获取线程失败
if (es4RedisThread == null) {
log.warn("{}项目获取线程失败", project.getProjectName());
continue;
}
// 线程启动
es4RedisThread.start();
}
}
}
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.Project;
public class ES4RedisThread extends Thread {
private final static Logger log = LoggerFactory.getLogger(ES4RedisThread.class);
// 线程
private Thread t;
// 线程名
private String threadName;
// 项目
private Project project;
// 平台列表
private List<String> platformNames;
private ES4RedisTask es4RedisTask;
// 单个平台单个关键词组每次查询数量
private static final int count = 300;
// private static final int max_Thread_num = 40;
// private static int Thread_num = 0;
// private static final int max_Running_num = 3;
// private static Integer Running_num = 0;
// private static List<String> ThreadList = new ArrayList<>();
public ES4RedisThread(String name, Project project, List<String> platformNames, ES4RedisTask es4RedisTask) {
threadName = name;
this.project = project;
this.platformNames = platformNames;
this.es4RedisTask = es4RedisTask;
}
public ES4RedisThread() {
}
public static ES4RedisThread getThread(String name, Project project, List<String> platformNames,
ES4RedisTask es4RedisTask) {
ES4RedisThread es4RedisThread = new ES4RedisThread(name, project, platformNames, es4RedisTask);
return es4RedisThread;
}
public void start() {
// 线程开始
log.info("Starting {}", threadName);
if (t == null) {
t = new Thread(this, threadName);
// 通知执行run方法
t.start();
}
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.sleep(10L);
// 超时控制器
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
// 超时则线程中止
Thread.currentThread().interrupted();
log.warn("{}项目超时", project.getProjectName());
}
}, 29 * 1000L);
// 程序运行
log.info("Running {}", threadName);
// 该项目执行消息流获取
boolean flag = es4RedisTask.ES4Redis(project, platformNames, count);
if (!flag) {
// 程序执行出现异常则线程中止
timer.cancel();
Thread.currentThread().interrupted();
log.error("{}项目出现异常", project.getProjectName());
} else
// 程序正常执行完毕,关闭超时控制器
timer.cancel();
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程退出
log.info("Thread {} exiting.", threadName);
}
}
}
\ No newline at end of file
package com.zhiwei.messageflow;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MessageflowApplication {
public static void main(String[] args) {
SpringApplication.run(MessageflowApplication.class, args);
}
}
package com.zhiwei.messageflow.bean;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@Data
@ToString
@EqualsAndHashCode(callSuper = false)
public class MediaMessage {
private String _id;
private String pt;
private Long rsid;
private String id;
private String source;
private String time;
private String title;
private String type;
private String content;
private String savetime;
private String spyderInfoId;
private String column;
private String url;
private String rootSource;
private String userid;
private String name;
private String company;
private String position;
private String coulmn;
private String author;
private String user;
private String colum;
private String rootSourcce;
private Double channelIndex;
}
\ No newline at end of file
package com.zhiwei.messageflow.bean;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class VideoMessage {
private String _id;
private String videoLength;
private String pt;
private String spyderInfoId;
private String browseCount;
private Long rsid;
private String source;
private String title;
private String url;
private String content;
private String savetime;
private String id;
private String time;
private String uper;
private String type;
}
\ No newline at end of file
package com.zhiwei.messageflow.bean;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class WeiboMessage {
private String _id;
private Integer vtype;
private String sex;
private Integer fensi;
private String description;
private String pic_urls;
private String source;
private Integer reply_count;
private Integer retweet_count;
private String url;
private String ins;
private Integer isForward;
private String weibo_img;
private Integer weibo;
private String user_id;
private String img_url;
private String location;
private String id;
private String time;
private String text;
private Long rstime;
private String username;
private Integer guanzhu;
private String rooturl;
private String roottext;
private String root_time;
private String rootsource;
private String retweet_status_id;
private Integer root_reply;
private String rootuid;
private Integer root_repost;
private String rootname;
private String rootLocation;
private Double channelIndex;
}
package com.zhiwei.messageflow.bean;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class ZhihuMessage {
private String _id;
private String pt;
private String spyderInfoId;
private String user_name;
private String created_at;
private Long rsid;
private String answers_updateTime;
private String source;
private Integer attitudes_count;
private String type;
private String insert_at;
private String question_title;
private Integer answer_count;
private String question_id;
private String url;
private String answer_content;
private String img_url;
private String update_at;
private String user_url;
private String id;
private String comments_count;
private String question_content;
private String createTime;
private String serialVersionUID;
private String question_url;
private String answer_url;
private String img;
private String follow;
private String browse;
}
\ No newline at end of file
package com.zhiwei.messageflow.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
//ES配置类
@Component
@Configuration
@ConfigurationProperties(prefix = "es")
@PropertySource(value = "classpath:es.properties")
public class ESConfig {
public static String ip;
public static int esPort;
public static String clusterName;
public static String zhihuIndexName;
public static String zhihuType;
public static String mediaIndexName;
public static String mediaType;
public static String weiboIndexName;
public static String weiboType;
public static String videoIndexName;
public static String videoType;
public static String mediaMarkIndexName;
public static String mediaMarkType;
public static String weiboMarkIndexName;
public static String weiboMarkType;
public static String getIp() {
return ip;
}
public static void setIp(String ip) {
ESConfig.ip = ip;
}
public static int getEsPort() {
return esPort;
}
public static void setEsPort(int esPort) {
ESConfig.esPort = esPort;
}
public static String getClusterName() {
return clusterName;
}
public static void setClusterName(String clusterName) {
ESConfig.clusterName = clusterName;
}
public static String getZhihuIndexName() {
return zhihuIndexName;
}
public static void setZhihuIndexName(String zhihuIndexName) {
ESConfig.zhihuIndexName = zhihuIndexName;
}
public static String getZhihuType() {
return zhihuType;
}
public static void setZhihuType(String zhihuType) {
ESConfig.zhihuType = zhihuType;
}
public static String getMediaIndexName() {
return mediaIndexName;
}
public static void setMediaIndexName(String mediaIndexName) {
ESConfig.mediaIndexName = mediaIndexName;
}
public static String getMediaType() {
return mediaType;
}
public static void setMediaType(String mediaType) {
ESConfig.mediaType = mediaType;
}
public static String getWeiboIndexName() {
return weiboIndexName;
}
public static void setWeiboIndexName(String weiboIndexName) {
ESConfig.weiboIndexName = weiboIndexName;
}
public static String getWeiboType() {
return weiboType;
}
public static void setWeiboType(String weiboType) {
ESConfig.weiboType = weiboType;
}
public static String getVideoIndexName() {
return videoIndexName;
}
public static void setVideoIndexName(String videoIndexName) {
ESConfig.videoIndexName = videoIndexName;
}
public static String getVideoType() {
return videoType;
}
public static void setVideoType(String videoType) {
ESConfig.videoType = videoType;
}
public static String getMediaMarkIndexName() {
return mediaMarkIndexName;
}
public static void setMediaMarkIndexName(String mediaMarkIndexName) {
ESConfig.mediaMarkIndexName = mediaMarkIndexName;
}
public static String getMediaMarkType() {
return mediaMarkType;
}
public static void setMediaMarkType(String mediaMarkType) {
ESConfig.mediaMarkType = mediaMarkType;
}
public static String getWeiboMarkIndexName() {
return weiboMarkIndexName;
}
public static void setWeiboMarkIndexName(String weiboMarkIndexName) {
ESConfig.weiboMarkIndexName = weiboMarkIndexName;
}
public static String getWeiboMarkType() {
return weiboMarkType;
}
public static void setWeiboMarkType(String weiboMarkType) {
ESConfig.weiboMarkType = weiboMarkType;
}
}
package com.zhiwei.messageflow.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import lombok.Data;
import lombok.ToString;
//Mongo配置类
@Data
@ToString
@Component
@Configuration
@ConfigurationProperties(prefix = "mongo")
@PropertySource(value = "classpath:mongo.properties")
public class MongoConfig {
private String LocalIP;
private int Localport;
private String LocalDBName;
private String ServerIP;
private int Serverport;
private String ServerDBName;
private String ServerUsername;
private String ServerPassword;
private String authenticationDatabase;
private String EventDBName;
private String WechatDBName;
private String TestDBName;
private String weibotagDBName;
private String mediatagDBName;
private String TagIP;
private int Tagport;
private int connectionsPerHost;
private int threadsAllowedToBlockForConnectionMultiplier;
private int connectTimeout;
private int maxWaitTime;
private boolean autoConnectRetry;
private boolean socketKeepAlive;
private int socketTimeout;
private boolean slaveOk;
}
package com.zhiwei.messageflow.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
//redus配置类
@Component
@Configuration
@ConfigurationProperties(prefix = "redis")
@PropertySource(value = "classpath:redis.properties")
public class RedisConfig {
private int maxTotal;
private int maxIdle;
private int maxWaitMillis;
private boolean testOnBorrow;
private boolean testOnReturn;
private String ip;
private int port;
private int keyMaxSize;
private int selectDB;
private int user_keyMaxSize;
private int cacheSize;
private int intitCount;
public int getMaxTotal() {
return maxTotal;
}
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public int getMaxWaitMillis() {
return maxWaitMillis;
}
public void setMaxWaitMillis(int maxWaitMillis) {
this.maxWaitMillis = maxWaitMillis;
}
public boolean isTestOnBorrow() {
return testOnBorrow;
}
public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}
public boolean isTestOnReturn() {
return testOnReturn;
}
public void setTestOnReturn(boolean testOnReturn) {
this.testOnReturn = testOnReturn;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getKeyMaxSize() {
return keyMaxSize;
}
public void setKeyMaxSize(int keyMaxSize) {
this.keyMaxSize = keyMaxSize;
}
public int getSelectDB() {
return selectDB;
}
public void setSelectDB(int selectDB) {
this.selectDB = selectDB;
}
public int getUser_keyMaxSize() {
return user_keyMaxSize;
}
public void setUser_keyMaxSize(int user_keyMaxSize) {
this.user_keyMaxSize = user_keyMaxSize;
}
public int getCacheSize() {
return cacheSize;
}
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}
public int getIntitCount() {
return intitCount;
}
public void setIntitCount(int intitCount) {
this.intitCount = intitCount;
}
}
package com.zhiwei.messageflow.es;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.config.ESConfig;
@Component
public class ESClient {
public static TransportClient client = ESClientHolder.getESClient();
private static class ESClientHolder {
private static TransportClient getESClient() {
Settings esSettings = Settings.builder().put("cluster.name", ESConfig.clusterName) // 设置ES实例的名称
.put("client.transport.sniff", false) // 自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
// .put("client.transport.ping_timeout", "15s")
.build();
TransportClient client = new PreBuiltTransportClient(esSettings);
// 此步骤添加IP,至少一个,其实一个就够了,因为添加了自动嗅探配置
try {
client.addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(ESConfig.ip), ESConfig.esPort));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
}
}
/**
* @Title: Template.java
* @Package com.zhiwei.weixin.pojo
* @Description: TODO(用一句话描述该文件做什么)
* @author hero
* @date 2016年1月27日 上午11:40:25
* @version V1.0
*/
package com.zhiwei.messageflow.es.bean;
import java.util.Map;
import lombok.Data;
import lombok.ToString;
/**
*
* @ClassName: Template
* @Description: TODO(模版消息类)
* @author 陈炜涛
* @date 2017年11月4日 上午11:31:49
*/
@Data
@ToString
public class Template {
/**
* 要发送的的用户
*/
private String touser;
/**
* 模版消息长id
*/
private String template_id;
/**
* 点击通知的跳转地址
*/
private String url;
/**
* 需要发送的模版消息内容
*/
private Map<String, Object> data;
}
package com.zhiwei.messageflow.es.bean;
import java.util.Date;
import lombok.Data;
import lombok.ToString;
/**
*
* @ClassName: TemplateData
* @Description: TODO(消息模板数据)
* @author 陈炜涛
* @date 2017年11月4日 上午11:02:13
*/
@Data
@ToString
public class TemplateData {
/**
* 地址
*/
private String url;
/**
* 标题 若平台为知乎,则这为知乎问题 若平台为论坛/贴吧,则这为贴子名
*/
private String title;
/**
* 内容 若平台为知乎,则为回答内容 若平台为论坛/贴吧,则为用户回复内容
*/
private String content;
/**
* 来源 若平台为知乎,则为回答用户 若平台为论坛/贴吧,则为用户
*/
private String source;
/**
* 时间
*/
private Date time;
/**
* 平台
*/
private String pt;
/**
* 若为微博平台则有
*/
private int fensi;
/**
* 关键词规则
*/
private String wordRule;
/**
* 渠道规则
*/
private String channelRule;
public TemplateData(String url, String title, String content, String source, Date time, String pt) {
super();
this.url = url;
this.title = title;
this.content = content;
this.source = source;
this.time = time;
this.pt = pt;
}
public TemplateData() {
super();
}
}
package com.zhiwei.messageflow.es.bean;
/**
*
* @ClassName: WechatConstant
* @Description: TODO(放置微信中的一些常量)
* @author 陈炜涛
* @date 2017年11月4日 上午10:22:39
*/
public class WechatConstant {
/**
* 使用的微信号
*/
public static final String WECHAT_APPID = "wx2f555218d66e5948";
/**
* 预警模板id IT
*/
public static final String WECHAT_TEMPLATEID_EARLY_IT = "trBtKi7YyE_xbeH_xMtWYNGqAVgOZe4NlmPrH6mO-aw";
/**
* 获取access_token的url
*/
public static final String ACCESS_TOKEN_FETCH_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET";
/**
* 获取用户基本信息的url
*/
public static final String WECHAT_USER_FETCH_URL = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=ACCESS_TOKEN&openid=OPENID&lang=LANG";
/**
* 批量获取用户基本信息的url
*/
public static final String WECHAT_USER_BATCH_FETCH_URL = "https://api.weixin.qq.com/cgi-bin/user/info/batchget?access_token=ACCESS_TOKEN";
/**
* 创建标签url
*/
public static final String WECHAT_CREATE_TAG_URL = "https://api.weixin.qq.com/cgi-bin/tags/create?access_token=ACCESS_TOKEN";
/**
* 获取已创建的标签url
*/
public static final String WECHAT_GET_TAG_URL = "https://api.weixin.qq.com/cgi-bin/tags/get?access_token=ACCESS_TOKEN";
/**
* 编辑标签url
*/
public static final String WECHAT_EDIT_TAG_URL = "https://api.weixin.qq.com/cgi-bin/tags/update?access_token=ACCESS_TOKEN";
/**
* 删除标签url
*/
public static final String WECHAT_DELETE_TAG_URL = "https://api.weixin.qq.com/cgi-bin/tags/delete?access_token=ACCESS_TOKEN";
/**
* 获取该标签下的用户列表url
*/
public static final String WECHAT_GET_USER_OF_TAG_URL = "https://api.weixin.qq.com/cgi-bin/user/tag/get?access_token=ACCESS_TOKEN";
/**
* 批量为用户打标签url
*/
public static final String WECHAT_BATCH_TAG_USER_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/batchtagging?access_token=ACCESS_TOKEN";
/**
* 批量为用户取消标签url
*/
public static final String WECHAT_BATCH_UNTAG_USER_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/batchuntagging?access_token=ACCESS_TOKEN";
/**
* 获取用户身上标注的标签列表url
*/
public static final String WECHAT_GET_TAGS_OF_USER_URL = "https://api.weixin.qq.com/cgi-bin/tags/getidlist?access_token=ACCESS_TOKEN";
/**
* 给用户备注的url
*/
public static final String WECHAT_REMARK_USER_URL = "https://api.weixin.qq.com/cgi-bin/user/info/updateremark?access_token=ACCESS_TOKEN";
/**
* 获取公众号的黑名单列表
*/
public static final String WECHAT_GET_BLACK_LIST_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/getblacklist?access_token=ACCESS_TOKEN";
/**
* 拉黑用户url
*/
public static final String WECHAT_BLACK_USER_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/batchblacklist?access_token=ACCESS_TOKEN";
/**
* 取消拉黑用户url
*/
public static final String WECHAT_UNBLACK_USER_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/batchunblacklist?access_token=ACCESS_TOKEN";
/**
* 发送模版消息
*/
public static final String WECHAT_TEMPLET_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=ACCESS_TOKEN";
/**
* accesstoken的过期时间
*/
public static final long ACCESS_TOKEN_EXPIRED_TIME = 7200L;
/**
* accesstoken过期提前量(提前200秒过期)
*/
public static final long ACCESS_TOKEN_EXPIRED_delta = 200L;
/**
* 默认编码
*/
public static final String DEFAULT_CHARSET = "UTF-8";
/**
* 最多一次拉黑人数
*/
public static final int WECHAT_BLACK_USER_MAX_SIZE = 20;
/**
* 给用户备注的最大长度
*/
public static final int WECHAT_USER_REMARK_MAX_LENGTH = 30;
/**
* 每日笑话菜单的key
*/
public static final String MENU_JOKE_CLICK_KEY = "joke";
/**
* 用户标签的最大长度
*/
public static final int WECHAT_USER_TAG_LENGTH = 30;
/**
* url中的accessToken占位符
*/
public static final String ACCESS_TOKEN = "ACCESS_TOKEN";
/**
* [获取验证码]菜单对应的eventKey
*/
public static final String MENU_VER_CODE = "ver_code";
/**
* [我的信息]菜单对应的eventKey
*/
public static final String MENU_MY_CLICK_KEY = "my";
/**
* [发送位置]菜单对应的eventKey
*/
public static final String MENU_LOCATION_SELECT_KEY = "location_select";
/**
* [系统拍照发图]菜单对应的eventKey
*/
public static final String MENU_PIC_SYS_PHOTO = "pic_sysphoto";
/**
* [拍照或相册发图]菜单对应的eventKey
*/
public static final String MENU_PIC_PHOTO_OR_ALBUM = "pic_photo_or_album";
/**
* [微信相册发图]菜单对应的eventKey
*/
public static final String MENU_PIC_WEIXIN = "pic_weixin";
public static final String MENU_SCAN_CODE_WAIT_MSG = "scancode_waitmsg";
}
package com.zhiwei.messageflow.es.dao;
import java.util.List;
import org.elasticsearch.search.SearchHits;
//第一层 从ES获取原始数据
public interface ESDao {
/**
* 获取微博消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @return
*/
public SearchHits getWeiboDataFromEs(List<String> keywords, int count, long start, long end, String platform,
String project);
/**
* 获取知乎消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @return
*/
public SearchHits getZhihuDataFromEs(List<String> keywords, int count, long start, long end, String platform,
String project);
/**
* 获取视频消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @return
*/
public SearchHits getVideoDataFromEs(List<String> keywords, int count, long start, long end, String platform,
String project);
/**
* 获取网媒消息
*
* @param keywords
* 关键词组
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platform
* 平台
* @param project
* 平台
* @return
*/
public SearchHits getMediaDataFromEs(List<String> keywords, int count, long start, long end, String platform,
String project);
public SearchHits getWeiboDataycy(long date, List<String> keywords, int count, String platform, String project);
public SearchHits getZhihuDataycy(long date, List<String> keywords, int count, String platform, String project);
public SearchHits getVideoDataycy(long date, List<String> keywords, int count, String platform, String project);
public SearchHits getMediaDataycy(long date, List<String> keywords, int count, String platform, String project);
}
package com.zhiwei.messageflow.es.dao;
import org.elasticsearch.search.SearchHits;
public interface TrackESDao {
/**
* 微博关键词预警
*
* @param anyWord
* 或关系关键词
* @param allWords
* 且关系关键词
* @param color
* 高亮颜色
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getkeyWordsTrackWeiboFromEs(String anyWord, String allWords, String color, int count, long start,
long end);
/**
* 知乎关键词预警
*
* @param anyWord
* 或关系关键词
* @param allWords
* 且关系关键词
* @param color
* 高亮颜色
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getkeyWordsTrackZhihuFromEs(String anyWord, String allWords, String color, int count, long start,
long end);
/**
* 视频关键词预警
*
* @param anyWord
* 或关系关键词
* @param allWords
* 且关系关键词
* @param color
* 高亮颜色
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getkeyWordsTrackVideoFromEs(String anyWord, String allWords, String color, int count, long start,
long end);
/**
* 网媒关键词预警
*
* @param anyWord
* 或关系关键词
* @param allWords
* 且关系关键词
* @param color
* 高亮颜色
* @param count
* 消息数量
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platformName
* 平台名
* @return
*/
public SearchHits getkeyWordsTrackMediaFromEs(String anyWord, String allWords, String color, int count, long start,
long end, String platformName);
/**
* 微博渠道预警
*
* @param channel
* 渠道名
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getchannelTrackWeiboFromEs(String channel, String color, int count, long start, long end);
/**
* 知乎渠道预警
*
* @param channel
* 渠道名
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getchannelTrackZhihuFromEs(String channel, String color, int count, long start, long end);
/**
* 视频渠道预警
*
* @param channel
* 渠道名
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getchannelTrackVideoFromEs(String channel, String color, int count, long start, long end);
/**
* 网媒渠道预警
*
* @param channel
* 渠道名
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始rsid
* @param end
* 结束rsid
* @param platformName
* 平台名
* @return
*/
public SearchHits getchannelTrackMediaFromEs(String channel, String color, int count, long start, long end,
String platformName);
/**
* 微博相似消息数预警
*
* @param articleTitle
* 标题
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始时间
* @param end
* 结束时间
* @return
*/
public SearchHits getarticleTrackWeiboFromEs(String articleTitle, String color, int count, long start, long end);
/**
* 视频相似消息数预警
*
* @param articleTitle
* 标题
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始rsid
* @param end
* 结束rsid
* @return
*/
public SearchHits getarticleTrackVideoFromEs(String articleTitle, String color, int count, long start, long end);
/**
* 微博相似消息数预警
*
* @param articleTitle
* 标题
* @param color
* 高亮颜色
* @param count
* 消息数
* @param start
* 开始时间
* @param end
* 结束时间
* @param platformName
* 平台名
* @return
*/
public SearchHits getarticleTrackMediaFromEs(String articleTitle, String color, int count, long start, long end,
String platformName);
}
package com.zhiwei.messageflow.es.service;
import java.util.List;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
//第二层 将SearchHits封装成List<Message>
public interface ES4BeanService {
/**
* 获取微博消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform,
String project);
/**
* 获取知乎消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform,
String project);
/**
* 获取视频消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform,
String project);
/**
* 获取网媒消息
*
* @param keys
* 关键词
* @param count
* 消息数量
* @param startid
* 开始id
* @param endid
* 结束id
* @param platform
* 平台
* @param project
* 项目
* @return
*/
List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform,
String project);
}
package com.zhiwei.messageflow.es.service;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface EarlyWarningService {
/**
* 预警
*
* @param trackRules
* 预警规则
* @param count
* 预警消息数
* @param startid
* 开始rsid
* @param endid
* 结束rsid
* @param platformName
* 平台名
* @param projectName
* 项目名
*/
void earlyWarning(List<TrackRule> trackRules, int count, long startid, long endid, String platformName,
String projectName);
}
package com.zhiwei.messageflow.es.service;
import org.elasticsearch.search.SearchHit;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
public interface HighLightFillingService {
/**
* 微博平台ES数据封装+高亮处理+渠道影响力
*
* @param searchHit
* @return
*/
WeiboMessage getWeiboBean(SearchHit searchHit);
/**
* 知乎平台ES数据封装+高亮处理+渠道影响力
*
* @param searchHit
* @return
*/
ZhihuMessage getZhihuBean(SearchHit searchHit);
/**
* 视频ES数据封装+高亮处理+渠道影响力
*
* @param searchHit
* @return
*/
VideoMessage getVideoBean(SearchHit searchHit);
/**
* 网媒平台ES数据封装+高亮处理+渠道影响力
*
* @param searchHit
* @return
*/
MediaMessage getMediaBean(SearchHit searchHit);
}
package com.zhiwei.messageflow.es.service;
import java.util.List;
import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
public interface NoiseProcessingService {
List<WeiboMessage> weiboDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<ZhihuMessage> zhihuDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<VideoMessage> videoDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<MediaMessage> mediaDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
}
package com.zhiwei.messageflow.es.service;
import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface TrackHitAndWarnService {
/**
* 关键词预警数据获取
*
* @param truckRule
* @param count
* @param startid
* @param endid
* @param platformName
* @return
*/
SearchHits keyWordsTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName);
/**
* 渠道预警数据获取
*
* @param truckRule
* @param count
* @param startid
* @param endid
* @param platformName
* @return
*/
SearchHits channelTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName);
/**
* 相似新闻数预警数据获取
*
* @param truckRule
* @param count
* @param startid
* @param endid
* @param platformName
* @return
*/
SearchHits articleTrackHit(TrackRule truckRule, int count, long startid, long endid, String platformName);
/**
* 邮件预警
*
* @param trackHits
* @param platformName
* @param trackRule
*/
void WarnEmail(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
/**
* 微信预警
*
* @param trackHits
* @param platformName
* @param trackRule
*/
void WarnWechat(SearchHits trackHit, String platformName, TrackRule trackRule, String projectName);
}
package com.zhiwei.messageflow.es.service.impl;
import java.util.List;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.es.service.ES4BeanService;
import com.zhiwei.messageflow.es.service.NoiseProcessingService;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
@Component
public class ES4BeanServiceImpl implements ES4BeanService {
public final static Logger log = LoggerFactory.getLogger(ES4BeanServiceImpl.class);
@Autowired
private ESDao esDao;
@Autowired
private NoiseProcessingService noiseProcessingService;
@Override
public List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
long startid, long endid, String platform, String project) {
List<WeiboMessage> messages = null;
try {
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getWeiboDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.weiboDenoising(noiseRules, searchHits, platform, project);
} catch (Exception e) {
log.error(e.getStackTrace() + " " + e.getMessage());
}
return messages;
}
@Override
public List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
long startid, long endid, String platform, String project) {
List<ZhihuMessage> messages = null;
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getZhihuDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.zhihuDenoising(noiseRules, searchHits, platform, project);
return messages;
}
@Override
public List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
long startid, long endid, String platform, String project) {
List<VideoMessage> messages = null;
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getVideoDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.videoDenoising(noiseRules, searchHits, platform, project);
return messages;
}
@Override
public List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
long startid, long endid, String platform, String project) {
List<MediaMessage> messages = null;
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getMediaDataFromEs(keywords, count, startid, endid, platform, project);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.mediaDenoising(noiseRules, searchHits, platform, project);
return messages;
}
}
package com.zhiwei.messageflow.es.service.impl;
import java.util.List;
import org.elasticsearch.search.SearchHits;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.service.EarlyWarningService;
import com.zhiwei.messageflow.es.service.TrackHitAndWarnService;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
import com.zhiwei.messageflow.mongo.dao.TrackRuleDao;
@Component
public class EarlyWarningServiceImpl implements EarlyWarningService {
@Autowired
private TrackRuleDao trackRuleDao;
@Autowired
private TrackHitAndWarnService trackHitAndWarnService;
@Override
public void earlyWarning(List<TrackRule> trackRules, int count, long startid, long endid, String platformName,
String projectName) {
for (TrackRule trackRule : trackRules) {
// 判断是否预警
if (trackRule.isWarn()) {
continue;
}
/**
* 判断是否过期
*/
long createAt = trackRule.getCreateAt();
long addDay = 0;
if (trackRule.getEarlyWarningTime().equals("一天")) {
addDay = 24 * 60 * 60 * 1000L;
} else if (trackRule.getEarlyWarningTime().equals("二天")) {
addDay = 2 * 24 * 60 * 60 * 1000L;
} else if (trackRule.getEarlyWarningTime().equals("三天")) {
addDay = 3 * 24 * 60 * 60 * 1000L;
}
long confirm = createAt + addDay;
if (confirm <= System.currentTimeMillis()) {
// 过期
trackRule.setWarn(true);
trackRuleDao.updateTrackrule(trackRule.get_id());
} else {
// 判断规则
// 首先判断预警方式
if (trackRule.getEarlyWarning().equals("no")) {
} else if (trackRule.getEarlyWarning().equals("wechat")) {
// 微信预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHit(trackRule, count, startid, endid,
platformName);
}
trackHitAndWarnService.WarnWechat(TrackHit, platformName, trackRule, projectName);
} else if (trackRule.getEarlyWarning().equals("email")) {
// 邮箱预警
// 处理规则
SearchHits TrackHit = null;
if (trackRule.getRuleType().equals("keyWords")) {
// 关键词追踪
TrackHit = trackHitAndWarnService.keyWordsTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("channel")) {
// 渠道追踪
TrackHit = trackHitAndWarnService.channelTrackHit(trackRule, count, startid, endid,
platformName);
} else if (trackRule.getRuleType().equals("article")) {
// 相似新闻数追踪
TrackHit = trackHitAndWarnService.articleTrackHit(trackRule, count, startid, endid,
platformName);
}
trackHitAndWarnService.WarnEmail(TrackHit, platformName, trackRule, projectName);
}
}
}
}
}
package com.zhiwei.messageflow.getmessage;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhiwei.messageflow.getmessage.bean.Media;
import com.zhiwei.messageflow.getmessage.bean.Video;
import com.zhiwei.messageflow.getmessage.bean.Weibo;
import com.zhiwei.messageflow.getmessage.bean.ZhihuInfo;
import redis.clients.jedis.Tuple;
@Component
@SuppressWarnings("unchecked")
public class MatchingInfo {
private ObjectMapper mapper = new ObjectMapper();
SimpleDateFormat sdf = new SimpleDateFormat("E MMM dd HH:mm:ss z yyyy", Locale.US);
public List<Weibo> weiboMatchingInfo(Set<Tuple> set) {
List<Weibo> weibos = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
for (Tuple t : set) {
Weibo weibo = new Weibo();
try {
map = mapper.readValue(t.getElement(), Map.class);
if (map.get("username") != null)
weibo.setUsername(map.get("username").toString());
if (map.get("roottext") != null)
weibo.setRoottext(map.get("roottext").toString());
if (map.get("rootname") != null)
weibo.setRootname(map.get("rootname").toString());
if (map.get("root_time") != null)
weibo.setRoot_time(map.get("root_time").toString());
if (map.get("rooturl") != null)
weibo.setRooturl(map.get("rooturl").toString());
if (map.get("img_url") != null)
weibo.setImg_url(map.get("img_url").toString());
if (map.get("channelIndex") != null)
weibo.setChannelIndex(Double.parseDouble(map.get("channelIndex").toString()));
if (map.get("retweet_status_id") != null)
weibo.setRetweet_status_id(map.get("retweet_status_id").toString());
if (map.get("weibo") != null)
weibo.setWeibo(Integer.parseInt(map.get("weibo").toString()));
if (map.get("isForward") != null)
weibo.setIsForward(Integer.parseInt(map.get("isForward").toString()));
if (map.get("fensi") != null)
weibo.setFensi(Integer.parseInt(map.get("fensi").toString()));
if (map.get("vtype") != null)
weibo.setVtype((Integer.parseInt(map.get("vtype").toString())));
if (map.get("_id") != null)
weibo.setId(map.get("_id").toString());
if (map.get("rstime") != null)
weibo.setRstime(Long.parseLong(map.get("rstime").toString()));
if (map.get("text") != null)
weibo.setText(map.get("text").toString());
if (map.get("user_id") != null)
weibo.setUser_id((map.get("user_id").toString()));
if (map.get("url") != null)
weibo.setUrl(map.get("url").toString());
if (map.get("time") != null)
weibo.setTime(map.get("time").toString());
System.out.println(weibo.toString());
weibos.add(weibo);
} catch (IOException e) {
e.printStackTrace();
}
}
return weibos;
}
public List<ZhihuInfo> zhihuMatchingInfo(Set<Tuple> set) {
List<ZhihuInfo> zhihuInfos = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
for (Tuple t : set) {
ZhihuInfo zhihuInfo = new ZhihuInfo();
try {
map = mapper.readValue(t.getElement(), Map.class);
if (map.get("_id") != null)
zhihuInfo.setId(map.get("_id").toString());
if (map.get("rsid") != null)
zhihuInfo.setRsid(Long.parseLong(map.get("rsid").toString()));
if (map.get("type") != null)
zhihuInfo.setType(map.get("type").toString());
if (map.get("user_name") != null)
zhihuInfo.setUser_name(map.get("user_name").toString());
if (map.get("user_url") != null)
zhihuInfo.setUser_url(map.get("user_url").toString());
if (map.get("answer_count") != null)
zhihuInfo.setAnswer_count(Integer.parseInt(map.get("answer_count").toString()));
if (map.get("question_url") != null)
zhihuInfo.setQuestion_url(map.get("question_url").toString());
if (map.get("question_title") != null)
zhihuInfo.setQuestion_title(map.get("question_title").toString());
if (map.get("answer_content") != null)
zhihuInfo.setAnswer_content(map.get("answer_content").toString());
if (map.get("answer_url") != null)
zhihuInfo.setAnswer_url(map.get("answer_url").toString());
if (map.get("img") != null)
zhihuInfo.setImg(map.get("img").toString());
if (map.get("follow") != null)
zhihuInfo.setFollow(map.get("follow").toString());
if (map.get("browse") != null)
zhihuInfo.setBrowse(map.get("browse").toString());
if (map.get("insert_at") != null) {
Date d = sdf.parse(map.get("insert_at").toString());
zhihuInfo.setInsert_at(d);
}
if (map.get("created_at") != null) {
Date d = sdf.parse(map.get("created_at").toString());
zhihuInfo.setInsert_at(d);
}
System.out.println(zhihuInfo.toString());
zhihuInfos.add(zhihuInfo);
} catch (IOException | ParseException e) {
e.printStackTrace();
}
}
return zhihuInfos;
}
public List<Video> videoMatchingInfo(Set<Tuple> set) {
List<Video> videos = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
for (Tuple t : set) {
Video video = new Video();
try {
map = mapper.readValue(t.getElement(), Map.class);
if (map.get("pt") != null)
video.setPt(map.get("pt").toString());
if (map.get("_id") != null)
video.setId(map.get("_id").toString());
if (map.get("rsid") != null)
video.setRsid(Long.parseLong(map.get("rsid").toString()));
if (map.get("title") != null)
video.setTitle(map.get("title").toString());
if (map.get("source") != null)
video.setSource(map.get("source").toString());
if (map.get("url") != null)
video.setUrl(map.get("url").toString());
if (map.get("uper") != null)
video.setUper(map.get("uper").toString());
if (map.get("time") != null) {
Date d = sdf.parse(map.get("time").toString());
video.setTime(d);
}
System.out.println(video.toString());
videos.add(video);
} catch (IOException | ParseException e) {
e.printStackTrace();
}
}
return videos;
}
public List<Media> mediaMatchingInfo(Set<Tuple> set) {
List<Media> medias = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
for (Tuple t : set) {
Media media = new Media();
try {
map = mapper.readValue(t.getElement(), Map.class);
if (map.get("type") != null)
media.setType(map.get("type").toString());
if (map.get("pt") != null)
media.setPt(map.get("pt").toString());
if (map.get("_id") != null)
media.setId(map.get("_id").toString());
if (map.get("rsid") != null)
media.setRsid(Long.parseLong(map.get("rsid").toString()));
if (map.get("title") != null)
media.setTitle(map.get("title").toString());
if (map.get("source") != null)
media.setSource(map.get("source").toString());
if (map.get("content") != null)
media.setContent(map.get("content").toString());
if (map.get("channelIndex") != null)
media.setChannelIndex(Double.parseDouble(map.get("channelIndex").toString()));
if (map.get("time") != null) {
Date d = sdf.parse(map.get("time").toString());
media.setTime(d);
}
System.out.println(media.toString());
medias.add(media);
} catch (IOException | ParseException e) {
e.printStackTrace();
}
}
return medias;
}
}
package com.zhiwei.messageflow.getmessage.bean;
import java.util.Date;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "net_media")
public class Media {
private String id;// 标志id,在数据库中指的是url
private String title;// 标题
private String content;// 内容
private String source;// 来源
private String rootSource;// 来源
private Date time;// 时间
private String savetime;// 保存时间
private String type;// 类别
private String pt;// 所属平台
private long rsid;// 自增id
private Double channelIndex;
private String emotionType;// 情感分类
private String eventType;// 事件分类
private String dangerLevel;// 危险等级
private String markGroup;// 标记组
private String markPerson;// 标记人
}
package com.zhiwei.messageflow.getmessage.bean;
import java.util.Date;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Video {
private String id;// 视频
private String videoLength;// videoLength 视频时长
private Date time;// time date 发布时间
private String title;// title 视频标题
private String url;// url 视频链接
private String content;// content 视频简介
private String uper;// uper 视频上传者
private String pt;// pt 平台
private String browseCount;// browseCount 视频观看数
private long rsid;// rsid 视频rsid
private String source;// source 视频来源
private String savetime;// 保存时间
}
package com.zhiwei.messageflow.getmessage.bean;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Weibo {
private String id;// 微博
private String time;// 发微博时间
private String day;
private String month;
private String hour;
private String text;// 微博内容
private long rstime;
private String user_id;// 用户id
private String username;// 微博用户昵称
private String source;// 微博来源
private int retweet_count;// 转发数
private int reply_count;// 评论数
private String url;// 微博地址
private String img_url;// 用户头像图片地址
private String weibo_img;// weibo含有的图片
private int vtype;// 认证类型
private int fensi;// 粉丝数
private int guanzhu;// 关注数
private int weibo;// 微博数
private String description;// 微博简介
private int isForward;// 原创还是转发
private String roottext;// 原创微博内容
private String retweet_status_id;
private String rootuid;// root用户id
private String rootname;// root用户的昵称
private String root_time;// root用户的发微博时间
private String rooturl;// root微博地址
private String rootsource;// 来源
private int root_repost;
private int root_reply;
private Double channelIndex;
private String emotionType;// 情感分类
private String eventType;// 事件分类
private String dangerLevel;// 危险等级
private String markTag;// 新版标签信息
private String markGroup;// 标记组
private String markPerson;// 标记人
private String markDate;// 标记日期
private int isMarked;// 是否标记
private String sex;// 性别
private String location;// 地点
public Weibo() {
}
}
package com.zhiwei.messageflow.getmessage.bean;
import java.util.Date;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "zhihu")
public class ZhihuInfo {
private String id;// 主键 问题地址或答案地址
private String question_title;// 问题标题
private String question_content;// 问题标题
private String question_url;// 问题地址
private String answer_content;// 回答内容
private String answer_url;// 答案地址
private String user_name;// 回答者昵称
private String user_url;// 回答者主页
private int answer_count;// 问题回答数
private int attitudes_count;// 回答点赞数
private int comments_count;// 评论数
private String created_at;// 问题或回答创建时间
private Date insert_at;// 问题或答案第一次采集到的时间
private Date update_at;// 答案点赞书最后更新时间
private String type;// 消息类型 answer or question
private long rsid;// 信息自增键
private long question_id;// 问题id
private String img;// 图片
private String follow;
private String browse;
}
package com.zhiwei.messageflow.getmessage;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.getmessage.bean.ZhihuInfo;
import com.zhiwei.messageflow.redis.RedisPoolAndTools;
import redis.clients.jedis.Tuple;
@Component
public class getSet {
@Autowired
private RedisPoolAndTools redisPoolAndTools;
@Autowired
private MatchingInfo matchingInfo;
@SuppressWarnings("unused")
public Set<Tuple> getDataByeRedis(String redisTools,String keyValue,int page,int pageSize,int newDataCount) {
newDataCount=0;
page=1;
pageSize=20;
keyValue="知乎-美团-全部";
long startIndex = ((page - 1) * pageSize) + newDataCount;
long endIndex = startIndex + pageSize - 1;
Set<Tuple> set = null;
try {
set = redisPoolAndTools.getRevDataListByIndex(keyValue, startIndex, endIndex);
} catch (Exception e) {
e.printStackTrace();
}
// List<Media> medias= matchingInfo.mediaMatchingInfo(set);
// List<Weibo> weibos= matchingInfo.weiboMatchingInfo(set);
// List<Video> videos= matchingInfo.videoMatchingInfo(set);
List<ZhihuInfo> zhihuInfos= matchingInfo.zhihuMatchingInfo(set);
return set;
}
}
package com.zhiwei.messageflow.listener;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware{
private static ApplicationContext context;
private ApplicationContextProvider(){}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
public static <T> T getBean(String name,Class<T> aClass){
return context.getBean(name,aClass);
}
}
package com.zhiwei.messageflow.mail;
import java.util.Properties;
public class MailSenderInfo
{
// 发送服务器的IP和端口
private String mailServerHost;
private String mailServerPort = "25";
// 邮件发送者地址
private String fromAddress;
private String nickName;
// 邮件接受者的地址
private String toAddress;
// 登录邮件发送服务器的用户名和密码
private String userName;
private String password;
// 是否需要身份验证
private boolean validate = false;
// 邮件主题
private String subject;
// 邮件的文本内容
private String content;
// 邮件附件的文件名
private String[] attachFileNames;
/*
* 获得邮件会话属性
*/
public Properties getProperties()
{
Properties p = new Properties();
p.put("mail.smtp.host", this.mailServerHost);
p.put("mail.smtp.port", this.mailServerPort);
p.put("mail.smtp.auth", validate ? "true" : "false");
return p;
}
public String getMailServerHost()
{
return mailServerHost;
}
public void setMailServerHost(String mailServerHost)
{
this.mailServerHost = mailServerHost;
}
public String getMailServerPort()
{
return mailServerPort;
}
public void setMailServerPort(String mailServerPort)
{
this.mailServerPort = mailServerPort;
}
public String getFromAddress()
{
return fromAddress;
}
public void setFromAddress(String fromAddress)
{
this.fromAddress = fromAddress;
}
public String getToAddress()
{
return toAddress;
}
public void setToAddress(String toAddress)
{
this.toAddress = toAddress;
}
public String getUserName()
{
return userName;
}
public void setUserName(String userName)
{
this.userName = userName;
}
public String getPassword()
{
return password;
}
public void setPassword(String password)
{
this.password = password;
}
public boolean isValidate()
{
return validate;
}
public void setValidate(boolean validate)
{
this.validate = validate;
}
public String getSubject()
{
return subject;
}
public void setSubject(String subject)
{
this.subject = subject;
}
public String getContent()
{
return content;
}
public void setContent(String content)
{
this.content = content;
}
public String[] getAttachFileNames()
{
return attachFileNames;
}
public void setAttachFileNames(String[] attachFileNames)
{
this.attachFileNames = attachFileNames;
}
public String getNickName() {
return nickName;
}
public void setNickName(String nickName) {
this.nickName = nickName;
}
}
package com.zhiwei.messageflow.mail;
import javax.mail.Authenticator;
import javax.mail.PasswordAuthentication;
public class MyAuthenticator extends Authenticator{
String userName=null;//用户名
String password=null;//密码
/*
* 无参构造
*/
public MyAuthenticator(){}
/*
* 代参构造
*/
public MyAuthenticator(String username,String cs){
this.userName=username;
this.password=cs;
}
/*
*
*/
protected PasswordAuthentication getPasswordAuthentication(){
return new PasswordAuthentication(userName,password);
}
}
package com.zhiwei.messageflow.mail;
import com.zhiwei.messageflow.es.bean.TemplateData;
import com.zhiwei.messageflow.util.TimeUtil;
public class SendMail {
/**
* @Title: sendMail
* @Description: TODO(发送邮件)
* @param mailContent
* @param email
* 邮件目标用户 设定文件
* @return boolean 返回类型
*/
public static boolean sendMailWarning(String email, TemplateData templateData) {
// 这个类主要是设置邮件
MailSenderInfo mailInfo = new MailSenderInfo();
mailInfo.setMailServerHost("smtp.ym.163.com");
// mailInfo.setMailServerHost("smtp.163.com");
mailInfo.setMailServerPort("25");
mailInfo.setValidate(true);
mailInfo.setUserName("qbjcyj@zhiweidata.com");
mailInfo.setPassword("1q2w3e4r");// 您的邮箱密码
mailInfo.setFromAddress("qbjcyj@zhiweidata.com");
mailInfo.setNickName("知微情报监测服务");
boolean f = false;
mailInfo.setToAddress(email);
mailInfo.setSubject("知微情报监测预警");
mailInfo.setContent(mailContent(templateData));
// 这个类主要来发送邮件
f = SimpleMailSender.sendHtmlMail(mailInfo);// 发送文体格式
return f;
}
// /**
// * @Title: sendMailBriefing
// * @Description: TODO(发送舆情简报)
// * @param email
// * @param mailTitle 邮件标题
// * @param briefs
// * @return 设定文件
// * @date 2017年11月16日 下午5:48:14
// * @return boolean 返回类型
// */
// public static boolean sendMailBriefing(String email, String
// mailTitle,List<Brief> briefs) {
// // 这个类主要是设置邮件
// MailSenderInfo mailInfo = new MailSenderInfo();
// mailInfo.setMailServerHost("smtp.ym.163.com");
// // mailInfo.setMailServerHost("smtp.163.com");
// mailInfo.setMailServerPort("25");
// mailInfo.setValidate(true);
// mailInfo.setUserName("qbjcyj@zhiweidata.com");
// mailInfo.setPassword("1q2w3e4r");// 您的邮箱密码
// mailInfo.setFromAddress("qbjcyj@zhiweidata.com");
// mailInfo.setNickName("知微情报监测服务");
//
// boolean f = false;
// mailInfo.setToAddress(email);
// mailInfo.setSubject(mailTitle);
//
// mailInfo.setContent(mailContent(briefs));
// // 这个类主要来发送邮件
// f = SimpleMailSender.sendHtmlMail(mailInfo);// 发送文体格式
// return f;
// }
/**
*
* @Title: mailContent
* @Description: TODO(拼接所发邮件内容)
* @param templateData
* @return 设定文件
* @date 2017年11月8日 下午4:29:41
* @return String 返回类型
*/
private static String mailContent(TemplateData templateData) {
StringBuffer sb = new StringBuffer();
sb.append("<style> </style> ");
sb.append("<div>你好,收到一条" + templateData.getPt() + "的预警通知,预警内容如下:</div>");
if ("新闻客户端".equals(templateData.getPt()) || "网媒".equals(templateData.getPt())
|| "微信".equals(templateData.getPt()) || "报刊".equals(templateData.getPt())
|| "今日头条".equals(templateData.getPt())) {
sb.append("<div>渠道:" + templateData.getSource() + "</div><br>");
sb.append("<div>平台:网络媒体</div><br>");
sb.append("<div>发布时间:"
+ getHotLink(TimeUtil.formatDate(templateData.getTime(), "yyyy-MM-dd HH:mm"), templateData.getUrl())
+ "</div><br>");
sb.append("<div>标题:" + templateData.getTitle() + "</div><br>");
sb.append("<div>摘要:" + templateData.getContent() + "</div><br>");
} else if ("微博".equals(templateData.getPt())) {
sb.append("<div>微博账号:" + templateData.getSource() + "(粉丝量:" + dealFensi(templateData.getFensi())
+ ")</div><br>");
sb.append("<div>发布时间:"
+ getHotLink(TimeUtil.formatDate(templateData.getTime(), "yyyy-MM-dd HH:mm"), templateData.getUrl())
+ "</div><br>");
sb.append("<div>发布内容:" + templateData.getContent() + "</div><br>");
} else if ("知乎".equals(templateData.getPt()) || "论坛".equals(templateData.getPt())
|| "贴吧".equals(templateData.getPt())) {
sb.append("<div>问题:" + templateData.getTitle() + "</div><br>");
sb.append("<div>回答用户:" + templateData.getSource() + "</div><br>");
sb.append("<div>回答时间:"
+ getHotLink(TimeUtil.formatDate(templateData.getTime(), "yyyy-MM-dd HH:mm"), templateData.getUrl())
+ "</div><br>");
sb.append("<div>答案:" + templateData.getContent() + "</div><br>");
} else {
sb.append("<div>来源:" + templateData.getSource() + "</div><br>");
sb.append("<div>平台:" + templateData.getPt() + "</div><br>");
sb.append("<div>发布时间:"
+ getHotLink(TimeUtil.formatDate(templateData.getTime(), "yyyy-MM-dd HH:mm"), templateData.getUrl())
+ "</div><br>");
sb.append("<div>标题:" + templateData.getTitle() + "</div><br>");
sb.append("<div>摘要:" + templateData.getContent() + "</div><br>");
}
if (null != templateData.getChannelRule()) {
sb.append("<div>命中渠道规则:" + templateData.getChannelRule() + "</div><br>");
}
if (null != templateData.getWordRule()) {
sb.append("<div>命中关键词规则:" + templateData.getWordRule() + "</div><br>");
}
sb.append("<div>Best Regards,</div>");
sb.append("<div>知微——情报监测</div>");
sb.append("<style>*{font-family:Microsoft YaHei;}</style>");
return sb.toString();
}
// /**
// *
// * @Title: mailContent
// * @Description: TODO(拼接所发邮件内容)
// * @param templateData
// * @return 设定文件
// * @date 2017年11月8日 下午4:29:41
// * @return String 返回类型
// */
// private static String mailContent(List<Brief> briefs) {
// StringBuffer sb = new StringBuffer();
//
// Brief brief;
// for (int i = 0; i < briefs.size(); i++) {
// brief = briefs.get(i);
// sb.append("<div><B>" +getHotLink(brief.getTitle(), brief.getUrl())
// +"</B></div><br>");
// sb.append("<div>" + brief.getDetails() + "</div><br>");
// sb.append("<br>");
// }
// sb.append("<div>Best Regards,</div>");
// sb.append("<div>知微——情报监测</div>");
// sb.append("<style>*{ font-size:18px} *{font-family:Microsoft
// YaHei;}</style>");
//
// return sb.toString();
// }
/**
* @Title: getHotLink
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param relation2
* 设定文件
* @return String 返回类型
*/
private static String getHotLink(String name, String url) {
return "<a target='_blank' href='" + url + "'>" + name + "</a>";
}
/**
*
* @Title: dealFensi
* @Description: TODO(处理粉丝量)
* @param fensi
* @return 设定文件
* @date 2017年11月8日 下午4:10:12
* @return String 返回类型
*/
private static String dealFensi(int fensi) {
String fensiStr = "";
if (fensi < 10000) {
fensiStr = fensi + "";
} else {
fensiStr = fensi / 10000 + "万";
}
return fensiStr;
}
}
package com.zhiwei.messageflow.mail;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.Properties;
import javax.mail.Address;
import javax.mail.BodyPart;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
public class SimpleMailSender
{
/*
* 以文本的形式发送邮件 param mailInfo待发送的邮件信息
*/
public boolean sendTextMail(MailSenderInfo mailInfo)
{
// 判断是否需要身份验证֤
MyAuthenticator authenticator = null;
Properties pro = mailInfo.getProperties();
if (mailInfo.isValidate())
{
// 如果需要身份验证,则创建一个密码验证器
authenticator = new MyAuthenticator(mailInfo.getUserName(),
mailInfo.getPassword());
}
// 根据邮件会话属性和密码验证器构造一个发送邮件的session
Session sendMailSession = Session.getInstance(pro, authenticator);
try
{
// 根据Session创建一个邮件消息
Message mailMessage = new MimeMessage(sendMailSession);
// 创建邮件发送者地址ߵ�ַ
Address from = new InternetAddress(mailInfo.getFromAddress());
//设置自定义发件人昵称
if (mailInfo.getNickName()!=null) {
String nick="";
try {
nick=javax.mail.internet.MimeUtility.encodeText(mailInfo.getNickName());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
mailMessage.setFrom(new InternetAddress(nick+" <"+mailInfo.getFromAddress()+">"));
}else {
// 设置邮件消息的发送者
mailMessage.setFrom(from);
}
// 创建邮件的接收者地址,并设置到邮件消息中
Address to = new InternetAddress(mailInfo.getToAddress());
mailMessage.setRecipient(Message.RecipientType.TO, to);
// 设置邮件消息主题
mailMessage.setSubject(mailInfo.getSubject());
// 设置邮件消息发送的时间
mailMessage.setSentDate(new Date());
// 设置邮件消息发送的时间
String mailContent = mailInfo.getContent();
mailMessage.setText(mailContent);
// 发送邮件
Transport.send(mailMessage);
return true;
}
catch (Exception e)
{
e.printStackTrace();
}
return false;
}
/*
* 以HTML的格式发送邮件
*/
public static boolean sendHtmlMail(MailSenderInfo mailInfo)
{
// 判断是否需要身份认证
MyAuthenticator authenticator = null;
Properties pro = mailInfo.getProperties();
// 如果需要身份认证,则创建一个密码验证器
if (mailInfo.isValidate())
{
authenticator = new MyAuthenticator(mailInfo.getUserName(),
mailInfo.getPassword());
}
// 根据邮件会话属性和密码验证器构造一个发送邮件的session
Session sendMailSession = Session
.getDefaultInstance(pro, authenticator);
try
{
// 根据session创建一个邮件消息
Message mailMessage = new MimeMessage(sendMailSession);
// 创建邮件发送者地址
Address from = new InternetAddress(mailInfo.getFromAddress());
//设置自定义发件人昵称
if (mailInfo.getNickName()!=null) {
String nick="";
try {
nick=javax.mail.internet.MimeUtility.encodeText(mailInfo.getNickName());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
mailMessage.setFrom(new InternetAddress(nick+" <"+mailInfo.getFromAddress()+">"));
}else {
// 设置邮件消息的发送者
mailMessage.setFrom(from);
}
// 创建邮件的接收者地址,并设置到邮件消息中
Address to = new InternetAddress(mailInfo.getToAddress());
// Message.RecipientType.TO属性表示接收者的类型为TO
mailMessage.setRecipient(Message.RecipientType.TO, to);
// 设置邮件消息的主题
mailMessage.setSubject(mailInfo.getSubject());
// 设置邮件消息发送的时间
mailMessage.setSentDate(new Date());
// MiniMultipart类是一个容器类,包含MimeBodyPart类型的对象
Multipart mainPart = new MimeMultipart();
// 创建一个包含HTML内容的MimeBodyPart
BodyPart html = new MimeBodyPart();
// 设置HTML内容
html.setContent(mailInfo.getContent(), "text/html; charset=utf-8");
mainPart.addBodyPart(html);
// 将MiniMultipart对象设置为邮件内容
mailMessage.setContent(mainPart);
// 发送邮件
Transport.send(mailMessage);
return true;
}
catch (MessagingException ex)
{
ex.printStackTrace();
}
return false;
}
}
//package com.zhiwei.messageflow.mongo;
//
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.mongodb.MongoDbFactory;
//import org.springframework.data.mongodb.core.MongoTemplate;
//import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
//import org.springframework.stereotype.Component;
//
//import com.mongodb.MongoClient;
//import com.mongodb.MongoClientOptions;
//import com.mongodb.MongoCredential;
//import com.mongodb.ServerAddress;
//import com.zhiwei.messageflow.config.MongoConfig;
//
//@Component
//public class MongoFactoryBean {
//
// @Autowired
// private static MongoConfig mongoConfig;
//
// public MongoDbFactory mongoDbFactory (){
//
// /**
// * 客户端配置(连接数、副本集群验证)
// */
// MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
// System.out.println(mongoConfig.toString());
// builder.connectionsPerHost(mongoConfig.getConnectionsPerHost());
// builder.connectTimeout(mongoConfig.getConnectTimeout());
// builder.threadsAllowedToBlockForConnectionMultiplier(
// mongoConfig.getThreadsAllowedToBlockForConnectionMultiplier());
// builder.maxWaitTime(mongoConfig.getMaxWaitTime());
// MongoClientOptions mongoClientOptions = builder.build();
//
// // MongoDB地址列表
// ServerAddress serverAddress = new ServerAddress(mongoConfig.getServerIP(), mongoConfig.getServerport());
//
// // 连接认证
// MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoConfig.getServerUsername(),
// mongoConfig.getAuthenticationDatabase(), mongoConfig.getServerPassword().toCharArray());
//
// //创建客户端和Factory
// MongoClient mongoClient = new MongoClient(serverAddress, mongoCredential, mongoClientOptions);
// MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(mongoClient, mongoConfig.getServerDBName());
//
// return mongoDbFactory;
// }
//
// public MongoTemplate mongoTemplate() {
// return new MongoTemplate(mongoDbFactory());
// }
//}
\ No newline at end of file
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "HuserInfoMedia")
public class HuserInfoMedia {
private String _id;
private Integer pr;
private Integer aleax;
private Integer monthAVGAleax;
private Long monthPV;
private Integer H;
private String source;
private Double pv;
private Double normalization;
private Integer mediaType;
private Double channelIndex;
}
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "HuserInfoWeibo")
public class HuserInfoWeibo {
private String _id;
private String username;
private Integer followsCount;
private Integer weiboCount;
private Integer friendsCound;
private String vType;
private Integer H;
private Double channelIndex;
}
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "HuserInfoWeixin")
public class HuserInfoWeixin {
private String _id;
private String uid;
private String username;
private String levelOne;
private String levelTwo;
private String openid;
private Integer avgReadCount;
private Integer avgLikeCount;
private Integer H;
private Double channelIndex;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection="qbjc_keywordNew")
public class KeywordNew {
@Id
private String _id;
private String keyTitle;// 关键词组
private List<String> keyWords;//关键词
private String project;// 项目
private String submitter;
private long createAt;
}
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_noiserule")
public class NoiseRule {
@Id
private long _id;
private String ruleName;
private String keyWordPt;
private String ruleExplain;
private String ruleType;
private String keyWordsInputOne;
private String andOr;
private String keyWordsInputTwo;
private String project;
private long createAt;
private String submitter;
private String channelPt;
private String channelQd;
}
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_platform")
public class Platform{
@Id
private long _id;
private String Platform;
private String PlatformName;
private String Type;
private long createAt;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_project")
public class Project {
/**
* id
*/
@Id
private long _id;
/**
* 项目名
*/
private String projectName;
private int keyGroupLimit;
private int keywordsLimit;
private int traceLimit;
private int noiseLimit;
private int userLimit;
private int tagLimit;
private long createAt;
private boolean search;
private int searchLimit;
/**
* 私有平台列表
*/
private List<String> dataPt;
private boolean weiboHotSearch;
private int gatherKeyWord;
private boolean download;
}
package com.zhiwei.messageflow.mongo.bean;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_trackrule")
public class TrackRule {
@Id
private long _id;
private String ruleName;
private String ruleExplain;
private String ruleType;
private String KeyWordsInputOne;
private String andOr;
private String KeyWordsInputTwo;
private String earlyWarning;
private String highlighted;
private String earlyWarningTime;
private boolean isWarn;
private long createAt;
private String submitter;
private String project;
private String channelPt;
private String channelQd;
private String articleTitle;
private String articleNum;
private String timeRange;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_user_mail")
public class UserMail {
@Id
private String _id;
private String address;
private String project;
private Date createAt;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_user_wechat")
public class UserWechat {
@Id
private String _id;
private String nickname;
private Long uid;
private String username;
private String project;
private String appId;
private String openid;
private Date createAt;
}
package com.zhiwei.messageflow.mongo.bean;
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "wechatCode")
public class WechatCode {
@Id
private String _id;
private String appsecret;
private String token;
private String encodingAESKey;
private String accessToken;
private Date expiresTime;
private Date updateTime;
}
package com.zhiwei.messageflow.mongo.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.zhiwei.messageflow.config.MongoConfig;
import lombok.Data;
@Data
public abstract class AbstractMongoConfig {
private String database;
private String username;
private String password;
private String host;
private String authenticationDatabase;
private Integer port;
@Autowired
private MongoConfig mongoConfig;
/**
* 创建mongoDbFactory
*
* @return
* @throws Exception
*/
public MongoDbFactory mongoDbFactory() throws Exception {
ServerAddress serverAddress = new ServerAddress(host, port);
MongoCredential mongoCredential = MongoCredential.createCredential(username, authenticationDatabase,
password.toCharArray());
MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
builder.connectionsPerHost(mongoConfig.getConnectionsPerHost());
builder.connectTimeout(mongoConfig.getConnectTimeout());
builder.threadsAllowedToBlockForConnectionMultiplier(
mongoConfig.getThreadsAllowedToBlockForConnectionMultiplier());
builder.maxWaitTime(mongoConfig.getMaxWaitTime());
MongoClientOptions mongoClientOptions = builder.build();
return new SimpleMongoDbFactory(new MongoClient(serverAddress, mongoCredential, mongoClientOptions), database);
}
/**
* 获取mongoTemplate
*
* @return
* @throws Exception
*/
abstract public MongoTemplate getMongoTemplate() throws Exception;
}
package com.zhiwei.messageflow.mongo.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.mongodb.core.MongoTemplate;
@Configuration
@ConfigurationProperties(prefix = "spring.data.mongodb.primary")
public class PrimaryMongoConfig extends AbstractMongoConfig{
@Primary
@Override
public @Bean(name = "primaryMongoTemplate") MongoTemplate getMongoTemplate() throws Exception {
return new MongoTemplate(mongoDbFactory());
}
}
package com.zhiwei.messageflow.mongo.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
@Configuration
@ConfigurationProperties(prefix = "spring.data.mongodb.secondary")
public class SecondaryMongoConfig extends AbstractMongoConfig {
@Override
public @Bean(name = "secondaryMongoTemplate") MongoTemplate getMongoTemplate() throws Exception {
return new MongoTemplate(mongoDbFactory());
}
}
package com.zhiwei.messageflow.mongo.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
@Configuration
@ConfigurationProperties(prefix = "spring.data.mongodb.thirdary")
public class ThirdaryMongoConfig extends AbstractMongoConfig{
@Override
public @Bean(name = "thirdaryMongoTemplate") MongoTemplate getMongoTemplate() throws Exception {
return new MongoTemplate(mongoDbFactory());
}
}
package com.zhiwei.messageflow.mongo.dao;
import com.zhiwei.messageflow.mongo.bean.HuserInfoMedia;
import com.zhiwei.messageflow.mongo.bean.HuserInfoWeibo;
import com.zhiwei.messageflow.mongo.bean.HuserInfoWeixin;
public interface HuserInfoDao {
/**
* 微博渠道信息获取
*
* @param user_id
* @return
*/
HuserInfoWeibo getHuserInfoWeibo(String user_id);
/**
* 微信渠道信息获取
*
* @param username
* @return
*/
HuserInfoWeixin getHuserInfoWeixin(String username);
/**
* 网媒渠道信息获取
*
* @param source
* @return
*/
HuserInfoMedia getHuserInfoMedia(String source);
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.KeywordNew;
public interface KeywordNewDao {
/**
* 获取项目关键词组列表
*
* @param projectName
* 项目名
* @return
*/
List<KeywordNew> getKeywordNewByProject(String projectName);
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
public interface NoiseRuleDao {
/**
* 根据项目获取噪音规则
*
* @param projectName
* @return
*/
List<NoiseRule> getNoiseRuleByProject(String projectName);
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
public interface PlatformDao {
/**
* 获取公共平台名列表
*
* @return
*/
List<String> getPublicPlatformName();
/**
* 获取全部平台名列表
*
* @return
*/
List<String> getAllPlatformName();
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.Project;
public interface ProjectDao {
/**
* 获取项目列表
*
* @return
*/
List<Project> getAllProjects();
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
public interface TrackRuleDao {
/**
* 根据项目查询预警规则
*
* @param projectName
* @return
*/
List<TrackRule> getTrackRuleByProject(String projectName);
/**
* 更新项目预警情况
*
* @param trackRule_id
*/
void updateTrackrule(Long trackRule_id);
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.UserMail;
public interface UserMailDao {
/**
* 获取预警邮箱
*
* @param projectName
* @return
*/
List<UserMail> findUserMailByProject(String projectName);
}
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.UserWechat;
public interface UserWechatDao {
/**
* 获取预警微信
*
* @param projectName
* @return
*/
List<UserWechat> getUserWechatByProject(String projectName);
}
package com.zhiwei.messageflow.mongo.dao;
import com.zhiwei.messageflow.mongo.bean.WechatCode;
public interface WechatCodeDao {
/**
* 获取预警微信公众号权限
*
* @param AppId
* @return
*/
WechatCode getWechatCodeByAppId(String AppId);
}
package com.zhiwei.messageflow.mongo.dao.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.HuserInfoMedia;
import com.zhiwei.messageflow.mongo.bean.HuserInfoWeibo;
import com.zhiwei.messageflow.mongo.bean.HuserInfoWeixin;
import com.zhiwei.messageflow.mongo.dao.HuserInfoDao;
@Component
public class HuserInfoDaoImpl implements HuserInfoDao {
@Autowired
@Qualifier(value = "secondaryMongoTemplate")
protected MongoTemplate secondaryMongoTemplate;
@Override
public HuserInfoWeibo getHuserInfoWeibo(String user_id) {
Query query = new Query().addCriteria(Criteria.where("_id").is(user_id));
return secondaryMongoTemplate.findOne(query, HuserInfoWeibo.class);
}
@Override
public HuserInfoWeixin getHuserInfoWeixin(String username) {
Query query = new Query().addCriteria(Criteria.where("username").is(username));
return secondaryMongoTemplate.findOne(query, HuserInfoWeixin.class);
}
@Override
public HuserInfoMedia getHuserInfoMedia(String source) {
Query query = new Query().addCriteria(Criteria.where("source").is(source));
return secondaryMongoTemplate.findOne(query, HuserInfoMedia.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.KeywordNew;
import com.zhiwei.messageflow.mongo.dao.KeywordNewDao;
@Component
public class KeywordNewDaoImpl implements KeywordNewDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<KeywordNew> getKeywordNewByProject(String projectName) {
Query query = new Query();
query.addCriteria(Criteria.where("project").is(projectName));
return primaryMongoTemplate.find(query, KeywordNew.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.dao.NoiseRuleDao;
@Component
public class NoiseRuleDaoImpl implements NoiseRuleDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<NoiseRule> getNoiseRuleByProject(String projectName) {
Query query = new Query();
query.addCriteria(Criteria.where("project").is(projectName));
return primaryMongoTemplate.find(query, NoiseRule.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.Platform;
import com.zhiwei.messageflow.mongo.dao.PlatformDao;
@Component
public class PlatformDaoImpl implements PlatformDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<String> getPublicPlatformName() {
// mongoTemplate.getCollectionName(Platform.class);
// Query query = new Query();
// query.addCriteria(Criteria.where("Type").is("public"));
// List<Platform> platforms = mongoTemplate.find(query, Platform.class);
List<Platform> platforms = primaryMongoTemplate.findAll(Platform.class);
List<String> platformNames = new ArrayList<>();
for (Platform p : platforms) {
if (p.getType().equals("public")) {
platformNames.add(p.getPlatformName());
}
}
return platformNames;
}
@Override
public List<String> getAllPlatformName() {
List<Platform> platforms = primaryMongoTemplate.findAll(Platform.class);
List<String> platformNames = new ArrayList<>();
for (Platform p : platforms) {
platformNames.add(p.getPlatformName());
}
return platformNames;
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.ProjectDao;
@Component
public class ProjectDaoImpl implements ProjectDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<Project> getAllProjects() {
return primaryMongoTemplate.findAll(Project.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.TrackRule;
import com.zhiwei.messageflow.mongo.dao.TrackRuleDao;
@Component
public class TrackRuleDaoImpl implements TrackRuleDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<TrackRule> getTrackRuleByProject(String projectName) {
Query query = new Query();
query.addCriteria(Criteria.where("project").is(projectName));
primaryMongoTemplate.getCollectionName(TrackRule.class);
List<TrackRule> trackRules = primaryMongoTemplate.find(query, TrackRule.class);
return trackRules;
}
@Override
public void updateTrackrule(Long trackRule_id) {
System.out.println("预警状态更新");
// primaryMongoTemplate.updateFirst(Query.query(Criteria.where("_id").is(trackRule_id)),
// new Update().set("isWarn", true), TrackRule.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.UserMail;
import com.zhiwei.messageflow.mongo.dao.UserMailDao;
@Component
public class UserMailDaoImal implements UserMailDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<UserMail> findUserMailByProject(String projectName) {
Query query = new Query();
query.addCriteria(Criteria.where("project").is(projectName));
return primaryMongoTemplate.find(query, UserMail.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.UserWechat;
import com.zhiwei.messageflow.mongo.dao.UserWechatDao;
@Component
public class UserWechatDaoImpl implements UserWechatDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<UserWechat> getUserWechatByProject(String projectName) {
Query query=new Query();
query.addCriteria(Criteria.where("project").is(projectName));
return primaryMongoTemplate.find(query, UserWechat.class);
}
}
package com.zhiwei.messageflow.mongo.dao.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.WechatCode;
import com.zhiwei.messageflow.mongo.dao.WechatCodeDao;
@Component
public class WechatCodeDaoImpl implements WechatCodeDao {
@Autowired
@Qualifier(value = "thirdaryMongoTemplate")
protected MongoTemplate thirdaryMongoTemplate;
@Override
public WechatCode getWechatCodeByAppId(String AppId) {
return thirdaryMongoTemplate.findById(AppId, WechatCode.class);
}
}
package com.zhiwei.messageflow.mongo.service;
import java.util.Map;
import com.zhiwei.messageflow.es.bean.TemplateData;
public interface UserMailService {
Map<String,Object> sendMailByProject(String project, TemplateData templateData);
}
package com.zhiwei.messageflow.mongo.service;
import com.zhiwei.messageflow.es.bean.TemplateData;
import com.zhiwei.messageflow.mongo.bean.WechatCode;
import net.sf.json.JSONObject;
public interface WechatSendService {
int sendTemplateByProject(String projectName, TemplateData templateData);
int sendDataJson(WechatCode wechatCode,JSONObject dataJson);
}
package com.zhiwei.messageflow.mongo.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.bean.TemplateData;
import com.zhiwei.messageflow.mail.SendMail;
import com.zhiwei.messageflow.mongo.bean.UserMail;
import com.zhiwei.messageflow.mongo.dao.UserMailDao;
import com.zhiwei.messageflow.mongo.service.UserMailService;
@Component
public class UserMailServiceImpl implements UserMailService {
@SuppressWarnings("unused")
@Autowired
private UserMailDao userMailDao;
@Override
public Map<String, Object> sendMailByProject(String project, TemplateData templateData) {
Map<String, Object> result = new HashMap<>();
// List<UserMail> mails = userMailDao.findUserMailByProject(project);
List<UserMail> mails = new ArrayList<>();
UserMail mail = new UserMail();
mail.setAddress("1454551152@qq.com");
mail.setProject("腾讯");
mails.add(mail);
List<String> failMails = new ArrayList<>();
int success = 0;
for (UserMail userMail : mails) {
if (SendMail.sendMailWarning(userMail.getAddress(), templateData)) {
success++;
} else {
failMails.add(userMail.getAddress());
}
}
if (mails.size() == success) {
result.put("status", true);
result.put("message", "用户组邮件发送成功");
} else {
result.put("status", false);
String failStr = "";
for (String string : failMails) {
failStr += string + ";";
}
result.put("message", "用户组邮箱发送成功【" + success + "】个,发送失败邮箱:【" + failStr + "】");
}
return result;
}
}
package com.zhiwei.messageflow.mongo.service.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.es.bean.Template;
import com.zhiwei.messageflow.es.bean.TemplateData;
import com.zhiwei.messageflow.es.bean.WechatConstant;
import com.zhiwei.messageflow.mongo.bean.UserWechat;
import com.zhiwei.messageflow.mongo.bean.WechatCode;
import com.zhiwei.messageflow.mongo.dao.UserWechatDao;
import com.zhiwei.messageflow.mongo.dao.WechatCodeDao;
import com.zhiwei.messageflow.mongo.service.WechatSendService;
import com.zhiwei.messageflow.util.HttpRequest;
import com.zhiwei.messageflow.util.TimeUtil;
import net.sf.json.JSONObject;
@Component
public class WechatSendServiceImpl implements WechatSendService {
private final static Logger log = LoggerFactory.getLogger(WechatSendServiceImpl.class);
@Autowired
private WechatCodeDao wechatCodeDao;
@SuppressWarnings("unused")
@Autowired
private UserWechatDao userWechatDao;
@SuppressWarnings("unused")
@Override
public int sendTemplateByProject(String projectName, TemplateData templateData) {
WechatCode wechatCode=wechatCodeDao.getWechatCodeByAppId(WechatConstant.WECHAT_APPID);
// List<UserWechat> userWechats=userWechatDao.getUserWechatByProject(projectName);
List<UserWechat> userWechats=new ArrayList<>();
UserWechat userWechat1=new UserWechat();
userWechat1.setOpenid("o_J5m0b9Un5rRfUXZS5S3kKjkzeo");
userWechats.add(userWechat1);
if (null == userWechats) {
log.error("未获取到UserWechat,查询依据【{}】", projectName);
return -1;
}
int size = userWechats.size();
int k = 0;
if (wechatCode != null) {
Map<String, Object> dataMap = new HashMap<>();
JSONObject first = new JSONObject();
first.put("value", "知微情报监测服务:有一条来自" + templateData.getPt() + "平台的预警通知。");
dataMap.put("first", first);
JSONObject keyword1 = new JSONObject();
keyword1.put("value", templateData.getTitle());
keyword1.put("color", "#173177");
dataMap.put("keyword1", keyword1);
JSONObject keyword2 = new JSONObject();
keyword2.put("value", templateData.getSource());
keyword2.put("color", "#173177");
dataMap.put("keyword2", keyword2);
JSONObject keyword3 = new JSONObject();
keyword3.put("value", TimeUtil.formatTime(templateData.getTime()));
keyword3.put("color", "#173177");
dataMap.put("keyword3", keyword3);
JSONObject keyword4 = new JSONObject();
keyword4.put("value", templateData.getContent());
keyword4.put("color", "#173177");
dataMap.put("keyword4", keyword4);
JSONObject remark = new JSONObject();
if (null != templateData.getWordRule()) {
remark.put("value", "关键词规则:" + templateData.getWordRule());
} else if (null != templateData.getChannelRule()) {
remark.put("value", "渠道规则:" + templateData.getChannelRule());
} else {
remark.put("value", "知微情报监测服务");
}
dataMap.put("remark", remark);
for (UserWechat userWechat : userWechats) {
Template template = new Template();
template.setTouser(userWechat.getOpenid());
template.setUrl(templateData.getUrl());
template.setTemplate_id(WechatConstant.WECHAT_TEMPLATEID_EARLY_IT);
template.setData(dataMap);
JSONObject templateJson = JSONObject.fromObject(template);
k = sendDataJson(wechatCode, templateJson);
if (0 == k) {
k = -1;
log.error("发送模版消息【{}】时出现错误.....", template);
} else {
size--;
}
}
} else {
log.error("未获取到appId【{}】的wechatCode.....", WechatConstant.WECHAT_APPID);
}
return size;
}
@Override
public int sendDataJson(WechatCode wechatCode, JSONObject dataJson) {
int msgid = 0;
String url = WechatConstant.WECHAT_TEMPLET_SEND_URL.replace("ACCESS_TOKEN", wechatCode.getAccessToken());
try {
JSONObject jsonObject = HttpRequest.httpRequest(url, "POST", dataJson.toString());
if (null != jsonObject) {
if ("ok".equals(jsonObject.getString("errmsg"))) {
msgid = jsonObject.getInt("msgid");
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("发送创建模版消息请求时出现错误", e.getMessage());
msgid = 0;
}
return msgid;
}
}
package com.zhiwei.messageflow.redis;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import com.zhiwei.messageflow.config.RedisConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.exceptions.JedisConnectionException;
@Configuration
public class RedisPoolAndTools {
public final static Logger log = LoggerFactory.getLogger(RedisPoolAndTools.class);
@Autowired
private RedisConfig redisConfig;
private JedisPool pool;
/**
* jedis连接池创建
*/
private void initializePool() {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(redisConfig.getMaxTotal());
config.setMaxIdle(redisConfig.getMaxIdle());
config.setMaxWaitMillis(redisConfig.getMaxWaitMillis());
config.setTestOnBorrow(redisConfig.isTestOnBorrow());
config.setTestOnReturn(redisConfig.isTestOnReturn());
pool = new JedisPool(config, redisConfig.getIp(), redisConfig.getPort(), 0, null, redisConfig.getSelectDB());
}
/**
* 获取jedis
*
* @return jedis连接
*/
private Jedis getJedis() {
// 连接请求次数
int timeoutCount = 0;
if (null == pool) {
initializePool();
}
while (true) {
try {
if (null != pool) {
return pool.getResource();
}
} catch (Exception e) {
if (e instanceof JedisConnectionException) {
timeoutCount++;
log.warn("jedis连接请求次数{}", timeoutCount);
if (timeoutCount > 3) {
break;
}
} else {
log.warn("jedisInfo ... NumActive=" + pool.getNumActive() + ", NumIdle=" + pool.getNumIdle()
+ ", NumWaiters=" + pool.getNumWaiters() + ", isClosed=" + pool.isClosed());
log.error("获取jedis连接错误:{}", e.getMessage());
break;
}
}
break;
}//请求jedis完成
return null;
}
/**
* 关闭jedis连接
*
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (null != jedis) {
jedis.close();
}
}
/**
* 获取RSID
*
* @param rsidkey
* 键值为platform-project-keywords
* @return
*/
public String getRSID(String rsidkey) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
String rsid = jedis.get(rsidkey);
returnResource(jedis);
return rsid;
}
/**
* 写入RSID,有效时间为12小时
*
* @param rsidkey
* @param rsid
*/
public void setRSID(String rsidkey, String rsid) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.setex(rsidkey, 60 * 60 * 12, rsid);
returnResource(jedis);
}
/**
* 消息流数据写入
*
* @param redisKey
* @param rsid
* @param message
*/
public void sortedSetZadd(String redisKey, double rsid, String message) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.zadd(redisKey, rsid, message);
returnResource(jedis);
}
/**
* 删除超出上限的数据
*
* @param key
* @param removeIndex
*/
public void removeDataByName(byte[] key, int removeIndex) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
jedis.zremrangeByRank(key, 0, removeIndex);
returnResource(jedis);
}
/**
* 获取有序集合消息数量
*
* @param key
* @return
*/
public Long getNowCount(byte[] key) {
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
Long nowCount = jedis.zcard(key);
returnResource(jedis);
return nowCount;
}
/**
* 分页获取redis数据
*
* @param key
* @param start
* @param end
* @return
*/
public Set<Tuple> getRevDataListByIndex(String key, long start, long end) {
Set<Tuple> set = null;
Jedis jedis = getJedis();
while (true) {
if (null != jedis) {
break;
} else {
jedis = getJedis();
}
}
set = jedis.zrevrangeWithScores(key.getBytes(), start, end);
returnResource(jedis);
return set;
}
}
\ No newline at end of file
package com.zhiwei.messageflow.redis.bean;
import java.util.List;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class RsidAndMessages {
/**
* 微博消息列表
*/
private List<WeiboMessage> wlist;
/**
* 知乎消息列表
*/
private List<ZhihuMessage> zlist;
/**
* 视频消息列表
*/
private List<VideoMessage> vlist;
/**
* 网媒消息列表
*/
private List<MediaMessage> mlist;
/**
* 本次读取最后信息的rsid
*/
private long rsid;
}
package com.zhiwei.messageflow.redis.service;
import java.util.List;
import java.util.Map;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
public interface RedisService {
/**
* 获取项目Rsid列表
*
* @param project
* 项目名
* @return
*/
String getRsid(String project);
/**
* 更新项目Rsid列表
*
* @param newRsidMap
* 存入redis的数据
* @param project
* 项目名
*/
void setRsid(Map<String, Integer> newRsidMap, String project);
/**
* 向redis写入微博数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
*/
void setWeiboMessageMessage(String redisKey, List<WeiboMessage> messages, int maxSize);
/**
* 向redis写入知乎数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
*/
void setZhihuMessageMessage(String redisKey, List<ZhihuMessage> messages, int maxSize);
/**
* 向redis写入视频数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
*/
void setVideoMessageMessage(String redisKey, List<VideoMessage> messages, int maxSize);
/**
* 向redis写入网媒数据
*
* @param redisKey
* 数据键
* @param messages
* 数据列表
* @param maxSize
* redis数据存储上限
*/
void setMediaMessageMessage(String redisKey, List<MediaMessage> messages, int maxSize);
}
package com.zhiwei.messageflow.redis.service.impl;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.redis.RedisPoolAndTools;
import com.zhiwei.messageflow.redis.service.RedisService;
@Component
public class RedisServiceImpl implements RedisService {
@Autowired
private RedisPoolAndTools redisPoolAndTools;
static ObjectMapper mapper=new ObjectMapper();
static {
mapper.setSerializationInclusion(Include.NON_EMPTY);
}
@Override
public String getRsid(String project) {
return redisPoolAndTools.getRSID(project);
}
@Override
public void setRsid(Map<String, Integer> rsidmap, String project) {
try {
redisPoolAndTools.setRSID(project, mapper.writeValueAsString(rsidmap));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
@Override
public void setWeiboMessageMessage(String redisKey, List<WeiboMessage> messages, int maxSize) {
for (WeiboMessage wm : messages) {
try {
// 写入数据
redisPoolAndTools.sortedSetZadd(redisKey, (double) wm.getRstime(), mapper.writeValueAsString(wm));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex);
}
}
@Override
public void setZhihuMessageMessage(String redisKey, List<ZhihuMessage> messages, int maxSize) {
for (ZhihuMessage zm : messages) {
try {
// 写入数据
redisPoolAndTools.sortedSetZadd(redisKey, (double) zm.getRsid(), mapper.writeValueAsString(zm));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex);
}
}
@Override
public void setVideoMessageMessage(String redisKey, List<VideoMessage> messages, int maxSize) {
for (VideoMessage vm : messages) {
try {
// 写入数据
redisPoolAndTools.sortedSetZadd(redisKey, (double) vm.getRsid(), mapper.writeValueAsString(vm));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex);
}
}
@Override
public void setMediaMessageMessage(String redisKey, List<MediaMessage> messages, int maxSize) {
for (MediaMessage mm : messages) {
try {
// 写入数据
redisPoolAndTools.sortedSetZadd(redisKey, (double) mm.getRsid(), mapper.writeValueAsString(mm));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 删除超出存储上限的数据
*/
long nowCount = redisPoolAndTools.getNowCount(redisKey.getBytes());// 当前数据量
int removeIndex = (int) (nowCount - maxSize);// 需移除数据数量
if (maxSize > 0 && nowCount > maxSize) {
redisPoolAndTools.removeDataByName(redisKey.getBytes(), removeIndex);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment