I. Configuration parameters
1. Configured in the resource folder at the same directory level (loaded as /conf.properties from the classpath):
brokers_list=kafkaxxx02broker01:9092,kafkaxxx02broker02:9092,kafkaxxx02broker03:9092
2. Topic:
last_member_info
3. Flow
Read the relevant fields from the Hive table, wrap them into a JSON message, and send it to Kafka (a sample payload is sketched after this list).
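For reference, here is a minimal sketch of the JSON payload built for each Hive row before it is sent to the topic; the field values below are placeholders for illustration, not real data:

import com.google.gson.JsonObject

// Placeholder values for illustration only
val json = new JsonObject()
json.addProperty("memberId", "10001")   // member_id
json.addProperty("flag", "1")           // flag
json.addProperty("nickName", "nick_a")  // nick_name
json.addProperty("nickType", "0")       // nick_type
// json.toString => {"memberId":"10001","flag":"1","nickName":"nick_a","nickType":"0"}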
II. Code (Scala)
package kafka

import java.io.InputStream
import java.text.SimpleDateFormat
import java.util.{Date, HashMap, Properties}

import com.google.gson.JsonObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DakaMemProducer {
  // Load the broker addresses from conf.properties on the classpath
  val prop = new Properties()
  val is: InputStream = this.getClass().getResourceAsStream("/conf.properties")
  prop.load(is)
  // Property key for the broker list; in production this key is assumed to contain "prd",
  // which is what gates the debug show() below
  val environment_broker_list = "brokers_list"
  private val brokers = prop.getProperty(environment_broker_list)

  // Kafka producer properties (string key/value serializers)
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  // Because the producer is a member of this object, each executor JVM builds its own
  // instance when the object is initialized there
  private val producer = new KafkaProducer[String, String](this.props)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DakaMemProducer")
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // Yesterday's partition, formatted as yyyyMMdd
    val date = new Date(new Date().getTime - 86400000L)
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val statisDate = dateFormat.format(date)

    // Offline data
    val querySql1 = "select member_id,flag,nick_name,nick_type from xxx_db.xxx_table where statis_date = " + statisDate
    val resultDF1 = spark.sql(querySql1)

    // Wrap each row into a JSON message and send it to Kafka
    resultDF1.rdd.foreach(row => {
      val member_id: String = row.getAs[String]("member_id")
      val flag: String = row.getAs[String]("flag")
      val nick_name: String = row.getAs[String]("nick_name")
      val nick_type: String = row.getAs[String]("nick_type")
      val json = new JsonObject()
      json.addProperty("memberId", member_id)
      json.addProperty("flag", flag)
      json.addProperty("nickName", nick_name)
      json.addProperty("nickType", nick_type)
      kafkaProducerSend(json.toString)
    })

    // Print a sample of the data outside the production environment
    if (!environment_broker_list.contains("prd")) {
      resultDF1.show(100)
    }

    def kafkaProducerSend(args: String): Unit = {
      if (args != null) {
        val topic = "last_member_info"
        val message = new ProducerRecord[String, String](topic, null, args)
        producer.send(message)
      }
    }
  }
}
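To check that messages actually arrive on last_member_info, a minimal consumer sketch like the following can be used; it assumes the same brokers_list value, and the object name and group id here are hypothetical:

package kafka

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object LastMemberInfoConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker list as the producer; the group id is a hypothetical name for this check
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaxxx02broker01:9092,kafkaxxx02broker02:9092,kafkaxxx02broker03:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "last_member_info_check")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("last_member_info"))
    try {
      // Poll a few times and print whatever JSON messages are already on the topic
      for (_ <- 1 to 10) {
        val records = consumer.poll(Duration.ofSeconds(1))
        records.asScala.foreach(r => println(r.value()))
      }
    } finally {
      consumer.close()
    }
  }
}

The producer job itself would typically be launched with spark-submit (for example, spark-submit --class kafka.DakaMemProducer your-jar.jar), with conf.properties packaged on the classpath.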