HornetQ是一個支持集群和多種協議,可嵌入、高性能的異步消息系統。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同時也定義屬於自己的消息API,這可以最大限度的提升HornetQ的性能和靈活性。還支持RESTful API、STOMP(Stomp的客戶端可以用多種編程語言來實現
)、AMQP(HornetQ will shortly be implementing AMQP )。
- HornetQ擁有超高的性能,HornetQ在持久化消息方面的性能可以輕易的超於其它常見的非持久化消息引擎的性能。當然,HornetQ的非持久化消息的性能會表現的更好!
- HornetQ完全使用POJO,純POJO的設計讓HornetQ可以盡可能少的以來第三方的包。從設計模式來說,HornetQ這樣的設計入侵性也最小。HornetQ既可以獨立運行,也可以與其它Java應用程序服務器集成使用。
- HornetQ擁有完善的錯誤處理機制,HornetQ提供服務器復制和故障自動轉移功能,該功能可以消除消息丟失或多個重復信息導致服務器出錯。
- HornetQ提供了靈活的集群功能,通過創建HornetQ集群,您可以享受到到消息的負載均衡帶來的性能提升。您也可以通過集群,組成一個全球性的消息網絡。您也可以靈活的配置消息路由。
- HornetQ擁有強大的管理功能。HornetQ提供了大量的管理API和監控服務器。它可以無縫的與應用程序服務器整合,並共同工作在一個HA環境中。
用途:松散地聯系各系統,不用受其它服務器的制約,有效的減少線程Block的時間. 不同於RPC , 采用的Request/Reponse 的方式.
hornetq支持內容Body
Stream -- StreamMessage 包含順序讀取值的流
Text -- TextMessage)
Map -- MapMessage (key/value))
Object -- ObjectMessage Support Serializable序列化的對象.
Bytes -- BytesMessage 字節信息(如存放圖像)
Stream -- StreamMessage 包含順序讀取值的流
Text -- TextMessage)
Map -- MapMessage (key/value))
Object -- ObjectMessage Support Serializable序列化的對象.
Bytes -- BytesMessage 字節信息(如存放圖像)
下載:wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip
yum install libaio
1.單機配置:
1.1編寫啟動腳本:start.sh
IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP" echo $CLUSTER_PROPS sh run.sh &
1.2或者修改配置文件
以下兩個文件把localhost替換為本機IP
config/stand-alone/non-clustered/hornetq-configuration.xml
config/stand-alone/non-clustered/hornetq-beans.xml
bindAddress">${jnp.host:192.168.100.241}
rmiBindAddress">${jnp.host:192.168.100.241}
${hornetq.remoting.netty.host:192.168.100.241 }
....
1.3客戶端需要的包
hornetq-core-client.jar
netty.jar
hornetq-jms-client.jar
jboss-jms-api.jar
jnp-client.jar
1.4配置一個隊列,添加配置onfig/stand-alone/non-clustered/hornetq-jms.xml
<queue name="OrderQueue">
<entry name="queues/OrderQueue"/>
</queue>
配置一個主題
<topic name="topic1">
<entry name="/my/Topic1"/>
</topic>
hornetq-configuration.xml
在<configuration>節點下增加
<security-enabled>false</security-enabled>
1.5收發消息demo
public void sendToQueue(String destinationName,Serializable payload) throws Exception { InitialContext ic = new InitialContext(); ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory"); Queue queue = (Queue)ic.lookup(destinationName); Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer publisher = session.createProducer(queue); connection.start(); ObjectMessage message = session.createObjectMessage(payload); message.setObject(payload); publisher.send(message); if (connection != null) { connection.close(); } } @TransactionAttribute(value = TransactionAttributeType.REQUIRED) public void onMessage(Message message) { ObjectMessage obj = (ObjectMessage) message; try { Serializable ser = obj.getObject(); log.info("[NotificationInbound] onMessage!"); } catch (Exception e) { log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****"); throw new IllegalStateException(); } }
2 集群配置
2.1單機集群啟動腳本
start-cluster0.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445 run ../config/stand-alone/clustered
start-cluster1.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445 run ../config/stand-alone/clustered
2.2集群節點啟動腳本
start-node.sh
IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1` export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP" echo $CLUSTER_PROPS sh run.sh ../config/stand-alone/clustered
2.2.1集群節點停止腳本
stop-node.sh
sh stop.sh ../config/stand-alone/clustered
2.3
.集群配置說明
2.3.1集群發現使用udp協議進行組播
hornetq-configuration.xml
<discovery-groups> <discovery-group name="my-discovery-group"> <local-bind-address>172.16.9.7</local-bind-address> <group-address>231.7.7.7</group-address> <group-port>9876</group-port> <refresh-timeout>10000</refresh-timeout> </discovery-group> </discovery-groups> <connection-factory name="ConnectionFactory"> <discovery-group-ref discovery-group-name="my-discovery-group"/> <entries> <entry name="/ConnectionFactory"/> </entries> </connection-factory>
2.3.2客戶端連接代碼 :
final String groupAddress = "231.7.7.7"; final int groupPort = 9876; ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(groupAddress, groupPort); Connection jmsConnection1 = jmsConnectionFactory.createConnection(); Connection jmsConnection2 = jmsConnectionFactory.createConnection();
2.3.3Server Side load balancing
hornetq-configuration.xml
<cluster-connections> <cluster-connection name="my-cluster"> <address>jms</address> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> <forward-when-no-consumers>false</forward-when-no-consumers> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> </cluster-connections>
2.3.4Client Side load balancing
hornetq-jms.xml
<connection-factory name="ConnectionFactory"> <discovery-group-ref discovery-group-name="my-discovery-group"/> <entries> <entry name="/ConnectionFactory"/> </entries> <ha>true</ha> <connection-load-balancing-policy-class-name> org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy </connection-load-balancing-policy-class-name> </connection-factory>
3.與spring集成示例
3.1spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic"> <constructor-arg value="topic1" /> </bean> <bean id="searchAddMessageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue"> <constructor-arg value="ExpiryQueue"></constructor-arg> </bean> <!-- <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration"> <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" /> <constructor-arg> <map key-type="java.lang.String" value-type="java.lang.Object"> <entry key="host" value="localhost"></entry> <entry key="port" value="5445"></entry> </map> </constructor-arg> </bean> --> <bean id="transportConfiguration" class="org.hornetq.api.core.DiscoveryGroupConfiguration"> <constructor-arg name="groupAddress" value="231.7.7.7" /> <constructor-arg name="groupPort" value="9876"> </constructor-arg> </bean> <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithHA" destroy-method="close"> <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF" /> <constructor-arg ref="transportConfiguration" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="pubSubDomain" value="true" /> </bean> <bean id="topicService" class="org.langke.hornetq.ClientServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> <property name="topic" ref="messageTopic" /> </bean> <bean id="sendMessageService" class="org.langke.hornetq.SendMessageServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate"></property> <property name="searchAddMessageQueue" ref="searchAddMessageQueue"></property> </bean> <!-- this is the Message Driven POJO (MDP) <bean id="messageListener" class="org.langke.hornetq.MessageListenerImpl"> </bean> --> <bean id="receiveMessageListener" class="org.langke.hornetq.ReceiveMessageListenerImpl"></bean> <!-- and this is the message listener container --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <!-- <property name="destination" ref="messageTopic" /> --> <property name="destination" ref="searchAddMessageQueue"></property> <property name="messageListener" ref="receiveMessageListener" /> </bean> </beans>
package org.langke.common.hornetq; public interface MessageService { public boolean sendMessage(SerializableObject message) ; }
3.2發送消息
package org.langke.common.hornetq; import java.io.Serializable; public class SerializableObject implements Serializable{ /** * */ private static final long serialVersionUID = 1L; private Object obj ; private Boolean isRetry = true; public Object getObj() { return obj; } public void setObj(Object obj) { this.obj = obj; } public Boolean getIsRetry() { return isRetry; } public void setIsRetry(Boolean isRetry) { this.isRetry = isRetry; } }
package org.langke.common.hornetq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import org.apache.log4j.Logger; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class SendMessageServiceImpl implements MessageService { private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class); private JmsTemplate jmsTemplate; private Queue searchAddMessageQueue; @Override public boolean sendMessage(SerializableObject message) { return sendQueue(message); } private boolean sendQueue(final SerializableObject so) { try { logger.info("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message : " + so); jmsTemplate.send(searchAddMessageQueue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage om = session.createObjectMessage(so); return om; } }); return true; } catch (Exception e) { logger.error("Error: send topic failure:" + e.getMessage(), e); return false; } } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Queue getSearchAddMessageQueue() { return searchAddMessageQueue; } public void setSearchAddMessageQueue(Queue searchAddMessageQueue) { this.searchAddMessageQueue = searchAddMessageQueue; } }
3.3接收消息
package org.langke.common.hornetq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.apache.log4j.Logger; public class ReceiveMessageListenerImpl implements MessageListener { private AtomicInteger count = new AtomicInteger(0); private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class); @Override public void onMessage(Message message) { try{ if(message instanceof ObjectMessage){ ObjectMessage objectMessage = (ObjectMessage)message; if(objectMessage.getObject() instanceof SerializableObject){ SerializableObject so = (SerializableObject) objectMessage.getObject(); logger.info(so.getObj()); }else{ logger.info(objectMessage); } }else{ System.out.println(message); } } catch (JMSException e) { logger.error( "Error: receive message from topic failure: " + e.getMessage(), e); }finally{ System.out.println(count.incrementAndGet()); } } }
3.4調用示例
package org.langke.common.hornetq; import java.io.File; import java.util.HashMap; import java.util.Map; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; public class Test { private static ApplicationContext ctx; private static Test instance=new Test(); public static Test getInstance(){ return instance; } private Test() { if(ctx == null) { String location = null; if(System.getProperty("os.name").toLowerCase().contains("windows")){ location = "conf/applicationContext.xml"; }else{ location = "../conf/applicationContext.xml"; } File file = new File(location); ctx = new FileSystemXmlApplicationContext(location); } } /** * @param args */ public static void main(String[] args) { getInstance(); MessageService service = ctx.getBean("sendMessageService", MessageService.class); for(int i=0;i<3000;i++){ Map map = new HashMap(); map.put("ooxx", i); SerializableObject so = new SerializableObject(); so.setObj(map); service.sendMessage(so); } } }
4.其它功能
4.1Message expire
HornetQ will not deliver a message to a consumer after it's time to
live has been exceeded.
If the message hasn't been delivered before the time to live is
reached, the server can discard it.
// message will expire in 5000ms from now
message.setExpiration(System.currentTimeMillis() + 5000);
Expiry-address
<!-- expired messages in exampleQueue will be sent to the expiry
address expiryQueue -->
<address-setting match="jms.queue.exampleQueue">
<expiry-address>jms.queue.expiryQueue</expiry-address>
</address-setting>
4.2
Scheduled messages
TextMessage message = session.createTextMessage("MSG");
message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);
producer.send(message);
...
// message will not be received immediately but 5 seconds later
TextMessage messageReceived = (TextMessage) consumer.receive();
4.3Message group
Message groups are sets of messages that have the following characteristics:
• Messages in a message group share the same group id; that is, they have the same group
identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for HornetQ Core API).
• Messages in a message group are always consumed by the same consumer, even if there
are many consumers on a queue. They pin all messages with the same group id to the same
consumer.
If that consumer closes another consumer is chosen and will receive all messages with the
samegroup id.
Based on message
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
Based on connection factory...
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<group-id>Group-0</group-id>
</connection-factory>