通过Reactive的方式访问MongoDB

在Java中,提供了两种方式来支持通过Reactive的方式访问MongoDB。 一种是MongoDB 官方提供了支持 Reactive 的驱动 另一种是Spring Data MongoDB 中主要的支持 我们主要看Spring Data MongoDB 中主要的支持

Spring Data MongoDB 中主要的支持

    ReactiveMongoClientFactoryBean ReactiveMongoDatabaseFactory ReactiveMongoTemplate

先传入一个只打日志的动作

@Slf4j
@SpringBootApplication
public class MongodbDemoApplicationTest implements ApplicationRunner {
          
   
    @Autowired
    private ReactiveMongoTemplate mongoTemplate;
    private CountDownLatch cdl=new CountDownLatch(1);

    @Bean
    public MongoCustomConversions mongoCustomConversions(){
          
   
        return new MongoCustomConversions(
                Arrays.asList(
                new MoneyReadConverter(),
                new MoneyWriteConverter()));  //将读与写注册到bean里面
    }
    public static void main(String[] args) {
          
   
        SpringApplication.run(MongodbDemoApplicationTest.class,args);
    }
    @Override
    public void run(ApplicationArguments args) throws Exception {
          
   
        startFromInsertion(() -> log.info("Runnable"));
        //startFromInsertion(() -> {
          
   
        //    log.info("Runnable");
        //    decreaseHighPrice();
        //});

        log.info("after starting");

		//decreaseHighPrice();

        cdl.await(); //做一个等待
    }

    private void decreaseHighPrice() {
          
   
        mongoTemplate.updateMulti(query(where("price").gte(3000L)),  //查询出所有大于30块的咖啡
                new Update().inc("price", -500L)            //对于key,做一个减少500的操作
                        .currentDate("updateTime"), Coffee.class)    //更新updatetime
                .doFinally(s -> {
          
   
                    cdl.countDown();
                    log.info("Finnally 2, {}", s);
                })
                .subscribe(r -> log.info("Result is {}", r));
    }

    public void startFromInsertion(Runnable runnable){
          
   
        mongoTemplate.insertAll(initCoffee())
                .publishOn(Schedulers.elastic()) //将插入的结果publishOn到elastic线程池上
                .doOnNext(c -> log.info("Next: {}", c)) //针对每一个next动作,都会打印我们取到的coffee
                .doOnComplete(runnable)   //整体,即插入完成之后,我们去执行一个runnable对象,即() -> log.info("Runnable")
                .doFinally(s -> {
          
   
                    cdl.countDown();    //计数器减一
                    log.info("Finnally 1, {}", s);
                }) //上面的操作最后返回的是一个flux流  对insert的结果做一个count
                .count()
                .subscribe(c -> log.info("Insert {} records", c));
    }

    private List<Coffee> initCoffee() {
          
   
        Coffee espresso = Coffee.builder()
                .name("espresso")
                .price(Money.of(CurrencyUnit.of("CNY"), 30))
                .createTime(new Date())
                .updateTime(new Date())
                .build();
        Coffee latte = Coffee.builder()
                .name("latte")
                .price(Money.of(CurrencyUnit.of("CNY"), 20))
                .createTime(new Date())
                .updateTime(new Date())
                .build();
        return Arrays.asList(espresso, latte);
    }

}

结果如下:

解开//decreaseHighPrice();注释

结果如上,我们可以看到,程序的执行并不是我们代码写的一样自上而下执行,先执行startFromInsertion,再执行decreaseHighPrice,而是先执行decreaseHighPrice,再执行startFromInsertion

保证先插入,再降价

public void run(ApplicationArguments args) throws Exception {
          
   
        //startFromInsertion(() -> log.info("Runnable"));
        startFromInsertion(() -> {
          
   
            log.info("Runnable");
            decreaseHighPrice();
        });

        log.info("after starting");

		//decreaseHighPrice();

        cdl.await(); //做一个等待
    }

结果如下:

插入和降价用的不是同一个线程

经验分享 程序员 微信小程序 职场和发展