ActiveMQ偽集群部署


本文借鑒http://www.cnblogs.com/guozhen/p/5984915.html,在此基礎上進行了完善,使之成為一個完整版的偽分布式部署說明,在此記錄一下!

一、本文目的

        介紹如何在同一台虛擬機上搭建高可用的Activemq服務,集群數量包含3個Activemq,當Activemq可用數>=2時,整個集群可用。

        本文Activemq的集群數量為3個,分別命名為mq1,mq2,mq3

二、概念介紹

1、偽集群

      集群搭建在同一台虛擬機上,3個Activemq分別使用不同的端口提供服務,啟用1個為Master,其它2個為Slaver,同一時間僅Master隊列提供服務

2、高可用

      3個Activemq服務,同一時間僅Master隊列提供服務,當Master隊列掛掉后,其它2個Slaver自動選舉出1個成為Master,整個隊列服務依然可用。當掛掉的隊列重新恢復后,自動加入集群。當集群僅剩下1個隊列時,整個隊列不可用。

3、Activemq集群數據存儲方式

      a) kahaDB:文件共享,默認方式

      b) JDBC:數據庫共享

      c) LevelDB:數據共享,本文使用方式

三、Activemq偽集群的搭建

下載地址:http://download.csdn.net/detail/u010821757/9771407zookeeper-3.4.9

      http://download.csdn.net/detail/u010821757/9771415(ActiveMQ5.14.0)

1、Activemq的端口介紹

      Activemq默認主要使用2個端口,8161(控制台使用)、61616(提供服務的端口),如果需要搭建集群,還需要開放集群間通訊的端口(主要用於選舉Master)

2、Activemq集群端口的分配

  控制台 服務接口 集群通訊接口
mq1 8161 51511 61601
mq2 8162 51512 61602
mq3 8163 51513 61603

  服務接口沒有使用默認的61611,是因為activemq默認還會使用61613,61614等端口

3、修改activemq配置

  a)  安裝activemq,本文使用Activemq版本為5.14.0;(前提條件,電腦已安裝java JDK,不然啟動時會提示)

    解壓文件到任意目錄,雙擊bin\win64中的activemq.bat(根據電腦操作系統位數),在瀏覽器里輸入“http://127.0.0.1:8161/admin/”,默認賬號密碼都是“admin”.如圖:

       

     

    至此,activemq單機模式安裝成功。

  b) 修改配置文件activemq.xml,路徑為conf/activemq.xml

  1、broker(所有activemq的brokerName必須一致,才能加入同一個集群)

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

  2、配置levelDB,在<persistenceAdapter>節內,將原來的kahaDB注釋掉,添加<replicatedLevelDB>節點以及內容,其中的61601可修改

    bind:集群間通訊的ip和端口

           zkAddress:ZooKeeper地址,多個可用,逗號分隔(下面有安裝教程)

           hostname:主機名,直接設置本機IP地址即可

           zkPath:zkPath目錄(自定義),可在ZooInspetor中進行查看

    <persistenceAdapter>
           <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
            <replicatedLevelDB
              directory="${activemq.data}/leveldb"
              replicas="3"
              bind="tcp://127.0.0.1:61601"
              zkAddress="127.0.0.1:2181"
              hostname="10.0.10.238"
              sync="local_disk"
              zkPath="/activemq/leveldb-stores"
            />
        </persistenceAdapter>

  3、配置服務接口,在<transportConnectors>節點內,僅修改紅色方框標注的地方(51511)

       <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:51511?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

  4、配置控制台端口,conf\jetty.xml文件中,在id="jettyPort"的<bean>節點內,僅修改紅色方框標注的地方(8161)

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8161"/>
    </bean>

  5、在E盤中新建文件夾activemq,將整個activemq的安裝目錄復制成三個,並命名mq1,mq2,mq3;並修改mq2和mq3中的配置文件(按如上方法),更改其服務接口為“51512”,“51513”和控制台端口“8162”,“8163”以及集群通信接

    口“61602”,“61603”。

四、Zookeeper單機模式安裝

  1.下載zookeeper3.4.9,地址: ,並解壓至剛新建的activemq文件夾中,前提是確保以及安裝java JDK

  2.雙擊bin目錄下的zkServer.cmd啟動         

  3.打開一個CMD窗口,輸入命令“netstat -ano|findstr "2181" 查看zookeeper是否啟動成功,如圖端口2181已經打開,說明啟動成功

  

  4.點擊打開的CMD窗口,Ctrl+C關閉,關閉后修改配置文件,在解壓出的zookeeper-3.4.9文件夾中新建data文件夾,並在data文件夾中新建log和data文件夾,后面會用到。

  5.Zookeeper 的配置文件在 conf 目錄下,這個目錄下有 zoo.cfg 和 log4j.properties, zoo.cfg的配置文件項如下:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=E:/activemq/zookeeper-3.4.9/data/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
dataLogDir=E:/activemq/zookeeper-3.4.9/data/log
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

  各項含義:

tickTime:時間單位,這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)
       初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端
           連接失敗。總的時間長度就是 5*2000=10 秒,同步數據的時間。
syncLimit:這個配置項標識 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒
dataDir:顧名思義就是 Zookeeper 保存數據的目錄。
dataLogDir:默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里
clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。

  6.配置完成后,雙機bin目錄下的zkServer.cmd啟動

  7.打開一個CMD窗口,輸入命令“netstat -ano|findstr "2181" 查看zookeeper是否啟動成功

五、測試ActiveMQ偽分布式是否安裝成功

  1. 先啟動Zookeeper

  2. 分別啟動三個activemq,

  mq1:

  mq2和mq3的cmd窗口會一直更新數據,嘗試連接

  瀏覽器中輸入http://127.0.0.1:8161/admin/(此時master為mq1)

  此時可以看到作為master的是端口為8161的activemq即mq1;當關閉mq1的命令窗口,此時mq2變為master,mq3變為slave

  再次查看http://127.0.0.1:8161/admin/ ,可以看到服務已經關閉了;而此時,zookeeper已經將master切換到了mq2上了,可以查看mq2的地址http://127.0.0.1:8162/admin/

  至此,偽分布式activemq的部署已經完成了。

六、Java測試代碼(Spring整合ActiveMQ)

  1. springJMSConfig.xml,在web.xml文件中調用

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">

    <!-- mq連接工廠 -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://127.0.0.1:51511,tcp://127.0.0.1:51512,tcp://127.0.0.1:51513)" />
        <!-- 異步發送消息 -->
        <property name="useAsyncSend" value="true" />
        <!-- 隊列消費者配置預取值 -->
        <property name="prefetchPolicy">
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="queuePrefetch" value="2" />
            </bean>
        </property>
        <property name="userName" value="admin" />
        <property name="password" value="admin" />
    </bean>

    <!-- 連接池 -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
        <property name="connectionFactory" ref="amqConnectionFactory" />
    </bean>

    <!-- 定義連接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
    </bean>

    <!-- 主題目的地-->
    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="testXBQ_Topic66" />
    </bean>
    
    <!-- 配置JMS模板,Spring提供的JMS工具類,發送、接收消息-->    
    <bean id="jmsTemplateNotice" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="defaultDestination" />
        <!-- true是訂閱 -->
        <property name="pubSubDomain" value="true" />
        <!-- 加入事物回滾 -->
        <property name="sessionTransacted" value="true" />
    </bean>
</beans>

  java中調用發送,打開ActiveMQ控制台,查看消息數量即可。

package com.hyzn.fw.mqsend;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/** 
 * @ClassName: TopicSender
 * @Description: TODO 發布主題MQ
 * @author xbq
 * @version 1.0
 * @date 2017-2-13 下午4:16:22
 */
@Component
public class TopicSender {

    @Resource
    private JmsTemplate jmsTemplateNotice;
    
    public void sendMqMessage(Destination destination, String message){
        if(null == destination){
            // 為空采用
            destination = this.jmsTemplateNotice.getDefaultDestination(); 
        }
        this.jmsTemplateNotice.setDefaultDestination(destination);
        //發送mq消息
        this.jmsTemplateNotice.convertAndSend(message);
        
        System.out.println("消息推送完畢!!");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM