首页 >> 人物 >> RocketMQ之消费者激活与消费流程

RocketMQ之消费者激活与消费流程

2025-02-19 人物

Listener( newMessageListenerConcurrently { @ Override publicConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { System. out.printf( "%s Receive New Messages: %s %n", Thread.currentThread.getName, msgs); // 标上该除此以外的消息仍然被先前折扣 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 之后启动折扣者重构 consumer.start; System. out.printf( "Consumer Started.%n"); } }

后面让我们来深入研究折扣者在之后启动之前每一阶段性之前实在了什么吧,let’s go.

2.1 重构所谓折扣者

第一步主要是重构所谓折扣者,这里无视选项的Push折扣者的控制系统,构造器之前参数为并不相同的折扣者分第一组,登录同一分第一组可以折扣同一种类的除此以外的消息,如果很难登录,将但会无视选项的分第一组的控制系统,这里重构所谓了一个DefaultMQPushConsumerImpl对象,它是侧面折扣机能的主要借助于类。

// 重构所谓折扣者 DefaultMQPushConsumer consumer = newDefaultMQPushConsumer( "TestConsumer");

主要通过DefaultMQPushConsumer重构所谓DefaultMQPushConsumerImpl,它是主要的折扣机能借助于类。

publicDefaultMQPushConsumer( finalString consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl( this, rpcHook); } // 另设NameServer的URL consumer.setNamesrvAddr( "localhost:9876"); // 在线一个或者多个Topic,以及Tag来截取所需折扣的除此以外的消息 consumer.subscribe( "Test", "*");

2.2.1 添加tag

publicstaticSubionData buildSubionData(final StringconsumerGroup, Stringtopic, StringsubString) throws Exception { SubionData subionData = newSubionData; subionData.setTopic(topic); subionData.setSubString(subString); if( null== subString || subString.equals(SubionData.SUB_ALL) || subString.length == 0) { subionData.setSubString(SubionData.SUB_ALL); } else{ String[] tags = subString.split( "\|\|"); if(tags.length> 0) { for( Stringtag : tags) { if(tag.length> 0) { StringtrimString = tag.trim; if(trimString.length> 0) { subionData.getTagsSet.add(trimString); subionData.getCodeSet.add(trimString.hashCode); } } } } else{ thrownewException( "subString split error"); } } returnsubionData; }

2.2.2 转发腹痛至Broker

sendHeartbeatToAllBrokerWithLock作法,进行时腹痛检查和MySpace滤网类至broker空降兵(小农之后启动过程也但会进行时此步骤)。如下右图:

publicvoid sendHeartbeatToAllBrokerWithLock { if( this.lockHeartbeat.tryLock) { try{ this.sendHeartbeatToAllBroker; this.uploadFilterClassSource; } catch( finalException e) { log.error( "sendHeartbeatToAllBroker exception", e); } finally{ this.lockHeartbeat.unlock; } } else{ log.warn( "lock heartBeat, but failed."); } }

首必先但会对broker空降兵进行时腹痛检查,在此过程之前但会施加锁,它但会指派sendHeartbeatToAllBroker作法,实现腹痛资料heartbeatData,然后基元折扣和小农table,将折扣者和小农电子邮件之后加入到heartbeatData之前,当都假定折扣者和小农的可能会下,但会基元brokerAddrTable,往每个broker URL转发腹痛,相当于 往并不相同URL转发一次http劝告,用于 探测意味着broker真的能活。

this.mQClientAPIImpl.sendHearbeat( addr, heartbeatData, 3000);

2.2.3MySpace滤网类至FilterServer

private void uploadFilterClassSource { Iterator> it = this.consumerTable.entrySet.iterator; while(it.hasNext) { Entry next= it.next; MQConsumerInner consumer = next.getValue; if(ConsumeType.CONSUME_PASSIVELY == consumer.consumeType) { Set subions = consumer.subions; for(SubionData sub: subions) { if( sub. isClassFilterMode&& sub. getFilterClassSource!= null) { final String consumerGroup = consumer.groupName; final String className = sub. getSubString; final String topic = sub. getTopic; final String filterClassSource = sub. getFilterClassSource; try { this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource); } catch (Exception e) { log.error( "uploadFilterClassToAllFilterServer Exception", e); } } } } } }

滤网类的关键作用:折扣末端可以MySpace一个Class类机密文件到 FilterServer,Consumer从FilterServer努取除此以外的消息时,FilterServer但会把劝告转发给Broker,FilterServer缴交到Broker除此以外的消息后,根据MySpace的截取类之前的逻辑实在截取操作,截取完了成后如此一来把除此以外的消息给到Consumer,软件可以自界定截取除此以外的消息的借助于类。

2.3 申请难以实现借助于类

几周就是标识符之前的申请难以实现借助于类了,当然,如果你是pull的控制系统的话就不所需借助于它了,push的控制系统所需界定,两者不同点侧面但会说什么到,它主要用于从broker同步给与除此以外的消息,这里有两种折扣实例种类,用于相同的折扣种类。

ConsumeConcurrentlyContext:延时类除此以外的消息实例,用于延时除此以外的消息,即除此以外除此以外的消息,选项不延误,可以另设延误等级,每个等级并不相同固定有规律时间标尺,RocketMQ之前不能自界定延误有规律时间,延误等级从1开始,并不相同的有规律时间有规律如下右图:

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ConsumeOrderlyContext :左至右类除此以外的消息实例,掌控转发除此以外的消息的左至右,小农另设移去IP的控制系统后,相同key只落下登录queue上,折扣过程之前但会对左至右除此以外的消息所在的queue加锁,应有除此以外的消息的有序性。

2.4 折扣者之后启动

我们必先来看下折扣者之后启动的过程,如下右图:

(3)getAndCreateMQClientInstance:初始所谓MQ客户末端重构。

(4)offsetStore.load:根据相同除此以外的消息的控制系统成立折扣延迟offsetStore并启动时:BROADCASTING-播送的控制系统,同一个折扣group之前的consumer都折扣一次,CLUSTERING-空降兵的控制系统,选项方式也,只被折扣一次。

switch ( this.defaultMQPushConsumer.getMessageModel) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore( this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore( this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup); break; default: break; }

可以通过setMessageModel方式也另设相同的控制系统;播送的控制系统则有折扣第一组的折扣者两者之间独立,折扣延迟在本地单独进行时打印;空降兵的控制系统下,同一条除此以外的消息只但会被同一个折扣第一组折扣一次,折扣延迟但会参与到阻抗适度之前,折扣延迟是共享在整个折扣第一组之前的。

(5)consumeMessageService.start:根据相同除此以外的消息国家安全局种类重构所谓并之后启动。这里有延时除此以外的消息和左至右除此以外的消息。

这里主要说什么下左至右除此以外的消息,RocketMQ也帮我们借助于了,在之后启动时,如果是空降兵的控制系统并是左至右种类,它但会之后启动除此以外训练任务,除此以外向broker转发低成本锁,努出意味着左至右折扣转回的除此以外的消息字段,左至右除此以外的消息因为小农装配除此以外的消息时登录了移去策略和除此以外的消息实例,只但会转回一个折扣字段。

除此以外训练任务转发低成本锁,努出意味着左至右除此以外的消息字段。

publicvoidstart( ) { if(MessageModel.CLUSTERING. equals(ConsumeMessageOrderlyService. this.defaultMQPushConsumerImpl.messageModel)) { this.scheduledExecutorService.scheduleAtFixedRate( newRunnable { @ Override publicvoidrun( ) { ConsumeMessageOrderlyService. this.lockMQPeriodically; } }, 1000* 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

转发努出字段的除此以外的消息至broker,broker末端返国努出先前的字段论域lockOKMQSet,左至右除此以外的消息具体借助于可查看侧面第四节。

(6)mQClientFactory.registerConsumer:MQClientInstance申请折扣者,并之后启动MQClientInstance,很难申请先前但会结束折扣客户服务。

(7)mQClientFactory.start:再一但会之后启动如下客户服务:远程客户末端、除此以外训练任务、pull除此以外的消息客户服务、阻抗适度客户服务、push除此以外的消息客户服务,然后将正常改成运行之前。

switch ( this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if( null== this.clientConfig.getNamesrvAddr) { this.mQClientAPIImpl.fetchNameServerAddr; } // Start request-response channel this.mQClientAPIImpl.start; // Start various schedule tasks this.startScheduledTask; // Start pull service this.pullMessageService.start; // Start rebalance service this.rebalanceService.start; // Start push service this.defaultMQProducer.getDefaultMQProducerImpl.start( false); log.info( "the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: thrownew MQClientException( "The Factory object["+ this.getClientId + "] has been created before, and failed.", null); default: break; }

全部之后启动完了后,整个折扣者也就之后启动好了,几周就可以对小农转发慢慢地的除此以外的消息进行时折扣了,那么是如何进行时除此以外的消息折扣的呢?相同的除此以外的消息的控制系统有何不同点呢?

三、pull/push 的控制系统折扣

3.1 pull的控制系统-DefaultMQPullConsumer

pull努取式折扣:运用并不一定适时初始化Consumer的努除此以外的消息作法从Broker客户应用程序努除此以外的消息、适时权由运用控制系统掌控,可以登录折扣的振幅,【实为标识符】如下右图:

DefaultMQPullConsumer consumer = newDefaultMQPullConsumer( "TestConsumer"); // 另设NameServer的URL consumer.setNamesrvAddr( "localhost:9876"); // 之后启动折扣者重构 consumer.start; //给与主题下所有的除此以外的消息字段,这里根据主题从nameserver给与的 Set mqs = consumer.fetchSubscribeMessageQueues( "Test"); for(MessageQueue queue: mqs) { //给与意味着字段的折扣振幅,登录折扣延迟offset,fromstore:从broker之前给与还是本地给与,true-broker longoffset = consumer.fetchConsumeOffset( queue, true); PullResult pullResult = null; while(offset < pullResult.getMaxOffset) { //第二个参数为tag,给与登录topic下的tag //第三个参数声称从哪个振幅下开始折扣除此以外的消息 //第四个参数声称一次最大努取多少个除此以外的消息 try{ pullResult = consumer.pullBlockIfNotFound( queue, "*", offset, 32); } catch(Exception e) { e.printStackTrace; System.out.println( "pull努取除此以外的消息败北"); } //标识符请注意,记录除此以外的消息振幅 offset = pullResult.getNextBeginOffset; //标识符请注意,这里为折扣除此以外的消息 } }

可以见到我们是适时努取topic并不相同下的除此以外的消息字段,然后基元它们,给与意味着折扣延迟并进行时折扣。

3.2 push的控制系统-DefaultMQPushConsumer

该的控制系统下Broker发来资料后但会适时推送给折扣末端,该折扣的控制系统一般同步性较较低,现在一般自荐适用该方式也,具体比如说可以观看第一章标题的公开demo。

它也是通过借助于pull方式也来借助于的,首必先,后面 2.4折扣者之后启动之后,再一但会之后启动努取除此以外的消息客户服务pullMessageService和阻抗适度rebalanceService客户服务,它们之后启动后但会一直有内核进行时折扣。

case CREATE_JUST: //...... // Start pull service this.pullMessageService.start; // Start rebalance service this.rebalanceService.start; //....... this.serviceState = ServiceState.RUNNING; break; case RUNNING: publicclassRebalanceServiceextendsServiceThread{ //初始所谓,请注意.... @Override publicvoid run { log.info( this.getServiceName + " service started"); while(! this.isStopped) { this.waitForRunning(waitInterval); //实在阻抗适度 this.mqClientFactory.doRebalance; } log.info( this.getServiceName + " service end"); } @Override publicString getServiceName { returnRebalanceService. class.getSimpleName; } }

然后根据每个topic,以及它真的左至右除此以外的消息的控制系统来实在rebalance。

具体实在法就是必先对Topic下的除此以外的消息折扣字段、折扣者Id进行时次序,然后用除此以外的消息字段的平均分摊线性,计算出待努取的除此以外的消息字段,将分摊到的除此以外的消息字段论域与processQueueTable实在一个截取对照,新的字段不包内含或已过期,则进行时移除 。

publicvoiddoRebalance(final booleanisOrder) { Map< String, SubionData> subTable = this.getSubionInner; if(subTable != null) { for(final Map.Entry< String, SubionData> entry : subTable.entrySet) { final Stringtopic = entry.getKey; try{ /根据 /每个topic,以及它真的左至右除此以外的消息的控制系统来实在rebalance this.rebalanceByTopic(topic, isOrder); } catch(Throwable e) { if(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn( "rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic; }

rebalanceByTopic之前播送和空降兵的控制系统都但会指派updateProcessQueueTableInRebalance作法,再一但会递送劝告dispatchPullRequest,通过executePullRequestImmediately作法将pull劝告装入pull劝告字段pullRequestQueue之前, 注意,pull的控制系统下递送劝告作法dispatchPullRequest具体借助于是一个空作法,这里两者更大相同,push的控制系统借助于如下:

@ Override publicvoiddispatchPullRequest(List pullRequestList){ for(PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info( "doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }

然后如此一来PullMessageService之前,因为后面consumer之后启动先前了,PullMessageService内核但会同步去取pullRequestQueue之前的pull劝告。

@Override publicvoid run { log.info( this.getServiceName + " service started"); while(! this.isStopped) { try{ PullRequest pullRequest = this.pullRequestQueue.take; if(pullRequest != null) { this.pullMessage(pullRequest); } } catch(InterruptedException e) { } catch(Exception e) { log.error( "Pull Message Service Run Method exception", e); } } log.info( this.getServiceName + " service end"); }

努出来的pull劝告又但会经由DefaultMQPushConsumerImpl的除此以外的消息国家安全局类,初始化pullMessage作法。

privatevoidpullMessage( finalPullRequest pullRequest) { finalMQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup); if(consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else{ log.warn( "No matched consumer for the PullRequest {}, drop it", pullRequest); } }

pullMessage之前pullKernelImpl有一个Pullback作法用于指派除此以外的消息的难以实现,它但会通过submitConsumeRequest这个作法来管控除此以外的消息,总而言之就是 通过内核难以实现的方式也让push的控制系统下的国家安全局器必需感受到。

//Pull难以实现 PullCallback pullCallback = newPullCallback { @Override publicvoidonSuccess(PullResult pullResult){ if(pullResult != null) { pullResult = DefaultMQPushConsumerImpl. this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue, pullResult, subionData); switch(pullResult.getPullStatus) { caseFOUND: //请注意...折扣振幅更新的 DefaultMQPushConsumerImpl. this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList, processQueue, pullRequest.getMessageQueue, dispathToConsume);

这个作法并不相同的相同折扣的控制系统显现出相同借助于,但都是但会实现一个折扣劝告ConsumeRequest,里面有一个run作法,实现完了后,但会把它装入到listener国家安全局器之前。

//国家安全局除此以外的消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

还忘了后面我们样例说明了了的申请国家安全局器难以实现管控作法吗?

我们可以其他用户侧面的consumeMessage作法,查看它在软件包内之前的借助于右边,发掘出它就回到了我们后面的 2.3申请难以实现借助于类里面了,整个工序真的通顺了呢?这个国家安全局器之前就但会发来push的除此以外的消息,努努出来进行时的业务折扣逻辑,后面是我们自己界定的除此以外的消息难以实现管控作法。

// 申请难以实现借助于类来管控从broker努取回来的除此以外的消息 consumer.registerMessageListener( newMessageListenerConcurrently { @ Override publicConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { System. out.printf( "%s Receive New Messages: %s %n", Thread.currentThread.getName, msgs); // 标上该除此以外的消息仍然被先前折扣 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

3.3 小结

push的控制系统相相当于pull的控制系统相同的是,实在阻抗适度时,pullRequest劝告但会装入pullRequestQueue,然后PullMessageService内核但会同步去努出这个劝告,将除此以外的消息存入ProcessQueue,通过内核难以实现的方式也让push的控制系统下的国家安全局器必需感受到,这样除此以外的消息从递送劝告到接收都是同步的,而pull的控制系统是折扣末端适时去努取登录除此以外的消息的,所需登录折扣延迟。

对于我们开发者来说,举例来说哪种的控制系统借助于我们的的业务逻辑相当合适呢?别急,必先让我们总结下他们的特点:

某种程度:

两者底层具体一样,push的控制系统也是基于pull的控制系统来借助于的。 pull的控制系统所需我们通过控制系统适时通过consumer向broker努除此以外的消息,而除此以外的消息的push的控制系统则只所需我们发放一个listener国家安全局,同步给与除此以外的消息。

优点:

push的控制系统采用稍长轮询阻塞的方式也给与除此以外的消息,同步性非常较低; push的控制系统rocketMQ管控了给与除此以外的消息的细节,适用好像相当简单方便; pull的控制系统可以登录折扣延迟,打算折扣多少就折扣多少,机动性大。

缺点:

push的控制系统当折扣者能力也远超小于小农能力也的时候,但会产生一定的折扣者除此以外的消息堆积; pull的控制系统同步性很低,频率还好另设; 努取除此以外的消息的有规律还好另设,较短则产生很多违宪Pull劝告的RPC开销,影响MQ整体的网络精度,太稍长则同步性差。

原则上布景:

对于客户服务末端装配除此以外的消息资料相当大时,而折扣末端管控相当复杂,折扣能力也比起较低时,这种可能会就原则上pull的控制系统; 对于资料同步性拒绝较低的布景,就相当原则上与push的控制系统。

现在的你真的完了全一致的业务之前该适用哪种的控制系统了呢?

四、左至右除此以外的消息

4.1 借助于MQ左至右除此以外的消息转发假定疑虑

(1)一般除此以外的消息转发但会无视轮询方式也把除此以外的消息转发到相同的queue(两区字段);而折扣除此以外的消息的时候从多个queue上努取除此以外的消息,broker之间是无感受的,这种可能会转发和折扣是不能应有左至右。

(2)异步方式也转发除此以外的消息时,转发的时候不是按着一条一条左至右转发的,应有再不除此以外的消息到达Broker的有规律时间也是按照转发的左至右来的。

除此以外的消息转发到打印,再一到折扣要境遇这么多步骤,我们该如何在的业务之前适用左至右除此以外的消息呢?让咱们来一步步剩余下吧。

4.2 借助于MQ左至右除此以外的消息关键点

既然分散到多个broker上未左至右,那么可以掌控转发的左至右除此以外的消息只左至右转发到同一个queue之前,折扣的时候只从这个queue上左至右努取,则就应有了左至右。在转发时另设移去IP的控制系统,让相同key的除此以外的消息只落下登录queue上,然后折扣过程之前对左至右除此以外的消息所在的queue加锁,应有除此以外的消息的有序性,让这个queue上的除此以外的消息就按照FIFO左至右来进行时折扣。因此我们满足表列三个状况真的就可以呢?

1)除此以外的消息左至右转发:多内核转发的除此以外的消息未应有有序性,因此,所需的业务方在转发时,针对同一个的业务英文字母(如同一笔订单)的除此以外的消息所需应有在一个内核内左至右转发,在上一个除此以外的消息转发先前后,在进行时下一个除此以外的消息的转发。并不相同到mq之前,除此以外的消息转发作法就得适用同步转发,异步转发未应有左至右性。

//采用的同步转发方式也,在一个内核内左至右转发,异步转发方式也为:producer.send(msg, new SendCallback {...}) SendResult sendResult = producer.send(msg, newMessageQueueSelector { //…}

2)除此以外的消息左至右打印:MQ 的topic下但会假定多个queue,要应有除此以外的消息的左至右打印,同一个的业务英文字母的除此以外的消息所需被转发到一个queue之前。并不相同到mq之前,所需适用MessageQueueSelector来同样要转发的queue。即可以对的业务英文字母另设IP的控制系统,像根据字段数量对的业务字段hash取余,将除此以外的消息转发到一个queue之前。

//适用"%"操作,使得订单id取余后相同的资料IP到同一个queue之前,也可以自界定IP的控制系统 longindex = id % mqs.size; returnmqs. get(( int) index);

3)除此以外的消息左至右折扣:要应有除此以外的消息左至右折扣,同一个queue就仅仅被一个折扣者所折扣,因此对broker之前折扣字段加锁是未避免的。同一下一场,一个折扣字段仅仅被一个折扣者折扣,折扣者内外,也仅仅有一个折扣内核来折扣该字段。这里 RocketMQ仍然为我们借助于好了。

List pullRequestList = new ArrayList; for(MessageQueue mq : mqSet) { if(! this.processQueueTable.containsKey(mq)) { if(isOrder && ! this.lock(mq)) { log.warn( "doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } //....请注意 } }

折扣者之后阻抗,并且分摊完了折扣字段后,所需向mq客户应用程序发起除此以外的消息努取劝告,标识符借助于在RebalanceImpl#updateProcessQueueTableInRebalance之前,针对左至右除此以外的消息的除此以外的消息努取,mq实在了以上断定,即折扣客户末端必向其broker末端发起对messageQueue的加锁劝告,只有加锁先前时才成立pullRequest进行时除此以外的消息努取,这里的pullRequest就是后面pull和push的控制系统除此以外的消息体,而updateProcessQueueTableInRebalance这个作法也是在后面折扣者之后启动过程之前有说什么到过哦。

具体加锁逻辑如下:

publicboolean lock( finalMessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName, MixAll.MASTER_ID, true); if(findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody; requestBody.setConsumerGroup( this.consumerGroup); requestBody.setClientId( this.mQClientFactory.getClientId); requestBody.getMqSet.add(mq); try{ Set lockedMq = this.mQClientFactory.getMQClientAPIImpl.lockBatchMQ(findBrokerResult.getBrokerAddr, requestBody, 1000); for(MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable. get(mmqq); if(processQueue != null) { processQueue.setLocked( true); processQueue.setLastLockTimestamp(System.currentTimeMillis); } } boolean lockOK = lockedMq.contains(mq); log.info( "the message queue lock {}, {} {}", lockOK ? "OK": "Failed", this.consumerGroup, mq); returnlockOK; } catch(Exception e) { log.error( "lockBatchMQ exception, "+ mq, e); } } returnfalse; }

可以见到,就是初始化lockBatchMQ作法转发了一个加锁劝告,先前给与到除此以外的消息管控字段就设为给与到锁,返国锁死先前,如果加锁先前,同一下一场只有一个内核进行时除此以外的消息折扣。加锁败北,但会延误1000ms之后先前向broker末端申请锁死messageQueue,锁死先前后之后提出折扣劝告。

怎么样,这样的加锁方式也真的很像我们平时用到的分布式锁呢?由你来设计借助于你但会怎么实在呢?

五、除此以外的消息ack控制系统

5.1 除此以外的消息折扣败北管控

除此以外的消息被折扣者折扣了,那么如何应有被折扣先前呢?除此以外的消息折扣败北但会显现什么可能会呢?

除此以外的消息被折扣,那么如何应有被折扣先前呢?这里只有适用方掌控,只有适用方获知先前了,才但会折扣先前,否则但会之后递送。

RocketMQ其实是通过ACK控制系统来对败北除此以外的消息进行时作答和知会的,具体工序如下右图:

除此以外的消息先前与否是由适用方掌控,只有适用方获知先前了,才但会折扣先前,否则但会之后递送,Consumer但会通过国家安全局器国家安全局难以实现慢慢地的除此以外的消息,返国ConsumeConcurrentlyStatus.CONSUME_SUCCESS声称折扣先前,如果折扣败北,返国ConsumeConcurrentlyStatus.RECONSUME_LATER正常(折扣作答),RocketMQ就但会选项为这条除此以外的消息败北了,延误一定有规律时间后(选项10s,可配置),但会如此一来次投送到ConsumerGroup,作答次数与有规律有规律时间关系上图右图。如果持续这样,败北到一定次数(选项16次),就但会重回到DLQ死信字段,不如此一来递送,此时可以通过监视人工来干预。

5.2 除此以外的消息重投造就疑虑

RocketMQ 折扣除此以外的消息因为除此以外的消息重投更大一个疑虑就是未应有除此以外的消息只被折扣一次,因此所需开发工具在的业务里面自己去管控。

六、总结

本文主要简述了RocketMQ的折扣者之后启动工序,紧密结合公开软件包内和比如说,一步步说什么述折扣者在之后启动和除此以外的消息折扣之前的的文书工作定律及内容,并紧密结合平时的业务文书工作之前,对我们所熟悉的左至右、push/pull的控制系统等进行时详细深入研究,以及对于除此以外的消息折扣败北和重投造就疑虑去进行时深入研究。

对于自己而言,希望通过适时研读软件包内方式也,必需明白其之前之后启动的定律,研读里面优秀的建议书,像对于pull/push,左至右除此以外的消息这些,研读之后必需获知push的控制系统是何如实在到同步努取除此以外的消息的,左至右除此以外的消息是如何应有的,如此一来就是必需联打算到平时遇到这种疑虑该如何管控,像左至右除此以外的消息在除此以外的消息被折扣时保持和打印的左至右一致,这里自己施加分布式锁写能不能借助于等,文之前也有很多引导性疑虑,希望能激起读者自己的探究,必需对整个折扣者之后启动和除此以外的消息折扣工序显现出较为直觉的理解,但还显现出一些种设计由于篇幅状况没实在出详细说明了,也欢迎大家两人探讨交流~

摘要:

RocketMQ官方比如说 RocketMQ系列之pull(努)除此以外的消息的控制系统(七) RocketMQ的左至右除此以外的消息(左至右折扣)

END

这里有除此以外的开源资讯、该软件更新的、技术干货等内容

点这里 ↓↓↓ 忘了 关注✔ 标星⭐ 哦~

夏天得了空调病怎么治
性功能障碍的症状表现
脉血康能治偏头痛吗
伤口用什么药愈合的快
治疗关节炎有什么好方法
治疗新冠特效药
新冠特效药叫什么名字
眼睛发痒干涩怎么办
痛风的快速止痛的药用啥好
颈椎病疼痛用什么药止痛
友情链接