消息队MQ

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 消息队MQ

文章描述

? @ 作者:Lion J
? @ 主页: https://blog.csdn.net/weixin_69252724
? @ 主题: 消息队列MQ_rabbitMQ搭建
?? @ 创作时间:2024年03月9日
————————————————

@TOC


举一个 电商的例子
在开发的一个场景中,用户下订单给订单服务,订单服务调用库存服务减产库存情况, 订单服务再下订单, 下完订单再通知用户订单信息

在这里插入图片描述


一、MQ是什么?

MQ 全称(Message Queue)又名消息队列,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。

言简意赅的说,就是将服务中间某个行为步骤先抽取到一个容器,让容器去操作,不影响当前的服务

二、常见MQ中间件

ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,
开发比较灵活,采用 C 语言 实现,实际上只是一个 socket 库的重新封装,如果做为消息队列使用,需要开发大量的代码。 ZeroMQ 仅提供非持久性的队列,也就是说如果 down 机,数据将会丢失。
RabbitMQ: 使用 erlang 语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
ActiveMQ: 历史悠久的 Apache 开源项目。已经在很多产品中得到应用,实现
了 JMS1.1 规范,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ: 阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来 很简单。
Kafka: Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

三、RocletMQ环境搭建

rabbitMQ搭建

  1. 下载解压
    https://rocketmq.apache.org/download/
  2. 配置环境变量
    ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-4.9.3
    NAMESRV_ADDR =127.0.0.1:9876
    
  3. 启动Name Server
    进入到bin目录输入命令:
    mqnamesrv.cmd
  4. 启动Broker
    进入到 bin 目录输入命令:
    mqbroker.cmd -n 127.0.0.1:9876 atuoCreateTopicEnable=true

控制台安装启动

  1. 解压

在这里插入图片描述

  1. 修改其 src/main/resources 中的 application.properties 配置文件
    在这里插入图片描述
  2. 在解压目录 rocketmq-console 的 pom.xml 中添加如下 JAXB 依赖。
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
  1. 打包_命令行进入到 rocketmq-console

mvn clean package -Dmaven.test.skip=true

  1. 打包后,进入 target 目录
    启动控制台 java -jar rocketmq-console-ng-1.0.0.jar

  2. 访问
    http://127.0.0.1:6060
    在这里插入图片描述

    四、RocketMQ架构

    在这里插入图片描述

其中Broker是RocketMQ的核心, 当Broker启动后, 就会向NameServer中注册自身消息, 然后Producer在NameServer中获取Broker的信息,然后向Broker发送投递消息; 消费者Consumer在NameServer中获取Broker消息之后就会从Broker中接收消息

NameServer,Broker,Producer,Consumer。
Broker(邮递员) Broker 是 RocketMQ 的核心,负责消息的接收,存储,投递等功能.
NameServer(邮局) 消息队列的协调者,Broker 向它注册路由信息,同时Producer 和 Consumer 向其获取路由信息
Producer(寄件人) 消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消 息
Consumer(收件人) 消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息
Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对 Topic 来发送和接收消息
Message Queue(邮件) 为了提高性能和吞吐量,引入了 Message Queue,一个 Topic 可以设置一个或多个 Message Queue,这样消息就可以并行往各个Message Queue 发送消息,消费者也可以并行的从多个 Message Queue 读取消息 Message Message 是消息的载体。
Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

五、java消息发送和接收演示

消息发送者

        public class MQProducerTest {
   
   
            public static void main(String[] args) throws Exception {
   
   
//1. 创建消息生产者, 指定生产者所属的组名
                DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定 Nameserver 地址
                producer.setNamesrvAddr("192.168.109.131:9876");
//3. 启动生产者
                producer.start();
//4. 创建消息对象,指定主题、标签和消息体
                Message msg = new Message("myTopic", "myTag",
                        ("RocketMQ Message").getBytes());
//5. 发送消息
                SendResult sendResult = producer.send(msg, 10000);
                System.out.println(sendResult);
//6. 关闭生产者
                producer.shutdown();
            }
        }

消息接收

        public class MQConsumerTest {
   
   
            public static void main(String[] args) throws Exception {
   
   
//1. 创建消息消费者, 指定消费者所属的组名
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
//2. 指定 Nameserver 地址
                consumer.setNamesrvAddr("192.168.109.131:9876");
//3. 指定消费者订阅的主题和标签
                consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
                consumer.registerMessageListener(new MessageListenerConcurrently() {
   
   
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
                                                                            msgs,
                                                                    ConsumeConcurrentlyContext
                                                                            context) {
   
   
                        System.out.println("Receive New Messages: " + msgs);//返回消费状态
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
//5. 启动消息消费者
                consumer.start();
                System.out.println("Consumer Started.");
            }
        }

六、案例

在这里插入图片描述

订单微服务发送消息

  1. 添加rocketmq依赖
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
  1. 添加配置
    在这里插入图片描述

  2. 编写测试代码

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    rocketMQTemplate.convertAndSend("order-topic", order);
    

    用户微服务接收消息

  3. 添加依赖
         <!--rocketmq-->
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-spring-boot-starter</artifactId>
             <version>2.0.2</version>
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
             <version>4.4.0</version>
         </dependency>
    
  4. 修配置文件
    在这里插入图片描述
  5. 编写消息接收服务
    @Service
    @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
    public class SmsService implements RocketMQListener<Order> {
         
         
     @Override
     public void onMessage(Order order) {
         
         
         System.out.println("收到一个订单信息:"+ JSON.toJSONString(order)+",接下来发送短信");
     }
    }
    
  6. 启动服务,执行下单操作,观看后台输出
    在这里插入图片描述

七、发送不同类型消息

RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送

可靠同步发送: 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方 式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送: 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送 方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知 启动转码服务,转码完成后通知推送转码结果等。
单向发送: 单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不 等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

● 同步消息

//同步消息
//参数一: topic
//参数二: 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一
                条同步消息");
                System.out.println(sendResult);

●发送异步消息

//参数一: topic
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1","这是一条异步消息",new

    SendCallback() {
   
   
        @Override
        public void onSuccess (SendResult sendResult){
   
   
            System.out.println(sendResult);
        }
        @Override
        public void onException (Throwable throwable){
   
   
            System.out.println(throwable);
        }
    });
//让线程不要终止
Thread.sleep(30000000)

●单向消息

rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 存储 缓存
RabbitMq如何防止消息被重复消费
RabbitMq如何防止消息被重复消费
1017 0
|
消息中间件 存储 监控
ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制
maximumRedeliveryDelay:最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。
802 0
ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制
|
消息中间件
RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
|
5月前
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
|
6月前
|
消息中间件 容器
RabbitMQ异常重启,部分消费队列不消费问题
RabbitMQ异常重启,部分消费队列不消费问题
126 0
|
10月前
|
消息中间件 监控 NoSQL
RocketMq普通消息,死信队列,消息幂等性(redis)
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
179 0
|
10月前
|
消息中间件 运维 Java
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
|
12月前
|
消息中间件 Java 测试技术
【消息中间件】异常和死信消息们的浪浪山 2
【消息中间件】异常和死信消息们的浪浪山
|
12月前
|
消息中间件 Java Spring
【消息中间件】异常和死信消息们的浪浪山 1
【消息中间件】异常和死信消息们的浪浪山
|
12月前
|
消息中间件 Docker 容器
【消息中间件】异常和死信消息们的浪浪山 3
【消息中间件】异常和死信消息们的浪浪山
http://www.vxiaotou.com