amqp 和 exchange 詳細解釋


amqp  的 excange 字面意思是一個交換機。他的任務是吧 消息 分配給消息隊列。

amqp 的  exchange 有三種,分別是 Direct , fanout 和 toppic。三種。

  Direct:通過 Routing key 來分配消息 應該分配給那個消息隊列。在給交換機綁定 消息對列的時候需要指定  路由關鍵字,並且之歌路由關鍵字必須是不包含通配符。

      特點:消息明確,只有一個對列會消費這個消息

      官方解釋:轉發消息到routingKey中指定的隊列

            要求隊列綁定時使用的bindingKey和發送時使用routingKey的保持一致,保證只有key匹配的隊列中才可以進行收發消息

 

 

  fanout:把消息分給這個 交換機下面的所有 消息隊列,值得注意的是 fanout 類型的 綁定 消息對列的時候不需要指配  Routing key

      特點:分配給全部的綁定在這個交換機上的消息隊列。類似於發布訂閱機制。

      官方解釋:

        轉發消息到與該交換機綁定的所有隊列

        只要接收端和發送端使用同一個交換機,所有端都可以收發消息

 

 

  toppic: 上面理想個的綜合,把消息分配綁定在這個交換機上的多個消息隊列,但是 不一定是全部。可能一個也沒有,可能全部都有。通過  帶有通配符的 路由關鍵在來指定分配規則

      個 在 綁定 交換機 和queue 關系的時候 ,Routing key  配置成帶有 通配符的 。發消息的 時候 發一個明確的消息 Routing key ,這樣 這個消息就會分配到 合適的 消息隊列中了。

      特點:分配給 多個 消息隊列。可以靈活指定。

      官方解釋:

        轉發消息到所有關心routingkey中指定話題的隊列,只要隊列關心的主題(bindingkey)能與消息帶的routingkey模糊匹配,就可以將消息發送到該隊列。隊列綁定時提供的主題可以使用"*"和"#"來的表示關鍵字,"*"表示一個關鍵字,

        "#"代表0個或若干個關鍵  字。關鍵字之間用"."分隔,如:有routingkey:"log","log.out","log.a.bug"; bindingKey為"log.*"的隊列只能接收"log.out"的消息,而bindingKey為"log.#"的隊列可以接收前面三個消息。

 

 

   header:(我也沒用過 猜的)

      

      有時消息的路由操作會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是為此而生的。頭交換機使用多個消息屬性來代替路由鍵建立路由規則。

      通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。

      我們可以綁定一個隊列到頭交換機上,並給他們之間的綁定使用多個用於匹配的頭(header)。這個案例中,消息代理得從應用開發者那兒取到更多一段信息,換句話說,它需要考慮某條消息(message)

      是需要部分匹配還是全部匹配。上邊說的“更多一段消息”就是"x-match"參數。當"x-match"設置為“any”時,消息頭的任意一個值被匹配就可以滿足條件,而當"x-match"設置為“all”的時候,就需要消息頭的所有值都匹配成功。

      頭交換機可以視為直連交換機的另一種表現形式。頭交換機能夠像直連交換機一樣工作,不同之處在於頭交換機的路由規則是建立在頭屬性值之上,而不是路由鍵。路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,

      它們甚至可以是整數或者哈希值(字典)等。

      BindingBuilder.bind( queueWeixinPublicDL ).to( new HeadersExchange("") ).whereAny(new HashMap<>());

     

       解釋:如果 消息中有屬性匹配 header 那么就關系就成立 。

 

關於  amqp  消息的 傳遞流程:

  生產者連接到消息服務器(broker)  --> 生產者發送消息到 交換機( exchange) --> 交換機 把消息路由給綁定在其上的消息隊列(queue)  -->  消費者連接到 消息服務器 從 消息對列中取出消息- -> 消費並且告訴 消息隊列這個消息已經正常消費(ack)

 

 

一些 amqp 的 名詞解釋:

  • Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker。
  • Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件划分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等。
  • Connection: publisher/consumer和broker之間的TCP連接。斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網絡故障或broker服務出現問題。
  • Channel: 如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。
  • Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最終被送到這里等待consumer取走。一個message可以被同時拷貝到多個queue中。
  • Binding: exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。

 

 下面的例子都是 基於rabbit mq 的

傳統xml 方式的  配置方式:

<!-- 容邦消息隊列 -->	
	<rabbit:queue name="rbNotificationEventQueue" durable="true"
		auto-delete="false" exclusive="false" />
	<!-- 容邦消息死信隊列 -->	
	<rabbit:queue name="rbNotificationEventQueueDL" durable="true" auto-delete="false" exclusive="false">
	    <rabbit:queue-arguments>
	        <entry key="x-dead-letter-exchange" value="rbNotificationEventExchange"/>
	    	<entry key="x-dead-letter-routing-key" value="rbNotificationEventQueue"/>
	    </rabbit:queue-arguments>
    </rabbit:queue>	
		

	<!-- 容邦通知查詢事件交換機 -->
	<rabbit:direct-exchange name="rbNotificationEventExchange"
		durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="rbNotificationEventQueue" key="rbNotificationEventQueue" />
			<rabbit:binding queue="rbNotificationEventQueueDL" key="rbNotificationEventQueueDL" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	<!--監聽配置 -->
	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="manual">
		<rabbit:listener queues="rbNotificationEventQueue" ref="queueListenter" />
	</rabbit:listener-container>

  

解釋:定義了一個 直連交換機(rbNotificationEventExchange) ,上面綁定了兩個消息隊列。rbNotificationEventQueue 是一個正常隊列,rbNotificationEventQueueDL 是一個死信隊列,這個死信隊列 會在消息超時 的時候,自動轉發到指定的隊列。定義了一個消費者來消費rbNotificationEventQueue 里面的消息。

 

 

純配置類的配置方式:

@Configuration
public class RabbitMQConfig2 {

	public static final String sys_exchange  = "sys_exchange";

    
    public static final String queueMessageSysProfitRecordDay = "queueMessageSysProfitRecordDay";


    /**
     * 死信
     */
    public static final String queueMessageSysProfitRecordDay_DL = "queueMessageSysProfitRecordDay_DL";

    
    /**
     * 交換
     * @return
     */
    @Bean
    public DirectExchange sysExchange() {
        return new DirectExchange(RabbitMQConfig2.sys_exchange, true, false);
    }

    
    /**
     * 正常的隊列
     * @return
     */
    @Bean
    public Queue queueMessageSysProfitRecordDay() {
    	return new Queue( RabbitMQConfig2.queueMessageSysProfitRecordDay );
    }

    /**
     *  死信隊列
     */
    
    @Bean
    public Queue queueMessageSysProfitRecordDayDL() {
    	Map<String, Object> arguments = new HashMap<>();
		arguments.put("x-dead-letter-exchange", RabbitMQConfig2.sys_exchange);
		arguments.put("x-dead-letter-routing-key", RabbitMQConfig2.queueMessageSysProfitRecordDay);
		return new Queue(RabbitMQConfig2.queueMessageSysProfitRecordDay_DL, true, false, false, arguments);
    }

    
    /**
     * 綁定
     */
    @Bean
    public Binding  bindingMessageSysProfitRecordDay(Queue queueMessageSysProfitRecordDay ,DirectExchange sysExchange ) {
        return BindingBuilder.bind( queueMessageSysProfitRecordDay ).to( sysExchange ).with( RabbitMQConfig2.queueMessageSysProfitRecordDay );
    }

    
    /**
     * 死信綁定
     */ 
    
    @Bean
    public Binding  bindingMessageSysProfitRecordDayDL(Queue queueMessageSysProfitRecordDayDL ,DirectExchange sysExchange ) {
        return BindingBuilder.bind( queueMessageSysProfitRecordDayDL ).to( sysExchange ).with( RabbitMQConfig2.queueMessageSysProfitRecordDay_DL );
    }

}

 

 

附帶發送消息和延時消息的代碼:

@Component
public class MqMessageService{

	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	/**
	 * 發送消息
	 * @param message
	 * @return
	 */
	public boolean send(  Message message ) {
		try {
			if( MessageType.QUEUE.equals( message.getMsgType() ) ) {
				rabbitTemplate.convertAndSend( message.getQueue() , JSONObject.toJSONString(message) );
				return true;
			}else if( MessageType.EXCHANGE.equals( message.getMsgType() ) ) {
				sendExchangeMessage( message );
				return true;
			}else {
				System.out.println( "暫時不處理" );
				return true;
			}
		}catch (Exception e) {
			System.out.println( e.getStackTrace() );
			return false;
		}
	}
	
	/**
	 *  發送Exchange的消息
	 * @param message
	 */
	private void sendExchangeMessage(  Message message  ) {
		if( message.getDelayTime() != null ) {
			String delayTime = message.getDelayTime().toString();
			MessagePostProcessor processor = new MessagePostProcessor(){
				@Override
				public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
					message.getMessageProperties().setExpiration( delayTime );
			        return message;
				}
		    };
			rabbitTemplate.convertAndSend( message.getExchange() , message.getRoutingKey() , JSONObject.toJSONString(message) ,processor );
			return;
		}
		rabbitTemplate.convertAndSend( message.getExchange() , message.getRoutingKey() , JSONObject.toJSONString(message) );
	}

}

  

附帶消息對象:

public class Message {
	private Long id;
	
	private String exchange;
	
	private String routingKey;

	private String queue;
	
	private MessageType msgType;
	
	/**
	 * 只有 exchange 類型的有延時
	 */
	private Long  delayTime;

	public Message() {
		this.id = SnGeneratorUtil.getId();
	}
	
	public Long getId() {
		return id;
	}

	@Override
	public String toString() {
		return "Message [id=" + id + ", exchange=" + exchange + ", routingKey=" + routingKey + ", queue=" + queue
				+ ", msgType=" + msgType + ", delayTime=" + delayTime + "]";
	}

	public void setId(Long id) {
		this.id = id;
	}

	public String getExchange() {
		return exchange;
	}

	public void setExchange(String exchange) {
		this.exchange = exchange;
	}

	public String getRoutingKey() {
		return routingKey;
	}

	public void setRoutingKey(String routingKey) {
		this.routingKey = routingKey;
	}

	public String getQueue() {
		return queue;
	}

	public void setQueue(String queue) {
		this.queue = queue;
	}

	public MessageType getMsgType() {
		return msgType;
	}

	public void setMsgType(MessageType msgType) {
		this.msgType = msgType;
	}

	/**
	 * 只有 exchange 類型的有延時
	 */
	public Long getDelayTime() {
		return delayTime;
	}

	public void setDelayTime(Long delayTime) {
		this.delayTime = delayTime;
	}

}

  附帶消息類型枚舉:

public enum MessageType {

	
	QUEUE("隊列"),
	EXCHANGE("交換機");

    private String mark;

    MessageType(String mark ) {
        this.mark = mark;
    }

    public String getMark() {
        return mark;
    }
}

  附帶發送消息代碼: 備注,這是發了一個上面的message 的子類消息 ,需要自定義。並且這個消息是延時10秒的。

MessageSysProfitRecordDay message = new MessageSysProfitRecordDay();
		message.setMsgType( MessageType.EXCHANGE );
		message.setExchange( RabbitMQConfig.sys_exchange );
		message.setRoutingKey( RabbitMQConfig.queueMessageSysProfitRecordDay_DL );
		
		message.setSourceId(sourceId);
		message.setYear(year);
		message.setMonth(month);
		message.setDay(day);
		message.setType( type.name() );
		message.setAmountType( amountType.name() );
		message.setAmount( amount );
		message.setDelayTime( 10000L );
		
        mqMessageService.send( message );

  附帶接受消息的代碼:

  

	@RabbitListener(queues = RabbitMQConfig.queueMessageSysProfitRecordDay)
	@RabbitHandler
	public void orderAward(String messageStr) {
		MessageSysProfitRecordDay message = null;
		if( logger.isInfoEnabled() ) {
			logger.info("收到一天系統日結算:{}",messageStr );
		}
	}

  

 

備注:spring boot集成的 自動 ack(在沒拋出異常的情況下),別的方式需要手動ack。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM