有效的在Stream.parallel()中使用事务
问题: 当我们使用stream().parallel()并行操作时,由于spring事务的单线程特性,无法有效在主线程执行预期的事务操作。
解决:
一、如同大多数多线程事务的处理方式,我们将事务细分化,对单个线程进行事务操作即可。
比如下伪代码,方法二中在每个线程中使用事务。由于单个线程中的类是注入到spring中的,所以可以分别其执行事务。
@Service public class TestServiceA { @Autowired private TestServiceB serviceB; /** * 无效的事务 * */ @Transactional(rollbackFor = Exception.class) public void testparallelStreamTransactin0(List<String> list) { list.stream().parallel().forEach(e -> { // do insert 1 // do insert 2 }); } /** * 有效的事务 */ @Transactional(rollbackFor = Exception.class) public void testparallelStreamTransactin1(List<String> list) { list.stream().parallel().forEach(data -> { serviceB.insert(data); }); } } @Service public class TestServiceB { @Transactional(rollbackFor = Exception.class) public void insert(String data){ // do insert 1 // do insert 2 } }
二、难度升级,我想让多线程同步实现事务,一起成功或失败。这个时候就必须复杂了,不建议使用多变的且难以控制的steam parallel并发,建议使用线程池操作。大致思路如下:
1、定义线程共有事务管理器或者其集合,统一储存和管理事务,保证其多线程可见性;
2、在每个线程中使用事务管理器实现事务状态的标识,并记录,但要注意线程数量对性能的消耗
3、遍历,任务事务状态,最终统一执行事务的提交和回滚。
@Autowired private PlatformTransactionManager transactionManager; .... //先在开启多线程外面,定义一个同步集合: List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>()); CountDownLatch latch= new CountDownLatch(threads); for (int i = 0; i < threads; i++) { int finalI = i; ThreadPoolUtils.fixedThreadPool.execute(new Runnable() { @Override public void run() { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。 TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态 transactionStatuses.add(status); try{ improtInsertBath(corpCode,userId,customersList.get(finalI)); }catch(Exception e){ e.printStackTrace(); for (TransactionStatus transactionStatus:transactionStatuses) { //异常 回滚所有事务 transactionStatus.setRollbackOnly(); } } } }); }
上一篇:
通过多线程提高代码的执行效率例子
下一篇:
大学物理·第一章质点运动学