戳更多文章:
簡介
Flink-kafka-connector用來做什么?
Kafka中的partition機制和Flink的並行度機制結合,實現數據恢復
Kafka可以作為Flink的source和sink
任務失敗,通過設置kafka的offset來恢復應用
kafka簡單介紹
關於kafka,我們會有專題文章介紹,這里簡單介紹幾個必須知道的概念。
1.生產者(Producer)
顧名思義,生產者就是生產消息的組件,它的主要工作就是源源不斷地生產出消息,然后發送給消息隊列。生產者可以向消息隊列發送各種類型的消息,如狹義的字符串消息,也可以發送二進制消息。生產者是消息隊列的數據源,只有通過生產者持續不斷地向消息隊列發送消息,消息隊列才能不斷處理消息。
2.消費者(Consumer)
所謂消費者,指的是不斷消費(獲取)消息的組件,它獲取消息的來源就是消息隊列(即Kafka本身)。換句話說,生產者不斷向消息隊列發送消息,而消費者則不斷從消息隊列中獲取消息。
3.主題(Topic)
主題是Kafka中一個極為重要的概念。首先,主題是一個邏輯上的概念,它用於從邏輯上來歸類與存儲消息本身。多個生產者可以向一個Topic發送消息,同時也可以有多個消費者消費一個Topic中的消息。Topic還有分區和副本的概念。Topic與消息這兩個概念之間密切相關,Kafka中的每一條消息都歸屬於某一個Topic,而一個Topic下面可以有任意數量的消息。
kafka簡單操作
啟動zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動server: nohup bin/kafka-server-start.sh config/server.properties &
創建一個topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181
發送數據:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
啟動一個消費者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
刪除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn
Flink消費Kafka注意事項
-
setStartFromGroupOffsets()【默認消費策略】
默認讀取上次保存的offset信息
如果是應用第一次啟動,讀取不到上次的offset信息,則會根據這個參數auto.offset.reset的值來進行消費數據
- setStartFromEarliest()
從最早的數據開始進行消費,忽略存儲的offset信息
- setStartFromLatest()
從最新的數據進行消費,忽略存儲的offset信息
-
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)
從指定位置進行消費 -
當checkpoint機制開啟的時候,KafkaConsumer會定期把kafka的offset信息還有其他operator的狀態信息一塊保存起來。當job失敗重啟的時候,Flink會從最近一次的checkpoint中進行恢復數據,重新消費kafka中的數據。
- 為了能夠使用支持容錯的kafka Consumer,需要開啟checkpoint
env.enableCheckpointing(5000); // 每5s checkpoint一次
搭建Kafka單機環境
我本地安裝了一個kafka_2.11-2.1.0版本的kafka

啟動Zookeeper和kafka server:
啟動zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動server: nohup bin/kafka-server-start.sh config/server.properties &
創建一個topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

實戰案例
所有代碼,我放在了我的公眾號,回復Flink可以下載
- 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
- 更多大數據技術歡迎和作者一起探討~

Kafka作為Flink Sink
首先pom依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version> </dependency>
向kafka寫入數據:
public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("test",new SimpleStringSchema(),properties); /* //event-timestamp事件的發生時間 producer.setWriteTimestampToKafka(true); */ text.addSink(producer); env.execute(); } }//
大家這里特別注意,我們實現了一個並行度為1的MyNoParalleSource
來生產數據,代碼如下:
//使用並行度為1的source public class MyNoParalleSource implements SourceFunction<String> {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 啟動一個source * 大部分情況下,都需要在這個run方法中實現一個循環,這樣就可以循環產生數據了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<String> ctx) throws Exception { while(isRunning){ //圖書的排行榜 List<String> books = new ArrayList<>(); books.add("Pyhton從入門到放棄");//10 books.add("Java從入門到放棄");//8 books.add("Php從入門到放棄");//5 books.add("C++從入門到放棄");//3 books.add("Scala從入門到放棄");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每2秒產生一條數據 Thread.sleep(2000); } } //取消一個cancel的時候會調用的方法 @Override public void cancel() { isRunning = false; } }
代碼實現了一個發送器,來發送書名<Pyhton從入門到放棄><Java從入門到放棄>等...
然后右鍵運行我們的程序,控制台輸出如下:

開始源源不斷的生產數據了。
然后我們用命令去查看一下 kafka test
這個topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
輸出如下:

Kafka作為Flink Source
直接上代碼:
public class KafkaConsumer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); //從最早開始消費 consumer.setStartFromEarliest(); DataStream<String> stream = env .addSource(consumer); stream.print(); //stream.map(); env.execute(); } }//
控制台輸出如下:

將我們之前發往kafka的消息全部打印出來了。