使用Spring WebFlux + R2DBC搭建消息推送服务

原创 吴就业 75 0 2020-10-31

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://www.wujiuye.com/article/096946effd6b4250b007d0625d7fd7dc

作者:吴就业
链接:https://www.wujiuye.com/article/096946effd6b4250b007d0625d7fd7dc
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年10月31日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。例如,在用户注册时需同步发送短信验证码、在订单发货时需异步推送微信模板消息通知或短信通知。

消费推送接口并发量可能不高,要求同步推送消息的场景不多,但仍需要考虑可能会存在流量突增情况。在促销活动的前后、拉新活动期间等等,都可能需要同步推送大量短信通知,而我们的目标是只要一个POD(一个进程)就能处理整个电商平台的消息推送。

至于可异步的消息推送则通过MQ对接,实现削峰填谷,并通过监听系统的负载情况动态的控制消息消费速度,让系统处在一个稳定的运行状态。

在团队协同定义完消息推送接口后,消息推送服务相当于只做一层代理,与网关非常相似,这也是我们考虑使用WebFlux的原因之一。以此降低消息推送服务的部署成本。

Spring WebFluxSpring WebMvc同为Web框架,不同的是,WebFlux是完全非阻塞的,能够实现以少量的线程处理并发请求、以更少的硬件资源获取系统更高的吞吐量。

但使用反应式编程可能不适合复杂业务的开发,也不适合采用了DDD领域驱动设计架构的项目,如果要使用,就必须要让响应式API侵入DDD的领域服务类、仓储类。

要使用Spring WebFlux提供完全非阻塞的接口,就必须要确保处理一个请求的整个流程都是非阻塞的,只要有一个步骤导致线程发生阻塞,WebFlux的性能就直线下降,为此你还要给WebFlux配置更多的线程,这与使用WebMvc并无差异,得不到高性能反而还增加项目的复杂性。

例如,处理接口请求阻塞在操作数据库上,那么默认WebFlux配置的几个线程都会被阻塞住,此时,如果想通过增加WebFlux的工作线程数来解决问题,那么不如直接切换回WebMvc

使用WebFlux获得高性能的同时必然要失去些什么,毕竟是等价替换。所以代码难以调试、项目代码复杂度提升难以阅读、并且会导致一些强依赖ThreadLocal实现特性的框架无法正常工作,我们不得不抛弃这些框架而寻找支持反应式的框架替代。

消息推送服务在处理一次消息推送请求的过程中,可能需要访问Redis、数据库RDS、以及第三方接口。

Redis用于缓存消息模板,但这块可以使用内存缓存替代以获取更快的响应速度,后期如果需要访问Redis,可以使用Lettuce替代Jedis

请求第三方接口则可以使用WebFlux提供的WebClient实现,用于替代诸如httpclientokhttp这类http客户端框架,实现可以使用单一长连接的非阻塞发送http请求。

最后可能需要持久化推送记录以便于后续报表的统计或其它,所以需要使用R2DBC替换JDBC实现非阻塞操作数据库。

R2DBCjdbc的关系类似于WebFluxWebMvc的关系,R2DBC是实现非阻塞操作数据库的规范,提供反应式编程API,目前已有多种实现该规范的数据库驱动程序包,如r2dbc-mysqlspring data r2dbc则是我们用来替代mybatisorm框架。

webflux兼容webmvc的全局异常处理机制,如果不嫌麻烦,也可以每个接口自行处理异常,例如:

@PostMapping("push/sms")
public Mono<GenericResponse<MessagePushResultDto>> genericSendSmsMsg(
            // webflux也支持参数检验
            @Validated @RequestBody Mono<XxxCommand> command) {
  return xxxxService.pushMessage(command)
             .flatMap(messagePushResultDto -> Mono.just(GenericResponse.success(messagePushResultDto)))
              // 处理异常,不处理则走全局异常处理
             .onErrorResume(throwable -> Mono.just(GenericResponse.fail(throwable.getMessage())));
    }

我们将Json解析封装成独立的组件,目的是适配多个json解析框架,让切换json解析框架只需要切换依赖jar包即可。为此,我们依然需要让JsonUtils替代WebFluxjson解析工作。代码实现如下。

private Mono<WxmbMessageResponse> sendTemplateMessage(WxmbMessageCommand command, String token) {
        return webClient.post().uri("/cgi-bin/message/template/send?access_token=" + token)
                .accept(MediaType.APPLICATION_JSON).acceptCharset(Charset.defaultCharset())
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(command)
                .retrieve()
                .bodyToMono(WxmbMessageResponse.class);
}

项目需要添加mysqlr2dbc驱动包,以及spring-data-r2dbc,同时spring-data-r2dbc依赖的r2dbc-spi包也会被导入。

<!-- r2dbc mysql驱动-->
<dependency>
   <groupId>dev.miku</groupId>
   <artifactId>r2dbc-mysql</artifactId>
</dependency>
<!-- spring-data-r2dbc的starter包 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

spring-data-r2dbcspring-data-jpa的使用非常相似,两者都是实现spirng-data-commons下的repositoryAPIspring-data-r2dbc实现的是反应式API。简单的CRUD可通过继承R2dbcRepository<T, ID>接口实现,例如:

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
}

使用@Query自定义查询实现如下:

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
    @Query("select * from message where id = ?")
    Mono<MessagePO> selectByMsgId(Long msgId);
}

@Query注解不等于Mybatis@Select注解,@Query可以编写增删改查SQL,如果需要执行写操作,需要配合@Modifying注解使用。例如

public interface MessageDao extends R2dbcRepository<MessagePushPO, Long> {
    @Modifying
    @Query("delete from message where id = :msgId")
    Mono<Integer> deleteByMsgId(Long msgId);
}

对于复杂的查询,我们也可以直接使用spring-data-r2dbcAPI实现。例如:

DatabaseClient类似Mybatis中的SqlSession概念。

关于spring-data-r2dbc的使用,推荐阅读spring官方文档,虽然是英文,但阅读起来并不难理解,想要学习冷门技术,就必须要啃英文文档,因为你会发现,这方面的博客文章少之又少,还避免不了一些博客文章使用的spring-data-r2dbc版本与自己使用的版本不同存在API差异导致“copy”的代码画红线问题。

spring-data-r2dbc 1.1.0版本官方文档链接:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/html/#reference,也可到spring.io官网搜索。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

深入浅出反应式编程原理,反应式编程入门

反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。

如何使用Kafka订阅数据库的实时Binlog

订阅Binlog的目的在于,实现实时的缓存更新、处理复杂逻辑数据实时同步到Elasticsearch或其它库-表等业务场景,本篇介绍如何使用Kafka订阅数据库的实时Binlog。

Spring Data R2DBC快速上手指南

本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。

教你如何编写一个IDEA插件,并掌握核心知识点PSI

IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。

01-分享一次服务雪崩问题排查经历

笔者想跟大家分享笔者经历的一次服务雪崩事故,分析导致此次服务雪崩事故的原因。或许大多数读者都有过这样的经历,这是项目给我们上的一次非常宝贵的实战课程。

序言:为什么写这个专栏

随着微服务的流行,很多公司也在逐渐的将单体架构项目重构为微服务项目,单体架构微服务化后也将面临更多的挑战。服务的调用错综复杂,如何保护自身不被其它服务打垮也是项目微服务化后重点需要考虑的问题。