摘要: 原创出处 huaweicloud.csdn.net/63876ff5dacf622b8df8c462.html 「.李太白.」欢迎转载,保留摘要,谢谢!
一、使用场景
数据库有两张表 t_person
和 t_school
如下:前端传来10000条person数据要插入到t_person
,同时要删除t_school
表中id为1的数据(为提高效率采用线程池做)


二、思路
1、要保证主线程和子线程使用的同一个sqlSession
2、手动控制提交和回滚
3、将10000条数据均分成10份,每份1000条,创建10个任务,放入线程池执行!
三、代码及注释如下:
1、核心业务代码
@Service public class PersonServiceImpl extends ServiceImpl<PersonMapper, Person> implements IPersonService {
@Autowired private SqlSessionTemplate sqlSessionTemplate;
@Autowired private SchoolMapper schoolMapper;
private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);
private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,10, TimeUnit.SECONDS ,queue,policy);
@Override public int insertPerson(Person person) { return this.baseMapper.insert(person); }
@Override @Transactional public void inserPersonBatch(List<Person> list) throws SQLException {
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory(); SqlSession sqlSession = sqlSessionFactory.openSession(); Connection connection = sqlSession.getConnection(); try{ connection.setAutoCommit(false); PersonMapper mapper = sqlSession.getMapper(PersonMapper.class); schoolMapper.deleteById("1"); List<List<Person>> lists = ListUtils.averageAssign(list,1000); List<Callable<Integer>> callableList = new ArrayList<>(); for(int i = 0; i < lists.size(); i++){ List<Person> insertList = lists.get(i); Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { int n = 0; try{ n = mapper.saveBatch(insertList); }catch (Exception e){ return n; } return n; } }; callableList.add(callable); }
List<Future<Integer>> futures = executor.invokeAll(callableList); for(Future<Integer> future : futures){ if(future.get() <= 0){ connection.rollback(); return; } } connection.commit(); System.out.println("添加成功!");
}catch (Exception e){ connection.rollback(); log.error(e.toString()); throw new SQLException("出现异常!"); } return; } }
|
2、PersonMapper中自定义批量插入
<insert id="saveBatch" parameterType="list"> insert into t_person(id,name,age,addr,classes,school_id) values <foreach collection="list" index="index" item="item" separator=","> ( #{item.id}, #{item.name}, #{item.age}, #{item.addr}, #{item.classes}, #{item.schoolId} ) </foreach> </insert>
|
3、均分List工具类
public class ListUtils {
public static <T> List<List<T>> averageAssign(List<T> source, int limit) { if (null == source || source.isEmpty()) { return Collections.emptyList(); } List<List<T>> result = new ArrayList<>(); int listCount = (source.size() - 1) / limit + 1; int remaider = source.size() % listCount; int number = source.size() / listCount; int offset = 0; for (int i = 0; i < listCount; i++) { List<T> value; if (remaider > 0) { value = source.subList(i * number + offset, (i + 1) * number + offset + 1); remaider--; offset++; } else { value = source.subList(i * number + offset, (i + 1) * number + offset); } result.add(value); } return result; } }
|
四、测试验证:
controller层如下:传入10000条数据
@GetMapping("/addBatch") public void addBatch() { List<Person> list = new ArrayList<>(); for(int i = 1; i <= 10000; i++){ Person p = new Person(); p.setId(i); p.setName("张三" + i); p.setAge(i); p.setAddr("重庆"); p.setClasses("一班"); p.setSchoolId(i); list.add(p); } try{ this.iPersonService.inserPersonBatch(list); }catch (Exception e){ e.printStackTrace(); } }
|
1、情况1:子线程中有一个执行失败
t_person
表主键唯一 10000条Person数据id按1—10000设置
如图t_person
表中已经有一条id为201的数据 所以线程池中有一个任务执行会失败!

我们打断点来看:此时已经分配好10个任务

如下图:插入id为201的数据时失败,线程池第一个任务执行失败返回0,其余全部成功返回1000

执行rollback
回滚

执行完毕观察数据库:
t_school
表数据没有被删,

t_person
表数据也没有变化

2、情况2、删除 t_person表中id为201的数据重新插入

此时10个任务全部执行成功:

执行commit

执行完毕观察数据库:
t_school
表数据已被删除

t_person
表中10000条数据也成功插入:

3、情况3:主线程报错就不演示了
以上测试成功!