ActiveMq性能優化


ActiveMq運行是比較穩定的,數據的吞吐速度也很高,如果出現入隊列或者出隊列慢的問題,先檢查一下自己的代碼,是不是本身取到數據后處理過慢。

本文的關於性能優化,其實是列舉出一些需要注意的點,請確保你的項目沒有一下問題:

1. 使用spring的JmsTemplate

 JmsTemplate的send和convertAndSend會使用持久化mode,即使你設置了NON_PERSISTENT。這會導致入隊列速度變得非常慢。

 解決辦法,使用下面的MyJmsTemplate代替JmsTemplate。

public class MyJmsTemplate extends JmsTemplate {
    private Session session;

    public MyJmsTemplate() {
        super();
    }

    public MyJmsTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    public void doSend(MessageProducer producer, Message message) throws JMSException {
        if (isExplicitQosEnabled()) {
            producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        } else {
            producer.send(message);
        }
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

 

2. DeliveryMode的選擇,如果你入隊列的數據,不考慮MQ掛掉的情況(這概率很小),使用NON_PERSISTENT會顯著提高數據寫入速度。

3. 生產者使用事物會提高入隊列性能,但是消費者如果啟動了事物則會顯著影響數據的消費速度。相關代碼如下:

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

代碼中的false代表不啟動事物。

4. 消費者的消息處理即onMessage方法優化,舉例如下:

public class SmsMoPool implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(SmsMoPool.class);
    private DefaultEventPubliser moEventPublisher;
    private final EventFactory eventFactory = new DefaultEventFactory();
    private DefaultDataGather dataGather;
    private ExecutorService pool = Executors.newFixedThreadPool(5);

    @Override
    public void onMessage(final Message message) {
        pool.execute(new Runnable() {
            @Override
            public void run() {
                final ObjectMessage msg = (ObjectMessage) message;
                Serializable obj = null;
                try {
                    obj = msg.getObject();
                } catch (JMSException e) {
                    logger.error("從消息隊列獲得上行信息異常{}", e);
                }
                if (obj != null) {
                    dataGather.incrementDateCount(MasEntityConstants.TRAFFIC_SMS_MO_IN);
                    AgentToServerReq req = (AgentToServerReq) obj;
                    if (logger.isInfoEnabled()) {
                        logger.info("驅動-->調度:{}", req.toXmlStr());
                    }
                    Event event = eventFactory.createMoEvent(req);
                    moEventPublisher.publishEvent(event);
                }
            }
        });
    }
}

這段代碼使用了線程池,另一點要注意的是msg.getObject();這個方法是一個比較耗時的方法,你的代碼中不應該出現多次getObject()。

5. 消費者使用預取,如何使用預取,下面以spring版本為例

  <bean class="org.apache.activemq.command.ActiveMQQueue">
                <constructor-arg value="data.mo?consumer.prefetchSize=100"/>
            </bean>

預取數量根據具體入隊列數據而定,以上設置100,是針對2000/sec入隊列速度設定的。
另外如果是慢消費者,這里可設置為1。

6. 檢查你的MQ數據吞吐速度,保持生產和消費的平衡,不會出現大量積壓。

7. ActiveMQ使用TCP協議時 tcpNoDelay=默認是false ,設置為true可以提高性能。

還是spring版本的:

 <bean id="mqPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:useAsyncSend="true"
                  p:brokerURL="failover://(tcp://127.0.0.1:61616?tcpNoDelay=true)"/>
        </property>
    </bean>


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM