Custom InputFormat Code Implementation
Author: Yin Zhengjie
Copyright notice: this is an original work; reposting is not permitted, and violations will be pursued legally.
I. How MapReduce Parallelism Is Determined
Before discussing how MapTask parallelism is determined, we need to be clear about a few concepts:
    1>. The MapTask parallelism determines how many Map tasks process data concurrently, which in turn affects the processing speed of the whole Job;
    2>. A block (Block) is how the HDFS cluster physically divides data into chunks for storage;
    3>. A split is only a logical division of the input; the data is not physically cut into pieces on disk.

How splits determine MapTask parallelism:
    1>. The Map-phase parallelism of a Job is determined by the number of splits computed by the client when the Job is submitted;
    2>. Each split is assigned to one MapTask instance;
    3>. By default, the split size equals the block size (Block size);
    4>. Splitting does not consider the data set as a whole; each file is split individually.

Based on the above, here are a few questions I posed to myself and answered. My own knowledge is limited, so if you can add to any of these answers, comments are welcome.

Q1: A 300MB access.log file is already stored in an HDFS cluster. If the cluster's default block size (Block size) is 128MB, how is this 300MB file actually stored?
A: The 300MB file is physically cut by the default block size of 128MB, so it ends up as 3 blocks of 128MB, 128MB, and 44MB.

Q2: Following on from Q1, the 300MB file is cut into 128MB, 128MB, and 44MB. The first two pieces each fill an entire block, which is easy to understand, but how much physical space does the 44MB piece occupy in the HDFS cluster?
A: It still occupies only 44MB of storage. Because HDFS stores data in blocks by default, it is easy to assume that storing 44MB consumes a full 128MB block, but that is not the case: data can still be appended to the block holding those 44MB, since it has plenty of free space left. It is like a 700ml glass into which you have poured only 300ml of water; there is still room for another 400ml.

Q3: If we set the split size to 100MB versus using the default 128MB when splitting the 300MB access.log, what is the parallelism in each case?
A: Clearly, whether you split the 300MB file at 100MB or at the default 128MB block size, it ends up as 3 splits either way, so only 3 MapTasks can run in parallel.

Q4: Following on from Q3, what are the pros and cons of splitting the 300MB access.log at 100MB?
A: Splitting at 100MB produces 3 MapTasks, and the data assigned to each MapTask is fairly even, so in theory processing is fast when the cluster's network IO is not busy; when the network IO is busy, however, it may not be efficient. The reason is that HDFS stores blocks at 128MB by default: with 100MB splits, the first split needs 28MB transferred over the network, the second split also needs 28MB transferred, and the third split is 44MB, which involves 56MB of network transfer if the task runs on the node holding the third block. If the node storing the third block is busy, those 44MB may also have to go over the network, so up to 100MB of network IO may be consumed.

Q5: Following on from Q4, what are the pros and cons of splitting the 300MB access.log at the default 128MB?
A: The drawback is obvious: the data in the 3 splits is uneven. The first two splits are 128MB each and naturally run on the nodes that hold them, while the third split has only 44MB of data, so the MapTask on the node holding the third block will very likely finish early and then spend a long time waiting for the results from the nodes holding the first two blocks. The advantage, compared with the 100MB scheme, is that it saves network IO.
Q6: How should we understand the statement that "splitting does not consider the data set as a whole; each file is split individually"?
A: To process data we must provide a data source, which can be a single file or a directory. If we provide a directory, it may contain several files. Say the input directory has 3 files of 300MB, 59MB, and 32MB. Whether we split at 100MB or at 128MB, there will be 5 splits, because the last two files (59MB and 32MB), small as they are, each get a split of their own. Those two splits are never merged into one, because they belong to two different files. (A small sketch of the split-size arithmetic follows this Q&A.)
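To make the 3-split answers above concrete, here is a minimal, self-contained sketch (not the actual Hadoop source) of the sizing rule that FileInputFormat applies by default: the split size is max(minSize, min(maxSize, blockSize)), and a file keeps being cut only while the remaining bytes exceed 1.1 times the split size.

public class SplitCountSketch {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;               // default HDFS block size
        long minSize = 1L;                                 // default minimum split size
        long maxSize = Long.MAX_VALUE;                     // default maximum split size
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        long bytesRemaining = 300L * 1024 * 1024;          // the 300MB access.log from Q1
        int splits = 0;
        // Keep cutting full-size splits while the remainder is more than 1.1x the split size.
        while ((double) bytesRemaining / splitSize > 1.1) {
            bytesRemaining -= splitSize;
            splits++;
        }
        if (bytesRemaining > 0) {
            splits++;                                      // the final 44MB tail becomes its own split
        }
        System.out.println(splits);                        // prints 3: 128MB + 128MB + 44MB
    }
}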
II. Overview of the Built-in InputFormats
InputFormat handles passing data from the input to the Mapper; its main job is turning the data source into K,V pairs.
For InputFormat we only need to focus on two stages: how files become splits, and how a split becomes K,V pairs.
Common FileInputFormat implementations include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, FixedLengthInputFormat, SequenceFileInputFormat, and custom InputFormats.
1>. TextInputFormat split mechanism
TextInputFormat is a subclass of FileInputFormat. It reads data line by line.

Split method:
    By default it uses the split method of its parent class, FileInputFormat, which follows these rules:
    1>. The block size (Block size) is the split size;
    2>. Each file is split individually;
    3>. Whether a remaining piece is cut again is decided against 1.1 times the split size.

K,V method:
    LineRecordReader turns the split data into K,V pairs (a mapper skeleton illustrating these types follows this subsection).
    The key is the starting byte offset of the line within the whole file, of type LongWritable. The value is the content of the line, excluding any line terminators (newline and carriage return), of type Text.

Example:
    Suppose the beginning of a split looks like this:
        I have a pen
        I have an apple
        Ah apple pen
        I have a pen
        I have a pineapple
        Ah pineapple pen
        ......
    With TextInputFormat, the first 6 records are represented as:
        (0,I have a pen)
        (13,I have an apple)
        (29,Ah apple pen)
        (42,I have a pen)
        (55,I have a pineapple)
        (74,Ah pineapple pen)
        ......
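As a quick illustration of those key/value types, here is a minimal, hypothetical Mapper skeleton; the class name and its output types are my own choices for the example, and only the input types (LongWritable offset, Text line) come from TextInputFormat.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: the input key is the byte offset of the line, the input value is the line text.
public class OffsetEchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Simply pass the (offset, line) pair through, e.g. (0, "I have a pen").
        context.write(key, value);
    }
}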
2>. KeyValueTextInputFormat split mechanism
KeyValueTextInputFormat also inherits from FileInputFormat. It reads data line by line.

Split method:
    By default it uses the split method of its parent class, FileInputFormat, which follows these rules:
    1>. The block size (Block size) is the split size;
    2>. Each file is split individually;
    3>. Whether a remaining piece is cut again is decided against 1.1 times the split size.

K,V method:
    KeyValueLineRecordReader turns the split data into K,V pairs. The separator can be specified in the driver class with conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); (a minimal driver fragment follows this subsection).
    Each line is one record, split by the separator into Key and Value; the default separator is a tab ("\t"). The key is then the Text sequence before the tab on each line.

Example:
    Suppose the beginning of a split looks like this:
        line1\tI have a pen
        line2\tI have an apple
        line3\tAh apple pen
        line4\tI have a pen
        line5\tI have a pineapple
        line6\tAh pineapple pen
        ......
    With KeyValueTextInputFormat, the first 6 records are represented as:
        (line1,I have a pen)
        (line2,I have an apple)
        (line3,Ah apple pen)
        (line4,I have a pen)
        (line5,I have a pineapple)
        (line6,Ah pineapple pen)
        ......
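Here is a minimal driver fragment showing how this might be wired up, assuming the mapper, reducer, and input/output paths are configured elsewhere; the separator line is the conf.set call quoted above, and the class name is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KeyValueDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Split each line into key and value at the tab character (tab is also the default).
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

        Job job = Job.getInstance(conf);
        // Use KeyValueTextInputFormat instead of the default TextInputFormat.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // ...mapper, reducer, input and output paths would be set here as in any other driver...
    }
}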
3>. NLineInputFormat split mechanism
NLineInputFormat also inherits from FileInputFormat.

Split method:
    The InputSplit handled by each map task is no longer divided by blocks, but by the custom number of lines N specified for NLineInputFormat (a minimal driver fragment follows this subsection). That is, total number of input lines / N = number of splits; if it does not divide evenly, number of splits = quotient + 1.

K,V method:
    The keys and values are the same as those produced by TextInputFormat, i.e. LineRecordReader turns the split data into K,V pairs.

Example:
    Suppose the beginning of the input looks like this:
        I have a pen
        I have an apple
        Ah apple pen
        I have a pen
        I have a pineapple
        Ah pineapple pen
        ......
    If N is 3, each input split contains 3 lines, and one MapTask is started for every 3 lines.
    The first MapTask receives:
        (0,I have a pen)
        (13,I have an apple)
        (29,Ah apple pen)
    The second MapTask receives:
        (42,I have a pen)
        (55,I have a pineapple)
        (74,Ah pineapple pen)
    The M-th MapTask receives:
        ......
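A minimal driver fragment for the N=3 example above, assuming the rest of the job is configured as usual; the class name is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Each split (and therefore each MapTask) handles 3 input lines.
        NLineInputFormat.setNumLinesPerSplit(job, 3);
        job.setInputFormatClass(NLineInputFormat.class);
        // ...mapper, reducer, input and output paths would be set here as in any other driver...
    }
}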
4>. CombineTextInputFormat split mechanism
CombineTextInputFormat also inherits from FileInputFormat.

Split method:
    As described for TextInputFormat, the default FileInputFormat split method plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files this produces a large number of MapTasks and processing becomes extremely inefficient.
    CombineTextInputFormat is intended for scenarios with too many small files: it can logically group multiple small files into one split, so that several small files are handled by a single MapTask.
    The maximum virtual-storage split size is set with "CombineTextInputFormat.setMaxInputSplitSize(job, 10485760);" (see the driver sketch after this subsection). 10485760 = 1024 * 1024 * 10, i.e. 10MB.

Split process:
    1>. Check whether a virtually stored chunk is at least as large as the setMaxInputSplitSize value; if it is, it forms a split on its own;
    2>. If it is not, it is merged with the next virtually stored chunk, and together they form one split.

K,V method:
    It produces K,V pairs with CombineFileRecordReader. The data it reads looks essentially the same as what LineRecordReader produces, but it cannot use the default LineRecordReader, because the source data it handles spans multiple files.

Tips:
    Choose the maximum virtual-storage split size according to the actual sizes of your small files.
    HDFS is not good at handling small files, so I suggest avoiding the creation of small files in production in the first place; if production really does have many small files, consider packing them into a har archive.
    The fundamental fix for the small-file problem in production is simply not to generate small files.

Example:
    Suppose the maximum virtual-storage split size is 10485760, i.e. 10MB, and we have a batch of small files of 8.7MB, 28MB, 12MB, 2.12MB, and 18MB.
        File          Size      Virtual storage process
        access.log    8.7MB     8.7MB < 10MB, so it stays as a single chunk.
        ftp.log       28MB      28MB > 10MB but less than 3 * 10MB, so it is cut into 3 chunks: one of 10MB, a second of 9MB, and a third of 9MB.
        sumba.log     12MB      12MB > 10MB but less than 2 * 10MB, so it is cut into 2 chunks of 6MB each.
        error.log     2.12MB    2.12MB < 10MB, so it stays as a single chunk.
        nfs.log       18MB      18MB > 10MB but less than 2 * 10MB, so it is cut into 2 chunks of 9MB each.
    As shown above, access.log, ftp.log, sumba.log, error.log, and nfs.log are cut into chunks; the split process then merges these chunks into splits as follows, giving 5 splits in total:
        Split 1: 8.7MB + 10MB
        Split 2: 9MB + 9MB
        Split 3: 6MB + 6MB
        Split 4: 2.12MB + 9MB
        Split 5: 9MB
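A minimal driver fragment matching the 10MB example above; the setMaxInputSplitSize call is the one quoted in the text, and the class name is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Group many small files into shared splits instead of one split per file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Maximum virtual-storage split size: 10485760 bytes = 10MB.
        CombineTextInputFormat.setMaxInputSplitSize(job, 10485760);
        // ...mapper, reducer, input and output paths would be set here as in any other driver...
    }
}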
5>. FixedLengthInputFormat split mechanism
FixedLengthInputFormat also inherits from FileInputFormat.

Split method:
    By default it uses the split method of its parent class, FileInputFormat, which follows these rules:
    1>. The block size (Block size) is the split size;
    2>. Each file is split individually;
    3>. Whether a remaining piece is cut again is decided against 1.1 times the split size.

K,V method:
    It reads a fixed number of bytes of the file at a time, using FixedLengthRecordReader (a minimal driver fragment follows this subsection).
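A minimal driver fragment under an assumed record length of 20 bytes; the length, like the class name, is only illustrative, and the rest of the job would be configured as usual.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;

public class FixedLengthDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed record length: every record is exactly 20 bytes.
        FixedLengthInputFormat.setRecordLength(conf, 20);

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(FixedLengthInputFormat.class);
        // ...mapper, reducer, input and output paths would be set here as in any other driver...
    }
}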
6>. SequenceFileInputFormat split mechanism
SequenceFileInputFormat also inherits from FileInputFormat.

Split method:
    By default it uses the split method of its parent class, FileInputFormat, which follows these rules:
    1>. The block size (Block size) is the split size;
    2>. Each file is split individually;
    3>. Whether a remaining piece is cut again is decided against 1.1 times the split size.

When to use SequenceFileInputFormat:
    SequenceFileInputFormat is normally used together with SequenceFileOutputFormat (a minimal chaining sketch follows this subsection).
    Suppose there are 2 MapReduce jobs where job B depends on the output of job A, so job A has to write its data out after it finishes. We could write it as plain text, but the files on disk would take more space and job B would have to parse the data again on input, which is inefficient.
    If instead job A writes with SequenceFileOutputFormat and job B reads with SequenceFileInputFormat, the two jobs connect seamlessly, and even the key/value types are preserved across the handoff, so the data does not need to be re-parsed on input the way text would.
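A minimal sketch of the A-to-B handoff described above; the helper and class names are my own, and jobA, jobB, and the intermediate path are assumed to be created elsewhere in the driver code.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class JobChainingSketch {
    // Hypothetical helper: wire job A's SequenceFile output into job B's input.
    public static void chain(Job jobA, Job jobB, Path intermediate) throws Exception {
        // Job A writes its result as a SequenceFile, preserving the key/value types.
        jobA.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(jobA, intermediate);

        // Job B reads that SequenceFile directly, with no re-parsing of text.
        jobB.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(jobB, intermediate);
    }
}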
7>. Custom InputFormat
In enterprise development, the InputFormat types that ship with the Hadoop framework cannot cover every application scenario, so a custom InputFormat is needed to solve real problems.
The steps for a custom InputFormat are as follows:
1>. Define a class that extends FileInputFormat;
2>. Override the RecordReader so that it reads one complete file at a time and packages it as a K,V pair;
3>. On output, use SequenceFileOutputFormat to write the merged file.
III. Custom InputFormat Code Implementation
1>. Requirements analysis
Both HDFS and MapReduce are very inefficient when handling small files, yet scenarios with large numbers of small files are hard to avoid, so a corresponding solution is needed. We can implement small-file merging with a custom InputFormat.
Merge multiple small files into one SequenceFile (a SequenceFile is Hadoop's file format for storing Key-Value pairs in binary form). The SequenceFile holds the small files, with the Key being the file path plus file name and the Value being the file content.
Below are the 5 small files I prepared; the task now is to merge them into a single file.
This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use bin\windows\ instead of bin/, and change the script extension to .bat. A streaming platform has three key capabilities: Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. Store streams of records in a fault-tolerant durable way. Process streams of records as they occur. Kafka is generally used for two broad classes of applications: Building real-time streaming data pipelines that reliably get data between systems or applications Building real-time streaming applications that transform or react to the streams of data To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up. First a few concepts: Kafka is run as a cluster on one or more servers that can span multiple datacenters. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. Kafka has four core APIs: The Producer API allows an application to publish a stream of records to one or more Kafka topics. The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them. The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams. The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table. In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. This protocol is versioned and maintains backwards compatibility with older version. We provide a Java client for Kafka, but clients are available in many languages.
The Apache Hive ™ data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive.
Apache Hive is an open source project run by volunteers at the Apache Software Foundation. Previously it was a subproject of Apache® Hadoop®, but has now graduated to become a top-level project of its own. We encourage you to learn about the project and contribute your expertise.
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. This is the first stable release of Apache Hadoop 2.10 line. It contains 362 bug fixes, improvements and enhancements since 2.9.0. Users are encouraged to read the overview of major changes since 2.9.0. For details of 362 bug fixes, improvements, and other enhancements since the previous 2.9.0 release, please check release notes and changelog detail the changes since 2.9.0. This is the third stable release of Apache Hadoop 3.1 line. It contains 246 bug fixes, improvements and enhancements since 3.1.2. Users are encouraged to read the overview of major changes since 3.1.2. For details of the bug fixes, improvements, and other enhancements since the previous 3.1.2 release, please check release notes and changelog
2>. Custom RecordReader class
package cn.org.yinzhengjie.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Our custom RecordReader handles exactly one file and reads that file into a single KV pair,
 * i.e. it reads an entire small file in one go.
 */
public class WholeFileRecorder extends RecordReader<Text, BytesWritable> {

    //notRead starts as true, meaning the file has not been read yet
    private boolean notRead = true;

    //key type
    private Text key = new Text();

    //value type
    private BytesWritable value = new BytesWritable();

    //input stream object
    private FSDataInputStream inputStream;

    //FileSplit object
    private FileSplit fs;

    /**
     * Initialization method, called once by the framework at initialization time.
     * @param split : the split that defines the range of records to read
     * @param context : information about this task
     */
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //Since our input format extends FileInputFormat, cast the InputSplit to a FileSplit, i.e. convert the split type to a file split.
        fs = (FileSplit) split;//FileSplit is a subclass of InputSplit; hopefully you still remember polymorphism.

        //Get the file path from the split
        Path path = fs.getPath();

        //Get the file system from the path
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());

        //Open an input stream; don't forget to release it in the close method
        inputStream = fileSystem.open(path);
    }

    /**
     * Read the next key/value pair.
     * @return : true if one was read, false once everything has been read.
     */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (notRead) {
            //The actual file-reading logic
            key.set(fs.getPath().toString());               //set the key to the file path
            byte[] buf = new byte[(int) fs.getLength()];    //allocate a byte array as long as the file
            IOUtils.readFully(inputStream, buf, 0, buf.length);//read the whole file content in one go
            value.set(buf, 0, buf.length);                  //set the value to the file content
            notRead = false;                                //mark the file as read
            return true;                                    //return true on the first (and only) read
        }
        return false;
    }

    /**
     * Get the current key.
     */
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * Get the current value.
     * @return : the current value
     */
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * The current progress of the record reader through its data.
     * @return : a value between 0.0 and 1.0
     */
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    /**
     * Close the record reader.
     */
    public void close() throws IOException {
        //Use the stream-closing utility Hadoop provides for us
        IOUtils.closeStream(inputStream);
    }
}
3>. Custom WholeFileInputFormat class
package cn.org.yinzhengjie.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 Custom InputFormat. To save ourselves some work we do not write a split method and simply use FileInputFormat's default one.
 The data is stored as follows:
     Key: file path + file name; Value: file content.
 Therefore the key generic type is Text and the value type is BytesWritable (which holds a chunk of binary data).
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /**
     * Our custom input files must not be split any further, so simply return false; splitting such a file could corrupt its data.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        //Return our custom RecordReader object
        return new WholeFileRecorder();
    }
}
4>. Custom Driver class
package cn.org.yinzhengjie.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //Get a Job instance
        Job job = Job.getInstance(new Configuration());

        //Set the jar by our current Driver class (classpath)
        job.setJarByClass(WholeFileDriver.class);

        //Set the output types of our Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        //Set the output types of our Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //Set our custom InputFormat class (classpath)
        job.setInputFormatClass(WholeFileInputFormat.class);

        //Set the output format to a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        //Set the input data
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        //Set the output data
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //Submit our Job; the result is a boolean
        boolean result = job.waitForCompletion(true);

        //If the job ran successfully, print "Task executed successfully!!!"
        if (result) {
            System.out.println("Task executed successfully!!!");
        } else {
            System.out.println("Task execution failed...");
        }

        //Exit with 0 if the job ran successfully, otherwise 1
        System.exit(result ? 0 : 1);
    }
}
5>. Inspecting the output
SEQorg.apache.hadoop.io.Text"org.apache.hadoop.io.BytesWritable 7?€*櫘G4蹀 + &%file:/E:/yinzhengjie/input/hadoop.txt The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. This is the first stable release of Apache Hadoop 2.10 line. It contains 362 bug fixes, improvements and enhancements since 2.9.0. Users are encouraged to read the overview of major changes since 2.9.0. For details of 362 bug fixes, improvements, and other enhancements since the previous 2.9.0 release, please check release notes and changelog detail the changes since 2.9.0. This is the third stable release of Apache Hadoop 3.1 line. It contains 246 bug fixes, improvements and enhancements since 3.1.2. Users are encouraged to read the overview of major changes since 3.1.2. For details of the bug fixes, improvements, and other enhancements since the previous 3.1.2 release, please check release notes and changelog X $#file:/E:/yinzhengjie/input/hive.txt 0The Apache Hive 鈩?data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive. Apache Hive is an open source project run by volunteers at the Apache Software Foundation. Previously it was a subproject of Apache廬 Hadoop廬, but has now graduated to become a top-level project of its own. We encourage you to learn about the project and contribute your expertise.7?€*櫘G4蹀 ? %$file:/E:/yinzhengjie/input/kafka.txt €This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use bin\windows\ instead of bin/, and change the script extension to .bat. A streaming platform has three key capabilities: Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. Store streams of records in a fault-tolerant durable way. Process streams of records as they occur. Kafka is generally used for two broad classes of applications: Building real-time streaming data pipelines that reliably get data between systems or applications Building real-time streaming applications that transform or react to the streams of data To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up. First a few concepts: Kafka is run as a cluster on one or more servers that can span multiple datacenters. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. Kafka has four core APIs: The Producer API allows an application to publish a stream of records to one or more Kafka topics. The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them. 
The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams. The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table. In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. This protocol is versioned and maintains backwards compatibility with older version. We provide a Java client for Kafka, but clients are available in many languages.
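Since the raw dump above is hard to read, here is a small reader sketch that lists what ended up in the SequenceFile; the class name and the part-r-00000 path are assumptions about where this particular job wrote its output, so adjust them to your own environment.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed output location; adjust to wherever your job wrote its result.
        Path path = new Path("file:/E:/yinzhengjie/output/part-r-00000");

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Print each stored file name and the number of bytes of its content.
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}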

