Otter源碼


 

查找對應的源碼及相關的邏輯:
支持ddl同步
跳過ddl異常




報錯1:

git clone git@github.com:alibaba/otter.git
cd otter; 
mvn clean install -Dmaven.test.skip -Denv=release

https://github.com/alibaba/otter/wiki/Manager_Quickstart

[ERROR] Failed to execute goal on project shared.common: Could not resolve dependencies for project com.alibaba.otter:shared.common:jar:4.2.19-SNAPSHOT: Failed to collect dependencies at ch.qos.logback:logback-core:jar:1.1.3: Failed to read artifact descriptor for
ch.qos.logback:logback-core:jar:1.1.3: Could not transfer artifact ch.qos.logback:logback-core:pom:1.1.3 from/to central (http://repo1.maven.org/maven2): Failed to transfer file http://repo1.maven.org/maven2/ch/qos/logback/logback-core/1.1.3/logback-core-1.1.3.pom with status code 501 -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project shared.common: Could not resolve dependencies for project com.alibaba.otter:shared.common:jar:4.2.19-SNAPSHOT: Failed to collect dependencies at ch.qos.logback:logback-core:ja
r:1.1.3
    at org.apache.maven.lifecycle.internal.LifecycleDependencyResolver.getDependencies (LifecycleDependencyResolver.java:269)
    at org.apache.maven.lifecycle.internal.LifecycleDependencyResolver.resolveProjectDependencies (LifecycleDependencyResolver.java:147)

 看報錯原因,是拉pom文件時報錯了,復現一下:

 

 改成https就可以了:

 

 那改的方法就簡單了,將配置的http改為https就可以了:

 

執行  mvn clean install -Dmaven.test.skip=true -Denv=release 后,報了另外一個錯:

[ERROR] Failed to execute goal on project shared.common: Could not resolve dependencies for project com.alibaba.otter:shared.common:jar:4.2.19-SNAPSHOT: Failed to collect dependencies at org.jtester:jtester:jar:1.1.8: Failed to read artifact descriptor for org.jtes
ter:jtester:jar:1.1.8: Could not transfer artifact org.jtester:jtester:pom:1.1.8 from/to jtester-maven (http://jtester.googlecode.com/svn/m2/): Connect to jtester.googlecode.com:80 [jtester.googlecode.com/74.125.204.82] failed: Connection timed out: connect -> [Hel
p 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :shared.common

可以看到在本機電腦就ping不通

 

把這些已經刪除掉:

 

 再執行  mvn clean compile install -Dmaven.test.skip=true -Denv=release
又報錯了:

[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project shared.common: Could not resolve dependencies for project com.alibaba.otter:shared.common:jar:4.2.19-SNAPSHOT: Could not find artifact org.jtester:jtester:jar:1.1.8 in central (https://repo1.maven.org/maven2) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :shared.common

在mvnrepository.com上看的確沒有:

 

 看項目中自帶了:

 

 引入到項目中:

 

點 確定  后,選中所有的項目,點 確定
再執行 mvn clean compile  install -Dmaven.test.skip=true -Denv=release
仍然報錯,不過報錯內容變了:

[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  19.888 s
[INFO] Finished at: 2020-05-20T15:52:02+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project shared.common: Could not resolve dependencies for project com.alibaba.otter:shared.common:jar:4.2.19-SNAPSHOT: Failure to find org.jtester:jtester:jar:1.1.8 in https://repo1.maven.org/maven2 was cached in the local reposito
ry, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :shared.common

 


這樣還是不行。看來偷懶是不行了。

cd 存放下面兩個 jar的目錄
mvn install:install-file -Dfile=jtester-1.1.8.jar -DgroupId=org.jtester -DartifactId=jtester  -Dversion=1.1.8 -Dpackaging=jar
mvn install:install-file -Dfile=ojdbc6.jar -DgroupId=com.oracle -DartifactId=ojdbc6  -Dversion=11.1.0.7.0 -Dpackaging=jar

然后再到otter項目根目錄下,執行 mvn clean compile package -Dmaven.skip.test=true -Denv=release  

 

 

IDEA使用Maven項目不能加入本地Jar包的解決方法
https://blog.csdn.net/lvshaorong/article/details/80334351






otter代碼在IDEA遠程DEBUG方法
眾所周知,Otter的代碼打包后,是通過Jetty啟動的,Otter代碼的啟動腳本中自帶了開啟Jetty遠程DEBUG的腳本,所以我們只需要在啟動Otter Manager和Otter Node的時候,帶上如下參數:

sh start.sh debug 8888
其中8888就是我們遠程debug的端口號。

然后在IDEA中開啟遠程調試,具體的步驟為:

Debug->Remote->填入ip和端口號->apply
即可
然后在服務器啟動,和在IDEA中啟動,就可以DEBUG了。

最近要針對業務修改Otter的源碼,本地又沒法直接啟動,所以只能通過遠程調試來做了

https://www.cnblogs.com/f-zhao/p/8398704.html

 

 

 

 
【Canal源碼分析】整體架構
https://www.cnblogs.com/f-zhao/p/9112158.html

 

command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation

 

    /**
     * 查詢當前的binlog位置
     */
    private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
        try {
            ResultSetPacket packet = mysqlConnection.query("show master status");
            List<String> fields = packet.getFieldValues();
            if (CollectionUtils.isEmpty(fields)) {
                throw new CanalParseException("command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
            }
            EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
            if (isGTIDMode() && fields.size() > 4) {
                endPosition.setGtid(fields.get(4));
            }
            return endPosition;
        } catch (IOException e) {
            throw new CanalParseException("command : 'show master status' has an error!", e);
        }
    }

com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser#findEndPosition(com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection)

 

 

 

 

 

在MYSQL的主從復制中 ,通過命令show master status,可以查看master數據庫當前正在使用的二進制日志及當前執行二進制日志位置
show master logs,查看所有二進制日志列表 ,和  show binary logs  同義。


mysql> show binlog events;   #只查看第一個binlog文件【最老的那個binlog】的內容
mysql> show binlog events in 'mysql-bin.000002';#查看指定binlog文件的內容
mysql> show binary logs;  #獲取binlog文件列表。即,查看當前服務器使用的biglog文件及大小【單位是Byte】
mysql> show master status; #查看當前正在寫入的binlog文件

 

 

 

 

 

 

 

 

 

 

對比otter row_selected文件和當前正在寫入的文件來判斷是否有延遲



    /**
     * 查詢當前的binlog位置
     */
    private EntryPosition findStartPosition(MysqlConnection mysqlConnection) {
        try {
            ResultSetPacket packet = mysqlConnection.query("show binlog events limit 1");
            List<String> fields = packet.getFieldValues();
            if (CollectionUtils.isEmpty(fields)) {
                throw new CanalParseException("command : 'show binlog events limit 1' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
            }
            EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
            return endPosition;
        } catch (IOException e) {
            throw new CanalParseException("command : 'show binlog events limit 1' has an error!", e);
        }

    }

com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser#findStartPosition(com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection)


mysql> show binlog events [IN 'log_name'] [FROM pos] [LIMIT [offset,] row_count];
選項解析:
  IN 'log_name' 指定要查詢的binlog文件名(不指定就是第一個binlog文件)
  FROM pos 指定從哪個pos起始點開始查起(不指定就是從整個文件首個pos點開始算)
  LIMIT [offset,] 偏移量(不指定就是0)
  row_count 查詢總條數(不指定就是所有行)

A.查詢第一個(最早)的binlog日志:
  mysql> show binlog events\G;

B.指定查詢 mysql-bin.000021 這個文件:
  mysql> show binlog events in 'mysql-bin.000021'\G;

C.指定查詢 mysql-bin.000021 這個文件,從pos點:8224開始查起:
  mysql> show binlog events in 'mysql-bin.000021' from 8224\G;

D.指定查詢 mysql-bin.000021 這個文件,從pos點:8224開始查起,查詢10條
  mysql> show binlog events in 'mysql-bin.000021' from 8224 limit 10\G;

E.指定查詢 mysql-bin.000021 這個文件,從pos點:8224開始查起,偏移2行,查詢10條
  mysql> show binlog events in 'mysql-bin.000021' from 8224 limit 2,10\G;

 

 

com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector#selector

   public Message<EventData> selector() throws InterruptedException {
 
        。。。

        if (dump && logger.isInfoEnabled()) {
            String startPosition = null;
            String endPosition = null;
            if (!CollectionUtils.isEmpty(entries)) {
                startPosition = buildPositionForDump(entries.get(0));
                endPosition = buildPositionForDump(entries.get(entries.size() - 1));
            }

            dumpMessages(result, startPosition, endPosition, entries.size());// 記錄一下,方便追查問題
        }
        return result;
    }

 

com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector#dumpMessages

    /**
     * 記錄一下message對象
     */
    private synchronized void dumpMessages(Message message, String startPosition, String endPosition, int total) {
        try {
            MDC.put(OtterConstants.splitPipelineSelectLogFileKey, String.valueOf(pipelineId));
            logger.info(SEP + "****************************************************" + SEP);
            logger.info(MessageDumper.dumpMessageInfo(message, startPosition, endPosition, total));
            logger.info("****************************************************" + SEP);
            if (dumpDetail) {// 判斷一下是否需要打印詳細信息
                dumpEventDatas(message.getDatas());
                logger.info("****************************************************" + SEP);
            }
        } finally {
            MDC.remove(OtterConstants.splitPipelineSelectLogFileKey);
        }
    }

 

 com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector#buildPositionForDump

    private String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
               + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }

 

 

com.alibaba.otter.node.etl.select.selector.MessageDumper#dumpMessageInfo

   private static String       eventData_format       = null;
    private static int          event_default_capacity = 1024;                      // 預設值StringBuilder,減少擴容影響

    static {
        context_format = "* Batch Id: [{0}] ,total : [{1}] , normal : [{2}] , filter :[{3}] , Time : {4}" + SEP;
        context_format += "* Start : [{5}] " + SEP;
        context_format += "* End : [{6}] " + SEP;

        eventData_format = "-----------------" + SEP;
        eventData_format += "- TableId: {0} , Schema: {1} , Table: {2} " + SEP;
        eventData_format += "- Type: {3}  , ExecuteTime: {4} , Remedy: {5}" + SEP;
        eventData_format += "-----------------" + SEP;
        eventData_format += "---START" + SEP;
        eventData_format += "---Pks" + SEP;
        eventData_format += "{6}" + SEP;
        eventData_format += "---oldPks" + SEP;
        eventData_format += "{7}" + SEP;
        eventData_format += "---Columns" + SEP;
        eventData_format += "{8}" + SEP;
        eventData_format += "---END" + SEP;

    }

    public static String dumpMessageInfo(Message<EventData> message, String startPosition, String endPosition, int total) {
        Date now = new Date();
        SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_FORMAT);
        int normal = message.getDatas().size();
        return MessageFormat.format(context_format, String.valueOf(message.getId()), total, normal, total - normal,
                                    format.format(now), startPosition, endPosition);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM