Spark 2.2 (39): How to monitor a Spark job by appName, starting it when it is not running (and killing it when it has been inactive for too long, so it can be restarted on the next check)


Business requirements

Implement a monitor that, based on a Spark job's appName, checks whether the job is running and whether it has hung.

1) Given an appName, verify from the output of yarn application -list whether the job exists; if it does not, call the spark-submit.sh script to start it.

2) If the job does exist in yarn application -list, read the 'monitor file' (its content is: appId, latest activity time), take the last activity time from it, and compute the difference X between the current time and the app's last activity time. If X is greater than 30 minutes, the job is considered hung (in our environment we found jobs whose DAG threw an OOM, so the app's executors and driver were still alive but no batches were being scheduled and the program was stuck; for details see https://issues.apache.org/jira/browse/SPARK-26452). In that case run yarn application -kill appId to kill the job, and wait for the next run of the monitoring script to restart it. The core staleness check is sketched right after this list.
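For reference, a minimal Java sketch of that staleness check (it mirrors the shell logic in the script below; the monitor-file format appId,yyyy-MM-dd HH:mm:ss and the 30-minute threshold come from this article, while the class and method names are only illustrative):

import java.text.SimpleDateFormat;
import java.util.Date;

public class StalenessCheckSketch {

    // Returns true when the last activity recorded in the monitor file content
    // ("appId,yyyy-MM-dd HH:mm:ss") is more than maxIdleMinutes old.
    public static boolean isStale(String monitorFileContent, long maxIdleMinutes) throws Exception {
        String[] parts = monitorFileContent.trim().split(",");
        String appId = parts[0];
        Date lastActive = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(parts[1]);
        long idleMinutes = (System.currentTimeMillis() - lastActive.getTime()) / (60 * 1000);
        System.out.println("appId=" + appId + " has been idle for " + idleMinutes + " minutes");
        return idleMinutes > maxIdleMinutes;
    }

    public static void main(String[] args) throws Exception {
        // Example content; in practice it is read from the HDFS monitor file.
        System.out.println(isStale("application_1543820999543_0193,2018-12-20 10:00:00", 30));
    }
}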

Monitoring implementation

Script

#!/bin/sh
#LANG=zh_CN.utf8
#export LANG
export SPARK_KAFKA_VERSION=0.10
export LANG=zh_CN.UTF-8
# export env variable
if [ -f ~/.bash_profile ];
then
   source ~/.bash_profile
fi
source /etc/profile

myAppName='myBatchTopic'               # appName of the spark job to monitor. Note: if this name is duplicated, the monitor will fail.
apps=''

for app in `yarn application -list`
do
  apps=${app},$apps
done
apps=${apps%?}

if [[ $apps =~ $myAppName ]];
then
  echo "appName($myAppName) exists in yarn application list"
  #1) Run hadoop fs -cat <monitor dir>/appName and read the last update time from it (if the file does not exist yet, skip and wait for it to be generated).
  monitorInfo=$(hadoop fs -cat /user/dx/streaming/monitor/${myAppName})
  OLD_IFS="$IFS"
  IFS=","
  array=($monitorInfo)
  IFS="$OLD_IFS"
  appId=${array[0]}
  monitorLastDate=${array[1]}
  echo "loading monitor information 'appId:$appId,monitorLastUpdateDate:$monitorLastDate'"

  current_date=$(date "+%Y-%m-%d %H:%M:%S")
  echo "loading current date '$current_date'"
  
  #2) Compare with the current time:
  #   if the difference is less than 30 minutes, do nothing;
  #   if it is greater than 30 minutes, kill the job: run yarn application -kill appId with the appId read from the monitor file (it also appears in yarn application -list above)
  t1=`date -d "$current_date" +%s`
  t2=`date -d "$monitorLastDate" +%s`
  diff_minute=$(($(($t1-$t2))/60))
  echo "current date($current_date) over than monitorLastDate($monitorLastDate) $diff_minute minutes"
  if [ $diff_minute -gt 30 ];
  then
    echo 'more than 30 minutes'
    yarn application -kill ${appId}
    echo "kill application ${appId}"
  else
    echo 'less than 30 minutes'
  fi
else
  echo "appName($myAppName) not exists in yarn application list"
  #./submit_x1_x2.sh abc TestRestartDriver #這里指定需要啟動的腳本來啟動相關任務
  $(nohup ./submit_checkpoint2.sh >> ./output.log 2>&1 &)
fi

Monitoring script flow diagram:

Monitor file generation

My program here is a Spark Structured Streaming job, so I can register a listener for the queries of sparkSession.streams():

sparkSession.streams().addListener(new GlobalStreamingQueryListener(sparkSession, ...))

The listener is implemented as follows:

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent;
import org.apache.spark.util.LongAccumulator;

public class GlobalStreamingQueryListener extends StreamingQueryListener {
    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GlobalStreamingQueryListener.class);
    private static final String monitorBaseDir = "/user/dx/streaming/monitor/";
    private SparkSession sparkSession = null;
    private LongAccumulator triggerAccumulator = null;

    public GlobalStreamingQueryListener(SparkSession sparkSession, LongAccumulator triggerAccumulator) {
        this.sparkSession = sparkSession;
        this.triggerAccumulator = triggerAccumulator;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
        // sparkSession.sql("select * from " +
        // queryProgress.progress().name()).show();

        triggerAccumulator.add(1);
        System.out.println("Trigger accumulator value: " + triggerAccumulator.value());

        logger.info("minitor start .... ");
        try {
            if (HDFSUtility.createDir(monitorBaseDir)) {
                logger.info("Create monitor base dir(" + monitorBaseDir + ") success");
            } else {
                logger.info("Create monitor base dir(" + monitorBaseDir + ") fail");
            }
        } catch (IOException e) {
            logger.error("An error was thrown while create monitor base dir(" + monitorBaseDir + ")");
            e.printStackTrace();
        }

        // spark.app.id application_1543820999543_0193
        String appId = this.sparkSession.conf().get("spark.app.id");
        // spark.app.name myBatchTopic
        String appName = this.sparkSession.conf().get("spark.app.name");
        String monitorFilePath = (monitorBaseDir.endsWith(File.separator) ? monitorBaseDir : monitorBaseDir + File.separator) + appName;
        logger.info("The application's id is " + appId);
        logger.info("The application's name is " + appName);
        logger.warn("If the appName is not unique, it will result in a monitor error");

        try {
            HDFSUtility.overwriter(monitorFilePath, appId + "," + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        } catch (IOException e) {
            logger.error("An error was thrown while writing info to monitor file(" + monitorFilePath + ")");
            e.printStackTrace();
        }
        logger.info("monitor stop .... ");
    }

}
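For reference, a minimal sketch of how this listener might be wired into the streaming application; the accumulator name and the surrounding main class are assumptions for illustration, not part of the original code:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class MonitorRegistrationSketch {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .appName("myBatchTopic")   // must match myAppName in the monitoring script
                .getOrCreate();

        // Accumulator the listener uses to count triggers (the name is illustrative).
        LongAccumulator triggerAccumulator = sparkSession.sparkContext().longAccumulator("triggerAccumulator");

        // Every query progress event will now refresh the HDFS monitor file.
        sparkSession.streams().addListener(new GlobalStreamingQueryListener(sparkSession, triggerAccumulator));

        // ... define and start the structured streaming query here ...

        sparkSession.streams().awaitAnyTermination();
    }
}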
The methods in HDFSUtility.java are as follows:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtility {
    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDFSUtility.class);

    /**
     * Create the directory if it does not already exist.
     * 
     * @param dirPath
     *            target directory
     * @return true - created or already exists; false - creation failed.
     * @throws IOException
     * */
    public static boolean createDir(String dirPath) throws IOException {
        FileSystem fs = null;
        Path dir = new Path(dirPath);
        boolean success = false;
        try {
            fs = FileSystem.get(new Configuration());

            if (!fs.exists(dir)) {
                success = fs.mkdirs(dir);
            } else {
                success = true;
            }
        } catch (IOException e) {
            logger.error("create dir (" + dirPath + ") fail:", e);
            throw e;
        } finally {
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        return success;
    }

    /**
     * Overwrite the file with the given content.
     * 
     * @param filePath
     *            target file path
     * @param content
     *            content to be written
     * @throws IOException
     * */
    public static void overwriter(String filePath, String content) throws IOException {
        FileSystem fs = null;
        // Create an FSDataOutputStream at the given path. By default it overwrites an existing file.
        FSDataOutputStream outputStream = null;
        Path file = new Path(filePath);

        try {
            fs = FileSystem.get(new Configuration());
            if (fs.exists(file)) {
                System.out.println("File exists(" + filePath + ")");
            }
            outputStream = fs.create(file);
            outputStream.write(content.getBytes());
        } catch (IOException e) {
            logger.error("write into file(" + filePath + ") fail:", e);
            throw e;
        } finally {
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
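For reference, a minimal sketch of how these helpers could be exercised on their own to produce the monitor file that the shell script reads (the appName and appId values below are just the examples used earlier in this article):

import java.text.SimpleDateFormat;
import java.util.Date;

public class HDFSUtilityUsageSketch {
    public static void main(String[] args) throws Exception {
        String monitorBaseDir = "/user/dx/streaming/monitor/";
        String appName = "myBatchTopic";                         // example appName
        String appId = "application_1543820999543_0193";         // example appId

        // Ensure the monitor directory exists, then overwrite the monitor file
        // with "appId,lastActivityTime", the format the shell script parses.
        HDFSUtility.createDir(monitorBaseDir);
        HDFSUtility.overwriter(monitorBaseDir + appName,
                appId + "," + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}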