Commit 8d5c8711 by shenjunjie

调整esClient为线上

parent 8d01723a
...@@ -155,8 +155,8 @@ public class AppChannelController extends BaseController { ...@@ -155,8 +155,8 @@ public class AppChannelController extends BaseController {
@ApiOperation("渠道-文章列表") @ApiOperation("渠道-文章列表")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "startTime", value = "开始时间", defaultValue = "0", paramType = "query", dataType = "long"), @ApiImplicitParam(name = "startTime", value = "开始时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "endTime", value = "结束时间", defaultValue = "0", paramType = "query", dataType = "long"), @ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "page", value = "页码", defaultValue = "1", paramType = "query", dataType = "int"), @ApiImplicitParam(name = "page", value = "页码", defaultValue = "1", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "pageSize", value = "页码大小", defaultValue = "10", paramType = "query", dataType = "int"), @ApiImplicitParam(name = "pageSize", value = "页码大小", defaultValue = "10", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "int"), @ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "int"),
......
...@@ -36,7 +36,7 @@ import java.util.stream.Collectors; ...@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
public class ChannelEsDao extends EsClientDao { public class ChannelEsDao extends EsClientDao {
private static final Logger log = LogManager.getLogger(ChannelEsDao.class); private static final Logger log = LogManager.getLogger(ChannelEsDao.class);
@Resource(name = "localEsClient") @Resource(name = "esClient")
RestHighLevelClient channelEsClient; RestHighLevelClient channelEsClient;
@Resource(name = "commonServiceImpl") @Resource(name = "commonServiceImpl")
......
...@@ -30,7 +30,6 @@ import com.zhiwei.brandkbs2.service.CommonService; ...@@ -30,7 +30,6 @@ import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.service.ProjectService; import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.util.MongoUtil; import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import io.netty.util.concurrent.CompleteFuture;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
...@@ -48,6 +47,7 @@ import org.elasticsearch.search.aggregations.metrics.Sum; ...@@ -48,6 +47,7 @@ import org.elasticsearch.search.aggregations.metrics.Sum;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -98,6 +98,9 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -98,6 +98,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "mongoUtil") @Resource(name = "mongoUtil")
MongoUtil mongoUtil; MongoUtil mongoUtil;
@Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor esSearchExecutor;
@Override @Override
public PageVO<JSONObject> findChannelList(int page, int size, String linkedGroupId, String emotion, String platform, Boolean show, String keyword, String sorter) { public PageVO<JSONObject> findChannelList(int page, int size, String linkedGroupId, String emotion, String platform, Boolean show, String keyword, String sorter) {
Query query = channelListQuery(linkedGroupId, show, emotion, platform, keyword, sorter); Query query = channelListQuery(linkedGroupId, show, emotion, platform, keyword, sorter);
...@@ -564,13 +567,13 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -564,13 +567,13 @@ public class ChannelServiceImpl implements ChannelService {
// 每天返回前10条 // 每天返回前10条
List<CompletableFuture<JSONObject>> futureList = list.stream().limit(10).map(article -> CompletableFuture.supplyAsync(() -> { List<CompletableFuture<JSONObject>> futureList = list.stream().limit(10).map(article -> CompletableFuture.supplyAsync(() -> {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("emotion", article.getEmotion()); json.put("emotion", EmotionEnum.state2Name(article.getEmotion()));
json.put("time", article.getTime()); json.put("time", article.getTime());
String[] titleAndUrl = getTitleAndUrlById(article.getId()); String[] titleUrl = getTitleAndUrlById(article.getId());
json.put("title", titleAndUrl[0]); json.put("title", titleUrl[0]);
json.put("url", titleAndUrl[1]); json.put("url", titleUrl[1]);
return json; return json;
})).collect(Collectors.toList()); }, esSearchExecutor)).collect(Collectors.toList());
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
dayResult.put("articles", futureList.stream().map(CompletableFuture::join).collect(Collectors.toList())); dayResult.put("articles", futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
dayResult.put("articleCount", list.size()); dayResult.put("articleCount", list.size());
...@@ -583,13 +586,13 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -583,13 +586,13 @@ public class ChannelServiceImpl implements ChannelService {
@Override @Override
public List<ExportAppChannelArticleDTO> downloadArticlesByTime(Long startTime, Long endTime, String channelId, String contendId) { public List<ExportAppChannelArticleDTO> downloadArticlesByTime(Long startTime, Long endTime, String channelId, String contendId) {
List<ExportAppChannelArticleDTO> res = new ArrayList<>();
List<CompleteFuture<ExportAppChannelArticleDTO>> futureList = new ArrayList<>();
Map<String, List<ChannelIndex.Article>> sourceContendMap = getSourceContendMap(channelId, Collections.singleton(contendId), startTime, endTime); Map<String, List<ChannelIndex.Article>> sourceContendMap = getSourceContendMap(channelId, Collections.singleton(contendId), startTime, endTime);
for (ChannelIndex.Article article : sourceContendMap.get(contendId)) { List<CompletableFuture<ExportAppChannelArticleDTO>> futureList = sourceContendMap.get(contendId).stream().map(article -> CompletableFuture.supplyAsync(() -> {
// res.add(ExportAppChannelArticleDTO.createFromArticle(article)); String[] titleUrl = getTitleAndUrlById(article.getId());
} return ExportAppChannelArticleDTO.createFromArticle(article, titleUrl[0], titleUrl[1]);
return res; }, esSearchExecutor)).collect(Collectors.toList());
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
return futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
} }
private Map<String, List<ChannelIndex.Article>> getSourceContendMap(String channelId, Set<String> contendIds, String platform, String keyword, Long startTime, private Map<String, List<ChannelIndex.Article>> getSourceContendMap(String channelId, Set<String> contendIds, String platform, String keyword, Long startTime,
...@@ -709,7 +712,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -709,7 +712,7 @@ public class ChannelServiceImpl implements ChannelService {
// return v; // return v;
// }); // });
contendMap.putIfAbsent(channelRecord.getContendId(), new ArrayList<>()); contendMap.putIfAbsent(channelRecord.getContendId(), new ArrayList<>());
contendMap.get(channelRecord.getChannelId()).addAll(channelRecord.getRecord().getArticles()); contendMap.get(channelRecord.getContendId()).addAll(channelRecord.getRecord().getArticles());
} }
} }
// contendMap.forEach((k, v) -> v.sort(Comparator.comparingLong(ChannelIndex.Article::getTime))); // contendMap.forEach((k, v) -> v.sort(Comparator.comparingLong(ChannelIndex.Article::getTime)));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment