1. The config.properties configuration file
# Kafka configuration
kafka.broker.list=hadoop300:9092,hadoop301:9092,hadoop302:9092

# Redis configuration
redis.host=hadoop300
redis.port=6379
2. Reading the properties file
package com.duoduo.realtime.utils

import java.io.InputStreamReader
import java.util.Properties

/**
 * Author z
 * Date 2020-08-27 10:04:21
 */
object PropertiesUtil {

  def main(args: Array[String]): Unit = {
    // quick self-test: print the broker list from config.properties
    val properties: Properties = PropertiesUtil.load("config.properties")
    println(properties.getProperty("kafka.broker.list"))
  }

  // Loads a properties file from the classpath, reading it as UTF-8
  def load(propertiesName: String): Properties = {
    val p = new Properties()
    p.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader
        .getResourceAsStream(propertiesName), "UTF-8"))
    p
  }
}
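The config file also carries Redis connection settings, and the POM in the next section declares jedis 2.9.0. As a minimal sketch of how those entries might be consumed through PropertiesUtil, here is a hypothetical RedisUtil; the object itself and the pool sizes are assumptions, not part of the original post:

package com.duoduo.realtime.utils

import java.util.Properties

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical helper (assumption, not from the original post): reads
// redis.host / redis.port from config.properties via PropertiesUtil and
// hands out pooled Jedis connections.
object RedisUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  private val host: String = properties.getProperty("redis.host")
  private val port: Int = properties.getProperty("redis.port").toInt

  // a small connection pool; the sizes here are illustrative defaults
  private lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(100)
    config.setMaxIdle(20)
    new JedisPool(config, host, port)
  }

  def getJedisClient: Jedis = pool.getResource
}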
3. POM dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dw-stream</artifactId>
        <groupId>com.duoduo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dw-realtime</artifactId>

    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
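Since the assembly plugin is bound to the package phase, mvn package produces target/dw-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar. A hedged example of submitting it (the main class name is a placeholder, not from the original post):

spark-submit \
  --master yarn \
  --class com.duoduo.realtime.app.DemoApp \
  target/dw-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar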
4. The KafkaUtil utility class
package com.duoduo.realtime.utils

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies}

/**
 * Author z
 * Date 2020-08-27 10:02:21
 */
object KafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list: String = properties.getProperty("kafka.broker.list")

  var kafkaParam = collection.mutable.Map(
    // brokers used to bootstrap the initial connection to the cluster
    "bootstrap.servers" -> broker_list,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies the consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // used when there is no initial offset, or the current offset no longer
    // exists on the server; "latest" resets to the newest available offset
    "auto.offset.reset" -> "latest",
    // if true, offsets are committed automatically in the background, which
    // risks data loss on failure; if false, offsets must be managed manually
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Subscribe to a topic with the default consumer group
  def getKafkaStream(topic: String, ssc: StreamingContext)
  : InputDStream[ConsumerRecord[String, String]] = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
    )
  }

  // Subscribe to a topic with an explicit consumer group
  def getKafkaStream(topic: String, ssc: StreamingContext, groupid: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupid
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
    )
  }

  // Subscribe with an explicit consumer group, starting from saved offsets
  def getKafkaStream(topic: String,
                     ssc: StreamingContext,
                     offsets: Map[TopicPartition, Long],
                     groupid: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupid
    val consumerStrategy: ConsumerStrategy[String, String] =
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam, offsets)
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }
}
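Because enable.auto.commit is false, a driver built on this class has to commit offsets itself. A minimal sketch of such a driver follows; the object name, topic, app name, and batch interval are placeholders, not part of the original post:

package com.duoduo.realtime.app

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

import com.duoduo.realtime.utils.KafkaUtil

// Hypothetical driver (assumption): consumes one topic via KafkaUtil and
// commits offsets back to Kafka manually after each batch.
object DemoApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo-app").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val stream = KafkaUtil.getKafkaStream("demo_topic", ssc)

    stream.foreachRDD { rdd =>
      // capture this batch's offset ranges before any transformation
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // placeholder processing: print a few record values on the driver
      rdd.map(_.value()).take(10).foreach(println)

      // commit the offsets back to Kafka only after the batch is processed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}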