從這里開始,就開始接觸使用分布式系統處理大數據了。在處理大數據之前,需要有一個場景,否則技術工具無法嵌入現實當中,價值就會降低。我碰到的場景應該還是比較具有普遍性,因此大家可以在我的場景里先玩一遍,熟悉一下流程和方法,然后加以改造,加載到自己的場景和環境中。
場景:在一個大型公司內部,終端和各個業務系統之間的數據傳輸都通過網絡進行。出於監控的要求,需要在網絡上獲取所有數據包,並查看數據包里是否含有某些關鍵字。如果含有某些關鍵字,證明終端和業務系統間正在進行某種操作。系統記錄下這些操作,用於實時顯示或統計使用。
這其實就是“行為數據”的采集和記錄,是典型的大數據處理場景。
擴展一下,將該場景所使用的技術和工具加載到互聯網或APP上,就可以在不改動任何業務系統、在用戶無感知的情況下,采集用戶的行為數據並加以利用,形成用戶習慣數據。當然,也可以通過“埋點”的方式進行,但改動業務系統不要花錢嘛,能省一點是一點。
網絡數據的獲取。網絡數據通過網絡設備的“鏡像口”獲得。鏡像口的設置可以讓網管幫忙,一般可網管交換機都可以做到,思科、華三、邁普這些都沒啥問題。通過鏡像口獲取網絡數據,就可以在各個業務系統和用戶無感知的情況下獲取所有的數據了。當然,如果公司或者系統在傳輸時使用https等加密手段,這個就沒辦法了。不過一般公司很少在內網傳輸時加密。
結構如下(畫功實在是感人):
將鏡像口(也就是上圖的監聽口)接到一台服務器的RJ45網卡上,不用配IP地址,服務器這個網卡就可以接收到所有在網絡上傳輸的數據了。如下圖,eth1就是接入鏡像口的網卡。
使用 tcpdump -i eth1 可以看到網絡上傳輸的部分數據。通過源地址、目標地址,就可以大概判斷是不是公司的數據了。
tcpdump的使用很復雜,網上很多方法介紹,這里就不做解釋,直接用起來。一般來說,完整的tcpdump數據包包含了很多段,每段還有標識等等。如下圖:
可以看到網絡包中包含有一段json,這就是在網絡上傳輸的數據。
但直接通過tcpdump獲得的這些數據是不適合直接扔給分布式系統做處理的。為什么呢?
因為分布式處理是多台設備並行處理,達到擴展數據處理能力的目的。剛剛說了,完整的tcpdump數據包包含了很多段,假設這個數據包為A,A1是第一段,A2是第二段。如果同時將A1,A2並發給分布式系統處理,並且同時出結果,把結果拼起來,這是沒問題的。但現實情況是,A1比A2先處理完,先收到一個結果A1,就沒法得到一個有意義的結果。更糟糕的情況時,假如此時同時出來一個B1,把A1,B1拼在一起得到一個更錯誤的結果。
所以,要利用分布式系統處理tcpdump的數據,就要把一個完整的數據包一次性丟給分布式處理系統,保證數據包和計算結果的完整性和對應性。也就是A1數據包進去計算,得到的是A1的計算結果。
使用tcpdump命令:
tcpdump -Anvtttt -s 0 -i eth1 tcp[20:2]=0x4745 or tcp[20:2]=0x4854 or tcp[20:2]=0x504f|sed '/ IP (tos/s/^/<<interval>>/'
需要特別說明的是,tcp[20:2]=0x4745 or tcp[20:2]=0x4854 or tcp[20:2]=0x504f 是對tcpdump數據包進行初步過濾,只需要http包,其他的比如arp、rtp包之類的不需要。
使用個命令后,tcpdump得到的數據都會在第一行留一個<<interval>>作為標志,標志這個數據包開始,到下一個<<interval>>結束。這樣就可以為下一步利用JAVA清楚所有換行符做好標記。
得到的結果如下:
兩個<<interval>>之間就是一個完整的tcpdump數據包。下面結合使用JAVA,將一個完整的tcpdump數據包整合成一個能被分布式大數據處理平台進行處理的數據。其實是一個簡單的ETL過程。
使用JAVA對tcpdump數據包進行數據清洗,並加載到kafka中
1、使用eclipse新建一個maven工程,這里起名叫shell
2、使用pom.xml對依賴包進行管理,主要使用kafka的依賴包。

<dependencies> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>2.3</version> </dependency> </dependencies>
3、在kafka中設置一個topic。這個kafka的配置和topic的配置以后再說,不難。如果不知道怎么弄,可以在JAVA程序里寫到一個本地文本文件里。這算是個作業,大家自己做吧。
4、將每個tcpdump包中兩個<<interval>>之間的數據進行清洗。主要是把其中的換行符替換成“|”符號,清除掉其他不想要的符號。把清洗干凈的數據放到s_right變量中等待發送。接着清洗另一條數據,清洗完第二條數據后,將s_right里的數據前移到s_left中並發送到kafka里,第二條數據放在s_right中等待發送。

package shell; import java.io.*; import java.text.SimpleDateFormat; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.Date; import java.util.Calendar; //import kafka.producer.*; //import kafka.serializer.StringEncoder; //import kafka.javaapi.producer.Producer; //import kafka.producer.Partitioner; //import kafka.producer.ProducerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class runShell { // private static kafka.javaapi.producer.Producer<Integer, String> producer // = null; public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { String SHELL_FILE_DIR, SHELL_FILE; SHELL_FILE_DIR = "/hadoop/sh/"; //把tcpdump命令放到一個sh文件中,把文件地址寫在SHELL_FILE 中,這樣JAVA //程序就可以調用tcpdump命令了 SHELL_FILE = "./" + "getTcpdump.sh"; String s = null; String s_left = "", s_right = ""; int runningStatus = 0; Properties props = new Properties(); props.put("bootstrap.servers","你的kafka的地址和端口"); props.put("acks", "1"); props.put("retries", 0); props.put("linger.ms", 0); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 將props的配置放到kafka中 Producer<String, String> producer = new KafkaProducer<String, String>(props); // 設置kafka的topic String TOPIC = "netRawdata"; ProcessBuilder pb = new ProcessBuilder(SHELL_FILE); pb.directory(new File(SHELL_FILE_DIR)); Process p = pb.start(); BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getInputStream())); while ((s = stdInput.readLine()) != null) { if (s.startsWith("<<interval>>")) { s_left = s_right; s_right = s.substring(12); if (s_left != "") { s_left = s_left.replaceAll("\t", " "); s_left = s_left.replaceAll(" +", " "); producer.send(new ProducerRecord<String, String>(TOPIC, s_left)).get(); System.out.println(s_left); } } else { s_right = s_right + "|" + s; } } while ((s = stdError.readLine()) != null) { System.out.println(s); } try { runningStatus = p.waitFor(); if (s_right != "") { s_right = s_right.replaceAll("\t", " "); s_right = s_right.replaceAll(" +", " "); s_right = s_right.replaceAll("\n", " "); producer.send(new ProducerRecord<String, String>(TOPIC, s_right)).get(); System.out.println(s_right); producer.close(); } } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }