Submitting Spark jobs to a CDH cluster through Livy


 Scenario

A back-end application needs to invoke Spark jobs remotely.

 

A brief introduction to Livy

Submit Jobs from Anywhere

Livy enables programmatic, fault-tolerant, multi-tenant submission of Spark jobs from web/mobile apps (no Spark client needed). So, multiple users can interact with your Spark cluster concurrently and reliably.

Use Interactive Scala or Python

Livy speaks either Scala or Python, so clients can communicate with your Spark cluster via either language remotely. Also, batch job submissions can be done in Scala, Java, or Python.

No Code Changes Needed

Don’t worry, no changes to existing programs are needed to use Livy. Just build Livy with Maven, deploy the configuration file to your Spark cluster, and you’re off! 

Livy is an Apache-licensed service that lets remote applications interact with a Spark cluster conveniently through a REST API. Via a simple REST interface or an RPC client library, it makes it easy to submit Spark jobs or snippets of Spark code, retrieve results synchronously or asynchronously, and manage SparkContexts. Livy also simplifies the interaction between Spark and application servers, which in turn simplifies the Spark architecture for web/mobile applications.

Its main features are:
1. Long-running SparkContexts that can be used by multiple clients for multiple Spark jobs.
2. Managing multiple SparkContexts at the same time, running them in the cluster (YARN/Mesos) rather than inside the Livy server, for good fault tolerance and concurrency.
3. Jobs can be submitted as precompiled jars, as code snippets, or through the Java/Scala client API.
4. Secure, authenticated communication (e.g. Kerberos).

 

 

Livy's REST API (reference link)
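For orientation, these are the batch-related REST endpoints that the demo code below relies on (per the Livy REST API documentation):

```text
POST   /batches            submit a batch (Spark jar) job
GET    /batches/{id}       full info about one batch session
GET    /batches/{id}/state just the current state of a batch
GET    /batches/{id}/log   driver log lines of a batch
DELETE /batches/{id}       kill the batch's Spark job
```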

The code is as follows.

Dependencies:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

 

Demo code

package cn.com.dtmobile.livy;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class HttpUtils {

    public static HttpURLConnection init(HttpURLConnection conn){
        conn.setDoInput(true);
        conn.setDoOutput(true);
        conn.setRequestProperty("charset","utf-8");
        conn.setRequestProperty("Content-Type","application/json");
        return conn;
    }

    /**
     * HTTP GET request
     */
    public static JSONObject getAccess(String urlStr) {

        HttpURLConnection conn = null;
        BufferedReader in = null;
        StringBuilder builder = null;
        JSONObject response = null;
        try {
            URL url = new URL(urlStr);
            conn = init((HttpURLConnection) url.openConnection());
            conn.setRequestMethod("GET");
            conn.connect();

            in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8"));
            String line = "";
            builder = new StringBuilder();
            while((line = in.readLine()) != null){
                builder.append(line);
            }
            
            response = JSON.parseObject(builder.toString());

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (conn!=null)
                conn.disconnect();
            try {
                if (in != null)
                    in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return response;
    }

    /**
     * HTTP DELETE request
     */
    public static Boolean deleteAccess(String urlStr) {
        HttpURLConnection conn = null;
        try {
            URL url = new URL(urlStr);
            conn = init((HttpURLConnection) url.openConnection());
            conn.setRequestMethod("DELETE");
            conn.connect();

            conn.getInputStream().close();

            conn.disconnect();
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }

        return true;
    }



    /**
     * HTTP POST request
     */
    public static String postAccess(String urlStr, JSONObject data)  {
        HttpURLConnection conn = null;
        BufferedReader in = null;
        StringBuilder builder = null;
        DataOutputStream out = null;
        try {
            URL url = new URL(urlStr);
            conn = init((HttpURLConnection) url.openConnection());
            conn.setRequestMethod("POST");
            conn.connect();

            out = new DataOutputStream(conn.getOutputStream());
            out.write(data.toString().getBytes("utf-8"));
            out.flush();

            in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf-8"));
            String line = "";
            builder = new StringBuilder();
            while((line = in.readLine()) != null){
                builder.append(line);
            }

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (conn!= null)
                conn.disconnect();
            try {
                if (in!=null)
                    in.close();
                if (out!=null)
                    out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (builder != null)
            return builder.toString();
        return "";
    }

}

 

 

package cn.com.dtmobile.livy;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

public class LivyApp {

    static String host = "http://172.xx.x.xxx:8998";

    public static int submitJob() throws JSONException {
        JSONObject data = new JSONObject();

        JSONObject conf = new JSONObject(); // Spark-related configuration
        conf.put("spark.master","yarn");
        conf.put("spark.submit.deployMode","cluster");

        data.put("conf",conf);
        data.put("proxyUser","etluser");
        data.put("file","/kong/data/jar/your_jar.jar"); // the Spark jar to run (HDFS path)
        data.put("jars",new String[]{"/kong/data/jar/dbscan-on-spark_2.11-0.2.0-SNAPSHOT.jar"}); // external jars the Spark jar depends on
        data.put("className", "cn.com.dtmobile.spark.App");
        data.put("name","jonitsiteplan");
        data.put("executorCores",3);
        data.put("executorMemory","2g");
        data.put("driverCores",1);
        data.put("driverMemory","4g");
        data.put("numExecutors",6);
        data.put("queue","default");
        data.put("args",new String[]{"杭州","yj_hangzhou","2019041719"}); // arguments passed to the job
        
        String res = HttpUtils.postAccess(host + "/batches", data);
        
        JSONObject resjson = JSON.parseObject(res);
        System.out.println("id:"+resjson.getIntValue("id"));
        return resjson.getIntValue("id");
    }

    public static void getJobInfo(int id){
//        JSONObject response = HttpUtils.getAccess(host + "/batches/3");
//        System.out.print(response.toString(1));

//        JSONObject log = HttpUtils.getAccess(host + "/batches/3/log");
//        System.out.print(log.toString(1));

        JSONObject state = HttpUtils.getAccess(host + "/batches/"+id+"/state");
        System.out.println(state.getString("state"));
    }

    public static void killJob(int id){
        // a running Spark job can be killed directly via DELETE
        if(HttpUtils.deleteAccess(host+"/batches/"+id)) {
            System.out.println("kill spark job success");
        }
    }

    public static void main(String[] args) {
        int id  = submitJob();
        while(true) {
            try {
                getJobInfo(id);
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop polling
                Thread.currentThread().interrupt();
                break;
            }
        }

//        killJob(9);
    }


}
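The polling loop in main() above runs forever; in practice you usually poll until the batch reaches a terminal state. The sketch below factors that logic out, assuming the terminal batch states "success", "dead" and "killed" (the class name, Supplier-based state source, and helper are illustrative, not part of the Livy API — in real use the supplier would call HttpUtils.getAccess on the /batches/{id}/state endpoint):

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class PollSketch {

    // Terminal states of a Livy batch session.
    static final Set<String> TERMINAL = Set.of("success", "dead", "killed");

    // Poll a state source until a terminal state is seen, sleeping between polls.
    static String waitForCompletion(Supplier<String> stateSource, long sleepMillis) {
        while (true) {
            String state = stateSource.get();
            if (TERMINAL.contains(state)) {
                return state;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore flag, stop polling
                return state;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated state sequence instead of real HTTP calls.
        Iterator<String> states = List.of("starting", "running", "success").iterator();
        System.out.println(waitForCompletion(states::next, 1)); // prints "success"
    }
}
```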

 

After submitting, you can see the job's status in the Livy UI, or poll the API to get the job's status and its output logs.
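For reference, the state endpoint returns a small JSON object like the following (illustrative values):

```json
{"id": 9, "state": "running"}
```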

The YARN UI:

 


Viewing job logs

To view the execution logs of the Spark job itself, check the role log of the Livy REST Server, as shown below:

 

