Overview

Environment

- Scala 2.12.8 (see "Scala installation and deployment on Linux")
- Flink 1.8.1 (see "Flink 1.8.1 cluster deployment")
- Kafka kafka_2.12-2.2.0 (see "kafka_2.12-2.2.0 cluster deployment")
- HBase 2.1 (see "HBase 2.1 environment setup: fully distributed mode (Advanced - Fully Distributed)")
- Hadoop 2.8.5 (see "Hadoop 2.8.5 fully distributed HA installation, part 2: environment setup")
Dependencies
```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
```
Reading Kafka messages with Flink
```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaReadDemo { // class name is arbitrary, not part of the original post

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-test-topic", new SimpleStringSchema(), properties);
        // consume from the earliest offset
        consumer.setStartFromEarliest();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        //stream.map();
        env.execute();
    }
}
```
Start the services:
- Start the Hadoop cluster
- Start the HBase cluster
- Start the Kafka cluster
- Start Flink
Run the main method above; it keeps listening for messages from the Kafka cluster.
Start a Kafka console producer and send a few messages:
```bash
./kafka-console-producer.sh --broker-list node1:9092 --topic my-test-topic
>111111
>2222
```
The Java program's console then prints the following (the 4> prefix is the index of the parallel subtask that emitted the record):
```
4> 111111
4> 2222
```
Writing to HBase
Write a ProcessFunction to perform the HBase write. It loads hbase-site.xml and core-site.xml from the classpath, so both files need to be on the classpath (e.g. under src/main/resources):
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

@Slf4j
public class HbaseProcess extends ProcessFunction<String, String> {
    private static final long serialVersionUID = 1L;

    private Connection connection = null;
    private Table table = null;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        try {
            // Load the HBase configuration
            Configuration configuration = HBaseConfiguration.create();
            // Read the config files from the classpath
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            connection = ConnectionFactory.createConnection(configuration);

            TableName tableName = TableName.valueOf("test");
            // Get the table handle
            table = connection.getTable(tableName);

            log.info("[HbaseProcess] : open HbaseProcess finished");
        } catch (Exception e) {
            log.error("[HbaseProcess] : open HbaseProcess failed", e);
        }
    }

    @Override
    public void close() throws Exception {
        log.info("close...");
        if (null != table) table.close();
        if (null != connection) connection.close();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        try {
            log.info("[HbaseProcess] value={}", value);

            // message format: rowKey:columnFamily:qualifier:value, e.g. row1:cf:a:aaa
            String[] split = value.split(":");
            // Create a Put to insert or update the row
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
            table.put(put);

            log.info("[HbaseProcess] : put value:{} to hbase", value);
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
```
Then replace stream.print(); in the main method above with:

```java
stream.process(new HbaseProcess());
```
Run the main method, then send the message row1:cf:a:aaa from the Kafka console producer. The format is rowKey:columnFamily:qualifier:value, which processElement splits on the colons.
Check the data in the test table from the HBase shell:
```
hbase(main):012:0> scan 'test'
ROW                  COLUMN+CELL
 row1                column=cf:a, timestamp=1563880584014, value=aaa
 row1                column=cf:age, timestamp=1563779499842, value=12
 row2                column=cf:a, timestamp=1563451278532, value=value2a
 row2                column=cf:age, timestamp=1563779513308, value=13
 row2                column=cf:b, timestamp=1563441738877, value=value2
 row3                column=cf:c, timestamp=1563441741609, value=value3
```
The value aaa in the first row of the output is the data we just inserted.
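If you prefer to verify from code rather than from the HBase shell, here is a minimal sketch using the HBase client's Get API. The connection setup mirrors HbaseProcess above; the class name HbaseReadCheck is my own, and the table test / row key row1 come from the example:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseReadCheck { // hypothetical helper, not part of the original post
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));

        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf("test"))) {
            // Fetch column cf:a of row1 and print its value
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            byte[] cell = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a"));
            System.out.println(cell == null ? "not found" : Bytes.toString(cell));
        }
    }
}
```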
Of course, besides a ProcessFunction you can also use a sink. Write an HbaseSink class:
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

@Slf4j
public class HbaseSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        Connection connection = null;
        Table table = null;
        try {
            // Load the HBase configuration
            Configuration configuration = HBaseConfiguration.create();
            // Read the config files from the classpath
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            connection = ConnectionFactory.createConnection(configuration);

            TableName tableName = TableName.valueOf("test");
            // Get the table handle
            table = connection.getTable(tableName);

            // message format: rowKey:columnFamily:qualifier:value, e.g. row1:cf:a:aaa
            String[] split = value.split(":");
            // Create a Put to insert or update the row
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
            table.put(put);

            log.info("[HbaseSink] : put value:{} to hbase", value);
        } catch (Exception e) {
            log.error("", e);
        } finally {
            if (null != table) table.close();
            if (null != connection) connection.close();
        }
    }
}
```
Then change the main method as shown below; it runs with the same result. The concrete differences between the two approaches will be analyzed in a later post.

```java
// stream.print();
// stream.process(new HbaseProcess());
stream.addSink(new HbaseSink());
```
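One difference is already visible in the code: HbaseSink as written opens and closes an HBase connection for every record, while HbaseProcess reuses a single connection via open()/close(). As a minimal sketch (my own illustration, not code from the original post), the same reuse can be done in a sink by extending Flink's RichSinkFunction:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical variant: same write logic as HbaseSink, but the connection is
// created once per parallel instance in open() and released in close().
@Slf4j
public class HbaseRichSink extends RichSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
        connection = ConnectionFactory.createConnection(configuration);
        table = connection.getTable(TableName.valueOf("test"));
    }

    @Override
    public void close() throws Exception {
        if (null != table) table.close();
        if (null != connection) connection.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // message format: rowKey:columnFamily:qualifier:value
        String[] split = value.split(":");
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
        table.put(put);
        log.info("[HbaseRichSink] : put value:{} to hbase", value);
    }
}
```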