基於Flume做FTP文件實時同步的windows服務。


需求:做一個windows服務,實現從ftp服務器實時下載或者更新文件到本地磁盤。

功能挺簡單的。直接寫個ftp工具類用定時器跑就能搞定,那我為什么不用呢?

別問,問就是我無聊啊,然后研究一下Flume打發時間。哈哈~

一、Flume部分

Source組件和Sink組件用的都是第三方。

source組件:https://github.com/keedio/flume-ftp-source

Sink組件用的誰的目前已經找不到了,網上搜到了一個升級版的。

File sink組件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink

因為一些個性化的需求,所以我對他們源代碼做了些變動。

2019/02/15: 新增了采集至HDFS的sink.因為flume自帶的hdfs sink不支持高可用環境。所以依然對源代碼做了些改動

具體修改:

HDFSEventSink.java

 1 public void configurateHA(Context context) {
 2       String nns = Preconditions.checkNotNull(
 3               context.getString("nameNodeServer"), "nameNodeServer is required");
 4       hdfsEnv.set("fs.defaultFS", "hdfs://" + nns);
 5       hdfsEnv.set("dfs.nameservices", nns);
 6       hdfsEnv.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
 7 
 8       Map<String, String> servers = context.getSubProperties("server.");
 9       List<String> serverNames = Lists.newArrayListWithExpectedSize(servers.size());
10 
11       servers.forEach((key, value) -> {
12           String name = Preconditions.checkNotNull(
13                   key, "server.name is required");
14           String[] hostAndPort = value.split(":");
15           Preconditions.checkArgument(2 == hostAndPort.length, "hdfs.server is error.");
16 
17           hdfsEnv.set(String.format("dfs.namenode.rpc-address.%s.%s", nns, name), value);
18           serverNames.add(name);
19       });
20 
21       hdfsEnv.set(String.format("dfs.ha.namenodes.%s", nns), Joiner.on(",").join(serverNames));
22       hdfsEnv.set(String.format("dfs.client.failover.proxy.provider.%s", nns),
23               "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
24       hdfsEnv.setBoolean("fs.automatic.close", false);
25   }

Flume偽集群配置

1 # Describe the sink
2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink
3 # 目標路徑
4 a1.sinks.k1.hdfs.path = hdfs://bigData:8020/flume-wrapper/pdf-201901301740
5 a1.sinks.k1.hdfs.fileType = DataStream
6 a1.sinks.k1.hdfs.useLocalTimeStamp = true
7 #a1.sinks.k1.hdfs.batchSize = 3000 
8 #a1.sinks.k1.hdfs.rollSize = 1024000000
9 #a1.sinks.k1.hdfs.rollCount = 0

 

Flume高可用配置文件

 1 # Describe the sink
 2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink
 3 # hdfs服務器是否高可用
 4 a1.sink.k1.hdfs.HA = true
 5 # 目標路徑
 6 a1.sinks.k1.hdfs.path = hdfs://bigData/flume-wrapper/pdf-201901301805
 7 a1.sinks.k1.hdfs.nameNodeServer = bigData
 8 a1.sink.k1.hdfs.server.nn1 = master:9000
 9 a1.sink.k1.hdfs.server.nn2 = slave1:9000
10 a1.sinks.k1.hdfs.fileType = DataStream
11 a1.sinks.k1.hdfs.useLocalTimeStamp = true
12 #a1.sinks.k1.hdfs.batchSize = 3000 
13 #a1.sinks.k1.hdfs.rollSize = 0
14 #a1.sinks.k1.hdfs.rollInterval = 60
15 #a1.sinks.k1.hdfs.rollCount = 0

 

具體代碼參考:https://github.com/rxiu/study-on-road/tree/master/trickle-flume

Ftp-Source組件的關鍵技術是Apache FtpClient,而TailDir-sink則用的RandomAccessFile。

Junit測試類我已經寫好了,如果不想安裝服務又有興趣了解的朋友,可以自己改下配置跑一下看看。

 

二、JSW服務部分

用的java service wrapper把java程序做成了windows服務。

JSW工具包地址:https://pan.baidu.com/s/1Mg483tA0USYqFZ_bNV30tg 提取碼:ejda

解壓后在conf目錄可以看到兩個配置文件。一個是flume的,一個是jsw的。

bin目錄里面是一些裝卸啟停的批命令。

lib目錄里面有項目運行依賴的jar包。

lib.d目錄沒啥用,是我備份了從flume拷出來的一些無用的jar包。可刪。

具體的配置和用法可以看壓縮包里的使用說明文檔。

注意,jsw的logfile的日志級別最好指定ERROR級別的,不然聽說、可能會造成內存不足。

 

三、采集結果

 可以看到,文件采集效率還是很穩的。一分鍾不到就搞定了。

hdfs采集結果:

 

 

四、問題記錄

hdfs采集時,用junit測試沒有問題,用jsw測試一直沒動靜,也不報錯。然后開了遠程調試。調試方法:

在wrapper.conf中加入如下代碼:

1 # remote debug
2 #wrapper.java.additional.1=-Xdebug
3 #wrapper.java.additional.2=-Xnoagent
4 #wrapper.java.additional.3=-Djava.compiler=NONE
5 
6 #wrapper.java.additional.4=-Xrunjdwp:transport=dt_socket,server=y,address=5005,suspend=y

遠程聯調以后,終於有拋異常了

java.lang.NoClassDefFoundError: Could not initialize class org.apache.commons.lang.SystemUtils

找了下lib文件夾,里面確實有這個包,也沒沖突。不得已在SystemUtils類里面打了個斷電一步一步調試。最后發現是java.version的問題。

jdk10版本在下面代碼的第5行報錯了。因為JAVA_VERSION_TRIMMED值是10長度只有兩位導致了越界。

 1     private static float getJavaVersionAsFloat() {
 2         if (JAVA_VERSION_TRIMMED == null) {
 3             return 0f;
 4         }
 5         String str = JAVA_VERSION_TRIMMED.substring(0, 3);
 6         if (JAVA_VERSION_TRIMMED.length() >= 5) {
 7             str = str + JAVA_VERSION_TRIMMED.substring(4, 5);
 8         }
 9         try {
10             return Float.parseFloat(str);
11         } catch (Exception ex) {
12             return 0;
13         }
14     }

由於之前為了學習jdk10新特性,搞了個jdk8,jdk10雙環境,然后換回jdk8的時候是直接改的JAVA_HOME。

所以在dos窗口敲了下java -version 輸出的是1.8_xxxxx。

但是不知道為啥java的System.getProperties("java.version")獲取的依舊是10。

然后重新檢查了JDK的J三個環境變量。

最后發現把PATH中的一個C:\ProgramData\Oracle\Java\javapath;路徑去掉就沒問題了。

1 C:\ProgramData\Oracle\Java\javapath;G:\syhenian\oracle_11_21;E:\Program Files\Python36\Scripts\;E:\Program Files\Python36\;%SystemRoot%\system32;%SystemRoot%;%SystemRoot%\System32\Wbem;%SYSTEMROOT%\System32\WindowsPowerShell\v1.0\;%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;%M2_HOME%\bin;C:\Program Files (x86)\Intel\OpenCL SDK\2.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\2.0\bin\x64;%CURL_HOME%\bin;E:\Program Files\scala\bin;%SCALA_HOME%\bin;%SCALA_HOME%\jre\bin;E:\Program Files\Python36;C:\Program Files (x86)\Git\cmd;%GRADLE_HOME%\bin;%ANDROID_HOME%\tools;
2 %ANDROID_HOME%\platform-tools;%ANDROID_HOME%\build-tools\27.0.3;E:\Program Files\7-Zip;F:\Program Files\nodejs\;%HADOOP_HOME%\bin;

總結:common-lang包不知道升級管不管用,反正flume自帶的這個包暫時是不支持jdk10的。有需要的可以自己改源碼。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM