hadoop生態--Kafka


分布式流處理平台,是一個分布式消息中間件系統。

一、jms

1、什么是jms
java message service(java 消息服務):java程序需要異步發送消息的時候使用的服務。
用於異構系統之間的通信。
middleware,中間件,提供消息服務,部件之間的交互通過中間件完成,部件之間互為生產者和消費者,
 
3、什么時候可以用到java消息機制?
答:(1)異構系統集成,整合現有資源,提高資源的利用率
       (2)異步請求處理,減輕或消除系統瓶頸,提高用戶生產率和系統的整體可伸縮性
       (3)組件解偶,增加系統的靈活性
 
4、消息傳送的兩種模型
不同的消息系統提供不同的消息路由模式,通用的目的地有兩個:隊列和主題
點對點模型(point-2-pint<P2P>)
這個模式中涉及到的角色:發送者、接收者、消息隊列(遠程隊列)
客戶端通過隊列(queue)這個虛擬通道來同步和異步發送、接收消息,發送到隊列的消息只能被一個接收者所接收,即使有多個消費者時也只能有一個消費者處理消息。生產者和消費者之間沒有依賴,生產者只需要把消息丟到遠程隊列;消費者從隊列獲取消息,即隊列中的消息只有在被消費或者超時時才會被銷毀。
 
發布/訂閱模型(publish/Subsrcibe<pub/sub>)
這個模式中涉及到的角色:發布者、訂閱者、主題Topic(模板隊列+動態隊列)
客戶端發送消息到一個名為主題(topic)的虛擬通道中,每個訂閱該主題的消費者都會接收到每條消息的一個副本。
一個生產者對應多個消費者,即一條消息可以被多次消費;
默認情況下,生產者生產消息時,消費者必須在線消費,即發布主題時,訂閱者必須在線監聽;
為了接觸訂閱模式中消費者與主題的時間耦合,jms提供持久化訂閱,針對某些特定的訂閱者,topic會緩存消息至訂閱者消費或消息超時。
 
queue: 只能有一個消費者,p2p模式(點對點),一個消息進來只能有一個消費者
topic:發布訂閱模式(pub-sub,主題模式),消息發送給主題,一個消息多個消費者

二、kafaka

2-1幾個概念

consumer      //消息消費者            
consumer group      //消費者組
broker    kafka server,kafka服務器
topic      //主題,副本數,分區
partitions  //一個主題可以划分多個 分區.
kafka的集群通過zk協調,zookeeper      //hadoop namenoade + RM HA | hbase | kafka
 
(1) Broker:消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka集群;
(2) Topic:主題是對一組消息的抽象分類,比如例如page view日志、click日志等都可以以topic的形式進行抽象划分類別。在物理上,不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可使得數據的生產者或消費者不必關心數據存於何處;
(3) Partition:每個主題又被分成一個或者若干個分區(Partition)。每個分區在本地磁盤上對應一個文件夾,分區命名規則為主題名稱后接“—”連接符,之后再接分區編號,分區編號從0開始至分區總數減-1;
(4) LogSegment:每個分區又被划分為多個日志分段(LogSegment)組成,日志段是Kafka日志對象分片的最小單位;LogSegment算是一個邏輯概念,對應一個具體的日志文件(“.log”的數據文件)和兩個索引文件(“.index”和“.timeindex”,分別表示偏移量索引文件和消息時間戳索引文件)組成;
(5) Offset:每個partition中都由一系列有序的、不可變的消息組成,這些消息被順序地追加到partition中。每個消息都有一個連續的序列號稱之為offset—偏移量,用於在partition內唯一標識消息(並不表示消息在磁盤上的物理位置);
(6) Message:消息是Kafka中存儲的最小最基本的單位,即為一個commit log,由一個固定長度的消息頭和一個可變長度的消息體組成;
-----參考------
來源:簡書
作者:癲狂俠
鏈接:https://www.jianshu.com/p/3e54a5a39683

2-2   Kafka的使用場景:

兩個需要通信的系統中,如果數據生產者和消費者之間的速度差異大,可以通過kafka作為中間件,削峰。
 
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源

三、部署及使用

3-1下載,解壓,放到集群所有機器上。
3-2配置$kafka_home/config/server.properties:
修改其中broker.id     log.dir     zookeeper.connect等。<參考: https://blog.csdn.net/lizhitao/article/details/25667831>
3-3 啟動kafka
進入kafka目錄,敲入命令
 
創建主題:
./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic test1
查看主題:
./kafka-topics.sh --list --zookeeper localhost:2181,localhost:2182,localhost:2183

四、Java API

https://www.cnblogs.com/zhangchao0515/p/9502843.html

4-1 生產者

 

4-2 消費者

消息消費者
--------------------
    /**
     * 消費者
     */
    @Test
    public void testConumser(){
        //
        Properties props = new Properties();
        props.put("zookeeper.connect", "s202:2181");
        props.put("group.id", "g3");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //創建消費者配置對象
        ConsumerConfig config = new ConsumerConfig(props);
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test3", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3");
        for(KafkaStream<byte[],byte[]> stream : msgList){
            ConsumerIterator<byte[],byte[]> it = stream.iterator();
            while(it.hasNext()){
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }

 

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM