面试官:RocketMQ 的推模式和拉模式有什么区别?

IT科技2025-11-05 06:28:13382

大家好,面试我是推模君哥。

RocketMQ 消息消费有两种模式,式和式有什区PULL 和 PUSH,拉模今天我们来看一下这两种模式有什么区别。

PUSH 模式

首先看一段 RocketMQ 推模式的面试一个官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {

Tracer tracer = initTracer();

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));

consumer.subscribe("TopicTest", "*");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {

System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.printf("Consumer Started.%n");

}

消费者会定义一个消息监听器,并且把这个监听器注册到 DefaultMQPushConsumer,推模同时也会注册到 DefaultMQPushConsumerIm-pl,式和式有什区当拉取到消息时,拉模就会使用这个监听器来处理消息。面试那这个监听器是推模什么时候调用呢?看下面的 UML 类图:

消费者真正拉取请求的类是 DefaultMQPush-ConsumerImpl,这个类的式和式有什区 pullMessage 方法调用了 PullAPIWrapper 的 pullKernelImpl 方法,这个方法有一个参数是拉模回调函数 Pull-Callback,当 PULL 状态是面试 PullStatus.FOU-ND 时,代表拉取消息成功,推模处理逻辑如下:

PullCallback pullCallback = new PullCallback() {

@Override

public void onSuccess(PullResult pullResult) {

if (pullResult != null) {

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),式和式有什区 pullResult,

subscriptionData);

switch (pullResult.getPullStatus()) {

case FOUND:

//省略部分逻辑

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(

pullResult.getMsgFoundList(),

processQueue,

pullRequest.getMessageQueue(),

dispatchToConsume);

//省略部分逻辑

break;

//省略其他case

default:

break;

}

}

}

@Override

public void onException(Throwable e) {

//省略

}

};

这个处理逻辑调用了 ConsumeMessage-Service 类的 submitConsumeRequest 方法,我们看一下并发消费消息的处理逻辑,代码如下:

public void submitConsumeRequest(

final Listmsgs,

final ProcessQueue processQueue,

final MessageQueue messageQueue,

final boolean dispatchToConsume) {

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

if (msgs.size() <= consumeBatchSize) {

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);

try {

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

this.submitConsumeRequestLater(consumeRequest);

}

} else {

//分批处理,跟上面逻辑一致

}

ConsumeRequest 类是一个线程类,run 方法里面调用了消费者定义的消息处理方法,代码如下:

public void run() {

//省略逻辑

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

//省略逻辑

try {

//调用消费方法

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {

//省略逻辑

}

//省略逻辑

}

下面以并发消费方式下的WordPress模板同步拉取消息为例总结一下消费者消息处理过程:

在 MessageListenerConcurrently 中定义消费者处理逻辑,消费者启动时注册到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl。消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法。DefaultMQPushConsumerImpl 类的 pullMessage 方法调用 PullAPIWrapper 的 pullKernelImpl 方法真正去发送 PULL 请求,并传入 PullCallback 的 回调函数。拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息。ConsumeRequest 处理消息时调用了消费端定义的消费逻辑,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。PULL 模式

下面是来自官方的一段 PULL 模式拉取消息的代码:

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");

litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

litePullConsumer.subscribe("TopicTest", "*");

litePullConsumer.start();

try {

while (running) {

ListmessageExts = litePullConsumer.poll();

System.out.printf("%s%n", messageExts);

}

} finally {

litePullConsumer.shutdown();

}

这里我们看到,PULL 模式需要在处理逻辑里不停的去拉取消息,比如上面代码中写了一个死循环。那 PULL 模式中 poll 函数是怎么实现的免费源码下载呢?我们看下面的 UML 类图:

跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?

官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这里相关的 UML 类图如下:

这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe,代码如下:

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {

try {

//省略逻辑

this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());

assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);

//省略逻辑

} catch (Exception e) {

throw new MQClientException("subscribe exception", e);

}

}

这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,代码如下:

public void run() {

//省略部分逻辑

if (!this.isCancelled()) {

long pullDelayTimeMills = 0;

try {

PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());

switch (pullResult.getPullStatus()) {

case FOUND:

final Object objLock = messageQueueLock.fetchLockObject(messageQueue);

synchronized (objLock) {

if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {

processQueue.putMessage(pullResult.getMsgFoundList());

submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));

}

}

break;

//省略其他 case

}

}

//省略 catch

if (!this.isCancelled()) {

//启动下一次拉取

scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);

} else {

log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);

}

}

}

拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。

这样就清除了示例代码中 poll 消息的逻辑,那还有一个问题,监听器是什么时候触发监听事件呢?

在消费者启动时,会启动 RebalanceService 这个线程,这个线程的 run 方法如下:

public void run() {

while (!this.isStopped()) {

this.waitForRunning(waitInterval);

this.mqClientFactory.doRebalance();

}

}

下面的 UML 类图显示了 doRebalance 方法的调用关系:

可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:

public void messageQueueChanged(String topic, SetmqAll, SetmqDivided) {

MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();

if (messageQueueListener != null) {

try {

messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);

} catch (Throwable e) {

log.error("messageQueueChanged exception", e);

}

}

}

这里最终触发了监听器。

下面以并发消费方式下的网站模板同步拉取消息为例总结一下消费者消息处理过程:

消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器。消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程。消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取。消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。总结

通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:

PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑。

PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。

本文地址:http://www.bzuk.cn/html/117b31199571.html
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

全站热门

如何更换旧电脑的网卡(简单易懂的教程,轻松升级你的网络连接)

重磅!Twitter 源代码泄露

TPM 2.0 爆出漏洞,数十亿物联网设备受到严重威胁!

Spring Boot 如何快速集成Redis?

电脑主板刷机教程(电脑主板刷机步骤详解,让你轻松升级BIOS)

Redis热点之底层实现篇

【大厂面试题】Redis中是如何实现分布式锁的?

我从未见过的牛逼解说方式!Redis五种数据结构,看一遍就懂了

友情链接

滇ICP备2023006006号-33