Sending Data to Druid with Kafka + Spark Streaming + Tranquility Server


I spent a long time trying the embedded Tranquility approach described on the Druid website to send data to Druid in real time, and it failed for a variety of reasons I still have not tracked down: the job runs fine in IDEA but refuses to work once deployed. If anyone has gotten it working, please post a link so I can learn from it. I then tried sending data from Kafka to Druid directly, which also hit some errors (now resolved; I will write them up later). In the end there was nothing for it but to use Tranquility Server.

For Tranquility Server configuration and startup, see: https://github.com/druid-io/tranquility/blob/master/docs/server.md

(1) After starting the customized server, you can use generate-example-metrics under the Druid bin directory to generate test data (the customized server.json is shown below)

server.json configuration:

{ "dataSources" : { "reynold_metrics" : { "spec" : { "dataSchema" : { "dataSource" : "reynold_metrics", "parser" : { "type" : "string", "parseSpec" : { "timestampSpec" : { "column" : "timestamp", "format" : "auto" }, "dimensionsSpec" : { "dimensions" : [], "dimensionExclusions" : [ "timestamp", "value" ] }, "format" : "json" } }, "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "hour", "queryGranularity" : "none" }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "name" : "value_sum", "type" : "doubleSum", "fieldName" : "value" }, { "fieldName" : "value", "name" : "value_min", "type" : "doubleMin" }, { "type" : "doubleMax", "name" : "value_max", "fieldName" : "value" } ] },
     "tuningConfig" : { "type" : "realtime", "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } }, "properties" : { "task.partitions" : "1", "task.replicants" : "1" } } }, "properties" : { "zookeeper.connect" : "reynold-master:2181,reynold-slave02:2181,reynold-slave03:2181", "druid.discovery.curator.path" : "/druid/discovery", "druid.selectors.indexing.serviceName" : "druid/overlord", "http.port" : "8200", "http.threads" : "16" } }

(2) Create a Kafka topic and send data into it

Delete the topic: kafka-topics --delete --topic reynold --zookeeper localhost:2181
Create the topic: kafka-topics --create --topic reynold --zookeeper localhost:2181 --partitions 10 --replication-factor 1
Consume data: kafka-console-consumer --topic reynold --zookeeper localhost:2181 --from-beginning
Produce data: kafka-console-producer --broker-list reynold-master:9092 --topic reynold

{"count": 1, "value_min": 74.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 74.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 74.0, "http_code": "200", "unit": "milliseconds", "page": "/"}
{"count": 1, "value_min": 75.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 75.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 75.0, "http_code": "200", "unit": "milliseconds", "page": "/list"}
{"count": 1, "value_min": 143.0, "timestamp": "2017-03-09T02:38:06.000Z", "value_max": 143.0, "metricType": "request/latency", "server": "www2.example.com", "http_method": "GET", "value_sum": 143.0, "http_code": "200", "unit": "milliseconds", "page": "/"}

(3) Use Spark Streaming to consume the data from Kafka and send it to Druid over HTTP

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkDruid {

  val kafkaParam = Map[String, String](
    "metadata.broker.list" -> "reynold-master:9092,reynold-slave01:9092,reynold-slave02:9092,reynold-slave03:9092",
    "auto.offset.reset" -> "smallest"
  )

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("SparkDruid"))
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val topic: String = "reynold" // name of the topic to consume
    val topics: Set[String] = Set(topic) // set of topic names used when creating the stream

    val kafkaStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)

    // POST each message body to the Tranquility Server endpoint for our datasource
    kafkaStream.map(msg => msg._2).foreachRDD { rdd =>
      rdd.foreach(strJson => Https.post("http://reynold-master:8200/v1/post/reynold_metrics", strJson))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
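
Opening one HTTP round trip per record is expensive. Per the Tranquility server docs, the HTTP endpoint accepts multiple newline-delimited JSON events in a single POST body, so batching per partition is a natural optimization. A sketch that could replace the foreachRDD block above (the batch size of 500 is an arbitrary assumption):

// Batch events per partition instead of one POST per record
kafkaStream.map(_._2).foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    events.grouped(500).foreach { batch =>
      // Tranquility Server accepts newline-delimited JSON events in one body
      Https.post("http://reynold-master:8200/v1/post/reynold_metrics", batch.mkString("\n"))
    }
  }
}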

The Https class is as follows:

import java.io.InputStreamReader

import com.google.common.io.CharStreams
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients

/**
  * HTTP helpers that can:
  * 1. send data into Druid
  * 2. run a few kinds of Druid queries
  * 3. incidentally, query HBase data as well
  */
object Https {
  private val httpClient = HttpClients.createDefault()

  def get(url: String): String = {
    val _get = new HttpGet(url)
    val resp = httpClient.execute(_get)
    try {
      if (resp.getStatusLine.getStatusCode != 200) {
        throw new RuntimeException("error: " + resp.getStatusLine)
      }
      CharStreams.toString(new InputStreamReader(resp.getEntity.getContent))
    } finally {
      resp.close()
    }
  }

  // can both send data and request data (the response body is returned)
  def post(url: String, content: String): String = {
    val _post = new HttpPost(url)
    _post.setHeader("Content-Type", "application/json")
    _post.setEntity(new StringEntity(content,"utf-8"))
    val resp = httpClient.execute(_post)
    try {
      CharStreams.toString(new InputStreamReader(resp.getEntity.getContent))
    } finally {
      resp.close()
    }

  }

  object MapTypeRef extends com.fasterxml.jackson.core.`type`.TypeReference[Map[String, Any]]

  object ListMapTypeRef extends com.fasterxml.jackson.core.`type`.TypeReference[List[Map[String, Any]]]

  // TypeReference matching the List[String] result of druidDataSources below
  object ListStringTypeRef extends com.fasterxml.jackson.core.`type`.TypeReference[List[String]]

  def queryHBase(sql: String): List[Map[String, Any]] = {
    // serialize the request as JSON
    val request = new String(Mapper.mapper.writeValueAsBytes(Map(
      "action" -> "query",
      "sql" -> sql
    )))
    // send the JSON request
    val resp = post("http://reynold-master:8209/api", request)
    val rs = Mapper.mapper.readValue[Map[String, Any]](resp, MapTypeRef)
    // val rs = Mapper.mapper.readValue[Map[String, Any]](resp, classOf[Map[String, Any]]) also works
    rs("result").asInstanceOf[List[Map[String, Any]]]
  }

  def queryDruid(json: String): String = {
    post("http://reynold-master:18082/druid/v2", json)
  }

  private def getDruid(path: String): String = {
    get("http://reynold-master:18082/druid/v2" + path)
  }

  def druidDataSources(): List[String] = {
    Mapper.mapper.readValue[List[String]](getDruid("/datasources"), ListStringTypeRef)
  }

  def druidDimension(datasource: String): String = {
    getDruid(s"/datasources/$datasource/dimensions")
  }

  def druidMetrics(datasource: String): String = {
    getDruid(s"/datasources/$datasource/metrics")
  }

  def main(args: Array[String]): Unit = {
    println(queryHBase("select * from user_tags limit 2"))
  }
}

Mapper.scala

package com.donews.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule


/**
  * Created by reynold
  */

object Mapper {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
}
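
With DefaultScalaModule registered, Scala maps and lists serialize through Jackson directly, which is what Https relies on. A quick illustration (hypothetical values):

// Round-trip a Scala Map through JSON
val json = Mapper.mapper.writeValueAsString(Map("action" -> "query", "sql" -> "select 1"))
// json == {"action":"query","sql":"select 1"}
val back = Mapper.mapper.readValue[Map[String, Any]](json, Https.MapTypeRef)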

Note: add the following dependencies to your pom file:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.3.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
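The KafkaUtils.createDirectStream call above comes from Spark's Kafka 0.8 integration, which also needs to be on the classpath; something like the following, with the version assumed to match your Spark and Scala build:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <!-- version is an assumption; use the one matching your Spark distribution -->
    <version>1.6.3</version>
</dependency>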

 

