Activemq消息類型


Activemq消息類型
JMS規范中的消息類型包括TextMessageMapMessageObjectMessageBytesMessageStreamMessage等五種。ActiveMQ也有對應的實現,下面我們結合Spring JMS分別來看一下五種消息類型的收發代碼。
1、TextMessage
/**
     * 向指定Destination發送text消息
     * @param destination
     * @param message
     */
    public void sendTxtMessage(Destination destination, final String message){
        if(null == destination){
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
        System.out.println("springJMS send text message...");
    }

2、MapMessage

/**
     * 向指定Destination發送map消息
     * @param destination
     * @param message
     */
    public void sendMapMessage(Destination destination, final String message){
        if(null == destination){
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("msgId",message);
                return mapMessage;
            }
        });
        System.out.println("springJMS send map message...");
    }

3、ObjectMessage

 /**
     * 向指定Destination發送序列化的對象
     * @param destination
     * @param object object 必須序列化
     */
    public void sendObjectMessage(Destination destination, final Serializable object){
        if(null == destination){
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(object);
            }
        });
        System.out.println("springJMS send object message...");
    }

4、BytesMessage

/**
     * 向指定Destination發送字節消息
     * @param destination
     * @param bytes
     */
    public void sendBytesMessage(Destination destination, final byte[] bytes){
        if(null == destination){
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.writeBytes(bytes);
                return bytesMessage;

            }
        });
        System.out.println("springJMS send bytes message...");
    }

5、streamMessage

/**
     * 向默認隊列發送Stream消息
     */
    public void sendStreamMessage(Destination destination) {
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                StreamMessage message = session.createStreamMessage();
                message.writeString("stream string");
                message.writeInt(11111);
                return message;
            }
        });
        System.out.println("springJMS send Strem message...");
    }

消息接收處理

 /**
     * 根據消息類型進行對應的處理
     * @param destination 消息發送/接收共同的Destination
     * @throws JMSException
     */
    public void receive(Destination destination) throws JMSException {
        Message message = jmsTemplate.receive(destination);

        // 如果是文本消息
        if (message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            System.out.println("from" + destination.toString() + " get textMessage:\t" + tm.getText());
        }

        // 如果是Map消息
        if (message instanceof MapMessage) {
            MapMessage mm = (MapMessage) message;
            System.out.println("from" + destination.toString() + " get textMessage:\t" + mm.getString("msgId"));
        }

        // 如果是Object消息
        if (message instanceof ObjectMessage) {
            ObjectMessage om = (ObjectMessage) message;
            ExampleUser exampleUser = (ExampleUser) om.getObject();
            System.out.println("from" + destination.toString() + " get ObjectMessage:\t"
                    + ToStringBuilder.reflectionToString(exampleUser));
        }

        // 如果是bytes消息
        if (message instanceof BytesMessage) {
            byte[] b = new byte[1024];
            int len = -1;
            BytesMessage bm = (BytesMessage) message;
            while ((len = bm.readBytes(b)) != -1) {
                System.out.println(new String(b, 0, len));
            }
        }

        // 如果是Stream消息
        if (message instanceof StreamMessage) {
            StreamMessage sm = (StreamMessage) message;
            System.out.println(sm.readString());
            System.out.println(sm.readInt());
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM