Compared with Hadoop, Spark is far more flexible and convenient for data processing. Recently, however, I ran into a small annoyance: Spark's save functions (such as saveAsTextFile) always create a new directory and write the data as part files under it. What if we want to add a single file under an existing directory, rather than create a new directory? The obvious attempts are to shrink the RDD to one partition before saving:
rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")
Even with the number of partitions set to 1, Spark still creates a directory named test.txt and writes all the data to the part-00000 file inside it.
Question: how can we make Spark write an RDD's result to a single file instead of a directory?
Spark's save semantics dictate that saving data always creates a new directory. To add the data to an existing directory as a standalone file, we have to fall back on Hadoop's HDFS API. The example below shows how to use Hadoop's FileSystem to save Spark data as a single file under an existing directory:
package com.ys.penspark.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;

/**
 * @ClassName: HdfsOperate
 * @Description: Writes data to a single HDFS file through Hadoop's FileSystem API.
 * @Author: Administrator
 * @Date: 2017/6/28
 */
public class HdfsOperate implements Serializable {

    private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    // Create a new file at the target HDFS location and obtain an output stream.
    public static void openHdfsFile(String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));
        if (null != writer) {
            logger.info("[HdfsOperate]>> initialize writer succeed!");
        }
    }

    // Write one line of data into the HDFS file.
    public static void writeString(String line) {
        try {
            writer.write(line + "\n");
        } catch (Exception e) {
            logger.error("[HdfsOperate]>> write a line error:", e);
        }
    }

    // Close the HDFS output stream.
    public static void closeHdfsFile() {
        try {
            if (null != writer) {
                writer.close();
                logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
            } else {
                logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            }
        } catch (Exception e) {
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);
        }
    }
}
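Before wiring this into Spark, here is a minimal standalone sketch of how the helper is meant to be used; the path below is only a hypothetical placeholder (with a default Configuration it resolves against the configured default filesystem):

// Minimal usage sketch of HdfsOperate; the path is a hypothetical placeholder.
public class HdfsOperateDemo {
    public static void main(String[] args) throws Exception {
        HdfsOperate.openHdfsFile("/tmp/demo/data.txt"); // create the file and open the output stream
        HdfsOperate.writeString("name,age");            // write a header line
        HdfsOperate.writeString("zs,44");               // write one data line
        HdfsOperate.closeHdfsFile();                    // flush and close the stream
    }
}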
First repartition the Spark RDD, then fetch each partition's data with collectPartitions and write it into the HDFS file line by line:
package com.ys.penspark.util;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * @ClassName: FeatureExtractor
 * @Description: Repartitions a Dataset and writes it line by line to a single HDFS file via HdfsOperate.
 * @Author: mashiwei
 * @Date: 2017/6/28
 */
public class FeatureExtractor implements Serializable {

    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(Dataset<Row> s, int repartitionNum, String out) throws Exception {
        // Build the header line from the schema's field names, separated by commas.
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i <= s.schema().fieldNames().length - 1; i++) {
            sb.append(s.schema().fieldNames()[i]);
            if (i == s.schema().fieldNames().length - 1) {
                break;
            }
            sb.append(",");
        }
        s.show();
        JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);

        // Target HDFS file location.
        // String destinationPath = "/kettle/penspark/data.txt" ;
        // Create the HDFS file and open the output stream.
        HdfsOperate.openHdfsFile(out);
        HdfsOperate.writeString(sb.toString());

        // Read the RDD data partition by partition and save it to HDFS.
        // Fetching the whole List<String> with collect() could fail when the data is too large to fit in memory.
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
            // List<String>[] featureList = rddx.collectPartitions(index);
            // List<String> strs = rddx.collect();
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                // Write one line to the HDFS file.
                logger.info("-----" + str);
                HdfsOperate.writeString(str);
            }
        }
        // Close the HDFS output stream.
        HdfsOperate.closeHdfsFile();
    }
    // Converts a Row into a comma-separated line of its column values.
    class ExtractFeatureMap implements Function<Row, String> {
        @Override
        public String call(Row line) throws Exception {
            try {
                StringBuffer sb = new StringBuffer();
                int len = line.length();
                for (int i = 0; i <= len - 1; i++) {
                    sb.append(line.get(i).toString());
                    if (i == len - 1) {
                        break;
                    }
                    sb.append(",");
                }
                return sb.toString();
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>> ExtractFeatureMap error:", e);
            }
            return null;
        }
    }
    public static void main(String[] args) {
        // SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
        // JavaSparkContext sc= new JavaSparkContext(conf);
        StructType Schemafinal = new StructType();
        Map<String, String> options = new HashMap<String, String>();
        LinkedList<StructField> obj = new LinkedList<StructField>();
        StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());
        StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
        // StructField structField2 = new StructField("字段2", DataTypes.StringType, true, Metadata.empty());
        // StructField structField3 = new StructField("字段3", DataTypes.StringType, true, Metadata.empty());
        obj.add(structField);
        obj.add(structField1);
        // obj.add(structField2);
        // obj.add(structField3);
        Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));

        SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");
        options.put("delimiter", ",");
        options.put("header", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        @SuppressWarnings("deprecation")
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession
                .builder()
                .appName("Pentaho Logic as Spark")
                .config("spark.some.config.option", "some-value")
                .config("spark.sql.warehouse.dir", "file:///C:/tmp/")
                .getOrCreate();
        Dataset<Row> tempdf = spark.read()
                .format("com.databricks.spark.csv")
                .options(options)
                .schema(Schemafinal)
                .option("header", true)
                .load("file:///" + "C:\\Users\\Administrator\\Desktop\\測試\\功能開發\\excel.txt");
        tempdf.show();

        FeatureExtractor fx = new FeatureExtractor();
        try {
            // fx.extractFeature(sc,5);
            fx.extractFeature(tempdf, 2, "/kettle/tempData.txt");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Input data (excel.txt):
name,age
zs, 44
li, 22
ww, 18
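For reference, and assuming the CSV reader keeps the field values as they appear above, the single output file /kettle/tempData.txt produced by the run should contain the header line followed by one comma-joined line per row, roughly:

name,age
zs, 44
li, 22
ww, 18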
