有效的在Stream.parallel()中使用事务

问题: 当我们使用stream().parallel()并行操作时,由于spring事务的单线程特性,无法有效在主线程执行预期的事务操作。

解决:

一、如同大多数多线程事务的处理方式,我们将事务细分化,对单个线程进行事务操作即可。

比如下伪代码,方法二中在每个线程中使用事务。由于单个线程中的类是注入到spring中的,所以可以分别其执行事务。

@Service
public class TestServiceA {

    @Autowired
    private TestServiceB serviceB;


    /**
     * 无效的事务
     *
     */
    @Transactional(rollbackFor = Exception.class)
    public void testparallelStreamTransactin0(List<String> list) {
        list.stream().parallel().forEach(e -> {

            // do insert 1
            // do insert 2


        });
    }
    /**
     * 有效的事务
     */
    @Transactional(rollbackFor = Exception.class)
    public void testparallelStreamTransactin1(List<String> list) {
        list.stream().parallel().forEach(data -> {
            serviceB.insert(data);
        });
    }
}

@Service
public class TestServiceB {


    @Transactional(rollbackFor = Exception.class)
    public  void  insert(String data){
            // do insert 1
            // do insert 2

    }


}

二、难度升级,我想让多线程同步实现事务,一起成功或失败。这个时候就必须复杂了,不建议使用多变的且难以控制的steam parallel并发,建议使用线程池操作。大致思路如下:

1、定义线程共有事务管理器或者其集合,统一储存和管理事务,保证其多线程可见性;

2、在每个线程中使用事务管理器实现事务状态的标识,并记录,但要注意线程数量对性能的消耗

3、遍历,任务事务状态,最终统一执行事务的提交和回滚。

@Autowired
    private PlatformTransactionManager transactionManager;


  ....


 
        //先在开启多线程外面,定义一个同步集合:
        List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
        CountDownLatch latch= new CountDownLatch(threads);
 
        for (int i = 0; i < threads; i++) {
            int finalI = i;
            ThreadPoolUtils.fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
                    TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
                    transactionStatuses.add(status);
                    try{
                        improtInsertBath(corpCode,userId,customersList.get(finalI));
                    }catch(Exception e){
                        e.printStackTrace();
                        for (TransactionStatus transactionStatus:transactionStatuses) {

                          //异常 回滚所有事务                          
                           transactionStatus.setRollbackOnly();
                        }
 
                    }
 
                }
            });
        }
经验分享 程序员 微信小程序 职场和发展