Spring和SpringBoot整合RabbitMQ

发布于 2021-10-12  1.81k 次阅读


一,Spring整合RabbitMQ

一,Spring整合的依赖pom依赖和头文件

pom依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.2.17.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.2.17.RELEASE</version>
    </dependency>

    <!--spring整合rabbitmq的插件包-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.18.RELEASE</version>
    </dependency>
</dependencies>

头文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

二,生产者配置使用

Spring整合后生产者发送消息只需要依赖RabbitMQTemplate对象

① 创建生产者工程(省略)

② 添加依赖(上面有pom依赖)

③ 配置整合

<!--加载properties配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!--创建rabbitmq工厂,并配置连接信息-->
<rabbit:connection-factory id="factory" host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.name}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"/>

<!--定义管理交换机和队列封装过后的工厂,将上面工厂传入其中即可-->
<rabbit:admin connection-factory="factory"/>

<!--定义持久化队列,不存在则自动创建,不绑定到指定交换机则为默认""交换机,默认交换机路由类型为Direct
参数:
1,auto-declare:是否自动创建(一般都设置为true)
2,auto-delete:是否自动删除
3,durable:是否持久化
4,queue-arguments:参数
5,exclusive:是否排他性
-->
<rabbit:queue id="spring_queue" name="spring_queue" durable="true" auto-declare="true"/>

<!--广播策略-->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" durable="true" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" durable="true" auto-declare="true"/>

<!--定义广播类型交换机,并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"
                        durable="true">
    <rabbit:bindings>
        <rabbit:binding queue="spring_fanout_queue1"/>
        <rabbit:binding queue="spring_fanout_queue2"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!--主题(通配符)策略-->
<!--定义要加入主题策略中的队列(三个队列)-->
<rabbit:queue id="spring_topic_queue1" name="spring_topic_queue1" auto-declare="true" durable="true"/>
<rabbit:queue id="spring_topic_queue2" name="spring_topic_queue2" auto-declare="true" durable="true"/>
<rabbit:queue id="spring_topic_queue3" name="spring_topic_queue3" auto-declare="true" durable="true"/>
<!--定义主题策略的交换机,并绑定队列-->
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
    <rabbit:bindings>
        <!--pattren为路由通配规则-->
        <rabbit:binding pattern="wql.#" queue="spring_topic_queue1"/>
        <rabbit:binding pattern="wql.*" queue="spring_topic_queue2"/>
        <rabbit:binding pattern="wql.#.fq" queue="spring_topic_queue3"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbitTemplate对象操作,可以在代码中方便的发送消息(关键对象)-->
<rabbit:template id="rabbitTemplate" connection-factory="factory"/>

④ 编写发送代码(只需要依赖RabbitMQTemplate)

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void holleworld_test() {

    /*发送消息,只需要调用rabbittemplate中的方法即可
     * 参数:
     * String exchange:交换机名称
     * String routingKey:路由Key
     * Object object:消息实体(不需要转化为byte类型)
     * */
    rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "通过RabbitMQTemplate的converrAndSennt发送消息");

}

三,消费者配置使用

消费者在Spring中书写非常方便就两步

  1. 创建监听器监听消费者bean
  2. bean对象实现MeassageListener
① 创建生产者工程(省略)

② 添加依赖(上面有pom依赖)

③ 配置整合

<!--加载properties配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!--创建rabbitmq工厂,并配置连接信息-->
<rabbit:connection-factory id="factory" host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.name}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"/>

<!--消息者实现非常简单,定义一个bean将他放入rabbitmq监听器中即可,监听器声明它要消费的队列
注:实体baen需要实现MessageListener接口即可
-->
<bean id="spring_rabbit_listener" class="com.spring_rabbitmq.spring_rabbitmq_consumer"/>

<!--监听者-->
<rabbit:listener-container connection-factory="factory" auto-declare="true">
    <!--添加监听,并指定监听的队列-->
    <rabbit:listener ref="spring_rabbit_listener" queue-names="spring_fanout_queue1"/>

</rabbit:listener-container>

④ 编写监听器

public class spring_rabbitmq_consumer implements MessageListener {
    //通过这个onMessage方法获取数据
    @Override
    public void onMessage(Message message) {
        //message为数据对象
        System.out.println(new String(message.getBody()));
    }
}

⑤  测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
//测试消费者
public class consumer_test {

    //这个测试方法目的不是调用bean,通过ContextConfiguration运行这个配置文件即可
    @Test
    public void consumer_test_wql() {}
}

二,SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ,操作RabbitMQ比Spring好方便得多,和Spring一样SpringBoot也是通过操作RabbitTemplate来实现生产者发送消息的

SpringBoot配置类实现配置,有通过大量Build创建者模式实现的exchange,queue,binding

  • 生产者发送消息通过RabbitMQTemplate对象
  • 消费者消费消息通过RabbitMQListener注解(也可以实现MessageListener)

①,SpringBoot依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

②,通过yml配置连接

spring:
  rabbitmq:
    host: 192.168.68.133
    port: 5672
    username: love
    password: 123
    virtual-host: /

③,通过@Configuration实现配置(大量使用创建者模式)

//配置类
//@Configuration
public class RabbitMQConfig  {

    //交换机
    @Bean("bootexchange")
    public Exchange bootexchange(){
        //通过ExchangeBuilder构建fanout类型名称为springboot_exchange的交换机
        return ExchangeBuilder.fanoutExchange("springboot_exchange").durable(true).build();
    }

    //队列
    @Bean("bootqueue")
    public Queue bootqueue(){
        //构建queue,名称在durable中指定
        return QueueBuilder.durable("springboot_queue").build();
    }

    //交接机合队列绑定
    @Bean
    public Binding bindexchangequeue(@Qualifier("bootqueue") Queue queue, @Qualifier("bootexchange") Exchange exchange){
    /*
    * bind:需要绑定的队列
    * to:需要绑定的交换机
    * with:指定routingkey
    * noargs:构建
    * and:参数
    * */
    return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

}

④,生产者发送消息(通过RabbitMQTemplate对象)

@SpringBootTest

class SpringbootRabbitmqApplicationTests {

    //springboot和spring一样都是通过RabbitTemplate进行发送消息,只不管springboot不需要在配置文件中指定RabbitTemplate,直接可以注入

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        //发送消息
            rabbitTemplate.convertAndSend("springboot_exchange", "", "springboot整合rabbitmq!!!");

    }
}

⑤,消息者消费消息

注解:

@Component
public class rabbitlistener{

    @RabbitListener(queues = "springboot_queue")
    public void listenr_consumer(Message message){

        System.out.println(new String(message.getBody()));

    }}

接口:

@Component
public class rabbitlistener_wql implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(message.getBody());
    }
}

路漫漫其修远兮,吾将上下而求索