RocketMQ Architecture - Transactional Messages
迪丽瓦拉
2024-06-01 09:33:50

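Preface

Take an e-commerce transaction as an example: when a user pays for an order, that core operation also triggers changes in several downstream subsystems, such as logistics shipping, points updates, and clearing the shopping cart.

Ordinary messages lack the commit, rollback, and unified coordination capabilities of a database transaction. RocketMQ's distributed transactional messages build on ordinary messages by binding a two-phase commit to the local transaction, so that the global commit outcome stays consistent.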


Overview of the Transactional Message Send Flow


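A transactional message is sent in the following steps:

1. The producer sends a half message to the RocketMQ broker.

2. After the broker persists the message, it returns an ACK to the producer confirming that the send succeeded. At this point the message is not delivered to consumers. The broker replaces the half message's topic, queue ID, and related fields, and stores the original topic and queue ID as message properties.

3. The producer executes the local transaction and, based on its outcome, submits a second-phase confirmation (Commit or Rollback) to the broker.

4. On receiving the confirmation, the broker proceeds as follows:

  • Commit: it creates a new transactional message from the half message's properties and its original topic and queue ID, then persists it. Once persisted, the message can be delivered to consumers, and the half message is finally discarded.
  • Rollback: it discards the half message.

Discarding a half message does not physically delete it; the half message is instead moved to a new topic and its OP queue.

5. The broker runs a check every 60 seconds. If a half message has gone without a second-phase confirmation for longer than the timeout (6 seconds), or the confirmation received was Unknown, the broker issues a transaction check-back to the producer. If the broker finds that the original producer has crashed, it sends the check-back to another producer instance in the same producer group.

6. On receiving the check-back, the producer checks the final state of the local transaction and submits the second-phase confirmation again.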
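The six steps can be condensed into a toy, in-memory model. This is only a sketch for building intuition: ToyBroker, Decision, and every method name below are hypothetical and do not correspond to RocketMQ classes, and storage, timers, and networking are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the half-message life cycle; not RocketMQ code. */
final class HalfMessageFlowSketch {
    enum Decision { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical stand-in for the broker. */
    static final class ToyBroker {
        final List<String> pendingHalf = new ArrayList<>();
        final List<String> visibleToConsumers = new ArrayList<>();

        // Steps 1-2: persist the half message; consumers cannot see it yet.
        void storeHalf(String body) {
            pendingHalf.add(body);
        }

        // Step 4: apply the producer's second-phase decision.
        void endTransaction(String body, Decision decision) {
            if (decision == Decision.UNKNOWN) {
                return; // stays pending until a check-back yields a final answer
            }
            boolean wasPending = pendingHalf.remove(body);
            if (wasPending && decision == Decision.COMMIT) {
                visibleToConsumers.add(body);
            }
        }

        // Step 5: half messages still pending are candidates for a check-back.
        List<String> dueForCheckBack() {
            return new ArrayList<>(pendingHalf);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.storeHalf("order-1");
        broker.endTransaction("order-1", Decision.UNKNOWN);
        System.out.println("pending: " + broker.dueForCheckBack());
        // Step 6: the producer answers the check-back with a final decision.
        broker.endTransaction("order-1", Decision.COMMIT);
        System.out.println("visible: " + broker.visibleToConsumers);
    }
}
```

The point of the model is the invariant it preserves: a message is never in the consumer-visible list unless a Commit decision arrived while it was still pending.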

Source Code Analysis - Step 1

The producer sends a half message to the RocketMQ broker.

TransactionMQProducer

Start with the sendMessageInTransaction method of TransactionMQProducer.

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    // Fail fast if no transaction listener has been set
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    // Delegate to DefaultMQProducerImpl
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

DefaultMQProducerImpl

Next, look at the sendMessageInTransaction method of DefaultMQProducerImpl.

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    // Fail if both the local transaction executor and the transaction listener are null
    // (the executor is slated for removal in 5.0.0; the listener is the recommended API)
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // Validate the topic and message body
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Add the TRAN_MSG property (value "true") to mark this as a transactional message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // Add the PGROUP property (value: the producer group)
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 1. Send the half message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // Branch on the send status of the half message
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    // Add the "__transactionId__" property to the message
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // Read the value of the message's UNIQ_KEY property
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    // Set the transaction ID
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // 2. Execute the local transaction
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 3. Submit the second-phase confirmation to the broker based on the local transaction outcome
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Assemble and return the TransactionSendResult
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
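The switch in the method above boils down to a small decision rule: the local transaction runs only when the half message was stored with SEND_OK, any other send status forces a rollback, and a null result or an exception leaves the state as UNKNOW. A minimal sketch of that rule follows, using local stand-in enums whose names mirror the quoted code; decide and its Supplier parameter are illustrative assumptions, not part of the RocketMQ client.

```java
import java.util.function.Supplier;

/** Sketch of the send-status to local-state decision; enums are local stand-ins. */
final class LocalStateDecisionSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static LocalState decide(SendStatus status, Supplier<LocalState> localTransaction) {
        if (status != SendStatus.SEND_OK) {
            // The half message was not stored cleanly, so the local transaction is never run.
            return LocalState.ROLLBACK_MESSAGE;
        }
        try {
            LocalState state = localTransaction.get();
            // A null result is treated the same as "don't know yet".
            return state == null ? LocalState.UNKNOW : state;
        } catch (RuntimeException e) {
            // An exception leaves the outcome open; a later check-back has to settle it.
            return LocalState.UNKNOW;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalState.COMMIT_MESSAGE));
    }
}
```

Note that the quoted method catches Throwable rather than RuntimeException; the narrower catch here is only a simplification for the sketch.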

1.1 The send method

Next, look at the send method.

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // sendMsgTimeout: the send timeout in milliseconds, default 3000
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Transactional messages are sent synchronously by default
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Check the producer state
    this.makeSureStateOK();
    // Validate the topic and message body
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Compute the maximum number of send attempts.
        // retryTimesWhenSendFailed is the number of retries after a failed send (default 2)
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Pick one message queue from the queue list
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    // Stop retrying once the call has timed out
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Delegate to sendKernelImpl
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // retryAnotherBrokerWhenNotStoreOK: whether to retry on another broker
                                // when the send did not succeed (default false)
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    // Fail if the NameServer address list is empty
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
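Two ideas in sendDefaultImpl are easy to lose in the volume of code: the attempt count is 1 + retryTimesWhenSendFailed for synchronous sends and 1 otherwise, and every attempt is given only what remains of a single overall timeout budget. The sketch below isolates just those two ideas. The Attempt interface and sendWithRetry are hypothetical names, and queue selection, fault tracking, and the per-exception handling of the real method are deliberately omitted.

```java
/** Sketch of a retry loop with a shared timeout budget; not RocketMQ code. */
final class SendRetrySketch {
    /** Hypothetical single send attempt; throwing signals a failed attempt. */
    interface Attempt {
        void send(long remainingMillis) throws Exception;
    }

    // Mirrors the quoted expression: sync sends get extra retries, other modes get one shot.
    static int timesTotal(boolean sync, int retryTimesWhenSendFailed) {
        return sync ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Returns the 1-based attempt that succeeded, or -1 if none did. */
    static int sendWithRetry(Attempt attempt, boolean sync, int retryTimesWhenSendFailed, long timeoutMillis) {
        long begin = System.currentTimeMillis();
        int total = timesTotal(sync, retryTimesWhenSendFailed);
        for (int times = 0; times < total; times++) {
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                break; // budget exhausted: stop retrying
            }
            try {
                attempt.send(timeoutMillis - cost); // pass on only the remaining budget
                return times + 1;
            } catch (Exception e) {
                // failed attempt: fall through to the next iteration
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int succeededOn = sendWithRetry(remaining -> {
            if (++calls[0] < 2) {
                throw new Exception("simulated broker failure");
            }
        }, true, 2, 3000L);
        System.out.println("succeeded on attempt " + succeededOn);
    }
}
```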

Now look at the sendKernelImpl method.

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Look up the broker address in the local cache
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // vipChannelEnabled controls whether the VIP channel is used (default false)
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            // For a non-batch message
            if (!(msg instanceof MessageBatch)) {
                // Add a UNIQ_KEY property to the message
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // Compress the body when a single message exceeds 4 KB
            // (the threshold is compressMsgBodyOverHowmuch, default 4096)
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                // Mark the message as compressed
                msgBodyCompressed = true;
            }
            // Read the message's "TRAN_MSG" property
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // If the CheckForbiddenHook list is non-empty
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                // Invoke checkForbidden on every CheckForbiddenHook
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // If the SendMessageHook list is non-empty
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                // Read the message's "TRAN_MSG" property
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    // Set the message type to half message
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                // If the "__STARTDELIVERTIME" or "DELAY" property is set
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    // Set the message type to delayed message
                    context.setMsgType(MessageType.Delay_Msg);
                }
                // Invoke sendMessageBefore on every SendMessageHook
                this.executeSendMessageHookBefore(context);
            }
            // Assemble the request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            // If the topic starts with "%RETRY%"
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                // Read the message's "RECONSUME_TIME" property
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                // Read the message's "MAX_RECONSUME_TIMES" property
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                ......
                case ONEWAY:
                case SYNC:
                    // Time spent so far
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    // Throw if the call has already timed out
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // The producer sends the half message to the RocketMQ broker
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // If the SendMessageHook list is non-empty
            if (this.hasSendMessageHook()) {
                // Record the send result in the context
                context.setSendResult(sendResult);
                // Invoke sendMessageAfter on every SendMessageHook
                this.executeSendMessageHookAfter(context);
            }
            // Return the send result
            return sendResult;
        ......
}
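The sysFlag built in sendKernelImpl is a bit field: one bit marks compression and a two-bit group carries the transaction type. The sketch below shows how such a field can be composed and read back. The constant values are assumed to mirror MessageSysFlag and should be checked against the RocketMQ version in use; the helper names build and transactionType are hypothetical. resetTransactionValue is modelled on the call of the same name that the broker-side code in Step 2 makes.

```java
/** Sketch of sysFlag bit handling; constant values are assumptions, see lead-in. */
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // also serves as the two-bit mask

    // Compose the flag the way the producer does: OR in each applicable bit.
    static int build(boolean compressed, boolean halfMessage) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (halfMessage) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    // Extract only the transaction-type bits.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the transaction-type bits, then set the requested type.
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    public static void main(String[] args) {
        int flag = build(true, true);
        System.out.println(transactionType(flag) == TRANSACTION_PREPARED_TYPE);
        System.out.println(resetTransactionValue(flag, TRANSACTION_NOT_TYPE) == COMPRESSED_FLAG);
    }
}
```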

Source Code Analysis - Step 2

After the broker persists the message, it returns an ACK to the producer confirming that the send succeeded. At this point the message is not delivered to consumers. The broker replaces the half message's topic, queue ID, and related fields, and stores the original topic and queue ID as message properties.

SendMessageProcessor

The key part of BrokerController's registerProcessor method is excerpted below:

// Construct the SendMessageProcessor
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// Register the SendMessageHook list on the SendMessageProcessor
sendProcessor.registerSendMessageHook(sendMessageHookList);
// Register the ConsumeMessageHook list on the SendMessageProcessor
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

// Register a processor for each request code
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

Take a quick look at the registerProcessor method of NettyRemotingServer.

@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    // Fall back to the default publicExecutor when no executor is supplied
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    // Register the request code with its processor and executor in the processorTable cache
    this.processorTable.put(requestCode, pair);
}
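The registration pattern above, a table keyed by request code whose entries pair a processor with the executor that should run it, can be sketched in a few lines. This is an illustration only: the Function-based processor, the Registration holder, and the dispatch method are hypothetical simplifications and say nothing about how NettyRemotingServer actually dispatches requests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Sketch of a request-code to (processor, executor) table; not RocketMQ code. */
final class ProcessorTableSketch {
    static final class Registration {
        final Function<String, String> processor;
        final ExecutorService executor;

        Registration(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final ExecutorService publicExecutor = Executors.newFixedThreadPool(2);
    private final Map<Integer, Registration> processorTable = new HashMap<>();

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        // Fall back to the shared executor when the caller supplies none.
        ExecutorService chosen = executor != null ? executor : publicExecutor;
        processorTable.put(requestCode, new Registration(processor, chosen));
    }

    String dispatch(int requestCode, String request) {
        Registration reg = processorTable.get(requestCode);
        if (reg == null) {
            throw new IllegalArgumentException("no processor for code " + requestCode);
        }
        Callable<String> task = () -> reg.processor.apply(request);
        try {
            // Run the processor on the executor it was registered with.
            return reg.executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        publicExecutor.shutdown();
    }

    public static void main(String[] args) {
        ProcessorTableSketch table = new ProcessorTableSketch();
        table.registerProcessor(10, req -> "handled:" + req, null);
        System.out.println(table.dispatch(10, "ping"));
        table.shutdown();
    }
}
```

Pairing each processor with its own executor lets heavy request types, such as message sends, be isolated on a dedicated pool while everything else shares the default one.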

The publicExecutor is defined in the NettyRemotingServer constructor.

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
    }
});

SendMessageProcessor implements NettyRequestProcessor; here is its processRequest method.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}
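processRequest is a synchronous facade over an asynchronous implementation: it calls the async method and blocks on the returned future. A minimal sketch of that shape is shown below, with a hypothetical asyncProcess standing in for asyncProcessRequest and plain strings standing in for RemotingCommand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Sketch of a blocking wrapper around an async method; not RocketMQ code. */
final class SyncOverAsyncSketch {
    // Hypothetical async handler: a null request completes immediately with null.
    static CompletableFuture<String> asyncProcess(String request) {
        if (request == null) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(() -> "ACK:" + request);
    }

    // Blocking facade: wait for the async result, returning null on failure.
    static String process(String request) {
        try {
            return asyncProcess(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(process("half-message"));
    }
}
```

One deliberate difference from the quoted method: the sketch restores the thread's interrupt flag, which the quoted code does not do. That is a design choice of this example, not a statement about what RocketMQ should do.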

Step into asyncProcessRequest for a closer look.

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            // Parse the RemotingCommand to obtain the request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            // Build the SendMessageContext from the ChannelHandlerContext and the parsed header
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // Invoke sendMessageBefore on every SendMessageHook
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

Take a quick look at executeSendMessageHookBefore.

public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
    SendMessageContext context) {
    // If the SendMessageHook list is non-empty
    if (hasSendMessageHook()) {
        // Iterate over the SendMessageHook list
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                // Parse the RemotingCommand to obtain the request header
                final SendMessageRequestHeader requestHeader = parseRequestHeader(request);

                if (null != requestHeader) {
                    // Populate the SendMessageContext
                    String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
                    context.setNamespace(namespace);
                    context.setProducerGroup(requestHeader.getProducerGroup());
                    context.setTopic(requestHeader.getTopic());
                    context.setBodyLength(request.getBody().length);
                    context.setMsgProps(requestHeader.getProperties());
                    context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                    context.setBrokerAddr(this.brokerController.getBrokerAddr());
                    context.setQueueId(requestHeader.getQueueId());
                }
                // Call the hook's sendMessageBefore method
                hook.sendMessageBefore(context);
                if (requestHeader != null) {
                    // Write the (possibly modified) properties back to the request header
                    requestHeader.setProperties(context.getMsgProps());
                }
            } catch (Throwable e) {
            }
        }
    }
}

As shown above, a message that is not part of a batch ends up in asyncSendMessage.

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,
    SendMessageRequestHeader requestHeader) {
    // 1. Build the response RemotingCommand
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        // Pick a random queue ID bounded by the number of write queues
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    // 2. Handle retry and dead-letter queues
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
    } else {
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    }

    CompletableFuture<PutMessageResult> putMessageResult = null;
    // Read the "TRAN_MSG" property
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(transFlag)) {
        // rejectTransactionMessage: whether the broker rejects transactional messages (default false)
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // Hand over to TransactionalMessageServiceImpl
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    // Map each PutMessageStatus to the corresponding response code
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

1. The preSend method

private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageRequestHeader requestHeader) {
    // Build the response RemotingCommand
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("Receive SendMessage request command {}", request);

    final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();

    if (this.brokerController.getMessageStore().now() < startTimestamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
        return response;
    }

    response.setCode(-1);
    // Validate the request header
    super.msgCheck(ctx, requestHeader, response);

    return response;
}

Interested readers can explore the msgCheck method on their own.

2. The handleRetryAndDLQ method

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
    RemotingCommand request,
    MessageExt msg, TopicConfig topicConfig) {
    String newTopic = requestHeader.getTopic();
    // If the topic starts with "%RETRY%"
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        // Look up the subscription group configuration
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return false;
        }
        // Maximum retry count from the subscription group configuration (default 16)
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        // If the request version is >= 3.4.9 and the header carries maxReconsumeTimes
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && requestHeader.getMaxReconsumeTimes() != null) {
            // Use the maxReconsumeTimes value from the request header
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        // Read reconsumeTimes from the request header
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        // If the reconsume count has reached the threshold
        if (reconsumeTimes >= maxReconsumeTimes) {
            // Redirect the topic to "%DLQ%" + group name
            newTopic = MixAll.getDLQTopic(groupName);
            int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
            // Create the topic configuration
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0
            );
            msg.setTopic(newTopic);
            msg.setQueueId(queueIdInt);
            msg.setDelayTimeLevel(0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return false;
            }
        }
    }
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    msg.setSysFlag(sysFlag);
    return true;
}
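Stripped of configuration lookups and error responses, the routing rule in handleRetryAndDLQ is short: a message on a "%RETRY%" topic whose reconsume count has reached the maximum is redirected to the "%DLQ%" topic of the same group, and everything else keeps its topic. A minimal sketch of that rule, with a hypothetical route helper and the two prefixes taken from the comments in the quoted code:

```java
/** Sketch of the retry-topic to dead-letter-topic routing rule; not RocketMQ code. */
final class RetryDlqRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    static String route(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic != null && topic.startsWith(RETRY_PREFIX)) {
            // The consumer group name is whatever follows the retry prefix.
            String groupName = topic.substring(RETRY_PREFIX.length());
            if (reconsumeTimes >= maxReconsumeTimes) {
                return DLQ_PREFIX + groupName;
            }
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(route("%RETRY%order-group", 16, 16));
        System.out.println(route("%RETRY%order-group", 3, 16));
        System.out.println(route("TopicOrder", 99, 16));
    }
}
```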

TransactionalMessageServiceImpl

@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}

As shown, the work is handed over to TransactionalMessageBridge.

TransactionalMessageBridge

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

First, see how parseHalfMessageInner transforms the half message.

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Store the real topic and queue ID as properties of the MessageExtBrokerInner
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // Reset the transaction bits of sysFlag
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // Set the topic to "RMQ_SYS_TRANS_HALF_TOPIC"
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // Set the queue ID to 0
    msgInner.setQueueId(0);
    // Rebuild the properties string
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

This rewrites the half message's topic to RMQ_SYS_TRANS_HALF_TOPIC, which is exactly why the broker does not deliver a half message to consumers after persisting it: consumers subscribe to the original topic, not to this internal one.
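The topic swap, and the reverse operation a commit needs in order to rebuild the message for its original topic and queue, can be sketched with a plain map of properties. Msg, toHalf, and restore are hypothetical names; the property keys "REAL_TOPIC" and "REAL_QID" are assumed to match MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID and should be verified against the RocketMQ version in use.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of hiding a message under an internal topic and restoring it; not RocketMQ code. */
final class HalfTopicSwapSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed property key
    static final String REAL_QID = "REAL_QID";     // assumed property key

    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Stash the real destination in properties, then point the message at the internal topic.
    static Msg toHalf(Msg msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    // On commit: build a new message aimed at the original topic and queue.
    static Msg restore(Msg half) {
        Msg real = new Msg(half.properties.get(REAL_TOPIC),
            Integer.parseInt(half.properties.get(REAL_QID)));
        real.properties.putAll(half.properties);
        return real;
    }

    public static void main(String[] args) {
        Msg half = toHalf(new Msg("TopicOrder", 3));
        System.out.println(half.topic + " / " + half.queueId);
        Msg real = restore(half);
        System.out.println(real.topic + " / " + real.queueId);
    }
}
```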

DefaultMessageStore

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // Check the state of the store
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // Validate the message's topic and properties string
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    // Validate LMQ constraints
    // (as quoted, the condition below tests msgCheckStatus rather than lmqMsgCheckStatus)
    PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
    if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
        return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    // Write the half message to the CommitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}

Interested readers can explore CommitLog on their own.

Source Code Analysis - Step 3

The producer executes the local transaction and, based on its outcome, submits a second-phase confirmation (Commit or Rollback) to the broker.

DefaultMQProducerImpl

Continue with the remaining part of DefaultMQProducerImpl's sendMessageInTransaction method.

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    ......
    try {
        // 1. Send the half message (covered in Step 1)
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // Branch on the send status of the half message
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // Read the value of the message's UNIQ_KEY property
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // 2. Execute the local transaction
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 3. Submit the second-phase confirmation to the broker based on the local transaction outcome
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Assemble and return the TransactionSendResult
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

The executeLocalTransaction method

This is the method of the client-defined transaction listener (TransactionListener) that executes the local transaction. It returns a LocalTransactionState, an enum with three values: COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW (spelled without a trailing N, as the code above shows).
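A common way to implement such a listener is to record the outcome of each local transaction under its transaction ID, so that a later check-back can simply look the answer up. The sketch below shows that pattern with a locally defined interface. ToyTransactionListener is a hypothetical stand-in shaped like a transaction listener, not the RocketMQ TransactionListener interface itself, and the Boolean argument used to simulate success or failure is purely an assumption of this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a listener that remembers outcomes for check-backs; not RocketMQ code. */
final class ListenerSketch {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Hypothetical stand-in for a transaction listener. */
    interface ToyTransactionListener {
        LocalState executeLocalTransaction(String transactionId, Object arg);

        LocalState checkLocalTransaction(String transactionId);
    }

    static final class OrderListener implements ToyTransactionListener {
        private final Map<String, LocalState> outcomes = new ConcurrentHashMap<>();

        @Override
        public LocalState executeLocalTransaction(String transactionId, Object arg) {
            try {
                // Simulated business step: arg == Boolean.FALSE means the payment fails.
                if (Boolean.FALSE.equals(arg)) {
                    throw new IllegalStateException("payment declined");
                }
                outcomes.put(transactionId, LocalState.COMMIT_MESSAGE);
                return LocalState.COMMIT_MESSAGE;
            } catch (RuntimeException e) {
                outcomes.put(transactionId, LocalState.ROLLBACK_MESSAGE);
                return LocalState.ROLLBACK_MESSAGE;
            }
        }

        @Override
        public LocalState checkLocalTransaction(String transactionId) {
            // No record yet means the local transaction has not finished: answer UNKNOW.
            return outcomes.getOrDefault(transactionId, LocalState.UNKNOW);
        }
    }

    public static void main(String[] args) {
        OrderListener listener = new OrderListener();
        System.out.println(listener.executeLocalTransaction("tx-1", Boolean.TRUE));
        System.out.println(listener.checkLocalTransaction("tx-unseen"));
    }
}
```

In a real service the outcome would normally live in the same database as the business data and be written in the same local transaction, so that it survives a producer restart; the in-memory map here only keeps the example short.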

The endTransaction method

Finally, look at the endTransaction method.

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // Look up the broker address in the local cache
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    // Build the request header
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // Map the local transaction state to the commitOrRollback field
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Submit the second-phase confirmation to the broker (one-way)
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

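Source Code Analysis - Step 4

After the broker receives the producer's second-phase result, it handles it as follows:

  • Commit: the broker builds a new transactional message from the half message's properties and its original topic and queue ID, then persists it. Once persisted, the new message can be delivered to consumers, and the half message is discarded.
  • Rollback: the half message is discarded.

Discarding a half message does not physically delete it. The broker records the removal by writing a message to a separate topic and its OP queue.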

EndTransactionProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }

    if (requestHeader.getFromTransactionCheck()) {
        ......
    } else {
        switch (requestHeader.getCommitOrRollback()) {
            // TRANSACTION_NOT_TYPE: return immediately without further processing
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // The second-phase result is Commit: read the half message from the CommitLog
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Validate the half message
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Build the final message from the half message's properties and its original topic and queue ID
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // Write the final message to the CommitLog
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // Delete the half message
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // The second-phase result is Rollback: read the half message from the CommitLog
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Validate the half message
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Delete the half message
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
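The commit branch rewrites the transaction bits of sysFlag through MessageSysFlag.resetTransactionValue. The sketch below shows the bit arithmetic on its own. The constant values are what MessageSysFlag defines to the best of our knowledge (two bits at positions 2 and 3), so treat them as an assumption and confirm against the RocketMQ version you run. TxFlagSketch and its method names are made up for this example.

```java
// Illustrative copy of the transaction bits in sysFlag; values assumed to match MessageSysFlag.
class TxFlagSketch {
    static final int NOT_TYPE = 0;              // no transaction state
    static final int PREPARED_TYPE = 0x1 << 2;  // 4: half message
    static final int COMMIT_TYPE = 0x2 << 2;    // 8: committed
    static final int ROLLBACK_TYPE = 0x3 << 2;  // 12: rolled back, also serves as the 2-bit mask

    // Extracts the transaction state from a sysFlag.
    static int transactionValue(int sysFlag) {
        return sysFlag & ROLLBACK_TYPE;
    }

    // Clears the two transaction bits, then sets the new state; other flag bits are untouched.
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~ROLLBACK_TYPE) | type;
    }
}
```

For example, a half message whose sysFlag is PREPARED_TYPE plus an unrelated low bit (value 5) becomes 9 after resetTransactionValue(5, COMMIT_TYPE): the low bit survives and the state reads as committed.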

Next, let's look at checkPrepareMessage.

private RemotingCommand checkPrepareMessage(MessageExt msgExt, EndTransactionRequestHeader requestHeader) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    if (msgExt != null) {
        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // Validate the producer group
        if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("The producer group wrong");
            return response;
        }

        // Validate the transaction state table offset (the queue offset)
        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("The transaction state table offset wrong");
            return response;
        }

        // Validate the CommitLog offset
        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("The commit log offset wrong");
            return response;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find prepared transaction message failed");
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    return response;
}

Then comes endMessageTransaction.

private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
    // Create a new message
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // Restore the half message's original topic
    msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    // Restore the half message's original queue ID
    msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    msgInner.setWaitStoreMsgOK(false);
    msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    msgInner.setSysFlag(msgExt.getSysFlag());
    TopicFilterType topicFilterType =
        (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
            : TopicFilterType.SINGLE_TAG;
    long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
    return msgInner;
}
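The topic swap and its reversal can be modelled without the broker. Below is a minimal sketch, assuming a message is just a topic, a queue ID, and a property map. HalfMessageSketch, toHalf, and restore are hypothetical names, the property keys are chosen for the sketch, and parking the half message on queue 0 is an assumption about the broker's behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Toy message model showing how the original destination travels in properties.
class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Keys for this sketch; the broker uses MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    final String topic;
    final int queueId;
    final Map<String, String> properties = new HashMap<>();

    HalfMessageSketch(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }

    // Step 2: park the message under the half topic and remember where it came from.
    static HalfMessageSketch toHalf(HalfMessageSketch msg) {
        HalfMessageSketch half = new HalfMessageSketch(HALF_TOPIC, 0); // queue 0 is an assumption
        half.properties.putAll(msg.properties);
        half.properties.put(REAL_TOPIC, msg.topic);
        half.properties.put(REAL_QID, String.valueOf(msg.queueId));
        return half;
    }

    // Step 4 (Commit): rebuild the deliverable message and drop the bookkeeping properties.
    static HalfMessageSketch restore(HalfMessageSketch half) {
        HalfMessageSketch real = new HalfMessageSketch(
            half.properties.get(REAL_TOPIC),
            Integer.parseInt(half.properties.get(REAL_QID)));
        real.properties.putAll(half.properties);
        real.properties.remove(REAL_TOPIC);
        real.properties.remove(REAL_QID);
        return real;
    }
}
```

Because consumers subscribe to the original topic, a message sitting under the half topic is invisible to them until restore produces the final copy.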

Finally, a quick look inside sendFinalMessage.

private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Write the final message to the CommitLog
    final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                break;
            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Create mapped file failed.");
                break;
            ......
        }
        return response;
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
    }
    return response;
}

TransactionalMessageServiceImpl

@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

So both commit and rollback end up calling getHalfMessageByOffset.

private OperationResult getHalfMessageByOffset(long commitLogOffset) {
    OperationResult response = new OperationResult();
    // Look up the half message by its CommitLog offset
    MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
    if (messageExt != null) {
        response.setPrepareMessage(messageExt);
        response.setResponseCode(ResponseCode.SUCCESS);
    } else {
        response.setResponseCode(ResponseCode.SYSTEM_ERROR);
        response.setResponseRemark("Find prepared transaction message failed");
    }
    return response;
}

Last, the deletePrepareMessage method.

@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    // Delegate to TransactionalMessageBridge
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

TransactionalMessageBridge

lookMessageByOffset

public MessageExt lookMessageByOffset(final long commitLogOffset) {
    // Delegate to DefaultMessageStore
    return this.store.lookMessageByOffset(commitLogOffset);
}

putOpMessage

public boolean putOpMessage(MessageExt messageExt, String opType) {
    // Build the message queue that the half message lives in
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

addRemoveTagInTransactionOp

private boolean addRemoveTagInTransactionOp(MessageExt prepareMessage, MessageQueue messageQueue) {
    // Build a Message (topic "RMQ_SYS_TRANS_OP_HALF_TOPIC", tag "d") whose body is the half message's queue offset
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
        String.valueOf(prepareMessage.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    // Resolve the OP queue, cache the (message queue, OP queue) pair, then write the op message to the CommitLog
    writeOp(message, messageQueue);
    return true;
}

writeOp

writeOp resolves the OP queue for the given message queue, caches the pair, then builds a MessageExtBrokerInner from the Message and the OP queue's attributes and writes it to the CommitLog.

private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    // Look up the OP queue for this message queue in the cache
    if (opQueueMap.containsKey(mq)) {
        opQueue = opQueueMap.get(mq);
    } else {
        // Build the OP queue (topic "RMQ_SYS_TRANS_OP_HALF_TOPIC")
        opQueue = getOpQueueByHalf(mq);
        // Cache the (message queue, OP queue) pair
        MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
        if (oldQueue != null) {
            opQueue = oldQueue;
        }
    }
    if (opQueue == null) {
        // Build the OP queue (topic "RMQ_SYS_TRANS_OP_HALF_TOPIC")
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    // Build a MessageExtBrokerInner from the Message and the OP queue, then write it to the CommitLog
    putMessage(makeOpMessageInner(message, opQueue));
}
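A small in-memory model shows why writing an op message counts as deleting the half message. This is a sketch under simplifying assumptions: the OP queue is a plain list, the record's position in the list plays the role of the op queue offset, and OpQueueSketch, markRemoved, and buildRemoveMap are hypothetical names rather than broker APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OpQueueSketch {
    static final String REMOVE_TAG = "d";

    // One op record: a tag plus a body that holds the half message's queue offset as text.
    static final class OpRecord {
        final String tag;
        final byte[] body;

        OpRecord(String tag, byte[] body) {
            this.tag = tag;
            this.body = body;
        }
    }

    private final List<OpRecord> opQueue = new ArrayList<>();

    // "Deletes" a half message by appending a removal marker; nothing is erased.
    void markRemoved(long halfQueueOffset) {
        byte[] body = String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8);
        opQueue.add(new OpRecord(REMOVE_TAG, body));
    }

    // Scans the op records and maps each finished half offset to the op offset that recorded it.
    Map<Long, Long> buildRemoveMap() {
        Map<Long, Long> removeMap = new HashMap<>();
        for (int opOffset = 0; opOffset < opQueue.size(); opOffset++) {
            OpRecord record = opQueue.get(opOffset);
            if (REMOVE_TAG.equals(record.tag)) {
                long halfOffset = Long.parseLong(new String(record.body, StandardCharsets.UTF_8));
                removeMap.put(halfOffset, (long) opOffset);
            }
        }
        return removeMap;
    }
}
```

A later scan over the half queue can then skip every offset found in the map, which is the role removeMap plays in the check logic of step 5.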

getOpQueueByHalf

Builds the OP queue.

private MessageQueue getOpQueueByHalf(MessageQueue halfMQ) {
    MessageQueue opQueue = new MessageQueue();
    // The topic is "RMQ_SYS_TRANS_OP_HALF_TOPIC"
    opQueue.setTopic(TransactionalMessageUtil.buildOpTopic());
    opQueue.setBrokerName(halfMQ.getBrokerName());
    opQueue.setQueueId(halfMQ.getQueueId());
    return opQueue;
}

putMessage

Writes the MessageExtBrokerInner to the CommitLog.

public boolean putMessage(MessageExtBrokerInner messageInner) {
    PutMessageResult putMessageResult = store.putMessage(messageInner);
    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        return true;
    } else {
        LOGGER.error("Put message failed, topic: {}, queueId: {}, msgId: {}",
            messageInner.getTopic(), messageInner.getQueueId(), messageInner.getMsgId());
        return false;
    }
}

DefaultMessageStore

1. lookMessageByOffset

public MessageExt lookMessageByOffset(long commitLogOffset) {
    SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
    if (null != sbr) {
        try {
            int size = sbr.getByteBuffer().getInt();
            return lookMessageByOffset(commitLogOffset, size);
        } finally {
            sbr.release();
        }
    }
    return null;
}

A quick look at the two-argument lookMessageByOffset overload.

public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
    SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size);
    if (null != sbr) {
        try {
            return MessageDecoder.decode(sbr.getByteBuffer(), true, false);
        } finally {
            sbr.release();
        }
    }
    return null;
}

In short, the first call reads the 4-byte size field at the given offset, and the second reads and decodes the whole half message from the CommitLog.

2. putMessage

@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}

A quick look at waitForPutResult.

private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
    try {
        // Write timeout: the larger of syncFlushTimeout (default 5000 ms) and slaveTimeout (default 3000 ms), plus 5000 ms
        int putMessageTimeout =
            Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
                this.messageStoreConfig.getSlaveTimeout()) + 5000;
        // Block until the write result is available
        return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
    } catch (ExecutionException | InterruptedException e) {
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
    } catch (TimeoutException e) {
        log.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and "
            + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc "
            + "process hangs or other unexpected situations.");
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
    }
}
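The timeout arithmetic and the blocking wait can be tried in isolation with the JDK alone. This is a minimal sketch: PutTimeoutSketch and its methods are hypothetical, and plain strings stand in for PutMessageResult. With the defaults quoted in the source comment (5000 ms and 3000 ms), the wait is capped at 10000 ms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PutTimeoutSketch {
    // Same formula as waitForPutResult: the larger timeout plus a 5000 ms margin.
    static int putMessageTimeoutMillis(int syncFlushTimeout, int slaveTimeout) {
        return Math.max(syncFlushTimeout, slaveTimeout) + 5000;
    }

    // Turns an async result into a sync one; any failure or timeout collapses to UNKNOWN_ERROR.
    static String waitForResult(CompletableFuture<String> future, int timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return "UNKNOWN_ERROR";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return "UNKNOWN_ERROR";
        }
    }
}
```

Unlike the broker code, the sketch restores the interrupt flag when interrupted. That is a choice made here, not something the source does.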

Finally, the asyncPutMessage method.

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // Check the state of the store
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }

    // Validate the message's topic and properties string
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    // Validate LMQ constraints
    PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
    if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
        return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    // Write the message to the CommitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}

Interested readers can dig into CommitLog for the details of the write path.

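Source Code Analysis - Step 5

The broker runs a check every 60 seconds. If a half message has gone without a second-phase result for longer than the transaction timeout (6 seconds by default), or the result the broker received was Unknown, the broker sends a check-back request to the producer. If the original producer has crashed, the broker sends the check-back to another producer instance in the same producer group.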

BrokerStartup

The main method of BrokerStartup calls createBrokerController, which contains this line:

boolean initResult = controller.initialize();

main also calls the start method, which contains this line:

controller.start();

Both methods come up in the analysis that follows.

BrokerController

The initialize method of BrokerController contains this line:

initialTransaction();

Here is what initialTransaction does:

private void initialTransaction() {
    // 1. Load a TransactionalMessageService through the SPI mechanism
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    // If none was found
    if (null == this.transactionalMessageService) {
        // Fall back to a locally built TransactionalMessageServiceImpl
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    // 2. Load an AbstractTransactionalMessageCheckListener through the SPI mechanism
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    // If none was found
    if (null == this.transactionalMessageCheckListener) {
        // Fall back to a locally built DefaultTransactionalMessageCheckListener
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    // 3. Build a TransactionalMessageCheckService
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
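The "load through SPI, otherwise use the built-in default" pattern can be sketched with the JDK's java.util.ServiceLoader. Note the swap: the broker uses its own ServiceProvider helper, not ServiceLoader, so this illustrates the pattern rather than the broker's loading code. CheckListenerSketch, DefaultCheckListenerSketch, and SpiFallbackSketch are hypothetical names.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical extension point.
interface CheckListenerSketch {
    String name();
}

// Built-in implementation used when no provider is registered.
class DefaultCheckListenerSketch implements CheckListenerSketch {
    @Override
    public String name() {
        return "default";
    }
}

class SpiFallbackSketch {
    // Returns the first registered provider, or the built-in default if there is none.
    static CheckListenerSketch load() {
        Iterator<CheckListenerSketch> providers =
            ServiceLoader.load(CheckListenerSketch.class).iterator();
        if (providers.hasNext()) {
            return providers.next();
        }
        return new DefaultCheckListenerSketch();
    }
}
```

With no provider file on the classpath, load() returns the default, which mirrors the two null checks in initialTransaction.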

In addition, the start method of BrokerController contains this line:

startProcessorByHa(messageStoreConfig.getBrokerRole());

Let's step into startProcessorByHa.

private void startProcessorByHa(BrokerRole role) {
    // Only when the broker is not a SLAVE
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            // Start the TransactionalMessageCheckService
            this.transactionalMessageCheckService.start();
        }
    }
}

TransactionalMessageCheckService implements the Runnable interface (through its parent class ServiceThread), so starting it ends up invoking its run method.

TransactionalMessageCheckService

@Override
public void run() {
    log.info("Start transaction check service thread!");
    // Interval between checks, 60 seconds by default (configurable on the broker via transactionCheckInterval)
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

Let's step into waitForRunning to see what happens.

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    waitPoint.reset();

    try {
        // Block the thread for at most one interval
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
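The wait-or-wake pattern above can be reproduced with plain JDK classes. This is a minimal sketch with two stated substitutions: a fresh CountDownLatch is created on each round in place of the resettable latch the broker uses, and the wakeup method is an assumption about how a caller would cut the wait short. PeriodicWaiterSketch and the rounds counter are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PeriodicWaiterSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    int rounds = 0; // number of times onWaitEnd has run

    // Requests an early run; only the first request between two rounds has any effect.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    // Waits up to intervalMillis, or returns at once if a wakeup is already pending.
    void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        waitPoint = new CountDownLatch(1); // plays the role of waitPoint.reset()
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    // The periodic work; the broker's version triggers the transaction check here.
    void onWaitEnd() {
        rounds++;
    }
}
```

Whichever path is taken, onWaitEnd runs exactly once per call, so the periodic work happens either every interval or as soon as someone asks for it.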

Either way, onWaitEnd gets called.

@Override
protected void onWaitEnd() {
    // Transaction timeout, 6 seconds by default (configurable on the broker via transactionTimeOut)
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of check-backs, 15 by default; the message is discarded once the limit is reached
    // (configurable on the broker via transactionCheckMax)
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Run the check-back logic
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

As we saw in BrokerController's initialTransaction method, when the SPI mechanism does not yield a TransactionalMessageService, the broker builds a TransactionalMessageServiceImpl locally.

Next, let's see how the check method of TransactionalMessageServiceImpl works.

TransactionalMessageServiceImpl

This is the TransactionalMessageServiceImpl class in the org.apache.rocketmq.broker.transaction.queue package.

@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        // "RMQ_SYS_TRANS_HALF_TOPIC"
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // Fetch the message queues of the half topic
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        // Iterate over the half queues
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // Each half queue maps one-to-one to an OP queue (topic "RMQ_SYS_TRANS_OP_HALF_TOPIC")
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Consume offset recorded for the half queue
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // Consume offset recorded for the OP queue
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            // Validate both offsets
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            HashMap<Long, Long> removeMap = new HashMap<>();
            // Read the OP queue and put the half messages already marked for removal into removeMap
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            long i = halfOffset;
            while (true) {
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // This offset has already been committed or rolled back
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                } else {
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // Discard when the check-back count has reached its limit (15 by default, broker parameter
                    // transactionCheckMax) or the half message is older than the log file retention time
                    // (3 days by default, broker parameter fileReservedTime)
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        // Move the message to the "TRANS_CHECK_MAX_TIME_TOPIC" topic
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    long checkImmunityTime = transactionTimeout;
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) {
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // Decide whether a check-back is needed. It is needed when:
                    // - no op messages were pulled and the half message is older than its immunity time
                    //   (the transaction timeout, 6 seconds by default, broker parameter transactionTimeOut), or
                    // - the last op message pulled was born more than transactionTimeout after this round started, or
                    // - the computed age of the half message is less than or equal to -1
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);

                    if (isNeedCheck) {
                        // Write the half message back to the "RMQ_SYS_TRANS_HALF_TOPIC" topic
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // Let the check listener handle the half message
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}
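Stripped of offset bookkeeping, the per-message decision in check comes down to three outcomes. The sketch below is a deliberate simplification, not the broker's algorithm: it ignores the OP queue conditions and the per-message immunity property, and CheckDecisionSketch, Decision, and decide are hypothetical names.

```java
class CheckDecisionSketch {
    enum Decision { DISCARD, WAIT, CHECK_BACK }

    // Decides what to do with one pending half message.
    static Decision decide(long ageMillis, int checkTimes,
                           long transactionTimeoutMillis, int checkMax,
                           long fileReservedMillis) {
        // Checked too many times, or older than the log retention window: give up on it.
        if (checkTimes >= checkMax || ageMillis > fileReservedMillis) {
            return Decision.DISCARD;
        }
        // Still inside the timeout window: the producer may yet report a result.
        if (ageMillis >= 0 && ageMillis < transactionTimeoutMillis) {
            return Decision.WAIT;
        }
        // Overdue with no result: ask the producer what happened.
        return Decision.CHECK_BACK;
    }
}
```

With the defaults mentioned in the comments (6 second timeout, 15 checks, 3 days of retention), a 1-second-old message waits, a 7-second-old message triggers a check-back, and a message already checked 15 times is discarded.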

AbstractTransactionalMessageCheckListener

The resolveHalfMsg method

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // Send the check-back request to the client
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

The sendCheckMessage method

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    // Restore the half message's original topic
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    // Restore the half message's original queue ID
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // Pick any available channel in the producer group, so that another instance of the group
    // receives the check-back if the original producer has crashed
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
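The fallback to another producer in the same group works because the lookup is keyed by group, not by instance. Here is a minimal sketch of that idea, assuming a registry of clients per group. ProducerRegistrySketch, Client, and availableClient are hypothetical names, and picking the first live client is a simplification of whatever selection strategy ProducerManager applies.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ProducerRegistrySketch {
    // Stand-in for a network channel to one producer instance.
    static final class Client {
        final String id;
        volatile boolean active = true;

        Client(String id) {
            this.id = id;
        }
    }

    private final Map<String, List<Client>> groups = new ConcurrentHashMap<>();

    void register(String group, Client client) {
        groups.computeIfAbsent(group, g -> new CopyOnWriteArrayList<>()).add(client);
    }

    // Returns any live client of the group, or null when the group is unknown or all are down.
    Client availableClient(String group) {
        List<Client> clients = groups.get(group);
        if (clients == null) {
            return null;
        }
        for (Client client : clients) {
            if (client.active) {
                return client;
            }
        }
        return null;
    }
}
```

This is also why every instance in a producer group needs to be able to answer a check-back for any transaction started by the group: the broker does not promise to ask the instance that sent the half message.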

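Source Code Analysis - Step 6

When the producer receives a check-back request, it looks up the final state of the local transaction and reports the second-phase result again.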

ClientRemotingProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        ......
    }
    return null;
}

Checking the final state of the local transaction ends up calling the checkLocalTransaction method of the TransactionListener.
