Commit 5c6c6d0f by shenjunjie

天级定时任务es同步添加重试

parent a320e10f
...@@ -18,6 +18,7 @@ import org.apache.commons.collections4.ListUtils; ...@@ -18,6 +18,7 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -125,12 +126,19 @@ public class TaskServiceImpl implements TaskService { ...@@ -125,12 +126,19 @@ public class TaskServiceImpl implements TaskService {
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap); List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords); channelEsDao.upsertChannelRecord(channelRecords);
// 同步ES-channelCopy,区分insertList和updateList // 同步ES-channelCopy,区分insertList和updateList
for (int i = 1; i <= 3; i++) {
try {
ListUtils.partition(insertList, 1000).forEach(list -> { ListUtils.partition(insertList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList())); channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
}); });
ListUtils.partition(updateList, 1000).forEach(list -> { ListUtils.partition(updateList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList())); channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
}); });
break;
} catch (ElasticsearchStatusException e) {
log.info("渠道统计-es入库解析异常,正在重试第{}次", i);
}
}
log.info("渠道统计-渠道记录-统计结束"); log.info("渠道统计-渠道记录-统计结束");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment