import com.bean.Yyds1 import org.apache.spark.sql.SparkSession object TestReadCSV { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("CSV Reader") .master("local") .getOrCreate() /** * 參數可以字符串,也可以是具體的類型,比如boolean * delimiter 分隔符,默認為逗號, * nullValue 指定一個字符串代表 null 值 * quote 引號字符,默認為雙引號" * header 第一行不作為數據內容,作為標題 * inferSchema 自動推測字段類型 * ignoreLeadingWhiteSpace 裁剪前面的空格 * ignoreTrailingWhiteSpace 裁剪后面的空格 * nullValue 空值設置,如果不想用任何符號作為空值,可以賦值null即可 * multiline 運行多列,超過62 columns時使用 * encoding 指定編碼,如:gbk / utf-8 Unicode GB2312 * ** */ import spark.implicits._ val result = spark.read.format("csv") .option("delimiter", "\\t") .option("encoding","GB2312") .option("enforceSchema",false) .option("header", "true") // .option("header", false) .option("quote", "'") .option("nullValue", "\\N") .option("ignoreLeadingWhiteSpace", false) .option("ignoreTrailingWhiteSpace", false) .option("nullValue", null) .option("multiline", "true") .load("G:\\python\\yyds\\yyds_1120_tab.csv").as[Yyds1] //yyds_1120_tab.csv aa1.csv yyds_20211120 yyds_1120_tab2_utf-8 result.map(row => { row.ji_check_cnt.toInt }).foreachPartition(a => {a.foreach(println _)}) } }
pom依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>TmLimitPredict</artifactId> <version>1.0-SNAPSHOT</version> <properties> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.22</slf4j.version> <!--0.8.2-beta 0.8.2.0 0.8.2.1 0.8.2.2 0.9.0.1 0.10.0.0 0.10.1.0 0.10.0.1 0.10.2.0 1.0.0 2.8.0--> <kafka.version>2.8.0</kafka.version> <spark.version>2.2.0</spark.version> <scala.version>2.11.8</scala.version> <jblas.version>1.2.1</jblas.version> <hadoop.version>2.7.3</hadoop.version> </properties> <dependencies> <!--引入共同的日志管理工具--> <!-- <dependency>--> <!-- <groupId>org.slf4j</groupId>--> <!-- <artifactId>jcl-over-slf4j</artifactId>--> <!-- <version>${slf4j.version}</version>--> <!-- </dependency>--> <!-- <dependency>--> <!-- <groupId>org.slf4j</groupId>--> <!-- <artifactId>slf4j-api</artifactId>--> <!-- <version>${slf4j.version}</version>--> <!-- </dependency>--> <!-- <dependency>--> <!-- <groupId>org.slf4j</groupId>--> <!-- <artifactId>slf4j-log4j12</artifactId>--> <!-- <version>${slf4j.version}</version>--> <!-- </dependency>--> <!-- <dependency>--> <!-- <groupId>log4j</groupId>--> <!-- <artifactId>log4j</artifactId>--> <!-- <version>${log4j.version}</version>--> <!-- </dependency>--> <!-- Spark的依賴引入 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> </exclusions> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- 引入Scala --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> <!--<scope>provided</scope>--> </dependency> <!--MLlib--> <dependency> <groupId>org.scalanlp</groupId> <artifactId>jblas</artifactId> <version>${jblas.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>${spark.version}</version> <!--<scope>provided</scope>--> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>com.sf.kafka</groupId> <artifactId>sf-kafka-api-core</artifactId> <version>2.4.1</version> <!--<scope>provided</scope>--> </dependency> <!-- lombok 生成get、set方法工具--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> <scope>provided</scope> </dependency> </dependencies> <build> <!-- <sourceDirectory>src/main/scala</sourceDirectory>--> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <resources> <resource> <directory>src/main/resources</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <!-- 排除外置的配置文件(運行時注釋上,使IDE能讀到配置文件;打包時放開注釋讓配置文件外置,方便修改)可以不配置,maven-jar-plugin下面已配置 --> <!--<excludes> <exclude>config.properties</exclude> </excludes>--> </resource> <!-- 配置文件外置的資源(存放到conf目錄,也是classpath路徑,下面會配置)--> <!--<resource> <directory>src/main/resources</directory> <includes> <include>config.properties</include> </includes> <targetPath>${project.build.directory}/conf</targetPath> </resource>--> </resources> <plugins> <!--scala編譯打包插件--> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <!-- <groupId>org.scala-tools</groupId>--> <!-- <artifactId>maven-scala-plugin</artifactId>--> <!-- <version>2.15.2</version>--> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!--java編譯打包插件--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- ③打成一個zip包,發布項目的時候,將zip包copy到服務器上,直接unzip xxx.zip,里面包含要運行到的jar以及依賴的lib,還有配置的config文件,即可直接啟動服務 --> <plugin> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <phase>process-sources</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <excludeScope>provided</excludeScope> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> <!--The configuration of maven-jar-plugin--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <!--The configuration of the plugin--> <configuration> <!-- 不打包資源文件(配置文件和依賴包分開) --> <excludes> <!-- <exclude>*.properties</exclude>--> <!-- <exclude>*.xml</exclude>--> <exclude>*.txt</exclude> </excludes> <!--Configuration of the archiver--> <archive> <!--生成的jar中,不要包含pom.xml和pom.properties這兩個文件--> <addMavenDescriptor>false</addMavenDescriptor> <!--Manifest specific configuration--> <manifest> <!--是否把第三方jar放到manifest的classpath中--> <!-- <addClasspath>true</addClasspath>--> <addClasspath>false</addClasspath> <!--生成的manifest中classpath的前綴,因為要把第三方jar放到lib目錄下,所以classpath的前綴是lib/--> <classpathPrefix>lib/</classpathPrefix> <!--應用的main class--> <!-- <mainClass>com.sf.tmlimit.TmLimitPredStream</mainClass>--> <mainClass>ConnectKafkaTest</mainClass> </manifest> <!-- 給清單文件添加鍵值對,增加classpath路徑,這里將conf目錄也設置為classpath路徑 --> <manifestEntries> <!-- <Class-Path>conf/</Class-Path>--> <Class-Path>lib/</Class-Path> </manifestEntries> </archive> <!--過濾掉不希望包含在jar中的文件--> <!-- <excludes> <exclude>${project.basedir}/xml/*</exclude> </excludes>--> </configuration> </plugin> <!--The configuration of maven-assembly-plugin--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version> <!--The configuration of the plugin--> <configuration> <!--Specifies the configuration file of the assembly plugin--> <descriptors> <descriptor>src/main/assembly/assembly.xml</descriptor> </descriptors> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
參數 | 解釋 |
sep | 默認是, 指定單個字符分割字段和值 |
encoding | 默認是uft-8通過給定的編碼類型進行解碼 |
quote | 默認是“,其中分隔符可以是值的一部分,設置用於轉義帶引號的值的單個字符。如果您想關閉引號,則需要設置一個空字符串,而不是null。 |
escape | 默認(\)設置單個字符用於在引號里面轉義引號 |
charToEscapeQuoteEscaping | 默認是轉義字符(上面的escape)或者\0,當轉義字符和引號(quote)字符不同的時候,默認是轉義字符(escape),否則為\0 |
comment | 默認是空值,設置用於跳過行的單個字符,以該字符開頭。默認情況下,它是禁用的 |
header | 默認是false,將第一行作為列名 |
enforceSchema | 默認是true, 如果將其設置為true,則指定或推斷的模式將強制應用於數據源文件,而CSV文件中的標頭將被忽略。 如果選項設置為false,則在header選項設置為true的情況下,將針對CSV文件中的所有標題驗證模式。 模式中的字段名稱和CSV標頭中的列名稱是根據它們的位置檢查的,並考慮了*spark.sql.caseSensitive。 雖然默認值為true,但是建議禁用 enforceSchema選項,以避免產生錯誤的結果 |
inferSchema | inferSchema(默認為false`):從數據自動推斷輸入模式。 *需要對數據進行一次額外的傳遞 |
samplingRatio | 默認為1.0,定義用於模式推斷的行的分數 |
ignoreLeadingWhiteSpace | 默認為false,一個標志,指示是否應跳過正在讀取的值中的前導空格 |
ignoreTrailingWhiteSpace | 默認為false一個標志,指示是否應跳過正在讀取的值的結尾空格 |
nullValue | 默認是空的字符串,設置null值的字符串表示形式。從2.0.1開始,這適用於所有支持的類型,包括字符串類型 |
emptyValue | 默認是空字符串,設置一個空值的字符串表示形式 |
nanValue | 默認是Nan,設置非數字的字符串表示形式 |
positiveInf | 默認是Inf |
negativeInf | 默認是-Inf 設置負無窮值的字符串表示形式 |
dateFormat | 默認是yyyy-MM-dd,設置指示日期格式的字符串。 自定義日期格式遵循java.text.SimpleDateFormat中的格式。這適用於日期類型 |
timestampFormat | 默認是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,設置表示時間戳格式的字符串。 自定義日期格式遵循java.text.SimpleDateFormat中的格式。這適用於時間戳記類型 |
maxColumns | 默認是20480定義多少列數目的硬性設置 |
maxCharsPerColumn | 默認是-1定義讀取的任何給定值允許的最大字符數。默認情況下為-1,表示長度不受限制 |
mode | 默認(允許)允許一種在解析過程中處理損壞記錄的模式。它支持以下不區分大小寫的模式。 請注意,Spark嘗試在列修剪下僅解析CSV中必需的列。因此,損壞的記錄可以根據所需的字段集而有所不同。 可以通過spark.sql.csv.parser.columnPruning.enabled(默認啟用)來控制此行為。 |
mode下面的參數: | --------------------------------------------------- |
PERMISSIVE | 當它遇到損壞的記錄時,將格式錯誤的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,並將其他字段設置為“ null”。 為了保留損壞的記錄,用戶可以在用戶定義的模式中設置一個名為columnNameOfCorruptRecord |
DROPMALFORMED | 忽略整個損壞的記錄 |
FAILFAST | 遇到損壞的記錄時引發異常 |
-----mode參數結束---- | ------------------------------------------------------- |
columnNameOfCorruptRecord | 默認值指定在spark.sql.columnNameOfCorruptRecord,允許重命名由PERMISSIVE模式創建的格式錯誤的新字段。這會覆蓋spark.sql.columnNameOfCorruptRecord |
multiLine | 默認是false,解析一條記錄,該記錄可能超過62個columns |