采集文件到kafka


  采集指定目錄下文本數據到kafka

package com.shenyuchong;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
/*
 * 用途:
 *         用於收集多個文件夾下文件內容到kafka;
 *         文件一行一行發送;
 *         支持發送完成后發出通知
 *         文件發送完成后會將文件添加.COMPLETED后綴
 *         支持采集指定后綴(多個)
 *         支持對行進行正則,不匹配的行丟棄
 *         僅支持對行進行分隔符切分
 *         支持將切分后的字段按新分隔符重組
 * 
 * 用法:
 *         mvn package打包成jar包:
 *             file2kafka-2.0.jar
 *         編寫配置文件xxx.conf內容如下:
 *             ip=192.168.1.91
 *             threadnum=20
 *             port=9092
 *             topic=customertopic
 *             path=/home/ftpuser/customer
 *             includesuffix=txt
 *             lineregex=^#\d.*$
 *             delimiter=\s+
 *             noticeurl=http://192.168.1.92:6009/schedule/customer
 *             fieldsquence=id,name,score
 *         執行:
 *             java -jar file2kafka-2.0.jar xxx.conf
 *         建議:用linux crontab進行定時執行(對同一個目錄進行多次采集不會造成數據重復發送)
 */
public class App {
    /* Configuration; populated from the key=value config file given as args[0]. */
    public static String fieldSquence = "";                  // comma-separated field names
    public static int    fieldNum = 0;                       // expected number of fields per line
    public static String ip = "";                            // kafka broker host
    public static String port = "";                          // kafka broker port
    public static String path = "";                          // comma-separated directories to scan
    public static String threadNum = "5";                    // size of the sender thread pool
    public static String topic = "";                         // kafka topic
    public static String lineRegex = "^.*$";                 // lines not matching are dropped
    public static String delimiter = "\\s+";                 // input field delimiter (regex)
    public static String delimiter2 = "|||";                 // output field separator (sent to kafka)
    public static String includeSuffix = "aSuffix,bSuffix";  // comma-separated file suffixes to pick up
    public static Pattern linePattern = null;                // compiled form of lineRegex
    public static Properties props = null;                   // kafka producer configuration
    public static String noticeUrl;                          // URL notified once all files are sent

    /**
     * Entry point: parse the config file, scan the configured directories, send
     * every matching file to kafka, then ping noticeUrl until it answers 200.
     */
    public static void main(String[] args) {
        // Abort early when no config file is supplied; the original code only
        // printed a stack trace and then continued with an empty configuration.
        if (args.length < 1) {
            System.err.println("usage: java -jar file2kafka-2.0.jar <config file>");
            return;
        }
        // Read the key=value configuration; all whitespace inside a line is stripped.
        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[0])))) {
            String line;
            while ((line = br.readLine()) != null) {
                line = line.replaceAll("\\s+", "");
                int eq = line.indexOf('=');
                // Split on the FIRST '=' only so values (e.g. URLs with query strings)
                // may themselves contain '='; also skip "key=" lines, which used to
                // throw ArrayIndexOutOfBoundsException.
                if (eq > 0 && eq < line.length() - 1) {
                    String k = line.substring(0, eq);
                    String v = line.substring(eq + 1);
                    if (k.equals("port"))          port = v;            // kafka port
                    if (k.equals("ip"))            ip = v;              // kafka host
                    if (k.equals("topic"))         topic = v;           // kafka topic
                    if (k.equals("fieldsquence"))  fieldSquence = v;    // field names, comma separated
                    if (k.equals("threadnum"))     threadNum = v;       // sender thread count
                    if (k.equals("path"))          path = v;            // directories, comma separated
                    if (k.equals("lineregex"))     lineRegex = v;       // lines not matching are dropped
                    if (k.equals("delimiter"))     delimiter = v;       // input field delimiter
                    if (k.equals("delimiter2"))    delimiter2 = v;      // output field separator
                    if (k.equals("includesuffix")) includeSuffix = v;   // file suffixes to include
                    if (k.equals("noticeurl"))     noticeUrl = v;       // completion-notice endpoint
                }
            }
        } catch (IOException e1) {
            e1.printStackTrace();
            return; // without a configuration there is nothing useful to do
        }
        /*
         * Kafka producer configuration (serializers are also passed explicitly
         * when the producer is constructed in send()).
         */
        props = new Properties();
        props.put("bootstrap.servers", ip + ":" + port);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Number of expected fields per line, derived from the configured sequence.
        fieldNum = fieldSquence.split(",").length;
        // Pre-compile the line filter once.
        linePattern = Pattern.compile(lineRegex);
        // Fixed-size pool: one task per file, actual parallelism bounded by threadNum.
        ExecutorService es = Executors.newFixedThreadPool(Integer.valueOf(threadNum));
        // Scan every configured directory and submit one send task per matching file.
        for (String dirPath : path.split(",")) {
            File[] files = new File(dirPath).listFiles();
            if (files == null) {
                // Directory missing or unreadable; the original code crashed
                // here with a NullPointerException.
                System.err.println("cannot list directory: " + dirPath);
                continue;
            }
            for (final File file : files) {
                for (String suffix : includeSuffix.split(",")) {
                    if (file.getAbsolutePath().endsWith(suffix)) {
                        es.submit(new Runnable() {
                            @Override
                            public void run() {
                                send(file);
                            }
                        });
                        // Submit each file at most once even if several suffixes
                        // match; the original sent such files multiple times.
                        break;
                    }
                }
            }
        }
        // No new tasks; already-submitted sends keep running.
        es.shutdown();
        /*
         * Once the pool has terminated, notify the follow-up service and keep
         * retrying every 2s until it accepts the request.
         */
        boolean stop = false, noticed = false;
        try {
            while (!stop || !noticed) {
                if (es.isTerminated()) {
                    stop = true;
                }
                Thread.sleep(2000);
                if (stop) {
                    noticed = connectSuccess(noticeUrl);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Read one file line by line and send matching lines to kafka. On success
     * the file is renamed with a ".COMPLETED" suffix so a later run will not
     * pick it up again.
     */
    public static void send(File file) {
        boolean ok = false;
        BufferedReader bf = null;
        Producer<String, String> producer = null;
        try {
            bf = new BufferedReader(new FileReader(file));
            producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            String line;
            while ((line = bf.readLine()) != null) {
                line = line.trim();
                if (!linePattern.matcher(line).matches()) {
                    continue; // drop lines that do not match lineregex
                }
                String[] fields = line.split(delimiter);
                if (fields.length < fieldNum) {
                    continue; // drop lines with too few fields
                }
                StringBuffer sb = new StringBuffer();
                for (String fieldValue : fields)
                    sb.append(fieldValue).append(delimiter2);
                sb.append(file.getAbsolutePath()); // trailing field: source file path
                // Key is the current epoch millis; send is asynchronous, failures
                // are only logged by the callback.
                producer.send(
                        new ProducerRecord<String, String>(topic, String.valueOf((new Date()).getTime()), sb.toString()),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata rm, Exception e) {
                                // Guard rm: metadata may be absent when the send failed.
                                if (e != null)
                                    System.out.println("send fail" + (rm == null ? "" : rm.toString()) + ",e:" + e.getMessage());
                            }
                        });
            }
            ok = true;
        } catch (Exception e) {
            System.out.println(e.toString());
        } finally {
            // close() flushes all buffered records before returning; the original
            // leaked the producer when an exception was thrown mid-file.
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (bf != null) {
                try {
                    bf.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // Only mark the file as done when it was fully read and handed to the
        // producer; the original renamed unconditionally, so a file whose send
        // failed was silently skipped forever on subsequent runs.
        if (ok) {
            file.renameTo(new File(file.getAbsolutePath() + ".COMPLETED"));
        }
    }

    /**
     * Issue an HTTP GET against the given URL and report whether it answered 200.
     *
     * @param path the URL to request (the original ignored this parameter and
     *             always used the global noticeUrl; all callers pass noticeUrl,
     *             so honoring the parameter is behavior-compatible)
     * @return true only when the endpoint responded with HTTP 200
     */
    public static boolean connectSuccess(String path) {
        try {
            URL url = new URL(path);
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            // Bounded timeouts so the notify retry loop in main() cannot hang
            // forever on a single unreachable host.
            con.setConnectTimeout(5000);
            con.setReadTimeout(5000);
            boolean ok = con.getResponseCode() == 200;
            con.disconnect();
            return ok;
        } catch (Exception e) {
            return false; // any connection problem counts as "not yet notified"
        }
    }
}

 

 

   配置文件編寫customer2kafka.conf

ip=192.168.1.91
threadnum=20
port=9092
topic=customertopic
path=/home/ftpuser/customer
includesuffix=txt
lineregex=^#\d.*$
delimiter=\s+
noticeurl=http://192.168.1.92:6009/schedule/customer
fieldsquence=id,name,score

  maven打包執行:

java -jar file2kafka-2.0.jar /opt/app/file2kafka/customer2kafka.conf

  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.shenyuchong</groupId>
<artifactId>file2kafka</artifactId>
<version>2.0</version>
<packaging>jar</packaging>

<!-- was "file2hive", which contradicted the artifactId -->
<name>file2kafka</name>
<url>http://maven.apache.org</url>

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version><!--$NO-MVN-MAN-VER$ -->
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src</sourceDirectory>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <!-- must match the actual package com.shenyuchong (was com.gbd.App,
                 which made the assembled jar unrunnable via java -jar) -->
            <mainClass>com.shenyuchong.App</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <!-- "assembly" is a deprecated goal; "single" is the supported
                 goal for building the jar-with-dependencies at package time -->
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
</project>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM