SpringBoot 高效批量插入万级数据,哪种方式最强?
准备工作
1、Maven项目中pom.xml文件引入的相关依赖如下
<dependencies>
<!-- SpringBoot Web模块依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- MyBatis-Plus 依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<!-- 数据库连接驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- 使用注解,简化代码-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
2、application.yml配置属性文件内容(重点:开启批处理模式)
server:
端口号
port: 8080
# MySQL连接配置信息(以下仅简单配置,更多设置可自行查看)
spring:
datasource:
连接地址(解决UTF-8中文乱码问题 + 时区校正)
(rewriteBatchedStatements=true 开启批处理模式)
url: jdbc:mysql://127.0.0.1:3306/bjpowernode?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
用户名
username: root
密码
password: xxx
连接驱动名称
driver-class-name: com.mysql.cj.jdbc.Driver
3、Entity实体类(测试)
/**
* Student 测试实体类
*
* @Data注解:引入Lombok依赖,可省略Setter、Getter方法
*/
@Data
@TableName(value = "student")
public class Student {
/** 主键 type:自增 */
@TableId(type = IdType.AUTO)
private int id;
/** 名字 */
private String name;
/** 年龄 */
private int age;
/** 地址 */
private String addr;
/** 地址号 @TableField:与表字段映射 */
@TableField(value = "addr_num")
private String addrNum;
public Student(String name, int age, String addr, String addrNum) {
this.name = name;
this.age = age;
this.addr = addr;
this.addrNum = addrNum;
}
}
4、数据库student表结构(注意:无索引)
测试工作
1、for循环插入(单条)(总耗时:177秒)
总结:测试平均时间约是177秒,实在是不忍直视(捂脸),因为利用for循环进行单条插入时,每次都是在获取连接(Connection)、释放连接和资源关闭等操作上,(如果数据量大的情况下)极其消耗资源,导致时间长。
@GetMapping("/for")
public void forSingle(){
// 开始时间
long startTime = System.currentTimeMillis();
for (int i = 0; i < 50000; i++){
Student student = new Student("李毅" + i,24,"张家界市" + i,i + "号");
studentMapper.insert(student);
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("插入数据消耗时间:" + (endTime - startTime));
}
测试时间:2、拼接SQL语句(总耗时:2.9秒)
简明:拼接格式:insert into student(xxxx) value(xxxx),(xxxx),(xxxxx)…
总结:拼接结果就是将所有的数据集成在一条SQL语句的value值上,其由于提交到服务器上的insert语句少了,网络负载少了,性能也就提上去。
但是当数据量上去后,可能会出现内存溢出、解析SQL语句耗时等情况,但与第一点相比,提高了极大的性能。
/**
* 拼接sql形式
*/
@GetMapping("/sql")
public void sql(){
ArrayList<Student> arrayList = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 50000; i++){
Student student = new Student("李毅" + i,24,"张家界市" + i,i + "号");
arrayList.add(student);
}
studentMapper.insertSplice(arrayList);
long endTime = System.currentTimeMillis();
System.out.println("插入数据消耗时间:" + (endTime - startTime));
}
mapper
public interface StudentMapper extends BaseMapper<Student> {
@Insert("<script>" +
"insert into student (name, age, addr, addr_num) values " +
"<foreach collection='studentList' item='item' separator=','> " +
"(#{item.name}, #{item.age},#{item.addr}, #{item.addrNum}) " +
"</foreach> " +
"</script>")
int insertSplice(@Param("studentList") List<Student> studentList);
}
测试结果3、批量插入saveBatch(总耗时:2.7秒)
简明:使用MyBatis-Plus实现IService接口中批处理saveBatch()方法,对底层源码进行查看时,可发现其实是for循环插入,但是与第一点相比,为什么性能上提高了呢?
因为利用分片处理(batchSize = 1000) + 分批提交事务的操作,从而提高性能,并非在Connection上消耗性能。(目前个人觉得较优化方案)
/**
* mybatis-plus的批处理模式
*/
@GetMapping("/saveBatch1")
public void saveBatch1(){
ArrayList<Student> arrayList = new ArrayList<>();
long startTime = System.currentTimeMillis();
// 模拟数据
for (int i = 0; i < 50000; i++){
Student student = new Student("李毅" + i,24,"张家界市" + i,i + "号");
arrayList.add(student);
}
// 批量插入
studentService.saveBatch(arrayList);
long endTime = System.currentTimeMillis();
System.out.println("插入数据消耗时间:" + (endTime - startTime));
}
重点注意:MySQLJDBC驱动默认情况下忽略saveBatch()方法中的executeBatch()语句,将需要批量处理的一组SQL语句进行拆散,执行时一条一条给MySQL数据库,造成实际上是分片插入,即与单条插入方式相比,有提高,但是性能未能得到实质性的提高。
测试:数据库连接URL地址缺少 rewriteBatchedStatements = true 参数情况
# MySQL连接配置信息
spring:
datasource:
连接地址(未开启批处理模式)
url: jdbc:mysql://127.0.0.1:3306/bjpowernode?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
用户名
username: root
密码
password: xxx
连接驱动名称
driver-class-name: com.mysql.cj.jdbc.Driver
测试结果:10541 约等于 10.5秒(未开启批处理模式)
ChatGPT中文网站:https://ai.cxyquan.com/ 4、循环插入 + 开启批处理模式(总耗时:1.7秒)(重点:一次性提交)
简明:开启批处理,关闭自动提交事务,共用同一个SqlSession之后,for循环单条插入的性能得到实质性的提高;由于同一个SqlSession省去对资源相关操作的耗能、减少对事务处理的时间等,从而极大程度上提高执行效率。(目前个人觉得较优化方案)
/**
* 共用同一个SqlSession
*/
@GetMapping("/forSaveBatch")
public void forSaveBatch(){
// 开启批量处理模式 BATCH 、关闭自动提交事务 false
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);
// 反射获取,获取Mapper
StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class);
long startTime = System.currentTimeMillis();
for (int i = 0 ; i < 50000 ; i++){
Student student = new Student("李毅" + i,24,"张家界市" + i,i + "号");
studentMapper.insert(student);
}
// 一次性提交事务
sqlSession.commit();
// 关闭资源
sqlSession.close();
long endTime = System.currentTimeMillis();
System.out.println("总耗时: " + (endTime - startTime));
}
5、ThreadPoolTaskExecutor(总耗时:1.7秒)
❝目前个人觉得较优化方案
❞
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private PlatformTransactionManager transactionManager;
@GetMapping("/batchInsert2")
public void batchInsert2() {
ArrayList<Student> arrayList = new ArrayList<>();
long startTime = System.currentTimeMillis();
// 模拟数据
for (int i = 0; i < 50000; i++){
Student student = new Student("李毅" + i,24,"张家界市" + i,i + "号");
arrayList.add(student);
}
int count = arrayList.size();
int pageSize = 1000; // 每批次插入的数据量
int threadNum = count / pageSize + 1; // 线程数
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
int startIndex = i * pageSize;
int endIndex = Math.min(count, (i + 1) * pageSize);
List<Student> subList = arrayList.subList(startIndex, endIndex);
threadPoolTaskExecutor.execute(() -> {
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(transactionDefinition);
try {
studentMapper.insertSplice(subList);
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
首先定义了一个线程池(ThreadPoolTaskExecutor),用于管理线程的生命周期和执行任务。然后,我们将要插入的数据列表按照指定的批次大小分割成多个子列表,并开启多个线程来执行插入操作。
首先通过 TransactionManager 获取事务管理器,并使用 TransactionDefinition 定义事务属性。然后,在每个线程中,我们通过 transactionManager.getTransaction()
方法获取事务状态,并在插入操作中使用该状态来管理事务。
在插入操作完成后,我们再根据操作结果调用transactionManager.commit()
或 transactionManager.rollback()
方法来提交或回滚事务。在每个线程执行完毕后,都会调用 CountDownLatch 的 countDown() 方法,以便主线程等待所有线程都执行完毕后再返回。
来源:blog.csdn.net/weixin_44030143/
article/details/130825037
往期推荐:
再见,Navicat!! 警报炸锅,FastJson 又立功了。。 高德导航红绿灯为啥能读秒? 为什么 List 原生排序比 stream() 流效率更高?
微信扫码关注该文公众号作者