RocketMQ 学习与分享

学习一下火箭消息的原理和使用🚀

之前使用的消息中间件是 ActiveMQ,由于缺少维护和资料文档,出现问题不好排查,于是在此契机下,经过调研选择使用 RocketMQ,下面来分享一下对于它的基础学习。

RocketMQ 一个纯 Java、分布式、队列模型的开源消息中间件,前身是 MetaQ,是阿里研发的一个队列模型的消息中间件,后开源给 apache 基金会成为了 apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

借鉴于 Kafka,对比两者,RocketMQ 偏向于稳定以及业务型操作。

一、整体架构

核心组件有四个 Nameserver、Broker、Producer、Consumer

上图各个模块都会互相建立长链接来进行联系,这种架构模式跟 Dubbo 有点像,都有一个注册中心来维护相关信息,不同点在于 Broker 这模块,在消息系统中起着重要的作用。

二、优点特性

稳定性、高性能和丰富的消息类型

网上已经有很多相关的优缺点介绍,这里推荐两篇:

阿里 RocketMQ 性能介绍

RocketMQ与kafka对比(18项差异)

三、简述各个名词概念

Topic

消息主题,是一种消息的逻辑分类,例如这类消息属于哪类操作。例如库存相关、订单相关、活动相关等。理解成一种抽象的分类规范,大家的操作按照 Topic 进行分类,不同的 Producer 将消息发送到指定的 Topic,不同的 Consumer 订阅指定的 Topic,从上面拉取消息消费,屏蔽了底层的消息存储。

Tag

对 Topic 进一步细化,在阿里云官方文档有这行注释 “Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 版的服务器过滤”

更多具体可以看这篇:Topic 与 Tag 最佳实践

Message

它是消息队列中 消息传递的载体

发送消息是指发送到某个主题 Topic 下,其中每条消息包括以下几部分:

  • Message ID

消息的全局唯一标识,由 RocketMQ 发送过程中自动生成,唯一标识某条信息

  • Message key

消息的业务标识,由消息生产者 Producre 设置,唯一标识某个业务逻辑。

  • Message Body

消息携带的内容体,一般在这里自定义传递的内容,记得要将消息内容序列化。

具体消息 Message 核心结构可以参考控制台里展示的 Message Detail

MessageQueue

前面提到的 Topic 是抽象概念,实际发送消息和消费消息的地方是 Message Queue,每个 Topic 下可能有多个消息队列。引入队列的原因是为了提高可用性和灵活性,按照队列的性质 FIFO,先发送的消息先消费。

例如默认情况下,一个 Topic 会分配四个 Message Queue(参数配置:defaultTopicQueueNums),如果有两个 Broker,那将会平分两个,如果是本地搭建一个 Broker,那么应该跟我一样看到的是:一个 Broker 下有四个 Message Queue

消息发送的目的地和消费的获取源就是 Message Queue

Group

分组信息,一个组可以订阅多个 Topic

具体可以分为 Producer GroupConsumer Group,一个应用里都可以建立多个发送者组和消费者组,不过推荐的用法是一个应用指定一个 Producer Group,统一消息发送者的信息。

一般一个应用只需设定一个消费者组,单独订阅主题进行消费。如果一个 Topic 在一个应用中想设定两个处理逻辑,那么可以配置不同的消费者分组,可以实现对同一个主题消息设定不同的处理者 Handler

Offset

位移量,用来保存消息消费的进度。

从上面了解到,一个 Broker 下会有多个 Message Queue,我们需要用一个下标来记录消息消费的位置。通过 Offset 可以定位到目前消费完成的消息位置,指示 Consumer 下一条要从 Offest 后面位置消费消息。

在代码中,Offsetlong 基础类型,根据它来访问 Message Queue 指定位置的消息。

Order 消息有序性

一种按照顺序进行发布和消费的消息类型,分为 全局顺序消息和分区顺序消息

全局顺序:这个比较好理解,对于同一个 Topic 的消息,无论消费者有多少个,消息出队只能一个一个按照顺序来,下一个 Message 的消费依赖于前一个消费完成。适用于性能要求不高的场景,不过基本挺少选择该模式。

分区顺序:通过某个 Sharding Key 来进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。

这里是文档中举的例子:

电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

可以通过 Sharding Key 来保证同一类型、用户的消息顺序发送和消费,既保证了高并发处理消息,也保障了业务上的连贯性。

上图展示的是 分区顺序,从代码实现来说,需要设定自定义 Selector,然后对入参 arg 进行解析和根据策略选择 mq,例如常见的哈希取模,选择策略可以参考这个类 :SelectMessageQueueByHash

消息消费模式

RocketMQ 中,实现的消费模式有两种

  • 拉(PULL)模式: Consumer 主动从消息服务器 Broker 获取消息。
  • 推(PUSH)模式: 消息服务器 Broker 主动推送消息到 Consumer

引用【藤原豆腐店-】的描述:

  • MQPullConsumer: 取消息的过程需要用户自己写,首先通过打算消费的Topic 拿到 MessageQueue 的集合,遍历 MessageQueue 集合,然后针对每个 MessageQueue 批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个 MessageQueue

  • MQPushConsumer: consumer 把轮询过程封装了,并注册 MessageListener 监听器,取到消息后,唤醒 MessageListenerconsumeMessage() 来消费,对用户而言,感觉消息是被推送(push)过来的。

查看 SpringBoot 里面的 @RocketMQMessageListener 注解实现方式和团队使用二次封装的 RocketMQ,发现使用的都是 MQPushConsumer,封装好了 pull 轮询过程,所以可以认为,RocketMQ 使用的是 Pull 拉取模式的消费模式。

消息重复

消息的语义有三种:

  • 最多一次(At most once)
  • 最少一次(At least once)
  • 精确一次(Exactly once)

由于网络波动原因,无法避免消息在网络传输时,发送端认为第一次发送失败后,进行发送重试,于是乎我们要解决的问题可以理解成:两条一样的消息,如何保证程序处理正确?

目前常见的做法有两种:

1、保证消息的 幂等性

2、消息系统 过滤重复的消息,或者 消费端 过滤重复的消息

幂等性: 利用数学上的概念加深理解,例如有个函数 f(x),x 是消息,那么无论对消息重复消费多少次,f(f(x)) 的结果都是一样的。不会因为重复消费而产生副作用,保证数据的正确性。

消息去重: 这个比较好理解,每条消息带有全局唯一的 Message ID,可以在消息系统 Broker 处进行过滤,也可以在消费者 Consumer 处进行过滤。

目前在介绍中,没看到 RocketMQBroker 处进行过滤去重,所以需要在消费者端进行过滤。可以考虑新增一张数据库表,记录处理过的 Message ID,如果遇到重复的消息就不再进行处理,在处理中的消息可以先放入 Redis,避免同时消费一样的消息~

四、详说核心模块

从前面的架构图中可以看到有四大核心模块,从生产者发送消息到消费者消费经历过以下的流程:

先以 Producer 角度来说,与其中一个 ns 建立长链接,然后定期发送心跳维持状态,获取 Topic 主题路由信息,然后与 Broker Master 建立长链接,定期发送心跳判断是否可用。根据发送消息的类型,判断是否需要 Broker 的返回值。

Consumer 角度来说,与 Producer 不同点在于,它可以从 Broker Master 订阅消息,也能够从 Broker Salve 订阅消息,这里不再重复画图。

由于 RocketMQ 是纯 Java 语言编写的,所以可以在 Github 中下载源码,查看每个模块的详细设计。

Nameserver

Nameserver 来管理消息订阅,消息发送和消费信息,集群中的各个服务需要通过 Nameserver 来了解各自的状态。

有点像 Dubbo 中的注册中心 ZookeeperNameServer 中维护着 Producer 集群、Broker 集群、 Consumer 集群的服务状态。通过定时(默认是 30s)发送心跳数据包进行维护更新各个服务的状态。

①、接收 Broker 的请求,注册 Broker 的路由信息

②、接口 Client 的请求,根据某个 Topic 获取其到 Broker 的路由信息

NameServer 没有状态,可以横向扩展。每个 Broker 在启动的时候会到NameServer 注册;Producer 在发送消息前会根据 TopicNameServer 获取路由(到 Broker)信息;Consumer 也会定时获取 Topic 路由信息。

关于 Namesrv,了解上述概念后,可以在代码中查看具体的启动流程:

1
org.apache.rocketmq.namesrv.NamesrvStartup#main

Broker

Broker 的定位是消息代理存储服务器,职责是负责持久化消息还有管理消息消费的进度。

介绍一下它的特性:

① 与所有 Namesrv 节点保持长链接和心跳,定时(默认 30s)将 Topic 信息注册到 Namesrv

② 负责存储消息,以 Topic 为维度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。

③ 具有上亿级消息堆积能力,同时可以严格保证消息的有序性。

具体启动代码入口在:

1
org.apache.rocketmq.broker.BrokerStartup#main

初始化过程中做的事情有点多,消息存储、远端服务、过滤器等等,粗看一眼有点晕,没有继续往下跟,感兴趣的可以去源码中瞅瞅=-=

Producer

消息发送者,往消息队列发送消息的主体角色。

下图是 RocketMQTemplate 调用 convertAndSend 方法调用的时序图:

核心步骤在 DefaultMQProducerImpl 的发送实现方法,底层选择要发往的 MessageQueue,执行前置钩子、通过 NettyClinet 来发送请求,发送完成后执行后置钩子,最后返回 SendResult

具体可以在 SpringBoot 中引入 RocketMQ-Starter 依赖,然后发送消息,查看整体调用链路。

Consumer

  • 识别依据

一个消费者分组中,可以设定很多个 Listener,来分别消费不同 Topic 下的消息。

在消息消费时,需要通过 ConsumerGroup + Topic + Tag 来唯一确定 Listener

所以同一个消费组、同一个主题下,不可以出现相同 TagListener,应用在启动时会报错。

  • 代码实现

实现代码与 Producer 在同一个模块:client

在讲消息消费之前,先来看下应用启动时,它将扫描打上 RocketMQMessageListener 注解的 beans,然后进行注册容器

按照我的理解,在 for 循环中注册扫描到的 bean,接着在 createRocketMQListenerContainer 组装 DefaultRocketMQListenerContainer,然后在 Spring 容器里注册,等待之后的消费。

同时有一个守护线程在不断从 Broker 上拉取消息,监听到符合条件的消息后进行消费:

上面大致介绍的是应用启动时注册 Listener 和循环获取消息的过程,具体 Consumer 启动时的代码入口在这里:

1
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#start

消费者启动注册的流程比较长,需要慢慢看~

五、具体使用(代码和监控 UI)

下载代码(external 和 二进制启动包)

  • external:扩展内容,为了获得监控台 Console 地址如下:

    1
    https://github.com/apache/rocketmq-externals.git
  • install:应用启动包,用来部署 NamesrvBroker

    1
    https://rocketmq.apache.org/dowloading/releases/
  • 程序实现代码:已经开源,贡献给 Github,可以下载自行打包使用和参考实现思路,学习学习~

    1
    https://github.com/apache/rocketmq

启动 nameserver 和 broker

在启动之前,请确保本地全局变量中包含 JAVA_HOME 变量,例如:

1
2
$ echo ${JAVA_HOME}
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home

进入下载的 releases 二进制安装包目录

1
2
$ cd rocketmq-all-4.7.0-bin-release/bin
$ sh paly.sh

点击查看 play.sh 脚本,可以看到它将会启动 NamesrvBroker 两个服务~启动好之后,就能够往 Broker 发送和消费消息

Springboot 集成 rocker-starter

这里使用的是 Springboot 集成 starter 模块,具体可以参考这篇文章:

https://www.baeldung.com/apache-rocketmq-spring-boot

个人整合后,放入了 DemoRocket 目录下:

https://github.com/Vip-Augus/springboot-note

这里要说下 Listener 消息消费者处理流程,应用不断往 broker 获取监听的 Topic 消息,然后找到对应的 Consumer 进行消费:

可以跟踪上图左侧的调用链路,了解消费者消费消息的整体链路。

启动监控 UI 使用

进入刚才下载的 external 路径,里面有个 rocketmq-console 目录,防止占用 8080 端口,需要修改一下,具体要修改的地方有两处:

1
2
3
4
5
# UI 监控系统的访问端口
server.port=10010
# Namesrv 的地址,如果有多个,请用分号 ; 分隔开
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=localhost:9876

rocket-console 是一个 Springboot 项目,修改相关配置后,需要经过打包然后部署

1
2
$ mvn clean install -DskipTests
$ java -jar target/rocketmq-console-ng-1.0.1.jar

接着访问前面设定的端口,就能看到监控平台:

顶部的导航栏标识了它拥有的功能,例如想看某个 Topic 发往哪个 Broker,然后被订阅的消费组有哪些,都可以通过监控来进行查看:

六、后续学习计划

日常使用中,基本可以不去修改 NameserverBroker 这两个模块的服务,更多关注的是 ProducerConsumer 的使用,同时可以对他们进行二次封装,替换其中的 DefaultMQProducerMQConsumer 实现类,打造适合自身业务的发送者和消费者。

本次分享大介绍了 RocketMQ 的设计架构、核心模块的设计、源码位置和消息发送以及消费的大致流程,介绍了 RocketMQTemplate 基础使用还有监控 Console UI 的查看和使用,但关于位移量、消息存储格式、同步、异步刷盘方式和消息重复等等详细设计都还没有去了解,有机会的话之后再去了解和分享~

参考资料

1、阿里云产品官方文档

2、阿里云 RocketMQ 名词解释

3、《深入理解RocketMQ》- MQ消息的投递机制

4、消息中间件系列(九):详解RocketMQ的架构设计、关键特性、与应用场景

5、《浅入浅出》-RocketMQ

6、RocketMQ 分享全纪实

7、阿里 RocketMQ 性能介绍

8、RocketMQ与kafka对比(18项差异)

9、RocketMQ消息消费方式 推拉模式