Spring AMQP


Spring AMQP 是基於 Spring 框架的AMQP消息解決方案,提供模板化的發送和接收消息的抽象層,提供基於消息驅動的 POJO的消息監聽等,很大方便我們使用RabbitMQ程序的相關開發。

Spring AMQP包含一些模塊,如:spring-amqp, spring-rabbit and spring-erlang等,每個模塊分別由獨立的一些Jar包組成.

Spring AMQP模塊主要包含org.springframework.amqp.core這個包中。這個包定義的相關類主要是與前面講的AMQP模型相對應。Spring AMQP的目的是提供不依賴於任何特定的AMQP代理實現或客戶端庫通用的抽象。最終用戶代碼將很容易實現更易替換、添加和刪除AMQP,因為它可以只針對抽象層來開發。這可以很方便我們選擇和使用哪一個具體的broker實現,如sping-rabbit實現。

Message

Spring AMQP定義了Message類, 它定義了一個更一般的AMQP域模型,它是Spring AMQP的很重要的一部分,Message消息是當前模型中所操縱的基本單位,它由Producer產生經過Broker被Consumer所消費它是生產者和消費者發送和處理的對象。Message封裝了body及MessageProperties,很方便了API的調用,它的定義如下:

public class Message {

  private final MessageProperties messageProperties;
  private final byte[] body;
 
  public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
  }
 
  public byte[] getBody() {
    return this.body;
  }
 
  public MessageProperties getMessageProperties() {
    return this.messageProperties;
  }
}

MessageProperties 定義了多種常用的屬性,如:messageId', 'timestamp', 'contentType'等等。

Exchange

Exchange接口代表了AMQP Exchange,它是消息發送的地方。在虛擬主機的消息協商器(Broker)中,每個Exchange都有唯一的名字

Exchange包含4種類型:Direct, Topic, FanoutHeaders不同的類型,他們如何處理綁定到隊列方面的行為會有所不同。

1)Direct類型: 允許一個隊列通過一個固定的Routing-key(通常是隊列的名字)進行綁定。 Direct交換器將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上。

2)Topic類型: 支持消息的Routing-key用*或#的模式,進行綁定。*匹配一個單詞,#匹配0個或者多個單詞。例如,binding key *.user.# 匹配routing key為 usd.user和eur.user.db,但是不匹配user.hello。

3)Fanout類型:它只是將消息廣播到所有綁定到它的隊列中,而不考慮routing key的值。

4)Header類型: 它根據應用程序消息的特定屬性進行匹配,這些消息可能在binding key中標記為可選或者必選

注意:AMQP規范要求,任何的消息協商器(broker)需要提供一個沒有名稱的"default" Direct Exchange。

它的相關定義如下:

public interface Exchange {
  String getName();
  String getExchangeType();
  boolean isDurable();
  boolean isAutoDelete();
  Map getArguments();
}

Queue

Queue,隊列,它代表了Message Consumer接收消息的地方,它用來保存消息直到發送給消費者。Queue有以下一些重要的屬性。

1)持久性:如果啟用,隊列將會在消息協商器(Broker)重啟前都有效。

2)自動刪除:如果啟用,那么隊列將會在所有的消費者停止使用之后自動刪除掉自身。

3)惰性:如果沒有聲明隊列,那么在執行到使用的時候會導致異常,並不會主動聲明。

4)排他性:如果啟用,隊列只能被聲明它的消費者使用。

public class Queue  {
 
    private final String name;
    private volatile boolean durable;
    private volatile boolean exclusive;
    private volatile boolean autoDelete;
    private volatile Map arguments;
 
    public Queue(String name) {
    this(name, true, false, false);
    }
 
    // Getters and Setters omitted for brevity    

Binding

生產者發送消息到Exchange,接收者從Queue接收消息,而綁定(Binging)是生產者和消費者消息傳遞的重要連接,它是連接生產者和消費者進行信息交流的關鍵。

Binging實例本身僅僅是代表持有連接的數據信息。不過它可以被AmqpAdmin這個類用來實際觸發broker上的綁定操作。同時它可以在程序啟動時,簡化 Queues, Exchanges, and Bindings的定義及一些操作。

1、下面我們看一下綁定Queue到Exchange的一些基本選項及例子。

你可以用一個固定的RoutingKey將Queue綁定到DirectExchange 

例如:new Binding(someQueue, someDirectExchange, "foo.bar")

2、也可以用*、#模式匹配的RoutingKey方式,綁定到TopicExchange。

例如:  new Binding(someQueue, someTopicExchange, "foo.*")

3、綁定到一個廣播的FanoutExchange,它沒有RoutingKey

如:  new Binding(someQueue, someFanoutExchange)

4、當然你也可以用fluent API的方式。

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

ConnectionFactory

Spring AMQP中連接和資源的管理,尤其是spring-rabbit這個模塊,因為spring-rabbit是 RabbitMQ 的唯一支持的實現。

在spring-rabbit中,管理消息協商器(broker)連接的核心組件是ConnectionFactory這個接口。 ConnectionFactory提供了

org.springframework.amqp.rabbit.connection.Connection(com.rabbitmq.client.Connection的包裝類)實例的連接與管理。而CachingConnectionFactoryConnectionFactory的在Spring AMQP中唯一實現,它創建一個連接代理,使程序可以共享的連接。

Connection 提供一個createChannel的方法。CachingConnectionFactory 的實現能支持channels的緩存,並且能根據區分是事務性或非事務性各自獨立。同時,CachingConnectionFactory也提供hostname的構造函數,並且可以設置username、password、setChannelCacheSize等方法CachingConnectionFactory  默認channel cache 大小為1,如果想改變可以用setChannelCacheSize設置channel cache size的大小。

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory(
            environment.getProperty("rabbitmq.host"),
            environment.getProperty("rabbitmq.port", Integer.class)
    );
    factory.setUsername(environment.getProperty("rabbitmq.username"));
    factory.setPassword(environment.getProperty("rabbitmq.password"));
    return factory;
}
 Connection connection = factory.createConnection();

Routing Connection Factory

從spring-rabbit 1.3版本開始,AbstractRoutingConnectionFactory 被引入進來,它提供了一個這樣的途徑來配置許多的不同的Connection Factory的映射,並且能夠根據運行時的lookupKey(通過綁定線程上下文的方式) 來決定使用哪個具體的Connection Factory

 為了方便Spring AMQP提供了 AbstractRoutingConnectionFactory 的具體實現SimpleRoutingConnectionFactory。它是從SimpleResourceHolder中獲得當前線程綁定的lookupKey

    Map factories = new HashMap<Object, org.springframework.amqp.rabbit.connection.ConnectionFactory>(2);
    factories.put("foo", connectionFactory1);
    factories.put("bar", connectionFactory2);

    AbstractRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
    connectionFactory.setTargetConnectionFactories(factories);

    final RabbitTemplate template = new RabbitTemplate(connectionFactory);
    Expression expression = new SpelExpressionParser()
        .parseExpression("T(org.springframework.amqp.rabbit.core.RabbitTemplateTests)" +
            ".LOOKUP_KEY_COUNT.getAndIncrement() % 2 == 0 ? 'foo' : 'bar'");

AmqpTemplate

Spring AMQP提供了一個發送和接收消息的操作模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一個實現。

RabbitTemplate支持消息的確認與返回,為了返回消息RabbitTemplate 需要設置mandatory 屬性為true,並且CachingConnectionFactory 的publisherReturns屬性也需要設置為true返回的消息會根據它注冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,

一個RabbitTemplate僅能支持一個ReturnCallback 。

為了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也需要設置為true,確認的消息會根據它注冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.

Sending messages

利用AmqpTemplate模板來發送消息。

AmqpTemplate提供了以下的幾個方法來發送消息

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

發送時,需要指定Exchange和routingKey. 如:

amqpTemplate.send("marketData.topic""quotes.nasdaq.FOO",new Message("12.34".getBytes(), someProperties));

從Spring-Rabbit版本1.3開始,它提供了MessageBuilder  MessagePropertiesBuilder兩個類,這可以方便我們以fluent流式的方式創建Message及MessageProperties對象。

Message message = MessageBuilder.withBody("foo".getBytes())

.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)

.setMessageId("123")

.setHeader("bar""baz")

.build();

Receiving messages

接收消息可以有兩種方式。比較簡單的一種方式是,直接去查詢獲取消息,即調用receive方法,如果該方法沒有獲得消息,receive方法不阻塞,直接返回null。另一種方式是注冊一個Listener監聽器,一旦有消息到時來,異步接收。

注意: 接收消息,都需要從queue中獲得,接收消息時,RabbitTemplate需要指定queue,或者設置默認的queue。

API提供的直接獲得消息方法,如下

Message receive() throws AmqpException; 

Message receive(String queueName) throws AmqpException;

AmqpTemplate也提供了方法來直接接收POJOs對象(代替Message對象),同時也提供了各種的MessageConverter用來處理返回的Object對象。

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

AmqpTemplate 從Spring-Rabbit 1.3版本開始也提供了 receiveAndReply 方法來異步接收、處理及回復消息。

AmqpTemplate 要注意receive和reply階段,大數時情況下,你需要提供僅僅一個ReceiveAndReplyCallback 的實現,用它來處理接收到消息和回復(對象)消息的業務邏輯。同時也要注意,ReceiveAndReplyCallback 可能返回null,在這種情況下,receiveAndReply 就相當於receive 方法。

boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException;

boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException;

boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey) throws AmqpException;

boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback,String replyExchange, String replyRoutingKey) throws AmqpException;

Asynchronous Consumer異步接受消息

為了異步接收消息,Spring AMQP也提供了多種不同的實現方式。如:通過實現MessageListener的方式,或者通過注解@RabbitListener的方式等,來實現異步接受消息及處理。

MessageListener 異步接收消息,MessageListener 定義比較簡單。

public interface MessageListener {

void onMessage(Message message);

}

如果你的程序需要依賴Channel,你需要用 ChannelAwareMessageListener這個接口。

public interface ChannelAwareMessageListener {

void onMessage(Message message, Channel channel) throws Exception;

}

如果你想將你的程序及Message API嚴格分開的話,可以用MessageListenerAdapter這個適配器類

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);

listener.setDefaultListenerMethod("myMethod");

看一下Container 容器。一般地,容器是處於“主動”的責任,以使偵聽器回調可以保持被動觸發處理。容器是生命周期的組件,它提供了開始和停止容器的方法。在配置容器時,我們需要配置AMQP Queue和MessageListener的對應連接,需要配置ConnectionFactory 的一個引用以及Listener以及能夠從該Queue中接收消息的Queue的引用,這樣的Container才能通知到對應的Listener。

SimpleMessageListenerContainer containernew SimpleMessageListenerContainer();

container.setConnectionFactory(rabbitConnectionFactory);

container.setQueueNames("some.queue");

container.setMessageListener(new MessageListenerAdapter(somePojo));

RabbitMQ 的3.2版本開始, broker支持消費消息的priority 優先級。只需要在SimpleMessageListenerContainer 添加priority屬性的設置。

<rabbit:listener-container connection-factory="rabbitConnectionFactory">

<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />

利用注解@RabbitListener實現異步接受消息

從spring-rabbit 1.4版本開始,可心利用注解的@RabbitListener來異步接收消息,它是更為簡便的方式。

@Component

public class MyService {

@RabbitListener(queues = "myQueue")

public void processOrder(String data) { ... }

}

在容器中,需要配置@EnableRabbit,來支持@RabbitListener起作用。

@Configuration

@EnableRabbit

public class AppConfig { 

  @Bean

  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory());

    factory.setConcurrentConsumers(3);

    factory.setMaxConcurrentConsumers(10);

    return factory;

  }

}

通過上面的配置,就配置好Listener和Container了,就可以異步接收消息了。

Message Converters

AmqpTemplate 定義提供了各種發送和接收委拖給MessageConverter轉化對象消息的方法。MessageConverter 本身比較簡單,它提供了消息對象的轉化,可將object轉化成Message 對象,或者將Message 對象轉化成Object對象。它提供了默認的SimpleMessageConverter實現,以及第三方的MessageConverter,如Jackson2JsonMessageConverter,MarshallingMessageConverter等,來處理消息與對象之間的轉換。 

SimpleMessageConverter

SimpleMessageConverter是Spring AMQP中MessageConverter的一個默認實現。

public interface MessageConverter {

  Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; 

  Object fromMessage(Message message) throws MessageConversionException; 

}

為了自定義轉化對象,你也可以第三方的MessageConverter,如使用  Jackson2JsonMessageConverter 或者MarshallingMessageConverter,其中Jackson2JsonMessageConverter提供了Json對象的轉化,MarshallingMessageConverter對象則提供了對象的Marshaller和 Unmarshaller轉化。 

ContentTypeDelegatingMessageConverter

從Spring-Rabbit 1.4.2開始,它提供了ContentTypeDelegatingMessageConverter,它能根據不同的MessageProperties屬性(contentType)決定來委托給具體的哪一個MessageConverter。

代碼示例

import org.junit.Test;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class CodeTest {
    @Test
    public void testSendAndReceive() {
    //創建ConnectionFactory //注意: guest的用戶只能夠在localhost 127.0.0.1進行測試
        String hostname = "localhost";
        String username = "mytest";
        String password = "mytest";
        String virtualHost = "/";
        CachingConnectionFactory cf = new CachingConnectionFactory(hostname);
        cf.setUsername(username);
        cf.setPassword(password);
        cf.setVirtualHost(virtualHost);

        RabbitAdmin admin = new RabbitAdmin(cf);

    //創建Exchange
        String exchangeName = "direct.test.exchange";
        DirectExchange exchange = new DirectExchange(exchangeName);
        admin.declareExchange(exchange);

    //創建Queue
        String queueName = "direct.test.queue";
        Queue queue = new Queue(queueName, true, false, false, null);
        admin.declareQueue(queue);

    //創建Binding
        String routingKey = "direct.test.queue";
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));

    //創建RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setQueue(queueName);

    //創建Message
        String messageStr = "this is direct message";
        Message message = MessageBuilder.withBody(messageStr.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
    //根據routingKey發送消息
        System.out.println("message=" + message);
        rabbitTemplate.send(routingKey, message);

    //接收消息
        Message resultMessage = rabbitTemplate.receive();
        System.out.println("resultMessage=" + resultMessage);
        if (resultMessage != null) {
            System.out.println("receive massage=" + new String(resultMessage.getBody()));
        }
    }
}

 異步接收消息實例

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="direct.test.exchange" queue="direct.test.queue"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:direct-exchange name="direct.test.exchange" durable="true">
  <rabbit:bindings>
    <rabbit:binding queue="direct.test.queue" key="direct.test.queue">rabbit:binding>
  <rabbit:bindings>
<rabbit:direct-exchange/>
<rabbit:queue name="direct.test.queue" durable="true" auto-delete="false" exclusive="false"/>
<rabbit:connection-factory id="connectionFactory" username="mytest" password="mytest" host="127.0.0.1" port="5672"/>

<bean id="myMessageListener" class="com.company.rabbitmqpro.service.MyMessageListener">bean>
<bean id="simpleMessageListenerContainerFactory" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="queueNames" >
    <list>
      <value>direct.test.queuevalue>
    <list>
  <property/>
  <property name="messageListener" ref="myMessageListener">property>
  <property name="concurrentConsumers" value="3"/>
  <property name="maxConcurrentConsumers" value="10"/>
<bean/>

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MyMessageListener implements MessageListener{
  @Override
  public void onMessage(Message message) {
    System.out.println("received: " + message);
  }
}

import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath*:applicationContext.xml" })
public class ListenerTest {
  @Resource
  private RabbitTemplate rabbitTemplate;
  @Test
  public void testSendAsynListener() {
    String sendMsg = "this is direct message";
    Message message = MessageBuilder.withBody(sendMsg.getBytes())
          .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
          .build();
    String routingKey = "direct.test.queue";
    rabbitTemplate.send(routingKey, message);
    System.out.println("send ok");
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM