spring-cloud-stream 整合 rabbitmq


1,依賴與配置

1pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2,配置文件相關內容,這里使用系統默認的兩個管道,output 和 input 分別對應 Source 和 Sink 兩個接口

# spring.cloud.stream.bindings.[output].destination:         交換機的名稱
# spring.cloud.stream.bindings.[output].group:               組,用於生成隊列,組名相同時可以實現分布式
# spring.cloud.stream.bindings.[input].destination:          交換機的名稱
# spring.cloud.stream.bindings.[input].group:                組,用於生成隊列,組名相同時可以實現分布式
# spring.cloud.stream.bindings.[input].consumer.concurrency: 消費者的並發量
# spring.rabbitmq.addresses:                                 服務器地址
# spring.rabbitmq.username:                                  賬號
# spring.rabbitmq.password:                                  密碼
# spring.rabbitmq.virtual-host:                              虛擬主機
spring:
    stream:
      default-binder: rabbit
      bindings:
        output:
          destination: order.exchange
          group: order.queue
        input:
          destination: order.exchange
          group: order.queue
          consumer:
            concurrency: 3
  rabbitmq:
    addresses: 192.168.200.100:5672
    username: rabbit
    password: 123456
    virtual-host: /

2,代碼部分

1,作為數據的實體類,注意需要實現 Serializable 接口

package com.hwq.rabbitmq.entity;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;

@Getter
@Setter
@ToString
public class Order implements Serializable {
    private String id;
    private String name;
}

2,消費者監聽

package com.hwq.rabbitmq.service;

import com.hwq.rabbitmq.entity.Order;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@EnableBinding(Sink.class)
@Service
public class InputService {

    @StreamListener(Sink.INPUT)
    public void receiveOrder(Order order) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println("接收到消息:" + order);
    }

}

3,封裝發送消息的生產者

package com.hwq.rabbitmq.service;

import com.hwq.rabbitmq.entity.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.Map;

@EnableBinding(Source.class)
@Service
public class OutputService {

    @Autowired
    private Source source;

    public void sendOrder(Order order, Map<String, Object> properties) {
        MessageHeaders headers = new MessageHeaders(properties);
        Message<Order> message = MessageBuilder.createMessage(order, headers);
        boolean result = source.output().send(message);
        System.out.println("消息發送成功:" + result);
    }

}

4,測試的 控制器

package com.hwq.rabbitmq.controller;

import com.hwq.rabbitmq.entity.Order;
import com.hwq.rabbitmq.service.OutputService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RequestMapping("queue")
@RestController
public class QueueController {

    @Autowired
    private OutputService outputService;

    /**
     * 往消息隊列中發送數據
     */
    @RequestMapping("send")
    public String send() {
        Order order = new Order();
        order.setId("123456789123456798");
        order.setName("你的訂單");
        for (int i = 0; i < 20; i ++) {
            outputService.sendOrder(order, new HashMap<>());
        }
        return "ok";
    }

}

3,啟動項目並訪問 http://ip:port/queue/send


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM