下載
到ActiveMQ官網,找到下載點。
目前,
官網為http://activemq.apache.org/。
我們下載目前最新的版本吧,當前的Linux版本下載地址之一為:http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz。
啟動
下載,並解壓
wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz tar -xf ./apache-activemq-5.11.1-bin.tar.gz
啟動(當然,由於依賴於JAVA,如果你沒有安裝JAVA,它會提醒你的,哈哈)
[nicchagil@localhost bin]$ ./activemq start INFO: Loading '/home/nicchagil/app/apache-activemq-5.11.1/bin/env' INFO: Using java '/home/nicchagil/app/jdk1.7.0_71//bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.prop erties to get details INFO: pidfile created : '/home/nicchagil/app/apache-activemq-5.11.1/data/activem q.pid' (pid '4858')
測試啟動成功與否
ActiveMQ默認監聽61616端口,查此端口看看是否成功啟動,如果一切順利,會看到如下日志
[nicchagil@localhost bin]$ netstat -an | grep 61616 tcp 0 0 :::61616 :::* LIST EN
順便,登錄下管理員頁面,看看有木有問題:
URL : http://192.168.1.101:8161/admin/
默認的用戶名/密碼 : admin/admin
Java客戶端連接
接下來,用簡單的點對點測試生產消息、消費消息。
引入所需包
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>
生產消息
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Producer { private static final Logger LOG = Logger.getLogger(Producer.class); public static void main(String[] args) { // 獲取連接工廠 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.1.101:61616"); /* 獲取連接 */ Connection connection = null; try { connection = factory.createConnection(); connection.start(); } catch (JMSException e) { LOG.error("獲取連接出現異常", e); } /* 創建會話 */ Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { LOG.error("創建會話出現異常", e); } /* 創建消息生產者 */ Destination destination = null; try { destination = session.createQueue("TestQueue"); } catch (JMSException e) { LOG.error("創建隊列出現異常", e); } /* 創建隊列 */ MessageProducer producer = null; try { producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } catch (JMSException e) { LOG.error("創建消息生產者出現異常", e); } /* 發送消息 */ ObjectMessage message = null; try { message = session.createObjectMessage("hello world..."); producer.send(message); } catch (JMSException e) { LOG.error("發送消息出現異常", e); } try { session.commit(); } catch (JMSException e) { LOG.error("提交會話出現異常", e); } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOG.error("關閉連接出現異常", e); } } LOG.info("sent..."); } }
消費消息
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Consumer { private static final Logger LOG = Logger.getLogger(Consumer.class); // 是否繼續響應,可按需由其他邏輯修改值,true:繼續響應,false-停止響應 public static volatile boolean handleFlag = true; public static void main(String[] args) { // 獲取連接工廠 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.1.101:61616"); /* 獲取連接 */ Connection connection = null; try { connection = factory.createConnection(); connection.start(); } catch (JMSException e) { LOG.error("獲取連接出現異常", e); } /* 創建會話 */ Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { LOG.error("創建會話出現異常", e); } /* 創建消息生產者 */ Destination destination = null; try { destination = session.createQueue("TestQueue"); } catch (JMSException e) { LOG.error("創建隊列出現異常", e); } /* 創建消費者 */ MessageConsumer consumer = null; try { consumer = session.createConsumer(destination); } catch (JMSException e) { LOG.error("創建消費者出現異常", e); } /* 獲取消息對象 */ ObjectMessage objectMessage = null; while(handleFlag) { try { objectMessage = (ObjectMessage)consumer.receive(); handleMessage(objectMessage); } catch (JMSException e) { LOG.error("接收消息出現異常", e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOG.error("關閉連接出現異常", e); } } } /** * 處理消息對應的業務 * @param objectMessage 消息對象 */ public static void handleMessage(final ObjectMessage objectMessage) { if (objectMessage == null) { return; } /* 處理業務 */ Object object = null; try { object = objectMessage.getObject(); } catch (JMSException e) { LOG.error("獲取消息內容出現異常", e); } handleMessage(object); } /** * 處理消息對應的業務 * @param messageString 消息內容 */ public static void handleMessage(Object object) { if (object == null) { return; } String messageString = (String)object; LOG.info("Receive : " + messageString); // 這里僅作打印業務而已 } }
看到控制台打印出:Receive : hello world...,可知接收到消息了。
集群的安裝(Replicated LevelDB Store)
ActiveMQ的集群有3種類型,介紹在此Introduction to Master / Slave,我們下面使用的是Replicated LevelDB Store。
編輯配置文件/home/activemq/conf/activemq.xml,編輯以下塊:
<persistenceAdapter> <replicatedLevelDB directory="activemq-data" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="xx.xx.xx.xx:xxxx,xx.xx.xx.xx:xxxx,xx.xx.xx.xx:xxxx(zookeeper集群)" zkPassword="zk password" zkPath="/activemq/leveldb-stores" hostname="當前機器IP" /> </persistenceAdapter>
其它節點也如此配置。
啟動,查看日志是否正常啟動。
荊棘
JDK版本的要求
過程中,遇到一個小問題,就是我一開始是用JDK1.6去跑的,報出常見的Unsupported major.minor version 51.0
針對這個問題,這個帖子有很好的參考意義:
http://www.cnblogs.com/chinafine/articles/1935748.html
找出jar中的一個class,執行以下命令,可查出minor version、major version:
javap -verbose yourClassName
或直接查看jar中的META-INF\MANIFEST.MF。
然后對照帖子中的JDK版本,換成JDK1.7就OK了。