kettle 日志解析功能


解析 kettle 日志文件

將文本文件轉成字符串

/**
 * Reads a whole text file into a string, decoding its bytes as GBK.
 *
 * @param file the file to read
 * @return the decoded content of the file, or {@code null} when an
 *         {@link IOException} occurs (the stack trace is printed)
 */
private String readInput(File file) {
    StringBuilder content = new StringBuilder();
    // try-with-resources closes the reader and the underlying stream even when read() fails
    try (FileInputStream fis = new FileInputStream(file);
         Reader in = new BufferedReader(new InputStreamReader(fis, "GBK"))) {
        char[] chunk = new char[4096];
        int count;
        while ((count = in.read(chunk)) != -1) {
            content.append(chunk, 0, count);
        }
        return content.toString();
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}

將字符串寫入文件中

/**
 * Writes a string to a file, encoding it as GBK and replacing any previous content.
 *
 * <p>An {@link IOException} is not propagated; its stack trace is printed instead.
 *
 * @param str  the text to write
 * @param file the destination file
 */
private void writeOutput(String str, File file) {
    // try-with-resources flushes and closes the writer even when write() fails
    try (FileOutputStream fos = new FileOutputStream(file);
         Writer out = new OutputStreamWriter(fos, "GBK")) {
        out.write(str);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

存儲 kettle 日志解析結果的實體類:

/**
 * Holds the summary extracted from a Kettle log for one table.
 */
public class KettleLogParse {
	/** Ordinal of the table within the log. */
	private int tableNumber;
	/** Name of the table. */
	private String tableName;
	/** Whether the extraction succeeded. */
	private boolean isSuccess;
	/** Number of warnings. */
	private int warnNumber;
	/** Number of extracted rows. */
	private int dataNumber;
	/** Index of the log line at which this summary is inserted. */
	private int lineNumber;

	/** @return the ordinal of the table */
	public int getTableNumber() {
		return this.tableNumber;
	}

	/** @param tableNumber the ordinal of the table */
	public void setTableNumber(int tableNumber) {
		this.tableNumber = tableNumber;
	}

	/** @return {@code true} when the extraction succeeded */
	public boolean isSuccess() {
		return this.isSuccess;
	}

	/** @param isSuccess whether the extraction succeeded */
	public void setSuccess(boolean isSuccess) {
		this.isSuccess = isSuccess;
	}

	/** @return the number of warnings */
	public int getWarnNumber() {
		return this.warnNumber;
	}

	/** @param warnNumber the number of warnings */
	public void setWarnNumber(int warnNumber) {
		this.warnNumber = warnNumber;
	}

	/** @return the number of extracted rows */
	public int getDataNumber() {
		return this.dataNumber;
	}

	/** @param dataNumber the number of extracted rows */
	public void setDataNumber(int dataNumber) {
		this.dataNumber = dataNumber;
	}

	/** @return the table name */
	public String getTableName() {
		return this.tableName;
	}

	/** @param tableName the table name */
	public void setTableName(String tableName) {
		this.tableName = tableName;
	}

	/** @return the index of the log line at which this summary is inserted */
	public int getLineNumber() {
		return this.lineNumber;
	}

	/** @param lineNumber the index of the log line at which this summary is inserted */
	public void setLineNumber(int lineNumber) {
		this.lineNumber = lineNumber;
	}

	/**
	 * Renders the one-line summary: table ordinal and name, the success or
	 * failure phrase, the warning count and the extracted row count.
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append("表").append(tableNumber).append(":").append(tableName);
		if (isSuccess) {
			text.append(",抽取成功,共");
		} else {
			text.append(",抽取失敗,共");
		}
		text.append(warnNumber).append("個警報,抽取量為").append(dataNumber).append("條");
		return text.toString();
	}
}

解析 kettle 的日志文件,並返回解析好的信息:

/**
 * Parses the text of a Kettle log into one summary per job entry.
 *
 * <p>The text is split on {@code "\r\n"} and scanned line by line:
 * <ul>
 *   <li>a line containing "開始項" opens a new entry, records the current line
 *       index on it and resets the error and warning counters;</li>
 *   <li>a "Loading transformation from XML file" line supplies the table name,
 *       taken from the last path segment inside the last {@code [...]};</li>
 *   <li>a line containing "完成處理" stores the warning count, the {@code W}
 *       value of the last {@code (...)} group as the data count, and marks the
 *       entry successful when no error line has been seen since it was opened;</li>
 *   <li>a line containing "完成作業項" closes the innermost open entry, which is
 *       kept only when it has a table name.</li>
 * </ul>
 * Marker lines that arrive while no entry is open, or that lack the expected
 * brackets or statistics, are skipped instead of aborting the parse.
 *
 * @param kettle the full log text; {@code null} yields an empty list
 * @return the summaries of the closed entries that have a table name, never {@code null}
 */
private List<KettleLogParse> parseKettleLog(String kettle) {
    List<KettleLogParse> recordList = new ArrayList<>();
    if (kettle == null) {
        return recordList;
    }
    String[] lines = kettle.split("\r\n");

    // The first table is numbered 1.
    int tableNumber = 1;
    Stack<KettleLogParse> openEntries = new Stack<>();
    int errorNum = 0;
    int warningNum = 0;

    for (int lineNumber = 0; lineNumber < lines.length; lineNumber++) {
        String line = lines[lineNumber];

        // Start of an entry: push a fresh summary and reset the counters.
        if (line.contains("開始項")) {
            KettleLogParse entry = new KettleLogParse();
            entry.setTableNumber(tableNumber);
            entry.setLineNumber(lineNumber);
            errorNum = 0;
            warningNum = 0;
            openEntries.push(entry);
        }

        // Table name: last path segment of the file named inside the last [...].
        if (line.contains("Loading transformation from XML file") && !openEntries.isEmpty()) {
            int nameStart = line.lastIndexOf("[") + 1;
            int nameEnd = line.lastIndexOf("]");
            if (nameEnd >= nameStart) {
                String path = line.substring(nameStart, nameEnd);
                // Accept both separators so URL-style, Windows and Unix paths all resolve.
                int separator = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
                openEntries.peek().setTableName(path.substring(separator + 1));
                tableNumber++;
            }
        }

        // Result of the entry: warning count, written-row count and success flag.
        if (line.contains("完成處理") && !openEntries.isEmpty()) {
            KettleLogParse entry = openEntries.peek();
            entry.setWarnNumber(warningNum);
            Integer written = null;
            int statsStart = line.lastIndexOf("(") + 1;
            int statsEnd = line.lastIndexOf(")");
            if (statsEnd >= statsStart) {
                try {
                    String record = line.substring(statsStart, statsEnd);
                    Map<String, Integer> stats = getKettleMap(Arrays.asList(record.split(",")));
                    // Tokens after the first keep their leading space, hence the " W" key.
                    written = stats.get(" W");
                    if (written == null) {
                        written = stats.get("W");
                    }
                } catch (NumberFormatException | ArrayIndexOutOfBoundsException ignored) {
                    // The parenthesised text is not a statistics group; leave the data count as is.
                }
            }
            if (written != null) {
                entry.setDataNumber(written);
            }
            if (errorNum == 0) {
                entry.setSuccess(true);
            }
        }

        // End of an entry: pop it and keep it when a table name was found.
        if (line.contains("完成作業項") && !openEntries.isEmpty()) {
            KettleLogParse entry = openEntries.pop();
            if (entry.getTableName() != null) {
                recordList.add(entry);
            }
        }

        // Count errors.
        if (line.contains("- ERROR")) {
            errorNum++;
        }
        // Count warnings.
        if (line.contains("- Warning:")) {
            warningNum++;
        }
    }

    return recordList;
}

根據解析信息,獲取新的日志文本字符串

/**
 * Renders the log as HTML: every log line is wrapped in a {@code <div>}, and a
 * line that has a summary attached is preceded by a {@code <p>} holding a
 * status marker span and the summary text.
 *
 * @param kettle     the full log text, split on {@code "\r\n"}
 * @param recordList the summaries, matched to lines by their line number
 * @return the HTML text, one {@code "\r\n"}-terminated row per log line
 */
private String pageKettle (String kettle, List<KettleLogParse> recordList) {
    StringBuilder html = new StringBuilder("");
    String[] lines = kettle.split("\r\n");
    int lineIndex = 0;
    for (String line : lines) {
        KettleLogParse summary = isInsertLine(lineIndex, recordList);
        lineIndex++;
        if (summary == null) {
            html.append("<div>").append(line).append("</div>").append("\r\n");
            continue;
        }
        // A failed entry is marked as an error regardless of its warning count.
        String marker;
        if (!summary.isSuccess()) {
            marker = "<p><span class='error'></span>";
        } else if (summary.getWarnNumber() > 0) {
            marker = "<p><span class='warning'></span>";
        } else {
            marker = "<p><span class='success'></span>";
        }
        html.append(marker)
            .append(summary.toString())
            .append("</p> <div>")
            .append(line)
            .append("</div>\r\n");
    }
    return html.toString();
}

/**
 * Looks up the summary that belongs at the given line index.
 *
 * @param index the line index being rendered
 * @param list  the summaries to search
 * @return the first summary whose line number equals {@code index},
 *         or {@code null} when there is none
 */
private KettleLogParse isInsertLine(int index, List<KettleLogParse> list) {
    for (int position = 0, size = list.size(); position < size; position++) {
        KettleLogParse candidate = list.get(position);
        if (candidate.getLineNumber() == index) {
            return candidate;
        }
    }
    return null;
}

對文本文件進行修改:在對應行之前插入解析結果摘要(純文本,不含 HTML 標簽)。

/**
 * Builds the plain-text form of the annotated log (no HTML tags): each log
 * line is copied through, and a line that has a summary attached is preceded
 * by that summary on a line of its own.
 *
 * @param kettle     the full log text, split on {@code "\r\n"}
 * @param recordList the summaries, matched to lines by their line number
 * @return the annotated text, every line terminated by {@code "\r\n"}
 */
private String pageLogFile (String kettle, List<KettleLogParse> recordList) {
	StringBuilder annotated = new StringBuilder("");
	String[] lines = kettle.split("\r\n");
	for (int lineIndex = 0; lineIndex < lines.length; lineIndex++) {
		KettleLogParse summary = isInsertLine(lineIndex, recordList);
		if (summary != null) {
			annotated.append(summary.toString()).append("\r\n");
		}
		annotated.append(lines[lineIndex]).append("\r\n");
	}
	return annotated.toString();
}

日志解析過程中,獲取其中的特殊數據:

/**
 * Converts {@code name=value} tokens, such as the pieces of
 * {@code I=0, O=0, R=77175, W=77175, U=0, E=0}, into a map.
 *
 * <p>Keys are stored exactly as they appear before the {@code =}, including
 * any leading space. Values are trimmed before being parsed. Tokens without a
 * value or with a non-numeric value are skipped.
 *
 * @param list the tokens to convert; {@code null} yields an empty map
 * @return a map from token name to its integer value, never {@code null}
 */
private Map<String, Integer> getKettleMap(List<String> list){
    Map<String, Integer> map = new HashMap<String, Integer>();
    if (list == null) {
        return map;
    }
    for (String str : list) {
        if (str == null) {
            continue;
        }
        String[] split = str.split("=");
        if (split.length < 2) {
            // No "=value" part, so there is nothing to record for this token.
            continue;
        }
        try {
            map.put(split[0], Integer.parseInt(split[1].trim()));
        } catch (NumberFormatException ignored) {
            // Not a counter; skipping it keeps the remaining tokens usable.
        }
    }
    return map;
}

獲取日志文件

/**
 * Returns the annotated log file for one log record, creating or annotating it when needed.
 *
 * <p>The file path is built from the {@code data.kettle.path} setting, a
 * prefix chosen by {@code isTrusted} and the record's {@code logDate} with the
 * dashes removed. A non-empty existing file is returned as is when it already
 * contains summary lines; otherwise it is annotated in place. A missing or
 * empty file is written from the record's {@code dataLog} text and then
 * annotated. All file content is written as GBK, the encoding it is read back with.
 *
 * @param kettleLog the log record; {@code logDate} and {@code dataLog} are read from it
 * @param isTrusted selects the {@code Trusted_Log_} prefix instead of {@code Print_Log_}
 * @return the log file, or {@code null} when the path could not be built;
 *         failures are printed, not thrown
 */
private File requiredLogFile(Map<String, Object> kettleLog, boolean isTrusted) {
    File logFile = null;
    try {
        Object logTime = kettleLog.get("logDate");
        String rootPath = ResourceUtil.getConfigByName("data.kettle.path");
        // Build the file path.
        String filePath = rootPath + "/" + (isTrusted ? "Trusted_Log_" : "Print_Log_")
            + logTime.toString().replace("-", "") + ".log";
        logFile = new File(filePath);

        // Existing, non-empty file: annotate it unless it already carries summary lines.
        if (logFile.exists() && logFile.isFile() && logFile.length() != 0) {
            String existing = readInput(logFile);
            if (existing == null || existing.contains("個警報,抽取量為")) {
                return logFile;
            }
            writeOutput(pageLogFile(existing, parseKettleLog(existing)), logFile);
            return logFile;
        }

        // Existing but empty file: start over.
        if (logFile.exists() && logFile.length() == 0) {
            logFile.delete();
        }
        File parent = logFile.getParentFile();
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }
        logFile.createNewFile();

        Object fileContent = kettleLog.get("dataLog");
        if (fileContent == null) {
            // Nothing to write; hand back the empty file.
            return logFile;
        }
        String content = fileContent.toString();
        // Store the raw text first so the file holds the log even if annotating it fails.
        writeOutput(content, logFile);
        writeOutput(pageLogFile(content, parseKettleLog(content)), logFile);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return logFile;
}

對日志文件進行壓縮處理

/**
 * Compresses all files of a list into a new temporary zip file.
 *
 * <p>Each entry is named after its source file with {@code _<index>} inserted
 * before the extension (or appended when the name has no extension), so files
 * sharing a name do not collide. Failures are printed, not thrown; the archive
 * may then be incomplete.
 *
 * @param fileList the files to compress; {@code null} is treated as empty
 * @return the zip file, or {@code null} when the temporary file could not be created
 */
private File logToZip(List<File> fileList) {
    File zipFile = null;
    try {
        zipFile = File.createTempFile("ketteLogZip", ".zip");
        // try-with-resources closes the archive and each source stream on every path
        try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipFile))) {
            if (fileList != null) {
                byte[] buf = new byte[2048];
                int i = 0;
                for (File srcFile : fileList) {
                    // Make the entry name unique so duplicate file names do not fail the archive.
                    String fileName = srcFile.getName();
                    int dot = fileName.lastIndexOf('.');
                    String newFileName = dot < 0
                        ? fileName + "_" + i
                        : fileName.substring(0, dot) + "_" + i + fileName.substring(dot);
                    zos.putNextEntry(new ZipEntry(newFileName));
                    try (FileInputStream in = new FileInputStream(srcFile)) {
                        int len;
                        while ((len = in.read(buf)) != -1) {
                            zos.write(buf, 0, len);
                        }
                    }
                    zos.closeEntry();
                    zos.flush();
                    i++;
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    return zipFile;
}

查詢接口

@Override
	/**
	 * Queries one page of extraction-log records and attaches to each record the
	 * figures obtained by parsing its log text.
	 *
	 * <p>For every record the {@code dataLog} text is parsed; {@code sumRecord} is
	 * the sum of the data counts of all parsed entries and {@code resultFlag} is
	 * 1 when any entry failed, otherwise 2 when any entry has warnings, otherwise 0.
	 * When {@code logFlag} is given, records whose {@code resultFlag} differs from
	 * it are removed from the page.
	 *
	 * @param beginTime lower bound for LOGDATE, or blank for none; must look like a date/time
	 * @param endTime   upper bound for LOGDATE, or blank for none; must look like a date/time
	 * @param logFlag   wanted result flag, or blank for all
	 * @param pageId    page to fetch
	 * @param pageNum   page size
	 * @return a map holding the records under {@code "ketteLog"} and the row count under {@code "total"}
	 * @throws IllegalArgumentException when a time bound contains characters that
	 *         cannot occur in a date/time value
	 */
	public Map<String, Object> queryKettleLog(String beginTime,String endTime,String logFlag,Integer pageId, Integer pageNum){
		// Query for the log rows.
		String sql = "SELECT ID_JOB as id, left(LOGDATE,10) as logDate, `ERRORS` as logFlag, REPLAYDATE AS STARTDATE, LOGDATE AS ENDDATE, LOG_FIELD as dataLog FROM t_sdrs_data_extraction_log";
		// Query for the number of rows.
		String sql1 = "SELECT COUNT(*) FROM t_sdrs_data_extraction_log";
		// Filter shared by both queries. The time bounds end up inside SQL string
		// literals, so they are checked against a whitelist before being appended.
		StringBuilder condition = new StringBuilder(" WHERE 1=1");
		if(!StringUtils.isBlank(beginTime)){
			condition.append(" AND LOGDATE >='").append(requireDateLiteral(beginTime, "beginTime")).append("'");
		}
		if(!StringUtils.isBlank(endTime)){
			condition.append(" AND LOGDATE <='").append(requireDateLiteral(endTime, "endTime")).append("'");
		}
		if(!StringUtils.isBlank(logFlag)){
			if(logFlag.equals("1")) {
				condition.append(" AND `ERRORS` !='0'");
			}else {
				condition.append(" AND `ERRORS` ='0'");
			}
		}
		// Ordering is irrelevant for a row count, so only the row query is sorted.
		sql1 = sql1 + condition;
		sql = sql + condition + " ORDER BY LOGDATE DESC";

		// DAO-layer queries.
		Map<String,Object> map = Maps.<String,Object>newHashMap();
		Long total = systemService.getCountForJdbc(sql1);
		List<Map<String, Object>> kettleLogList = systemService.findForJdbc(sql,pageId,pageNum);

		// Parse every log to find out whether it contains errors or warnings.
		Iterator<Map<String, Object>> iterator = kettleLogList.iterator();
		while (iterator.hasNext()){
			Map<String, Object> queryMap = iterator.next();
			Object object = queryMap.get("dataLog");
			// A record without log text is treated as an empty log.
			String logText = object == null ? "" : object.toString();
			List<KettleLogParse> parseKettleLog = parseKettleLog(logText);
			int sumRecord = 0; // total data count of this log
			boolean hasError = false;
			boolean hasWarning = false;
			for (KettleLogParse parse : parseKettleLog) {
				sumRecord = sumRecord + parse.getDataNumber();
				if (!parse.isSuccess()) {
					hasError = true;
				}
				if (parse.getWarnNumber() > 0) {
					hasWarning = true;
				}
			}
			// Result flag: 0 normal, 1 error, 2 warning; an error outranks a warning.
			int resultFlag = hasError ? 1 : (hasWarning ? 2 : 0);
			// Drop records whose parsed result does not match the requested flag.
			// NOTE(review): "total" is computed before this filtering, so it can
			// exceed the number of matching records - confirm whether callers rely on it.
			if(!StringUtils.isBlank(logFlag)&&!logFlag.equals(String.valueOf(resultFlag))) {
				iterator.remove();
				continue;
			}
			queryMap.put("sumRecord", sumRecord);
			queryMap.put("resultFlag", resultFlag);
		}

		map.put("ketteLog", kettleLogList);
		map.put("total", total);
		return map;
	}

	/**
	 * Checks that a time bound consists only of characters that occur in
	 * date/time values (digits, space, {@code T}, {@code - : . /}).
	 *
	 * @param value the bound to check
	 * @param name  the parameter name used in the error message
	 * @return {@code value} unchanged
	 * @throws IllegalArgumentException when {@code value} contains any other character
	 */
	private String requireDateLiteral(String value, String name) {
		if (!value.matches("[0-9T :./-]{1,32}")) {
			throw new IllegalArgumentException(name + " is not a valid date/time value");
		}
		return value;
	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM