Integrating Maven + Eclipse + Spark Streaming + Kafka


Versions:

Maven 3.5.0, Scala IDE for Eclipse 4.6.1, spark-2.1.1-bin-hadoop2.7, kafka_2.11-0.8.2.1, JDK 1.8

Background reading:

Installing and configuring Maven 3.5.0 and using it from Eclipse

Downloading dependency jars with Maven and how to use them

Bundling dependency jars into the build with Maven

Using Maven scopes

I. Set the JDK to 1.8

Add the following to pom.xml:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <encoding>UTF-8</encoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

and, under <build><plugins>, configure the compiler plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

The pom.xml after these changes:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <encoding>UTF-8</encoding>
        <!-- Target JDK 1.8 -->
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for JDK 1.8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- maven-assembly-plugin: bundle the dependencies into the jar -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
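To confirm Maven actually picks up the compiler properties, one option is the standard maven-help-plugin; this is just a sketch, nothing project-specific is assumed:

mvn help:evaluate -Dexpression=maven.compiler.source
# the resolved value, 1.8, is printed among the build output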

II. Configure the Spark dependencies

Check the jar versions shipped under spark-2.1.1-bin-hadoop2.7/jars, then search the Maven repository at http://mvnrepository.com for the matching artifacts.
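For example, the bundled versions can be read straight off the file names (assuming Spark is unpacked under /usr/local/spark, the same path used in the submit step in section V):

cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7
ls jars | grep -E 'spark-(core|streaming)_'
# spark-core_2.11-2.1.1.jar
# spark-streaming_2.11-2.1.1.jar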

1. Configure spark-core_2.11-2.1.1.jar

Add the following to pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
    <scope>runtime</scope>
</dependency>

So that this dependency is bundled when the project is packaged later, change the <scope>provided</scope> suggested by mvnrepository to <scope>runtime</scope>: the jar-with-dependencies assembly includes compile- and runtime-scoped dependencies, while provided ones are assumed to already be on the cluster classpath and are left out.

2. Configure spark-streaming_2.11-2.1.1.jar

Add the following to pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
    <scope>runtime</scope>
</dependency>

As above, change <scope>provided</scope> to <scope>runtime</scope> so the dependency is bundled at packaging time.

III. Configure Spark + Kafka

1. Configure spark-streaming-kafka-0-8_2.11-2.1.1.jar

Add the following to pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

This artifact keeps its default compile scope, so it is bundled into the jar without any change.

IV. The complete pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <encoding>UTF-8</encoding>
        <!-- Target JDK 1.8 -->
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for JDK 1.8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- maven-assembly-plugin: bundle the dependencies into the jar -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
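All three Spark artifacts must agree on the Scala suffix (_2.11) and the version (2.1.1). A quick sanity check with standard Maven, assuming nothing beyond the pom above:

mvn dependency:tree -Dincludes=org.apache.spark
# every org.apache.spark entry should end in _2.11:jar:2.1.1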

V. Develop the Spark code locally, then upload it to the Spark cluster and run it

JavaDirectKafkaCompare.java

package com.spark.main;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;

import scala.Tuple2;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

public class JavaDirectKafkaCompare {

    public static void main(String[] args) throws Exception {
        /**
         * setMaster("local[2]"): at least two threads are required, one to
         * receive messages and one to process them.
         * Durations.seconds(2): a batch interval of two seconds, i.e. Kafka
         * is read every two seconds.
         */
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
        /**
         * Checkpointing to HDFS guards against data loss.
         */
        jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint");
        /**
         * Kafka connection parameters.
         */
        Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");

        // Create a direct Kafka stream from the brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );

        /**
         * Each message is a (key, value) pair; _2() extracts the value.
         */
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // The ID-card number to match against
        final String sfzh = "432922196105276721";
        JavaDStream<String> matchedLines = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                /**
                 * Keep only the records containing the ID-card number.
                 */
                if (s.contains(sfzh)) {
                    System.out.println("Matched record: " + s);
                    return true;
                }
                return false;
            }
        });
        matchedLines.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
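The code consumes from a topic named test, so that topic has to exist on the broker first. With Kafka's own CLI it can be created as follows (a sketch; the Kafka install path and the ZooKeeper address are assumptions based on the broker host used above):

cd /usr/local/kafka/kafka_2.11-0.8.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.168.200:2181 --replication-factor 1 --partitions 1 --topic test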

Right-click the project and choose Run As → Maven install. When the build succeeds, the target directory contains test-0.0.1-SNAPSHOT-jar-with-dependencies.jar; copy this jar to the SPARK_HOME/myApp directory on the Linux cluster.
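The same build can also be run from a shell in the project root; as a sketch, jar tf then lists the fat jar's contents to confirm the Spark classes were actually bundled:

mvn clean install
jar tf target/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep -m1 'org/apache/spark/SparkConf'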

Run:

cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7
bin/spark-submit --class "com.spark.main.JavaDirectKafkaCompare" --master local[4] myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar
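To verify the comparison end to end, feed the topic from Kafka's console producer and type a line containing the ID-card number (the message below is a made-up example; the Kafka path is the same assumption as above):

cd /usr/local/kafka/kafka_2.11-0.8.2.1
bin/kafka-console-producer.sh --broker-list 192.168.168.200:9092 --topic test
# type, e.g.:  zhangsan,432922196105276721,2017-06-12
# the Spark console should then print: Matched record: zhangsan,432922196105276721,2017-06-12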

VI. Offline Maven repository

Download: http://pan.baidu.com/s/1eS7Ywme (password: y3qz)

