首页 >> 人物 >> RabbitMQ发布串流模式

RabbitMQ发布串流模式

2023-04-20 人物

eiverConfig)。

为了查询结果,将其之前的一个大众处理通告的并能所设的速度慢。常用查询是否所有大众都接管到了相同的数据。

开发设计大众备有类

此备有文件主要动态是:

备有两个缓冲区-多种不同的大众可用多种不同的缓冲区消费通告。备有一个点对点 fanout类别的。备有两个Binding并不一定,常用将两个缓冲区全部的适配给这个fanout的点对点。

package wj.mq.config.fanout;

import java.util.HashMap;

import java.util.Map;

import org.springframework.amqp.core.AnonymousQueue;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* 接管者备有类

* 因为在正式的开发设计之前,要吗有接管者备有,要吗有发行者的备有,所以将这两个备有分离

*/

@Configuration

public class ReceiverConfig {

/**

* 回应缓冲区,因为是与fanout类别的交换机适配,所以不所需中文名称

*/

@Bean

public Queue receiverQueue1() {

Map args = new HashMap<>();

args.put("name", "Test");

Queue queue = new AnonymousQueue(args);

return queue;

}

/**

* 回应第二个任意中文名称的缓冲区

* @return

*/

@Bean

public Queue receiverQueue2() {

Map args = new HashMap<>();

args.put("name", "Test");

Queue queue = new AnonymousQueue(args);

return queue;

}

/**

* 回应fanout交换机

*/

@Bean

public FanoutExchange receiverExchange() {

return new FanoutExchange("FanoutExchange", true, false);

}

/**

* 回应适配彼此间

*/

@Bean

public Binding receiverBinding1(Queue receiverQueue1,FanoutExchange receiverExchange) {

Binding b = BindingBuilder.bind(receiverQueue1).to(receiverExchange);

return b;

}

/**

* 回应适配彼此间

*/

@Bean

public Binding receiverBinding2(Queue receiverQueue2,FanoutExchange receiverExchange) {

Binding b = BindingBuilder.bind(receiverQueue2).to(receiverExchange);

return b;

}

}

图片:

大众

可用@RabbitListener释义回应两个大众

package wj.mq.rabbitmq.pubsub;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cn.hutool.core.thread.ThreadUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j

@Component

public class PubsubReceiver {

@RabbitListener(queues = "#{receiverQueue1.name}")

public void cusumer1(String msg) {

log.info("大众1,接管到:{}", msg);

}

@RabbitListener(queues = "#{receiverQueue2.name}")

public void cusumer2(@Payload() Message message, Channel channel) throws Exception {

ThreadUtil.sleep(1000);

String str = new String(message.getBody());

log.info("大众2,接管到:{}", str);

//对于扇出类别的通告,不所需确认,因为很难routingkey,且缓冲区也是随机的

//Long id = message.getMessageProperties().getDeliveryTag();

//channel.basicAck(id, false);

}

}

图片:

回应小农备有类

小农发行通告,只所需一个点对点

package wj.mq.config.fanout;

import org.springframework.amqp.core.Exchange;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class SenderConfig {

@Bean

public Exchange senderExchange() {

return new FanoutExchange("FanoutExchange",true,false);

}

}

图片:

回应小农

这儿并很难可用调度动态,通过手动的命令行send动态,来发行数据

package wj.mq.rabbitmq.pubsub;

import org.springframework.amqp.core.Exchange;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.stereotype.Component;

@Component

public class PubsubSender {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

@Qualifier("senderExchange")

private Exchange senderExchange;

public void send() {

for (int i = 1; i <= 10; i++) {

String msg = "" + i;

rabbitTemplate.convertAndSend(senderExchange.getName(), "", msg);

}

}

}

图片:

开发设计ApplicationRunner

此类常用在程序重新启动顺利完成后,命令行send函数,发送数据。

package wj.mq.config.runner;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.context.annotation.Configuration;

import lombok.extern.slf4j.Slf4j;

import wj.mq.rabbitmq.pubsub.PubsubSender;

@Slf4j

@Configuration

public class AppRunner implements ApplicationRunner {

@Autowired

private PubsubSender pubsubSender;

@Override

public void run(ApplicationArguments args) throws Exception {

log.info("程序重新启动顺利完成");

pubsubSender.send();

}

}

图片:

重新启动程序

重新启动程序,查询除此以外负载的数据,虽然大众2处理的并能速度慢,但也同样接管到了所有的数据,因为在fanout这种交换电脑程式,时会将所有通告发送给所有适配到这个点对点的缓冲区,且时会相应忽左或许routingkey。

迈普新胸腺法新怎么样
儿童装健胃消食片
多维元素片的功效与作用
软肝保肝应该用哪种药
云南不孕不育医院
TAG:模式
友情链接