1. The differences and relationship between S3, S3N and S3A (wiki: https://wiki.apache.org/hadoop/AmazonS3)

S3 Native FileSystem (URI scheme: s3n)
A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.

S3A (URI scheme: s3a)
A successor to the S3 Native (s3n) filesystem, the s3a system uses Amazon's libraries to interact with S3. This allows S3A to support larger files (no more 5GB limit), higher-performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL scheme.

S3 Block FileSystem (URI scheme: s3)
A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

2. How to choose an S3 access scheme
From the descriptions above: first, the three schemes differ in the file sizes they can handle; second, s3 is block-based while s3n/s3a are object-based; and finally, S3A is the access scheme recommended by Apache. The s3 block scheme is gradually being replaced and AWS does not recommend using it, whereas S3A is more stable, secure and efficient. Note that Hadoop 2.6.x has bugs in its S3A support, so it is recommended to use Hadoop 2.7.x when accessing S3 with the s3a scheme.

3. Choosing the jars
To access S3 files from Hadoop you need the aws-sdk package, which provides an S3 client for Java; the URL scheme itself is resolved by the hadoop-aws package, which delegates to the SDK. Before Hadoop 2.6.x this code was maintained inside hadoop-core, so Hadoop 2.4.x still contains a class such as org.apache.hadoop.fs.s3native.NativeS3FileSystem there; starting with Hadoop 2.6.x it is gone, because Apache decoupled the S3 access code from hadoop-core into the separate hadoop-aws module (which depends on the AWS-maintained SDK).

AWS supports several request-signing algorithms (v2, v3, v4). The jets3t library only supports V4 from version 0.9.4 onwards, while older Hadoop releases pull in an older jets3t jar, so when you use the s3n scheme it is best to add the jets3t dependency explicitly. With s3a this is not needed, because s3a goes through com.amazonaws.http.AmazonHttpClient rather than jets3t.

The jars introduced for this test mainly include the following:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <spark.pom.scope>compile</spark.pom.scope>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>${spark.pom.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>${spark.pom.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>${spark.pom.scope}</scope>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.4</version>
    </dependency>
</dependencies>
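Before touching S3 at all, it can help to confirm that the jars above really put the expected classes on the classpath. The following Scala sketch is my own addition (the object name S3ClasspathCheck and the exact class list are assumptions, not from the original setup; the jets3t class name may differ slightly across versions); it simply probes for the implementation classes discussed in this section without needing credentials or network access.

import scala.util.Try

object S3ClasspathCheck {
  // Classes that should be resolvable depending on which scheme and jars are used.
  private val wanted = Seq(
    "org.apache.hadoop.fs.s3a.S3AFileSystem",                // hadoop-aws, backs s3a://
    "org.apache.hadoop.fs.s3native.NativeS3FileSystem",      // backs s3n://
    "com.amazonaws.http.AmazonHttpClient",                   // AWS SDK HTTP client used by s3a
    "org.jets3t.service.impl.rest.httpclient.RestS3Service"  // jets3t REST client used by s3n
  )

  def main(args: Array[String]): Unit = {
    wanted.foreach { name =>
      // Class.forName throws if the class is not on the classpath; Try turns that into a flag.
      val found = Try(Class.forName(name)).isSuccess
      println(s"$name -> ${if (found) "found" else "MISSING"}")
    }
  }
}

Running this once with the same classpath as the Spark job makes jar-version mismatches (for example an old jets3t without V4 signing) visible before any job is submitted.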
4. Pitfalls
The read code itself is simple, as shown below. Note that it uses the s3a scheme, which is also the recommended one:

val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
println(rdd.count())

Even though the aws cli was already configured on my machine, running this still fails with:

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

So the credentials have to be set in code:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")

After setting them and re-running, it still fails:

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xx, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: xxx

It took quite a while to find the cause: the aws-sdk calls S3's REST API, and a REST call needs the correct endpoint (http://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/VirtualHosting.html). The default endpoint shipped with the SDK is the HOSTNAME value in com.amazonaws.services.s3.internal.Constants, which is s3.amazonaws.com, whereas the hostname for the China (Beijing) region should be s3.cn-north-1.amazonaws.com.cn. There are two ways to handle this: one is to create a Constants class under the same package name to shadow the value (not recommended; it makes the code hard to maintain), the other is to set it directly in the configuration in code:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

At this point reading from S3 basically works locally, but there are still a few issues in an EMR environment. First, on EMR you do not need to set the AWS key and secret at all. Second, Hadoop on an EMR cluster accesses S3 through EMRFS, which maps both s3 and s3n to s3 (equivalent to using s3n in local Hadoop), and the EMR environment does not support s3a at all (it fails with 403 errors); EMR officially recommends using the s3 scheme to access files in code.

Complete example code:

import org.apache.spark.sql.SparkSession

object SparkS3Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

    val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
    println(rdd.count())
  }
}
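For comparison, a minimal EMR-side variant might look like the sketch below, assuming the EMRFS behaviour described above (no access key, secret or endpoint set in code, and the s3:// scheme handled by EMRFS). The object name SparkS3TestOnEmr is my own, and s3://xxx/part-00000 reuses the placeholder path from the local example; this is an illustration of the described setup, not code from the original test.

import org.apache.spark.sql.SparkSession

object SparkS3TestOnEmr {
  def main(args: Array[String]): Unit = {
    // On EMR the master is supplied by spark-submit and credentials come from the
    // cluster's instance profile, so no fs.s3a.* keys or endpoint are set here.
    val spark = SparkSession.builder()
      .appName("SparkDemoFromS3OnEmr")
      .getOrCreate()

    // EMRFS handles the s3:// scheme; "xxx" is the same placeholder bucket as above.
    val rdd = spark.sparkContext.textFile("s3://xxx/part-00000")
    println(rdd.count())
  }
}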