Commit dd78df5d by shentao

Merge branch 'feature' into 'dev'

2024/08/14 ai搜索 test

See merge request !569
parents 4826698e 8c3c38a8
...@@ -269,7 +269,7 @@ ...@@ -269,7 +269,7 @@
<dependency> <dependency>
<groupId>com.squareup.okhttp3</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId> <artifactId>okhttp</artifactId>
<version>3.8.0</version> <version>3.12.0</version>
</dependency> </dependency>
<!-- dubbo --> <!-- dubbo -->
<dependency> <dependency>
...@@ -324,6 +324,12 @@ ...@@ -324,6 +324,12 @@
<artifactId>ansj_seg</artifactId> <artifactId>ansj_seg</artifactId>
<version>5.0.2</version> <version>5.0.2</version>
</dependency> </dependency>
<!--火山引擎 豆包大模型-->
<dependency>
<groupId>com.volcengine</groupId>
<artifactId>volcengine-java-sdk-ark-runtime</artifactId>
<version>0.1.121</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.zhiwei.brandkbs2.common;
import com.volcengine.ark.runtime.service.ArkService;
import com.zhiwei.brandkbs2.pojo.ai.AccessModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName: DoubaoAIAccountFactor
* @Description DoubaoAIAccountFactor
* @author: sjj
* @date: 2024-07-24 14:01
*/
public class DoubaoAIAccountFactor {
public static Account getCompanyAccount() {
String apiKey = "607764fc-c9d9-47e4-a673-a310852917a0";
List<AccessModel> modelList = new ArrayList<>();
modelList.add(new AccessModel("ep-20240617061616-8d2ls", AccessModel.Model.DOUBAO_PRO_4K));
modelList.add(new AccessModel("ep-20240618021538-t6dpf", AccessModel.Model.DOUBAO_PRO_32K));
return new Account(apiKey, modelList);
}
@Data
@AllArgsConstructor
public static class Account {
String apiKey;
List<AccessModel> modelList;
}
public static ArkService arkService;
static {
arkService = new ArkService(getCompanyAccount().getApiKey());
}
}
...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.common; ...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.common;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.ChannelRecordRefreshTaskDao;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.service.ChannelService; import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.SystemInfoService; import com.zhiwei.brandkbs2.service.SystemInfoService;
...@@ -10,11 +11,13 @@ import com.zhiwei.middleware.automaticmark.graphs.Graphs; ...@@ -10,11 +11,13 @@ import com.zhiwei.middleware.automaticmark.graphs.Graphs;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform; import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import com.zhiwei.qbjc.bean.pojo.common.Tag; import com.zhiwei.qbjc.bean.pojo.common.Tag;
import com.zhiwei.brandkbs2.dao.HighlightWordDao; import com.zhiwei.brandkbs2.dao.HighlightWordDao;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -45,6 +48,9 @@ public class GlobalPojo { ...@@ -45,6 +48,9 @@ public class GlobalPojo {
@Resource(name = "highlightWordDao") @Resource(name = "highlightWordDao")
private HighlightWordDao highlightWordDao; private HighlightWordDao highlightWordDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "channelServiceImpl") @Resource(name = "channelServiceImpl")
private ChannelService channelService; private ChannelService channelService;
...@@ -143,6 +149,7 @@ public class GlobalPojo { ...@@ -143,6 +149,7 @@ public class GlobalPojo {
COMMON_SENSITIVE_CHANNEL = systemInfoService.getCommonSensitiveChannel(); COMMON_SENSITIVE_CHANNEL = systemInfoService.getCommonSensitiveChannel();
BYTEDANCE_CHANNEL_INFLUENCE = systemInfoService.getByteDanceChannelInfluence(); BYTEDANCE_CHANNEL_INFLUENCE = systemInfoService.getByteDanceChannelInfluence();
updateHighlightGraphs(); updateHighlightGraphs();
updateIncompleteChannelRecordTask();
log.info("{}-获取PLATFORMS-size:{},TAGS-size:{},LINKED_GROUP_ID_TAGS:{},CHANNEL_TAGS:{},MEDIA_TYPE:{},PROJECT_MAP:{},YUQING-PROJECTS-size:{},PROJECT_EMOTION_CHANNEL_DATA-size:{},PROJECT_SENSITIVE_CHANNEL-size:{}, COMMON_SENSITIVE_CHANNEL-size:{},BYTEDANCE_CHANNEL_INFLUENCE-size:{}", logMsg, PLATFORMS.size(), TAGS.size(), log.info("{}-获取PLATFORMS-size:{},TAGS-size:{},LINKED_GROUP_ID_TAGS:{},CHANNEL_TAGS:{},MEDIA_TYPE:{},PROJECT_MAP:{},YUQING-PROJECTS-size:{},PROJECT_EMOTION_CHANNEL_DATA-size:{},PROJECT_SENSITIVE_CHANNEL-size:{}, COMMON_SENSITIVE_CHANNEL-size:{},BYTEDANCE_CHANNEL_INFLUENCE-size:{}", logMsg, PLATFORMS.size(), TAGS.size(),
LINKED_GROUP_ID_TAGS.size(), CHANNEL_TAGS.size(), MEDIA_TYPE.size(), PROJECT_MAP.size(), YU_QING_PROJECTS.size(), PROJECT_EMOTION_CHANNEL_DATA.size(), PROJECT_SENSITIVE_CHANNEL.size(), COMMON_SENSITIVE_CHANNEL.size(), BYTEDANCE_CHANNEL_INFLUENCE.size()); LINKED_GROUP_ID_TAGS.size(), CHANNEL_TAGS.size(), MEDIA_TYPE.size(), PROJECT_MAP.size(), YU_QING_PROJECTS.size(), PROJECT_EMOTION_CHANNEL_DATA.size(), PROJECT_SENSITIVE_CHANNEL.size(), COMMON_SENSITIVE_CHANNEL.size(), BYTEDANCE_CHANNEL_INFLUENCE.size());
} catch (Exception e) { } catch (Exception e) {
...@@ -150,6 +157,19 @@ public class GlobalPojo { ...@@ -150,6 +157,19 @@ public class GlobalPojo {
} }
} }
private void updateIncompleteChannelRecordTask(){
List<ChannelRecordRefreshTask> tasks = channelRecordRefreshTaskDao.findList(new Query(Criteria.where("status").is(ChannelRecordRefreshTask.TaskStatus.UPDATING.getStatus())));
if (CollectionUtils.isEmpty(tasks)){
return;
}
for (ChannelRecordRefreshTask task : tasks) {
Update update = new Update();
update.set("status", ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus());
update.set("uTime", System.currentTimeMillis());
channelRecordRefreshTaskDao.updateOneByIdWithField(task.getId(), update);
}
}
private void updateHighlightGraphs() { private void updateHighlightGraphs() {
PROJECT_MAP.forEach((key, project) -> { PROJECT_MAP.forEach((key, project) -> {
String id = project.getId(); String id = project.getId();
......
...@@ -121,8 +121,13 @@ public class RedisKeyPrefix { ...@@ -121,8 +121,13 @@ public class RedisKeyPrefix {
public static final String CUSTOM_YUQING_ANALYZE_HIGH_WORD = "BRANDKBS:CUSTOM:YUQING:ANALYZE:HIGH:WORD:"; public static final String CUSTOM_YUQING_ANALYZE_HIGH_WORD = "BRANDKBS:CUSTOM:YUQING:ANALYZE:HIGH:WORD:";
/**
* 搜索相关缓存
*/
public static final String SEARCH_KEYWORD = "BRANDKBS:SEARCH:KEYWORD:"; public static final String SEARCH_KEYWORD = "BRANDKBS:SEARCH:KEYWORD:";
public static final String AI_SEARCH_QUESTION = "BRANDKBS:AI:SEARCH:QUESTION:";
public static String projectWarnHotTopKeyAll(String projectId, String type) { public static String projectWarnHotTopKeyAll(String projectId, String type) {
return RedisKeyPrefix.generateRedisKey(RedisKeyPrefix.PROJECT_WARN_HOT_TOP, projectId, Tools.concat(type, "*")); return RedisKeyPrefix.generateRedisKey(RedisKeyPrefix.PROJECT_WARN_HOT_TOP, projectId, Tools.concat(type, "*"));
} }
......
...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.model.ResponseResult; import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.external.*; import com.zhiwei.brandkbs2.pojo.external.*;
import com.zhiwei.brandkbs2.pojo.vo.CrisisCaseWarnVO; import com.zhiwei.brandkbs2.pojo.vo.CrisisCaseWarnVO;
import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.ProjectService; import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.service.ProjectWarnService; import com.zhiwei.brandkbs2.service.ProjectWarnService;
import com.zhiwei.brandkbs2.util.TextUtil; import com.zhiwei.brandkbs2.util.TextUtil;
...@@ -32,6 +33,9 @@ public class InterfaceController { ...@@ -32,6 +33,9 @@ public class InterfaceController {
@Resource(name = "projectWarnServiceImpl") @Resource(name = "projectWarnServiceImpl")
private ProjectWarnService projectWarnService; private ProjectWarnService projectWarnService;
@Resource(name = "channelServiceImpl")
private ChannelService channelService;
@Autowired @Autowired
TextUtil textUtil; TextUtil textUtil;
...@@ -125,4 +129,16 @@ public class InterfaceController { ...@@ -125,4 +129,16 @@ public class InterfaceController {
public ResponseResult getUserProjectList(String userId) { public ResponseResult getUserProjectList(String userId) {
return projectService.getUserProject(userId); return projectService.getUserProject(userId);
} }
@ApiOperation("数据回传标注库品见数据更新-生成刷新任务")
@PostMapping("/channel/refresh-task")
public ResponseResult createChannelRecordRefreshTask(@RequestBody JSONObject json) {
long startTime = json.getLongValue("startTime");
long endTime = json.getLongValue("endTime");
List<JSONObject> brandkbsInfos = json.getJSONArray("brandkbsInfo").toJavaList(JSONObject.class);
String mgroupId = json.getString("mgroupId");
String submitter = json.getString("submitter");
channelService.createChannelRecordRefreshTask(startTime, endTime, brandkbsInfos, mgroupId, submitter);
return ResponseResult.success();
}
} }
package com.zhiwei.brandkbs2.controller.admin; package com.zhiwei.brandkbs2.controller.admin;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.auth.Auth; import com.zhiwei.brandkbs2.auth.Auth;
import com.zhiwei.brandkbs2.controller.BaseController; import com.zhiwei.brandkbs2.controller.BaseController;
import com.zhiwei.brandkbs2.enmus.RoleEnum; import com.zhiwei.brandkbs2.enmus.RoleEnum;
...@@ -7,10 +8,7 @@ import com.zhiwei.brandkbs2.model.ResponseResult; ...@@ -7,10 +8,7 @@ import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.dto.UserDTO; import com.zhiwei.brandkbs2.pojo.dto.UserDTO;
import com.zhiwei.brandkbs2.service.UserService; import com.zhiwei.brandkbs2.service.UserService;
import com.zhiwei.middleware.auth.pojo.CenterUser; import com.zhiwei.middleware.auth.pojo.CenterUser;
import io.swagger.annotations.Api; import io.swagger.annotations.*;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -124,4 +122,46 @@ public class UserController extends BaseController { ...@@ -124,4 +122,46 @@ public class UserController extends BaseController {
return ResponseResult.success(); return ResponseResult.success();
} }
@ApiOperation("分页查询所有用户列表")
@GetMapping("/all-user/list")
@Auth(role = RoleEnum.SUPER_ADMIN)
public ResponseResult findAllUserList(@RequestParam(value = "page", defaultValue = "1") int page,
@RequestParam(value = "size", defaultValue = "10") int size,
@RequestParam(value = "keyword", defaultValue = "") String keyword) {
return ResponseResult.success(userService.findAllUserList(keyword, page, size));
}
@ApiOperation("根据用户名或手机号查询用户项目权限")
@GetMapping("/all-user/find")
@Auth(role = RoleEnum.SUPER_ADMIN)
public ResponseResult getUserByNickNameOrPhone(@RequestParam(value = "nickname", required = false) String nickname,
@RequestParam(value = "phoneNumber", required = false) String phoneNumber) {
return userService.getUserByNickNameOrPhone(nickname, phoneNumber);
}
@ApiOperation("编辑单条项目权限")
@PostMapping("/all-user/single-update")
@Auth(role = RoleEnum.SUPER_ADMIN)
public ResponseResult updateOneUserRoles(@ApiParam(name = "json",
value = "json:{id:用户id,nickname:用户名,roleId:权限,expiredTime:过期时间,exportAmount:舆情导出数量,projectId:权限项目id,key:唯一key原封不动传回来即可}",
required = true) @RequestBody JSONObject json) {
return userService.updateOneUserRoles(json);
}
@ApiOperation("批量编辑项目权限")
@PostMapping("/all-user/batch-update")
@Auth(role = RoleEnum.SUPER_ADMIN)
public ResponseResult updateBatchUserRoles(@ApiParam(name = "json", value = "json:{id:用户id,roles:[{projectId:权限项目id,roleId:权限,exportAmount:舆情导出数量,expiredTime:过期时间},{},...]}",
required = true)
@RequestBody JSONObject json) {
return userService.updateBatchUserRoles(json);
}
@ApiOperation("删除用户权限")
@DeleteMapping("/all-user/delete")
@Auth(role = RoleEnum.SUPER_ADMIN)
public ResponseResult deleteUserRole(@RequestParam(value = "id") String id,
@RequestParam(value = "key") String key) {
return userService.deleteUserRole(id, key);
}
} }
\ No newline at end of file
...@@ -352,6 +352,25 @@ public class AppSearchController extends BaseController { ...@@ -352,6 +352,25 @@ public class AppSearchController extends BaseController {
return ResponseResult.success(markDataService.getContendSearchCriteria(contendId)); return ResponseResult.success(markDataService.getContendSearchCriteria(contendId));
} }
@ApiOperation("搜索-AI搜索")
@GetMapping("/ai/answer")
public ResponseResult getAISearchResult(@RequestParam(value = "question") String question) {
return ResponseResult.success(markDataService.getAISearchResult(question));
}
@ApiOperation("搜索-AI推荐提问")
@GetMapping("/ai/question")
public ResponseResult getAIReferenceQuestion(@RequestParam(value = "question") String question,
@RequestParam(value = "size") int size) {
return ResponseResult.success(markDataService.getAIReferenceQuestion(question, size));
}
@ApiOperation("搜索-AI参考提问")
@GetMapping("/ai/question-cache")
public ResponseResult getAIReferenceQuestion() {
return ResponseResult.success(markDataService.getAIReferenceQuestionCache(true));
}
@ApiOperation("搜索-搜索关键词历史记录") @ApiOperation("搜索-搜索关键词历史记录")
@GetMapping("/keyword/cache") @GetMapping("/keyword/cache")
public ResponseResult getSearchKeywordCache(@ApiParam(name = "searchType", public ResponseResult getSearchKeywordCache(@ApiParam(name = "searchType",
......
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.AISearchQuestionRecord;
import java.util.List;
/**
* @ClassName: AISearchQuestionRecordDao
* @Description AISearchQuestionRecordDao
* @author: cjz
* @date: 2024-08-12 17:07
*/
public interface AISearchQuestionRecordDao extends BaseMongoDao<AISearchQuestionRecord>{
List<String> findDistinctQuestion(String projectId);
}
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.ChannelRecordRefreshTask;
import org.springframework.data.mongodb.core.query.Query;
/**
* @ClassName: ChannelRecordRefreshTaskDao
* @Description 渠道记录更新任务dao
* @author: cjz
* @date: 2024-07-22 11:37
*/
public interface ChannelRecordRefreshTaskDao extends BaseMongoDao<ChannelRecordRefreshTask>{
ChannelRecordRefreshTask findOne(Query query);
}
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.CustomInteractionUpdateRecord;
/**
* @ClassName: MonthlyInteractionUpdateRecordDao
* @Description MonthlyInteractionUpdateRecordDao
* @author: cjz
* @date: 2024-07-09 10:30
*/
public interface CustomInteractionUpdateRecordDao extends BaseMongoDao<CustomInteractionUpdateRecord> {
CustomInteractionUpdateRecord findLastRecord(String projectId);
}
package com.zhiwei.brandkbs2.dao; package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.User; import com.zhiwei.brandkbs2.pojo.User;
import org.springframework.data.mongodb.core.query.Query;
/** /**
* @ClassName: UserDao * @ClassName: UserDao
...@@ -9,5 +10,5 @@ import com.zhiwei.brandkbs2.pojo.User; ...@@ -9,5 +10,5 @@ import com.zhiwei.brandkbs2.pojo.User;
* @date: 2022-04-28 18:10 * @date: 2022-04-28 18:10
*/ */
public interface UserDao extends BaseMongoDao<User>{ public interface UserDao extends BaseMongoDao<User>{
User findOne(Query query);
} }
package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.AISearchQuestionRecordDao;
import com.zhiwei.brandkbs2.pojo.AISearchQuestionRecord;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @ClassName: AISearchQuestionRecordDao
* @Description AISearchQuestionRecordDao
* @author: cjz
* @date: 2024-08-12 17:07
*/
@Component("aiSearchQuestionRecordDao")
public class AISearchQuestionRecordDaoImpl extends BaseMongoDaoImpl<AISearchQuestionRecord> implements AISearchQuestionRecordDao {
private static final String COLLECTION_NAME = "brandkbs_ai_search_question_record";
public AISearchQuestionRecordDaoImpl() {
super(COLLECTION_NAME);
}
@Override
public List<String> findDistinctQuestion(String projectId) {
Query query = new Query().addCriteria(Criteria.where("projectId").is(projectId)).with(Sort.by(Sort.Order.desc("cTime"))).limit(10);
return mongoTemplate.findDistinct(query, "question", COLLECTION_NAME, String.class);
}
}
package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.ChannelRecordRefreshTaskDao;
import com.zhiwei.brandkbs2.pojo.ChannelRecordRefreshTask;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
/**
* @ClassName: ChannelRecordRefreshTaskDaoImpl
* @Description 渠道记录更新任务dao
* @author: cjz
* @date: 2024-07-22 11:37
*/
@Component("channelRecordRefreshTaskDao")
public class ChannelRecordRefreshTaskDaoImpl extends BaseMongoDaoImpl<ChannelRecordRefreshTask> implements ChannelRecordRefreshTaskDao {
private static final String COLLECTION_NAME = "brandkbs_channel_record_refresh_task";
public ChannelRecordRefreshTaskDaoImpl() {
super(COLLECTION_NAME);
}
@Override
public ChannelRecordRefreshTask findOne(Query query) {
return mongoTemplate.findOne(query, ChannelRecordRefreshTask.class, COLLECTION_NAME);
}
}
package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.CustomInteractionUpdateRecordDao;
import com.zhiwei.brandkbs2.pojo.CustomInteractionUpdateRecord;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
/**
* @ClassName: MonthlyInteractionUpdateRecordDao
* @Description MonthlyInteractionUpdateRecordDao
* @author: cjz
* @date: 2024-07-09 10:30
*/
@Component("customInteractionUpdateRecordDao")
public class CustomInteractionUpdateRecordDaoImpl extends BaseMongoDaoImpl<CustomInteractionUpdateRecord> implements CustomInteractionUpdateRecordDao {
private static final String COLLECTION_NAME = "brandkbs_custom_interaction_update_record";
public CustomInteractionUpdateRecordDaoImpl() {
super(COLLECTION_NAME);
}
@Override
public CustomInteractionUpdateRecord findLastRecord(String projectId) {
return mongoTemplate.findOne(new Query().addCriteria(Criteria.where("projectId").is(projectId))
.with(Sort.by(Sort.Order.desc("cTime"))).limit(1), CustomInteractionUpdateRecord.class, COLLECTION_NAME);
}
}
...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.dao.impl; ...@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.UserDao; import com.zhiwei.brandkbs2.dao.UserDao;
import com.zhiwei.brandkbs2.pojo.User; import com.zhiwei.brandkbs2.pojo.User;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
...@@ -18,4 +19,9 @@ public class UserDaoImpl extends BaseMongoDaoImpl<User> implements UserDao { ...@@ -18,4 +19,9 @@ public class UserDaoImpl extends BaseMongoDaoImpl<User> implements UserDao {
public UserDaoImpl() { public UserDaoImpl() {
super(COLLECTION_NAME); super(COLLECTION_NAME);
} }
@Override
public User findOne(Query query) {
return mongoTemplate.findOne(query, User.class, COLLECTION_NAME);
}
} }
package com.zhiwei.brandkbs2.es; package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.pojo.ChannelIndex; import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.ChannelRecord; import com.zhiwei.brandkbs2.pojo.ChannelRecord;
...@@ -10,23 +11,26 @@ import org.apache.logging.log4j.Logger; ...@@ -10,23 +11,26 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.Calendar; import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -111,6 +115,87 @@ public class ChannelEsDao extends EsClientDao { ...@@ -111,6 +115,87 @@ public class ChannelEsDao extends EsClientDao {
} }
} }
public Integer removeChannelRecordBatch(List<JSONObject> list) {
AtomicInteger update = new AtomicInteger();
AtomicInteger total = new AtomicInteger(list.size());
List<CompletableFuture<Boolean>> futures = new ArrayList<>(list.size());
list.forEach(json -> futures.add(CompletableFuture.supplyAsync(() -> removeChannelRecordByCondition(json, total), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> {
if (f.join()) {
update.incrementAndGet();
}
});
}).join();
// list.forEach(json -> {
// removeChannelRecordByCondition(json, total);
// });
return update.get();
}
private boolean removeChannelRecordByCondition(JSONObject json, AtomicInteger total) {
String index = getChannelRecordIndexes().get(0);
String id = json.getString("id");
AtomicInteger searchCount = new AtomicInteger();
AtomicInteger updateCount = new AtomicInteger();
boolean update = false;
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must(QueryBuilders.termQuery("record.articles.id", id));
if (json.containsKey("projectId")) {
boolQuery.must(QueryBuilders.termQuery("project_id.keyword", json.getString("projectId")));
}
if (json.containsKey("contendIds")) {
BoolQueryBuilder contendBoolQuery = QueryBuilders.boolQuery();
json.getJSONArray("contendIds").toJavaList(String.class).forEach(contendId -> {
contendBoolQuery.should(QueryBuilders.termQuery("contend_id.keyword", contendId));
});
boolQuery.must(contendBoolQuery);
}
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.size(1000);
SearchResponse response = retryTemplate.execute(context -> {
try {
return channelEsClient.search(new SearchRequest().indices(index).source(searchSourceBuilder), RequestOptions.DEFAULT);
} catch (IOException e) {
log.info("resetChannelRecordById:{},查询失败,尝试重试第{}次-", id, context.getRetryCount() + 1, e);
return null;
}
});
if (null != response) {
searchCount.addAndGet(response.getHits().getHits().length);
for (SearchHit hit : response.getHits().getHits()) {
JSONObject record = new JSONObject((Map<String, Object>) hit.getSourceAsMap().get("record"));
Long lastTime = record.getLong("last_time");
AtomicBoolean valid = new AtomicBoolean(false);
List<ChannelIndex.Article> articles = record.getJSONArray("articles").toJavaList(JSONObject.class).stream().map(article -> {
// 移除id相同的数据
if (article.getString("id").equals(id)) {
valid.set(true);
return null;
}
return ChannelIndex.Article.fromRecordMap(article);
}).filter(Objects::nonNull).collect(Collectors.toList());
// 有效更新
if (valid.get()) {
Map<String, Object> updateMap = new HashMap<>();
updateMap.put("record", new ChannelIndex.Record(lastTime, articles).toEsMap());
updateMap.put("article_count", articles.size());
try {
channelEsClient.update(new UpdateRequest().index(index).id(hit.getId()).doc(updateMap), RequestOptions.DEFAULT);
updateCount.getAndIncrement();
update = true;
} catch (Exception e) {
log.info("resetChannelRecordById:{},更新失败", id, e);
}
}
}
}
log.info("resetChannelRecordById:{},本次查询到{}条,更新{}条,剩余id数:{}", id, searchCount.get(), updateCount.get(), total.decrementAndGet());
return update;
}
private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) { private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) {
AtomicBoolean res = new AtomicBoolean(true); AtomicBoolean res = new AtomicBoolean(true);
BulkResponse bulkResponse = null; BulkResponse bulkResponse = null;
......
package com.zhiwei.brandkbs2.es; package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.hankcs.hanlp.HanLP;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.ChannelIndex; import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.ai.FieldMapping;
import com.zhiwei.brandkbs2.util.TextUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -33,6 +36,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder; ...@@ -33,6 +36,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.Period; import org.joda.time.Period;
import org.joda.time.PeriodType; import org.joda.time.PeriodType;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -112,6 +116,103 @@ public class EsClientDao { ...@@ -112,6 +116,103 @@ public class EsClientDao {
return res; return res;
} }
public List<JSONObject> searchRecordEmpty(long startTime, long endTime, String mgroup) {
List<JSONObject> res = new ArrayList<>();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
List<CompletableFuture<List<JSONObject>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecordEmptySingle(times[0], times[1], mgroup), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> {
res.addAll(f.join());
});
}).join();
return res;
}
private List<JSONObject> searchRecordEmptySingle(long startTime, long endTime, String mgroup) {
List<JSONObject> res = new ArrayList<>();
try {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// mark_cache_maps字段为空
boolQuery.mustNot(QueryBuilders.existsQuery("mark_cache_maps"));
boolQuery.must(QueryBuilders.termQuery("mgroup.keyword", mgroup));
boolQuery.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime));
List<JSONObject> results = searchScroll(boolQuery, 10000, new String[]{"id"});
res.addAll(results);
} catch (IOException e) {
log.error("searchRecordEmptySingle-", e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res;
}
public List<JSONObject> searchRecordUnrelated(long startTime, long endTime, String mgroupId, List<JSONObject> brandkbsInfo) {
List<JSONObject> res = new ArrayList<>();
List<JSONObject> dataList = new ArrayList<>();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
List<CompletableFuture<List<JSONObject>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecordUnrelatedSingle(times[0], times[1], mgroupId), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> dataList.addAll(f.join()));
}).join();
// 找到该mgroup下关联的项目品牌及竞品
Map<String, List<String>> relatedMap = brandkbsInfo.stream().collect(Collectors.toMap(json -> json.getString("projectId"), json -> {
List<String> brandIds = new ArrayList<>();
String brandId = Objects.equals(json.getString("brandId"), json.getString("projectId")) ? Constant.PRIMARY_CONTEND_ID : json.getString("brandId");
brandIds.add(brandId);
return brandIds;
}, (List<String> ls1, List<String> ls2) -> {
ls1.addAll(ls2);
return ls1;
}));
// 筛选没有关联关系需要重置的部分
dataList.forEach(data -> {
String id = data.getString("id");
List<Map<String, Object>> brandkbsCacheMaps = (List<Map<String, Object>>) data.get("brandkbs_cache_maps");
relatedMap.forEach((project, contends) -> {
List<String> unrelatedContends = new ArrayList<>();
contends.forEach(contend -> {
boolean hit = false;
for (Map<String, Object> map : brandkbsCacheMaps) {
String key = map.get("key") + "";
if (key.equals(project + "_" + contend)) {
hit = true;
break;
}
}
// 不满足关系则添加(说明需要重置)
if (!hit) {
unrelatedContends.add(contend);
}
});
if (!unrelatedContends.isEmpty()) {
JSONObject json = new JSONObject();
json.put("id", id);
json.put("projectId", project);
json.put("contendIds", unrelatedContends);
res.add(json);
}
});
});
return res;
}
public List<JSONObject> searchRecordUnrelatedSingle(long startTime, long endTime, String mgroupId) {
List<JSONObject> res = new ArrayList<>();
try {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// TODO 是否需要设立标注字段限制(查询范围更小更精准)
boolQuery.must(QueryBuilders.termQuery("mgroup_id.keyword", mgroupId));
boolQuery.must(QueryBuilders.rangeQuery("time").gte(startTime).lte(endTime));
List<JSONObject> results = searchScroll(boolQuery, 10000, new String[]{"id", "brandkbs_cache_maps"});
res.addAll(results);
} catch (IOException e) {
log.error("startTime:{},endTime:{},mgroupId:{},searchRecordUnrelatedSingle-", startTime, endTime, mgroupId, e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res;
}
// //
// /** // /**
// * 搜索符合事件数据 // * 搜索符合事件数据
...@@ -170,11 +271,48 @@ public class EsClientDao { ...@@ -170,11 +271,48 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder)); return retryTemplate.execute(context -> searchScroll(sourceBuilder));
} }
private List<JSONObject> searchRecordEs(long startTime, long endTime){
List<JSONObject> results = new ArrayList<>();
if (endTime - startTime <= 10 * 60 * 1000L){
log.error("searchRecord分段查询至最小分割仍未满足-时间范围:{}-{}", startTime, endTime);
return Collections.emptyList();
}
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
results.addAll(searchRecordEs(startTime, midTime));
results.addAll(searchRecordEs(midTime, endTime));
}
return results;
}
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) { private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>(); Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try { try {
List<JSONObject> results = new ArrayList<>();
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime); QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
List<JSONObject> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE); results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
}catch (Exception e){
log.info("searchRecord-搜索阶段出错-时间分段重试开始", e);
// 时间分段查询
long midTime = startTime + (endTime - startTime) / 2;
try {
QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(midTime);
results.addAll(searchScroll(queryBuilder1, 1000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e1){
log.error("searchRecord分段查询出错,时间范围:{}-{}", startTime, midTime, e1);
}
try {
QueryBuilder queryBuilder2 = QueryBuilders.rangeQuery("mtime").gte(midTime).lt(endTime);
results.addAll(searchScroll(queryBuilder2, 1000, CHANNEL_RECORD_FETCH_SOURCE));
}catch (Exception e2){
log.error("searchRecord分段查询出错,时间范围:{}-{}", midTime, endTime, e2);
}
}
for (Map<String, Object> result : results) { for (Map<String, Object> result : results) {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) { for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
res.compute(channelIndex, (k, v) -> { res.compute(channelIndex, (k, v) -> {
...@@ -190,13 +328,91 @@ public class EsClientDao { ...@@ -190,13 +328,91 @@ public class EsClientDao {
}); });
} }
} }
} catch (IOException e) { } catch (Exception e) {
log.error("searchRecord-", e); log.error("searchRecord-", e);
} }
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size()); log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return Pair.of(new Long[]{startTime, endTime}, res); return Pair.of(new Long[]{startTime, endTime}, res);
} }
public List<JSONObject> findSearch(List<FieldMapping> fieldMappings) throws IOException {
List<JSONObject> list = new ArrayList<>();
BoolQueryBuilder query = getBoolQueryBuilder(fieldMappings);
String[] fetchSource = {"id", GenericAttribute.ES_TIME, GenericAttribute.ES_IND_TITLE};
SearchHit[] hits = searchHits(getIndexes(), query, null, fetchSource, null, 0, 10000, null).getHits();
// Map<String, JSONObject> idBaseMap = Arrays.stream(hits).map(hit -> new JSONObject(hit.getSourceAsMap())).collect(Collectors.toMap(json -> json.getString("id"), o -> o));
// Map<String, String> idTitle = Arrays.stream(hits)
// .map(hit -> new JSONObject(hit.getSourceAsMap()))
// .filter(json -> Objects.nonNull(json.getString(GenericAttribute.ES_IND_TITLE)))
// .collect(Collectors.toMap(json -> json.getString("id"), json -> json.getString(GenericAttribute.ES_IND_TITLE)));
Pair<Map<String, JSONObject>, Map<String, String>> searchProcess = findSearchResultProcess(hits);
Map<String, JSONObject> idBaseMap = searchProcess.getLeft();
Map<String, String> idTitle = searchProcess.getRight();
// 搜索条件未找到结果,将搜索关键词分词处理,再次查询
if (idTitle.isEmpty()){
SearchHit[] searchHitHanLP = findSearchHanLP(fieldMappings, fetchSource);
Pair<Map<String, JSONObject>, Map<String, String>> searchProcessHanLP = findSearchResultProcess(searchHitHanLP);
idBaseMap = searchProcessHanLP.getLeft();
idTitle = searchProcessHanLP.getRight();
}
if (idTitle.isEmpty()){
return list;
}
// 按标题聚合,取聚合结果集前9,并取结果集中最新的文章的id
Map<String, JSONObject> finalIdBaseMap = idBaseMap;
List<String> idList = TextUtil.getKResult(idTitle).stream()
.sorted(Comparator.comparing(List<String>::size, Comparator.reverseOrder()))
.limit(9)
.map(ids -> ids.stream().map(finalIdBaseMap::get).max(Comparator.comparingLong(json -> json.getLongValue(GenericAttribute.ES_TIME))).orElse(null))
.filter(Objects::nonNull)
.map(json -> json.getString("id"))
.collect(Collectors.toList());
// 反查原数据
for (String id : idList) {
list.add(getTopTitleLatest(getBoolQueryBuilder(fieldMappings), id));
}
return list;
}
private Pair<Map<String, JSONObject>, Map<String, String>> findSearchResultProcess(SearchHit[] hits){
Map<String, JSONObject> idBaseMap = Arrays.stream(hits).map(hit -> new JSONObject(hit.getSourceAsMap())).collect(Collectors.toMap(json -> json.getString("id"), o -> o));
Map<String, String> idTitle = Arrays.stream(hits)
.map(hit -> new JSONObject(hit.getSourceAsMap()))
.filter(json -> Objects.nonNull(json.getString(GenericAttribute.ES_IND_TITLE)))
.collect(Collectors.toMap(json -> json.getString("id"), json -> json.getString(GenericAttribute.ES_IND_TITLE)));
return Pair.of(idBaseMap, idTitle);
}
private SearchHit[] findSearchHanLP(List<FieldMapping> fieldMappings, String[] fetchSource) throws IOException {
fieldMappings.stream().filter(fieldMapping -> Objects.equals(FieldMapping.FieldMap.IND_FULL_TEXT, fieldMapping.getFieldMap()))
.findFirst().ifPresent(fieldMapping -> {
String value = String.valueOf(fieldMapping.getValue());
String newValue = HanLP.segment(Tools.filterSpecialCharacter(value)).stream().map(s -> s.word).distinct().collect(Collectors.joining("|"));
fieldMapping.setValue(newValue);
});
BoolQueryBuilder query = getBoolQueryBuilder(fieldMappings);
return searchHits(getIndexes(), query, null, fetchSource, null, 0, 10000, null).getHits();
}
private JSONObject getTopTitleLatest(BoolQueryBuilder query, String id) throws IOException {
query.must(QueryBuilders.termQuery("id", id));
FieldSortBuilder sort = new FieldSortBuilder(GenericAttribute.ES_TIME).order(SortOrder.DESC);
SearchHits searchHits = searchHits(getIndexes(), query, null, null, sort, 0, 1, null);
return new JSONObject(searchHits.getAt(0).getSourceAsMap());
}
private BoolQueryBuilder getBoolQueryBuilder(List<FieldMapping> fieldMappings) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
Map<String, List<FieldMapping>> groupMap = fieldMappings.stream().collect(Collectors.groupingBy(mapping -> mapping.getFieldMap().getFatherName()));
groupMap.forEach((fatherName, list) -> {
if (list.size() > 2) {
throw new IllegalStateException("构建搜索条件分组异常");
}
boolQueryBuilder.must(list.get(0).buildQuery(list.size() > 1 ? list.get(1) : null));
});
return boolQueryBuilder;
}
public String[] getIndexes() { public String[] getIndexes() {
return getIndexList().toArray(new String[0]); return getIndexList().toArray(new String[0]);
} }
......
package com.zhiwei.brandkbs2.pojo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
/**
* @ClassName: AISearchQuestionRecord
* @Description ai搜索问题记录
* @author: cjz
* @date: 2024-8-12 14:58
*/
@Getter
@Setter
@AllArgsConstructor
public class AISearchQuestionRecord extends AbstractBaseMongo {
/**
* 问题
*/
private String question;
/**
* 项目id
*/
private String projectId;
/**
* 创建时间
*/
private Long cTime;
}
package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @ClassName: ChannelRecordRefreshTask
* @Description 渠道记录更新任务
* @author: cjz
* @date: 2024-07-22 10:37
*/
@Getter
@Setter
@AllArgsConstructor
public class ChannelRecordRefreshTask extends AbstractBaseMongo{
/**
* 受影响数据最小时间
*/
private Long startTime;
/**
* 受影响数据最大时间
*/
private Long endTime;
/**
* 品见信息:[{projectId:,brandId:}]
*/
List<JSONObject> brandkbsInfo;
/**
* mgroupId
*/
private String mgroupId;
/**
* 任务状态
*/
private String status;
/**
* 创建者
*/
private String submitter;
/**
* 创建时间
*/
private Long cTime;
/**
* 修改时间
*/
private Long uTime;
@Getter
public enum TaskStatus {
NOT_START("未开始"), UPDATING("进行中"), ERROR("出错"), COMPLETED("已完成");
private String status;
TaskStatus(String status) {
this.status = status;
}
}
}
package com.zhiwei.brandkbs2.pojo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
/**
* @ClassName: MonthlyInteractionUpdateRecord
* @Description 每月互动量更新记录
* @author: cjz
* @date: 2024-07-09 9:48
*/
@Getter
@Setter
@AllArgsConstructor
public class CustomInteractionUpdateRecord extends AbstractBaseMongo{
/**
* 拉取数据的开始时间
*/
private Long startTime;
/**
* 拉取数据的结束时间
*/
private Long endTime;
/**
* 项目id
*/
private String projectId;
/**
* 数据更新节点开始时间
*/
private Long dataStartTime;
/**
* 状态:完成|
*/
private String status;
private Long cTime;
private Long uTime;
}
package com.zhiwei.brandkbs2.pojo.ai;
import lombok.Data;
import lombok.Getter;
/**
* @ClassName: AccessModel
* @Description AccessModel
* @author: sjj
* @date: 2024-07-24 14:05
*/
@Data
public class AccessModel {
// 模型名称
private String modelName;
// 接入点id
private String modelId;
// 输入单价(每千个token)
private Double inputPrice;
// 输出单价(每千个token)
private Double outputPrice;
public AccessModel(String modelId, Model model) {
this.modelId = modelId;
this.modelName = model.modelName;
this.inputPrice = model.inputPrice;
this.outputPrice = model.outputPrice;
}
public enum Model {
DOUBAO_PRO_4K("Doubao-pro-4k", 0.0008, 0.0020),
DOUBAO_PRO_32K("Doubao-pro-32k", 0.0008, 0.0020),
DOUBAO_PRO_128K("Doubao-pro-128k", 0.0090, 0.0500),
DOUBAO_LITE_4K("doubao-lite-4k", 0.0003, 0.0006),
DOUBAO_LITE_32K("Doubao-lite-32k", 0.0003, 0.0006),
DOUBAO_LITE_128K("Doubao-lite-128k", 0.0008, 0.0010);
@Getter
private final String modelName;
// 输入单价(每千个token)
@Getter
private final Double inputPrice;
// 输出单价(每千个token)
@Getter
private final Double outputPrice;
Model(String modelName, Double inputPrice, Double outputPrice) {
this.modelName = modelName;
this.inputPrice = inputPrice;
this.outputPrice = outputPrice;
}
}
}
package com.zhiwei.brandkbs2.pojo.ai;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.es.EsQueryTools;
import com.zhiwei.brandkbs2.pojo.AbstractProject;
import com.zhiwei.brandkbs2.pojo.Contend;
import com.zhiwei.brandkbs2.pojo.Project;
import lombok.Data;
import lombok.Getter;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.index.query.*;
import java.util.*;
import java.util.stream.Collectors;
/**
* @ClassName: FieldMap
* @Description FieldMap
* @author: sjj
* @date: 2024-08-02 14:01
*/
@Data
public class FieldMapping {
private FieldMap fieldMap;
private Object value;
public FieldMapping(FieldMap fieldMap, Object value) {
this.fieldMap = fieldMap;
this.value = value;
}
public QueryBuilder buildQuery(FieldMapping fieldMapping) {
boolean existsAnd = null != fieldMapping;
RangeQueryBuilder timeRangeBuilder;
String contendId = "0";
// 项目组需绑定查询
switch (fieldMap) {
case START_TIME:
timeRangeBuilder = QueryBuilders.rangeQuery(fieldMap.databaseName).gte(value);
if (existsAnd && fieldMapping.fieldMap.equals(FieldMap.END_TIME)) {
timeRangeBuilder.lt(fieldMapping.value);
}
return timeRangeBuilder;
case END_TIME:
timeRangeBuilder = QueryBuilders.rangeQuery(fieldMap.databaseName).lt(value);
if (existsAnd && fieldMapping.fieldMap.equals(FieldMap.START_TIME)) {
timeRangeBuilder.gte(fieldMapping.value);
}
return timeRangeBuilder;
case PROJECT:
if (existsAnd && fieldMapping.fieldMap == FieldMap.BRAND) {
contendId = (String) fieldMapping.value;
}
BoolQueryBuilder nestedBoolBuilder = QueryBuilders.boolQuery();
// 必要条件
nestedBoolBuilder.must(QueryBuilders.termQuery(fieldMap.databaseName, value + "_" + contendId));
return new NestedQueryBuilder("brandkbs_cache_maps", nestedBoolBuilder, ScoreMode.None);
case BRAND:
if (!existsAnd || fieldMapping.fieldMap != FieldMap.PROJECT) {
throw new IllegalStateException("项目条件缺失");
}
return fieldMapping.buildQuery(this);
case IND_FULL_TEXT:
return EsQueryTools.assembleNormalKeywordQuery(String.valueOf(value), new String[]{fieldMap.databaseName});
// return QueryBuilders.matchPhraseQuery(fieldMap.databaseName, value);
case SOURCE:
case MTAG:
return QueryBuilders.termQuery(fieldMap.databaseName, value);
}
return null;
}
public enum FieldMap {
START_TIME("起始时间", "时间", "time"),
END_TIME("结束时间", "时间", "time"),
PROJECT("项目", "项目", "brandkbs_cache_maps.key.keyword"),
BRAND("品牌", "项目", "brandkbs_cache_maps.key.keyword"),
SOURCE("渠道", "渠道", "source"),
MTAG("标签", "标签", "mark_cache_maps.name.keyword"),
IND_FULL_TEXT("搜索条件", "搜索条件", "ind_full_text");
@Getter
private final String name;
@Getter
private final String fatherName;
@Getter
private final String databaseName;
FieldMap(String name, String fatherName, String databaseName) {
this.name = name;
this.fatherName = fatherName;
this.databaseName = databaseName;
}
}
public static FieldMapping createFromNameAndValue(String name, Object value, List<FieldMapping> fieldMappings) {
FieldMap fieldMap = null;
// TODO 字段转换待完善,引入数据库
for (FieldMap f : FieldMap.values()) {
if (name.equals(f.getName())) {
// 项目名需要转成id
if (FieldMap.PROJECT == f) {
Map<String, Project> projectMap = GlobalPojo.PROJECT_MAP.values().stream().collect(Collectors.toMap(AbstractProject::getProjectName, o -> o));
if (projectMap.containsKey(String.valueOf(value))) {
value = projectMap.get(String.valueOf(value)).getId();
}
}
// 品牌需要转换
if (FieldMap.BRAND == f) {
if ("主品牌".equals(value)) {
value = Constant.PRIMARY_CONTEND_ID;
} else {
// 寻找对应的竞品id
Optional<FieldMapping> project = fieldMappings.stream().filter(field -> Objects.equals(FieldMap.PROJECT, field.getFieldMap())).findFirst();
if (project.isPresent()){
List<Contend> contendList = GlobalPojo.PROJECT_MAP.get(String.valueOf(project.get().getValue())).getContendList();
Object finalValue = value;
Optional<Contend> contendOptional = contendList.stream().filter(contend -> Objects.equals(contend.getBrandName(), finalValue)).findFirst();
if (contendOptional.isPresent()){
value = contendOptional.get().getId();
}else {
value = Constant.PRIMARY_CONTEND_ID;
}
}else {
value = Constant.PRIMARY_CONTEND_ID;
}
}
}
// 标签只包含正负中
if (FieldMap.MTAG == f) {
if (!Arrays.asList("正面", "中性", "负面").contains(String.valueOf(value))) {
return null;
}
}
fieldMap = f;
break;
}
}
if (null == fieldMap) {
return null;
}
return new FieldMapping(fieldMap, value);
}
}
...@@ -333,4 +333,13 @@ public interface ChannelService { ...@@ -333,4 +333,13 @@ public interface ChannelService {
* @return * @return
*/ */
JSONObject getImportantChannelListDetail(String type, String name, String keyword); JSONObject getImportantChannelListDetail(String type, String name, String keyword);
/**
* 生成渠道记录更新任务
* @param startTime
* @param endTime
* @param yuqingProjectId
* @param submitter
*/
void createChannelRecordRefreshTask(Long startTime, Long endTime, List<JSONObject> brandkbsInfos, String yuqingProjectId, String submitter);
} }
...@@ -2,10 +2,7 @@ package com.zhiwei.brandkbs2.service; ...@@ -2,10 +2,7 @@ package com.zhiwei.brandkbs2.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.model.ResponseResult; import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.BaseMap; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.pojo.DailyReport;
import com.zhiwei.brandkbs2.pojo.Event;
import com.zhiwei.brandkbs2.pojo.MarkFlowEntity;
import com.zhiwei.brandkbs2.pojo.dto.*; import com.zhiwei.brandkbs2.pojo.dto.*;
import com.zhiwei.brandkbs2.pojo.vo.LineVO; import com.zhiwei.brandkbs2.pojo.vo.LineVO;
import com.zhiwei.brandkbs2.pojo.vo.PageVO; import com.zhiwei.brandkbs2.pojo.vo.PageVO;
...@@ -836,4 +833,25 @@ public interface MarkDataService { ...@@ -836,4 +833,25 @@ public interface MarkDataService {
* @return * @return
*/ */
List<String> expandOriginRange(MarkSearchDTO dto); List<String> expandOriginRange(MarkSearchDTO dto);
/**
* AI搜索-推荐提问
* @param question
* @return
*/
List<String> getAIReferenceQuestion(String question, int size);
/**
* AI搜索-参考提问
* @param cache
* @return
*/
List<String> getAIReferenceQuestionCache(boolean cache);
/**
* AI搜索-搜索结果
* @param question
* @return
*/
JSONObject getAISearchResult(String question);
} }
...@@ -64,12 +64,17 @@ public interface TaskService{ ...@@ -64,12 +64,17 @@ public interface TaskService{
void generateDailyReport(); void generateDailyReport();
/** /**
* 每月互动量更新 时间范围近六个月 * 每日互动量更新 时间范围近10天
*/ */
void monthlyCustomXhsInteractionUpdate(); void dailyCustomXhsInteractionUpdate();
/** /**
* 每日互动量更新 时间范围近10天 * 定时拉取并进行渠道库更新任务
*/ */
void dailyCustomXhsInteractionUpdate(); void refreshChannelRecord();
/**
* 生成ai搜索参考提问缓存
*/
void cacheAIQuestion();
} }
...@@ -42,7 +42,7 @@ public interface UserService { ...@@ -42,7 +42,7 @@ public interface UserService {
UserInfo queryUserInfo(String userId, String pid); UserInfo queryUserInfo(String userId, String pid);
/** /**
* 分页查询所有用户 * 分页查询所有用户(当前项目的用户)
* *
* @param page 页码 * @param page 页码
* @param size 大小 * @param size 大小
...@@ -55,14 +55,14 @@ public interface UserService { ...@@ -55,14 +55,14 @@ public interface UserService {
PageVO<JSONObject> findUserList(int page, int size, String keyword, String pid, int role, String sorter); PageVO<JSONObject> findUserList(int page, int size, String keyword, String pid, int role, String sorter);
/** /**
* 添加用户 * 添加用户(当前项目的用户)
* *
* @param userDTO * @param userDTO
*/ */
ResponseResult addUser(UserDTO userDTO); ResponseResult addUser(UserDTO userDTO);
/** /**
* 删除用户 * 删除用户(当前项目的用户)
* *
* @param userId * @param userId
* @param pid * @param pid
...@@ -70,7 +70,7 @@ public interface UserService { ...@@ -70,7 +70,7 @@ public interface UserService {
void deleteUser(String userId, String pid); void deleteUser(String userId, String pid);
/** /**
* 编辑用户 * 编辑用户(当前项目的用户)
* *
* @param userDTO * @param userDTO
*/ */
...@@ -135,4 +135,40 @@ public interface UserService { ...@@ -135,4 +135,40 @@ public interface UserService {
*/ */
ResponseResult mobileCutLogin(String userId,String projectId); ResponseResult mobileCutLogin(String userId,String projectId);
/**
* 项目管理员-所有项目用户权限列表
* @param page
* @param pageSize
* @return
*/
PageVO<JSONObject> findAllUserList(String keyword, int page, int pageSize);
/**
* 项目管理员-所有项目用户权限列表-通过用户名或手机号查询品见用户项目权限
* @param nickName
* @param phoneNumber
* @return
*/
ResponseResult getUserByNickNameOrPhone(String nickName, String phoneNumber);
/**
* 项目管理员-所有项目用户权限列表-编辑单条项目权限
*
* @param json
*/
ResponseResult updateOneUserRoles(JSONObject json);
/**
* 项目管理员-所有项目用户权限列表-批量编辑项目权限
*
* @param json
*/
ResponseResult updateBatchUserRoles(JSONObject json);
/**
* 项目管理员-所有项目用户列表-删除用户权限
* @param id
* @return
*/
ResponseResult deleteUserRole(String id, String key);
} }
...@@ -127,6 +127,9 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -127,6 +127,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "importantChannelOverviewDao") @Resource(name = "importantChannelOverviewDao")
private ImportantChannelOverviewDao importantChannelOverviewDao; private ImportantChannelOverviewDao importantChannelOverviewDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "mongoUtil") @Resource(name = "mongoUtil")
MongoUtil mongoUtil; MongoUtil mongoUtil;
...@@ -529,11 +532,14 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -529,11 +532,14 @@ public class ChannelServiceImpl implements ChannelService {
return null; return null;
} }
articles.sort(Comparator.comparingLong(ChannelIndex.Article::getTime).reversed()); articles.sort(Comparator.comparingLong(ChannelIndex.Article::getTime).reversed());
// 重置符合条件的文章及数目
channelRecord.getRecord().setArticles(articles); channelRecord.getRecord().setArticles(articles);
channelRecord.setArticleCount(articles.size());
return channelRecord; return channelRecord;
}).filter(Objects::nonNull).collect(Collectors.groupingBy(ChannelRecord::getPlatform)); }).filter(Objects::nonNull).collect(Collectors.groupingBy(ChannelRecord::getPlatform));
for (String platformName : PLATFORMS) { for (String platformName : PLATFORMS) {
List<ChannelRecord> channelRecordList = channelRecords.getOrDefault(platformName, Collections.emptyList()).stream().limit(size).collect(Collectors.toList()); List<ChannelRecord> channelRecordList =
channelRecords.getOrDefault(platformName, Collections.emptyList()).stream().sorted((x, y) -> Long.compare(y.getArticleCount(), x.getArticleCount())).limit(size).collect(Collectors.toList());
List<ChannelListVO> list = new ArrayList<>(size); List<ChannelListVO> list = new ArrayList<>(size);
Map<String, ChannelRecord> map = Maps.uniqueIndex(channelRecordList, ChannelRecord::getChannelFid); Map<String, ChannelRecord> map = Maps.uniqueIndex(channelRecordList, ChannelRecord::getChannelFid);
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(map.keySet()); Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(map.keySet());
...@@ -545,14 +551,14 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -545,14 +551,14 @@ public class ChannelServiceImpl implements ChannelService {
list.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size())); list.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size()));
} }
}); });
List<ChannelListVO> resList; // List<ChannelListVO> resList;
// 排序 // // 排序
if (Objects.nonNull(sorter) && sorter.contains("index")){ // if (Objects.nonNull(sorter) && sorter.contains("index")){
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList()); // resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList());
}else { // }else {
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList()); // resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList());
} // }
res.put(platformName, resList); res.put(platformName, list);
} }
return res; return res;
} }
...@@ -817,6 +823,12 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -817,6 +823,12 @@ public class ChannelServiceImpl implements ChannelService {
} }
} }
@Override
public void createChannelRecordRefreshTask(Long startTime, Long endTime, List<JSONObject> brandkbsInfos, String yuqingProjectId, String submitter) {
channelRecordRefreshTaskDao.insertOne(new ChannelRecordRefreshTask(startTime, endTime, brandkbsInfos, yuqingProjectId,
ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus(), submitter, System.currentTimeMillis(), System.currentTimeMillis()));
}
private JSONObject provinceDataProcess(List<ImportantChannel> sortList){ private JSONObject provinceDataProcess(List<ImportantChannel> sortList){
List<JSONObject> resList = new ArrayList<>(); List<JSONObject> resList = new ArrayList<>();
JSONObject jsonObject = new JSONObject(new LinkedHashMap<>()); JSONObject jsonObject = new JSONObject(new LinkedHashMap<>());
......
...@@ -6,13 +6,14 @@ import com.alibaba.fastjson.JSONObject; ...@@ -6,13 +6,14 @@ import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.hankcs.hanlp.HanLP; import com.hankcs.hanlp.HanLP;
import com.volcengine.ark.runtime.model.completion.chat.ChatCompletionRequest;
import com.volcengine.ark.runtime.model.completion.chat.ChatCompletionResult;
import com.volcengine.ark.runtime.model.completion.chat.ChatMessage;
import com.volcengine.ark.runtime.model.completion.chat.ChatMessageRole;
import com.zhiwei.base.category.ClassB; import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.subclass.mark.MarkInfo; import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.ChannelType; import com.zhiwei.brandkbs2.common.*;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.common.RedisKeyPrefix;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.easyexcel.EasyExcelUtil; import com.zhiwei.brandkbs2.easyexcel.EasyExcelUtil;
...@@ -29,6 +30,8 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener; ...@@ -29,6 +30,8 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.model.ResponseResult; import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.pojo.ai.AccessModel;
import com.zhiwei.brandkbs2.pojo.ai.FieldMapping;
import com.zhiwei.brandkbs2.pojo.dto.*; import com.zhiwei.brandkbs2.pojo.dto.*;
import com.zhiwei.brandkbs2.pojo.vo.*; import com.zhiwei.brandkbs2.pojo.vo.*;
import com.zhiwei.brandkbs2.service.*; import com.zhiwei.brandkbs2.service.*;
...@@ -83,6 +86,7 @@ import org.springframework.web.client.RestTemplate; ...@@ -83,6 +86,7 @@ import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.text.MessageFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -108,6 +112,39 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -108,6 +112,39 @@ public class MarkDataServiceImpl implements MarkDataService {
private static final String XHS_PLATFORM_ID = "6433c2251701316728003be4"; private static final String XHS_PLATFORM_ID = "6433c2251701316728003be4";
private static final String QUESTION_PROMPT = "###\n" +
"假如你是专业的问题提炼人员,你将根据用户提供的内容,来提炼问题要素和条件。根据以下规则一步步执行:\n" +
"1.提及到年、月、日、天、礼拜的定义为时间要素,未提及到默认条件算近一周,条件给到具体的起始时间和结束时间(结束时间为当前时间则不用返回)的时间戳。\n" +
"2.提及到 XX 项目 XX 品牌的定义为项目及品牌要素,条件给到具体的项目及品牌(未提及品牌默认主品牌)。\n" +
"3.提及到 XX 渠道的定义为渠道要素,条件给到该渠道名。\n" +
"4.提及到正面、中性、负面的定义为标签要素,条件给到该标签名。\n" +
"5.提及到针对 XX ,针对 XX 相关或 XX 相关的定义为搜索条件要素(必须包含 针对/相关 字样),条件给到具体值\n" +
"6.时间和项目要素为必需要素,若不满足则返回“无法回答”。\n" +
"\n" +
"参考例子:\n" +
"示例 1:\n" +
"{用户:今年 7 月腾讯项目张清相关的正面数据}\n" +
"输出:{\"时间\":{\"起始时间\":1719763200000,\"结束时间\":1722355200000},\"项目\":\"腾讯\",\"品牌\":\"主品牌\",\"标签\":\"正面\",\"搜索条件\":\"张清\"}\n" +
"示例 2:\n" +
"{用户:近一个月老乡鸡竞品1品牌新浪网渠道数据}\n" +
"输出:{\"时间\":{\"起始时间\":1719763200000},\"项目\":\"老乡鸡\",\"品牌\":\"竞品1\",\"渠道\":\"新浪网\"}\n" +
"示例 3:\n" +
"{用户:近一年数据}\n" +
"输出:无法回答\n" +
"\n" +
"要求:\n" +
"1 按照指定输出格式输出。\n" +
"2 严格按照规则进行提炼。\n" +
"###";
private static final String RESULT_PROMPT = "假如你是专业的分析报告人员,你将根据用户提供的内容,给出自己的详细分析和见解。并在每点分析后用数字表示注明1-{0}的参考文章,分析结果和参考文章之间严格用”|“作为分隔符进行分隔,并且多个参考文章之间也严格用”|“作为分隔符进行分隔,若没有对应的参考文章则无需返回,示例:分析结果。|1|2|3" +
"请分析:";
private static final String REFERENCE_QUESTION_PROMPT = "假如你是专业的问题提出人员,提出自己{0}个关于的{1}参考问题,每个问题无需给到对应的序号,问题必须包含 针对/相关 字样,每个问题之间严格用”|“作为分隔符进行分隔。" +
"请提出:";
private static final String CACHE_REFERENCE_QUESTION_PROMPT = "假如你是专业的问题提出人员,请参考给到的问题,提出自己5个类似的的参考问题,每个问题无需给到对应的序号,问题必须包含 针对/相关 字样,每个问题之间严格用”|“作为分隔符进行分隔。" +
"请提出:";
@Value("${istarshine.addIStarShineKSData.url}") @Value("${istarshine.addIStarShineKSData.url}")
private String addIStarShineKSDataUrl; private String addIStarShineKSDataUrl;
...@@ -195,6 +232,9 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -195,6 +232,9 @@ public class MarkDataServiceImpl implements MarkDataService {
@Resource(name = "dailyReportDao") @Resource(name = "dailyReportDao")
DailyReportDao dailyReportDao; DailyReportDao dailyReportDao;
@Resource(name = "aiSearchQuestionRecordDao")
AISearchQuestionRecordDao aiSearchQuestionRecordDao;
@Resource(name = "toolsetServiceImpl") @Resource(name = "toolsetServiceImpl")
private ToolsetService toolsetService; private ToolsetService toolsetService;
...@@ -3919,6 +3959,185 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -3919,6 +3959,185 @@ public class MarkDataServiceImpl implements MarkDataService {
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public List<String> getAIReferenceQuestion(String question, int size) {
try {
// 选用的模型名称
String modelName = AccessModel.Model.DOUBAO_PRO_32K.getModelName();
String projectName = GlobalPojo.PROJECT_MAP.get(UserThreadLocal.getProjectId()).getProjectName();
String resultContent = standardRequest(question, modelName, MessageFormat.format(REFERENCE_QUESTION_PROMPT, size, projectName));
String[] splits = resultContent.split("\\|");
return new ArrayList<>(Arrays.asList(splits)).stream().filter(StringUtils::isNoneBlank).map(String::trim).collect(Collectors.toList());
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "获取ai参考提问异常-", e);
}
return null;
}
@Override
public List<String> getAIReferenceQuestionCache(boolean cache) {
String projectId = UserThreadLocal.getProjectId();
String key = RedisUtil.getAISearchQuestionCacheKey(projectId);
String resultStr;
// 返回缓存
if (cache && StringUtils.isNotEmpty(resultStr = redisUtil.get(key))) {
return JSONObject.parseArray(resultStr).toJavaList(String.class);
}
List<String> questionList = aiSearchQuestionRecordDao.findDistinctQuestion(projectId);
String projectName = GlobalPojo.PROJECT_MAP.get(projectId).getProjectName();
if (CollectionUtils.isEmpty(questionList)){
return getAIReferenceQuestionTemplate(projectName);
}
// 选用的模型名称
String modelName = AccessModel.Model.DOUBAO_PRO_32K.getModelName();
StringBuilder sb = new StringBuilder();
int count = 1;
for (String question : questionList) {
sb.append(count++).append("、").append(question).append(";");
}
String resultContent = standardRequest(sb.toString(), modelName, MessageFormat.format(CACHE_REFERENCE_QUESTION_PROMPT, projectName));
if (Objects.isNull(resultContent)){
return getAIReferenceQuestionTemplate(projectName);
}
String[] splits = resultContent.split("\\|");
List<String> result = new ArrayList<>(Arrays.asList(splits)).stream().filter(StringUtils::isNoneBlank).map(String::trim).collect(Collectors.toList());
redisUtil.setExpire(key, JSONObject.toJSONString(result));
return result;
}
private List<String> getAIReferenceQuestionTemplate(String projectName){
String question1 = MessageFormat.format("今年 7 月{0}项目{1}相关的正面数据", projectName, projectName);
String question2 = MessageFormat.format("近一个周{0}项目有发生哪些重大舆情", projectName);
String question3 = MessageFormat.format("{0}项目的竞品舆情有哪些", projectName);
String question4 = MessageFormat.format("{0}项目最近发生了哪些事件", projectName);
String question5 = MessageFormat.format("{0}项目近期负面报道有哪些,主要是那几家集中涉及哪些事件", projectName);
return Arrays.asList(question1, question2, question3, question4, question5);
}
@Override
public JSONObject getAISearchResult(String question) {
JSONObject res = new JSONObject();
try {
// 选用的模型名称
String modelName = AccessModel.Model.DOUBAO_PRO_32K.getModelName();
// 根据AI生成条件
String result = standardRequest(question, modelName, QUESTION_PROMPT);
JSONObject json = JSON.parseObject(result);
// 数据条件
List<FieldMapping> filedMapping = getFiledMapping(json, question);
List<JSONObject> list = esClientDao.findSearch(filedMapping);
if (CollectionUtils.isEmpty(list)){
return null;
}
// AI回答
StringBuilder sb = new StringBuilder();
List<BaseMap> articles = list.stream().map(Tools::getBaseFromEsMap).collect(Collectors.toList());
int count = 1;
for (BaseMap baseMap : articles) {
String text = baseMap.getContent();
sb.append(count++).append("、").append(text).append(";");
}
String sbContent = sb.toString();
result = streamStandardRequest(sbContent, modelName, MessageFormat.format(RESULT_PROMPT, list.size()) + question);
String[] splits = result.split("\\r?\\n");
List<JSONObject> answers = new ArrayList<>();
for (int i = 0; i < splits.length; i++) {
JSONObject answer = new JSONObject();
String[] sonSplit = splits[i].split("\\|");
if (0 == sonSplit.length){
continue;
}
if (i == 0){
answer.put("answer", splits[i].trim());
answers.add(answer);
continue;
}
answer.put("answer", sonSplit[0]);
List<String> sonSplitList = new ArrayList<>(Arrays.asList(sonSplit)).stream().filter(StringUtils::isNoneBlank).skip(1).collect(Collectors.toList());
answer.put("referenceArticles", sonSplitList);
answers.add(answer);
}
res.put("answers", answers);
res.put("articles", articles);
// 记录返回成功的提问
aiSearchQuestionRecordDao.insertOne(new AISearchQuestionRecord(question, UserThreadLocal.getProjectId(), System.currentTimeMillis()));
return res;
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "ai搜索异常-", e);
}
return null;
}
private String streamStandardRequest(String content, String modelName, String prompt) {
AccessModel model = DoubaoAIAccountFactor.getCompanyAccount().getModelList().stream().collect(Collectors.toMap(AccessModel::getModelName, m -> m)).get(modelName);
StringBuilder result = new StringBuilder();
try {
final List<ChatMessage> streamMessages = new ArrayList<>();
final ChatMessage streamSystemMessage = ChatMessage.builder().role(ChatMessageRole.SYSTEM).content(prompt).build();
final ChatMessage streamUserMessage = ChatMessage.builder().role(ChatMessageRole.USER).content(content).build();
streamMessages.add(streamSystemMessage);
streamMessages.add(streamUserMessage);
ChatCompletionRequest streamChatCompletionRequest = ChatCompletionRequest.builder().model(model.getModelId()).messages(streamMessages).build();
DoubaoAIAccountFactor.arkService.streamChatCompletion(streamChatCompletionRequest).doOnError(Throwable::printStackTrace).blockingForEach(choice -> {
if (choice.getChoices().size() > 0) {
result.append(choice.getChoices().get(0).getMessage().getContent());
}
});
} catch (Exception e) {
log.error("standardRequest,chatCompletion:{}", JSON.toJSONString(result), e);
}
return result.toString();
}
private String standardRequest(String content, String modelName, String prompt) {
AccessModel model = DoubaoAIAccountFactor.getCompanyAccount().getModelList().stream().collect(Collectors.toMap(AccessModel::getModelName, m -> m)).get(modelName);
ChatCompletionResult chatCompletion = null;
try {
final List<ChatMessage> messages = new ArrayList<>();
final ChatMessage systemMessage = ChatMessage.builder().role(ChatMessageRole.SYSTEM).content(prompt).build();
final ChatMessage userMessage = ChatMessage.builder().role(ChatMessageRole.USER).content(content).build();
messages.add(systemMessage);
messages.add(userMessage);
ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().model(model.getModelId()).messages(messages).build();
chatCompletion = DoubaoAIAccountFactor.arkService.createChatCompletion(chatCompletionRequest);
if (chatCompletion.getChoices().size() > 1) {
log.error("异常chatCompletion:{}", JSON.toJSONString(chatCompletion));
return null;
}
return String.valueOf(chatCompletion.getChoices().get(0).getMessage().getContent());
} catch (Exception e) {
log.error("standardRequest,chatCompletion:{}", JSON.toJSONString(chatCompletion), e);
}
return null;
}
/**
* 获取
* @param json
* @param content
* @return
*/
private static List<FieldMapping> getFiledMapping(JSONObject json, String content) {
List<FieldMapping> res = new ArrayList<>();
for (Map.Entry<String, Object> entry : json.entrySet()) {
if (entry.getValue() instanceof JSONObject) {
res.addAll(getFiledMapping((JSONObject) entry.getValue(), content));
} else {
// 文本限定关键字
if (entry.getKey().equals("搜索条件")) {
if (!(content.contains("针对") || content.contains("相关"))) {
continue;
}
}
FieldMapping fieldMapping = FieldMapping.createFromNameAndValue(entry.getKey(), entry.getValue(), res);
if (null != fieldMapping) {
res.add(fieldMapping);
}
}
}
return res;
}
/** /**
* 原发溯源大库es查询 * 原发溯源大库es查询
* @param dto * @param dto
......
...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GenericAttribute; import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo; import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.common.RedisKeyPrefix;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
...@@ -15,6 +16,7 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener; ...@@ -15,6 +16,7 @@ import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.service.*; import com.zhiwei.brandkbs2.service.*;
import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils; import org.apache.commons.collections4.ListUtils;
...@@ -24,8 +26,10 @@ import org.apache.logging.log4j.Logger; ...@@ -24,8 +26,10 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -79,6 +83,15 @@ public class TaskServiceImpl implements TaskService { ...@@ -79,6 +83,15 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "xiaohongshuRecordDao") @Resource(name = "xiaohongshuRecordDao")
private XiaohongshuRecordDao xiaohongshuRecordDao; private XiaohongshuRecordDao xiaohongshuRecordDao;
@Resource(name = "customInteractionUpdateRecordDao")
private CustomInteractionUpdateRecordDao customInteractionUpdateRecordDao;
@Resource(name = "channelRecordRefreshTaskDao")
private ChannelRecordRefreshTaskDao channelRecordRefreshTaskDao;
@Resource(name = "aiSearchQuestionRecordDao")
private AISearchQuestionRecordDao aiSearchQuestionRecordDao;
@Resource(name = "brandkbsTaskServiceImpl") @Resource(name = "brandkbsTaskServiceImpl")
BrandkbsTaskService brandkbsTaskService; BrandkbsTaskService brandkbsTaskService;
...@@ -112,6 +125,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -112,6 +125,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "cacheServiceExecutor") @Resource(name = "cacheServiceExecutor")
ThreadPoolTaskExecutor cacheServiceExecutor; ThreadPoolTaskExecutor cacheServiceExecutor;
@Resource(name = "redisUtil")
RedisUtil redisUtil;
@Override @Override
public void messageFlowCount(int day) { public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
...@@ -270,43 +286,16 @@ public class TaskServiceImpl implements TaskService { ...@@ -270,43 +286,16 @@ public class TaskServiceImpl implements TaskService {
return Pair.of(insertList, updateList); return Pair.of(insertList, updateList);
} }
@Deprecated /**
public void messageFlowCount2(int day) { * 本地测试-刷新历史渠道记录用
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day); * 不更新渠道表
int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum(); */
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total); public void messageFlowCountRefresh(long startTime, long endTime, String mgroupId, List<JSONObject> brandkbsInfo) {
// 结果合并 // 找到mgroup下符合条件的数据
List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList()); List<JSONObject> list = esClientDao.searchRecordUnrelated(startTime, endTime, mgroupId, brandkbsInfo);
// 合并渠道记录 // 移除对应数据记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList); Integer updateCount = channelEsDao.removeChannelRecordBatch(list);
// 获得单位时间内最小最大时间戳 log.info("刷新历史渠道记录-统计结束-受影响结果{}条,实际更新{}条", list.size(), updateCount);
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-渠道记录-统计结束");
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
long handleSize = 0;
List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在
Channel channel = channelDao.queryUnique(entry.getKey());
if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
insertList.add(channel);
} else {
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
// 设置查询数值
entry.getKey().setChannelInfo(channel);
if (++handleSize % 10000 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
}
}
log.info("渠道统计-渠道总计-查询更新结束,开始批量入库");
ListUtils.partition(insertList, 1000).forEach(list -> channelDao.insertMany(list));
log.info("渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
} }
@Override @Override
...@@ -456,12 +445,10 @@ public class TaskServiceImpl implements TaskService { ...@@ -456,12 +445,10 @@ public class TaskServiceImpl implements TaskService {
} }
@Override @Override
public void monthlyCustomXhsInteractionUpdate() { public void dailyCustomXhsInteractionUpdate() {
// 近六个月 // 近10天
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
long startTime = endTime - Constant.ONE_MONTH * 6; long startTime = endTime - Constant.ONE_DAY * 10;
// 去除近10天区间,近10天为1天/次
endTime = endTime - 10 * Constant.ONE_DAY;
List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream() List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream()
.filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList()); .filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList());
for (Project project : projects) { for (Project project : projects) {
...@@ -469,26 +456,131 @@ public class TaskServiceImpl implements TaskService { ...@@ -469,26 +456,131 @@ public class TaskServiceImpl implements TaskService {
List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4")); List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res); esClientDao.batchUpdate(res);
}catch (Exception e){ }catch (Exception e){
log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}", project.getProjectName(), startTime, endTime); log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}-", project.getProjectName(), startTime, endTime, e);
} }
} }
// 每月互动量更新
monthlyCustomXhsInteractionUpdateNew(projects);
} }
@Override @Override
public void dailyCustomXhsInteractionUpdate() { public void refreshChannelRecord() {
// 近10天 ChannelRecordRefreshTask task = channelRecordRefreshTaskDao.findOne(new Query(Criteria.where("status")
.is(ChannelRecordRefreshTask.TaskStatus.NOT_START.getStatus())).with(Sort.by(Sort.Order.desc("cTime"))));
if (Objects.isNull(task)){
return;
}
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.UPDATING.getStatus());
try {
messageFlowCountRefresh(task.getStartTime(), task.getEndTime(), task.getMgroupId(), task.getBrandkbsInfo());
}catch (Exception e){
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.ERROR.getStatus());
log.error("更新渠道库记录异常-taskId:{}-", task.getId(), e);
return;
}
updateRefreshTask(task.getId(), ChannelRecordRefreshTask.TaskStatus.COMPLETED.getStatus());
// 删除缓存
task.getBrandkbsInfo().forEach(info -> {
String projectId = info.getString("projectId");
String brandId = Objects.equals(info.getString("brandId"), projectId) ? Constant.PRIMARY_CONTEND_ID : info.getString("brandId");
Set<String> keys = redisUtil.keys(RedisKeyPrefix.CHANNEL_RECORD_LIST + Tools.concat(projectId, brandId) + "*");
redisUtil.delete(keys);
});
log.info("更新渠道库记录完成-taskId:{}", task.getId());
}
@Override
public void cacheAIQuestion() {
AtomicInteger total = new AtomicInteger();
CompletableFuture.allOf(GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
markDataService.getAIReferenceQuestionCache(false);
log.info("项目:{}-{}-AI参考问题缓存完成:{}个", project.getProjectName(), project.getId(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
private void updateRefreshTask(String id, String status){
Update update = new Update();
update.set("status", status);
update.set("uTime", System.currentTimeMillis());
channelRecordRefreshTaskDao.updateOneByIdWithField(id, update);
}
private void monthlyCustomXhsInteractionUpdateNew(List<Project> projects) {
// 近六个月
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
long startTime = endTime - Constant.ONE_DAY * 10; long startTime = endTime - Constant.ONE_MONTH * 6;
List<Project> projects = GlobalPojo.PROJECT_MAP.values().stream() // 去除近10天区间,近10天为1天/次
.filter(project -> CollectionUtils.isNotEmpty(project.getModuleShowList()) && project.getModuleShowList().contains("xiaohongshu")).collect(Collectors.toList()); endTime = endTime - 10 * Constant.ONE_DAY;
for (Project project : projects) { for (Project project : projects) {
// 寻找项目最新的更新记录
CustomInteractionUpdateRecord record = customInteractionUpdateRecordDao.findLastRecord(project.getId());
if (Objects.nonNull(record)) {
String recordTime = Constant.DF_yyyyMM.format(record.getCTime());
String nowTime = Constant.DF_yyyyMM.format(System.currentTimeMillis());
// 本月已更新完成
if (Objects.equals(record.getStatus(), "完成") && Objects.equals(recordTime, nowTime)) {
continue;
}
// 本月未更新
if (!Objects.equals(recordTime, nowTime)) {
CustomInteractionUpdateRecord updateRecord = new CustomInteractionUpdateRecord(startTime, endTime, project.getId(), startTime, "更新中", System.currentTimeMillis(), System.currentTimeMillis());
customInteractionUpdateRecordDao.insertOne(updateRecord);
try { try {
List<Pair<String, Map<String, Object>>> res = customXhsInteractionUpdate(project.getId(), startTime, endTime, Collections.singletonList("6433c2251701316728003be4")); for (Long[] time : Tools.cutTimeRange(startTime, endTime, Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res); esClientDao.batchUpdate(res);
updateStatus(updateRecord.getId(), "更新中", time[1]);
}
updateStatus(updateRecord.getId(), "完成", null);
}catch (Exception e){ }catch (Exception e){
log.error("项目:{},定制化舆情分析-互动量更新出错-时间范围:{}-{}-", project.getProjectName(), startTime, endTime, e); log.error("项目:{},每月互动量更新出错-" ,project.getId(), e);
updateStatus(updateRecord.getId(), "出错", null);
}
}
// 本月更新开始但未完成/出错
if (Objects.equals(recordTime, nowTime) && (Objects.equals(record.getStatus(), "更新中") || Objects.equals(record.getStatus(), "出错"))) {
try {
for (Long[] time : Tools.cutTimeRange(record.getDataStartTime(), record.getEndTime(), Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res);
updateStatus(record.getId(), "更新中", time[1]);
}
updateStatus(record.getId(), "完成", null);
}catch (Exception e){
log.error("项目:{},每月互动量继续更新出错-" ,project.getId(), e);
updateStatus(record.getId(), "出错", null);
} }
} }
} else { // 初次更新
CustomInteractionUpdateRecord updateRecord = new CustomInteractionUpdateRecord(startTime, endTime, project.getId(), startTime, "更新中", System.currentTimeMillis(), System.currentTimeMillis());
customInteractionUpdateRecordDao.insertOne(updateRecord);
try {
for (Long[] time : Tools.cutTimeRange(startTime, endTime, Constant.ONE_WEEK)) {
List<Pair<String, Map<String, Object>>> res =
customXhsInteractionUpdate(project.getId(), time[0], time[1], Collections.singletonList("6433c2251701316728003be4"));
esClientDao.batchUpdate(res);
updateStatus(updateRecord.getId(), "更新中", time[1]);
}
updateStatus(updateRecord.getId(), "完成", null);
}catch (Exception e){
log.error("项目:{},每月互动量更新出错-" ,project.getId(), e);
updateStatus(updateRecord.getId(), "出错", null);
}
}
}
}
private void updateStatus(String recordId, String status, Long time){
Update update = new Update();
update.set("status", status);
if (Objects.nonNull(time)){
update.set("dataStartTime", time);
}
customInteractionUpdateRecordDao.updateOneByIdWithField(recordId, update);
} }
private List<Pair<String, Map<String, Object>>> customXhsInteractionUpdate(String projectId, Long startTime, Long endTime, List<String> platforms) throws IOException { private List<Pair<String, Map<String, Object>>> customXhsInteractionUpdate(String projectId, Long startTime, Long endTime, List<String> platforms) throws IOException {
......
...@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service.impl; ...@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.ProjectDao; import com.zhiwei.brandkbs2.dao.ProjectDao;
import com.zhiwei.brandkbs2.dao.UserDao; import com.zhiwei.brandkbs2.dao.UserDao;
...@@ -13,6 +14,7 @@ import com.zhiwei.brandkbs2.enmus.response.LoginCodeEnum; ...@@ -13,6 +14,7 @@ import com.zhiwei.brandkbs2.enmus.response.LoginCodeEnum;
import com.zhiwei.brandkbs2.exception.ExceptionCast; import com.zhiwei.brandkbs2.exception.ExceptionCast;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.model.ResponseResult; import com.zhiwei.brandkbs2.model.ResponseResult;
import com.zhiwei.brandkbs2.pojo.Project;
import com.zhiwei.brandkbs2.pojo.User; import com.zhiwei.brandkbs2.pojo.User;
import com.zhiwei.brandkbs2.pojo.UserInfo; import com.zhiwei.brandkbs2.pojo.UserInfo;
import com.zhiwei.brandkbs2.pojo.UserRole; import com.zhiwei.brandkbs2.pojo.UserRole;
...@@ -418,6 +420,154 @@ public class UserServiceImpl implements UserService { ...@@ -418,6 +420,154 @@ public class UserServiceImpl implements UserService {
} }
@Override @Override
public PageVO<JSONObject> findAllUserList(String keyword, int page, int pageSize) {
Query query = new Query(Criteria.where("superAdmin").is(false));
userDao.addKeywordFuzz(query, keyword, "nickname");
userDao.addSort(query, "{\"cTime\":\"descend\"}");
// roles总量
List<User> list = userDao.findList(new Query(Criteria.where("superAdmin").is(false)));
long count = list.stream().map(User::getRoles).filter(Objects::nonNull).mapToLong(Collection::size).sum();
List<User> userList = userDao.findList(query);
Map<User, List<UserRole>> userRolesMap = userList.stream().collect(Collectors.toMap(o -> o, User::getRoles));
List<JSONObject> resList = new ArrayList<>();
for (Map.Entry<User, List<UserRole>> entry : userRolesMap.entrySet()) {
User user = entry.getKey();
List<UserRole> roles = entry.getValue();
for (UserRole role : roles) {
String projectId = role.getProjectId();
// 已终止/已删除的项目不返回
Project project = GlobalPojo.PROJECT_MAP.get(projectId);
project = Objects.isNull(project) ? projectDao.findOneById(projectId) : project;
if (Objects.isNull(project) || !project.isStart()){
continue;
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("projectId", projectId);
jsonObject.put("projectName", project.getProjectName());
jsonObject.put("id", user.getId());
jsonObject.put("nickname", user.getNickname());
jsonObject.put("username", user.getUsername());
jsonObject.put("cTime", user.getCTime());
jsonObject.put("phoneNumber", user.getPhoneNumber());
jsonObject.put("submitter", user.getSubmitter());
jsonObject.put("roleId", role.getRoleId());
jsonObject.put("expiredTime", role.getExpiredTime());
jsonObject.put("exportAmount", role.getExportAmount());
jsonObject.put("key", role.getKey());
resList.add(jsonObject);
}
}
return PageVO.createPageVo(count, page, pageSize, resList.stream().skip((long) (page - 1) * pageSize).limit(pageSize).collect(Collectors.toList()));
}
@Override
public ResponseResult updateOneUserRoles(JSONObject json) {
String id = json.getString("id");
String nickname = json.getString("nickname");
int roleId = json.getIntValue("roleId");
long expiredTime = json.getLongValue("expiredTime");
int exportAmount = json.getIntValue("exportAmount");
String projectId = json.getString("projectId");
String key = json.getString("key");
User user = userDao.findOneById(id);
List<UserRole> roles = user.getRoles();
if (CollectionUtils.isNotEmpty(roles)){
List<String> existProject = roles.stream().filter(r -> !Objects.equals(r.getKey(), key)).map(UserRole::getProjectId).collect(Collectors.toList());
if (existProject.contains(projectId)){
return ResponseResult.failure("用户已拥有此项目的权限");
}
}
user.getRoles().stream().filter(userRoles -> userRoles.getKey().equals(key)).findAny().ifPresent(userRole -> {
// 更新原userRole
userRole.setRoleId(roleId);
userRole.setProjectId(projectId);
userRole.setKey(Tools.concat(projectId, roleId));
// 权限用户以上则清空过期时间
if (roleId < RoleEnum.CUSTOMER.getState()) {
userRole.setExpiredTime(null);
} else {
OptionalLong.of(expiredTime).ifPresent(userRole::setExpiredTime);
}
OptionalInt.of(exportAmount).ifPresent(userRole::setExportAmount);
userDao.updateOneByIdWithField(id, new Update().set("roles", roles));
});
Optional.of(nickname).ifPresent(nickName -> userDao.updateOneByIdWithField(id, new Update().set("nickname", nickName)));
return ResponseResult.success();
}
@Override
public ResponseResult getUserByNickNameOrPhone(String nickName, String phoneNumber) {
Query query = new Query();
if (Objects.isNull(nickName) && Objects.isNull(phoneNumber)){
return ResponseResult.failure("用户名和手机号为空");
}
if (Objects.nonNull(nickName)){
query.addCriteria(Criteria.where("nickname").is(nickName));
}
if (Objects.nonNull(phoneNumber)){
query.addCriteria(Criteria.where("phoneNumber").is(Long.parseLong(phoneNumber)));
}
query.addCriteria(Criteria.where("superAdmin").is(false));
User user = userDao.findOne(query);
if (Objects.isNull(user)){
return ResponseResult.failure("不存在此项目用户");
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", user.getId());
jsonObject.put("nickname", user.getNickname());
jsonObject.put("phoneNumber", user.getPhoneNumber());
List<JSONObject> roles = user.getRoles().stream().map(role -> {
JSONObject json = new JSONObject();
String projectId = role.getProjectId();
// 已终止/已删除的项目不返回
Project project = GlobalPojo.PROJECT_MAP.get(projectId);
project = Objects.isNull(project) ? projectDao.findOneById(projectId) : project;
if (Objects.isNull(project) || !project.isStart()){
return null;
}
json.put("projectName", project.getProjectName());
json.put("key", role.getKey());
json.put("projectId", projectId);
json.put("roleId", role.getRoleId());
json.put("exportAmount", role.getExportAmount());
json.put("expiredTime", role.getExpiredTime());
return json;
}).filter(Objects::nonNull).collect(Collectors.toList());
jsonObject.put("roles", roles);
return ResponseResult.success(jsonObject);
}
@Override
public ResponseResult updateBatchUserRoles(JSONObject json) {
String id = json.getString("id");
User user = userDao.findOneById(id);
List<UserRole> newRoles = json.getJSONArray("roles").toJavaList(UserRole.class);
List<String> keys = newRoles.stream().map(UserRole::getKey).collect(Collectors.toList());
// 无需被修改的权限列表
List<UserRole> oldRoles = user.getRoles().stream().filter(role -> !keys.contains(role.getKey())).collect(Collectors.toList());
for (UserRole role : newRoles) {
if (Objects.isNull(role.getExportAmount())) {
role.setExportAmount(1000);
}
role.setKey(Tools.concat(role.getProjectId(), role.getRoleId()));
}
oldRoles.addAll(newRoles);
userDao.updateOneByIdWithField(id, new Update().set("roles", oldRoles));
return ResponseResult.success();
}
@Override
public ResponseResult deleteUserRole(String id, String key) {
User user = userDao.findOneById(id);
List<UserRole> roles = user.getRoles().stream().filter(userRole -> !userRole.getKey().equals(key)).collect(Collectors.toList());
Update update = new Update();
update.set("roles", roles);
update.set("submitter", UserThreadLocal.getNickname());
userDao.updateOneByIdWithField(id, update);
return ResponseResult.success();
}
@Override
public Map<String, Object> getLoginInfo() { public Map<String, Object> getLoginInfo() {
// String userId = UserThreadLocal.getUserId(); // String userId = UserThreadLocal.getUserId();
// String projectId = UserThreadLocal.getProjectId(); // String projectId = UserThreadLocal.getProjectId();
......
...@@ -47,6 +47,7 @@ public class ControlCenter { ...@@ -47,6 +47,7 @@ public class ControlCenter {
// taskService.customEventCache(); // taskService.customEventCache();
taskService.eventAggTitleCache(); taskService.eventAggTitleCache();
taskService.yuqingAnalyzeHighWordCache(); taskService.yuqingAnalyzeHighWordCache();
taskService.cacheAIQuestion();
} catch (Exception e) { } catch (Exception e) {
log.error("定时按天缓存数据-出错", e); log.error("定时按天缓存数据-出错", e);
} finally { } finally {
...@@ -120,28 +121,28 @@ public class ControlCenter { ...@@ -120,28 +121,28 @@ public class ControlCenter {
} }
@Async("scheduledExecutor") @Async("scheduledExecutor")
@Scheduled(cron = "0 30 3 1 * ?") @Scheduled(cron = "0 0 5 * * ?")
public void monthlyCustomXhsInteractionUpdate() { public void dailyCustomXhsInteractionUpdate() {
log.info("每互动量更新-启动"); log.info("每互动量更新-启动");
try { try {
taskService.monthlyCustomXhsInteractionUpdate(); taskService.dailyCustomXhsInteractionUpdate();
} catch (Exception e) { } catch (Exception e) {
log.error("每互动量更新-出错", e); log.error("每互动量更新-出错", e);
} finally { } finally {
log.info("每互动量更新-结束"); log.info("每互动量更新-结束");
} }
} }
@Async("scheduledExecutor") @Async("scheduledExecutor")
@Scheduled(cron = "0 0 5 * * ?") @Scheduled(cron = "0 0/10 * * * ?")
public void dailyCustomXhsInteractionUpdate() { public void refreshChannelRecord() {
log.info("每日互动量更新-启动"); log.info("每十分钟拉取并进行渠道库更新任务-启动");
try { try {
taskService.dailyCustomXhsInteractionUpdate(); taskService.refreshChannelRecord();
} catch (Exception e) { } catch (Exception e) {
log.error("每日互动量更新-出错", e); log.error("每十分钟拉取并进行渠道库更新任务-出错", e);
} finally { } finally {
log.info("每日互动量更新-结束"); log.info("每十分钟拉取并进行渠道库更新任务-结束");
} }
} }
} }
...@@ -6,6 +6,7 @@ import org.springframework.data.redis.core.HashOperations; ...@@ -6,6 +6,7 @@ import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
...@@ -129,6 +130,10 @@ public class RedisUtil { ...@@ -129,6 +130,10 @@ public class RedisUtil {
return RedisKeyPrefix.SEARCH_KEYWORD + Tools.concat(projectId, userId, searchType); return RedisKeyPrefix.SEARCH_KEYWORD + Tools.concat(projectId, userId, searchType);
} }
public static String getAISearchQuestionCacheKey(String projectId){
return RedisKeyPrefix.AI_SEARCH_QUESTION + projectId;
}
public void setExpire(String key, String value, long timeout, TimeUnit unit) { public void setExpire(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit); stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
} }
...@@ -186,4 +191,8 @@ public class RedisUtil { ...@@ -186,4 +191,8 @@ public class RedisUtil {
public void remove(String key) { public void remove(String key) {
setExpire(key, "-1", 1, TimeUnit.SECONDS); setExpire(key, "-1", 1, TimeUnit.SECONDS);
} }
public void delete(Collection<String> key) {
stringRedisTemplate.delete(key);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment