推荐阅读
这可能是全网Java学习路线最完整,最详细的版本了,没有之一
什么是消息中间件?
MQ: mssage queue;消息队列是应用程序和应用程序之间的通信方法
为什么使用MQ?
一些无需立即返回,且耗时的操作提取出来,进行异步处理,节省服务器请求响应时间
作用:异步解耦,削峰填谷
市场上常见的消息队列有如下:
ActiveMQ:基于JMS
ZeroMQ:基于C语言开发
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品
Kafka:类似MQ的产品;分布式消息系统,高吞吐量
RabitMQ 常见的工作模式:
1.简单模式:一个生产者,一个消费者,不需要定义交换机(默认交换机)
2.工作模式: 一个生产者,多个消费者(竞争关系)。不需要定义交换机(默认交换机)
3.订阅模式:一个生产者,多个消费者。需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列 (一个消息可以被多个消费者收到,只要订阅即可)
4.路由模式: 一个生产者,多个消费者。需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5.Topic 通配符模式: 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列(与路由模式相比,RoutingKey可以使用通配符)
通配符说明:
#:匹配一个或多个词,
*:匹配不多不少恰好1个词)
例如:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
**交换机:**只负责转发消息,不具备存储消息的能力,如果队列与交换机绑定,没有符合路由规则的队列,那么消息会丢失。
1.广播式交换器类型(fanout):该类交换器不分析所接收到消息中的Routing Key,默认将消息转发到所有与该交换器绑定的队列中去。广播式交换器转发效率最高,但是安全性较低,消费者应用程序可获取本不属于自己的消息。
2.直接式交换器类型(direct):该类交换器需要精确匹配Routing Key与BindingKey,如消息的Routing Key = Cloud,那么该条消息只能被转发至Binding Key = Cloud的消息队列中去。直接式交换器的转发效率较高,安全性较好,但是缺乏灵活性,系统配置量较大。
3.主题式交换器(Topic Exchange):该类交换器通过消息的Routing Key与Binding Key的模式匹配,将消息转发至所有符合绑定规则的队列中。Binding Key支持通配符,其中“”匹配一个词组,“#”匹配多个词组(包括零个)。例如,Binding Key=“.Cloud.#”可转发Routing Key=“OpenStack.Cloud.GD.GZ”、“OpenStack.Cloud.Beijing”以及“OpenStack.Cloud”的消息,但是对于Routing Key=“Cloud.GZ”的消息是无法匹配的。
消息可靠性保证
消息投递流程如下:
1.生产者发送消息到交换机
2.交换机根据routingkey 转发消息给队列
3.消费者监控队列,获取队列中信息
4.消费成功删除队列中的消息
生产者可靠性消息投递
- confirm模式
生产者发送消息到交换机的时机 - return模式
交换机转发消息给queue的时机
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirms: true # 默认为false publisher-returns: true # 默认为false
设置回调函数 callBack
@Componentpublic class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { /** * * @param correlationData 消息信息 * @param ack 确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息 * @param cause 如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) { if(ack){ System.out.println("发送消息到交换机成功,"+cause); }else{ System.out.println("发送消息到交换机失败,原因是:"+cause); } }}
设置回调函数 return
@Componentpublic class MyReturnCallBack implements RabbitTemplate.ReturnCallback { /** * * @param message 消息信息 * @param replyCode 退回的状态码 * @param replyText 退回的信息 * @param exchange 交换机 * @param routingKey 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("退回的消息是:"+new String(message.getBody())); System.out.println("退回的replyCode是:"+replyCode); System.out.println("退回的replyText是:"+replyText); System.out.println("退回的exchange是:"+exchange); System.out.println("退回的routingKey是:"+routingKey); }}
消费者确认机制(ACK)
ACK机制:有三种方式
自动确认 acknowledge=“none”
手动确认 acknowledge=“manual”
根据异常情况来确认(暂时不怎么用) acknowledge=“auto”
其中自动确认是指:
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
其中手动确认方式是指:
则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()等方法,让其按照业务功能进行处理,比如:重新发送,比如拒绝签收进入死信队列等等。
消费端限流说明
如果并发量大的情况下,生产方不停的发送消息,可能处理不了那么多消息,此时消息在队列中堆积很多,当消费端启动,瞬间就会涌入很多消息,消费端有可能瞬间垮掉,这时我们可以在消费端进行限流操作,每秒钟放行多少个消息。这样就可以进行并发量的控制,减轻系统的负载,提供系统的可用性,这种效果往往可以在秒杀和抢购中进行使用。在rabbitmq中也有限流的一些配置。
spring: application: name: consumer01 rabbitmq: username: admin password: 123456 host: 127.0.0.1 port: 5672 virtual-host: test listener: simple: acknowledge-mode: manual #设置确认模式为手动确认 prefetch: 10 #设置消费端每秒拉取的消息数量为100 默认为250 用于消费端限流
手动签收消息:
@Component@RabbitListener(queues = "q4")public class RabbitMqListener { private static Integer count = 0; @RabbitHandler public void getMessage(Message message, Channel channel, String msg) { //接收消息 System.out.println("消费端接收消息:" + msg); try { //处理本地业务 System.out.println("处理本地业务开始======start======"); // Thread.sleep(2000); // int i = 1 / 0; System.out.println("处理本地业务结束======end======"); //签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 第一种:签收 // channel.basicAck() // 第二种:拒绝签收 批量处理 // channel.basicNack() // 第三种:拒绝签收 不批量处理 // channel.basicReject() } catch (Exception e) { e.printStackTrace(); //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 try { // 第三个参数 true 则重回队列 false 则丢弃 // 重试 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 直接丢弃 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e1) { e1.printStackTrace(); } } }}
SpringBoot 整合 RabbitMQ
1.引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置连接:
spring: application: name: producer rabbitmq: username: admin password: 123456 host: 127.0.0.1 port: 5672 virtual-host: test # 消息发送到交换机上 开启消息发送成功 确认机制 publisher-confirm-type: correlated # 路由转发到队列 开启消息接受 确认机制 publisher-returns: true
spring: application: name: consumer01 rabbitmq: username: admin password: 123456 host: 127.0.0.1 port: 5672 virtual-host: test listener: simple: acknowledge-mode: manual #设置确认模式为手动确认 prefetch: 10 #设置消费端每秒拉取的消息数量为100 默认为250 用于消费端限流
3.创建队列和对应的交换机
@Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } // 队列 @Bean public Queue topicQueue(){ return new Queue("topicQueue", true); } // 主题交换机 @Bean TopicExchange topicExchange(){ return new TopicExchange("topicExchange", true, false); } // 绑定 @Bean Binding bindingTopic(){ return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("aiot.#"); }
4.发送消息
@GetMapping("/sendMessage") public Result sendDirectMessage() { String messageId = String.valueOf(IdUtil.fastUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("topicExchange", "aiot.huxiongjun", map); log.info("发送消息成功: "+map); return new Result(map); }
5.接受消息
@Configuration@RabbitListener(queues = "topicQueue")public class RabbitMQListener { @RabbitHandler public void getMessage(Message message, Channel channel, HashMap messageMap) { //接收消息 System.out.println("消费端接收消息:" + messageMap); }}
来源:https://blog.csdn.net/weixin_41861506/article/details/117194955?utm_medium=distribute.pc_category.none-task-blog-hot-19.nonecase&depth_1-utm_source=distribute.pc_category.none-task-blog-hot-19.nonecase