Warning : Trying to access array offset on value of type bool in
/www/wwwroot/wql_luoqin_ltd/wp-content/themes/Sakura/inc/theme_plus.php on line
286
一,RabbitMQ的核心概念和基础架构
一,RabbitMQ四大核心组件
RabbitMQ消息中间件主要由四大核心部分组成:
1,生产者
产生数据并发送给消费者
2,交换机
交换机是RabbitMQ的一个非常主要的组成,一方面它接收来自生产者的消息,另一方面它将消息推送给队列,交换机必须确定知道如何处理它接收的消息,是将这些消息推送到特定队列还是推送给多个队列,这个由交换机的类型决定
3,队列
队列是RabbitMQ内部使用的一种数据结构,主要用于存放消息,然后再推送到应用程序消费者,队列的容量仅受内存和磁盘限制的约束,本质上是一个大的消息缓冲区,生产者可以将消息发送给队列,消费者尝试从队列接收数据
4,消费者
消费与接收具有数据
注:生产者和消费者开发并不在同一个服务端上,但一个服务端既可以是生产者也可以是消费者
二,RabbitMQ基础架构
RabbitMQ由七大部分组成:
Broker(RabbitMQ Server):接收和分发消息的应用,它内部封装了多个Virtual Host(虚拟主机)
Virtual Host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的NameSpace的概念,当多个不同的用户使用同一个RabbitMQ提供服务时,可以使用Virtual进行服务划分,每一个Virtual封装了多个交换机(Exchange)/队列(Queue)
Connection:负责生产者(producer)/消费者(Consumer)和Broker之间的TCP连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是巨大的,效率也低,采用Channel是在Connection建立逻辑连接,每一个Thread创建单独的Channel进行通讯,将极大的减少系统的开销
Exchange:message到达brock,根据分发规则将消息分发到具体的queue中,常见的Exchange类型有:direct,topic,fanout
Queue:缓冲消息并推送给Consumer
Binding:exchange和queue之间的虚拟连接,binding中包含routing key,binding消息被保存在exchange的查询表中,用于message的分发依据
二,RabbitMQ的安装配置
安装RabbitMQ需要先安装Erlang语言环境,还有一个Socat依赖包(这个依赖包通过yum安装)
1,下载Erlang和RabbitMQ的RPM包(CentOS)
Erlang RPM包下载地址:https://packagecloud.io/rabbitmq/erlang
Erlang源码包下载地址:http://erlang.org/download/?N=D
RabbitMQ下载地址:https://www.rabbitmq.com/install-rpm.html#downloads
注:Erlang和RabbitMQ是有版本兼容问题的,在https://www.rabbitmq.com/which-erlang.html 可看对应版本关系
2,我下载的是RabbitMQ3.9.7和Erlang23.3.4.7的RPM格式(tar.gz需要手动安装)
3,安装命令(注意先后顺序)
#安装erlang
rpm -ivh erlang-23.3.4.7-1.el7.x86_64.rpm
#安装socat依赖
yum install -y socat
#安装rabbitmq
rpm -ivh rabbitmq-server-3.9.7-1.el8.noarch.rpm
注:rpm安装默认安装在/usr/sbin目录中,centos中通过Systemctl命令可以直接启动,关闭,查看状态
4,启动RabbitMQ并查看状态
启动:systemctl start rabbitmq-server
RabbitMQ默认端口5672
查看状态:systemctl status rabbitmq-server
三,RabbitMQ Web可视化插件的安装
一,安装web插件
安装命令:rabbitmq-plugins enable rabbitmq_management
注:安装RabbitMQ web界面插件需要先关闭rabbitmq:systemctl stop rabbitmq-server
访问:IP:15672
二,添加用户并设置权限
1,创建一个web账号
命令:rabbitmqctl add_user 用户名 密码
#创建一个用户名为admin,密码为123的账号
rabbitmqctl add_user admin 123
2,设置用户角色
命令:rabbitmqctl set_user_tags 用户名 角色
角色有两种:
1,普通用户(默认创建时就是)
2,管理员(administrator)
#设置admin用户权限为管理员
rabbitmqctl set_user_tags admin administator
3,设置用户权限
命令:rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
conf:配置权限
write:写权限
read:读权限
*代表有设置该权限
注:管理员不需要设置,它的权限是拉满的
#设置普通用户wql的权限
rabbitmqctl set_permissions -p "/" wql "*" "*" "*"
4,查看用户列表
命令:rabbitmqctl list_users
5,进入web界面
四,RabbitMQ的Exchange路由策略
Exchange(交换机)是RabbitMQ的中间环节
Exchange的作用:主要有两个方面的作用
接受生产者发送的消息
转发消息,通过不同的转发策略,将消息发送给队列
注:Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列和Exchange绑定,那么消息就会丢失
Exchange的四种转发策略:
Fanout(默认):广播,将消息分发给所有绑定的队列
Driect:定向,将消息转发给符合指定routing key的队列
Topic:通配符,把消息转发给符合routing pattern(路由模式)的队列
Headers:参数匹配,指定消息中的Header作为参数,匹配路由队列
注:本质上转发策略的不同催生了不同的工作模式,如pub/sub模式就使用Fanout策略,Routing模式使用Driect策略,Topics模式使用Topic策略(工作模式是概念,落地实现是使用Exchange的转发策略) 一,Fanout Fanout为广播策略,默认Exhange和Queue绑定时,Routing key为""空字符就默认被广播策略 Fanout路由策略:将生产者发送的消息,转发给绑定该路由的全部队列
一,Fanout
Fanout为广播策略,默认Exhange和Queue绑定时,Routing key为""空字符就默认被广播策略
Fanout路由策略:将生产者发送的消息,转发给绑定该路由的全部队列
二,Driect
Driect使用Routing key定向匹配转发给 Queue
消息发送时必须绑定Routing key,创建队列时也必须绑定Routing key,两种都为空字符串时就为消息广播,指定特定的Routing key就可以指定转发对象
三,Topic
本质上Topic策略和Driect策略没有什么区别,都是基于Routing key做消息的指定转发,但Topic可以指定一个匹配规则而Dricet只能做固定匹配
Topic的匹配符:
四,Headers
Headers和前面三种都有一些不一样,它不依赖于Routing key而是依赖参数, 其中的header就是以下方法的arguments参数
消费发送时需要将Headers作为参数传入arguments,队列绑定时也需要Headers作为参数传arguments,Exchange转发时以Heades为依据进行匹配转发
五,RabbitMQ六种工作模式
RabbitMQ五种工作模式:
hello world模式(简单模式)
work queue模式(工作队列模式)
Publish/Subscribe模式(发布定阅模式)
Routing模式(路由模式)
Topics模式(主题模式)
注:其实本质上有六种还有一个RPC远程调用模式,但它实际上不太算MQ,RPC有专门的框架实现
一,RabbitMQ简单模式实现
简单模式也是最简单的hello world模式
简单模式只有一个生产者(Publish)和消费者(Consumer),一个队列(使用默认交换机和默认的virtual host /)
① 生产者代码:
public class publish_test {
public static void main(String[] args) throws IOException, TimeoutException {
//1,创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2,设置工厂连接的参数
//连接主机IP
connectionFactory.setHost("192.168.68.133");
//端口号(RabbitMQ默认端口5672)
connectionFactory.setPort(5672);
//设置Virtual host虚拟机(默认为'/')
connectionFactory.setVirtualHost("/");
//连接rabbitmq的用户名(需要在rabbitmq中单独设置)
connectionFactory.setUsername("love");
//连接rabbitmq的密码
connectionFactory.setPassword("123");
//3,通过工厂创建Connection连接
Connection connection = connectionFactory.newConnection();
//4,通过连接创建一个逻辑连接
Channel channel = connection.createChannel();
/*
* 5,创建一个队列
*参数:
* String queue:队列名称
* boolean durable:是否持久化,持久化之后mq重启它依然存在
* boolean exclusive:1,是否独占占(只有一个消费者能监听) 2,Connection连接断开后是否删除该队列
* boolean autoDelete:是否在没有consumer监听时,自动删除队列
* Map<String, Object> arguments:其他参数信息
* 注:队列名不能重复
* */
channel.queueDeclare("FQ", false, false, false, null);
//消息
String message = "啦啦啦,RabbitMQ";
/*
* 6,生产者发送消息
* 参数:
* String exchange:交换机(不是使用的话默认为"")
* String routingKey:路由key(通过它让队列和交换机绑定),如果使用默认交换机它和队列名一样即可绑定
* BasicProperties props:配置信息
* byte[] body:具体消息
* */
channel.basicPublish("", "FQ", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//7,关闭资源
channel.close();
connection.close();
}
}
② 消费者代码:
public class comsumer_test {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost("192.168.68.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("love");
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
final Channel channel = connection.createChannel();
//声明一个队列(消费者其实可以创建队列,使用和生产者一样的队列,如果队列创建将不再创建)
channel.queueDeclare("FQ", false, false, false, null);
//定义回调对象,需要传入chanel对象
Consumer consumer = new DefaultConsumer(channel) {
/*重写handleDelivery回调方法,当接收到消息会自动执行该方法
* String consumerTag:消息的标识
* Envelope envelope:获取交换机,路由key等信息
* AMQP.BasicProperties properties:配置信息
* byte[] body:真实的数据
* */
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish发送:" + new String(body));
}
};
/*消费消息
* 参数:
*String var1:消费的队列名称
*boolean var2:是否自动消息确认
*Consumer var3:回调对象
* */
channel.basicConsume("FQ", true, consumer);
//关闭连接
// channel.close();
//connection.close();
}
}
③ 查看web界面:
二,RabbitMQ工作队列模式实现
RabbitMQ工作队列模式其实和普通模式差不多,主要区别是它有多个消费者Consumer
应用场景:对于任务过重或任务较多的情况,使用工作队列模式多个消费者,可以提供任务的处理速度
注:普通工作模式和工作队列模式massage只能单消费,后面的pub/sub模式和routing模式消息可以被多个消费
① Util工具类代码:
public class workqueue_util {
public Connection getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost("192.168.68.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("love");
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
return connection;
}
}
② Publish生产者代码:
public static void main(String[] args) throws IOException, TimeoutException {
//创建Util工具类对象
workqueue_util workqueueUtil = new workqueue_util();
Connection connection = workqueueUtil.getConnection();
//创建channel逻辑连接对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("FQ", false, false, false, null);
//消息实体
String message = "WorkQueueMold & RabbitMQ";
//生产发送消息
for (int a = 1; a < 10; a++) {
channel.basicPublish("", "FQ", null, new String(message + " " + a).getBytes());
}
//7,关闭资源
// channel.close();
// connection.close();
}
③ Consumer消费者1代码:
public static void main(String[] args) throws IOException, TimeoutException {
//创建Util工具类对象
workqueue_util workqueueUtil = new workqueue_util();
Connection connection = workqueueUtil.getConnection();
//创建channel逻辑连接对象
Channel channel = connection.createChannel();
//定义回调对象,需要传入chanel对象
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish发送:" + new String(body));
}
};
//接收消息
channel.basicConsume("FQ", true, consumer);
//关闭连接
// channel.close();
// connection.close();
}
④ Consumer消费者2代码(和消费者1是一样的):
public static void main(String[] args) throws IOException, TimeoutException {
//创建Util工具类对象
workqueue_util workqueueUtil = new workqueue_util();
Connection connection = workqueueUtil.getConnection();
//创建channel逻辑连接对象
Channel channel = connection.createChannel();
//定义回调对象,需要传入chanel对象
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("publish发送:" + new String(body));
}
};
//接收消息
channel.basicConsume("FQ", true, consumer);
//关闭连接
// channel.close();
// connection.close();
}
三,RabbitMQ发布订阅模式实现
发布订阅模式和之前普通模式和工作队列模式相比多了一个Exchange,之前用的是默认的Exchange交换机,发布订阅模式需要自定义交换机
发布订阅模式一个Exchange绑定多个队列,Exchanger有四种消息转发方式(默认广播),它可以将publish的一个消息广播给多个队列,Consumer订阅队列,接收队列中推送的消息
发布订阅模式,可以实现消息的多消费,将消息分发给多个队列,不同队列的相同内容可以被不同的Consumer消费
注:本质还是一个队列的消息只能被消费一次的原则,主要是通过不同队列来实现多消费
① Util工具类代码:和上面util是一样的只是类名和方法名不一样而已
② Publish生产者代码:
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new pub_sub_util().pub_sub_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
/*创建一个Exchange交换机
* 参数:
* String exchange:交换机的名称
* BuiltinExchangeType type:路由的类型,有四种Fanout,Driect,Topic,Headers,这里是枚举类型
* boolean durable:是否持久化
* boolean autoDelete:是否自动删除
* boolean internal:内部使用,一般为false
* Map<String, Object> arguments:参数
* */
String exchange_name = "Love_FQ";//交换机名称
channel.exchangeDeclare(exchange_name, BuiltinExchangeType.FANOUT, true, false, false, null);
//创建两个队列分别为queue1,queue2
String queue1_name = "queue1"; //队列名称
String queue2_name = "queue2"; //队列名称
channel.queueDeclare(queue1_name, true, false, false, null);
channel.queueDeclare(queue2_name, true, false, false, null);
/*将队列和交换机进行绑定(之前的普通模式和工作队列模式使用的是默认交换机,不需要手动进行绑定,自定义交换机需要手动进行绑定)
* 参数:
* String queue:队列的名称
* String exchange:路由的名称
* String routingKey:指定routingKey,使用Fanout广播模式指定为""进行
* Map<String, Object> arguments:参数(一般使用Headers模式会用到)
* */
channel.queueBind(queue1_name, exchange_name, "", null);
channel.queueBind(queue2_name, exchange_name, "", null);
//创建生产者发送消息
String message = "WQL发送了一个级别为Wain的消息!!";
channel.basicPublish(exchange_name, "", null, message.getBytes());
//关闭连接
// channel.close();
// connection.close();
}
③ Consumer消费者代码(两个消费者):
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new pub_sub_util().pub_sub_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//Consumer1的回调
Consumer consumercallback1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的queue1队列" + "\n" + "publish消息:" + new String(body));
}
};
//Consumer2的回调
Consumer consumercallback2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的queue2队列" + "\n" + "publish消息:" + new String(body));
}
};
//创建两个Consumer分别绑定queue1队列和queue2队列
channel.basicConsume("queue1", true, consumercallback1);
channel.basicConsume("queue2", true, consumercallback2);
//关闭连接
// channel.close();
// connection.close();
}
四,RabbitMQ路由模式实现
模式说明:
队列与队列的绑定,不能进行直接任意绑定,而是需要指定一个RoutingKey(路由Key)
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息广播给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息RoutingKey完全一致,才能接收消息
注:发布订阅模式,路由模式,主题模式等概念具体的实现都是根据Exchange的Fanout(广播),Driect(定向),Topics(通配符)来实现的(天上的理念不同,落地的实现相似)
① Util工具类代码:和上面util是一样的只是类名和方法名不一样而已
② Publish生产者代码:
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new topics_util().topics_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//声明Exchange交换机,路由策略为Topics
String exchange_name = "topics_exchange";
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC, true, false, false, null);
//创建两个队列
String topic_queue = "topic_queue1";
String topic_queue1 = "topic_queue2";
channel.queueDeclare(topic_queue, true, false, false, null);
channel.queueDeclare(topic_queue1, true, false, false, null);
//交换机和队列绑定
channel.queueBind(topic_queue, exchange_name, "topic.#", null);
channel.queueBind(topic_queue1, exchange_name, "topic.*.java", null);
//生产者发送消息
String message = "topic模式:匹配top.#";
channel.basicPublish(exchange_name, "topic.www.java.we", null, message.getBytes());
//关闭连接
// channel.close();
// connection.close();
}
③ Consumer消费者代码(两个消费者):
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new topics_util().topics_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//Consumer1的回调
Consumer consumercallback1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的topic_queue1队列" + "\n" + "publish消息:" + new String(body));
}
};
//Consumer2的回调
Consumer consumercallback2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的topic_queue2队列" + "\n" + "publish消息:" + new String(body));
}
};
//创建两个Consumer分别绑定queue1队列和queue2队列
channel.basicConsume("topic_queue1", true, consumercallback1);
channel.basicConsume("topic_queue2", true, consumercallback2);
//关闭连接
// channel.close();
// connection.close();
}
五,RabbitMQ主题模式实现
模式说明:
主题模式和路由模式一样都是依赖Routing Key,但路由模式只能进行字符固定匹配,而主题模式可以通过模式匹配(创建一个主题,相同主题的queue可以内分发消息)
Exchange会将消息的Routing Key和队列的Routing Key进行模式匹配,匹配成功就进行消息的转发
主题模式通过Exchange的Topics路由策略实现
Topics策略有两个通配符:
① Util工具类代码:和上面util是一样的只是类名和方法名不一样而已
② Publish生产者代码:
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new topics_util().topics_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//声明Exchange交换机,路由策略为Topics
String exchange_name = "topics_exchange";
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC, true, false, false, null);
//创建两个队列
String topic_queue = "topic_queue1";
String topic_queue1 = "topic_queue2";
channel.queueDeclare(topic_queue, true, false, false, null);
channel.queueDeclare(topic_queue1, true, false, false, null);
//交换机和队列绑定
channel.queueBind(topic_queue, exchange_name, "topic.#", null);
channel.queueBind(topic_queue1, exchange_name, "topic.*.java", null);
//生产者发送消息
String message = "topic模式:匹配top.#";
channel.basicPublish(exchange_name, "topic.www.java.we", null, message.getBytes());
//关闭连接
// channel.close();
// connection.close();
}
③ Consumer消费者代码(两个消费者):
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new topics_util().topics_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//Consumer1的回调
Consumer consumercallback1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的topic_queue1队列" + "\n" + "publish消息:" + new String(body));
}
};
//Consumer2的回调
Consumer consumercallback2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅" + envelope.getExchange() + "的topic_queue2队列" + "\n" + "publish消息:" + new String(body));
}
};
//创建两个Consumer分别绑定queue1队列和queue2队列
channel.basicConsume("topic_queue1", true, consumercallback1);
channel.basicConsume("topic_queue2", true, consumercallback2);
//关闭连接
// channel.close();
// connection.close();
}
Comments | 5 条评论
博主 JMiWi
那个handleDelivery方法是要重写的吧?求回复
博主 WQL
@JMiWi 今天突然看评论,不好意思,回晚了
博主 WQL
@JMiWi 原生API的消费者Consumer的实现只有DefaultConsumer类或者顶级的Consumer接口,有很多方法如handleDelivery回调方法,但不是必须的,你需要回调时就可以写不需要也可以不写,我写的方式是匿名内部类的方式实现的,可以把它的实现提取出来
博主 JMiWi
参数稳 多看了两边复盘
博主 JMiWi
今日份打卡RabbitMQ
Warning: Undefined variable $return_smiles in /www/wwwroot/wql_luoqin_ltd/wp-content/themes/Sakura/functions.php on line 1109