RocketMQ Producer 深入学习

学习一下火箭消息 - 生产者的原理和使用🚀

1、事务消息

主要分为两个流程:

1.1 正常消息的发送、提交

(1) producer 发送 Half 消息
(2) broker 本地写入 Half 消息(将 Topic 改成 RMQ_SYS_TRANS_HALF_TOPIC,该阶段 Consumer 由于没有订阅关系,无法消费)
(3) producer 根据 broker 写入消息结果,成功的话,执行本地事务;写入消息失败,producer 不执行本地事务
(4) 根据本地事务结果,往 broker 发送 Commit 或者 Rollback(如果是 Commit,将会将 Half 消息转回 Real Topic,生成消息索引,订阅者可以进行消费)

1.2 补偿流程:

(1) 半消息发送成功,但 broker 没收到 Commit 或 Rollback,进行状态回查(上图的第五步)
(2) Producer 收到回查消息,检查本地事务状态
(3) 根据本地事务状态,重新发送 Commit 或者 Rollback

补偿阶段用于解决消息 Commit 或者 Rollback 发送超时或者失败的情况

如果使用了事务消息,业务方需要实现该接口:

org.apache.rocketmq.client.producer.TransactionListener

1
2
3
4
5
6
7
8
9
10
11
public interface TransactionListener {
/**
* 当发送 Half 消息成功后,执行本地事务
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* 如果 Commit 或者 Rollback 发送丢失,broker 进行消息回查,检查本地事务状态,重新发送 Commit 或 Rollback
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

2、如何选择发送目的地(MessageQueue)

手动指定 MessageQueue 目的:分区有序性,实现顺序消费

例如 Topic 创建策略,默认分配单 Broker 上分配 8 个 MessageQueue。

Producer 在了解到要发往的 Topic 有 8 个消息队列,默认情况将会轮询发送,尽量让每个队列存储到的消息数量一致。

为了实现顺序消费,需要 Producer 在发送的时候,指定发送的目的地 - 特定的 MessageQueue,这时需要指定选择分区策略(MessageSelector)以及特定的分区键入参(arg)

可以查看具体的发送选择分区逻辑:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private SendResult sendSelectImpl(Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
MessageQueue mq = null;
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
// 这一步 selector.select(messageQueueList, userMessage, arg),选出特定的 MessageQueue
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
long costTime = System.currentTimeMillis() - beginStartTime;
// 发送
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
}

待补充

分组的作用,例如消费者分组 consumerGroup 是为了消息消费的负载均衡,区分不同的消费者

生产者分组 producerGroup 的作用是为了 事务消息 的回查,根据分组进行二次确认,后续相关内容需要深入研究…