Spring Data R2DBC快速上手指南

原创 吴就业 77 0 2020-11-02

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://www.wujiuye.com/article/6fb070e9adc041a8b718ab1852f79780

作者:吴就业
链接:https://www.wujiuye.com/article/6fb070e9adc041a8b718ab1852f79780
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年11月02日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

R2DBC基于Reactive Streams反应流规范,它是一个开放的规范,为驱动程序供应商和使用方提供接口(r2dbc-spi),与JDBC的阻塞特性不同,它提供了完全反应式的非阻塞API与关系型数据库交互。

简单说,R2DBC项目是支持使用反应式编程API访问关系型数据库的桥梁,定义统一接口规范,不同数据库厂家通过实现该规范提供驱动程序包。

R2DBC定义了所有数据存储驱动程序必须实现的SPI,目前实现R2DBC SPI的驱动程序包括:

同时,r2dbc还提供反应式连接池r2dbc-pool(https://github.com/r2dbc/r2dbc-pool)。

本篇内容:

使用r2dbc-mysql驱动程序包与mysql数据库建立连接

添加r2dbc-mysql依赖:

<!-- r2dbc mysql-->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
</dependency>

r2dbc-mysql实现了r2dbcConnectionFactory SPI接口。

首先创建连接工厂,再通过连接工厂获取到数据库连接。

ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
                .option(DRIVER, "mysql")
                .option(HOST, "127.0.0.1")
                .option(USER, "root")
                .option(PORT, 3306)
                .option(PASSWORD, "")
                .option(DATABASE, "r2dbc_stu")
                .option(CONNECT_TIMEOUT, Duration.ofSeconds(3))
                .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

从连接工厂获取一个数据库连接:

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

使用连接执行一条sql

Mono.from(connectionPublisher)
    .flatMapMany(conn -> conn.createStatement(
       "insert into person (id,name,age) values ('1111','wujiuye',25)")
       .execute())
    .flatMap(Result::getRowsUpdated)
    .switchIfEmpty(Mono.just(0)) 
    .onErrorResume(throwable -> {
        throwable.printStackTrace();
        return Mono.empty();
    })
    .subscribe(System.out::println);

使用r2dbc-pool获取数据库连接

添加r2dbc-pool依赖

<dependencies>
  <!-- r2dbc mysql -->
  <dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
  </dependency>
  <!-- r2dbc-pool -->
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.2.RELEASE</version>
  </dependency>
</dependencies>

根据ConnectionFactory创建连接池(ConnectionPool):

ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMillis(1000))
                .maxSize(20)
                .build();
ConnectionPool pool = new ConnectionPool(configuration);

使用连接池创建连接:

Mono<Connection> connectionMono = pool.create();
// 将连接释放回连接池
connectionMono.flatMapMany(Connection::close).subscribe();
// 销毁连接池
pool.dispose();

r2dbc并没有定义连接池的接口,而r2dbc-pool通过实现ConnectionFactory接口巧妙的接管连接的创建,管理连接的生命周期。

在使用spring-data-r2dbc时,我们只需要将注册到bean工厂的ConnectionFactory替换为ConnectionPool即可使用连接池,例如:

@Bean
public ConnectionFactory connectionFactory(){
   ConnectionFactory connectionFactory = ....
   ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMillis(1000))
                .maxSize(20)
                .build();
    // ConnectionPool实现了ConnectionFactory接口,使用ConnectionFactory替换ConnectionFactory
    return new ConnectionPool(configuration);
}

不过我们不需要自己配置ConnectionFactoryspring-data-r2dbc自动帮我们配置好了。

所以我们使用spring-data-r2dbc时,像下面这样使用DatabaseClient执行sql,最终也是从连接池获取连接执行:

public class xxx{
    @Resource
    private ConnectionFactory connectionFactory;
  
    @Test
    public void test(){
       DatabaseClient client = DatabaseClient.create(connectionFactory);
       // .......
    }
}

在获取连接的地方下断点验证:

Spring-Data-R2DBC增删改查API

使用spring-data-r2dbc可直接通过依赖它的starter,依赖starter会将所需的jar包也都导入到项目中:

<dependencies>
    <!-- r2dbc mysql 库-->
    <dependency>
       <groupId>dev.miku</groupId>
       <artifactId>r2dbc-mysql</artifactId>
       <version>0.8.2.RELEASE</version>
    </dependency>
    <!-- 同时也会将r2dbc-pool导入 -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-r2dbc</artifactId>
       <version>2.3.0.RELEASE</version>
    </dependency>
</dependencies>

application.yml配置文件中添加配置:

### r2dbc
spring:
  r2dbc:
    url: r2dbc:mysql://localhost:3306/r2dbc_stu?useUnicode=true&characterEncoding=UTF-8
    username: root
    password: 
    pool:
      enabled: true
      max-size: 10
      initial-size: 10
      validation-query: select 1

DatabaseClient

DatabaseClientSpring Data R2DBC提供的具有功能性的反应式非阻塞API,用于与数据库交互。

DatabaseClient封装了资源的处理,例如打开和关闭连接,让我们可以更方便的执行增删改查SQL,而不必关心要释放连接。

DatabaseClientspirng-data-r2dbcorg.springframework.data.r2dbc.config.AbstractR2dbcConfiguration类完成,我们也可以继承AbstractR2dbcConfiguration类,替换一些默认配置。

@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
    @Override
    @Bean // 这个注解不能少
    public ConnectionFactory connectionFactory() {
        // ....
    }
    
    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

insert api

示例:往person表插入一条记录

client.insert()
      .into("person")
      .value("id", "123")
      .value("name", "wujiuye")
      .nullValue("age", Integer.class)
      .fetch()
      .rowsUpdated()
      .subscribe(System.out::println);

多条sql组合执行:

Mono<Void> insert1 = client.insert().into("person")
         .value("id", "12345")
         .value("name", "wujiuye")
         .nullValue("age", Integer.class)
         .then();
Mono<Void> insert2 = client.insert().into("person")
          .value("id","123445555555")
          .then();
insert1.then(insert2).subscribe();

update api

示例:更新person表的id12345的记录:

client.update().table("person")
      .using(Update.update("name", "1111").set("age", 18))
      .matching(Criteria.where("id").is("12345"))
      .fetch()
      .rowsUpdated()
      .subscribe(rows -> System.out.println("更新记录数:" + rows));

delete api

示例:删除person表的name1111age18的记录:

client.delete().from("person")
      .matching(Criteria.where("name").is("1111").and("age").is(18))
      .fetch()
      .rowsUpdated()
      .subscribe(rows -> System.out.println("删除的记录总数为:" + rows));

select api

示例:查询personnamenull的记录:

Flux<Person> list = client.select().from("person")
         .matching(Criteria.where("name").isNull())
         .as(Person.class)
         .fetch()
         .all();
list.subscribe(System.out::println);

事务的使用

使用关系数据库时的一个常见模式是将多个查询分组到由事务保护的工作单元中,关系数据库通常将事务与单个连接关联。因此,使用不同的连接会导致使用不同的事务。

spring-data-r2dbcDatabaseClient中包含事务感知,允许使用Spring的事务管理在同一事务中对多个语句进行分组。

示例:使用then连接多条sql,只要有一条sql执行失败就回滚事务:

ReactiveTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
      .bind("id", "joe") //绑定参数
      .bind("name", "Joe")
      .bind("age", 34)
      .fetch().rowsUpdated()
      .then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
                  .bind("id", "joe")
                  .bind("name", "Joe")
                  .fetch().rowsUpdated())
      .then();
// 将atomicOperation放到事务中执行
operator.transactional(atomicOperation).subscribe();

spring-data-r2dbc也支持基于注解的声明式事务。spring-data-r2dbc自动配置了ReactiveTransactionManager@EnableTransactionManagement启动声明式事务管理,因此可直接使用,而不要添加额外配置。例如:

@Component
public class TxService {
    
    @Resource
    private DatabaseClient client;

    @Transactional(rollbackFor = Throwable.class)
    public Mono<Void> txInsert() {
        Person person = new Person();
        person.setId('12123');
        return client.insert().into(Person.class)
                .using(person)
                .fetch().rowsUpdated()
                .then(client.insert().into(Person.class)
                        .using(person)
                        .fetch().rowsUpdated()
                        .then());
    }
}

// 测试
public class R2dbcTxTest extends SupporSpringBoot {

    @Resource
    private TxService txService;

    @Test
    public void testTx() {
        txService.txInsert().doOnError(throwable -> {
            System.out.println("执行失败");
            throwable.printStackTrace();
        }).subscribe();
    }

}

R2DBC Repository

Spring-data-r2dbc也实现了spring data repository的反应式API

现在,我们通过一个示例,学习如何使用声明式事务包装一系列调用Repository方法的操作。

1、声明持久化对象PO

@Table("person")
public static class Person {
   @Id
   private String id;
   private String name;
   private int age;
}

2、创建Dao(为了与领域驱动设计中的Repository做区分),继承R2dbcRepository接口:

public interface PersonDao extends R2dbcRepository<R2dbcStuMain.Person, String> {
    @Modifying
    @Query("insert into person (id,name,age) values(:id,:name,:age)")
    Mono<Integer> insertPerson(String id, String name, Integer age);
}

3、创建Service,在Service中创建一个事务方法,链接多次调用PersonDaoinsertPerson方法。

@Service
public class PersonService {

    @Resource
    private PersonDao personDao;

    @Transactional(rollbackFor = Throwable.class)
    public Mono<Integer> addPerson(Person... persons) {
        Mono<Integer> txOp = null;
        for (Person person : persons) {
            if (txOp == null) {
                txOp = personDao.insertPerson(person.getId(), person.getName(), person.getAge());
            } else {
                txOp = txOp.then(personDao.insertPerson(person.getId(), person.getName(), person.getAge()));
            }
        }
        return txOp;
    }
}

测试事务是否生效:

@SpringBootApplication
@EnableR2dbcRepositories
public class R2dbcApplication {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(R2dbcApplication.class);
        PersonService personService = context.getBean(PersonService.class);
        Person person = new Person();
        person.setId("12347");
        person.setName("wjy");
        person.setAge(25);
        // 测试事务方法,验证主键重复时是否还有数据插入成功
        personService.addPerson(person, person)
                .doOnError(Throwable::printStackTrace)
                .subscribe(System.out::println);
      
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

参考文献

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

如何并行消费Kafka拉取的数据库Binlog,提升吞吐量

本篇介绍如何并行消费Kafka拉取的数据库Binlog,以及使用Kafka订阅Binlog字段值获取防坑指南(阿里云DTS)。

深入浅出反应式编程原理,反应式编程入门

反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。

如何使用Kafka订阅数据库的实时Binlog

订阅Binlog的目的在于,实现实时的缓存更新、处理复杂逻辑数据实时同步到Elasticsearch或其它库-表等业务场景,本篇介绍如何使用Kafka订阅数据库的实时Binlog。

使用Spring WebFlux + R2DBC搭建消息推送服务

消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。

教你如何编写一个IDEA插件,并掌握核心知识点PSI

IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。

01-分享一次服务雪崩问题排查经历

笔者想跟大家分享笔者经历的一次服务雪崩事故,分析导致此次服务雪崩事故的原因。或许大多数读者都有过这样的经历,这是项目给我们上的一次非常宝贵的实战课程。