阻塞隊列
隊列,先進先出的一種數據結構。阻塞隊列其實也就是隊列的一種特殊情況。舉個例子來說明一下吧,我們去餐館吃飯,一個接一個的下單,這時候就是一個普通的隊列,萬一這家店生意好,餐館擠滿了人,這時候肯定不能把顧客趕出去,於是餐館就在旁邊設置了一個休息等待區。這就是一個阻塞隊列了。我們使用一張圖來演示一下:
從上面這張圖我們會發現這樣的規律:
(1)當阻塞隊列為空時,從隊列中獲取元素的操作將會被阻塞,就好比餐館休息區沒人了,此時不能接納新的顧客了。換句話,肚子為空的時候也沒東西吃。
(2)當阻塞隊列滿了,往隊列添加元素的操作將會被阻塞,好比餐館的休息區也擠滿了,后來的顧客只能走了。
從上面的概念我們類比到線程中去,我們會發現,在某些時候線程可能不能不阻塞,因為CPU內核就那么幾個,阻塞現狀更加說明了資源的利用率高,換句話來說,阻塞其實是一個好事。
阻塞隊列應用最廣泛的是生產者和消費者模式。
在多線程中,阻塞的意思是,在某些情況下會掛起線程,一旦條件成熟,被阻塞的線程就會被自動喚醒。
之前線程的wait和notify我們程序員需要自己控制,但有了這個阻塞隊列之后我們程序員就不用擔心了,阻塞隊列會自動管理。
常見的BlockQueue方法
常見的阻塞隊列
ArrayblockingQueue:數組構成有界隊列
LinkedBlockingQueue:鏈表組成有界隊列,默認Integer.MAX_VALUE
SynchrousQueue:單元素隊列
PriorityBlockingQueue:一個優先阻塞隊列。每次從隊隊列里面獲取到的都是隊列中優先級最高的。
DelayQueue:延時隊列,所謂延時隊列就是消費線程將會延時一段時間來消費元素。
阻塞隊列生產者消費者程序演示:
生產者
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生產:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消費者
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消費:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
Kafka
簡介
Kafka是由LinkedIn開發的一個分布式的消息系統,使用Scala編寫,它以可水平擴展和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統如Storm,Spark,Flink都支持與Kafka集成。現在我們的數據實時處理平台也使用到了kafka。現在它已被多家不同類型的公司作為多種類型的數據管道和消息系統使用。
應用
消息系統,日志收集,用戶行為跟蹤,流式處理
為什么使用消息系統?
(1) 解耦
在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
(2) 冗余,即消息持久化
有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
(3) 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
(4) 靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
(5) 順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
(6) 緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助於控制和優化數據流經過系統的速度。
為什么是kafka?
高吞吐量:可以滿足每秒百萬級別消息的生產和消費——生產消費。 存放到硬盤,順序讀取速度快。
消息持久化::由於kafka broker會持久化數據,broker沒有內存壓力,因此,consumer非常適合采取pull的方式消費數據。把數據進行持久化直到它們已經被完全處理。
高可靠性:分布式,集群部署,一台服務器掛掉,有其他的。
高擴展性:動態擴展,當需要增加broker(服務器)結點時,新增的broker會向zookeeper(管理集群)注冊,而producer及consumer會通過zookeeper感知這些變化,並及時作出調整。
負載均衡:通過zookeeper對Producer,Broker,Consumer的動態加入與離開進行管理
消息收發流程:
- 啟動Zookeeper及Broker.
- Producer連接Broker后,將消息發布到Broker中指定Topic上(可以指定Patition分區:提升並發寫的能力)。
- Broker集群接收到Producer發過來的消息后,將其持久化到硬盤,並將消息該保留指定時長(可配置),而不關注消息是否被消費。
- Consumer連接到Broker后,啟動消息泵對Broker進行偵聽,當有消息到來時,會觸發消息泵循環獲取消息,獲取消息后Zookeeper將記錄該Consumer的消息Offset(索引標記)。
offset:它使得Kafka在消費的過程中即使掛了或者引發再均衡問題重新分配Partation,當下次重新恢復消費時仍然可以知道從哪里開始消費。它好比看一本書中的書簽標記,每次通過書簽標記(offset)就能快速找到該從哪里開始看(消費)。在多個Consumer並發訪問一個partition會有同步鎖控制。
Kafka對於offset的處理有兩種提交方式:(1) 自動提交偏移量(默認的提交方式) (2) 手動提交(可以靈活地控制offset)
Leader Replica:主副本,負責響應
Follower Replica:從副本
//創建主題
kafak-topic.bat --creat --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Spring整合Kafak
-
引入依賴 spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
配置Kafka server,consumer
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000
-
訪問Kafka
-
生產者
@Component class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String content) { kafkaTemplate.send(topic, content); } }
-
消費者(被動)
@Component class KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); } }
-
-
測試
@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka() { kafkaProducer.sendMessage("test", "你好"); kafkaProducer.sendMessage("test", "在嗎"); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } }