学习笔记之消息中间件(RabbitMQ)的学习

学习笔记之消息中间件(RabbitMQ)的学习

八月 03, 2020

本次的笔记整理的主要是在Linux下安装RabbitMQ,并且如果使用Java代码在IDEA中去使用RabbitMQ,如果对Linux不熟悉的话,请看我之前的笔记。

Linux的学习(一) Linux的学习(二)基本命令 Linux的学习(三)

本次演示的安装包版本为(centos 6.5):
otp_src_22.0.tar.gz
rabbitmq-server-generic-unix-3.7.17.tar.xz
安装包的百度云链接:https://pan.baidu.com/s/1mw4bW6RLxFm4RoV-i9vB3A
提取码:bnfy

本文主要内容

主要内容

  1. AMQP 简介
  2. RabbitMQ 简介
  3. RabbitMQ 原理
  4. Erlang 安装
  5. 安装 RabbitMQ
  6. RabbitMQ 账户管理
  7. 交换器

在这里插入图片描述

一、AMQP简介

1 AMQP 是什么?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是进程之间传递异步消息的网络协议。

2 AMQP 工作过程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
在这里插入图片描述

3 队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进后出。其中一侧负责进数据,另一次负责出数据。
MQ(消息队列)很多功能都是基于此队列结构实现的
在这里插入图片描述

二、RabbitMQ简介

1 RabbitMQ 介绍

RabbitMQ 是由 Erlang 语言编写的基于 AMQP 的消息中间件。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

同时符合当前互联网需求的高并发、大吞吐、多线程三个特点。所以它是一个特别重要的组件,特别是在分布式系统中,可以帮我们做一些解耦和、同步转异步、流量削峰等等。最核心的就是同步转异步。

1.1 解决应用耦合

1.1.1 不使用 MQ 时
在这里插入图片描述

1.1.2 使用 MQ 解决耦合
在这里插入图片描述
应用方式间接处理,类似于RPC远程调用一样,我们可以发现,不管是用RPC来写代码、或者是用RMI技术来写代码、还是用Dubbo技术来写代码,Provider和Consumer都是有关系的,他们的关系连接点是那个标准接口,而Provider和Consumer他们之间是有耦合度的,这个耦合度就在那个接口上,Provider没有实现接口的话,Consumer就不能去调用,Consumer没有去使用这个接口就找不到Provider。

2 RabbitMQ 适用场景(同步–>异步)

排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务(离线处理)、流量销峰等。

举个例子:
比如说怎么做流量削峰呢?首先可以将上面两张图中的A看做是淘宝,相当于页面显示,而应用程序B相当于数据处理,也就是做数据的增删改查的,比如到了双十一,客户在页面找商品,然后下订单,由于双十一很多客户买东西,也就是并发更多、这个时候的消费比平时是更强的、产生流水量也是更大的,假如下订单峰值达到几亿个数据,那这个时候的压力太大了,那么这个时候如果去做数据库的写操作,这么高的并发所用的反应时间就会很长,比如说这个时间是5-10S,那么一个用户点击下订单的时候,页面就白了,等5-10s客户可能就不停的刷新,这样体验感就变得很差,如果是加服务器数量的话,双十一过去之后就没那么多人买东西了,这个时候就造成资源的浪费了,在这种情况下就有了MQ给你解决这个问题,当还是有几亿个用户下订单的时候,A就会把数据封装起来放到MQ中,然后A就会和客户说(返回一个页面),订单下定成功了请稍后结算,下订单到回复给客户的这个时间可能就只有1s左右的时间,在用户思考或者等待的时间当中,B应用在不停的往数据库中录入数据,这样用户的体验感就会好很多。

流量削峰其实就是将原来的同步等待的时间变成了异步,让用户不在一个空白的页面等待页面反应过来

再举个例子:我们最爱的抢红包
比如说:100个人抢10个红包,发红包时100个人一起点击红包去抢,100个客户端中找出手最快的10个人,于是先将消息放入到MQ当中,由于MQ中使用的是队列,特点是先进先出,将这100个请求放到队列当中,消费端将前十个处理掉,剩下90个就返回回去通知手慢了。

只要是想要将同步处理转为异步处理,都可以考虑MQ,整体来说就是并发的并行转成并发的串行,变成一个排队的机制。串行的话Consumer是可以集群的,所以速度不会变低。

三、 RabbitMQ 原理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

四、 Erlang 安装

RabbitMQ 是使用 Erlang 语言编写的,所以需要先配置 Erlang
本次时在Linux下安装otp_src_22.0.tar.gz

前提准备:

1 修改主机名

RabbitMQ 是通过主机名进行访问的,必须指定能访问的主机名。我这里将主机名改为了oldou-mq
使用命令:

1
vim /etc/sysconfig/network

在这里插入图片描述
修改hosts:
使用命令:

1
2
vim /etc/hosts
新添加了一行,前面为服务器 ip,空格后面添加计算机主机名

注意:我这里的192.168.15.132是我的linux的IP地址,到时候填自己的IP地址就行了
在这里插入图片描述
修改好以上两个就可以进行下一步了。

2 安装依赖

执行以下命令:

1
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

3 上传文件并解压

我在root目录下创建了一个temp目录,专门用于上传文件。
这里我使用的是lrzsz上传的文件,如果不会使用这个就去目录出查看我之前的笔记。

上传 otp_src_22.0.tar.gz 到/root/temp 目录中,进入目录并解压。
解压时注意,此压缩包不具有 gzip 属性,解压参数没有 z,只有 xf

1
2
3
cd /root/temp    到temp目录下
rz 使用rz命令进行上传,这里我之前安装了lrzsz
tar -xf otp_src_22.0.tar.gz 进行解压

4 配置参数

(1)先新建/usr/local/erlang 文件夹,作为安装文件夹

1
mkdir -p /usr/local/erlang

(2)进入解压后的文件夹中

1
cd otp_src_22.0

配置参数(直接复制执行)

1
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

以上配置的解释,
–prefix=/usr/local/erlang 这个是配置安装目录;
–with-ssl 这个是要不要提供加密;
–enable-threads 这个是开启多线程并发处理;
–enable-smp-support 这个是提供smp协议支持信息;
–enable-kernel-poll 这个是提供内核池处理能力,因为它使用了操作系统内核线程池,所以erlang在做高并发处理、多线程处理时才那么快;
–enable-hipe 这个是开启XX模式
–without-javac 这个是不提供javac的编译处理能力;

5 编译并安装

在解压后的文件目录下,直接使用以下命令编译并运行。
时间可能会比较长,要耐心等待。

1
make && make install

安装成功之后是下图所示:
在这里插入图片描述

之后可以进入到安装目录中查看,cd /usr/local/erlang/ 使用ls命令查看安装目录,然后在进入到bin目录下可以看到全是可执行文件。
在这里插入图片描述
退回到erlang目录下,执行bin/erl -version查看版本信息。
在这里插入图片描述
当你看到以上的版本号的时候就说明erlang安装成功了。
接下来去配置环境变量,让erlang可以在任意位置执行。

6 修改环境变量

1
2
3
4
5
6
7
8
(1)修改/etc/profile 文件
vim /etc/profile

(2)在文件中添加下面代码
export PATH=$PATH:/usr/local/erlang/bin

(3)运行文件,让修改内容生效
source /etc/profile

7 查看配置是否成功

1
erl -version

在这里插入图片描述
这个时候你不用在erlang的bin目录下也能执行erlang的一些文件命令了。

五、 安装RabbitMQ

本次安装的RabbitMQ的版本为:rabbitmq-server-generic-unix-3.7.17.tar.xz

1 上传文件并解压

上传 rabbitmq-server-generic-unix-3.7.17.tar.xz 到/root/temp 中

1
2
cd /root/temp
tar -xf rabbitmq-server-generic-unix-3.7.17.tar.xz

2 将解压后的文件移动到 local 下

1
mv rabbitmq_server-3.7.17/ /usr/local/rabbitmq

3 配置环境变量

1
2
3
4
5
6
7
vim /etc/profile

在文件中添加
export PATH=$PATH:/usr/local/rabbitmq/sbin

解析文件
source /etc/profile

4 开启 web 管理插件

1
2
3
4
5
6
7
8
进入 rabbitmq/sbin 目录
cd /usr/local/rabbitmq/sbin

查看插件列表
./rabbitmq-plugins list

生效管理插件
./rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述
插件安装成功之后,在rabbitMQ的安装目录下的etc目录下可以查看到以下内容:
在这里插入图片描述

5 后台运行RabbitMQ

1
2
3
4
5
6
7
 启动 rabbitmq。
./rabbitmq-server -detached

停止命令 ./rabbitmqctl stop_app
如果无法停止,使用 kill -9 进程号进行关闭
使用这个命令查看进程 ps aux|grep rabbitmq
然后在使用 keil -9 进程号 杀掉进程就OK了

举个例子:
在这里插入图片描述
启动成功信息如下(两张图)
在这里插入图片描述
在这里插入图片描述

5.1 启动错误解决

如果启动 RabbitMQ 发生下述错误,可以提供环境配置文件,解决。环境配置文件命名为: rabbitmq-env.conf。所在位置是: $rabbitmq_home/etc/rabbitmq/目录。内容是: HOSTNAME=主机名称
在这里插入图片描述
在rabbitmq/etc/rabbitmq的安装目录下vim rabbitmq-env.conf
添加内容如下所示:
HOSTNAME=oldou-mq

注意:这里的oldou-mq是我之前配置的主机名称

6 查看 web 管理界面

默认可以在安装 rabbitmq 的电脑上通过用户名:guest 密码 guest 进行访问 web 管理界面
端口号:15672(放行端口,或关闭防火墙)
在虚拟机浏览器中输入:
http://ip:15672

启动成功之后,在关闭防火墙的前提下,去浏览器输入linux的主机IP和端口好就能进行访问。
在这里插入图片描述

六、 RabbitMQ 账户管理

1 创建账户

    先进入到sbin目录下:  cd /usr/local/rabbitmq/sbin
    语法:
    ./rabbitmqctl add_user username password

在这里插入图片描述
创建完之后的用户默认是没有权限的。

2 给用户授予管理员角色

其中 smallming 为新建用户的用户名
**./rabbitmqctl set_user_tags admin administrator**
在这里插入图片描述

3 给用户授权

    “/” 表示 RabbitMQ 根虚拟主机
    admin 表示用户名
    ".*" ".*" ".*" 表示完整权限
    ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

在这里插入图片描述

4 登录

使用新建账户和密码在 windows 中访问 rabbitmq 并登录
在浏览器地址栏输入:
http://ip:15672/
用户名:oldou
密码:oldou
这样就能登录进去了。
在这里插入图片描述

七、 Exchange 交换器(交换机)

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在 RabbitMQ 中支持四种交换器

1. Direct Exchange:直连交换器(默认)
2. Fanout Exchange:扇形交换器
3. Topic Exchange:主题交换器

4. Header Exchange:首部交换器。

在 RabbitMq 的 Web 管理界面中 Exchanges 选项卡就可以看见这四个交换器。
在这里插入图片描述

1 direct 交换器(点对点)

在这里插入图片描述
direct 交换器是 RabbitMQ 默认交换器。默认会进行公平调度。所有接受者依次从消息队列中获取值。Publisher 给哪个队列发消息,就一定是给哪个队列发送消息。对交换器绑定的其他队列没有任何影响。
(代码演示)一个队列需要绑定多个消费者
需要使用注解/API:
org.springframework.amqp.core.Queue:队列
AmqpTemplate:操作 RabbitMQ 的接口。负责发送或接收消息
@RabbitListener(queues = “”) 注解某个方法为接收消息方法

接下来我们便使用代码来实现。

1.1 代码实现

1.1.1 新建项目(01rabbirMQdirect)

测试项目结构为:
在这里插入图片描述

1.1.1.1 添加父pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.oldou</groupId>
<artifactId>01rabbirMQ-direct</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>amqp-rabbit-consumer</module>
</modules>

<!--添加依赖管理项-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--SpringBoot提供的关于AMQP协议实现的启动器,可以使用AMQP协议快速访问MQ消息中间件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

</dependencies>

</project>

然后在该项目下新建一个Model—- amqp-rabbit-consumer
然后在该文件下新建一个类InfoLogConsumer
1.1.1.2 编写消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
* 日志消息消费者,只消费 Info 日志
* 日志消息存储在队列中 log-info-queue
* 使用的交换器名称为 log-ex-direct
* 交换器类型是 direct
* 队列的路由键是 direct-rk-info
*
* 注解 @RabbitListener - 监听注释。可以描述类型和方法
* 类型 - 当前类型监听某个队列
* 方法 - 当前方法监听某个队列
* 属性 -
* bindings - QueueBinding[] 类型,代表这个类或者方法监听的队列、交换器、路由键的绑定方式。
*
* 注解 @QueueBinding
* 属性 -
* value - 绑定监听的队列是什么
* exchange - 队列对应的交换器是什么
* key - 队列的路由键是什么
* 注解 -
* @Queue - 描述一个队列
* 属性 -
* value|name - 队列名称
* autoDelete - 是否自动删除,默认为"",
* 如果队列名称定义,不自动删除;队列名称不定义,队列为自动删除队列
* 如果是自动删除,代表的所有的consumer关闭后,队列自动删除。
* @Exchange - 描述一个交换器
* 属性 -
* value|name - 交换器名称
* autoDelete - 是否自动删除,默认为"",就是自动删除,可选false,非自动删除
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "log-info-queue",autoDelete = "false"),
exchange = @Exchange(value = "log-ex-direct",type = "direct",autoDelete = "false"),
key = "direct-rk-info"
)
})
@Component
public class InfoLogConsumer {
/**
* 消息消费的方法。当队列log-inf-queue中出现消息,立即消费。
* @param msg 消息内容
*
* 注解 @RabbitHandler - 配合类型上的@RabbitListener注解
* 作用:标记当前的方法是一个监听消息队列,消费消息的方法。
*
*/
@RabbitHandler()
public void onMessage(String msg){
System.out.println("InfoLogConsumer 消费消息 "+ msg);
}
}

1.1.1.3 编写配置文件
新建 application.yml.
我们如果不配置这个文件的话,以下就为默认值:
host:默认值 localhost
username 默认值:guest
password 默认值:guest
配置文件内容如下:

1
2
3
4
5
6
7
8
spring:
rabbitmq:
host: 192.168.15.132 # RabbitMQ服务的地址,默认localhost
post: 5672 #RabbitMQ的端口,默认是5672 注意:5672是TCP协议的端口,15672是http协议的端口
username: oldou #访问RabbitMQ的用户名,默认是guest
password: oldou #访问RabbitMQ的密码,默认是guest
virtual-host: / #访问RabbitMQ的哪一个虚拟主机,默认为 /

注意:配置文件的名字必须要是application.yml,如果名称写错,那么访问的时候就一直时localhost。

编写消息的启动类:

1
2
3
4
5
6
7
8
@SpringBootApplication
public class RabbitConsumerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitConsumerApp.class,args);
}

}

书写好之后,在Linux启动RabbitMQ的前提下,首先在浏览器访问192.168.15.132:15672,并且登录用户进入管理界面,然后在IDEA中启动消息的启动类。这个时候控制台的输出中就有一个本机的端口,在管理界面中刷新,可以看见该端口。
在这里插入图片描述

我们也可以使用另外一种方式来书写消息类:
在消息类的同一个包下新建一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 日志消息消费者,消费 error和 warn 日志
* error日志消息存储在队列中 log-error-queue
* warn日志消息存储在队列中 log-warn-queue
* 使用的交换器名称为 log-ex-direct
* 交换器类型是 direct
* 队列的路由键是 direct-rk-error
* 队列的路由键是 direct-rk-warn
*/
@Component //加这个注解表示让Spring容器做一下加载
public class LogConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "log-error-queue"),
exchange = @Exchange(value = "log-ex-direct"),
key = "direct-rk-error"
)
})
public void onLogErrorMessage(String msg){
System.out.println("错误日志信息: " + msg);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "log-warn-queue",autoDelete = "false"),
exchange = @Exchange(value = "log-ex-direct"),
key="direct-rk-warn"
)
})
public void onLogWarnMessage(String msg){
System.out.println("警告日志信息 "+msg);
}


}

重新启动以下启动类然后刷新一下网页可以看见:
在这里插入图片描述

1.1.1.4 编写生产者(消息生产者)
首先在父工程下新建一个Model,名为(amqp-rabbit-publisher),然后在该工程下编写消息的发送类:LogMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.rabbit.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 发送消息的类型。
* 把消息发送到RabbitMQ中。
* 在spring-boot-starter-amqp中,启动器会自动创建并初始化一个AmqpTemplate,
* 作为访问Amqp消息服务器(MQ中间件)的客户端对象
*/
@Component
public class LogMessageSender {
@Autowired
private AmqpTemplate template;

/**
* 发送消息的方法
* template.convertAndSend(String exchange,String rountingKey,Object message);
* exchange - 交换器名称
* rountingKey - 路由器
* message - 要发送的消息内容,就是传递的消息对象的消息体
*/
public void sendMessage(String exchange,String rountingKey,String message){
this.template.convertAndSend(exchange,rountingKey,message);
}

}

之后编写一个启动类:

1
2
3
4
5
6
7
8
@SpringBootApplication
public class RabbitPublisherApp {

public static void main(String[] args) {
SpringApplication.run(RabbitPublisherApp.class,args);
}

}

再然后编写一个测试类如1.1.1.5:
编写配置文件:

1
2
3
4
5
spring:
rabbitmq:
host: 192.168.15.132 # RabbitMQ服务的地址,默认localhost
username: oldou #访问RabbitMQ的用户名,默认是guest
password: oldou #访问RabbitMQ的密码,默认是guest

1.1.1.5编写测试类
SpringBoot 整合 Spring-AMQP 后包含内置对象 AmqpTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com;

import com.rabbit.sender.LogMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;

/**
* 消息发送者测试类型
*/
@SpringBootTest(classes = RabbitPublisherApp.class)
@RunWith(SpringRunner.class)
public class TestPublisher {
@Autowired
private LogMessageSender sender;

private String exchange = "log-ex-direct";
private String rkInfo = "direct-rk-info";
private String rkError = "direct-rk-error";
private String rkWarn = "direct-rk-warn";

@Test
public void testSend(){
Random r = new Random();
//发送10条消息
for(int i=0;i<10;i++){
//rInt%3 - 0 :表示投递消息到Info; 1 表示投递消息到error; 2表示投递消息到warn
int rInt = r.nextInt(10);
if(rInt%3 == 0){
this.sender.sendMessage(exchange,rkInfo,"发送Info日志消息 - index = "+ i + "; rInt "+rInt);
}else if(rInt%3 == 1){
this.sender.sendMessage(exchange,rkError,"发送Error日志消息 - index = "+ i + "; rInt "+rInt);
}else{
this.sender.sendMessage(exchange,rkWarn,"发送Warn日志消息 - index = "+ i + "; rInt "+rInt);
}
}
}

}

首先启动消费者consumer,然后在进入到消息的消费者测试类中启动,这个时候就可以看见消费者拿到消息,控制台输出如下:
在这里插入图片描述
以上输出的信息为什么没有按照index=1/2/3/4/5…这样的顺序来输出呢?原因是因为处理的效率不一样,效率低的自然就处理时间长一点,因为输出就是这样。

2 fanout 交换器(广播)

在这里插入图片描述
扇形交换器,实际上做的事情就是广播,fanout 会把消息发送给所有的绑定在当前交换器上的队列。对应 Consumer 依然采用公平调度方式
(代码演示)一个交换器需要绑定多个队列
需要使用注解/API:
FanoutExchange:fanout 交换器
Binding:绑定交换器和队列
BindingBuilder:Binding 的构建器
amq.fanout:内置 fanout 交换器名称

2.1 代码

新建pojo工程,User类,在里面添加get/set方法、无参/带参构造、equals/hashcode、toString方法

项目总体结构如下:
在这里插入图片描述
2.1.1 Publisher
新建一个工程02rabbitMQfanout,然后新建一个消费者FanoutConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 广播交换器,消费者
*/
@Component
public class FanoutConsumer {
/**
* 消费消息的方法
* @param user 消息体内容
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue-user-1",autoDelete = "false"),
exchange = @Exchange(value = "ex-fanout",type = "fanout",autoDelete = "false")
)
})
public void onMessage(User user){
System.out.println("onMessage run : " + user);
}

/**
* 消费消息的方法
* @param user 消息体内容
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue-user-2",autoDelete = "false"),
exchange = @Exchange(value = "ex-fanout",type = "fanout")
)
})
public void onMessage2(User user){
System.out.println("onMessage run : " + user);
}
}

然后编写配置文件:

1
2
3
4
5
spring:
rabbitmq:
host: 192.168.15.132
username: oldou
password: oldou

Pom文件中的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>02rabbitMQfanout</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>consumer</artifactId>
<dependencies>
<dependency>
<artifactId>pojo</artifactId>
<groupId>com.bjsxt</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>

编写测试类:

1
2
3
4
5
6
@SpringBootApplication
public class RabbitConsumer {
public static void main(String[] args) {
SpringApplication.run(RabbitConsumer.class,args);
}
}

启动之后可以在管理平台上看见,接下来便是去书写发送者。

2.1.2 编写发送方法
发送者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.rabbit;

import com.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 发送消息类型,消息发送到fanout交换器中
* 交换器名称是:ex-fanout
*/
@Component
public class UserMessageSender {
@Autowired
private AmqpTemplate template;

public void send(User user){
this.template.convertAndSend("ex-fanout","",user);
}

}

启动类:

1
2
3
4
5
6
7
8
9
10
11
package com;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitSender {
public static void main(String[] args) {
SpringApplication.run(RabbitSender.class,args);
}
}

配置文件:

1
2
3
4
5
spring:
rabbitmq:
host: 192.168.15.132
username: oldou
password: oldou

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com;

import com.entity.User;
import com.rabbit.UserMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* 消息发送者测试类型
*/
@SpringBootTest(classes = RabbitSender.class)
@RunWith(SpringRunner.class)
public class TestPublisher {
@Autowired
private UserMessageSender sender;

@Test
public void testSendUserMessageToFanout(){
for(int i=0;i<3;i++){
User user = new User();
user.setId((long)i);
user.setName("姓名 - " + i);
user.setAge(20 + i);
this.sender.send(user);
}
}

}

pom文件:
···



02rabbitMQfanout
com.bjsxt
1.0-SNAPSHOT

4.0.0

<artifactId>publisher</artifactId>
<dependencies>
    <dependency>
        <artifactId>pojo</artifactId>
        <groupId>com.bjsxt</groupId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
···

运行消息生产者,结果如下:
在这里插入图片描述

交换器把消息投入到两个队列当中,先进先出,那么两个Consumer消费消息的时候都是按照顺序依次消费,处理数据的时候是一起运行的,因为代码环境硬件都是一样的,而System.out.println是有缓存Buffer的,所以偶尔会有那么一两次是交叉输出的。

3 topic 交换器(主题交换)

它是最常用的交换器,也是功能最完整、功能最全的一种交换器,它能够实现点到点、点到面、点到部分的数据传输。
在这里插入图片描述
允许在路由键(RoutingKey)中出现匹配规则。
路由键的写法和包写法相同。com.bjsxt.xxxx.xxx 格式。
在绑定时可以带有下面特殊符号,中间可以出现:
* : 代表一个单词(两个.之间内容)
# : 0 个或多个字符
接收方依然是公平调度,同一个队列中内容轮换获取值。
需要使用注解/API:
TopicExchange:Topic 交换器
amq.topic:内置 topic 交换器名称

3.1 代码实现

项目结构为:
在这里插入图片描述
3.1.1 Publisher
首先新建一个子项目model—-topicPublisher
书写一段发送消息的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.rabbit;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 发送消息到主题交换器
*/
@Component
public class TopicSender {

@Autowired
private AmqpTemplate amqpTemplate;

/**
* 发送消息的方法
* @param exchange
* @param rountingKey
* @param message
*/
public void send(String exchange,String rountingKey,String message){
amqpTemplate.convertAndSend(exchange,rountingKey,message);
}
}

然后书写启动类:

1
2
3
4
5
6
7
8
9
10
11
package com;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TopicRabbitMQSenderApp {
public static void main(String[] args) {
SpringApplication.run(TopicRabbitMQSenderApp.class,args);
}
}

书写测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com;

import com.rabbit.TopicSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;

/**
* 消息发送者测试类型
*/
@SpringBootTest(classes = TopicRabbitMQSenderApp.class)
@RunWith(SpringRunner.class)
public class TestTopicSender {

@Autowired
private TopicSender topicSender1;

@Test
public void testSendMessageToTopic(){
//随机数%6
// 0 rk - user.rk.sms *.rk.* *.rk.sms
// 1 rk - user.rk.email *.rk.* *.rk.email
// 2 rk - order.rk.sms *.rk.* *.rk.sms
// 3 rk - order.rk.email *.rk.* *.rk.email
// 4 rk - reg.rk.sms *.rk.* *.rk.sms
// 5 rk - reg.rk.qq *.rk.*
Random r = new Random();

for(int i=0;i<10;i++){
int rInt = r.nextInt(100);
if(rInt%6 == 0){
this.topicSender1.send("ex-topic","user.rk.sms","用户登录验证码是123456 - 发送短信");
}else if(rInt%6 == 1){
this.topicSender1.send("ex-topic","user.rk.email","用户登录验证码是123456 - 发送到邮箱");
}else if(rInt%6 == 2){
this.topicSender1.send("ex-topic","order.rk.sms","订单下订成功 - 发送短信");
}else if(rInt%6 == 3){
this.topicSender1.send("ex-topic","order.rk.email","订单下订成功 - 发送到邮箱");
}else if(rInt%6 == 4){
this.topicSender1.send("ex-topic","reg.rk.sms","用户注册验证码是123456 - 发送短信");
}else if(rInt%6 == 5){
this.topicSender1.send("ex-topic","reg.rk.qq","用户注册验证码是123456 - 发送到QQ");
}
}
}

}

配置文件:

1
2
3
4
5
spring:
rabbitmq:
host: 192.168.15.132
username: oldou
password: oldou

3.1.2 Consumer
新建一个子项目topicConsumer,书写消费消息的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.rabbit;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 主题消息消费者
*/
@Component
public class TopicConsumer {

/**
* 短信消息消费者,对应的rountingKey是user.rk.sms | order.rk.sms | reg.rk.sms等
* 分别代表 - 用户登录短信 | 订单下订成功通知短信 | 支付成功通知短信 | 注册码通知短信 等。
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue-sms-topic",autoDelete = "false"),
exchange = @Exchange(value = "ex-topic",type = "topic"),
key = "*.rk.sms"
)
})
public void onUserSMSMessage(String message){
System.out.println("用户短信消息内容是: " + message);
}



/**
* 路由键包括: user.rk.email | reg.rk.email | pay.rk.email 等
* @param message
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue-email-topic",autoDelete = "false"),
exchange = @Exchange(value = "ex-topic",type = "topic"),
key = "*.rk.email"
)
})
public void onUserEmailMessage(String message){
System.out.println("用户邮件消息内容是: " + message);
}


/**
* 所有的和rk相关的消息,统一处理消费
* 包含的路由键有:user.rk.sms | user.rk.email | reg.rk.sms | reg.rk.email 等
* 不发短信,不发邮件,作为一个日志记录工具存在
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue-all-topic",autoDelete = "false"),
exchange = @Exchange(value = "ex-topic",type = "topic"),
key = "*.rk.*"
)
})
public void onUserServiceMessage(String message){
System.out.println("执行的消息处理逻辑是: " + message);
}

}

主要是在配置路由键处,表示所有的,.rk.*就表示路由键中中间只要是rk的所有路由键都会执行这个方法。
测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TopicRabbitMQApp {

public static void main(String[] args) {
SpringApplication.run(TopicRabbitMQApp.class,args);
}

}

配置文件:

1
2
3
4
5
spring:
rabbitmq:
host: 192.168.15.132
username: oldou
password: oldou

先启动消费者,当消息生产者一旦发送消息时,消费者就会根据路由键的类型去取消息并且根据类型进行输出,只要符合消费消息的都进行输出,以上测试代码输出如下:
在这里插入图片描述

八、 传递对象类型参数

如果消息是对象类型,此对象的类型必须进行序列化,且需要给定序列化值
在这里插入图片描述

以上便是我学习RabbitMQ时整理的笔记,如果不正确的地方欢迎各位大佬们指正,上面的代码都是我测试过了的,如果出现异常或者是其他之类的错误,就需要好好检查一下代码、配置文件以及Linux的防火墙等等