redis實現分布式消息隊列


1背景

1.1為什么需要消息隊列

當系統中出現“生產“和“消費“的速度或穩定性等因素不一致的時候,就需要消息隊列,作為抽象層,彌合雙方的差異。
舉個例子:很多網站注冊需要發送短信驗證碼,有可能在某個時段有大量的人注冊,但是發送短信的模塊速度相對較慢。如果不使用消息隊列就會導致大量的注冊阻塞在發送短信驗證碼這個階段而無法進行下面的操作。
比如去銀行辦理業務,窗口數量是有限的,所以就需要排隊,按次序辦理業務

  • 提高響應速度
    有了消息隊列,消息的生產者可以將消息投放到消息隊列中並馬上返回,而不必阻塞等待消息被消費;而消費者只要從消息隊列中提取消息就可以了。這樣就大大提高了系統的相應速度。
  • 提高系統的穩定性
    比如一個訂單系統,訂單系統分為下訂單模塊和處理訂單模塊。處理訂單模塊掛了的情況下,如果有了消息隊列,下單系統還能正常下單,當訂單處理系統重啟恢復則可以繼續處理訂單。不會因為一個系統故障而導致整個系統的故障。

2實現

2.1 首先需要添加兩個依賴

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

2.2 生產者線程

import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;

public class Productor implements Runnable{
	public static final String PRODUCE_MESSAGE = "message:apple";
	
	@Override
	public void run() {
		//Jedis jedis = RedisSingleton.getJedis();
		//Jedis jedis = new Jedis("localhost");
		int cnt=0;
		while(true) {
			cnt++;
			Jedis jedis = JedisPoolClass.getResource();
			long size = jedis.lpush(PRODUCE_MESSAGE, "message"+cnt);
			jedis.close();
			System.out.println("還有 "+size+" 條消息未處理");
			try {
				if(size<15) TimeUnit.SECONDS.sleep(1);
				else TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

2.3 消費者線程

import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;

public class Productor implements Runnable{
	public static final String PRODUCE_MESSAGE = "message:apple";
	
	@Override
	public void run() {
		int cnt=0;
                //Jedis jedis = RedisSingleton.getJedis();
		//Jedis jedis = new Jedis("localhost");
		while(true) {
			cnt++;
			Jedis jedis = JedisPoolClass.getResource();
			long size = jedis.lpush(PRODUCE_MESSAGE, "message"+cnt);
			jedis.close();
			System.out.println("還有 "+size+" 條消息未處理");
			try {
				if(size<15) TimeUnit.SECONDS.sleep(1);
				else TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

2.4 連接池

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class JedisPoolClass {
	private static JedisPool pool = null;
	static {
		pool = new JedisPool("localhost", 6379);
	}
	public static Jedis getResource() {
		return pool.getResource();
	}
}

2.5 測試代碼

	private static void ProCusTest() {
		ExecutorService es = Executors.newCachedThreadPool();
		Productor productor = new Productor();
		Customer customer = new Customer("customer1");
		Customer customer2 = new Customer("customer2");
		es.submit(productor);
		es.submit(customer);
		es.submit(customer2);
		while(Thread.activeCount()<=1) {}
	}
}

在進行測試的時候,最開始沒有使用連接池,而是使用消費者和生產者代碼中注釋的部分(注釋部分代碼是通過單例模式來獲取一個Jedis實例)來獲取Jedis實例。這樣會導致只有一個線程能夠實際與Redis服務器交互或者有時候又能正常進行消費-生產過程。不知道是什么原因導致?但是應該是單例的連接導致的,其中具體的原因需要進一步的研究。因此,這里采用池化技術,每次使用完放回連接池,這種情況下能夠正常使用。也可以每一次生產或者消費任務都重新獲取Jedis連接,用完再釋放。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM