Flink 寫入數據到 HBase


HBase 依賴(Maven)

 <dependency>
            <groupId>org.apache.hbase</groupId>
            <!-- The shaded client is used mainly to avoid jar (dependency) conflicts -->
            <artifactId>hbase-shaded-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- NOTE(review): hbase-server is not a shaded artifact; confirm that pulling it in
             alongside hbase-shaded-client does not reintroduce the conflicts the shaded
             client is meant to avoid. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
 /**
  * Entry point of the example job.
  *
  * Builds a one-element stream ("my-key-1"), pairs each element with the constant 1, and
  * for every pair reads one cell from the HBase table "test" and writes one cell back to it.
  *
  * @param args command-line arguments (unused)
  */
 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val list = ArrayBuffer[String]()
    list.append("my-key-1")
    val text = env.fromCollection(list)
    text.map((_, 1)).process(new ProcessFunction[(String, Int), String] {
      // Table handle; assigned in open() and released in close(), so it is not created
      // per element inside processElement.
      var table: Table = _

      /** Obtains the handle to the HBase table "test" through the project's HbaseUtil helper. */
      override def open(parameters: Configuration): Unit = {
        table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
      }

      /**
        * Reads column a:key11 of the row keyed by the element's string, prints it, then
        * writes "lisi" into column f:name of the row "shx_" + the element's integer.
        *
        * Nothing is emitted through `out`.
        */
      override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), String]#Context, out: Collector[String]): Unit = {
        // Read
        val get = new Get(Bytes.toBytes(value._1))
        get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
        val result = table.get(get)
        val v = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
        println(v)
        // Write
        // value._2 is always 1 here (set by the map above), so every element targets row "shx_1".
        val put = new Put(Bytes.toBytes("shx_" + value._2))
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lisi"))
        table.put(put)
      }

      /** Releases the table handle if open() managed to create one. */
      override def close(): Unit = {
        // open() may have failed before assigning `table`; guard so that close() does not
        // throw a NullPointerException that would mask the original failure.
        Option(table).foreach(_.close())
        // NOTE(review): the connection returned by HbaseUtil().connect is not closed here;
        // confirm whether HbaseUtil owns its lifecycle or whether it leaks.
      }
    })

    // The pipeline above is only a lazy job definition; nothing runs until execute() is called.
    env.execute()
 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM