Integrating Spring Boot with Hadoop 2.7.7


1. Hadoop 2.7.7 is installed on a Tencent Cloud server; see my earlier blog post for the detailed installation steps.

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hadoop</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <activatedProperties>dev</activatedProperties>
            </properties>
            <!-- The dev profile configuration is used by default -->
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <activatedProperties>test</activatedProperties>
            </properties>
        </profile>
        <profile>
            <id>prod</id>
            <properties>
                <activatedProperties>prod</activatedProperties>
            </properties>
        </profile>
    </profiles>

</project>

3. application.yml

spring:
  profiles:
    active: @activatedProperties@
  main:
    banner-mode: "off"
hadoop.name-node: hdfs://132.232.44.82:9000
hadoop.namespace: /mydata
#  hadoop:
#    fsUri: hdfs://132.232.44.82:9000

# Server port
server:
  port: 8080

# Logging levels
logging:
  level:
    com:
      hadoop:
        demo:
          dao: debug
#mybatis-plus
mybatis-plus:
    typeAliasesPackage: com.hadoop.demo.entity
    mapperLocations: classpath:mapper/*.xml

---
# Development (dev) profile
spring:
  profiles: dev
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8
    #url: jdbc:mysql://localhost:3306/test
    username: root
    password: yang156122
    driver-class-name: com.mysql.cj.jdbc.Driver
    # Use the Druid data source
    type: com.alibaba.druid.pool.DruidDataSource

4. HdfsUtils.java

package com.hadoop.demo.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

import java.io.IOException;
import java.net.URI;

/**
 * Basic HDFS operations.
 */
@Slf4j
public class HdfsUtils {

    /**
     * Get a FileSystem instance.
     * @param hdfsUri  NameNode address, e.g. "hdfs://10.10.1.142:9000"
     * @return
     */
    public static FileSystem getFileSystem(String hdfsUri) {
        // Load the configuration
        Configuration conf = new Configuration();
        // File system handle
        FileSystem fs = null;
        if(StringUtils.isBlank(hdfsUri)){
            // Return the default file system; when running inside a Hadoop cluster this directly obtains the default file system
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                log.error("", e);
            }
        }else{
            // Return the file system for the given URI; when testing locally, obtain the file system this way
            try {
                URI uri = new URI(hdfsUri.trim());
                fs = FileSystem.get(uri,conf);
            } catch (Exception e) {
                log.error("", e);
            }
        }
        return fs;
    }

    /**
     * Create a directory.
     *
     * @param hdfsUri
     * @param path
     */
    public static void mkdir(String hdfsUri, String path) {
        try {
            // Get the file system
            FileSystem fs = getFileSystem(hdfsUri);
            if(StringUtils.isNotBlank(hdfsUri)){
                path = hdfsUri + path;
            }
            // Create the directory
            fs.mkdirs(new Path(path));
            // Release resources
            fs.close();
        } catch (IllegalArgumentException | IOException e) {
            log.error("", e);
        }
    }

    /**
     * Delete a file or directory.
     *
     * @param path
     */
    public static void rmdir(String hdfsUri,String path) {
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            if(StringUtils.isNotBlank(hdfsUri)){
                path = hdfsUri + path;
            }
            // Delete the file or directory; delete(Path f) without the recursive flag is deprecated
            fs.delete(new Path(path),true);
            // Release resources
            fs.close();
        } catch (IllegalArgumentException | IOException e) {
            log.error("", e);
        }
    }

    /**
     * List the files under a directory, optionally filtered.
     *
     * @param path
     * @param pathFilter
     * @return String[]
     */
    public static String[] listFile(String hdfsUri, String path,PathFilter pathFilter) {
        String[] files = new String[0];
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);

            if(StringUtils.isNotBlank(hdfsUri)){
                path = hdfsUri + path;
            }

            FileStatus[] status;
            if(pathFilter != null){
                // List the directory contents matching the filter
                status = fs.listStatus(new Path(path),pathFilter);
            }else{
                // List the directory contents
                status = fs.listStatus(new Path(path));
            }
            // Get the paths of all files in the directory
            Path[] listedPaths = FileUtil.stat2Paths(status);
            // Convert to String[]
            if (listedPaths != null && listedPaths.length > 0){
                files = new String[listedPaths.length];
                for (int i = 0; i < files.length; i++){
                    files[i] = listedPaths[i].toString();
                }
            }
            // Release resources
            fs.close();
        } catch (IllegalArgumentException | IOException e) {
            log.error("", e);
        }
        return files;
    }

    /**
     * Upload a file to HDFS.
     * @param hdfsUri
     * @param delSrc       whether to delete the source file; true deletes it, default is false
     * @param overwrite
     * @param srcFile      source file, i.e. the local path to upload
     * @param destPath     destination path on HDFS
     */
    public static void copyFileToHDFS(String hdfsUri,boolean delSrc, boolean overwrite,String srcFile,String destPath) {
        // The source path is a Linux path; when testing on Windows, use a Windows path such as D://hadoop/djt/weibo.txt
        Path srcPath = new Path(srcFile);

        // Destination path
        if(StringUtils.isNotBlank(hdfsUri)){
            destPath = hdfsUri + destPath;
        }
        Path dstPath = new Path(destPath);
        // Perform the upload
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            // Upload once, honouring the delSrc and overwrite flags
            fs.copyFromLocalFile(delSrc, overwrite, srcPath, dstPath);
            // Release resources
            fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    /**
     * Download a file from HDFS.
     *
     * @param srcFile
     * @param destPath local path where the downloaded file is saved
     */
    public static void getFile(String hdfsUri, String srcFile,String destPath) {
        // Source file path
        if(StringUtils.isNotBlank(hdfsUri)){
            srcFile = hdfsUri + srcFile;
        }
        Path srcPath = new Path(srcFile);
        Path dstPath = new Path(destPath);
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            // Download the file from HDFS
            fs.copyToLocalFile(srcPath, dstPath);
            // Release resources
            fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    /**
     * Get information about the DataNodes in the HDFS cluster.
     *
     * @return DatanodeInfo[]
     */
    public static DatanodeInfo[] getHDFSNodes(String hdfsUri) {
        // All DataNodes
        DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            // Cast to the distributed file system
            DistributedFileSystem hdfs = (DistributedFileSystem)fs;
            dataNodeStats = hdfs.getDataNodeStats();
        } catch (IOException e) {
            log.error("", e);
        }
        return dataNodeStats;
    }

    /**
     * Find where a file's blocks are located in the HDFS cluster.
     *
     * @param filePath
     * @return BlockLocation[]
     */
    public static BlockLocation[] getFileBlockLocations(String hdfsUri, String filePath) {
        // File path
        if(StringUtils.isNotBlank(hdfsUri)){
            filePath = hdfsUri + filePath;
        }
        Path path = new Path(filePath);

        // Block location list
        BlockLocation[] blkLocations = new BlockLocation[0];
        try {
            // Get the FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            // Get the file status
            FileStatus filestatus = fs.getFileStatus(path);
            // Get the block locations of the file
            blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
        } catch (IOException e) {
            log.error("", e);
        }
        return blkLocations;
    }


    /**
     * Check whether a directory exists, optionally creating it.
     * @param hdfsUri
     * @param filePath
     * @param create
     * @return
     */
    public boolean existDir(String hdfsUri,String filePath, boolean create){
        boolean flag = false;

        if (StringUtils.isEmpty(filePath)){
            return flag;
        }
        try{
            Path path = new Path(filePath);
            // FileSystem instance
            FileSystem fs = getFileSystem(hdfsUri);
            if (create){
                if (!fs.exists(path)){
                    fs.mkdirs(path);
                }
            }
            if (fs.isDirectory(path)){
                flag = true;
            }
        }catch (Exception e){
            log.error("", e);
        }

        return flag;
    }
}
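
For a quick local check, here is a minimal usage sketch of the utility class (not part of the original project; the class name HdfsUtilsExample and the listing loop are illustrative, and the URI is the NameNode address configured above):

package com.hadoop.demo.utils;

/**
 * Minimal usage sketch for HdfsUtils.
 */
public class HdfsUtilsExample {

    public static void main(String[] args) {
        String hdfsUri = "hdfs://132.232.44.82:9000";
        // Create a directory, then list its contents through the static helpers
        HdfsUtils.mkdir(hdfsUri, "/mydata");
        String[] files = HdfsUtils.listFile(hdfsUri, "/mydata", null);
        for (String file : files) {
            System.out.println(file);
        }
    }
}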

5. HadoopConfig.java

package com.hadoop.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;


@Configuration
@ConditionalOnProperty(name="hadoop.name-node")
@Slf4j
public class HadoopConfig {

    @Value("${hadoop.name-node}")
    private String nameNode;

    /**
     * Configuration conf = new Configuration();
     * When a Configuration object is created, its constructor loads two Hadoop
     * configuration files by default, hdfs-site.xml and core-site.xml. These files
     * hold the parameters needed to access HDFS, mainly fs.defaultFS (formerly
     * fs.default.name), which gives the HDFS address; with that address a client
     * can reach HDFS. In other words, the Configuration object carries Hadoop's
     * configuration information.
     * @return
     */
    @Bean("fileSystem")
    public FileSystem createFs() throws Exception{
        // Load the configuration
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

        conf.set("fs.defaultFS", nameNode);
        conf.set("dfs.replication", "1");
        FileSystem fs = null;
        //conf.set("fs.defaultFS","hdfs://ns1");
        //Specify the client identity used to access HDFS
        //fs = FileSystem.get(new URI(nameNode), conf, "root");
        // File system

        // Return the file system for the given URI; when testing locally, obtain the file system this way
        try {
            URI uri = new URI(nameNode.trim());
            fs = FileSystem.get(uri,conf,"root");
        } catch (Exception e) {
            log.error("", e);
        }

        System.out.println("fs.defaultFS: "+conf.get("fs.defaultFS"));
        return  fs;
    }
}

6. HadoopTemplate.java

package com.hadoop.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;


@Component
@ConditionalOnBean(FileSystem.class)
@Slf4j
public class HadoopTemplate {

    @Autowired
    private FileSystem fileSystem;

    @Value("${hadoop.name-node}")
    private String nameNode;

    @Value("${hadoop.namespace:/}")
    private String nameSpace;

    @PostConstruct
    public void init(){
        existDir(nameSpace,true);
    }

    public void uploadFile(String srcFile){
        copyFileToHDFS(false,true,srcFile,nameSpace);
    }

    public void uploadFile(boolean del,String srcFile){
        copyFileToHDFS(del,true,srcFile,nameSpace);
    }

    public void uploadFile(String srcFile,String destPath){
        copyFileToHDFS(false,true,srcFile,destPath);
    }

    public void uploadFile(boolean del,String srcFile,String destPath){
        copyFileToHDFS(del,true,srcFile,destPath);
    }

    public void delFile(String fileName){
        rmdir(nameSpace,fileName) ;
    }

    public void delDir(String path){
        // Delete the directory under the namespace without mutating the nameSpace field
        rmdir(nameSpace + "/" + path, null);
    }

    public void download(String fileName,String savePath){
        getFile(nameSpace+"/"+fileName,savePath);
    }


    /**
     * Check whether a directory exists, creating it if requested.
     * @param filePath
     * @param create
     * @return
     */
    public boolean existDir(String filePath, boolean create){
        boolean flag = false;
        if(StringUtils.isEmpty(filePath)){
            throw new IllegalArgumentException("filePath must not be empty");
        }
        try{
            Path path = new Path(filePath);
            if (create){
                if (!fileSystem.exists(path)){
                    fileSystem.mkdirs(path);
                }
            }
            if (fileSystem.isDirectory(path)){
                flag = true;
            }
        }catch (Exception e){
            log.error("", e);
        }
        return flag;
    }




    /**
     * Upload a file to HDFS.
     * @param delSrc       whether to delete the source file; true deletes it, default is false
     * @param overwrite
     * @param srcFile      source file, i.e. the local path to upload
     * @param destPath     destination path on HDFS
     */
    public  void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
        // The source path is a Linux path; when testing on Windows, use a Windows path such as D://hadoop/djt/weibo.txt
        Path srcPath = new Path(srcFile);

        // Destination path
        if(StringUtils.isNotBlank(nameNode)){
            destPath = nameNode + destPath;
        }
        Path dstPath = new Path(destPath);
        // Perform the upload
        try {
            // Upload via the injected FileSystem bean, honouring the delSrc and overwrite flags
            fileSystem.copyFromLocalFile(delSrc, overwrite, srcPath, dstPath);
            // Do not close the shared FileSystem bean
            //    fileSystem.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }


    /**
     * Delete a file or directory.
     *
     * @param path
     */
    public void rmdir(String path,String fileName) {
        try {
            // Build the full path on the NameNode
            if(StringUtils.isNotBlank(nameNode)){
                path = nameNode + path;
            }
            if(StringUtils.isNotBlank(fileName)){
                path =  path + "/" +fileName;
            }
            // Delete the file or directory; delete(Path f) without the recursive flag is deprecated
            fileSystem.delete(new Path(path),true);
        } catch (IllegalArgumentException | IOException e) {
            log.error("", e);
        }
    }

    /**
     * Download a file from HDFS.
     *
     * @param hdfsFile
     * @param destPath local path where the downloaded file is saved
     */
    public void getFile(String hdfsFile,String destPath) {
        // Source file path
        if(StringUtils.isNotBlank(nameNode)){
            hdfsFile = nameNode + hdfsFile;
        }
        Path hdfsPath = new Path(hdfsFile);
        Path dstPath = new Path(destPath);
        try {
            // Download the file from HDFS
            fileSystem.copyToLocalFile(hdfsPath, dstPath);
            // Release resources
            // fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    public String getNameSpace(){
        return nameSpace;
    }


}

7. HdfsController.java

package com.hadoop.demo.controller;

import com.hadoop.demo.config.HadoopTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HdfsController {

    @Autowired
    private HadoopTemplate hadoopTemplate;

    /**
     * Upload the local file srcFile to HDFS.
     * @param srcFile
     * @return
     */
    @RequestMapping("/upload")
    public String upload(@RequestParam String srcFile){
        hadoopTemplate.uploadFile(srcFile);
        return "copy";
    }

    @RequestMapping("/delFile")
    public String del(@RequestParam String fileName){
        hadoopTemplate.delFile(fileName);
        return "delFile";
    }

    @RequestMapping("/download")
    public String download(@RequestParam String fileName,@RequestParam String savePath){
        hadoopTemplate.download(fileName,savePath);
        return "download";
    }
}
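
After the application starts (port 8080 as configured in the yml), the endpoints can be exercised with plain GET requests; the file names and paths below are only hypothetical examples:

http://localhost:8080/upload?srcFile=D:/data/test.txt
http://localhost:8080/download?fileName=test.txt&savePath=D:/data/download
http://localhost:8080/delFile?fileName=test.txt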

Note: the basic code above is complete. What follows are the pitfalls I ran into!

1. "Failed to locate the winutils binary in the hadoop binary path" — this error means Hadoop is not installed locally on the development machine.

Solution:
    Download https://github.com/srccodes/hadoop-common-2.7.1-bin
    Copy the files from its bin directory into your local Hadoop directory (overwriting the existing files).

    Add the system environment variables on Windows 10:
        HADOOP_HOME: D:\software\hadoop-dev\hadoop-common-2.7.1-bin-master
        CLASSPATH: D:\software\hadoop-dev\hadoop-2.7.7\bin\winutils.exe
    Then add %HADOOP_HOME%\bin to the PATH variable.

2. Alternatively, add System.setProperty("hadoop.home.dir", "D:\\software\\hadoop-dev\\hadoop-2.7.7"); directly in the startup class:

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@MapperScan("com.hadoop.demo.dao")
@SpringBootApplication
//@ComponentScan(value = "com.hadoop.demo.config")
public class DemoApplication {

    public static void main(String[] args) {
       System.setProperty("hadoop.home.dir", "D:\\software\\hadoop-dev\\hadoop-2.7.7");
        SpringApplication.run(DemoApplication.class, args);
    }

}

3. Remote access to port 9000 was refused. A listener bound to 127.0.0.1:9000 only accepts connections from the local machine, so the hosts file on the server has to be modified.

tcp        0      0 127.0.0.1:9000          0.0.0.0:*               LISTEN      8593/java 

Modified hosts file:

132.232.44.82  localhost.localdomain localhost
132.232.44.82  localhost4.localdomain4 localhost4

0.0.0.0 medecineit
::1 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6
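
After restarting HDFS with the updated hosts file, the NameNode should listen on the public interface instead of 127.0.0.1. A minimal sketch for verifying remote access from the client side (the class name is illustrative; the URI and the "root" user match the configuration used above):

package com.hadoop.demo.utils;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Minimal remote-access check against the NameNode.
 */
public class HdfsConnectivityCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connect to the remote NameNode as user "root" and list the root directory
        FileSystem fs = FileSystem.get(new URI("hdfs://132.232.44.82:9000"), conf, "root");
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}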

Those are the only pitfalls I hit. Done!

