1背景
1.1为什么需要消息队列
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。
举个例子:很多网站注册需要发送短信验证码,有可能在某个时段有大量的人注册,但是发送短信的模块速度相对较慢。如果不使用消息队列就会导致大量的注册阻塞在发送短信验证码这个阶段而无法进行下面的操作。
比如去银行办理业务,窗口数量是有限的,所以就需要排队,按次序办理业务
- 提高响应速度
有了消息队列,消息的生产者可以将消息投放到消息队列中并马上返回,而不必阻塞等待消息被消费;而消费者只要从消息队列中提取消息就可以了。这样就大大提高了系统的相应速度。 - 提高系统的稳定性
比如一个订单系统,订单系统分为下订单模块和处理订单模块。处理订单模块挂了的情况下,如果有了消息队列,下单系统还能正常下单,当订单处理系统重启恢复则可以继续处理订单。不会因为一个系统故障而导致整个系统的故障。
2实现
2.1 首先需要添加两个依赖
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
2.2 生产者线程
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Jedis;
public class Productor implements Runnable{
public static final String PRODUCE_MESSAGE = "message:apple";
@Override
public void run() {
//Jedis jedis = RedisSingleton.getJedis();
//Jedis jedis = new Jedis("localhost");
int cnt=0;
while(true) {
cnt++;
Jedis jedis = JedisPoolClass.getResource();
long size = jedis.lpush(PRODUCE_MESSAGE, "message"+cnt);
jedis.close();
System.out.println("还有 "+size+" 条消息未处理");
try {
if(size<15) TimeUnit.SECONDS.sleep(1);
else TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.3 消费者线程
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Jedis;
public class Productor implements Runnable{
public static final String PRODUCE_MESSAGE = "message:apple";
@Override
public void run() {
int cnt=0;
//Jedis jedis = RedisSingleton.getJedis();
//Jedis jedis = new Jedis("localhost");
while(true) {
cnt++;
Jedis jedis = JedisPoolClass.getResource();
long size = jedis.lpush(PRODUCE_MESSAGE, "message"+cnt);
jedis.close();
System.out.println("还有 "+size+" 条消息未处理");
try {
if(size<15) TimeUnit.SECONDS.sleep(1);
else TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.4 连接池
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class JedisPoolClass {
private static JedisPool pool = null;
static {
pool = new JedisPool("localhost", 6379);
}
public static Jedis getResource() {
return pool.getResource();
}
}
2.5 测试代码
private static void ProCusTest() {
ExecutorService es = Executors.newCachedThreadPool();
Productor productor = new Productor();
Customer customer = new Customer("customer1");
Customer customer2 = new Customer("customer2");
es.submit(productor);
es.submit(customer);
es.submit(customer2);
while(Thread.activeCount()<=1) {}
}
}
在进行测试的时候,最开始没有使用连接池,而是使用消费者和生产者代码中注释的部分(注释部分代码是通过单例模式来获取一个Jedis实例)来获取Jedis实例。这样会导致只有一个线程能够实际与Redis服务器交互或者有时候又能正常进行消费-生产过程。不知道是什么原因导致?但是应该是单例的连接导致的,其中具体的原因需要进一步的研究。因此,这里采用池化技术,每次使用完放回连接池,这种情况下能够正常使用。也可以每一次生产或者消费任务都重新获取Jedis连接,用完再释放。