Parsing Kettle log files
Read the text file into a string:
private String readInput(File file) {
    StringBuilder buffer = new StringBuilder();
    // try-with-resources closes the reader even if an exception is thrown;
    // the log files are encoded in GBK
    try (Reader in = new BufferedReader(
            new InputStreamReader(new FileInputStream(file), "GBK"))) {
        int i;
        while ((i = in.read()) != -1) {
            buffer.append((char) i);
        }
        return buffer.toString();
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
Write a string to a file:
private void writeOutput(String str, File file) {
    // try-with-resources closes the writer even if an exception is thrown
    try (Writer out = new OutputStreamWriter(new FileOutputStream(file), "GBK")) {
        out.write(str);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
The log entity class that holds the parsed Kettle information:
public class KettleLogParse {
    // index of the table (the Nth table in the log)
    private int tableNumber;
    // table name
    private String tableName;
    // whether the extraction succeeded
    private boolean isSuccess;
    // number of warnings
    private int warnNumber;
    // number of extracted rows
    private int dataNumber;
    // the log line at which the summary should be inserted
    private int lineNumber;

    public int getTableNumber() {
        return tableNumber;
    }
    public void setTableNumber(int tableNumber) {
        this.tableNumber = tableNumber;
    }
    public boolean isSuccess() {
        return isSuccess;
    }
    public void setSuccess(boolean isSuccess) {
        this.isSuccess = isSuccess;
    }
    public int getWarnNumber() {
        return warnNumber;
    }
    public void setWarnNumber(int warnNumber) {
        this.warnNumber = warnNumber;
    }
    public int getDataNumber() {
        return dataNumber;
    }
    public void setDataNumber(int dataNumber) {
        this.dataNumber = dataNumber;
    }
    public String getTableName() {
        return tableName;
    }
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
    public int getLineNumber() {
        return lineNumber;
    }
    public void setLineNumber(int lineNumber) {
        this.lineNumber = lineNumber;
    }

    @Override
    public String toString() {
        // the Chinese literals are kept as-is: requiredLogFile later uses
        // "個警報,抽取量為" to detect a log file that has already been annotated
        String flag = isSuccess ? ",抽取成功,共" : ",抽取失敗,共";
        return "表" + tableNumber + ":" + tableName + flag + warnNumber + "個警報,抽取量為" + dataNumber + "條";
    }
}
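As a quick illustration (the table name and counts below are made up), this is what toString() produces for a successful entry:
// hypothetical values, purely to show the toString() format
KettleLogParse entry = new KettleLogParse();
entry.setTableNumber(1);
entry.setTableName("demo.ktr");
entry.setSuccess(true);
entry.setWarnNumber(0);
entry.setDataNumber(77175);
System.out.println(entry); // 表1:demo.ktr,抽取成功,共0個警報,抽取量為77175條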
Parse the Kettle log file and return the parsed entries:
// Takes the string obtained from reading the log file
private List<KettleLogParse> parseKettleLog(String kettle) {
    String[] strs = kettle.split("\r\n");
    // the first table is table 1 by default
    int tableNumber = 1;
    int lineNumber = 0;
    List<KettleLogParse> recordList = new ArrayList<>();
    Stack<KettleLogParse> stack = new Stack<>();
    // error count
    int errorNum = 0;
    // warning count
    int warningNum = 0;
    // walk the log line by line
    for (int i = 0; i < strs.length; i++) {
        // current line
        String str = strs[i];
        // a "開始項" (start of a job entry) pushes a new KettleLogParse onto the stack
        if (str.contains("開始項")) {
            KettleLogParse addLogEntity = new KettleLogParse();
            addLogEntity.setTableNumber(tableNumber);
            addLogEntity.setLineNumber(lineNumber);
            errorNum = 0;
            warningNum = 0;
            stack.push(addLogEntity);
        }
        // basic information about the entry: table name, extraction file path, etc.
        if (str.contains("Loading transformation from XML file")) {
            KettleLogParse addLogEntity = stack.peek();
            int tableNameStart = str.lastIndexOf("[");
            int tableNameEnd = str.lastIndexOf("]");
            String tableName = str.substring(tableNameStart + 1, tableNameEnd);
            int lastIndexOf;
            if (tableName.contains("file")) {
                // file:// URL, separated by forward slashes
                lastIndexOf = tableName.lastIndexOf("/");
            } else {
                // plain Windows path, separated by backslashes
                lastIndexOf = tableName.lastIndexOf("\\");
            }
            String tableName1 = tableName.substring(lastIndexOf + 1);
            addLogEntity.setTableName(tableName1);
            tableNumber++;
        }
        // parse the result summary of the entry
        if (str.contains("完成處理")) {
            KettleLogParse addLogEntity = stack.peek();
            int beginIndex = str.lastIndexOf("(");
            int endIndex = str.lastIndexOf(")");
            String record = str.substring(beginIndex + 1, endIndex);
            List<String> asList = Arrays.asList(record.split(","));
            Map<String, Integer> map = getKettleMap(asList);
            addLogEntity.setWarnNumber(warningNum);
            // "W" is the number of records written, used here as the extracted row count
            addLogEntity.setDataNumber(map.get("W"));
            if (errorNum == 0) {
                addLogEntity.setSuccess(true);
            }
        }
        // a "完成作業項" (job entry finished) pops the stack and stores the final result
        if (str.contains("完成作業項")) {
            KettleLogParse addLogEntity = stack.pop();
            if (addLogEntity.getTableName() != null) {
                recordList.add(addLogEntity);
            }
        }
        // count errors
        if (str.contains("- ERROR")) {
            errorNum++;
        }
        // count warnings
        if (str.contains("- Warning:")) {
            warningNum++;
        }
        lineNumber++;
    }
    return recordList;
}
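A minimal sketch of how the parser is driven, using fabricated log lines that contain only the keywords the method matches on (real Kettle logs carry full timestamps, job and step names); the call is shown here purely for illustration:
// fabricated sample lines joined with \r\n, mirroring the keywords the parser looks for
String sample = String.join("\r\n",
        "2019/03/01 10:00:00 - 開始項[抽取demo]",
        "2019/03/01 10:00:01 - Loading transformation from XML file [file:///D:/etl/demo.ktr]",
        "2019/03/01 10:00:05 - 完成處理 (I=0, O=0, R=77175, W=77175, U=0, E=0)",
        "2019/03/01 10:00:05 - 完成作業項[抽取demo]");
List<KettleLogParse> entries = parseKettleLog(sample);
// entries.get(0).toString() -> 表1:demo.ktr,抽取成功,共0個警報,抽取量為77175條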
Build the new (HTML-annotated) log text from the parsed entries:
private String pageKettle(String kettle, List<KettleLogParse> recordList) {
    String[] strs = kettle.split("\r\n");
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < strs.length; i++) {
        String string = strs[i];
        KettleLogParse insertLine = isInsertLine(i, recordList);
        if (insertLine != null) {
            // pick the CSS class by severity: error > warning > success
            String warning = insertLine.getWarnNumber() > 0
                    ? "<p><span class='warning'></span>" : "<p><span class='success'></span>";
            if (!insertLine.isSuccess()) {
                warning = "<p><span class='error'></span>";
            }
            result.append(warning + insertLine.toString() + "</p> <div>" + string + "</div>\r\n");
        } else {
            result.append("<div>" + string + "</div>" + "\r\n");
        }
    }
    return result.toString();
}
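For a summary entry with no warnings or errors, such as the hypothetical one in the earlier sketch, the fragment inserted in front of the corresponding log line looks like this:
<p><span class='success'></span>表1:demo.ktr,抽取成功,共0個警報,抽取量為77175條</p> <div>2019/03/01 10:00:00 - 開始項[抽取demo]</div>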
// Return the entry to insert at this line, or null if nothing should be inserted here
private KettleLogParse isInsertLine(int index, List<KettleLogParse> list) {
    for (KettleLogParse logEntity : list) {
        if (index == logEntity.getLineNumber()) {
            return logEntity;
        }
    }
    return null;
}
Annotate the plain-text log file by inserting the summary lines.
/**
 * Insert the parsed summary strings into the log text, without HTML tags
 * @param kettle
 * @param recordList
 * @return
 */
private String pageLogFile(String kettle, List<KettleLogParse> recordList) {
    String[] strs = kettle.split("\r\n");
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < strs.length; i++) {
        String string = strs[i];
        KettleLogParse insertLine = isInsertLine(i, recordList);
        if (insertLine != null) {
            // prepend the summary line before the original log line
            result.append(insertLine.toString() + "\r\n" + string + "\r\n");
        } else {
            result.append(string + "\r\n");
        }
    }
    return result.toString();
}
Extract the per-step statistics while parsing the log:
// Parse the "I=0, O=0, R=77175, W=77175, U=0, E=0" fragment of the log into a map
private Map<String, Integer> getKettleMap(List<String> list) {
    Map<String, Integer> map = new HashMap<String, Integer>();
    for (String str : list) {
        // trim first, otherwise the keys keep the leading space left by split(",")
        String[] split = str.trim().split("=");
        map.put(split[0], Integer.parseInt(split[1]));
    }
    return map;
}
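A small sketch of the round trip: split the summary fragment on commas, feed it to getKettleMap, and read the "W" (records written) counter that parseKettleLog uses as the extracted row count:
// the fragment below is the example from the comment above
List<String> parts = Arrays.asList("I=0, O=0, R=77175, W=77175, U=0, E=0".split(","));
Map<String, Integer> stats = getKettleMap(parts);
System.out.println(stats.get("W")); // 77175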
Obtain the log file:
private File requiredLogFile(Map<String, Object> kettleLog, boolean isTrusted) {
    File logFile = null;
    try {
        Object logTime = kettleLog.get("logDate");
        String rootPath = ResourceUtil.getConfigByName("data.kettle.path");
        // build the file path
        String filePath = rootPath + "/" + (isTrusted ? "Trusted_Log_" : "Print_Log_")
                + logTime.toString().replace("-", "") + ".log";
        // file object
        logFile = new File(filePath);
        // if the file exists and is non-empty, annotate it if necessary and return it
        if (logFile.exists() && logFile.isFile() && logFile.length() != 0) {
            String readInput = readInput(logFile);
            // the summary text is already present, so the file has been annotated before
            if (readInput.contains("個警報,抽取量為")) {
                return logFile;
            }
            List<KettleLogParse> parseKettleLog = parseKettleLog(readInput);
            String pageLogFile = pageLogFile(readInput, parseKettleLog);
            writeOutput(pageLogFile, logFile);
            return logFile;
        }
        // the file exists but is empty: delete it and rebuild it below
        if (logFile.exists() && logFile.length() == 0) {
            logFile.delete();
        }
        if (!logFile.getParentFile().exists()) {
            logFile.getParentFile().mkdirs();
        }
        logFile.createNewFile();
        Object fileContent = kettleLog.get("dataLog");
        // write with GBK so that readInput (which reads GBK) sees the same encoding
        Writer writer = new OutputStreamWriter(new FileOutputStream(logFile), "GBK");
        writer.write(fileContent.toString());
        writer.close();
        String readInput = readInput(logFile);
        List<KettleLogParse> parseKettleLog = parseKettleLog(readInput);
        String pageLogFile = pageLogFile(readInput, parseKettleLog);
        writeOutput(pageLogFile, logFile);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return logFile;
}
Zip the log files:
/**
 * Compress every file in the list and return the resulting zip file
 * @param fileList
 * @return
 */
private File logToZip(List<File> fileList) {
    File zipFile = null;
    ZipOutputStream zos = null;
    try {
        zipFile = File.createTempFile("ketteLogZip", ".zip");
        zos = new ZipOutputStream(new FileOutputStream(zipFile));
        int i = 0;
        for (File srcFile : fileList) {
            byte[] buf = new byte[2048];
            // append an index to the entry name so duplicate file names do not break the zip
            String fileName = srcFile.getName();
            String suffix = fileName.substring(fileName.lastIndexOf("."));
            String newFileName = fileName.substring(0, fileName.length() - suffix.length()) + "_" + i + suffix;
            zos.putNextEntry(new ZipEntry(newFileName));
            int len;
            FileInputStream in = new FileInputStream(srcFile);
            while ((len = in.read(buf)) != -1) {
                zos.write(buf, 0, len);
            }
            zos.closeEntry();
            in.close();
            zos.flush();
            i++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (zos != null) {
            try {
                zos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return zipFile;
}
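A minimal usage sketch with hypothetical paths; because of the index suffix, even two files that happen to share a name end up as distinct entries (Print_Log_20190301_0.log and Print_Log_20190301_1.log):
// hypothetical file paths; the files must exist for the copy loop to succeed
List<File> logs = Arrays.asList(
        new File("/data/a/Print_Log_20190301.log"),
        new File("/data/b/Print_Log_20190301.log"));
File zip = logToZip(logs); // a temp file named ketteLogZip....zip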
The query interface:
@Override
public Map<String, Object> queryKettleLog(String beginTime, String endTime, String logFlag, Integer pageId, Integer pageNum) {
    // SQL that selects the log records
    String sql = "SELECT ID_JOB as id, left(LOGDATE,10) as logDate, `ERRORS` as logFlag, REPLAYDATE AS STARTDATE, LOGDATE AS ENDDATE, LOG_FIELD as dataLog FROM t_sdrs_data_extraction_log";
    // SQL that counts the log records
    String sql1 = "SELECT COUNT(*) FROM t_sdrs_data_extraction_log";
    // filter conditions
    String condition = " WHERE 1=1";
    if (!StringUtils.isBlank(beginTime)) {
        condition = condition + " AND LOGDATE >='" + beginTime + "'";
    }
    if (!StringUtils.isBlank(endTime)) {
        condition = condition + " AND LOGDATE <='" + endTime + "'";
    }
    if (!StringUtils.isBlank(logFlag)) {
        if (logFlag.equals("1")) {
            condition = condition + " AND `ERRORS` !='0'";
        } else {
            condition = condition + " AND `ERRORS` ='0'";
        }
    }
    condition = condition + " ORDER BY LOGDATE DESC";
    sql = sql + condition;
    sql1 = sql1 + condition;
    // DAO-layer queries
    Map<String, Object> map = Maps.<String, Object>newHashMap();
    Long total = systemService.getCountForJdbc(sql1);
    List<Map<String, Object>> kettleLogList = systemService.findForJdbc(sql, pageId, pageNum);
    // parse each log entry to determine whether it contains errors or warnings
    Iterator<Map<String, Object>> iterator = kettleLogList.iterator();
    while (iterator.hasNext()) {
        Map<String, Object> queryMap = iterator.next();
        Object object = queryMap.get("dataLog");
        List<KettleLogParse> parseKettleLog = parseKettleLog(object.toString());
        int sumRecord = 0;   // total number of records in this log entry
        int resultFlag = 0;  // 0 = normal, 1 = error, 2 = warning
        for (KettleLogParse parse : parseKettleLog) {
            int dataNumber = parse.getDataNumber();
            sumRecord = sumRecord + dataNumber;
            // error check: any failed extraction marks the whole entry as an error
            if (resultFlag != 1) {
                if (!parse.isSuccess()) {
                    resultFlag = 1;
                }
            }
            // warning check: only applies if no error has been seen yet
            if (resultFlag == 0) {
                if (parse.getWarnNumber() > 0) {
                    resultFlag = 2;
                }
            }
        }
        // drop entries whose parsed result does not match the query condition
        if (!StringUtils.isBlank(logFlag) && !logFlag.equals(String.valueOf(resultFlag))) {
            iterator.remove();
            continue;
        }
        queryMap.put("sumRecord", sumRecord);
        queryMap.put("resultFlag", resultFlag);
    }
    map.put("ketteLog", kettleLogList);
    map.put("total", total);
    return map;
}