最近使用 mybatis-plus 的 saveOrUpdateBatch 和 saveBatch 接口时执行特别慢,数据量大时往往需要十几分钟。打开日志查看,原来所谓的批量操作也是循环逐条插入的。那有没有真正批量插入/更新的办法呢?

mybatis-plus 提供了一个 SQL 注入器 DefaultSqlInjector,我们可以通过继承 DefaultSqlInjector 来加入自定义的方法,达到批量插入的效果。

import com.baomidou.mybatisplus.core.injector.AbstractMethod;import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;import org.springframework.stereotype.Component;import java.util.List;/** * @Description: 自定义方法SQL注入器 * @Title: CustomizedSqlInjector * @Package com.highgo.edu.common.batchOperation * @Author: * @Copyright  * @CreateTime: 2022/11/3 16:21 */@Componentpublic class CustomizedSqlInjector  extends DefaultSqlInjector {    /**     * 如果只需增加方法,保留mybatis plus自带方法,     * 可以先获取super.getMethodList(),再添加add     */    @Override    public List getMethodList(Class mapperClass) {        List methodList = super.getMethodList(mapperClass);        methodList.add(new InsertBatchMethod());       // methodList.add(new UpdateBatchMethod());        methodList.add(new MysqlInsertOrUpdateBath());        methodList.add(new PGInsertOrUpdateBath());        return methodList;    }}

同时我们需要继承 BaseMapper,定义一个声明了这些批量方法的 RootMapper 接口:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * Base mapper that adds batch statements on top of {@link BaseMapper}.
 *
 * <p>Each method name must match the statement id registered by the
 * corresponding injected method.
 *
 * @param <T> the entity type handled by the mapper
 */
public interface RootMapper<T> extends BaseMapper<T> {

    /**
     * Inserts all entities with a single multi-row INSERT statement.
     *
     * @param list the entities to insert
     * @return the row count reported by the database
     */
    int insertBatch(@Param("list") List<T> list);

    /**
     * Batch insert-or-update for MySQL.
     *
     * @param list the entities to insert or update
     * @return the row count reported by the database
     */
    int mysqlInsertOrUpdateBatch(@Param("list") List<T> list);

    /**
     * Batch insert-or-update for PostgreSQL.
     *
     * @param list the entities to insert or update
     * @return the row count reported by the database
     */
    int pgInsertOrUpdateBatch(@Param("list") List<T> list);
}

在需要使用批量更新插入的mapper上使用自定义的RootMapper

如下所示:

import com.XX.edu.common.batchOperation.RootMapper;
import com.XX.edu.exam.model.TScore;
import org.springframework.stereotype.Repository;

/**
 * Mapper for the TScore entity; extends RootMapper to pick up the custom batch methods.
 *
 * NOTE(review): the entity type argument appears to have been lost from the
 * extends clause (TScore is imported but unused) - presumably this should be
 * parameterized with TScore; confirm against the RootMapper declaration.
 */
@Repository
public interface TScoreMapper extends RootMapper {
}

下面我们来定义批量插入的方法:

package com.XX.edu.common.batchOperation;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.core.enums.SqlMethod;import com.baomidou.mybatisplus.core.injector.AbstractMethod;import com.baomidou.mybatisplus.core.metadata.TableInfo;import com.baomidou.mybatisplus.core.metadata.TableInfoHelper;import org.apache.commons.lang3.StringUtils;import org.apache.ibatis.executor.keygen.Jdbc3KeyGenerator;import org.apache.ibatis.executor.keygen.KeyGenerator;import org.apache.ibatis.executor.keygen.NoKeyGenerator;import org.apache.ibatis.mapping.MappedStatement;import org.apache.ibatis.mapping.SqlSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * @Description: 批量插入的方法 * @Title: InsertBatchMethod * @Package com.XX.edu.common.batchOperation * @Author: * @CreateTime: 2022/11/3 15:16 */public class InsertBatchMethod extends AbstractMethod {    Logger logger = LoggerFactory.getLogger(getClass());    @Override    public MappedStatement injectMappedStatement(Class mapperClass, Class modelClass, TableInfo tableInfo) {        final String sql = "insert into %s %s values %s";        final String fieldSql = prepareFieldSql(tableInfo);        final String valueSql = prepareValuesSql(tableInfo);        final String sqlResult = String.format(sql, tableInfo.getTableName(), fieldSql, valueSql);        logger.debug("sqlResult----->{}", sqlResult);        SqlSource sqlSource = languageDriver.createSqlSource(configuration, sqlResult, modelClass);        KeyGenerator keyGenerator = new NoKeyGenerator();        SqlMethod sqlMethod = SqlMethod.INSERT_ONE;        String keyProperty = null;        String keyColumn = null;        // 表包含主键处理逻辑,如果不包含主键当普通字段处理        if (StringUtils.isNotEmpty(tableInfo.getKeyProperty())) {            if (tableInfo.getIdType() == IdType.AUTO) {                /* 自增主键 */                keyGenerator = new Jdbc3KeyGenerator();                keyProperty = tableInfo.getKeyProperty();                keyColumn = 
tableInfo.getKeyColumn();            } else {                if (null != tableInfo.getKeySequence()) {                    keyGenerator = TableInfoHelper.genKeyGenerator(sqlMethod.getMethod(),tableInfo, builderAssistant);                    keyProperty = tableInfo.getKeyProperty();                    keyColumn = tableInfo.getKeyColumn();                }            }        }        // 第三个参数必须和RootMapper的自定义方法名一致        return this.addInsertMappedStatement(mapperClass, modelClass, "insertBatch", sqlSource, keyGenerator, keyProperty, keyColumn);    }    /**     * @description: 拼接字段值     * @author:     * @date: 2022/11/3 15:20     * @param: [tableInfo]     * @return: java.lang.String     **/    private String prepareValuesSql(TableInfo tableInfo) {        final StringBuilder valueSql = new StringBuilder();        valueSql.append("");        //valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},");        tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},"));        valueSql.delete(valueSql.length() - 1, valueSql.length());        valueSql.append("");        return valueSql.toString();    }    /**     * @description:拼接字段     * @author:     * @date: 2022/11/3 15:20     * @param: [tableInfo]     * @return: java.lang.String     **/    private String prepareFieldSql(TableInfo tableInfo) {        StringBuilder fieldSql = new StringBuilder();        //fieldSql.append(tableInfo.getKeyColumn()).append(",");        tableInfo.getFieldList().forEach(x -> {            fieldSql.append(x.getColumn()).append(",");        });        fieldSql.delete(fieldSql.length() - 1, fieldSql.length());        fieldSql.insert(0, "(");        fieldSql.append(")");        return fieldSql.toString();    }}

继续定义批量插入更新的抽象方法

package com.XX.edu.common.batchOperation;import com.baomidou.mybatisplus.core.injector.AbstractMethod;import com.baomidou.mybatisplus.core.metadata.TableInfo;import org.apache.ibatis.executor.keygen.NoKeyGenerator;import org.apache.ibatis.mapping.MappedStatement;import org.apache.ibatis.mapping.SqlSource;/** * @Description: 批量插入更新 * @Title: InsertOrUpdateBath * @Package com.XX.edu.common.batchOperation * @Author: * @Copyright  * @CreateTime: 2022/11/3 15:23 */public abstract class InsertOrUpdateBathAbstract extends AbstractMethod {    @Override    public MappedStatement injectMappedStatement(Class mapperClass, Class modelClass, TableInfo tableInfo) {        final  SqlSource sqlSource = prepareSqlSource(tableInfo, modelClass);        // 第三个参数必须和RootMapper的自定义方法名一致        return this.addInsertMappedStatement(mapperClass, modelClass, prepareInsertOrUpdateBathName(), sqlSource, new NoKeyGenerator(), null, null);    }    protected abstract SqlSource prepareSqlSource(TableInfo tableInfo, Class modelClass);        protected abstract String prepareInsertOrUpdateBathName();}

继承上面的抽象类—-mysql版本(本版本未测试 根据自己需求修改)

package com.XX.edu.common.batchOperation;import com.baomidou.mybatisplus.core.metadata.TableInfo;import org.apache.ibatis.mapping.SqlSource;import org.springframework.util.StringUtils;/** * @Description: 批量插入更新 * @Title: InsertOrUpdateBath * @Package com.XX.edu.common.batchOperation * @Author: * @Copyright  * @CreateTime: 2022/11/3 15:23 */public class MysqlInsertOrUpdateBath extends InsertOrUpdateBathAbstract {    @Override    protected SqlSource prepareSqlSource(TableInfo tableInfo, Class modelClass) {        final String sql = "insert into %s %s values %s ON DUPLICATE KEY UPDATE %s";        final String tableName = tableInfo.getTableName();        final String filedSql = prepareFieldSql(tableInfo);        final String modelValuesSql = prepareModelValuesSql(tableInfo);        final String duplicateKeySql = prepareDuplicateKeySql(tableInfo);        final String sqlResult = String.format(sql, tableName, filedSql, modelValuesSql, filedSql, duplicateKeySql);        //String.format(sql, tableName, filedSql, modelValuesSql, duplicateKeySql);        //System.out.println("savaorupdatesqlsql="+sqlResult);        return languageDriver.createSqlSource(configuration, sqlResult, modelClass);    }    @Override    protected String prepareInsertOrUpdateBathName() {        return "mysqlInsertOrUpdateBath";    }    String prepareDuplicateKeySql(TableInfo tableInfo) {        final StringBuilder duplicateKeySql = new StringBuilder();        if (!StringUtils.isEmpty(tableInfo.getKeyColumn())) {            duplicateKeySql.append(tableInfo.getKeyColumn()).append("=values(").append(tableInfo.getKeyColumn()).append("),");        }        tableInfo.getFieldList().forEach(x -> {            duplicateKeySql.append(x.getColumn())                    .append("=values(")                    .append(x.getColumn())                    .append("),");        });        duplicateKeySql.delete(duplicateKeySql.length() - 1, duplicateKeySql.length());        return duplicateKeySql.toString();    }    
String prepareModelValuesSql(TableInfo tableInfo) {        final StringBuilder valueSql = new StringBuilder();        valueSql.append("");        if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) {            valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},");        }        tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},"));        valueSql.delete(valueSql.length() - 1, valueSql.length());        valueSql.append("");        return valueSql.toString();    }    /**     * @description:准备属性名     * @author:     * @date: 2022/11/3 15:25     * @param: [tableInfo]     * @return: java.lang.String     **/    String prepareFieldSql(TableInfo tableInfo) {        StringBuilder fieldSql = new StringBuilder();        fieldSql.append(tableInfo.getKeyColumn()).append(",");        tableInfo.getFieldList().forEach(x -> {            fieldSql.append(x.getColumn()).append(",");        });        fieldSql.delete(fieldSql.length() - 1, fieldSql.length());        fieldSql.insert(0, "(");        fieldSql.append(")");        return fieldSql.toString();    }}

继承上面的抽象类—-postgresql版本(已测试完成,其中id使用序列自增)

package com.XX.edu.common.batchOperation;import com.baomidou.mybatisplus.core.metadata.TableInfo;import org.apache.ibatis.mapping.SqlSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.StringUtils;/** * @Description: 批量插入更新 * @Title: InsertOrUpdateBath * @Package com.XX.edu.common.batchOperation * @Author: * @Copyright  * @CreateTime: 2022/11/3 15:23 */public class PGInsertOrUpdateBath extends InsertOrUpdateBathAbstract {    Logger logger = LoggerFactory.getLogger(getClass());    @Override    protected SqlSource prepareSqlSource(TableInfo tableInfo, Class modelClass) {        final String sql = "insert into %s %s values %s on conflict (id)  do update set %s ";        final String tableName = tableInfo.getTableName();        final String filedSql = prepareFieldSql(tableInfo);        final String modelValuesSql = prepareModelValuesSql(tableInfo);        final String duplicateKeySql = prepareDuplicateKeySql(tableInfo);        final String sqlResult = String.format(sql, tableName, filedSql, modelValuesSql, duplicateKeySql);        logger.info("sql=={}",sqlResult);        return languageDriver.createSqlSource(configuration, sqlResult, modelClass);    }    @Override    protected String prepareInsertOrUpdateBathName() {        return "pgInsertOrUpdateBatch";    }    private String prepareDuplicateKeySql(TableInfo tableInfo) {        final StringBuilder duplicateKeySql = new StringBuilder();        if (!StringUtils.isEmpty(tableInfo.getKeyColumn())) {            duplicateKeySql.append(tableInfo.getKeyColumn()).append("=excluded.").append(tableInfo.getKeyColumn()).append(",");        }        tableInfo.getFieldList().forEach(x -> {            duplicateKeySql.append(x.getColumn())                    .append("=excluded.")                    .append(x.getColumn())                    .append(",");        });        duplicateKeySql.delete(duplicateKeySql.length() - 1, duplicateKeySql.length());        return duplicateKeySql.toString();   
 }    private String prepareModelValuesSql(TableInfo tableInfo) {        final StringBuilder valueSql = new StringBuilder();        valueSql.append("");        if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) {            valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},");        }        tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},"));        valueSql.delete(valueSql.length() - 1, valueSql.length());        valueSql.append("");        return valueSql.toString();    }    /**     * @description:准备属性名     * @author:     * @date: 2022/11/3 15:25     * @param: [tableInfo]     * @return: java.lang.String     **/    private String prepareFieldSql(TableInfo tableInfo) {        StringBuilder fieldSql = new StringBuilder();        if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) {            fieldSql.append(tableInfo.getKeyColumn()).append(",");        }        tableInfo.getFieldList().forEach(x -> {            fieldSql.append(x.getColumn()).append(",");        });        fieldSql.delete(fieldSql.length() - 1, fieldSql.length());        fieldSql.insert(0, "(");        fieldSql.append(")");        return fieldSql.toString();    }}

到此定义结束,下面开始使用

@Servicepublic class TNewExerciseServiceImpl extends ServiceImpl        implements TNewExerciseService {    Logger logger = LoggerFactory.getLogger(getClass());//引入mapper @AutowiredTScoreMapper scoreMapper;//这样就可以批量新增更新操作了public void test(List collect){ scoreMapper.pgInsertOrUpdateBatch(collect);}}

但是如果collect数据量太大会出现异常
“Tried to send an out-of-range integer as a 2-byte value: 87923”
是因为pg对于sql语句的参数数量是有限制的,最大为32767。

看pg源码

public void sendInteger2(int val) throws IOException {        if (val >= -32768 && val >> 8);            this.int2Buf[1] = (byte)val;            this.pgOutput.write(this.int2Buf);        } else {            throw new IOException("Tried to send an out-of-range integer as a 2-byte value: " + val);        }    }

从源代码中可以看到pgsql使用2个字节的integer,故其取值范围为[-32768, 32767]。

这意味着sql语句的参数数量,即行数*列数之积必须小于等于32767.

比如,总共有 17 个字段,因为参数上限是 32767,所以一次最多允许 32767 / 17 ≈ 1927 行,因此需要分批操作;有能力的童鞋也可以自己修改 pg 的驱动呦。

分批插入代码如下:

    /**     * @description:     * @author:     * @date: 2022/11/4 14:57     * @param: [list, fieldCount:列数]     * @return: void     **/    public void detachSaveOrUpdate_score(List list, int fieldCount) {        int numberBatch = 32767; //每一次插入的最大数        //每一次插入的最大行数 , 向下取整        int v = ((Double) Math.floor(numberBatch / (fieldCount * 1.0))).intValue();        double number = list.size() * 1.0 / v;        int n = ((Double) Math.ceil(number)).intValue(); //向上取整        for (int i = 0; i  list.size()) {                end = list.size(); //如果end不能超过最大索引值            }            scoreMapper.pgInsertOrUpdateBatch(list.subList(v * i, end)); //插入数据库            logger.info("更新一次~~~{}-{}", v * i, end);        }    }

完成收工~~~