Spring Boot 2.0 Applications (3): Integrating Spring Boot 2.0 with RabbitMQ


How to integrate RabbitMQ

1. Add the spring-boot-starter-amqp dependency

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

2. Add the configuration

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.dynamic=true
spring.rabbitmq.cache.connection.mode=channel

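3. Declare the queue

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
}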

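4. Create the producer

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Sends on a schedule (enabled by @EnableScheduling on the main class).
    // The 5-second delay is an assumed value.
    @Scheduled(fixedDelay = 5000)
    public void send() {
        // On the default exchange, the routing key is the queue name.
        rabbitTemplate.convertAndSend("hello", "hello");
    }
}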

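5. Create the consumer

import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class RabbitConsumer {
    // A method-level @RabbitListener is enough; @RabbitHandler is only needed
    // for methods of a class annotated with @RabbitListener.
    @RabbitListener(queues = "hello")
    public void process(@Payload String foo) {
        System.out.println(new Date() + ": " + foo);
    }
}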

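6. Start the main class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class AmqpApplication {
    public static void main(String[] args) {
        SpringApplication.run(AmqpApplication.class, args);
    }
}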

Console output:

Sun Sep 30 16:30:35 CST 2018: hello

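That completes a simple integration of Spring Boot 2.0 with RabbitMQ.
Readers familiar with RabbitMQ will know that it adds the concept of an exchange on top of plain queues. There are four exchange types: direct, topic, headers, and fanout. Headers exchanges are rarely used in practice and direct exchanges are straightforward, so the rest of this article looks at topic and fanout exchanges in detail.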

Topic Exchange

1. Configure the topic rules

@Configuration
public class TopicRabbitConfig {

    @Bean
    public Queue queueMessage1() {
        return new Queue(MQConst.TOPIC_QUEUENAME1);
    }

    @Bean
    public Queue queueMessage2() {
        return new Queue(MQConst.TOPIC_QUEUENAME2);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConst.TOPIC_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) {
        // Bind queue 1 to the routing key topicKey.A
        return BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1);
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) {
        // Bind queue 2 to every routing key that starts with "topicKey."
        return BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS);
    }
}
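
The configuration classes in this article refer to an MQConst constants class that is not listed. Here is a minimal sketch of what it might contain, including the constants used in the Fanout Exchange section. The queue names are taken from the consumers' console output and TOPIC_KEY1 from the binding comment; TOPIC_KEYS and both exchange names are assumptions.

```java
// Hypothetical reconstruction of the constants class used throughout the article.
// Package-private here; make it public if the configuration classes live in other packages.
final class MQConst {
    // Topic exchange. The exchange name is an assumed value.
    static final String TOPIC_EXCHANGE = "topicExchange";
    static final String TOPIC_QUEUENAME1 = "topic.message1";
    static final String TOPIC_QUEUENAME2 = "topic.message2";
    // Exact key, as stated in the binding comment.
    static final String TOPIC_KEY1 = "topicKey.A";
    // Assumed pattern for "every routing key that starts with topicKey."
    static final String TOPIC_KEYS = "topicKey.#";

    // Fanout exchange. The exchange name is an assumed value.
    static final String FANOUT_EXCHANGE = "fanoutExchange";
    static final String FANOUT_QUEUENAME1 = "fanout.message1";
    static final String FANOUT_QUEUENAME2 = "fanout.message2";

    private MQConst() {
        // Constants holder, not meant to be instantiated.
    }
}
```

The values must be compile-time constants (static final String) because they are used inside annotation attributes such as @RabbitListener(queues = ...).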

2. Configure the consumers

@Component
public class TopicConsumer {

    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)
    public void process1(String message) {
        System.out.println("queue:topic.message1,message:" + message);
    }

    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME2)
    public void process2(String message) {
        System.out.println("queue:topic.message2,message:" + message);
    }
}

3. Produce messages

Add the following to the Producer class:

        // Topic
        rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "from keys");
        rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");

Start the main class again. Console output:

queue:topic.message2,message:from keys
queue:topic.message1,message:from key1
queue:topic.message2,message:from key1
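
These three lines follow from how a topic exchange compares a message's routing key with each binding pattern: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The sketch below illustrates those rules in plain Java; it is not the broker's implementation. The class name TopicMatch is hypothetical, and the example assumes MQConst.TOPIC_KEY1 is "topicKey.A" (as the binding comment says) and MQConst.TOPIC_KEYS is "topicKey.#" (MQConst is not shown in the article).

```java
// Plain-Java illustration of topic-exchange pattern matching (hypothetical helper).
final class TopicMatch {

    // Returns true if routingKey matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' matches exactly one word; a literal must match exactly.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Routing key "topicKey.A" against both bindings.
        System.out.println(matches("topicKey.A", "topicKey.A"));
        System.out.println(matches("topicKey.#", "topicKey.A"));
        // The literal routing key "topicKey.#" against both bindings.
        System.out.println(matches("topicKey.A", "topicKey.#"));
        System.out.println(matches("topicKey.#", "topicKey.#"));
    }
}
```

Note that the first send uses the pattern topicKey.# itself as the routing key. That literal key matches only queue 2's binding, which is why "from keys" appears once, whereas "from key1" reaches both queues. Each listener runs on its own thread, so the order of the lines may differ between runs.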

Fanout Exchange

1. Configure the fanout rules

@Configuration
public class FanoutRabbitConfig {
    @Bean
    public Queue messageA() {
        return new Queue(MQConst.FANOUT_QUEUENAME1);
    }

    @Bean
    public Queue messageB() {
        return new Queue(MQConst.FANOUT_QUEUENAME2);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(MQConst.FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA(Queue messageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(messageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue messageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(messageB).to(fanoutExchange);
    }
}

2. Configure the consumers

@Component
public class FanoutConsumer {

    @RabbitListener(queues = MQConst.FANOUT_QUEUENAME1)
    public void process1(String message) {
        System.out.println("queue:fanout.message1,message:" + message);
    }

    @RabbitListener(queues = MQConst.FANOUT_QUEUENAME2)
    public void process2(String message) {
        System.out.println("queue:fanout.message2,message:" + message);
    }
}

3. Produce messages

Add the following to the Producer class:

        // Fanout: the routing key is ignored, so an empty string is passed
        rabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE, "", "fanout");

Start the main class again. Console output:

queue:fanout.message2,message:fanout
queue:fanout.message1,message:fanout
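
Both queues receive the message because a fanout exchange copies every message to all queues bound to it and ignores the routing key. A minimal in-memory sketch of that behaviour in plain Java (the FanoutSketch class is hypothetical and only models the routing decision, not a real broker):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange (hypothetical, for illustration only).
final class FanoutSketch {
    // Bound queues, keyed by queue name, each holding the messages delivered to it.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue to the exchange; no routing key is involved.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Delivers a copy of the message to every bound queue.
    // The routing key parameter is accepted but deliberately unused.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Returns the messages delivered to the given queue so far.
    List<String> messages(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.message1");
        exchange.bind("fanout.message2");
        exchange.publish("", "fanout");
        System.out.println(exchange.messages("fanout.message1"));
        System.out.println(exchange.messages("fanout.message2"));
    }
}
```

In the real application the two lines may appear in either order, since each listener consumes from its own queue independently.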

Source code: GitHub



