分布式消息隊列,兩種常規方案


官網

官方文檔

文檔

文檔

Kafka

Kafka是由Linkedin開發的一個分布式的消息隊列系統(Message Queue)

kafka開發的主要初衷目標是構建一個用來處理海量日志,用戶行為和網站運營統計等的數據處理框架。在結合了數據挖掘,行為分析,運營監控等需求的情況下,需要能夠滿足各種實時在線和批量離線處理應用場合對低延遲和批量吞吐性能的要求。從需求的根本上來說,高吞吐率是第一要求,其次是實時性和持久性。

kafka的工作方式和其他MQ基本相同,只是在一些名詞命名上有些不同。為了更好的討論,這里對這些名詞做簡單解釋。通過這些解釋應該可以大致了解kafka MQ的工作方式。

  • Producer (P):就是網kafka發消息的客戶端
  • Consumer (C):從kafka取消息的客戶端
  • Topic (T):可以理解為一個隊列
  • Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個 topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個CG只會把消息發給該CG中的一個 consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還 可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。
  • Broker (B):一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
  • Partition(P):為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。

kafka的集群有多個Broker服務器組成,每個類型的消息被定義為topic,同一topic內部的消息按照一定的key和算法被分區(partition)存儲在不同的Broker上,消息生產者producer和消費者consumer可以在多個Broker上生產/消費topic

使用教程

引入maven依賴

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> </dependency>

生產者代碼

package com.xxl.test; import java.text.MessageFormat; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * kafka * */ public class KafkaProducer { private static KafkaProducer instance = new KafkaProducer(); public static Producer<String, String> getInstance(){ return instance.producer; } private Producer<String, String> producer; private KafkaProducer() { Properties props = new Properties(); //props.put("metadata.broker.list", "192.168.56.101:9092,192.168.56.101:9093,192.168.56.101:9094"); // 此處配置的是kafka的端口 props.put("metadata.broker.list", "192.168.56.101:9092"); props.put("key.serializer.class", "kafka.serializer.StringEncoder");// 配置key的序列化類 props.put("serializer.class", "kafka.serializer.StringEncoder"); // 配置value的序列化類 props.put("request.required.acks","-1"); // 等待確認:0默認-不等待;1-等待leader確認;-1-等待所有存活確認; producer = new Producer<String, String>(new ProducerConfig(props)); } public static void main(String[] args) { String TOPIC = "test"; TOPIC = "my-replicated-topic"; // push 2 topic for (int i = 1; i < 2; i++) { String key = "key".concat(String.valueOf(i)); String value = "key".concat(String.valueOf(i)); KafkaProducer.getInstance().send(new KeyedMessage<String, String>(TOPIC, key, value)); System.out.println(MessageFormat.format("producer [{0} = {1}]", key, value)); } } }

消費者代碼

package com.xxl.test; import java.text.MessageFormat; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; /** * kafka的topic數據流向:producer.topic --> (1:n as topic)group.id --> (1:1 as queue)consumer * @author xuxueli */ public class KafkaConsumer { private static KafkaConsumer instance = new KafkaConsumer(); public static ConsumerConnector getInstance(){ return instance.consumer; } private ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); props.put("zookeeper.connect", "192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183"); // zookeeper 配置 props.put("group.id", "default-group"); // group 代表一個消費組(可根據之,實現queue隊列或者topic廣播) props.put("zookeeper.session.timeout.ms", "4000"); // zk連接超時 props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // 配置value的序列化類 consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); } public static void main(String[] args) { String TOPIC = "test"; TOPIC = "my-replicated-topic"; // pull from topic Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = KafkaConsumer.getInstance().createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()){ MessageAndMetadata<String, String> item = it.next(); String key = item.key(); String value = item.message(); System.out.println(MessageFormat.format("consumer [{0} = {1}]", key, value)); } } }

Kafka 安裝,CentOS環境

安裝文件准備:

cd /data/temp
wget https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz tar zxvf kafka_2.8.0-0.8.0.tar.gz (0.8.0版本對應jdk1.6) mv kafka_2.8.0-0.8.0 /data/appdata/kafka_servers cd /data/appdata/kafka_servers cp -rf kafka_2.8.0-0.8.0 kafka01 // 修改啟動jvm內存分配:修改kafka-server-start.sh即可
CentOS 單機部署

單機啟動

cd /data/appdata/kafka_servers/kafka01
// 啟動內嵌zookeeper服務和kafka服務: sh bin/zookeeper-server-start.sh config/zookeeper.properties sh bin/kafka-server-start.sh config/server.properties

測試

// 創建一個topic # bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test // 查看topic列表 # bin/kafka-list-topic.sh --zookeeper localhost:2181 // 生產topic消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test // 消費topic消息 # bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
CentOS 集群部署
  • 1、集群部署zookeeper,並啟動

    推薦獨立安裝:如:“192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183”

  • 2、集群部署kafka,並啟動

    • 分別配置“config/server.properties”
    broker.id=1 (唯一,填數字) host.name=192.168.56.101 (唯一,填服務器IP) port=9093 (端口) log.dir=/tmp/kafka-logs-1 (日志地址) zookeeper.connect=192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183 (zookeeper集群地址)
    • 分別啟動kafka
    bin/kafka-server-start.sh config/server.properties
    • 測試
    // 創建topic # bin/kafka-create-topic.sh --zookeeper 192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183 --replica 3 --partition 1 --topic my-replicated-topic // 查看broker節點狀態 # bin/kafka-list-topic.sh --zookeeper 192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183 // 生成topic消息 # bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic // 消費topic消息 # bin/kafka-console-consumer.sh --zookeeper 192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183 --from-beginning --topic my-replicated-topic // 測試容錯,干掉master節點 # pkill -9 -f server-1.properties # bin/kafka-list-topic.sh --zookeeper 192.168.56.101:2181,192.168.56.101:2182,192.168.56.101:2183



官網

ActiveMQ

ActiveMQ 是Apache出品,最流行的、功能強大的即時通訊和集成模式的開源服務器。ActiveMQ是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。提供客戶端支持跨語言和協議,帶有易於在充分支持JMS 1.1和1.4使用J2EE企業集成模式和許多先進的功能。

特性

1、 多種語言和協議編寫客戶端。語言: Java、C、C++、C#、Ruby、Perl、Python、PHP。應用協議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP 2、完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務) 3、對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性 4、通過了常見J2EE服務器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上 5、支持多種傳送協議:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA 6、支持通過JDBC和journal提供高速的消息持久化 7、從設計上保證了高性能的集群,客戶端-服務器,點對點 8、支持Ajax 9、支持與Axis的整合 10、可以很容易得調用內嵌JMS provider,進行測試

Jms規范里的兩種message傳輸方式Topic和Queue,兩者的對比如下:

  • Topic
    • Publish Subscribe messaging 發布訂閱消息
    • topic數據默認不落地,是無狀態的。
    • 並不保證publisher發布的每條數據,Subscriber都能接受到。
    • 一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。
    • 一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器
  • Quene
    • Point-to-Point 點對點
    • Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。
    • Queue保證每條數據都能被receiver接收。
    • Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。
    • 一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。

Spring整合ActiveMQ

  • 1、maven依賴
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> …… <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.10.2</version> </dependency> …… <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency>
  • 2、開發ActIMEMQ監聽:TransportListener.java
package com.xxl.core.listener;

import java.io.IOException; import org.apache.activemq.transport.TransportListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class JmsTransportListener implements TransportListener{ public static int isConnected=1; private transient static Log logger = LogFactory.getLog(JmsTransportListener.class); @Override public void onCommand(Object arg0) { // TODO Auto-generated method stub } @Override public void onException(IOException arg0) { // TODO Auto-generated method stub } @Override public void transportInterupted() { // TODO Auto-generated method stub isConnected=0; logger.info("與jms服務器鏈接斷開"); } @Override public void transportResumed() { // TODO Auto-generated method stub isConnected=1; logger.info("與jms服務器鏈接恢復"); } }
  • 3、生產者Spring配置:applicationcontext-jms-send.xml (Topic廣播、Quene隊列,兩種方式)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd"> <!-- 復用ConnectionFactory --> <!-- simpleTopic:PUB --> <bean id="simpleTopicPubJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryReceive" /> <property name="defaultDestination"> <bean class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="simpleTopic" /> </bean> </property> </bean> <!-- simpleQuenu:Producer --> <bean id="simpleQuenuProducerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryReceive" /> <property name="defaultDestination"> <bean class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="simpleQuenu" index="0" /> </bean> </property> </bean> </beans>
  • 4、消費者Spring配置:applicationcontext-jms-receive.xml(Topic廣播、Quene隊列,兩種方式)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd"> <!-- 配置ConnectionFactory --> <bean id="connectionFactoryReceive" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- 單機配置 --> <!-- <property name="brokerURL" value="failover:(tcp://192.168.1.15:61616)" /> --> <!-- 集群配置 --> <property name="brokerURL" value="failover:(tcp://127.0.0.1:61611,tcp://127.0.0.1:61612,tcp://127.0.0.1:61613)?initialReconnectDelay=1000" /> <property name="transportListener"> <bean class="com.xxl.core.listener.JmsTransportListener"/> </property> </bean> </property> </bean> <!-- simpleTopic:SUB --> <bean id="simpleTopicSub" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryReceive" /> <property name="destination"> <bean class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="simpleTopic" /> </bean> </property> <property name="messageListener"> <bean class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="jmsReceiveService" /> <property name="defaultListenerMethod" value="simpleTopicSub" /> </bean> </property> <property name="idleTaskExecutionLimit" value="2" /> <property name="maxConcurrentConsumers" value="1" /> <!-- 訂閱者,實例數量,默認為1 (每個實例單獨消費消息,因此單次執行的訂閱消息時必須為1) --> </bean> <!-- simpleQuenu:Consumer --> <bean id="simpleQuenuConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryReceive" /> <property name="destination" > <bean class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="simpleQuenu" /> </bean> </property> <property name="messageListener"> <bean class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="jmsReceiveService" /> <property name="defaultListenerMethod" value="simpleQuenuConsumer" /> </bean> </property> <property name="idleTaskExecutionLimit" value="2" /> <property name="maxConcurrentConsumers" value="1" /> <!-- 消費者,實例數量,默認為1 (多實例即開啟多線程) --> </bean> </beans>
  • 5、生產者Service開發:JmsSendServiceImpl.java
// 接口 package com.xxl.service; /** * JMS.SEND * @author xuxueli */ public interface IJmsSendService { /** * simpleTopic發布 * @param message */ public void simpleTopicPub(String message); /** * simpleQuenu生產者 * @param msg */ public void simpleQuenuProduct(String message); } // 實現類 package com.xxl.service.impl; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import com.xxl.service.IJmsSendService; /** * JMS.SEND * @author xuxueli */ @Service public class JmsSendServiceImpl implements IJmsSendService { private transient static Logger logger = LoggerFactory.getLogger(JmsSendServiceImpl.class); @Resource private JmsTemplate simpleTopicPubJmsTemplate; @Resource private JmsTemplate simpleQuenuProducerJmsTemplate; /* * simpleTopic發布 * @see com.xxl.service.IJmsSendService#sendString(java.lang.String) */ @Override public void simpleTopicPub(String message) { logger.info("jms simpleTopicPub:{}", message); simpleTopicPubJmsTemplate.convertAndSend(message); } /* * simpleQuenu生產者 * @see com.xxl.service.IJmsSendService#simpleQuenuProduct(java.lang.String) */ @Override public void simpleQuenuProduct(String message) { logger.info("jms simpleQuenuProduct:{}", message); simpleQuenuProducerJmsTemplate.convertAndSend(message); } }
  • 6、消費者Service開發:JmsReceiveServiceImpl.java
// 接口 package com.xxl.service; /** * JMS.RECEIVE * @author xuxueli */ public interface IJmsReceiveService { /** * simpleTopic訂閱 * @param message */ public void simpleTopicSub(String message); /** * simpleQuenu消費者 * @param message */ public void simpleQuenuConsumer(String message); } // 實現 package com.xxl.service.impl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import com.xxl.service.IJmsReceiveService; /** * JMS.RECEIVE * @author xuxueli */ @Service("jmsReceiveService") public class JmsReceiveServiceImpl implements IJmsReceiveService { private transient static Logger logger = LoggerFactory.getLogger(JmsReceiveServiceImpl.class); /* * simpleTopic訂閱 * @see com.xxl.service.IJmsReceiveService#simpleReceive(java.lang.String) */ @Override public void simpleTopicSub(String message) { logger.info("jms simpleReceive:{}", message); } /* * simpleQuenu消費者 * @see com.xxl.service.IJmsReceiveService#simpleQuenuConsumer(java.lang.String) */ @Override public void simpleQuenuConsumer(String message) { logger.info("jms simpleQuenuConsumer:{}", message); } }

ActiveMQ 服務部署,CentOS環境

真誠總結一句:官方文檔和官方教程是最便捷的學習途徑。

一開始,CentOS上安裝ActiviMq總是無法啟動,郁悶了整整一天,直到晚上,在官網看到“Using ActiveMQ > Getting Started ”才明白是因為下載的最新版本要求高版本JDK7導致;

單機部署
  • 1、下載,解壓,移動至運行目錄:
wget http://mirrors.cnnic.cn/apache/activemq/5.10.2/apache-activemq-5.10.2-bin.tar.gz (版本5.11+需要jdk7+) tar zxvf apache-activemq-5.10.2-bin.tar.gz mv apache-activemq-5.10.2 /usr/local/activemq cd /usr/local/activemq
  • 2、修改默認分配內存:(默認1G,有時候太大,內存不足報錯)
/usr/local/activemq/bin/activemq console (控制台啟動,報錯) Error occurred during initialization of VM Could not reserve enough space for object heap 原因:內存不足 查看activemq文件發現: # Set jvm memory configuration if [ -z "$ACTIVEMQ_OPTS_MEMORY" ] ; then ACTIVEMQ_OPTS_MEMORY="-Xms1G -Xmx1G" fi 我的虛擬機最大內存是512M,加上虛擬內存也不夠; 解決:更改分配內存大小512M cp /usr/local/activemq/bin/activemq /usr/local/activemq/bin/activemq.bak vi /usr/local/activemq/bin/activemq 找到:ACTIVEMQ_OPTS_MEMORY="-Xms1G -Xmx1G" 改為:ACTIVEMQ_OPTS_MEMORY="-Xms256m -Xmx512m"
  • 3、控制台、守護進程,啟動/停止
// 啟動 /usr/local/activemq/bin/activemq console (控制台啟動) // 守護進程啟動 mkdir /home/root/activemq_log nohup /usr/local/activemq/bin/activemq start > /home/root/activemq_log/smlog // 檢查啟動 netstat -ln | grep 61616 ActiveMQ默認采用61616端口提供JMS服務,使用8161端口提供管理控制台服務,執行以下命令以便檢驗是否已經成功啟動ActiveMQ服務。 // 停止 ps -ef | grep activemq kill -9 5259 // 首先需要找到activemq進程的PID,然后,殺死activemq的進程(其中 -9表示強制終止)
// 控制台的登錄用戶名密碼保存在:conf/jetty-realm.properties
# username: password [,rolename ...] admin: admin, admin user: user, user
  • 5、腳本啟動
chmod 775 /usr/local/activemq/bin/activemq mkdir /xuxueli/activemq_log/ // 新建啟動腳本 vi /xuxueli/activemq_startup.sh ##################### nohup /usr/local/activemq/bin/activemq start > /xuxueli/activemq_log/smlog 2>&1 ##################### // 腳本啟動權限 chmod 755 /xuxueli/activemq_startup.sh // 開機啟動 vi /etc/rc.d/rc.local 末尾添加腳本的路徑: /xuxueli/activemq_startup.sh // 啟動: sh /xuxueli/activemq_startup.sh // 停止: ps -ef|grep activemq kill -9 29624
集群部署

本文采用levelDB來進行持久化,並使用zookeeper實現集群的高可用。本集群僅提供主備功能,避免單點故障,沒有負載均衡功能。

activemq + levelDB + zookeeper工作原理:

使用Apach Zookeeper去協調集群中的那個節點成為master.被選擇為master的節點開始工作並接收客戶端的連接。其他的節點進入slave模式並連接到master同步他們的持久狀態。slave節點不接受客戶端的連接。所有持久操作被復制到連接的slave節點上。如果master節點死了,帶着最新更新數據的slave節點晉升為master節點。失敗的節點然后能夠回到在線並且他進入slave模式。

所有需要同步到硬盤的消息的操作在他完成前將等待所有法定人數的節點復制完成。因此,如果你配置 replicas=”3″,那么法定人數的值是(3/2+1=2)。

master節點在他報告成功之前將在本地存儲完最新的數據並等待1個其他slave節點存儲完最新的數據。

當一個新的master節點要被選擇的時候,你也需要至少法定人數的節點在線為能夠找到一個帶着最新的更新數據的節點,那個帶着最新的更新數據的節點將變成新的master。因此,建議你至少運行3個重復節點以至你能down掉一個節點不影響服務的輸出。

  • 第1步:zookeeper集群部署;

    zookeeper監控:taokeeper-monitor、node-zk-browser

  • 第2步:activemq集群配置 集群部署(levelDB會activemq自帶有,所以不需要下載)

    • 配置:activemq.xml

      • brokerName 配置: 將broker標簽的brokerName屬性設置為統一的值,zookeeper才能識別它們屬於同一個集群;
      • persistenceAdapter的配置:主要有三種方式:kahaDB(默認方式)、數據庫持久化、levelDB(v5.9.0提供支持)
      // 首先注釋掉原來kahaDB的持久化方式,然后配置levelDB+zookeeper的持久化方式
      // 注意上述配置中的hostname屬性值,不同的activemq實例對應不同的hostname值,其他兩個實例配置的hostname值分別為:192.168.2.145, 192.168.2.146;
      // zkAddress 為zookeeper集群地址;
      
      <!-- <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> --> <persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="192.168.2.161:2181,192.168.2.145:2181,192.168.2.146:2181" zkPassword="password" hostname="192.168.2.161" sync="local_disk" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
    • 配置:jetty.xml:jettyPort下port:配置http監控中心端口;

    • 處理一處bug:刪除lib/pax-url-aether-1.5.2.jar;注釋掉配置文件中的日志配置activemq.xml中logQuery節點;這個BUG地址是https://issues.apache.org/jira/browse/AMQ-5225,希望可以在下個版本順利解決

  • 第三步:客戶端TCP連接,改為配置集群地址

// 單節點(非集群方式): <property name="brokerURL" value="failover:(tcp://192.168.1.15:61616)" /> // 客戶端連接使用failover方案(集群方式):節點宕掉超過N/2N+1就會整理宕掉;集群恢復后,項目會自動重連,不必重啟項目; <property name="brokerURL" value="failover:(tcp://127.0.0.1:61611,tcp://127.0.0.1:61612,tcp://127.0.0.1:61613)?initialReconnectDelay=1000" />
  • 注意:
    • 該模式下還是單節點負載;
    • 只是由於引入了zookeeper的監測機制。保證多個activemq服務在同一時間內只有一個服務對外開放。
    • 這種配置方案能夠實現(n-1)/2的容錯率,也就是三台服務器允許掛一台,五個能當掉2個,依次類推。
    • 節點宕掉超過N/2N+1就會整理宕掉;集群恢復后,項目會自動重連,不必重啟項目;

ActiveMQ 服務部署,Windows環境

  • 1、下載windows版本ActiveMQ發行包;
  • 2、解壓安裝;
  • 3、啟動:雙擊 “activemq.bat”
  • 4、端口和管理
    • 默認采用“61616” 提供JMS服務;
    • 默認采用8161端口提供管理控制台服務,地址:http://127.0.0.1:8161/admin/ -5、只是修改控制台密碼:修改文件“conf/jetty-realm.properties”
    # username: password [,rolename ...] admin: admin, admin user: user, user

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM