通过Reactive的方式访问MongoDB
在Java中,提供了两种方式来支持通过Reactive的方式访问MongoDB。 一种是MongoDB 官方提供了支持 Reactive 的驱动 另一种是Spring Data MongoDB 中主要的支持 我们主要看Spring Data MongoDB 中主要的支持
Spring Data MongoDB 中主要的支持
-
ReactiveMongoClientFactoryBean ReactiveMongoDatabaseFactory ReactiveMongoTemplate
先传入一个只打日志的动作
@Slf4j @SpringBootApplication public class MongodbDemoApplicationTest implements ApplicationRunner { @Autowired private ReactiveMongoTemplate mongoTemplate; private CountDownLatch cdl=new CountDownLatch(1); @Bean public MongoCustomConversions mongoCustomConversions(){ return new MongoCustomConversions( Arrays.asList( new MoneyReadConverter(), new MoneyWriteConverter())); //将读与写注册到bean里面 } public static void main(String[] args) { SpringApplication.run(MongodbDemoApplicationTest.class,args); } @Override public void run(ApplicationArguments args) throws Exception { startFromInsertion(() -> log.info("Runnable")); //startFromInsertion(() -> { // log.info("Runnable"); // decreaseHighPrice(); //}); log.info("after starting"); //decreaseHighPrice(); cdl.await(); //做一个等待 } private void decreaseHighPrice() { mongoTemplate.updateMulti(query(where("price").gte(3000L)), //查询出所有大于30块的咖啡 new Update().inc("price", -500L) //对于key,做一个减少500的操作 .currentDate("updateTime"), Coffee.class) //更新updatetime .doFinally(s -> { cdl.countDown(); log.info("Finnally 2, {}", s); }) .subscribe(r -> log.info("Result is {}", r)); } public void startFromInsertion(Runnable runnable){ mongoTemplate.insertAll(initCoffee()) .publishOn(Schedulers.elastic()) //将插入的结果publishOn到elastic线程池上 .doOnNext(c -> log.info("Next: {}", c)) //针对每一个next动作,都会打印我们取到的coffee .doOnComplete(runnable) //整体,即插入完成之后,我们去执行一个runnable对象,即() -> log.info("Runnable") .doFinally(s -> { cdl.countDown(); //计数器减一 log.info("Finnally 1, {}", s); }) //上面的操作最后返回的是一个flux流 对insert的结果做一个count .count() .subscribe(c -> log.info("Insert {} records", c)); } private List<Coffee> initCoffee() { Coffee espresso = Coffee.builder() .name("espresso") .price(Money.of(CurrencyUnit.of("CNY"), 30)) .createTime(new Date()) .updateTime(new Date()) .build(); Coffee latte = Coffee.builder() .name("latte") .price(Money.of(CurrencyUnit.of("CNY"), 20)) .createTime(new Date()) .updateTime(new Date()) .build(); return Arrays.asList(espresso, latte); } }
结果如下:
解开//decreaseHighPrice();注释
结果如上,我们可以看到,程序的执行并不是我们代码写的一样自上而下执行,先执行startFromInsertion,再执行decreaseHighPrice,而是先执行decreaseHighPrice,再执行startFromInsertion
保证先插入,再降价
public void run(ApplicationArguments args) throws Exception { //startFromInsertion(() -> log.info("Runnable")); startFromInsertion(() -> { log.info("Runnable"); decreaseHighPrice(); }); log.info("after starting"); //decreaseHighPrice(); cdl.await(); //做一个等待 }
结果如下:
插入和降价用的不是同一个线程
上一篇:
IDEA上Java项目控制台中文乱码