Suppose an object holds a collection of custom objects, for example:
import java.io.Serializable;
import java.util.List;

public class LogInfo implements Serializable {

    private static final long serialVersionUID = 4053810260183406530L;

    public String logFilePath;
    public List<AppInfo> appInfo;

    public String getLogFilePath() {
        return logFilePath;
    }

    public List<AppInfo> getAppInfo() {
        return appInfo;
    }

    public void setLogFilePath(String logFilePath) {
        this.logFilePath = logFilePath;
    }

    public void setAppInfo(List<AppInfo> appInfo) {
        this.appInfo = appInfo;
    }
}
The AppInfo class looks like this:
import java.io.Serializable;

public class AppInfo implements Serializable {

    public String logTime;
    public String msg;
    public String logInfo;

    public String getLogTime() {
        return logTime;
    }

    public String getMsg() {
        return msg;
    }

    public String getLogInfo() {
        return logInfo;
    }

    public void setLogTime(String logTime) {
        this.logTime = logTime;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public void setLogInfo(String logInfo) {
        this.logInfo = logInfo;
    }

    // Convenience method for generating test data
    public void createData(Integer i) {
        this.logTime = "logTime" + i;
        this.msg = "msg" + i;
        this.logInfo = "logInfo" + i;
    }
}
Now, given an RDD of LogInfo objects from which we need to build a Dataset and write it to disk, the approach is as follows:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import com.demo.log.analysis.entiy.AppInfo;
import com.demo.log.analysis.entiy.LogInfo;

public class SparkSchemaTest {

    public static void main(String[] args) {
        SparkSession spark = getSparkSession();
        List<LogInfo> data = data();

        // Convert each LogInfo into a Row; the nested List<AppInfo> must itself
        // be converted into a list of Rows matching the nested struct schema.
        List<Row> rowData = new ArrayList<Row>();
        for (LogInfo i : data) {
            List<Row> appinfo = new ArrayList<Row>();
            for (AppInfo j : i.getAppInfo()) {
                // Field order must match the schema: logTime, msg, logInfo
                Row row = RowFactory.create(j.getLogTime(), j.getMsg(), j.getLogInfo());
                appinfo.add(row);
            }
            Row row = RowFactory.create(i.getLogFilePath(), appinfo);
            rowData.add(row);
        }

        StructType schema = schema();
        Dataset<Row> ds = spark.createDataFrame(rowData, schema);
        ds.printSchema();
        ds.show();
        spark.close();
    }

    // Top-level schema: a string column plus an array-of-structs column
    public static StructType schema() {
        StructType schema = new StructType();
        schema = schema.add("logFilePath", DataTypes.StringType);
        List<StructField> sf = new ArrayList<StructField>();
        sf.add(new StructField("logTime", DataTypes.StringType, false, Metadata.empty()));
        sf.add(new StructField("msg", DataTypes.StringType, false, Metadata.empty()));
        sf.add(new StructField("logInfo", DataTypes.StringType, false, Metadata.empty()));
        schema = schema.add("appInfo", DataTypes.createArrayType(DataTypes.createStructType(sf)));
        return schema;
    }

    // Build two LogInfo objects ("A" and "B"), each holding two AppInfo entries
    public static List<LogInfo> data() {
        List<LogInfo> data = new ArrayList<LogInfo>();

        LogInfo logInfo = new LogInfo();
        logInfo.setLogFilePath("A");
        ArrayList<AppInfo> appList = new ArrayList<AppInfo>();
        AppInfo appInfo = new AppInfo();
        appInfo.createData(1);
        appList.add(appInfo);
        appInfo = new AppInfo();
        appInfo.createData(2);
        appList.add(appInfo);
        logInfo.setAppInfo(appList);
        data.add(logInfo);

        logInfo = new LogInfo();
        logInfo.setLogFilePath("B");
        appList = new ArrayList<AppInfo>();
        appInfo = new AppInfo();
        appInfo.createData(3);
        appList.add(appInfo);
        appInfo = new AppInfo();
        appInfo.createData(4);
        appList.add(appInfo);
        logInfo.setAppInfo(appList);
        data.add(logInfo);

        return data;
    }

    public static SparkSession getSparkSession() {
        SparkConf conf = new SparkConf();
        conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false");
        conf.setMaster("local[1]");
        return SparkSession.builder().config(conf).getOrCreate();
    }
}

(Two fixes over the naive version: StructField takes a Metadata argument, so use Metadata.empty() rather than null; and the values passed to RowFactory.create must follow the schema's field order, logTime, msg, logInfo, or the string columns will silently end up swapped.)
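As an aside: since LogInfo and AppInfo are plain Java beans (getters and setters for every field), Spark can also derive this nested schema automatically with a bean encoder, so no Rows need to be built by hand. A minimal sketch under that assumption, reusing the data() and getSparkSession() helpers above (the class name BeanEncoderExample is just for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.demo.log.analysis.entiy.LogInfo;

public class BeanEncoderExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSchemaTest.getSparkSession();
        // Encoders.bean inspects the getters/setters and derives the schema,
        // mapping List<AppInfo> to an array<struct<...>> column automatically.
        Dataset<LogInfo> ds = spark.createDataset(SparkSchemaTest.data(), Encoders.bean(LogInfo.class));
        ds.printSchema();
        ds.show();
        Dataset<Row> df = ds.toDF(); // untyped view, if a DataFrame is needed
        spark.close();
    }
}

Note that the bean encoder orders columns alphabetically by property name, so the column order will differ from the hand-written schema.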
With the nested-Row construction above, the Dataset<Row> is built correctly. If instead you pass i.getAppInfo() directly as the second value to RowFactory.create, the job fails with:
Spark: scala.MatchError
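For contrast, the failing variant inside the loop is the single line below. The MatchError comes from Catalyst's type conversion: the declared element type of appInfo is a struct, and the struct converter pattern-matches each incoming element against Row, which a raw AppInfo object never satisfies.

// Fails at runtime with scala.MatchError: the java.util.List<AppInfo>
// cannot be mapped onto array<struct<...>>, because AppInfo is not a Row.
Row row = RowFactory.create(i.getLogFilePath(), i.getAppInfo());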
So the fix is the nested-Row construction shown above: wrap each AppInfo in its own Row and pass the list of those Rows as the array-of-structs field. This resolves the MatchError and the program runs to completion.
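For reference, the schema that printSchema() derives from the definition above looks like this (reconstructed from the schema code, not captured output):

root
 |-- logFilePath: string (nullable = true)
 |-- appInfo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- logTime: string (nullable = false)
 |    |    |-- msg: string (nullable = false)
 |    |    |-- logInfo: string (nullable = false)

and ds.show() lists the two rows, "A" and "B", each carrying its two nested AppInfo structs.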