第一次接觸Talend,做一個Demo,目的是實現定時同步!經過一番折騰,終於實現了,在此記錄一下,以慰折騰之心!哈哈!
目的:每月定時觸發兩個數據庫之間當月數據的同步以及可以人工通過發送JMS來觸發指定月份的數據同步,在同步過程中出錯的話自動轉發送JMS繼續同步!
過程:
1、實現基本業務流程
話不多說,直接上圖!呵呵!
上圖中有幾點備注一下:
1、tLibraryLoad是因為我用TIBCO EMS,所以需要先把相關jar添加進來
2、tSetGlobalVar是為了我判斷是否是第一次發送,如果是第一次發送我需要刪除目標庫相同月份的數據后再同步
3、tLogCatcher是當數據庫操作出錯時,將同步到哪個月份的相關參數發JMS,以便繼續觸發同步
4、tJavaFlex是為了處理多個年月的循環用,tJavaFlex代碼如下:
start code:
// start part of your Java code System.out.println("tJavaFlex_1: Start code"); int beginYear = context.beginYear; int beginMonth = context.beginMonth; int endYear = context.endYear; int endMonth = context.endMonth; int currYear = beginYear; int currMonth = beginMonth; String currYearMonth = ""; row1.beginYear = beginYear; row1.beginMonth = beginMonth; row1.endYear = endYear; row1.endMonth = endMonth; while((currYear*100+currMonth) <= (endYear*100+endMonth)) {
main code:
// here is the main part of the component, // a piece of code executed in the row // loop if(currMonth >= 10){ currYearMonth = "" + currYear + currMonth; } else { currYearMonth = "" + currYear + "0" + currMonth; } row1.currYear = currYear; row1.currMonth = currMonth; row1.currYearMonth = currYearMonth; currMonth = currMonth + 1; if(currMonth > 12){ currYear = currYear + 1; currMonth = 1; }
end code:
// end of the component, outside/closing the loop } System.out.println("tJavaFlex_1: End code");
2、實現JMS監聽調用
這個主要接受JMS消息后繼續調用基本流程。
3、定時調用業務流程
因為在組件里面沒有找到定時的組件,故自己開發了一個基於Quartz的timer組件,可能不是很好,但湊合着能用,先記錄一下,以后有新的想法后再優化!呵呵!
在做定時組件時,開始打算把代碼都寫到模板中去,但是總是有問題,后來就把一部分代碼寫在外面用jar的形式引入進來!
A、用eclipse或其他工具創建一個java工程,因為是基於Quartz的,所以需要引進相關jar:quartz-2.2.3.jar,slf4j-api-1.7.22.jar,slf4j-log4j12-1.7.22.jar,log4j-1.2.17.jar,c3p0-0.9.2.jar
B、創建3個java文件(當然也可以合並,根據自己的喜好):EsquelTimerJobStatus.java,EsquelTimerJob.java,EsquelTimerJobMonitor.java
直接上代碼:
EsquelTimerJobStatus.java
package com.esquel.talend.quartz; import java.util.HashMap; import java.util.Map; public class EsquelTimerJobStatus { private static Map<String, Object> timerInfo = new HashMap<String, Object>(); public static Map<String, Object> operateTimerInfo(String cid, Map<String, Object> timerMap, boolean isRemove) { synchronized (timerInfo) { if (timerMap != null) { timerInfo.putAll(timerMap); return null; } else { if (cid == null) { cid = "1"; } if (isRemove) { timerInfo.remove("currentYear_" + cid); timerInfo.remove("currentMonth_" + cid); timerInfo.remove("currentDay_" + cid); timerInfo.remove("currentHour_" + cid); timerInfo.remove("currentMinute_" + cid); timerInfo.remove("currentSecond_" + cid); timerInfo.remove("currentDate_" + cid); timerInfo.remove("currentDateTime_" + cid); timerInfo.remove("isRun_" + cid); return null; } else { if (timerInfo.get("isRun_" + cid) != null && (Boolean) timerInfo.get("isRun_" + cid)) { Map<String, Object> returnMap = new HashMap<String, Object>(); returnMap.put("isRun_" + cid, true); returnMap.put("currentYear_" + cid, timerInfo.get("currentYear_" + cid)); returnMap.put("currentMonth_" + cid, timerInfo.get("currentMonth_" + cid)); returnMap.put("currentDay_" + cid, timerInfo.get("currentDay_" + cid)); returnMap.put("currentHour_" + cid, timerInfo.get("currentHour_" + cid)); returnMap.put("currentMinute_" + cid, timerInfo.get("currentMinute_" + cid)); returnMap.put("currentSecond_" + cid, timerInfo.get("currentSecond_" + cid)); returnMap.put("currentDate_" + cid, timerInfo.get("currentDate_" + cid)); returnMap.put("currentDateTime_" + cid, timerInfo.get("currentDateTime_" + cid)); timerInfo.put("isRun_" + cid, false); return returnMap; } else { return null; } } } } } }
EsquelTimerJob.java
package com.esquel.talend.quartz; import java.util.Calendar; import java.util.HashMap; import java.util.Map; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public final class EsquelTimerJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { java.util.Calendar currentDate = java.util.Calendar.getInstance(); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); String cid = dataMap.getString("cid"); if (cid == null) { cid = "1"; } java.text.SimpleDateFormat dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd"); String datestr = dateFormat.format(currentDate.getTime()); java.text.SimpleDateFormat datetimeFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datetimestr = datetimeFormat.format(currentDate.getTime()); Map<String, Object> timerMap = new HashMap<String, Object>(); timerMap.put("currentYear_" + cid, currentDate.get(Calendar.YEAR)); timerMap.put("currentMonth_" + cid, currentDate.get(Calendar.MONTH) + 1); timerMap.put("currentDay_" + cid, currentDate.get(Calendar.DAY_OF_MONTH)); timerMap.put("currentHour_" + cid, currentDate.get(Calendar.HOUR_OF_DAY)); timerMap.put("currentMinute_" + cid, currentDate.get(Calendar.MINUTE)); timerMap.put("currentSecond_" + cid, currentDate.get(Calendar.SECOND)); timerMap.put("currentDate_" + cid, datestr); timerMap.put("currentDateTime_" + cid, datetimestr); timerMap.put("isRun_" + cid, true); EsquelTimerJobStatus.operateTimerInfo(cid, timerMap, false); //System.out.println(cid+ ":" + datetimestr); } }
EsquelTimerJobMonitor.java
package com.esquel.talend.quartz; import java.util.Map; public class EsquelTimerJobMonitor { public EsquelTimerJobMonitor() { } public Map<String, Object> getTimerInfo(String cid) { while (true) { Map<String, Object> returnMap = EsquelTimerJobStatus.operateTimerInfo(cid, null, false); if (returnMap != null) { return returnMap; } } } }
C、加入一個log4j.properties文件,這個自己定義,因為不加入的話,好像會報log引入錯誤
D、將上面代碼和log4j.properties打包成一個jar:EsqueTimerJob.jar
E、接下來是對Talend進行組件開發,創建一個組件tEsquelTimer,引入相關jar,勾選begin和end,如下圖:
注:自定義圖標下面的esquelTimer.png的命名不對,要用tEsquelTimer_icon32.png
話不多說,直接上相關文件代碼!
tEsquelTimer_java.xml:
<?xml version="1.0" encoding="UTF-8"?> <COMPONENT> <HEADER PLATEFORM="ALL" SERIAL="" VERSION="0.102" STATUS="ALPHA" COMPATIBILITY="ALL" AUTHOR="AngusYang" RELEASE_DATE="20170215A" STARTABLE="true" LOG4J_ENABLED="true"> <SIGNATURE /> </HEADER> <FAMILIES> <FAMILY>Esquel</FAMILY> </FAMILIES> <DOCUMENTATION> <URL /> </DOCUMENTATION> <CONNECTORS> <CONNECTOR CTYPE="FLOW" MAX_INPUT="0" MAX_OUTPUT="1" /> <CONNECTOR CTYPE="ITERATE" MAX_OUTPUT="1" MAX_INPUT="1" /> <CONNECTOR CTYPE="SUBJOB_OK" MAX_INPUT="1" MAX_OUTPUT="1" /> <CONNECTOR CTYPE="COMPONENT_OK" /> <CONNECTOR CTYPE="COMPONENT_ERROR" /> <CONNECTOR CTYPE="RUN_IF" /> </CONNECTORS> <PARAMETERS> <PARAMETER NAME="TIMER_CORN" FIELD="TEXT" NUM_ROW="1"> <DEFAULT>"0/10 * * * * ? *"</DEFAULT> </PARAMETER> <PARAMETER NAME="SCHEMA" FIELD="SCHEMA_TYPE" REQUIRED="true" NUM_ROW="5"> <TABLE READONLY="false"> <COLUMN NAME="currentYear" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentMonth" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentDay" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentHour" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentMinute" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentSecond" TYPE="id_Integer" CUSTOM="true" /> <COLUMN NAME="currentDate" TYPE="id_String" CUSTOM="true" /> <COLUMN NAME="currentDateTime" TYPE="id_String" CUSTOM="true" /> </TABLE> </PARAMETER> </PARAMETERS> <CODEGENERATION> <IMPORTS> <IMPORT MODULE="c3p0-0.9.2.jar" NAME="c3p0-0.9.2" REQUIRED="true" /> <IMPORT MODULE="EsqueTimerJob.jar" NAME="EsqueTimerJob" REQUIRED="true" /> <IMPORT MODULE="log4j-1.2.17.jar" NAME="log4j-1.2.17" REQUIRED="true" /> <IMPORT MODULE="quartz-2.2.3.jar" NAME="quartz-2.2.3" REQUIRED="true" /> <IMPORT MODULE="slf4j-api-1.7.22.jar" NAME="slf4j-api-1.7.22" REQUIRED="true" /> <IMPORT MODULE="slf4j-log4j12-1.7.22.jar" NAME="slf4j-log4j12-1.7.22" REQUIRED="true" /> </IMPORTS> </CODEGENERATION> <RETURNS> <RETURN AVAILABILITY="AFTER" NAME="NB_LINE" TYPE="id_Integer" /> </RETURNS> </COMPONENT>
tEsquelTimer_begin.javajet:
<%@ jet imports=" org.talend.core.model.process.INode org.talend.core.model.process.ElementParameterParser org.talend.core.model.metadata.IMetadataTable org.talend.core.model.metadata.IMetadataColumn org.talend.core.model.process.IConnection org.talend.core.model.process.IConnectionCategory org.talend.designer.codegen.config.CodeGeneratorArgument org.talend.core.model.metadata.types.JavaTypesManager org.talend.core.model.metadata.types.JavaType java.util.List java.util.Map " %> <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%> <% CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument; INode node = (INode)codeGenArgument.getArgument(); String cid = node.getUniqueName(); log = new LogUtil(node); String cronExpression = ElementParameterParser.getValue(node, "__TIMER_CORN__"); IMetadataTable metadata=null; List<IMetadataTable> metadatas = node.getMetadataList(); if ((metadatas!=null)&&(metadatas.size()>0)) { metadata = metadatas.get(0); } %> int nbline_<%=cid %> = 0; com.esquel.talend.quartz.EsquelTimerJobMonitor etm = new com.esquel.talend.quartz.EsquelTimerJobMonitor(); try { org.quartz.impl.StdSchedulerFactory sf_<%=cid%> = new org.quartz.impl.StdSchedulerFactory(); org.quartz.Scheduler sched_<%=cid%> = sf_<%=cid%>.getScheduler(); org.quartz.impl.JobDetailImpl jobDetail_<%=cid%> = new org.quartz.impl.JobDetailImpl(); jobDetail_<%=cid%>.setName("EsquelTimerJob_<%=cid%>"); jobDetail_<%=cid%>.setGroup("EsquelTimerJobGroup_<%=cid%>"); jobDetail_<%=cid%>.setJobClass(com.esquel.talend.quartz.EsquelTimerJob.class); org.quartz.JobDataMap jobDataMap_<%=cid%> = new org.quartz.JobDataMap(); jobDataMap_<%=cid%>.put("cid", "<%=cid%>"); jobDetail_<%=cid%>.setJobDataMap(jobDataMap_<%=cid%>); org.quartz.impl.triggers.CronTriggerImpl cornTrigger_<%=cid%> = new org.quartz.impl.triggers.CronTriggerImpl(); cornTrigger_<%=cid%>.setName("EsquelTimerTrigger_<%=cid%>"); cornTrigger_<%=cid%>.setGroup("EsquelTimerTriggerGroup_<%=cid%>"); cornTrigger_<%=cid%>.setCronExpression(<%=cronExpression%>); sched_<%=cid%>.scheduleJob(jobDetail_<%=cid%>, cornTrigger_<%=cid%>); sched_<%=cid%>.start(); System.out.println("Ready to schedule"); System.out.println("Waiting..."); <%log.info(log.str("Ready to schedule."));%> <%log.info(log.str("Waiting..."));%> java.util.Map<String,Object> currMap = null; while((currMap=etm.getTimerInfo("<%=cid%>")) != null){ <%log.debug(log.str("corn times "), "(nbline_" + cid + "+1)", log.str("."));%> <% List< ? extends IConnection> conns = node.getOutgoingSortedConnections(); List<IMetadataColumn> columnLists = metadata.getListColumns(); for(IConnection conn:conns){ if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) { String firstConnName = conn.getName(); %> <%=firstConnName%>.currentYear=(Integer)currMap.get("currentYear_<%=cid%>"); <%=firstConnName%>.currentMonth=(Integer)currMap.get("currentMonth_<%=cid%>"); <%=firstConnName%>.currentDay=(Integer)currMap.get("currentDay_<%=cid%>"); <%=firstConnName%>.currentHour=(Integer)currMap.get("currentHour_<%=cid%>"); <%=firstConnName%>.currentMinute=(Integer)currMap.get("currentMinute_<%=cid%>"); <%=firstConnName%>.currentSecond=(Integer)currMap.get("currentSecond_<%=cid%>"); <%=firstConnName%>.currentDate=(String)currMap.get("currentDate_<%=cid%>"); <%=firstConnName%>.currentDateTime=(String)currMap.get("currentDateTime_<%=cid%>"); <% } } %>
tEsquelTimer_end.javajet:
<%@ jet imports=" org.talend.core.model.process.INode org.talend.core.model.process.ElementParameterParser org.talend.core.model.metadata.IMetadataTable org.talend.core.model.metadata.IMetadataColumn org.talend.core.model.process.IConnection org.talend.core.model.process.IConnectionCategory org.talend.designer.codegen.config.CodeGeneratorArgument org.talend.core.model.metadata.types.JavaTypesManager org.talend.core.model.metadata.types.JavaType java.util.List java.util.Map " %> <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%> <% CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument; INode node = (INode)codeGenArgument.getArgument(); String cid = node.getUniqueName(); log = new LogUtil(node); %> nbline_<%=cid %>++; } } catch (Exception e) { e.printStackTrace(); } <%log.info(log.str("send records count: "), log.var("nbline"), log.str("."));%> globalMap.put("<%=cid %>_NB_LINE", nbline_<%=cid%>);
tEsquelTimer_messages.properties:
# #Tue Feb 14 13:20:59 CST 2017 TIMER_CORN.NAME=CronExpression NB_LINE.NAME=NB_LINE HELP=org.talend.help.tEsquelTimer LONG_NAME=tEsquelTimer using quartz
F、發布組件,並開發Job
好了!在此大功告成!哈哈!