大數據之Oozie——源碼分析(一)程序入口


工作中發現在oozie中使用sqoop與在shell中直接調度sqoop性能上有很大的差異。為了更深入的探索其中的緣由,開始了oozie的源碼分析之路。今天第一天閱讀源碼,由於沒有編譯成功,不能運行測試用例,直接使用sublime肉眼閱讀,還是挺費勁的。

雖然流程還不是順暢,但是大體上的內容還算是了解了。

我這里使用的是oozie4.2的版本,之前稍微看過4.3版本的,源碼上還是有一定的差異的。

看上面的圖,大致理解oozie的過程是:

  • oozie cli提交任務
  • oozie server創建一個對應任務的client
  • client去提交相應的任務

oozie工程結構

最重要的就是三個:

  • 1 client 這是任務提交的入口
  • 2 core 這是oozie的核心(在3中好像拆分成了core和server)
  • 3 distro 這里保存了啟動腳本

尋找源碼入口

  • 一種方式是直接以文件夾搜索main方法。
  • 另一種是看它的啟動腳本。

在啟動腳本中oozie.cmd,有這樣一句:

%JAVA_BIN% %JAVA_PROPERTIES% -cp %OOZIECPPATH% org.apache.oozie.cli.OozieCLI %OOZIE_PROPERTIES%

可見,入口在org.apache.oozie.cli.OozieCLI這個類中,那就從它開始吧。

sqoop作業的提交

首先是OozieCLI的入口main方法:

public static void main(String[] args) {
        //oozie方法的入口
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
        }
        System.exit(new OozieCLI().run(args));
    }

前面是一些認證的東西,可以忽略,直接進入run方法:

public synchronized int run(String[] args) {
        //保證clent僅啟動一次
        if (used) {
            throw new IllegalStateException("CLI instance already used");
        }
        used = true;
        //創建參數解析器
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);

            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);

            if (doAsUser != null) {
                OozieClient.doAs(doAsUser, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser, command);
                        return null;
                    }
                });
            }
            else {
                processCommand(parser, command);
            }
            return 0;
        }
        ...
    }

主要的內容是在這個processCommand里面,processCommand會根據命令調用相應的命令方法:

public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
        if (command.getName().equals(HELP_CMD)) {
            parser.showHelp(command.getCommandLine());
        }
        else if (command.getName().equals(JOB_CMD)) {
            jobCommand(command.getCommandLine());
        }
        else if (command.getName().equals(JOBS_CMD)) {
            jobsCommand(command.getCommandLine());
        }
        else if (command.getName().equals(ADMIN_CMD)) {
            adminCommand(command.getCommandLine());
        }
        else if (command.getName().equals(VERSION_CMD)) {
            versionCommand();
        }
        else if (command.getName().equals(VALIDATE_CMD)) {
            validateCommand(command.getCommandLine());
        }
        else if (command.getName().equals(SLA_CMD)) {
            slaCommand(command.getCommandLine());
        }
        else if (command.getName().equals(PIG_CMD)) {
            scriptLanguageCommand(command.getCommandLine(), PIG_CMD);
        }
        else if (command.getName().equals(HIVE_CMD)) {
            scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
        }
        else if (command.getName().equals(SQOOP_CMD)) {
            sqoopCommand(command.getCommandLine());//我關注的sqoop在這里
        }
        else if (command.getName().equals(INFO_CMD)) {
            infoCommand(command.getCommandLine());
        }
        else if (command.getName().equals(MR_CMD)){
            mrCommand(command.getCommandLine());
        }
    }

在sqoopCommand方法里面,sqoop任務被提交:

private void sqoopCommand(CommandLine commandLine) throws IOException, OozieCLIException {
        List<String> args = commandLine.getArgList();
        if (args.size() > 0) {
            // checking if args starts with -X (because CLIParser cannot check this)
            if (!args.get(0).equals("-X")) {
                throw new OozieCLIException("Unrecognized option: " + args.get(0) + " Expecting -X");
            }
            args.remove(0);
        }

        if (!commandLine.hasOption(SQOOP_COMMAND_OPTION)) {
            throw new OozieCLIException("Need to specify -command");
        }

        if (!commandLine.hasOption(CONFIG_OPTION)) {
            throw new OozieCLIException("Need to specify -config <configfile>");
        }

        try {
            XOozieClient wc = createXOozieClient(commandLine);
            Properties conf = getConfiguration(wc, commandLine);
            String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
            System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));
        }
        catch (OozieClientException ex) {
            throw new OozieCLIException(ex.toString(), ex);
        }
    }

最重要的內容就在這幾行:

XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String[] command = commandLine.getOptionValues(SQOOP_COMMAND_OPTION);
System.out.println(JOB_ID_PREFIX + wc.submitSqoop(conf, command, args.toArray(new String[args.size()])));

其中wc.submitSqoop提交了sqoop的任務。

后續問題

  • 1 任務提交到了哪里?
  • 2 在提交任務的時候都做了很么?
  • 3 如何在mapreduce開啟一個新的sqoop的?
  • 4 為什么在yarn中可以同時看到兩個應用,一個oozie,一個是sqoop

參考

1 oozie(4.1.0)架構及二次開發流程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM