HBase 依賴(Maven):
<dependency>
<groupId>org.apache.hbase</groupId>
<!-- shaded 版本主要是為了解決 jar 包衝突 -->
<artifactId>hbase-shaded-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val list = ArrayBuffer[String]()
list.append("my-key-1")
val text = env.fromCollection(list)
// For every element, reads one HBase cell keyed by the element and then writes a fixed cell,
// using a single Table handle that is obtained in open() and released in close().
text.map((_, 1)).process(new ProcessFunction[(String, Int), String] {
  // Assigned in open(); @transient keeps the HBase handle out of the serialized function object.
  @transient private var table: Table = _

  // Column coordinates, encoded once instead of on every element.
  private val readFamily: Array[Byte] = Bytes.toBytes("a")
  private val readQualifier: Array[Byte] = Bytes.toBytes("key11")
  private val writeFamily: Array[Byte] = Bytes.toBytes("f")
  private val writeQualifier: Array[Byte] = Bytes.toBytes("name")
  private val writeValue: Array[Byte] = Bytes.toBytes("lisi")

  /**
   * Obtains the handle for table `test` from the connection returned by `HbaseUtil().connect`.
   *
   * NOTE(review): the connection itself is never closed in this function — confirm that
   * `HbaseUtil` owns and manages its lifecycle; otherwise it leaks.
   *
   * @param parameters configuration passed in by the runtime; not read here
   */
  override def open(parameters: Configuration): Unit = {
    table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  }

  /**
   * Reads column `a:key11` of the row whose key is `value._1` and prints it, then writes
   * `lisi` into column `f:name` of row `"shx_" + value._2`.
   *
   * NOTE(review): nothing is emitted through `out` although the declared output type is
   * String — confirm that a pure side-effect operator is intended.
   *
   * @param value (row key to read, counter) pair produced by the upstream map
   * @param ctx   process-function context; not used
   * @param out   output collector; not used
   */
  override def processElement(
      value: (String, Int),
      ctx: ProcessFunction[(String, Int), String]#Context,
      out: Collector[String]): Unit = {
    val (rowKey, count) = value

    // Read
    val get = new Get(Bytes.toBytes(rowKey))
    get.addColumn(readFamily, readQualifier)
    val result = table.get(get)
    // Presumably yields null (printed as "null") when the cell does not exist — TODO confirm.
    val cell = Bytes.toString(result.getValue(readFamily, readQualifier))
    println(cell)

    // Write
    // NOTE(review): the upstream map always supplies 1 as the second tuple field, so every
    // element writes the same row — confirm this is intended.
    val put = new Put(Bytes.toBytes("shx_" + count))
    put.addColumn(writeFamily, writeQualifier, writeValue)
    table.put(put)
  }

  /**
   * Releases the table handle if one was obtained.
   *
   * If open() failed before assigning the handle, `table` is still null; calling close() on it
   * unguarded would throw a NullPointerException, so the null case is skipped.
   */
  override def close(): Unit = {
    Option(table).foreach(_.close())
  }
})