RabbitMQ发布串流模式
2023-04-20 人物
为了查询结果,将其之前的一个大众处理通告的并能所设的速度慢。常用查询是否所有大众都接管到了相同的数据。
开发设计大众备有类此备有文件主要动态是:
备有两个缓冲区-多种不同的大众可用多种不同的缓冲区消费通告。备有一个点对点 fanout类别的。备有两个Binding并不一定,常用将两个缓冲区全部的适配给这个fanout的点对点。package wj.mq.config.fanout;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 接管者备有类
* 因为在正式的开发设计之前,要吗有接管者备有,要吗有发行者的备有,所以将这两个备有分离
*/
@Configuration
public class ReceiverConfig {
/**
* 回应缓冲区,因为是与fanout类别的交换机适配,所以不所需中文名称
*/
@Bean
public Queue receiverQueue1() {
Map args = new HashMap<>();
args.put("name", "Test");
Queue queue = new AnonymousQueue(args);
return queue;
}
/**
* 回应第二个任意中文名称的缓冲区
* @return
*/
@Bean
public Queue receiverQueue2() {
Map args = new HashMap<>();
args.put("name", "Test");
Queue queue = new AnonymousQueue(args);
return queue;
}
/**
* 回应fanout交换机
*/
@Bean
public FanoutExchange receiverExchange() {
return new FanoutExchange("FanoutExchange", true, false);
}
/**
* 回应适配彼此间
*/
@Bean
public Binding receiverBinding1(Queue receiverQueue1,FanoutExchange receiverExchange) {
Binding b = BindingBuilder.bind(receiverQueue1).to(receiverExchange);
return b;
}
/**
* 回应适配彼此间
*/
@Bean
public Binding receiverBinding2(Queue receiverQueue2,FanoutExchange receiverExchange) {
Binding b = BindingBuilder.bind(receiverQueue2).to(receiverExchange);
return b;
}
}
图片:
大众可用@RabbitListener释义回应两个大众
package wj.mq.rabbitmq.pubsub;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class PubsubReceiver {
@RabbitListener(queues = "#{receiverQueue1.name}")
public void cusumer1(String msg) {
log.info("大众1,接管到:{}", msg);
}
@RabbitListener(queues = "#{receiverQueue2.name}")
public void cusumer2(@Payload() Message message, Channel channel) throws Exception {
ThreadUtil.sleep(1000);
String str = new String(message.getBody());
log.info("大众2,接管到:{}", str);
//对于扇出类别的通告,不所需确认,因为很难routingkey,且缓冲区也是随机的
//Long id = message.getMessageProperties().getDeliveryTag();
//channel.basicAck(id, false);
}
}
图片:
回应小农备有类小农发行通告,只所需一个点对点
package wj.mq.config.fanout;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SenderConfig {
@Bean
public Exchange senderExchange() {
return new FanoutExchange("FanoutExchange",true,false);
}
}
图片:
回应小农这儿并很难可用调度动态,通过手动的命令行send动态,来发行数据
package wj.mq.rabbitmq.pubsub;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Component
public class PubsubSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("senderExchange")
private Exchange senderExchange;
public void send() {
for (int i = 1; i <= 10; i++) {
String msg = "" + i;
rabbitTemplate.convertAndSend(senderExchange.getName(), "", msg);
}
}
}
图片:
开发设计ApplicationRunner此类常用在程序重新启动顺利完成后,命令行send函数,发送数据。
package wj.mq.config.runner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
import wj.mq.rabbitmq.pubsub.PubsubSender;
@Slf4j
@Configuration
public class AppRunner implements ApplicationRunner {
@Autowired
private PubsubSender pubsubSender;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("程序重新启动顺利完成");
pubsubSender.send();
}
}
图片:
重新启动程序重新启动程序,查询除此以外负载的数据,虽然大众2处理的并能速度慢,但也同样接管到了所有的数据,因为在fanout这种交换电脑程式,时会将所有通告发送给所有适配到这个点对点的缓冲区,且时会相应忽左或许routingkey。
。迈普新胸腺法新怎么样儿童装健胃消食片
多维元素片的功效与作用
软肝保肝应该用哪种药
云南不孕不育医院
- 02-10参加公务员考试后才明白,满足3个条件的录取,更容易上岸
- 02-10“举报高速违章1分钟赚3000元”?最新答复
- 02-10女神双料冠军再产子!四年为大14岁富豪老公生下2胎,职业生涯终留憾
- 02-10“我想当科学家 造一个大航天器去看星星”——北京怀柔科学风幼儿园
- 02-10五家商场节前开业,“双节”北京商超客流再涨掀起吃饭吃潮
- 02-10首秀炸裂!狂轰20分10板5帽,敢打敢拼展现拼劲,无愧闵鹿蕾器重
- 02-10驻美使馆发布平安留学时时刻刻!
- 02-10山东泰山足球俱乐部回应外球迷“购票被骗”:公安已介入调查
- 02-10高洪波刚卸任即遭炮轰 董路:你狗眼看人低!看谁未来活得不够自由
- 02-10教育部:我国与58个国家签署学历文学士互认协议