需求:做一個windows服務,實現從ftp服務器實時下載或者更新文件到本地磁盤。
功能挺簡單的。直接寫個ftp工具類用定時器跑就能搞定,那我為什么不用呢?
別問,問就是我無聊啊,然后研究一下Flume打發時間。哈哈~
一、Flume部分
Source組件和Sink組件用的都是第三方。
source組件:https://github.com/keedio/flume-ftp-source
Sink組件用的誰的目前已經找不到了,網上搜到了一個升級版的。
File sink組件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink
因為一些個性化的需求,所以我對他們源代碼做了些變動。
2019/02/15: 新增了采集至HDFS的sink.因為flume自帶的hdfs sink不支持高可用環境。所以依然對源代碼做了些改動
具體修改:
HDFSEventSink.java
1 public void configurateHA(Context context) { 2 String nns = Preconditions.checkNotNull( 3 context.getString("nameNodeServer"), "nameNodeServer is required"); 4 hdfsEnv.set("fs.defaultFS", "hdfs://" + nns); 5 hdfsEnv.set("dfs.nameservices", nns); 6 hdfsEnv.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem"); 7 8 Map<String, String> servers = context.getSubProperties("server."); 9 List<String> serverNames = Lists.newArrayListWithExpectedSize(servers.size()); 10 11 servers.forEach((key, value) -> { 12 String name = Preconditions.checkNotNull( 13 key, "server.name is required"); 14 String[] hostAndPort = value.split(":"); 15 Preconditions.checkArgument(2 == hostAndPort.length, "hdfs.server is error."); 16 17 hdfsEnv.set(String.format("dfs.namenode.rpc-address.%s.%s", nns, name), value); 18 serverNames.add(name); 19 }); 20 21 hdfsEnv.set(String.format("dfs.ha.namenodes.%s", nns), Joiner.on(",").join(serverNames)); 22 hdfsEnv.set(String.format("dfs.client.failover.proxy.provider.%s", nns), 23 "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); 24 hdfsEnv.setBoolean("fs.automatic.close", false); 25 }
Flume偽集群配置
1 # Describe the sink 2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink 3 # 目標路徑 4 a1.sinks.k1.hdfs.path = hdfs://bigData:8020/flume-wrapper/pdf-201901301740 5 a1.sinks.k1.hdfs.fileType = DataStream 6 a1.sinks.k1.hdfs.useLocalTimeStamp = true 7 #a1.sinks.k1.hdfs.batchSize = 3000 8 #a1.sinks.k1.hdfs.rollSize = 1024000000 9 #a1.sinks.k1.hdfs.rollCount = 0
Flume高可用配置文件
1 # Describe the sink 2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink 3 # hdfs服務器是否高可用 4 a1.sink.k1.hdfs.HA = true 5 # 目標路徑 6 a1.sinks.k1.hdfs.path = hdfs://bigData/flume-wrapper/pdf-201901301805 7 a1.sinks.k1.hdfs.nameNodeServer = bigData 8 a1.sink.k1.hdfs.server.nn1 = master:9000 9 a1.sink.k1.hdfs.server.nn2 = slave1:9000 10 a1.sinks.k1.hdfs.fileType = DataStream 11 a1.sinks.k1.hdfs.useLocalTimeStamp = true 12 #a1.sinks.k1.hdfs.batchSize = 3000 13 #a1.sinks.k1.hdfs.rollSize = 0 14 #a1.sinks.k1.hdfs.rollInterval = 60 15 #a1.sinks.k1.hdfs.rollCount = 0
具體代碼參考:https://github.com/rxiu/study-on-road/tree/master/trickle-flume
Ftp-Source組件的關鍵技術是Apache FtpClient,而TailDir-sink則用的RandomAccessFile。
Junit測試類我已經寫好了,如果不想安裝服務又有興趣了解的朋友,可以自己改下配置跑一下看看。
二、JSW服務部分
用的java service wrapper把java程序做成了windows服務。
JSW工具包地址:https://pan.baidu.com/s/1Mg483tA0USYqFZ_bNV30tg
提取碼:ejda
解壓后在conf目錄可以看到兩個配置文件。一個是flume的,一個是jsw的。
bin目錄里面是一些裝卸啟停的批命令。
lib目錄里面有項目運行依賴的jar包。
lib.d目錄沒啥用,是我備份了從flume拷出來的一些無用的jar包。可刪。
具體的配置和用法可以看壓縮包里的使用說明文檔。
注意,jsw的logfile的日志級別最好指定ERROR級別的,不然聽說、可能會造成內存不足。
三、采集結果
可以看到,文件采集效率還是很穩的。一分鍾不到就搞定了。
hdfs采集結果:
四、問題記錄
hdfs采集時,用junit測試沒有問題,用jsw測試一直沒動靜,也不報錯。然后開了遠程調試。調試方法:
在wrapper.conf中加入如下代碼:
1 # remote debug 2 #wrapper.java.additional.1=-Xdebug 3 #wrapper.java.additional.2=-Xnoagent 4 #wrapper.java.additional.3=-Djava.compiler=NONE 5 6 #wrapper.java.additional.4=-Xrunjdwp:transport=dt_socket,server=y,address=5005,suspend=y
遠程聯調以后,終於有拋異常了
java.lang.NoClassDefFoundError: Could not initialize class org.apache.commons.lang.SystemUtils
找了下lib文件夾,里面確實有這個包,也沒沖突。不得已在SystemUtils類里面打了個斷電一步一步調試。最后發現是java.version的問題。
jdk10版本在下面代碼的第5行報錯了。因為JAVA_VERSION_TRIMMED值是10長度只有兩位導致了越界。
1 private static float getJavaVersionAsFloat() { 2 if (JAVA_VERSION_TRIMMED == null) { 3 return 0f; 4 } 5 String str = JAVA_VERSION_TRIMMED.substring(0, 3); 6 if (JAVA_VERSION_TRIMMED.length() >= 5) { 7 str = str + JAVA_VERSION_TRIMMED.substring(4, 5); 8 } 9 try { 10 return Float.parseFloat(str); 11 } catch (Exception ex) { 12 return 0; 13 } 14 }
由於之前為了學習jdk10新特性,搞了個jdk8,jdk10雙環境,然后換回jdk8的時候是直接改的JAVA_HOME。
所以在dos窗口敲了下java -version 輸出的是1.8_xxxxx。
但是不知道為啥java的System.getProperties("java.version")獲取的依舊是10。
然后重新檢查了JDK的J三個環境變量。
最后發現把PATH中的一個C:\ProgramData\Oracle\Java\javapath;路徑去掉就沒問題了。
1 C:\ProgramData\Oracle\Java\javapath;G:\syhenian\oracle_11_21;E:\Program Files\Python36\Scripts\;E:\Program Files\Python36\;%SystemRoot%\system32;%SystemRoot%;%SystemRoot%\System32\Wbem;%SYSTEMROOT%\System32\WindowsPowerShell\v1.0\;%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;%M2_HOME%\bin;C:\Program Files (x86)\Intel\OpenCL SDK\2.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\2.0\bin\x64;%CURL_HOME%\bin;E:\Program Files\scala\bin;%SCALA_HOME%\bin;%SCALA_HOME%\jre\bin;E:\Program Files\Python36;C:\Program Files (x86)\Git\cmd;%GRADLE_HOME%\bin;%ANDROID_HOME%\tools; 2 %ANDROID_HOME%\platform-tools;%ANDROID_HOME%\build-tools\27.0.3;E:\Program Files\7-Zip;F:\Program Files\nodejs\;%HADOOP_HOME%\bin;
總結:common-lang包不知道升級管不管用,反正flume自帶的這個包暫時是不支持jdk10的。有需要的可以自己改源碼。