flink入門到實戰(6)flink批處理從0到1


一、DataSet API之Data Sources(消費者之數據源)

介紹:

flink提供了大量的已經實現好的source方法,你也可以自定義source 通過實現sourceFunction接口來自定義無並行度的source, 或者你也可以通過實現ParallelSourceFunction 接口 or 繼承RichParallelSourceFunction 來自定義有並行度的source。

類型:
基於文件

readTextFile(path) 讀取文本文件,文件遵循TextInputFormat 讀取規則,逐行讀取並返回。

基於集合

fromCollection(Collection) 通過java 的collection集合創建一個數據流,集合中的所有元素必須是相同類型的。

代碼實現:
1、fromCollection
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object StreamingFromCollectionScala { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //隱式轉換 import org.apache.flink.api.scala._ val data = List(10,15,20) val text = env.fromCollection(data) //針對map接收到的數據執行加1的操作 val num = text.map(_+1) num.print().setParallelism(1) env.execute("StreamingFromCollectionScala") } } package xuwei.tech.batch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** */ public class BatchWordCountJava { public static void main(String[] args) throws Exception{ val data = List(10,15,20) String outPath = "D:\\data\\result"; //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //獲取文件中的內容 val text = env.fromCollection(data) DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1); counts.writeAsCsv(outPath,"\n"," ").setParallelism(1); env.execute("batch word count"); } public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); for (String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<String, Integer>(token,1)); } } } } } 
2、readTextFile
package xuwei.tech.batch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * Created by xuwei.tech on 2018/10/8. */ public class BatchWordCountJava { public static void main(String[] args) throws Exception{ String inputPath = "D:\\data\\file"; String outPath = "D:\\data\\result"; //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //獲取文件中的內容 DataSource<String> text = env.readTextFile(inputPath); DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1); counts.writeAsCsv(outPath,"\n"," ").setParallelism(1); env.execute("batch word count"); } public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); for (String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<String, Integer>(token,1)); } } } } } 

二、DataSet API之Transformations

介紹:

  1. Map:輸入一個元素,然后返回一個元素,中間可以做一些清洗轉換等操作
  2. FlatMap:輸入一個元素,可以返回零個,一個或者多個元素
  3. MapPartition:類似map,一次處理一個分區的數據【如果在進行map處理的時候需要獲取第三方資源鏈接,建議使用MapPartition】
  4. Filter:過濾函數,對傳入的數據進行判斷,符合條件的數據會被留下
  5. Reduce:對數據進行聚合操作,結合當前元素和上一次reduce返回的值進行聚合操作,然后返回一個新的值
  6. Aggregate:sum、max、min等
  7. Distinct:返回一個數據集中去重之后的元素,data.distinct()
  8. Join:內連接
  9. OuterJoin:外鏈接
  10. Cross:獲取兩個數據集的笛卡爾積
  11. Union:返回兩個數據集的總和,數據類型需要一致
  12. First-n:獲取集合中的前N個元素
  13. Sort Partition:在本地對數據集的所有分區進行排序,通過sortPartition()的鏈接調用來完成對多個字段的排序
代碼實現:
1、broadcast(廣播變量)
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; /** * broadcast廣播變量 * * * * 需求: * flink會從數據源中獲取到用戶的姓名 * * 最終需要把用戶的姓名和年齡信息打印出來 * * 分析: * 所以就需要在中間的map處理的時候獲取用戶的年齡信息 * * 建議吧用戶的關系數據集使用廣播變量進行處理 * * * * * 注意:如果多個算子需要使用同一份數據集,那么需要在對應的多個算子后面分別注冊廣播變量 */ public class BatchDemoBroadcast { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:准備需要廣播的數據 ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>(); broadData.add(new Tuple2<>("zs",18)); broadData.add(new Tuple2<>("ls",20)); broadData.add(new Tuple2<>("ww",17)); DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData); //1.1:處理需要廣播的數據,把數據集轉換成map類型,map中的key就是用戶姓名,value就是用戶年齡 DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() { @Override public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception { HashMap<String, Integer> res = new HashMap<>(); res.put(value.f0, value.f1); return res; } }); //源數據 DataSource<String> data = env.fromElements("zs", "ls", "ww"); //注意:在這里需要使用到RichMapFunction獲取廣播變量 DataSet<String> result = data.map(new RichMapFunction<String, String>() { List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>(); HashMap<String, Integer> allMap = new HashMap<String, Integer>(); /** * 這個方法只會執行一次 * 可以在這里實現一些初始化的功能 * * 所以,就可以在open方法中獲取廣播變量數據 * */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //3:獲取廣播數據 this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName"); for (HashMap map : broadCastMap) { allMap.putAll(map); } } @Override public String map(String value) throws Exception { Integer age = allMap.get(value); return value + "," + age; } }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:執行廣播數據的操作 result.print(); } } 
2、IntCounter(累加器)
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; /** * 全局累加器 * * counter 計數器 * * 需求: * 計算map函數中處理了多少數據 * * * 注意:只有在任務執行結束后,才能獲取到累加器的值 * * * * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoCounter { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data = env.fromElements("a", "b", "c", "d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { //1:創建累加器 private IntCounter numLines = new IntCounter(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:注冊累加器 getRuntimeContext().addAccumulator("num-lines",this.numLines); } //int sum = 0; @Override public String map(String value) throws Exception { //如果並行度為1,使用普通的累加求和即可,但是設置多個並行度,則普通的累加求和結果就不准了 //sum++; //System.out.println("sum:"+sum); this.numLines.add(1); return value; } }).setParallelism(8); //result.print(); result.writeAsText("d:\\data\\count10"); JobExecutionResult jobResult = env.execute("counter"); //3:獲取累加器 int num = jobResult.getAccumulatorResult("num-lines"); System.out.println("num:"+num); } } 
3、cross(獲取笛卡爾積)
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.CrossOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; /** * 獲取笛卡爾積 * * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoCross { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //tuple2<用戶id,用戶姓名> ArrayList<String> data1 = new ArrayList<>(); data1.add("zs"); data1.add("ww"); //tuple2<用戶id,用戶所在城市> ArrayList<Integer> data2 = new ArrayList<>(); data2.add(1); data2.add(2); DataSource<String> text1 = env.fromCollection(data1); DataSource<Integer> text2 = env.fromCollection(data2); CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2); cross.print(); } } 
4、registerCachedFile(Distributed Cache)
package xuwei.tech.batch.batchAPI; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import java.io.File; import java.util.ArrayList; import java.util.HashMap; import java.util.List; /** * Distributed Cache */ public class BatchDemoDisCache { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:注冊一個文件,可以使用hdfs或者s3上的文件 env.registerCachedFile("d:\\data\\file\\a.txt","a.txt"); DataSource<String> data = env.fromElements("a", "b", "c", "d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.out.println("line:" + line); } } @Override public String map(String value) throws Exception { //在這里就可以使用dataList return value; } }); result.print(); } } 
5、distinct
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Iterator; public class BatchDemoDistinct { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<String> data = new ArrayList<>(); data.add("hello you"); data.add("hello me"); DataSource<String> text = env.fromCollection(data); FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String[] split = value.toLowerCase().split("\\W+"); for (String word : split) { System.out.println("單詞:"+word); out.collect(word); } } }); flatMapData.distinct()// 對數據進行整體去重 .print(); } } 
6、排序(first)
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; /** * 獲取集合中的前N個元素 * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoFirstN { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple2<Integer, String>> data = new ArrayList<>(); data.add(new Tuple2<>(2,"zs")); data.add(new Tuple2<>(4,"ls")); data.add(new Tuple2<>(3,"ww")); data.add(new Tuple2<>(1,"xw")); data.add(new Tuple2<>(1,"aw")); data.add(new Tuple2<>(1,"mw")); DataSource<Tuple2<Integer, String>> text = env.fromCollection(data); //獲取前3條數據,按照數據插入的順序 text.first(3).print(); System.out.println("=============================="); //根據數據中的第一列進行分組,獲取每組的前2個元素 text.groupBy(0).first(2).print(); System.out.println("=============================="); //根據數據中的第一列分組,再根據第二列進行組內排序[升序],獲取每組的前2個元素 text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print(); System.out.println("=============================="); //不分組,全局排序獲取集合中的前3個元素,針對第一個元素升序,第二個元素倒序 text.sortPartition(0,Order.ASCENDING).sortPartition(1,Order.DESCENDING).first(3).print(); } } 
7、join
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; import java.util.ArrayList; public class BatchDemoJoin { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //tuple2<用戶id,用戶姓名> ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>(); data1.add(new Tuple2<>(1,"zs")); data1.add(new Tuple2<>(2,"ls")); data1.add(new Tuple2<>(3,"ww")); //tuple2<用戶id,用戶所在城市> ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>(); data2.add(new Tuple2<>(1,"beijing")); data2.add(new Tuple2<>(2,"shanghai")); data2.add(new Tuple2<>(3,"guangzhou")); DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1); DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2); text1.join(text2).where(0)//指定第一個數據集中需要進行比較的元素角標 .equalTo(0)//指定第二個數據集中需要進行比較的元素角標 .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { return new Tuple3<>(first.f0,first.f1,second.f1); } }).print(); System.out.println("=================================="); //注意,這里用map和上面使用的with最終效果是一致的。 /*text1.join(text2).where(0)//指定第一個數據集中需要進行比較的元素角標 .equalTo(0)//指定第二個數據集中需要進行比較的元素角標 .map(new MapFunction<Tuple2<Tuple2<Integer,String>,Tuple2<Integer,String>>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception { return new Tuple3<>(value.f0.f0,value.f0.f1,value.f1.f1); } }).print();*/ } } 
8、outerJoin
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; /** * 外連接 * * 左外連接 * 右外連接 * 全外連接 */ public class BatchDemoOuterJoin { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //tuple2<用戶id,用戶姓名> ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>(); data1.add(new Tuple2<>(1,"zs")); data1.add(new Tuple2<>(2,"ls")); data1.add(new Tuple2<>(3,"ww")); //tuple2<用戶id,用戶所在城市> ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>(); data2.add(new Tuple2<>(1,"beijing")); data2.add(new Tuple2<>(2,"shanghai")); data2.add(new Tuple2<>(4,"guangzhou")); DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1); DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2); /** * 左外連接 * * 注意:second這個tuple中的元素可能為null * */ text1.leftOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(second==null){ return new Tuple3<>(first.f0,first.f1,"null"); }else{ return new Tuple3<>(first.f0,first.f1,second.f1); } } }).print(); System.out.println("============================================================================="); /** * 右外連接 * * 注意:first這個tuple中的數據可能為null * */ text1.rightOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(first==null){ return new Tuple3<>(second.f0,"null",second.f1); } return new Tuple3<>(first.f0,first.f1,second.f1); } }).print(); System.out.println("============================================================================="); /** * 全外連接 * * 注意:first和second這兩個tuple都有可能為null * */ text1.fullOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(first==null){ return new Tuple3<>(second.f0,"null",second.f1); }else if(second == null){ return new Tuple3<>(first.f0,first.f1,"null"); }else{ return new Tuple3<>(first.f0,first.f1,second.f1); } } }).print(); } } 
9、union
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.UnionOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; /** * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoUnion { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>(); data1.add(new Tuple2<>(1,"zs")); data1.add(new Tuple2<>(2,"ls")); data1.add(new Tuple2<>(3,"ww")); ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>(); data2.add(new Tuple2<>(1,"lili")); data2.add(new Tuple2<>(2,"jack")); data2.add(new Tuple2<>(3,"jessic")); DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1); DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2); UnionOperator<Tuple2<Integer, String>> union = text1.union(text2); union.print(); } } 

三、DataStream API之partition

介紹:
  1. Rebalance:對數據集進行再平衡,重分區,消除數據傾斜
  2. Hash-Partition:根據指定key的哈希值對數據集進行分區
  3. partitionByHash()
  4. Range-Partition:根據指定的key對數據集進行范圍分區
  5. .partitionByRange()
  6. Custom Partitioning:自定義分區規則
  7. 自定義分區需要實現Partitioner接口
  8. partitionCustom(partitioner, "someKey")
  9. 或者partitionCustom(partitioner, 0)
代碼實現:
1、partitionByRange或partitionByHash
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Iterator; /** * Hash-Partition * * Range-Partition * * * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoHashRangePartition { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple2<Integer, String>> data = new ArrayList<>(); data.add(new Tuple2<>(1,"hello1")); data.add(new Tuple2<>(2,"hello2")); data.add(new Tuple2<>(2,"hello3")); data.add(new Tuple2<>(3,"hello4")); data.add(new Tuple2<>(3,"hello5")); data.add(new Tuple2<>(3,"hello6")); data.add(new Tuple2<>(4,"hello7")); data.add(new Tuple2<>(4,"hello8")); data.add(new Tuple2<>(4,"hello9")); data.add(new Tuple2<>(4,"hello10")); data.add(new Tuple2<>(5,"hello11")); data.add(new Tuple2<>(5,"hello12")); data.add(new Tuple2<>(5,"hello13")); data.add(new Tuple2<>(5,"hello14")); data.add(new Tuple2<>(5,"hello15")); data.add(new Tuple2<>(6,"hello16")); data.add(new Tuple2<>(6,"hello17")); data.add(new Tuple2<>(6,"hello18")); data.add(new Tuple2<>(6,"hello19")); data.add(new Tuple2<>(6,"hello20")); data.add(new Tuple2<>(6,"hello21")); DataSource<Tuple2<Integer, String>> text = env.fromCollection(data); /*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() { @Override public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception { Iterator<Tuple2<Integer, String>> it = values.iterator(); while (it.hasNext()){ Tuple2<Integer, String> next = it.next(); System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next); } } }).print();*/ text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() { @Override public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception { Iterator<Tuple2<Integer, String>> it = values.iterator(); while (it.hasNext()){ Tuple2<Integer, String> next = it.next(); System.out.println("當前線程id:"+Thread.currentThread().getId()+","+next); } } }).print(); } } 
2、mapPartition
package xuwei.tech.batch.batchAPI; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Iterator; /** * Created by xuwei.tech on 2018/10/8. */ public class BatchDemoMapPartition { public static void main(String[] args) throws Exception{ //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<String> data = new ArrayList<>(); data.add("hello you"); data.add("hello me"); DataSource<String> text = env.fromCollection(data); /*text.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { //獲取數據庫連接--注意,此時是每過來一條數據就獲取一次鏈接 //處理數據 //關閉連接 return value; } });*/ DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() { @Override public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception { //獲取數據庫連接--注意,此時是一個分區的數據獲取一次連接【優點,每個分區獲取一次鏈接】 //values中保存了一個分區的數據 //處理數據 Iterator<String> it = values.iterator(); while (it.hasNext()) { String next = it.next(); String[] split = next.split("\\W+"); for (String word : split) { out.collect(word); } } //關閉鏈接 } }); mapPartitionData.print(); } } 

四、DataSet API之Data Sink(數據落地)

介紹:
  1. writeAsText():將元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取
  2. writeAsCsv():將元組以逗號分隔寫入文件中,行及字段之間的分隔是可配置的。每個字段的值來自對象的toString()方法
  3. print():打印每個元素的toString()方法的值到標准輸出或者標准錯誤輸出流中
代碼:
1、writeAsCsv
package xuwei.tech.batch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * Created by xuwei.tech on 2018/10/8. */ public class BatchWordCountJava { public static void main(String[] args) throws Exception{ String inputPath = "D:\\data\\file"; String outPath = "D:\\data\\result"; //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //獲取文件中的內容 DataSource<String> text = env.readTextFile(inputPath); DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1); counts.writeAsCsv(outPath,"\n"," ").setParallelism(1); env.execute("batch word count"); } public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); for (String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<String, Integer>(token,1)); } } } } } 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM