Spark 讀取csv文件操作,option參數解釋


 

import com.bean.Yyds1
import org.apache.spark.sql.SparkSession

object TestReadCSV {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()
    /** *   參數可以字符串,也可以是具體的類型,比如boolean
     * delimiter 分隔符,默認為逗號,
     * nullValue 指定一個字符串代表 null 值
     * quote 引號字符,默認為雙引號"
     * header 第一行不作為數據內容,作為標題
     * inferSchema 自動推測字段類型
     * ignoreLeadingWhiteSpace 裁剪前面的空格
     * ignoreTrailingWhiteSpace 裁剪后面的空格
     * nullValue 空值設置,如果不想用任何符號作為空值,可以賦值null即可
     * multiline  運行多列,超過62 columns時使用
     * encoding   指定編碼,如:gbk  / utf-8  Unicode  GB2312
     * ** */

      import spark.implicits._
    val result = spark.read.format("csv")
      .option("delimiter", "\\t")
      .option("encoding","GB2312")
      .option("enforceSchema",false)
      .option("header", "true")
//      .option("header", false)
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("ignoreLeadingWhiteSpace", false)
      .option("ignoreTrailingWhiteSpace", false)
      .option("nullValue", null)
      .option("multiline", "true")
      .load("G:\\python\\yyds\\yyds_1120_tab.csv").as[Yyds1] //yyds_1120_tab.csv  aa1.csv   yyds_20211120  yyds_1120_tab2_utf-8


    result.map(row => {
      row.ji_check_cnt.toInt
    }).foreachPartition(a => {a.foreach(println _)})

  }
}

 

pom依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>TmLimitPredict</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
        <!--0.8.2-beta   0.8.2.0    0.8.2.1   0.8.2.2   0.9.0.1   0.10.0.0
                        0.10.1.0   0.10.0.1   0.10.2.0   1.0.0   2.8.0-->
        <kafka.version>2.8.0</kafka.version>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <jblas.version>1.2.1</jblas.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>


    <dependencies>
        <!--引入共同的日志管理工具-->
        <!--        <dependency>-->
        <!--            <groupId>org.slf4j</groupId>-->
        <!--            <artifactId>jcl-over-slf4j</artifactId>-->
        <!--            <version>${slf4j.version}</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.slf4j</groupId>-->
        <!--            <artifactId>slf4j-api</artifactId>-->
        <!--            <version>${slf4j.version}</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.slf4j</groupId>-->
        <!--            <artifactId>slf4j-log4j12</artifactId>-->
        <!--            <version>${slf4j.version}</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>log4j</groupId>-->
        <!--            <artifactId>log4j</artifactId>-->
        <!--            <version>${log4j.version}</version>-->
        <!--        </dependency>-->
        <!-- Spark的依賴引入 -->

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!--MLlib-->
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>


        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.sf.kafka</groupId>
            <artifactId>sf-kafka-api-core</artifactId>
            <version>2.4.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!--     lombok   生成get、set方法工具-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


    <build>
        <!--    <sourceDirectory>src/main/scala</sourceDirectory>-->
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <!-- 排除外置的配置文件(運行時注釋上,使IDE能讀到配置文件;打包時放開注釋讓配置文件外置,方便修改)可以不配置,maven-jar-plugin下面已配置 -->
                <!--<excludes>
                    <exclude>config.properties</exclude>
                </excludes>-->
            </resource>
            <!-- 配置文件外置的資源(存放到conf目錄,也是classpath路徑,下面會配置)-->
            <!--<resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>config.properties</include>
                </includes>
                <targetPath>${project.build.directory}/conf</targetPath>
            </resource>-->
        </resources>

        <plugins>
            <!--scala編譯打包插件-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <!--                <groupId>org.scala-tools</groupId>-->
                <!--                <artifactId>maven-scala-plugin</artifactId>-->
                <!--                <version>2.15.2</version>-->
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--java編譯打包插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--
                ③打成一個zip包,發布項目的時候,將zip包copy到服務器上,直接unzip xxx.zip,里面包含要運行到的jar以及依賴的lib,還有配置的config文件,即可直接啟動服務
            -->
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>process-sources</phase>

                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>

                        <configuration>
                            <excludeScope>provided</excludeScope>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>

                    </execution>
                </executions>
            </plugin>
            <!--The configuration of maven-jar-plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <!--The configuration of the plugin-->
                <configuration>
                    <!-- 不打包資源文件(配置文件和依賴包分開) -->
                    <excludes>
                        <!--            <exclude>*.properties</exclude>-->
                        <!--            <exclude>*.xml</exclude>-->
                        <exclude>*.txt</exclude>
                    </excludes>
                    <!--Configuration of the archiver-->
                    <archive>
                        <!--生成的jar中,不要包含pom.xml和pom.properties這兩個文件-->
                        <addMavenDescriptor>false</addMavenDescriptor>
                        <!--Manifest specific configuration-->
                        <manifest>
                            <!--是否把第三方jar放到manifest的classpath中-->
                            <!--              <addClasspath>true</addClasspath>-->
                            <addClasspath>false</addClasspath>
                            <!--生成的manifest中classpath的前綴,因為要把第三方jar放到lib目錄下,所以classpath的前綴是lib/-->
                            <classpathPrefix>lib/</classpathPrefix>
                            <!--應用的main class-->
                            <!--              <mainClass>com.sf.tmlimit.TmLimitPredStream</mainClass>-->
                            <mainClass>ConnectKafkaTest</mainClass>
                        </manifest>
                        <!-- 給清單文件添加鍵值對,增加classpath路徑,這里將conf目錄也設置為classpath路徑 -->
                        <manifestEntries>
                            <!--              <Class-Path>conf/</Class-Path>-->
                            <Class-Path>lib/</Class-Path>
                        </manifestEntries>
                    </archive>
                    <!--過濾掉不希望包含在jar中的文件-->
                    <!-- <excludes>
                         <exclude>${project.basedir}/xml/*</exclude>
                     </excludes>-->
                </configuration>
            </plugin>

            <!--The configuration of maven-assembly-plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <!--The configuration of the plugin-->
                <configuration>
                    <!--Specifies the configuration file of the assembly plugin-->
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

 

 

參數 解釋
sep 默認是, 指定單個字符分割字段和值
encoding 默認是uft-8通過給定的編碼類型進行解碼
quote 默認是“,其中分隔符可以是值的一部分,設置用於轉義帶引號的值的單個字符。如果您想關閉引號,則需要設置一個空字符串,而不是null。
escape 默認(\)設置單個字符用於在引號里面轉義引號
charToEscapeQuoteEscaping 默認是轉義字符(上面的escape)或者\0,當轉義字符和引號(quote)字符不同的時候,默認是轉義字符(escape),否則為\0
comment 默認是空值,設置用於跳過行的單個字符,以該字符開頭。默認情況下,它是禁用的
header 默認是false,將第一行作為列名
enforceSchema

默認是true, 如果將其設置為true,則指定或推斷的模式將強制應用於數據源文件,而CSV文件中的標頭將被忽略。

如果選項設置為false,則在header選項設置為true的情況下,將針對CSV文件中的所有標題驗證模式。

模式中的字段名稱和CSV標頭中的列名稱是根據它們的位置檢查的,並考慮了*spark.sql.caseSensitive。

雖然默認值為true,但是建議禁用 enforceSchema選項,以避免產生錯誤的結果

inferSchema inferSchema(默認為false`):從數據自動推斷輸入模式。 *需要對數據進行一次額外的傳遞
samplingRatio 默認為1.0,定義用於模式推斷的行的分數
ignoreLeadingWhiteSpace 默認為false,一個標志,指示是否應跳過正在讀取的值中的前導空格
ignoreTrailingWhiteSpace 默認為false一個標志,指示是否應跳過正在讀取的值的結尾空格
nullValue 默認是空的字符串,設置null值的字符串表示形式。從2.0.1開始,這適用於所有支持的類型,包括字符串類型
emptyValue 默認是空字符串,設置一個空值的字符串表示形式
nanValue 默認是Nan,設置非數字的字符串表示形式
positiveInf 默認是Inf
negativeInf 默認是-Inf 設置負無窮值的字符串表示形式
dateFormat

默認是yyyy-MM-dd,設置指示日期格式的字符串。

自定義日期格式遵循java.text.SimpleDateFormat中的格式。這適用於日期類型

timestampFormat

默認是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,設置表示時間戳格式的字符串。

自定義日期格式遵循java.text.SimpleDateFormat中的格式。這適用於時間戳記類型

maxColumns 默認是20480定義多少列數目的硬性設置
maxCharsPerColumn 默認是-1定義讀取的任何給定值允許的最大字符數。默認情況下為-1,表示長度不受限制
mode

默認(允許)允許一種在解析過程中處理損壞記錄的模式。它支持以下不區分大小寫的模式。

請注意,Spark嘗試在列修剪下僅解析CSV中必需的列。因此,損壞的記錄可以根據所需的字段集而有所不同。

可以通過spark.sql.csv.parser.columnPruning.enabled(默認啟用)來控制此行為。

   
mode下面的參數: ---------------------------------------------------
PERMISSIVE

當它遇到損壞的記錄時,將格式錯誤的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,並將其他字段設置為“ null”。

為了保留損壞的記錄,用戶可以在用戶定義的模式中設置一個名為columnNameOfCorruptRecord

DROPMALFORMED 忽略整個損壞的記錄
FAILFAST 遇到損壞的記錄時引發異常
-----mode參數結束---- -------------------------------------------------------
   
columnNameOfCorruptRecord 默認值指定在spark.sql.columnNameOfCorruptRecord,允許重命名由PERMISSIVE模式創建的格式錯誤的新字段。這會覆蓋spark.sql.columnNameOfCorruptRecord
multiLine 默認是false,解析一條記錄,該記錄可能超過62個columns
   

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM