Java version:
IntelliJ IDEA version:
IntelliJ IDEA 2017.2
Build #IU-172.3317.76, built on July 15, 2017
Licensed to Administrator
JRE: 1.8.0_131-release-915-b5 amd64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
Windows 7 6.1
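I. Create a Maven project
1. Choose File => New => Project...
2. Or create the project from the Maven command line:
D:
cd D:\JavaSourceCode\JavaSamples
mvn archetype:generate -DgroupId=com.phpdragon -DartifactId=jms-activemq-demo -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
3. Or create the following directory structure by hand:
4. If you created the directories by hand, mark them in IDEA so that it recognizes them as source roots.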
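The manual route in steps 3 and 4 can also be scripted. The sketch below assumes the standard Maven layout; the original directory listing is not reproduced in this text, so the exact folder set is an assumption, and `MavenLayout` is a hypothetical helper name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: creates the standard Maven directory layout under a project root. */
final class MavenLayout {

    // Standard Maven source roots (assumed; the tutorial's own listing is not available).
    static final List<String> DIRECTORIES = Arrays.asList(
            "src/main/java",
            "src/main/resources",
            "src/test/java",
            "src/test/resources");

    /** Creates every directory in DIRECTORIES below projectRoot; existing ones are left alone. */
    static void create(Path projectRoot) throws IOException {
        for (String dir : DIRECTORIES) {
            Files.createDirectories(projectRoot.resolve(dir));
        }
    }

    public static void main(String[] args) throws IOException {
        // Default folder name mirrors the artifactId used in pom.xml.
        Path root = Paths.get(args.length > 0 ? args[0] : "jms-activemq-demo");
        create(root);
        System.out.println("Created Maven layout under " + root.toAbsolutePath());
    }
}
```

After the folders exist, right-click each one in IDEA and use Mark Directory as to flag `src/main/java` as a Sources Root, `src/main/resources` as a Resources Root, and the `src/test` folders as their test counterparts.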
II. Add the JAR dependencies
spring-jms:
spring-test:
activemq-pool:
fastjson:
junit:
testng:
The pom.xml is configured as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.phpdragon</groupId>
    <artifactId>jms-activemq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>jms-activemq-demo</name>
    <url>http://maven.apache.org</url>

    <developers>
        <developer>
            <id>phpdragon</id>
            <name>phpdragon</name>
            <email>phpdragon@qq.com</email>
        </developer>
    </developers>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <fastjson.version>1.2.35</fastjson.version>
        <activemq-pool.version>5.15.0</activemq-pool.version>
        <spring.version>4.3.10.RELEASE</spring.version>
        <junit.version>4.12</junit.version>
        <testng.version>6.11</testng.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- activemq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>${activemq-pool.version}</version>
        </dependency>

        <!-- spring-jms -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- unit testing -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- automated testing -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>${testng.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
III. Write the producer and the consumer
1. Add the producer, MqProducer.java
import com.phpdragon.jms.pojo.MessagePojo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MqProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Converts the POJO with the configured MessageConverter and sends it
     * to the JmsTemplate's default destination.
     */
    public void sendMsg(MessagePojo info) {
        try {
            jmsTemplate.convertAndSend(info);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Create the JMS message converter, MqMessageConverter.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.phpdragon.jms.pojo.MessagePojo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

@Component("messageConverter")
public class MqMessageConverter implements MessageConverter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageConverter.class);

    /** Reads the JSON envelope from the "obj" property and unwraps the MessagePojo in "body". */
    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        LOGGER.info("Received message from MQ, content: " + message);
        JSONObject jsonRoot = (JSONObject) JSON.parse(message.getStringProperty("obj"));
        JSONObject jsonObj = JSONObject.parseObject(jsonRoot.getString("value"));
        MessagePojo info = JSON.toJavaObject(jsonObj.getJSONObject("body"), MessagePojo.class);
        return info;
    }

    /** Wraps the object as {"value": "{\"body\": ...}"} and stores it in the "obj" property. */
    @Override
    public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException {
        LOGGER.info("Sending message to MQ, content: " + obj);
        JSONObject jsonRoot = new JSONObject();
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("body", obj);
        jsonRoot.put("value", jsonObj.toJSONString());
        Message message = session.createMapMessage();
        message.setObjectProperty("obj", jsonRoot.toJSONString());
        return message;
    }
}
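The converter above and App.java both depend on `com.phpdragon.jms.pojo.MessagePojo`, which this walkthrough never lists. A minimal sketch of what it might look like follows. The `title` and `content` fields are inferred from the `setTitle` and `setContent` calls in App.java; everything else (the `toString` format, implementing `Serializable`) is an assumption, and the real class in the linked repository may differ. The package declaration is omitted here; in the project the file would live under `com/phpdragon/jms/pojo`.

```java
import java.io.Serializable;

/**
 * Assumed shape of the message payload. fastjson needs a public no-arg
 * constructor plus getters and setters to serialize and rebuild the object.
 */
public class MessagePojo implements Serializable {

    private static final long serialVersionUID = 1L;

    private String title;
    private String content;

    public MessagePojo() {
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    // Readable output for the consumer's System.out.println call.
    @Override
    public String toString() {
        return "MessagePojo{title='" + title + "', content='" + content + "'}";
    }
}
```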
3. Add the Spring configuration files, spring-context.xml and app.properties
1) app.properties:
application.main=com.phpdragon.jms.App
application.name=jms_activemq_demo_server
application.owner=phpdragon
mq.queue.name=COM.PHPDRAGON.JMS.DEMO.QUEUE
mq.brokerURL=tcp://127.0.0.1:61616
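The Spring configuration resolves `${mq.brokerURL}` and `${mq.queue.name}` from this file. As a quick sanity check outside Spring, the keys can be read with plain `java.util.Properties`. This is only a sketch: `AppPropertiesCheck` and its methods are hypothetical names, and it assumes the file sits at the classpath root as `app.properties`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical helper: verifies that the keys used by the Spring config are present. */
final class AppPropertiesCheck {

    /** Parses a properties stream; the caller is responsible for closing it. */
    static Properties load(InputStream in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        return props;
    }

    /** Returns the trimmed value for key, or fails loudly if it is missing or blank. */
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Assumes app.properties is at the root of the classpath (src/main/resources).
        try (InputStream in = AppPropertiesCheck.class.getResourceAsStream("/app.properties")) {
            if (in == null) {
                throw new IllegalStateException("app.properties not found on the classpath");
            }
            Properties props = load(in);
            System.out.println("broker URL = " + require(props, "mq.brokerURL"));
            System.out.println("queue name = " + require(props, "mq.queue.name"));
        }
    }
}
```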
2) spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-4.3.xsd">

    <!-- Load the property files found on the classpath -->
    <context:property-placeholder location="classpath*:/*.properties" ignore-resource-not-found="true"/>

    <!-- Package that Spring scans for components -->
    <context:component-scan base-package="com.phpdragon.jms"/>

    <!-- Create the connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${mq.brokerURL}"></property>
        <property name="useAsyncSend" value="true"></property>
    </bean>

    <!-- Declare the ActiveMQ destination; it can be a queue or a topic (ActiveMQTopic) -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${mq.queue.name}"></constructor-arg>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="defaultDestination" ref="destination"></property>
        <property name="receiveTimeout" value="6000"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
</beans>
4. Write the launcher, App.java
import com.phpdragon.jms.activemq.MqProducer;
import com.phpdragon.jms.pojo.MessagePojo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class App {

    public static final String DEFAULT_CONFIG_LOCATION = "/spring-context.xml";

    @Autowired
    private MqProducer mqProducer;

    /**
     * Program entry point.
     *
     * @param args command-line arguments
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        ApplicationContext context = new ClassPathXmlApplicationContext(DEFAULT_CONFIG_LOCATION);
        // @Component without an explicit name registers the bean as "app".
        App app = (App) context.getBean("app");
        app.run(args);
    }

    public void run(String[] args) {
        MessagePojo msg = new MessagePojo();
        msg.setTitle("Test");
        msg.setContent("TestContent");

        mqProducer.sendMsg(msg);

        System.exit(0);
    }
}
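5. That completes a simple ActiveMQ sender. Select the main method in App.java, right-click, and choose Debug to run it. The result looks like this:
6. Log in to the ActiveMQ admin console at http://127.0.0.1:8161/admin/queues.jsp. Default account: admin, password: admin.
7. Add the consumer
1) Create MqConsumer.java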
import com.phpdragon.jms.pojo.MessagePojo;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;

@Component
public class MqConsumer {

    // Called by the listener adapter with the already-converted payload.
    public void handleMessage(MessagePojo msg) throws JMSException {
        System.out.println("handleMessage:" + msg.toString());
    }
}
2) Add the ActiveMQ listener configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-4.3.xsd">

    <!-- Load the property files found on the classpath -->
    <context:property-placeholder location="classpath*:/*.properties" ignore-resource-not-found="true"/>

    <!-- Package that Spring scans for components -->
    <context:component-scan base-package="com.phpdragon.jms"/>

    <!-- Create the connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${mq.brokerURL}"></property>
        <property name="useAsyncSend" value="true"></property>
    </bean>

    <!-- Declare the ActiveMQ destination; it can be a queue or a topic (ActiveMQTopic) -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${mq.queue.name}"></constructor-arg>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="defaultDestination" ref="destination"></property>
        <property name="receiveTimeout" value="6000"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>

    <!-- Message listener adapter -->
    <bean id="myMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="mqConsumer"/>
        <property name="messageConverter" ref="messageConverter"/>
    </bean>

    <bean id="mqContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${mq.queue.name}"/>
        <!-- Use the MessageListenerAdapter as the message listener -->
        <property name="messageListener" ref="myMessageListenerAdapter"/>
        <!-- Concurrency range: minimum 4, maximum 8. SimpleMessageListenerContainer
             cannot scale dynamically and always holds the upper limit of consumers. -->
        <property name="concurrency" value="4-8"/>
        <property name="sessionTransacted" value="true"/>
    </bean>
</beans>
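MqConsumer implements no JMS interface, yet it still receives messages. That works because `MessageListenerAdapter` converts the incoming message with the configured converter and then reflectively calls a method on its delegate, `handleMessage` by default. The plain-Java sketch below illustrates that idea only; it is a simplified stand-in, not Spring's implementation, and `ReflectiveDispatchSketch` and `DemoConsumer` are hypothetical names.

```java
import java.lang.reflect.Method;

/** Simplified illustration of reflective dispatch to a delegate; NOT Spring's actual code. */
final class ReflectiveDispatchSketch {

    // Same default method name that MessageListenerAdapter looks for.
    static final String DEFAULT_LISTENER_METHOD = "handleMessage";

    /** Finds a public one-argument handleMessage method that accepts the payload and invokes it. */
    static Object dispatch(Object delegate, Object payload) throws ReflectiveOperationException {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals(DEFAULT_LISTENER_METHOD)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                return method.invoke(delegate, payload);
            }
        }
        throw new NoSuchMethodException(
                DEFAULT_LISTENER_METHOD + "(" + payload.getClass().getName() + ") not found on delegate");
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(dispatch(new DemoConsumer(), "TestContent"));
    }
}

/** Hypothetical delegate, standing in for MqConsumer with a String payload. */
final class DemoConsumer {
    public String handleMessage(String msg) {
        return "handled:" + msg;
    }
}
```

The practical consequence is that the parameter type of `handleMessage` has to match what the converter returns (here `MessagePojo`); otherwise the adapter cannot find a method to call.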
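3) Right-click and run in debug mode. The result looks like this:
4) Check the consumption status
IV. Integrate logback
Does the debug log feel too bare? Then let's bring in logback for richer log output, log file rollover, and so on.
1. Add the logback-classic Maven dependency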
<!-- logback -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
2. Add a logback.xml file to the resources directory
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <!-- Log directory -->
    <property name="LOG_BACK_DIR" value="logs"/>

    <!-- Rolling file log for the project -->
    <appender name="DEBUG_ROLLING" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_BACK_DIR}/debug.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- roll over every hour (the date pattern ends in HH) -->
            <fileNamePattern>${LOG_BACK_DIR}/debug_%d{yyyyMMddHH}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <!-- or whenever the file size reaches 256MB -->
                <maxFileSize>256MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- maximum number of archived periods to keep -->
            <maxHistory>48</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%X{threadId}|%level|%C|%M|%L|%.-512msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- Threshold TRACE: every level is written to the file -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>TRACE</level>
        </filter>
    </appender>
    <!-- Rolling file log for the project END -->

    <!-- For development: print to the console -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <param name="Pattern"
                   value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%X{threadId}|%X{traceId}-%X{rpcId}|%level|%C|%M|%L|%.-512msg%n"/>
        </layout>
    </appender>

    <logger name="org.springframework" level="WARN"/>

    <!-- DEBUG level for development -->
    <root level="DEBUG">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="DEBUG_ROLLING"/>
    </root>
</configuration>
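The smallest unit in the `%d{yyyyMMddHH}` date pattern is the hour, so the rolling policy starts a new archive each hour, and `%i` counts the extra files created when one hour's log exceeds `maxFileSize`. The sketch below only mimics how the archive names expand, to make the naming scheme concrete; `RollingFileNames` is a hypothetical helper and does not call logback.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that mimics the expansion of logs/debug_%d{yyyyMMddHH}.%i.log. */
final class RollingFileNames {

    // Same date pattern as the %d{...} token in fileNamePattern.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyyMMddHH");

    /** Builds the archive name for the hour containing 'period' and the given size index. */
    static String archiveName(String logDir, LocalDateTime period, int index) {
        return logDir + "/debug_" + HOURLY.format(period) + "." + index + ".log";
    }

    public static void main(String[] args) {
        LocalDateTime tenOClock = LocalDateTime.of(2017, 7, 15, 10, 59);
        LocalDateTime elevenOClock = LocalDateTime.of(2017, 7, 15, 11, 0);

        System.out.println(archiveName("logs", tenOClock, 0));    // logs/debug_2017071510.0.log
        System.out.println(archiveName("logs", tenOClock, 1));    // logs/debug_2017071510.1.log
        System.out.println(archiveName("logs", elevenOClock, 0)); // logs/debug_2017071511.0.log
    }
}
```

With hourly rollover, `maxHistory` set to 48 keeps roughly two days of archives.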
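3. Right-click and run in debug mode. The result looks like this:
V. Integrate assembly
VI. Unit tests and automated tests
VII. Build and upload to a remote repository
VIII. Deploy the project
Source code:
XML configuration version: https://github.com/phpdragon/JavaSamples/tree/master/jms-activemq-demo-xml
Spring annotation version: https://github.com/phpdragon/JavaSamples/tree/master/jms-activemq-demo-annotation
spring-boot version: https://github.com/phpdragon/JavaSamples/tree/master/spring-boot-starter-activemq-demo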