采集指定目錄下文本數據到kafka
package com.shenyuchong;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Collects the text content of files under one or more directories into Kafka.
 *
 * <p>Purpose:
 * <ul>
 *   <li>files are sent line by line;</li>
 *   <li>a notification can be issued once everything has been sent;</li>
 *   <li>a file that was sent successfully gets the {@code .COMPLETED} suffix appended;</li>
 *   <li>only files with one of the configured suffixes are collected;</li>
 *   <li>each line is checked against a regex, non-matching lines are dropped;</li>
 *   <li>lines are split into fields by a delimiter (the only supported parsing);</li>
 *   <li>the fields are re-joined with a second delimiter before being sent.</li>
 * </ul>
 *
 * <p>Usage: build the jar with {@code mvn package} (file2kafka-2.0.jar), then write a
 * configuration file xxx.conf with one {@code key=value} per line:
 * <pre>
 * ip=192.168.1.91
 * threadnum=20
 * port=9092
 * topic=customertopic
 * path=/home/ftpuser/customer
 * includesuffix=txt
 * lineregex=^#\d.*$
 * delimiter=\s+
 * noticeurl=http://192.168.1.92:6009/schedule/customer
 * fieldsquence=id,name,score
 * </pre>
 * and run {@code java -jar file2kafka-2.0.jar xxx.conf}.
 *
 * <p>Suggestion: schedule it with linux crontab. Files that were sent successfully are
 * renamed, so collecting the same directory repeatedly does not resend them.
 */
public class App {
    /** Comma-separated field names; only the number of names is used. */
    public static String fieldSquence = "";
    /** Minimum number of fields a line must split into to be sent. */
    public static int fieldNum = 0;
    /** Kafka host. */
    public static String ip = "";
    /** Kafka port. */
    public static String port = "";
    /** Directories to collect, comma-separated. */
    public static String path = "";
    /** Size of the worker thread pool. */
    public static String threadNum = "5";
    /** Kafka topic. */
    public static String topic = "";
    /** Line regex; lines that do not match are dropped. */
    public static String lineRegex = "^.*$";
    /** Regex used to split a line into fields. */
    public static String delimiter = "\\s+";
    /** Separator used when re-joining the fields for Kafka. */
    public static String delimiter2 = "|||";
    /** File suffixes to collect, comma-separated. */
    public static String includeSuffix = "aSuffix,bSuffix";
    /** Compiled form of {@link #lineRegex}. */
    public static Pattern linePattern = null;
    /** Kafka producer configuration. */
    public static Properties props = null;
    /** URL to call once collection is finished; no notification is sent when unset. */
    public static String noticeUrl;

    private static final String COMPLETED_SUFFIX = ".COMPLETED";
    private static final long NOTICE_RETRY_MILLIS = 2000L;
    private static final int NOTICE_CONNECT_TIMEOUT_MILLIS = 5000;
    private static final int NOTICE_READ_TIMEOUT_MILLIS = 10000;

    /**
     * Entry point: loads the configuration file given as {@code args[0]}, sends every
     * matching file on a fixed-size thread pool, waits for the pool to finish and then
     * calls the notice URL until it answers with HTTP 200.
     *
     * @param args {@code args[0]} is the path of the configuration file
     */
    public static void main(String[] args) {
        // Without a configuration file there is nothing to do; stop instead of
        // falling through to args[0].
        if (args.length < 1) {
            new Exception("無配置文件").printStackTrace();
            System.exit(1);
            return;
        }
        try {
            loadConfig(new File(args[0]));
        } catch (IOException e) {
            // An unreadable configuration would leave ip/port/topic empty.
            e.printStackTrace();
            System.exit(1);
            return;
        }

        int threads;
        try {
            threads = Integer.parseInt(threadNum);
        } catch (NumberFormatException e) {
            threads = 0;
        }
        if (threads < 1) {
            System.err.println("invalid threadnum: " + threadNum);
            System.exit(1);
            return;
        }

        // Kafka configuration.
        props = new Properties();
        props.put("bootstrap.servers", ip + ":" + port);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Number of fields in the comma-separated field sequence.
        fieldNum = fieldSquence.split(",").length;
        linePattern = Pattern.compile(lineRegex);

        // One task per file; the number of concurrent workers is bounded by threadnum.
        ExecutorService es = Executors.newFixedThreadPool(threads);
        try {
            submitFiles(es);
        } finally {
            es.shutdown();
        }

        try {
            es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            // No notice URL configured: retrying could never succeed.
            if (noticeUrl == null || noticeUrl.isEmpty()) {
                return;
            }
            // Keep notifying the downstream service until it accepts the request.
            while (!connectSuccess(noticeUrl)) {
                Thread.sleep(NOTICE_RETRY_MILLIS);
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads {@code key=value} settings from the configuration file into the static fields.
     * All whitespace is removed from each line first. The value is everything after the
     * first {@code =}, so values may themselves contain {@code =}. Lines without a key or
     * without a value are ignored, which keeps the built-in default for that setting.
     */
    private static void loadConfig(File configFile) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(configFile))) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.replaceAll("\\s+", "");
                int eq = line.indexOf('=');
                if (eq <= 0 || eq == line.length() - 1) {
                    continue;
                }
                applySetting(line.substring(0, eq), line.substring(eq + 1));
            }
        }
    }

    /** Stores one configuration value in the field that belongs to its key. */
    private static void applySetting(String k, String v) {
        if (k.equals("port")) port = v;                   // kafka port
        if (k.equals("ip")) ip = v;                       // kafka host
        if (k.equals("topic")) topic = v;                 // kafka topic
        if (k.equals("fieldsquence")) fieldSquence = v;   // field sequence, comma-separated
        if (k.equals("threadnum")) threadNum = v;         // number of worker threads
        if (k.equals("path")) path = v;                   // directories, comma-separated
        if (k.equals("lineregex")) lineRegex = v;         // lines not matching are dropped
        if (k.equals("delimiter")) delimiter = v;         // field delimiter (regex)
        if (k.equals("delimiter2")) delimiter2 = v;       // delimiter used when sending to Kafka
        if (k.equals("includesuffix")) includeSuffix = v; // suffixes of files to include
        if (k.equals("noticeurl")) noticeUrl = v;         // URL notified when collection is done
    }

    /** Submits one {@link #send(File)} task for every regular file with an included suffix. */
    private static void submitFiles(ExecutorService es) {
        String[] suffixes = includeSuffix.split(",");
        for (String dirPath : path.split(",")) {
            // listFiles() returns null when the path is not a readable directory.
            File[] files = new File(dirPath).listFiles();
            if (files == null) {
                System.err.println("skip missing or unreadable directory: " + dirPath);
                continue;
            }
            for (final File file : files) {
                if (file.isFile() && hasIncludedSuffix(file, suffixes)) {
                    es.submit(new Runnable() {
                        @Override
                        public void run() {
                            send(file);
                        }
                    });
                }
            }
        }
    }

    /**
     * Returns true when the file's absolute path ends with any of the suffixes. The file is
     * reported once even if several suffixes match; empty suffix entries are ignored.
     */
    private static boolean hasIncludedSuffix(File file, String[] suffixes) {
        String absolutePath = file.getAbsolutePath();
        for (String suffix : suffixes) {
            if (!suffix.isEmpty() && absolutePath.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reads the file line by line and sends every accepted line to Kafka. A line is accepted
     * when it matches {@link #linePattern} and splits into at least {@link #fieldNum} fields;
     * the record value is the fields joined with {@link #delimiter2} followed by the file's
     * absolute path, and the record key is the current time in milliseconds.
     *
     * <p>The file is renamed with the {@code .COMPLETED} suffix only when it was read
     * completely and no send reported an error; otherwise it is left in place so that a
     * later run can retry it (already delivered lines are then sent again).
     *
     * @param file the file to send
     */
    public static void send(final File file) {
        final AtomicBoolean failed = new AtomicBoolean(false);
        // Closing the producer waits for outstanding sends, so every callback has run
        // by the time the try block is left.
        try (BufferedReader reader = new BufferedReader(new FileReader(file));
             Producer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (!linePattern.matcher(line).matches()) {
                    continue;
                }
                String[] fields = line.split(delimiter);
                if (fields.length < fieldNum) {
                    continue;
                }
                StringBuilder sb = new StringBuilder();
                for (String fieldValue : fields) {
                    sb.append(fieldValue).append(delimiter2);
                }
                sb.append(file.getAbsolutePath());
                producer.send(
                        new ProducerRecord<String, String>(
                                topic, String.valueOf((new Date()).getTime()), sb.toString()),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata rm, Exception e) {
                                if (e != null) {
                                    failed.set(true);
                                    // Concatenation instead of rm.toString(): safe if rm is null.
                                    System.out.println("send fail" + rm + ",e:" + e.getMessage());
                                }
                            }
                        });
            }
        } catch (Exception e) {
            failed.set(true);
            System.out.println(e.toString());
        }

        if (failed.get()) {
            System.out.println("not marking as completed, will be retried: " + file.getAbsolutePath());
            return;
        }
        if (!file.renameTo(new File(file.getAbsolutePath() + COMPLETED_SUFFIX))) {
            System.out.println("rename fail: " + file.getAbsolutePath());
        }
    }

    /**
     * Requests the given address and reports whether the service answered with HTTP 200.
     *
     * @param path the URL to request
     * @return true if the response code is 200; false for a missing URL, any other response
     *         code, or any connection error
     */
    public static boolean connectSuccess(String path) {
        if (path == null || path.isEmpty()) {
            return false;
        }
        HttpURLConnection con = null;
        try {
            con = (HttpURLConnection) new URL(path).openConnection();
            // Bounded waits so that a hung service cannot block the retry loop forever.
            con.setConnectTimeout(NOTICE_CONNECT_TIMEOUT_MILLIS);
            con.setReadTimeout(NOTICE_READ_TIMEOUT_MILLIS);
            return con.getResponseCode() == 200;
        } catch (Exception e) {
            return false;
        } finally {
            if (con != null) {
                con.disconnect();
            }
        }
    }
}
配置文件編寫customer2kafka.conf
ip=192.168.1.91
threadnum=20
port=9092
topic=customertopic
path=/home/ftpuser/customer
includesuffix=txt
lineregex=^#\d.*$
delimiter=\s+
noticeurl=http://192.168.1.92:6009/schedule/customer
fieldsquence=id,name,score
maven打包執行:
java -jar file2kafka-2.0.jar /opt/app/file2kafka/customer2kafka.conf
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.shenyuchong</groupId>
  <artifactId>file2kafka</artifactId>
  <version>2.0</version>
  <packaging>jar</packaging>

  <name>file2hive</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version><!--$NO-MVN-MAN-VER$ -->
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <!-- Builds a single executable jar that bundles the dependencies. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Must be the fully qualified name of the class that declares main(). -->
              <mainClass>com.shenyuchong.App</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" replaces the deprecated "assembly" goal. -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
