阻塞队列
队列,先进先出的一种数据结构。阻塞队列其实也就是队列的一种特殊情况。举个例子来说明一下吧,我们去餐馆吃饭,一个接一个的下单,这时候就是一个普通的队列,万一这家店生意好,餐馆挤满了人,这时候肯定不能把顾客赶出去,于是餐馆就在旁边设置了一个休息等待区。这就是一个阻塞队列了。我们使用一张图来演示一下:
从上面这张图我们会发现这样的规律:
(1)当阻塞队列为空时,从队列中获取元素的操作将会被阻塞,就好比餐馆休息区没人了,此时不能接纳新的顾客了。换句话,肚子为空的时候也没东西吃。
(2)当阻塞队列满了,往队列添加元素的操作将会被阻塞,好比餐馆的休息区也挤满了,后来的顾客只能走了。
从上面的概念我们类比到线程中去,我们会发现,在某些时候线程可能不能不阻塞,因为CPU内核就那么几个,阻塞现状更加说明了资源的利用率高,换句话来说,阻塞其实是一个好事。
阻塞队列应用最广泛的是生产者和消费者模式。
在多线程中,阻塞的意思是,在某些情况下会挂起线程,一旦条件成熟,被阻塞的线程就会被自动唤醒。
之前线程的wait和notify我们程序员需要自己控制,但有了这个阻塞队列之后我们程序员就不用担心了,阻塞队列会自动管理。
常见的BlockQueue方法
常见的阻塞队列
ArrayblockingQueue:数组构成有界队列
LinkedBlockingQueue:链表组成有界队列,默认Integer.MAX_VALUE
SynchrousQueue:单元素队列
PriorityBlockingQueue:一个优先阻塞队列。每次从队队列里面获取到的都是队列中优先级最高的。
DelayQueue:延时队列,所谓延时队列就是消费线程将会延时一段时间来消费元素。
阻塞队列生产者消费者程序演示:
生产者
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
Kafka
简介
Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Storm,Spark,Flink都支持与Kafka集成。现在我们的数据实时处理平台也使用到了kafka。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。
应用
消息系统,日志收集,用户行为跟踪,流式处理
为什么使用消息系统?
(1) 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2) 冗余,即消息持久化
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
(3) 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
(4) 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5) 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
(6) 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
为什么是kafka?
高吞吐量:可以满足每秒百万级别消息的生产和消费——生产消费。 存放到硬盘,顺序读取速度快。
消息持久化::由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据。把数据进行持久化直到它们已经被完全处理。
高可靠性:分布式,集群部署,一台服务器挂掉,有其他的。
高扩展性:动态扩展,当需要增加broker(服务器)结点时,新增的broker会向zookeeper(管理集群)注册,而producer及consumer会通过zookeeper感知这些变化,并及时作出调整。
负载均衡:通过zookeeper对Producer,Broker,Consumer的动态加入与离开进行管理
消息收发流程:
- 启动Zookeeper及Broker.
- Producer连接Broker后,将消息发布到Broker中指定Topic上(可以指定Patition分区:提升并发写的能力)。
- Broker集群接收到Producer发过来的消息后,将其持久化到硬盘,并将消息该保留指定时长(可配置),而不关注消息是否被消费。
- Consumer连接到Broker后,启动消息泵对Broker进行侦听,当有消息到来时,会触发消息泵循环获取消息,获取消息后Zookeeper将记录该Consumer的消息Offset(索引标记)。
offset:它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始看(消费)。在多个Consumer并发访问一个partition会有同步锁控制。
Kafka对于offset的处理有两种提交方式:(1) 自动提交偏移量(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
Leader Replica:主副本,负责响应
Follower Replica:从副本
//创建主题
kafak-topic.bat --creat --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Spring整合Kafak
-
引入依赖 spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
配置Kafka server,consumer
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000
-
访问Kafka
-
生产者
@Component class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String content) { kafkaTemplate.send(topic, content); } }
-
消费者(被动)
@Component class KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); } }
-
-
测试
@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka() { kafkaProducer.sendMessage("test", "你好"); kafkaProducer.sendMessage("test", "在吗"); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } }