Java实现百万级数据从Excel导入到数据库
在一个后台管理功能中,Excel导入数据是不可缺少,但是当处理大数据量的Excel文件导入时候就可能会带来一些列的问题,本文我们会分析问题产生的原因。并提供解决方案以及完整的实现代码。
🎉欢迎来系统设计专栏:Java实现百万级数据从Excel导入到数据库
🎬作者简介:大家好,我是小徐🥇
☁️博客首页:CSDN主页小徐的博客
🌄每日一句:好学而不勤非真好学者📜 欢迎大家关注! ❤️
前言
在一个后台管理功能中,Exce导入数据是不可缺少的,但是当处理大数据量的Excel文件导入时候就可能带来一些列的问题,本文我们会分析问题产生的原因。并提供解决方案以及完整的实现代码。
一、Excel导入可能产生的问题
1、内存溢出问题
百万级数据量,一次性都读取到内存中,肯定是不现实的,那么好的办法就是基于流式读取的方式进行分批处理。
在技术选型上,我们选择使用EasyExcel,他特别针对大数据量和复杂Excel文件的处理进行了优化。在解析Excel时EasyExcel不会将Excel一次性全部加载到内存中,而是从磁盘上一行行读取数据,逐个解析。
2、性能问题
百万级数据的处理,如果用单线程的话肯定是很慢的,想要提升性能,那么就需要使用多线程。
多线程的使用上涉及到两个场景,一个是用多线程进行文件的读取,另一个是用多线程实现数据的插入。这里就涉及到一个生产者-消费者的模式了,多个线程读取,然后多个线程插入这样可以最大限度的提升整体的性能。
而数据的插入,我们除了借助多线程之外,还可以同时使用数据库的批量插入的功能,这样就能更加的提升插入速度。
3、错误处理
在文件的读取和数据库写入过程中,会需要解决各种各样的问题,比如数据格式错误、数据不致、有重复数据等。
所以我们需要分两步来,第一步就是先进行数据的检查,在开始插入之间就把数据的格式等问题提前检查好,然后在插入过程中,对异常进行处理。
处理方式有很多种,可以进行事务回滚、可以进行日志记录。这个根据实际情况,一般来说不建议做回滚,直接做自动重试,重试几次之后还是不行的话,再记录日志然后后续在重新插入即可。
并目在这个过程中,需要考虑一下数据重复的问题,需要在excel中某几个字段设置成数据库唯一性约束,然后在遇到数据冲突的时候,进行处理,处理方式可以是覆盖、跳过以及报错这个根据实际业务情况来,一般来说跳过+打印日志是相对合理的。
二、解决方案
针对以上的问题,整体方案总结如下:
借助EasyExcel来实现Excel的读取,因为他并不会一次性把整个Excel都加载到内存中,而是逐行读取的。为了提升并发性能,我们再进一步将百万级数据分散到不同的sheet中,然后借助线程池,多线程同时读取不同的sheet,在读取过程中,借助EasyExcel的ReadListener做数据处理。
在处理过程中,我们并不会每一条数据都操作数据库,这样对数据库来说压力太大了,我们会设定一个批次,比如1000条,我们会把从Excel中读取到的数据暂存在内存中,这里可以使用List实现,当读取了1000条之后,就执行一次数据的批量插入,批量插入可以借助mybatis就能简单的实现了。
而这个过程中,还需要考虑一些并发的问题,所以我们在处理过程中会使用线程安全的队列来保存暂存在内存中的数据,如ConcurrentLinkedQueue。
经过验证,如此实现之后,读取一个100万数据的Excel并插入数据,耗时在100秒左右,不
超过2分钟。
三、具体实现
1、技术选项
Springboot+EasyExcel
为了提升并发处理的能力,我们把百万级数据放到同一个excel的不同的sheet中,然后通过使用EasvExcel并发的读取这些sheet。
EasyExcel提供了AnalysisEventListener接口,允许在读取每一批数据后进行自定义处理。我们可以基于他的这个功能来实现文件的分批读取。
2、增加依赖
选择最新版本
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
</dependency>
3、实现导入工具类
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.converters.longconverter.LongStringConverter;
import com.ytx.dependency.common.utils.DateUtils;
import com.ytx.dependency.mybatis.service.BaseService;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.BeanUtils;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author: xiaoxu
* @Date 2024/1/20 20:31
* @Description: Excel 工具类
*/
public class ExcelUtils {
/**
* EasyExcel解析Excel
* @param file 文件
* @param head 表单
* @param baseService 导出数据的service
* @throws IOException
* @throws InterruptedException
*/
public static void read(MultipartFile file, Class head, BaseService baseService) throws IOException, InterruptedException {
// Excel sheet的数量(需要解析到实际的,测试代码默认写死)
int numberSheet = 20;
// 为了提升并发处理的能力,如果数据量比较大,可以将数据放到同一个excel的不同的sheet中
// 采用多线程,每一个sheet用一个线程处理
// 创建一个固定大小的线程池,大小与Sheet数量相同
ExecutorService executorService = Executors.newFixedThreadPool(numberSheet);
for (int i = 0; i < numberSheet; i++) {
int finalSheet = i;
// 向线程池提交一个任务
executorService.submit(() -> {
try {
//使用EasyExcel读取指定的sheet
EasyExcel.read(file.getInputStream(), head, new ExcelDataListener<>(baseService))
.sheet(finalSheet)// 指定sheet号
.doRead(); // 开始读取操作
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
// 启动线程池的关闭实例
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
}
这段代码通过创建一个固定大小的线程池来并发读取一个包含多个sheets的Excel文件。每个sheet的读取作为一个单独的任务提交给线程池。
我们在代码中用了一个ExcelDataListener,这个类是AnalysisEventListener的一个实现类。当EasyExcel读取每一行数据时,它会自动调用我们传入的这个ReadListener实例的invoke方法。在这个方法中,我们就可以定义如何处理这些数据。
ExcelDataListener还包含doAfterAlIAnalysed方法,这个方法在所有数据都读取完毕后被调用。
4、实现ExcelDataListener
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.ytx.dependency.common.utils.ConvertUtils;
import com.ytx.dependency.common.utils.JsonUtils;
import com.ytx.dependency.mybatis.service.BaseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: xiaoxu
* @Date: 2024/1/21 0:12
* @Description: Excel模板读取
*/
public class ExcelDataListener<E, T> extends AnalysisEventListener<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ExcelDataListener.class);
/**
* 每隔2000条存储数据库,然后清理list,方便内存回收
*/
private static final int BATCH_COUNT = 2000;
private final List<E> list = new ArrayList<>();
/**
* 通过构造器注入Service
*/
private final BaseService<E> baseService;
/**
* 构造方法
*
* @param baseService Service对象
*/
public ExcelDataListener(BaseService<E> baseService) {
this.baseService = baseService;
}
/**
* 每条数据解析完,都会调用此方法
*/
@Override
public void invoke(T data, AnalysisContext context) {
LOGGER.info("解析到一条数据:{}", JsonUtils.toJsonString(data));
E entity = ConvertUtils.sourceToTarget(data, baseService.currentModelClass());
list.add(entity);
// 达到BATCH_COUNT了,需要去存储一次数据库,防止数据几万条数据在内存,容易OOM
if (list.size() >= BATCH_COUNT) {
saveData();
// 存储完成清理 list
list.clear();
}
}
/**
* 所有数据解析完成了 都会来调用
*/
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库
saveData();
LOGGER.info("所有数据解析完成!");
}
/**
* 加上存储数据库
*/
private void saveData() {
LOGGER.info("{}条数据,开始存储数据库!", list.size());
baseService.insertBatch(list);
LOGGER.info("存储数据库成功!");
}
}
通过自定义这个ExcelDataListener,我们就可以在读取Excel文件的过程中处理数据.
每读取到一条数据之后会把他们放入一个List,当List中积累21000条之后,进行一次数据库的批量插入.
5、实现BaseServcie
我们在代码中通过构造器注入BaseService,这个service是我们封装的基础服务接口,所有的业务Service都需集成。批量插入使用的是mybatisPlus的批量插入。
代码实现如下:
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import java.io.Serializable;
import java.util.Collection;
/**
* 基础服务接口,所有Service接口都要继承
*/
public interface BaseService<T> {
Class<T> currentModelClass();
/**
* <p>
* 插入一条记录(选择字段,策略插入)
* </p>
*
* @param entity 实体对象
*/
boolean insert(T entity);
/**
* <p>
* 插入(批量),该方法不支持 Oracle、SQL Server
* </p>
*
* @param entityList 实体对象集合
*/
boolean insertBatch(Collection<T> entityList);
/**
* <p>
* 插入(批量),该方法不支持 Oracle、SQL Server
* </p>
*
* @param entityList 实体对象集合
* @param batchSize 插入批次数量
*/
boolean insertBatch(Collection<T> entityList, int batchSize);
/**
* <p>
* 根据 ID 选择修改
* </p>
*
* @param entity 实体对象
*/
boolean updateById(T entity);
/**
* <p>
* 根据 whereEntity 条件,更新记录
* </p>
*
* @param entity 实体对象
* @param updateWrapper 实体对象封装操作类 {@link com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper}
*/
boolean update(T entity, Wrapper<T> updateWrapper);
/**
* <p>
* 根据ID 批量更新
* </p>
*
* @param entityList 实体对象集合
*/
boolean updateBatchById(Collection<T> entityList);
/**
* <p>
* 根据ID 批量更新
* </p>
*
* @param entityList 实体对象集合
* @param batchSize 更新批次数量
*/
boolean updateBatchById(Collection<T> entityList, int batchSize);
/**
* <p>
* 根据 ID 查询
* </p>
*
* @param id 主键ID
*/
T selectById(Serializable id);
/**
* <p>
* 根据 ID 删除
* </p>
*
* @param id 主键ID
*/
boolean deleteById(Serializable id);
/**
* <p>
* 删除(根据ID 批量删除)
* </p>
*
* @param idList 主键ID列表
*/
boolean deleteBatchIds(Collection<? extends Serializable> idList);
}
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.enums.SqlMethod;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.metadata.OrderItem;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.core.toolkit.ReflectionKit;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.ytx.dependency.common.constant.CommonConstant;
import com.ytx.dependency.common.exception.ServerException;
import com.ytx.dependency.common.page.PageData;
import com.ytx.dependency.common.utils.ConvertUtils;
import com.ytx.dependency.mybatis.service.BaseService;;
import org.apache.ibatis.binding.MapperMethod;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* 基础服务类,所有Service都要继承
*/
public abstract class BaseServiceImpl<M extends BaseMapper<T>, T> implements BaseService<T> {
@Autowired
protected M baseDao;
protected Log log = LogFactory.getLog(getClass());
protected int INTERNAL_SERVER_ERROR = 500;
/**
* 获取分页对象
*
* @param params 分页查询参数
* @param defaultOrderField 默认排序字段
* @param isAsc 排序方式
*/
protected IPage<T> getPage(Map<String, Object> params, String defaultOrderField, boolean isAsc) {
//分页参数
long curPage = 1;
long limit = 10;
if (params.get(CommonConstant.PAGE) != null) {
curPage = Long.parseLong(params.get(CommonConstant.PAGE).toString());
}
if (params.get(CommonConstant.LIMIT) != null) {
limit = Long.parseLong(params.get(CommonConstant.LIMIT).toString());
}
//分页对象
Page<T> page = new Page<>(curPage, limit);
//分页参数
params.put(CommonConstant.PAGE, page);
//排序字段
String orderField = (String) params.get(CommonConstant.ORDER_FIELD);
String order = (String) params.get(CommonConstant.ORDER);
//前端字段排序
if (StringUtils.isNotBlank(orderField) && StringUtils.isNotBlank(order)) {
if (CommonConstant.ASC.equalsIgnoreCase(order)) {
return page.addOrder(OrderItem.asc(orderField));
} else {
return page.addOrder(OrderItem.desc(orderField));
}
}
//没有排序字段,则不排序
if (StringUtils.isBlank(defaultOrderField)) {
return page;
}
//默认排序
if (isAsc) {
page.addOrder(OrderItem.asc(defaultOrderField));
} else {
page.addOrder(OrderItem.desc(defaultOrderField));
}
return page;
}
/**
* dto转换成map
* @param dto
* @return
*/
protected Map<String, Object> dtoToMap(Object dto) {
if (dto == null) {
return new HashMap<>();
}
Map<String, Object> map = new HashMap<>();
Class<?> clazz = dto.getClass();
// 遍历所有字段,包括父类中的字段
while (clazz != null) {
for (Field field : clazz.getDeclaredFields()) {
// 设置访问权限
field.setAccessible(true);
Object value = null;
try {
// 获取字段值
value = field.get(dto);
}catch (Exception e){
e.printStackTrace();
throw new ServerException(INTERNAL_SERVER_ERROR);
}
// 将字段名和字段值放入map
map.put(field.getName(), value);
}
clazz = clazz.getSuperclass();
}
return map;
}
protected <T> PageData<T> getPageData(List<?> list, long total, Class<T> target) {
List<T> targetList = ConvertUtils.sourceToTarget(list, target);
return new PageData<>(targetList, total);
}
protected <T> PageData<T> getPageData(IPage page, Class<T> target) {
return getPageData(page.getRecords(), page.getTotal(), target);
}
protected void paramsToLike(Map<String, Object> params, String... likes) {
for (String like : likes) {
String val = (String) params.get(like);
if (StringUtils.isNotBlank(val)) {
params.put(like, "%" + val + "%");
} else {
params.put(like, null);
}
}
}
/**
* <p>
* 判断数据库操作是否成功
* </p>
* <p>
* 注意!! 该方法为 Integer 判断,不可传入 int 基本类型
* </p>
*
* @param result 数据库操作返回影响条数
* @return boolean
*/
protected static boolean retBool(Integer result) {
return SqlHelper.retBool(result);
}
protected Class<M> currentMapperClass() {
return (Class<M>) ReflectionKit.getSuperClassGenericType(this.getClass(), BaseServiceImpl.class, 0);
}
@Override
public Class<T> currentModelClass() {
return (Class<T>) ReflectionKit.getSuperClassGenericType(this.getClass(), BaseServiceImpl.class, 1);
}
protected String getSqlStatement(SqlMethod sqlMethod) {
return SqlHelper.getSqlStatement(this.currentMapperClass(), sqlMethod);
}
@Override
public boolean insert(T entity) {
return BaseServiceImpl.retBool(baseDao.insert(entity));
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean insertBatch(Collection<T> entityList) {
return insertBatch(entityList, 100);
}
/**
* 批量插入
*/
@Override
@Transactional(rollbackFor = Exception.class)
public boolean insertBatch(Collection<T> entityList, int batchSize) {
String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}
/**
* 执行批量操作
*/
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
return SqlHelper.executeBatch(this.currentModelClass(), this.log, list, batchSize, consumer);
}
@Override
public boolean updateById(T entity) {
return BaseServiceImpl.retBool(baseDao.updateById(entity));
}
@Override
public boolean update(T entity, Wrapper<T> updateWrapper) {
return BaseServiceImpl.retBool(baseDao.update(entity, updateWrapper));
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean updateBatchById(Collection<T> entityList) {
return updateBatchById(entityList, 30);
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean updateBatchById(Collection<T> entityList, int batchSize) {
String sqlStatement = getSqlStatement(SqlMethod.UPDATE_BY_ID);
return executeBatch(entityList, batchSize, (sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(sqlStatement, param);
});
}
@Override
public T selectById(Serializable id) {
return baseDao.selectById(id);
}
@Override
public boolean deleteById(Serializable id) {
return SqlHelper.retBool(baseDao.deleteById(id));
}
@Override
public boolean deleteBatchIds(Collection<? extends Serializable> idList) {
return SqlHelper.retBool(baseDao.deleteBatchIds(idList));
}
}
6、使用导入工具类ExcelUtils
@RestController
@RequestMapping("test")
public class SysEmployeeInfoController {
@Autowired
private UserService userService;
@GetMapping("import")
@Operation(summary = "导入")
public Result importExcel(@RequestParam("file") MultipartFile file) throws Exception {
ExcelUtils.read(file, UserImportExcel.class, userService);
return new Result();
}
}
代码中的userService需要继承我们写的BaseServie。
总结
本文采用了 EasyExcel 和线程池的导入方案。可以有效的避免导入中可能出现的问题,代码特别完整,希望对您有帮助。
仅供参考,欢迎评论区留言,一起讨论~
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)