tcptump的使用------使用JAVA與tcpdump從網絡獲取原始數據


從這里開始,就開始接觸使用分布式系統處理大數據了。在處理大數據之前,需要有一個場景,否則技術工具無法嵌入現實當中,價值就會降低。我碰到的場景應該還是比較具有普遍性,因此大家可以在我的場景里先玩一遍,熟悉一下流程和方法,然后加以改造,加載到自己的場景和環境中。

場景:在一個大型公司內部,終端和各個業務系統之間的數據傳輸都通過網絡進行。出於監控的要求,需要在網絡上獲取所有數據包,並查看數據包里是否含有某些關鍵字。如果含有某些關鍵字,證明終端和業務系統間正在進行某種操作。系統記錄下這些操作,用於實時顯示或統計使用。

這其實就是“行為數據”的采集和記錄,是典型的大數據處理場景。

擴展一下,將該場景所使用的技術和工具加載到互聯網或APP上,就可以在不改動任何業務系統、在用戶無感知的情況下,采集用戶的行為數據並加以利用,形成用戶習慣數據。當然,也可以通過“埋點”的方式進行,但改動業務系統不要花錢嘛,能省一點是一點。

 

網絡數據的獲取。網絡數據通過網絡設備的“鏡像口”獲得。鏡像口的設置可以讓網管幫忙,一般可網管交換機都可以做到,思科、華三、邁普這些都沒啥問題。通過鏡像口獲取網絡數據,就可以在各個業務系統和用戶無感知的情況下獲取所有的數據了。當然,如果公司或者系統在傳輸時使用https等加密手段,這個就沒辦法了。不過一般公司很少在內網傳輸時加密。

結構如下(畫功實在是感人):

 

將鏡像口(也就是上圖的監聽口)接到一台服務器的RJ45網卡上,不用配IP地址,服務器這個網卡就可以接收到所有在網絡上傳輸的數據了。如下圖,eth1就是接入鏡像口的網卡。

 

使用 tcpdump -i eth1 可以看到網絡上傳輸的部分數據。通過源地址、目標地址,就可以大概判斷是不是公司的數據了。

tcpdump的使用很復雜,網上很多方法介紹,這里就不做解釋,直接用起來。一般來說,完整的tcpdump數據包包含了很多段,每段還有標識等等。如下圖:

可以看到網絡包中包含有一段json,這就是在網絡上傳輸的數據。

 

但直接通過tcpdump獲得的這些數據是不適合直接扔給分布式系統做處理的。為什么呢?

因為分布式處理是多台設備並行處理,達到擴展數據處理能力的目的。剛剛說了,完整的tcpdump數據包包含了很多段,假設這個數據包為A,A1是第一段,A2是第二段。如果同時將A1,A2並發給分布式系統處理,並且同時出結果,把結果拼起來,這是沒問題的。但現實情況是,A1比A2先處理完,先收到一個結果A1,就沒法得到一個有意義的結果。更糟糕的情況時,假如此時同時出來一個B1,把A1,B1拼在一起得到一個更錯誤的結果。

所以,要利用分布式系統處理tcpdump的數據,就要把一個完整的數據包一次性丟給分布式處理系統,保證數據包和計算結果的完整性和對應性。也就是A1數據包進去計算,得到的是A1的計算結果。

使用tcpdump命令:

tcpdump -Anvtttt -s 0 -i eth1 tcp[20:2]=0x4745 or tcp[20:2]=0x4854 or tcp[20:2]=0x504f|sed '/ IP (tos/s/^/<<interval>>/'

需要特別說明的是,tcp[20:2]=0x4745 or tcp[20:2]=0x4854 or tcp[20:2]=0x504f 是對tcpdump數據包進行初步過濾,只需要http包,其他的比如arp、rtp包之類的不需要。

使用個命令后,tcpdump得到的數據都會在第一行留一個<<interval>>作為標志,標志這個數據包開始,到下一個<<interval>>結束。這樣就可以為下一步利用JAVA清楚所有換行符做好標記。

得到的結果如下:

兩個<<interval>>之間就是一個完整的tcpdump數據包。下面結合使用JAVA,將一個完整的tcpdump數據包整合成一個能被分布式大數據處理平台進行處理的數據。其實是一個簡單的ETL過程。

 

 

使用JAVA對tcpdump數據包進行數據清洗,並加載到kafka中

1、使用eclipse新建一個maven工程,這里起名叫shell

2、使用pom.xml對依賴包進行管理,主要使用kafka的依賴包。

<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.3</version>
</dependency>
</dependencies>
View Code

 

3、在kafka中設置一個topic。這個kafka的配置和topic的配置以后再說,不難。如果不知道怎么弄,可以在JAVA程序里寫到一個本地文本文件里。這算是個作業,大家自己做吧。

4、將每個tcpdump包中兩個<<interval>>之間的數據進行清洗。主要是把其中的換行符替換成“|”符號,清除掉其他不想要的符號。把清洗干凈的數據放到s_right變量中等待發送。接着清洗另一條數據,清洗完第二條數據后,將s_right里的數據前移到s_left中並發送到kafka里,第二條數據放在s_right中等待發送。

package shell;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.Date;
import java.util.Calendar;

//import kafka.producer.*;
//import kafka.serializer.StringEncoder;
//import kafka.javaapi.producer.Producer;
//import kafka.producer.Partitioner;
//import kafka.producer.ProducerConfig;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class runShell {

    // private static kafka.javaapi.producer.Producer<Integer, String> producer
    // = null;

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {

        String SHELL_FILE_DIR, SHELL_FILE;
        SHELL_FILE_DIR = "/hadoop/sh/";
         //把tcpdump命令放到一個sh文件中,把文件地址寫在SHELL_FILE 中,這樣JAVA
         //程序就可以調用tcpdump命令了
        SHELL_FILE = "./" + "getTcpdump.sh";
        String s = null;
        String s_left = "", s_right = "";
        int runningStatus = 0;

        Properties props = new Properties();
        props.put("bootstrap.servers","你的kafka的地址和端口");
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("linger.ms", 0);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 將props的配置放到kafka中
        Producer<String, String> producer = new KafkaProducer<String, String>(props);


        // 設置kafka的topic
        String TOPIC = "netRawdata";

        ProcessBuilder pb = new ProcessBuilder(SHELL_FILE);
        pb.directory(new File(SHELL_FILE_DIR));

        Process p = pb.start();

        BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
        BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getInputStream()));
        while ((s = stdInput.readLine()) != null) {

            if (s.startsWith("<<interval>>")) {
                s_left = s_right;
                s_right = s.substring(12);

                if (s_left != "") {
                    s_left = s_left.replaceAll("\t", " ");
                    s_left = s_left.replaceAll(" +", " ");
                    producer.send(new ProducerRecord<String, String>(TOPIC, s_left)).get();
                    System.out.println(s_left);
                }

            } else {

                s_right = s_right + "|" + s;
            }

        }
        while ((s = stdError.readLine()) != null) {
            System.out.println(s);
        }
        try {
            runningStatus = p.waitFor();

            if (s_right != "") {
                s_right = s_right.replaceAll("\t", " ");
                s_right = s_right.replaceAll(" +", " ");
                s_right = s_right.replaceAll("\n", " ");
                producer.send(new ProducerRecord<String, String>(TOPIC, s_right)).get();
                System.out.println(s_right);
                producer.close();
            }

        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM