Preface

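During a project I needed to bulk-insert more than 200,000 rows. The logs showed that the saveBatch() method in MyBatis-Plus was performing very poorly. This article explains how saveBatch() works and what to watch out for when using it.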

How it works

Let's walk through the source code to see how saveBatch() works.

    // IService: convenience overload that uses the default batch size
    @Transactional(rollbackFor = Exception.class)
    default boolean saveBatch(Collection<T> entityList) {
        // DEFAULT_BATCH_SIZE defaults to 1000
        return saveBatch(entityList, DEFAULT_BATCH_SIZE);
    }

    // ServiceImpl: the overload that does the work
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean saveBatch(Collection<T> entityList, int batchSize) {
        // Resolve the single-row INSERT statement (SqlMethod.INSERT_ONE)
        String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
        // Execute the SQL in batches of batchSize
        return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
    }

Now let's see how saveBatch actually executes the batch:

    public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
        Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
        return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
            int size = list.size();
            int i = 1;
            for (E element : list) {
                // The row's parameters are ultimately buffered in StatementImpl.batchArgs for the batch insert
                consumer.accept(sqlSession, element);
                if ((i % batchSize == 0) || i == size) {
                    // Send the rows buffered in StatementImpl.batchArgs to the database
                    sqlSession.flushStatements();
                }
                i++;
            }
        });
    }
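
The flush condition in the loop above is easy to misread, so here is a minimal, database-free sketch of it. The class and method names (BatchFlushSketch, flushSizes) are hypothetical and exist only for this illustration; the counter stands in for the real sqlSession calls.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchFlushSketch {

    /** Returns the size of each flushed chunk, mirroring the (i % batchSize == 0) || i == size check. */
    static List<Integer> flushSizes(int total, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must not be less than one");
        }
        List<Integer> flushed = new ArrayList<>();
        int pending = 0;
        for (int i = 1; i <= total; i++) {
            pending++;                      // stands in for consumer.accept(sqlSession, element)
            if (i % batchSize == 0 || i == total) {
                flushed.add(pending);       // stands in for sqlSession.flushStatements()
                pending = 0;
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        System.out.println(flushSizes(2500, 1000)); // prints [1000, 1000, 500]
    }
}
```

With the default batch size of 1000, 200,000 rows therefore mean 200 calls to flushStatements(); how expensive each flush is depends on what the JDBC driver does with the buffered rows.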

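Following flushStatements() down the call chain, we can see that it finally reaches executeBatchInternal() in StatementImpl (the listing below matches the override in its subclass ClientPreparedStatement in MySQL Connector/J 8.x). Note: the method is long, so the code below has been trimmed.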

    protected long[] executeBatchInternal() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            if (this.connection.isReadOnly()) {
                throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
                        MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT);
            }

            if (this.query.getBatchedArgs() == null || this.query.getBatchedArgs().size() == 0) {
                return new long[0];
            }

            // we timeout the entire batch, not individual statements
            int batchTimeout = getTimeoutInMillis();
            setTimeoutInMillis(0);

            resetCancelledState();

            try {
                statementBegins();

                clearWarnings();
                // If rewriteBatchedStatements is enabled, rewrite the batch (multi-values INSERT or multi-statement)
                if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {

                    if (getQueryInfo().isRewritableWithMultiValuesClause()) {
                        return executeBatchWithMultiValuesClause(batchTimeout);
                    }

                    if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
                            && this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
                        return executePreparedBatchAsMultiStatement(batchTimeout);
                    }
                }

                return executeBatchSerially(batchTimeout);
            } finally {
                this.query.getStatementExecuting().set(false);

                clearBatch();
            }
        }
    }
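
Of the three paths in the method above, the multi-values rewrite is the one that matters for plain INSERT batches. The following is a rough sketch of the idea only, not the driver's actual implementation: it expands a single-row INSERT into one statement with several value groups. The class name, the rewrite helper, and the student table are assumptions made for this illustration.

```java
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MultiValuesRewriteSketch {

    private static final Pattern VALUES = Pattern.compile("\\bVALUES\\b", Pattern.CASE_INSENSITIVE);

    /** Expands "INSERT ... VALUES (?, ?)" into one statement with `rows` value groups. */
    static String rewrite(String insertSql, int rows) {
        if (rows < 1) {
            throw new IllegalArgumentException("rows must be at least 1");
        }
        Matcher m = VALUES.matcher(insertSql);
        if (!m.find()) {
            throw new IllegalArgumentException("not an INSERT ... VALUES statement");
        }
        String head = insertSql.substring(0, m.end());      // everything up to and including VALUES
        String group = insertSql.substring(m.end()).trim(); // the "(?, ?)" placeholder group
        return head + " " + String.join(", ", Collections.nCopies(rows, group));
    }

    public static void main(String[] args) {
        // prints INSERT INTO student (name, address) VALUES (?, ?), (?, ?), (?, ?)
        System.out.println(rewrite("INSERT INTO student (name, address) VALUES (?, ?)", 3));
    }
}
```

Without the rewrite, the method falls through to executeBatchSerially, where each buffered row is sent as its own statement; that is the slow path seen in the logs.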

Next, let's look at what insert does:

  public int insert(String statement, Object parameter) {
    return update(statement, parameter);
  }
  
  public int update(String statement, Object parameter) {
    try {
      dirty = true;
      MappedStatement ms = configuration.getMappedStatement(statement);
      return executor.update(ms, wrapCollection(parameter));
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error updating database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

The key method is doUpdate(ms, parameter), which assembles the SQL (the version shown here is the one in BatchExecutor):

@Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    // The row joins the current batch only if its SQL is exactly identical, including table name and columns
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      handler.parameterize(stmt);// fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);    // fix Issues 322
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }
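
The reuse check in doUpdate only compares a row's SQL with the SQL of the row immediately before it. A small sketch, with hypothetical names and plain strings in place of real statements, shows the consequence: the order of the rows matters, not just how many distinct SQL texts there are. The real check also compares the MappedStatement, which this sketch leaves out.

```java
import java.util.List;

final class StatementReuseSketch {

    /** Counts how many prepared statements a doUpdate-style loop would create for the given SQL sequence. */
    static int countStatements(List<String> sqlInOrder) {
        String currentSql = null;
        int statements = 0;
        for (String sql : sqlInOrder) {
            if (!sql.equals(currentSql)) {
                statements++;        // else-branch in doUpdate: prepare a new statement
                currentSql = sql;
            }
            // otherwise the previous statement is reused and the row joins its batch
        }
        return statements;
    }

    public static void main(String[] args) {
        String a = "INSERT INTO student (name, address) VALUES (?, ?)";
        String b = "INSERT INTO student (name) VALUES (?)";
        System.out.println(countStatements(List.of(a, a, b, b))); // prints 2
        System.out.println(countStatements(List.of(a, b, a, b))); // prints 4
    }
}
```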

That is how saveBatch works.

Summary

1: For the batch to really be executed as a batch, add rewriteBatchedStatements=true to the database connection URL parameters.

The rewriteBatchedStatements parameter only delivers high-performance batch inserts with version 5.1.13 or later of the MySQL JDBC driver.

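2: From the way doUpdate(ms, parameter) assembles the SQL, it follows that rows whose field values are null are not merged into the same batch insert. A row joins the current batch only when its SQL text is identical to the previous row's; assuming the default behavior of leaving null fields out of the generated INSERT, a row with null fields produces a different column list, so a separate SQL statement is assembled and executed for it.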

For example:

public class Student {
    
    private String name;
    
    private String address;
}

Take 100 Student objects, 20 of which have name == null and 50 of which have address == null. As the logs show, this data set is not inserted as one batch.
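
To make the Student case concrete, here is a minimal sketch of how the generated INSERT changes shape when a field is null. It assumes that null fields are omitted from the column list, which is how the behavior described above comes about; the class name, the insertSql helper, and the student table name are hypothetical, and the real generated SQL would normally contain further columns such as the id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NullFieldSqlSketch {

    /** Builds an INSERT for a Student, omitting every column whose value is null. */
    static String insertSql(String name, String address) {
        List<String> columns = new ArrayList<>();
        if (name != null) {
            columns.add("name");
        }
        if (address != null) {
            columns.add("address");
        }
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO student (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(insertSql("Ann", "Paris")); // INSERT INTO student (name, address) VALUES (?, ?)
        System.out.println(insertSql(null, "Paris"));  // INSERT INTO student (address) VALUES (?)
        System.out.println(insertSql("Ann", null));    // INSERT INTO student (name) VALUES (?)
    }
}
```

Each of these SQL texts fails the sql.equals(currentSql) check against the others, so every time the null pattern changes between consecutive rows, a new statement is prepared and the previous batch ends.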
