所需jar文件:Spring 3.1 相關jar,以及 ActiveMQ(含 activemq-pool 連接池)和 log4j 的jar。
項目src路徑下文件:config.properties
讀取config.properties文件JAVA類:
package com.lejob.lejobmy.config;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Thin wrapper around {@link Properties} that loads key/value pairs from a
 * classpath resource, exposes them as strings and can write them to a file.
 *
 * <p>Lookups never return {@code null}: a missing key yields the empty string.
 */
public class Configuration
{
    /** Backing store for every key/value pair held by this instance. */
    private Properties propertie;

    /**
     * Creates an empty configuration.
     */
    public Configuration()
    {
        propertie = new Properties();
    }

    /**
     * Creates a configuration pre-loaded from a classpath resource.
     *
     * <p>A missing or unreadable resource is reported on standard output and
     * leaves the configuration empty instead of failing construction.
     *
     * @param filePath classpath-relative path of the properties file
     */
    public Configuration(String filePath)
    {
        propertie = new Properties();
        try {
            if (!loadFromClasspath(filePath)) {
                // getResourceAsStream signals "not found" with null, not with an exception.
                System.out.println("讀取屬性文件--->失敗!- 原因:文件路徑錯誤或者文件不存在");
            }
        } catch (IOException ex) {
            System.out.println("裝載文件--->失敗!");
            ex.printStackTrace();
        }
    }

    /**
     * Returns the value stored under {@code key}.
     *
     * @param key key to look up
     * @return the value, or the empty string when the key is absent
     */
    public String getValue(String key)
    {
        return propertie.getProperty(key, "");
    }

    /**
     * Loads (merges) the given classpath resource into this configuration and
     * then returns the value stored under {@code key}.
     *
     * @param fileName classpath-relative path of the properties file
     * @param key      key to look up
     * @return the value, or the empty string when the resource cannot be
     *         loaded or the key is absent
     */
    public String getValue(String fileName, String key)
    {
        try {
            if (!loadFromClasspath(fileName)) {
                System.out.println("讀取屬性文件--->失敗!- 原因:文件路徑錯誤或者文件不存在");
                return "";
            }
            return propertie.getProperty(key, "");
        } catch (Exception ex) {
            // Best effort, as before: any failure degrades to "no value".
            ex.printStackTrace();
            return "";
        }
    }

    /**
     * Removes every key/value pair held in memory (no file is touched).
     */
    public void clear()
    {
        propertie.clear();
    }

    /**
     * Adds a key or replaces the value of an existing key.
     *
     * @param key   key to store
     * @param value value to store
     */
    public void setValue(String key, String value)
    {
        propertie.setProperty(key, value);
    }

    /**
     * Writes the current key/value pairs to a file, creating it if necessary.
     * I/O failures are printed and otherwise ignored.
     *
     * @param fileName    path of the file to write
     * @param description header comment written at the top of the file
     */
    public void saveFile(String fileName, String description)
    {
        FileOutputStream outputFile = null;
        try {
            outputFile = new FileOutputStream(fileName);
            propertie.store(outputFile, description);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close even when store() failed so the file handle is not leaked.
            if (outputFile != null) {
                try {
                    outputFile.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Loads a classpath resource into {@link #propertie}.
     *
     * @param resourcePath classpath-relative path of the properties file
     * @return {@code false} when the resource does not exist, {@code true}
     *         once it has been loaded
     * @throws IOException when reading the resource fails
     */
    private boolean loadFromClasspath(String resourcePath) throws IOException
    {
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(resourcePath);
        if (inputStream == null) {
            return false;
        }
        try {
            propertie.load(inputStream);
        } finally {
            inputStream.close();
        }
        return true;
    }

    /**
     * Small demo: reads three ActiveMQ settings from config.properties on the
     * classpath and prints them.
     */
    public static void main(String[] args)
    {
        Configuration rc = new Configuration("config.properties"); // classpath-relative path
        String ip = rc.getValue("activemq_url");
        String host = rc.getValue("activemq_user");
        String tab = rc.getValue("activemq_pw");
        System.out.println("ip = " + ip);
        System.out.println("host = " + host);
        System.out.println("tab = " + tab);
    }
}
ActiveMQ消息生產者工廠類ActivemqConnectionFactory.java:
package com.lejob.lejobsearch.index.queue;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.log4j.Logger;
import com.lejob.rpc.search.model.QueueModel;
/**
 * Static facade over a pooled ActiveMQ connection factory plus a thread pool
 * used to send messages in the background.
 *
 * <p>{@link #init()} must run before any static helper is used and
 * {@link #destroy()} releases the thread pool and the connection pool.
 * The factory helpers ({@link #getConnection()}, {@link #getSession},
 * {@link #getProducer}, {@link #getConsumer}) log failures and return
 * {@code null} instead of throwing.
 *
 * @author 程松
 */
public class ActivemqConnectionFactory {
    private static Logger logger = Logger.getLogger(ActivemqConnectionFactory.class);
    // Maximum number of connections kept by the pool.
    private static int maxConnections = 5;
    // Maximum number of active sessions per pooled connection.
    private static int maximumActiveSessionPerConnection = 300;
    // Number of threads used to dispatch outgoing messages.
    private static int threadPoolSize = 50;
    // Value handed to setUseAsyncSend(); true requests asynchronous sends.
    private static boolean useAsyncSendForJMS = true;
    // How long destroy() waits for queued sends before forcing the pool down.
    private static final long SHUTDOWN_WAIT_SECONDS = 5;
    private static ExecutorService threadPool;
    private static PooledConnectionFactory connectionFactory;

    /** Builds the JMS message to send once a session is available. */
    private interface MessageCreator {
        Message create(Session session) throws JMSException;
    }

    /**
     * Creates the dispatch thread pool and the pooled connection factory.
     * Failures are logged, not thrown.
     */
    public void init(){
        try {
            threadPool = Executors.newFixedThreadPool(threadPoolSize);
            ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConstant.ACTIVEMQ_USER_NAME, ActiveMQConstant.ACTIVEMQ_USER_PW, ActiveMQConstant.ACTIVEMQ_URL);
            actualConnectionFactory.setUseAsyncSend(useAsyncSendForJMS);
            // Wrap the real factory in ActiveMQ's connection pool.
            connectionFactory = new PooledConnectionFactory(actualConnectionFactory);
            connectionFactory.setCreateConnectionOnStartup(true);
            connectionFactory.setMaxConnections(maxConnections);
            connectionFactory.setMaximumActiveSessionPerConnection(maximumActiveSessionPerConnection);
            logger.info("INIT ACTIVEMQ POOL CONNECTION FACTORY SUCCESS......");
        } catch (Exception e) {
            logger.error("ACTIVEMQ CONNECTION INIT ERROR......", e);
        }
    }

    /**
     * Stops the dispatch thread pool (letting queued sends drain briefly)
     * and then stops the connection pool. Failures are logged, not thrown.
     */
    public void destroy(){
        // Drain senders first: they still need the connection pool.
        shutdownThreadPool();
        try {
            if(connectionFactory != null){
                connectionFactory.stop();
                logger.info("STOP ACTIVEMQ CONNECTION FACTORY SUCCESS......");
            }
        } catch (Exception e) {
            logger.error("STOP ACTIVEMQ CONNECTION FACTORY ERROR!!!", e);
        }
    }

    /** Shuts the dispatch pool down so its worker threads do not outlive the bean. */
    private static void shutdownThreadPool() {
        ExecutorService pool = threadPool;
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(SHUTDOWN_WAIT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                logger.warn("ACTIVEMQ SEND THREAD POOL DID NOT DRAIN IN TIME, FORCING SHUTDOWN");
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Borrows a connection from the pool and starts it.
     *
     * @return a started connection, or {@code null} when the pool is not
     *         initialised or the connection cannot be created
     */
    public static Connection getConnection(){
        if (connectionFactory == null) {
            logger.error("ACTIVEMQ CONNECTION FACTORY IS NOT INITIALIZED");
            return null;
        }
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();
            return connection;
        } catch (JMSException e) {
            logger.error("GET ACTIVEMQ CONNECTION ERROR!!!", e);
        }
        return null;
    }

    /**
     * Opens a non-transacted, auto-acknowledge session on the connection.
     *
     * @param conn connection to open the session on
     * @return the session, or {@code null} when it cannot be created
     */
    public static Session getSession(Connection conn){
        Session session = null;
        try {
            // FALSE = non-transacted; second argument is the acknowledge mode.
            session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            logger.error("CREATE ACTIVEMQ SESSION ERROR!!!", e);
        }
        return session;
    }

    /**
     * Creates a persistent-delivery producer for the named queue.
     *
     * @param session    session that owns the producer
     * @param queue_name name of the destination queue
     * @return the producer, or {@code null} when it cannot be created
     */
    public static MessageProducer getProducer(Session session, String queue_name){
        try {
            Destination destination = session.createQueue(queue_name);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            return producer;
        } catch (Exception e) {
            logger.error("CREATE PRODUCER FOR QUEUE " + queue_name + " ERROR!!!", e);
        }
        return null;
    }

    /**
     * Creates a consumer for the named queue.
     *
     * @param session    session that owns the consumer
     * @param queue_name name of the source queue
     * @return the consumer, or {@code null} when it cannot be created
     */
    public static MessageConsumer getConsumer(Session session, String queue_name){
        try {
            Destination destination = session.createQueue(queue_name);
            return session.createConsumer(destination);
        } catch (Exception e) {
            logger.error("CREATE CONSUMER FOR QUEUE " + queue_name + " ERROR!!!", e);
        }
        return null;
    }

    /**
     * Creates a text message.
     *
     * @param session session used to create the message
     * @param msg     text payload
     * @return the new message
     * @throws JMSException when the provider fails to create the message
     */
    public static TextMessage getMessage(Session session, String msg) throws JMSException {
        return session.createTextMessage(msg);
    }

    /**
     * Creates an object message.
     *
     * @param session session used to create the message
     * @param obj     serializable payload
     * @return the new message
     * @throws JMSException when the provider fails to create the message
     */
    public static ObjectMessage getMessage(Session session, Serializable obj) throws JMSException {
        return session.createObjectMessage(obj);
    }

    /**
     * Creates a map message; every key and value is stored as its string form.
     *
     * @param session session used to create the message
     * @param map     entries to copy into the message
     * @return the new message
     * @throws JMSException when the provider fails to create or fill the message
     */
    @SuppressWarnings("rawtypes")
    public static MapMessage getMessage(Session session, Map map) throws JMSException {
        MapMessage message = session.createMapMessage();
        // Iterate entries instead of casting raw keys to String, so a
        // non-String key no longer causes a ClassCastException.
        Map<?, ?> entries = map;
        for (Map.Entry<?, ?> entry : entries.entrySet()) {
            message.setString(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        return message;
    }

    /**
     * Sends a text message to the queue on the dispatch thread pool.
     *
     * @param msg        text payload
     * @param queue_name name of the destination queue
     */
    public static void send(final String msg, final String queue_name) {
        dispatch(queue_name, new MessageCreator() {
            public Message create(Session session) throws JMSException {
                return getMessage(session, msg);
            }
        });
    }

    /**
     * Sends an object message to the queue on the dispatch thread pool.
     *
     * @param model      payload
     * @param queue_name name of the destination queue
     */
    public static void send(final QueueModel model, final String queue_name) {
        dispatch(queue_name, new MessageCreator() {
            public Message create(Session session) throws JMSException {
                return getMessage(session, model);
            }
        });
    }

    /**
     * Sends a map message to the queue on the dispatch thread pool.
     *
     * @param map        entries to send
     * @param queue_name name of the destination queue
     */
    @SuppressWarnings("rawtypes")
    public static void send(final Map map, final String queue_name) {
        dispatch(queue_name, new MessageCreator() {
            public Message create(Session session) throws JMSException {
                return getMessage(session, map);
            }
        });
    }

    /**
     * Queues one send on the dispatch thread pool; failures are logged there.
     *
     * @throws IllegalStateException when {@link #init()} has not been called
     */
    private static void dispatch(final String queue_name, final MessageCreator creator) {
        ExecutorService pool = threadPool;
        if (pool == null) {
            throw new IllegalStateException("ActivemqConnectionFactory.init() has not been called");
        }
        pool.execute(new Runnable(){
            public void run() {
                try {
                    sendMsg(queue_name, creator);
                } catch (Exception e) {
                    logger.error("SEND MESSAGE TO QUEUE " + queue_name + " ERROR!!!", e);
                }
            }
        });
    }

    /**
     * Performs the actual send: borrow connection, open session, create
     * producer, send, then always close session and connection.
     */
    private static void sendMsg(String queue_name, MessageCreator creator) throws JMSException {
        Connection connection = null;
        Session session = null;
        try {
            connection = getConnection();
            if (connection == null) {
                throw new JMSException("no ActiveMQ connection available");
            }
            session = getSession(connection);
            if (session == null) {
                throw new JMSException("could not create ActiveMQ session");
            }
            MessageProducer producer = getProducer(session, queue_name);
            if (producer == null) {
                throw new JMSException("could not create producer for queue " + queue_name);
            }
            producer.send(creator.create(session));
        } finally {
            closeSession(session);
            closeConnection(connection);
        }
    }

    /**
     * Extracts the text of a text message.
     *
     * @param message received message
     * @return the text, or {@code null} when the message is not a
     *         {@link TextMessage} or reading it fails
     */
    public static String getText(Message message){
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException e) {
                logger.error("READ TEXT MESSAGE ERROR!!!", e);
            }
        }
        return null;
    }

    /**
     * Extracts the payload of an object message.
     *
     * @param message received message
     * @return the payload, or {@code null} when the message is not an
     *         {@link ObjectMessage} or reading it fails
     */
    public static Object getObject(Message message){
        if (message instanceof ObjectMessage) {
            try {
                return ((ObjectMessage) message).getObject();
            } catch (JMSException e) {
                logger.error("READ OBJECT MESSAGE ERROR!!!", e);
            }
        }
        return null;
    }

    /**
     * Copies the entries of a map message into a new map of strings.
     *
     * @param message received message
     * @return the entries, or {@code null} when the message is not a
     *         {@link MapMessage} or reading it fails
     */
    @SuppressWarnings("rawtypes")
    public static Map getMap(Message message){
        if (message instanceof MapMessage) {
            MapMessage mapMsg = (MapMessage) message;
            Map<String, String> map = new HashMap<String, String>();
            try {
                Enumeration names = mapMsg.getMapNames();
                while(names.hasMoreElements()){
                    String key = (String) names.nextElement();
                    map.put(key, mapMsg.getString(key));
                }
                return map;
            } catch (JMSException e) {
                logger.error("READ MAP MESSAGE ERROR!!!", e);
            }
        }
        return null;
    }

    /**
     * Closes the session if it is not {@code null}; failures are logged.
     *
     * @param session session to close, may be {@code null}
     */
    public static void closeSession(Session session) {
        try {
            if (session != null) {
                session.close();
            }
        } catch (Exception e) {
            logger.error("CLOSE ACTIVEMQ SESSION ERROR!!!", e);
        }
    }

    /**
     * Closes the connection if it is not {@code null}; failures are logged.
     *
     * @param connection connection to close, may be {@code null}
     */
    public static void closeConnection(Connection connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            logger.error("CLOSE ACTIVEMQ CONNECTION ERROR!!!", e);
        }
    }
}
spring.xml中配置:
<!-- activemq factory -->
<bean class="com.lejob.lejobsearch.index.queue.ActivemqConnectionFactory" init-method="init" destroy-method="destroy">
</bean>
項目啟動時會加載並初始化ActivemqConnectionFactory。
項目中通過調用 ActivemqConnectionFactory.send(param, queue_name) 發送消息。
下面是消息消費者:IMessageHandler.java(消息處理任務接口)、RecrQueueMessageHandler.java(消息處理類)、RecrQueueConsumer.java(JMS消息消費者)、MultiThreadMessageListener.java(消息消費者中使用的多線程消息監聽服務)、FixedAndBlockedThreadPoolExecutor.java(支持阻塞的固定大小的線程池)
IMessageHandler.java
package com.lejob.lejobsearch.index.queue;
import javax.jms.Message;
/**
 * Callback contract for code that processes a received JMS message.
 *
 * @author 程松
 * @date 2013-12-7 10:49:49
 * @company 樂享網絡(5lejob)北京研發中心
 * @version [Copyright (c) 2013 V001]
 */
public interface IMessageHandler {

    /**
     * Handles one received message.
     *
     * @param message the message to process
     */
    void handle(Message message);
}
RecrQueueConsumer.java
package com.lejob.lejobsearch.index.queue.recruitment;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.log4j.Logger;
import com.lejob.lejobsearch.index.queue.ActiveMQConstant;
import com.lejob.lejobsearch.index.queue.ActivemqConnectionFactory;
import com.lejob.lejobsearch.index.queue.IMessageHandler;
import com.lejob.lejobsearch.index.queue.MultiThreadMessageListener;
/**
* JMS消息消費者
*
* @author 程松
* @date 2013-11-30下午04:00:09
* @company 樂享網絡(5lejob)北京研發中心
* @version [Copyright (c) 2013 V001]
* @see [相關類/方法]
* @since [產品/模塊版本]
*/
public class RecrQueueConsumer implements ExceptionListener {
private Logger logger = Logger.getLogger(RecrQueueConsumer.class);
/**
* 監聽消息線程最大值
*/
private static final int THREAD_COUNT = 1;
/**
* 消息處理類
*/
private IMessageHandler messageHandler;
private Connection connection;
private Session session;
public void init(){
try {
connection = ActivemqConnectionFactory.getConnection();
//會話采用非事務級別,消息到達機制使用自動通知機制
session = ActivemqConnectionFactory.getSession(connection);
MessageConsumer consumer = ActivemqConnectionFactory.getConsumer(session, ActiveMQConstant.RECR_QUEUE);
consumer.setMessageListener(new MultiThreadMessageListener(THREAD_COUNT, messageHandler));
logger.info(ActiveMQConstant.RECR_QUEUE + " LISTENER INIT SUCCESS......");
} catch (Exception e) {
logger.error("處理隊列" + ActiveMQConstant.RECR_QUEUE + "信息 異常", e);
}
}
public void destroy(){
try {
ActivemqConnectionFactory.closeSession(session);
ActivemqConnectionFactory.closeConnection(connection);
logger.info(ActiveMQConstant.RECR_QUEUE + " LISTENER DESTROY SUCCESS......");
} catch (Exception e) {
logger.error("關閉監聽隊列" + ActiveMQConstant.RECR_QUEUE + " 異常", e);
}
}
public void onException(JMSException e) {
e.printStackTrace();
}
public void setMessageHandler(IMessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
}
MultiThreadMessageListener.java
package com.lejob.lejobsearch.index.queue.recruitment;
import java.util.concurrent.ExecutorService;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import com.lejob.lejobsearch.index.queue.MessageHandler;
import com.lejob.rpc.search.model.QueueModel;
/**
 * Message listener that hands every received message to a thread pool so
 * that handling happens off the JMS delivery thread.
 *
 * <p>Only {@link ObjectMessage}s are processed; their payload is cast to
 * {@link QueueModel} and passed to the configured handler. Any other message
 * type is ignored, and every exception raised while handling is printed and
 * swallowed.
 */
public class MultiThreadMessageListener implements MessageListener {
    /** Pool size used when the caller does not specify one. */
    public final static int DEFAULT_HANDLE_THREAD_POOL=10;
    /** Number of handler threads. */
    private final int maxHandleThreads;
    /** Callback that receives each decoded payload. */
    private final MessageHandler messageHandler;
    /** Pool that runs the handler callbacks. */
    private final ExecutorService handleThreadPool;

    /**
     * Creates a listener with {@link #DEFAULT_HANDLE_THREAD_POOL} threads.
     *
     * @param messageHandler callback that receives each payload
     */
    public MultiThreadMessageListener(MessageHandler messageHandler){
        this(DEFAULT_HANDLE_THREAD_POOL, messageHandler);
    }

    /**
     * Creates a listener with the given number of handler threads.
     *
     * @param maxHandleThreads number of handler threads
     * @param messageHandler   callback that receives each payload
     */
    public MultiThreadMessageListener(int maxHandleThreads,MessageHandler messageHandler){
        this.maxHandleThreads=maxHandleThreads;
        this.messageHandler=messageHandler;
        // Fixed-size pool whose execute() can block the submitting thread.
        this.handleThreadPool = new FixedAndBlockedThreadPoolExecutor(this.maxHandleThreads);
    }

    /**
     * Called by the JMS provider for each delivered message; submits the
     * message to the handler pool.
     *
     * @param message the delivered message
     */
    public void onMessage(final Message message) {
        this.handleThreadPool.execute(new HandleTask(message));
    }

    /** Decodes one message and forwards its payload to the handler. */
    private final class HandleTask implements Runnable {
        private final Message received;

        HandleTask(Message received) {
            this.received = received;
        }

        public void run() {
            if (!(received instanceof ObjectMessage)) {
                return;
            }
            try {
                QueueModel payload = (QueueModel) ((ObjectMessage) received).getObject();
                messageHandler.handle(payload);
            } catch (Exception e) {
                // Covers JMSException from getObject() as well as handler failures.
                e.printStackTrace();
            }
        }
    }
}
FixedAndBlockedThreadPoolExecutor.java
package com.lejob.lejobsearch.index.queue.recruitment;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-size thread pool whose {@link #execute(Runnable)} can block the
 * submitting thread.
 *
 * <p>Once the pool has grown to its configured size, every call to
 * {@code execute} submits the task and then waits until some task finishes
 * (signalled from {@link #afterExecute}). While the pool is still growing,
 * {@code execute} returns immediately.
 */
public class FixedAndBlockedThreadPoolExecutor extends ThreadPoolExecutor {
    // Guards the condition used to park submitters until a task completes.
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();

    /**
     * Creates a pool with exactly {@code size} threads and an unbounded queue.
     *
     * @param size number of worker threads (core size and maximum size)
     */
    public FixedAndBlockedThreadPoolExecutor(int size) {
        super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Submits the task and, when the pool is at full size, blocks the caller
     * until a task completes.
     *
     * <p>If the waiting thread is interrupted the wait ends and the thread's
     * interrupt flag is restored.
     *
     * @param command task to run
     * @throws java.util.concurrent.RejectedExecutionException when the pool
     *         no longer accepts tasks (for example after shutdown)
     */
    @Override
    public void execute(Runnable command) {
        this.lock.lock();
        try {
            // Inside the try so a rejected task cannot leave the lock held,
            // which would block every later execute()/afterExecute() forever.
            super.execute(command);
            if (getPoolSize() == getMaximumPoolSize()) {
                this.condition.await();
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Wakes one submitter that is waiting in {@link #execute(Runnable)}.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.lock.lock();
        try {
            this.condition.signal();
        } finally {
            this.lock.unlock();
        }
    }
}
項目spring.xml文件中配置
<!-- 任務處理類 -->
<bean id="recrQueueMessageHandler" class="com.lejob.lejobsearch.index.queue.recruitment.RecrQueueMessageHandler">
</bean>
<!-- JMS消息消費者 -->
<bean class="com.lejob.lejobsearch.index.queue.recruitment.RecrQueueConsumer" init-method="init" destroy-method="destroy">
<property name="messageHandler" ref="recrQueueMessageHandler"></property>
</bean>
本文參考:http://blog.csdn.net/linwei_1029/article/details/16964943