Flink: create a Kafka source table via the SQL Client or Java code, consume from a specified offset, run real-time computations, and finally sink the results to a MySQL table


Required dependency jars (place them in the flink/lib directory):
  1. flink-json-1.10.0-sql-jar.jar
  2. flink-sql-connector-kafka_2.11-1.10.0.jar
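A minimal sketch of putting the jars in place, assuming they are already downloaded to the current directory and that $FLINK_HOME points at the Flink installation (both paths are assumptions, not from the original steps):

# $FLINK_HOME is assumed to point at the Flink 1.10 installation
cp flink-json-1.10.0-sql-jar.jar $FLINK_HOME/lib/
cp flink-sql-connector-kafka_2.11-1.10.0.jar $FLINK_HOME/lib/
# restart the cluster / SQL Client so the new jars are picked up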

1. Produce data with a Kafka console producer

/bin/kafka-console-producer.sh --broker-list 192.168.58.177:9092 --topic my_topic
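If the topic does not exist yet, it can be created first; a sketch, assuming one of the ZooKeeper nodes listed later and a single partition with replication factor 1 (partition count and replication factor are assumptions):

/bin/kafka-topics.sh --create --zookeeper 192.168.58.171:2181 --replication-factor 1 --partitions 1 --topic my_topic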

Sample data:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

2. Consume from Kafka to verify the data arrived

/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --topic my_topic --partition 0 --offset 0


Option 1: testing in the SQL Client

3. Create the table in Flink's SQL Client

  1. Note: to start consuming from a specific Kafka offset, set 'connector.startup-mode' = 'specific-offsets' and also add 'connector.specific-offsets' = 'partition:0,offset:27', which specifies the partition and the position to start from. In the consumer-group output below, the topic has been consumed up to offset 27, so we resume from 27; the exact DDL change is shown after the full CREATE TABLE statement below.

  2. Check how far the topic has currently been consumed:


./kafka-consumer-groups.sh --bootstrap-server 192.168.58.177:9092 --describe --group testGroup
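The output of this command looks roughly like the following (illustrative values chosen to match the offset-27 scenario above; the exact column set varies by Kafka version):

GROUP      TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
testGroup  my_topic  0          27              27              0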


CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'my_topic',
    'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", or "specific-offsets"
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',
    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',
    'format.type' = 'json'
);
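As referenced in step 3.1, to resume from the observed position instead of the earliest offset, only the startup options in the WITH clause change (partition 0, offset 27, per the consumer-group output above):

    'connector.startup-mode' = 'specific-offsets',
    'connector.specific-offsets' = 'partition:0,offset:27',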


Real-time computation:

select item_id,count(*) from user_log1 group by item_id;
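The title promises sinking the result to MySQL; here is a minimal sketch of that last step using the Flink 1.10 JDBC descriptor. The database name, sink table, and credentials are assumptions, and this additionally requires flink-jdbc_2.11-1.10.0.jar plus a MySQL JDBC driver jar in flink/lib:

-- All connection values below are placeholders/assumptions
CREATE TABLE item_pv_sink (
    item_id VARCHAR,
    cnt BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.58.177:3306/flink_test',
    'connector.table' = 'item_pv',
    'connector.username' = 'root',
    'connector.password' = '123456'
);

INSERT INTO item_pv_sink
SELECT item_id, COUNT(*) FROM user_log1 GROUP BY item_id;

Because the query is a grouped aggregation, the JDBC sink writes in upsert mode keyed on item_id, so the MySQL table should define item_id as its primary key.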

Option 2: testing in a Java project

Code:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Class name is assumed; the original post showed only the main() method.
public class KafkaSourceToConsole {

    public static void main(String[] args) throws Exception {

        // Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the table environment on top of the streaming environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Kafka source table (same DDL as used in the SQL Client)
        tableEnv.sqlUpdate("CREATE TABLE user_log1 (\n" +
                "    user_id VARCHAR,\n" +
                "    item_id VARCHAR,\n" +
                "    category_id VARCHAR,\n" +
                "    behavior VARCHAR,\n" +
                "    ts VARCHAR\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'my_topic',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" + // valid modes: "earliest-offset", "latest-offset", "group-offsets", "specific-offsets"
                "    'connector.properties.group.id' = 'testGroup',\n" +
                "    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',\n" +
                "    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',\n" +
                "    'format.type' = 'json'\n" +
                ")"
        );

        Table result = tableEnv.sqlQuery("select user_id, count(*) from user_log1 group by user_id");

        // A grouped aggregation produces updates, so an append-only stream cannot be used:
        // tableEnv.toAppendStream(result, Types.TUPLE(Types.STRING, Types.LONG)).print();

        // A retract stream wraps each row in a (Boolean, row) tuple: true = add, false = retract
        tableEnv.toRetractStream(result, Types.TUPLE(Types.STRING, Types.LONG)).print();

        env.execute("flink job");
    }
}
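Running this from an IDE additionally requires the matching Flink dependencies on the project classpath, e.g. flink-table-api-java-bridge_2.11, flink-table-planner_2.11, flink-connector-kafka_2.11, and flink-json (standard Flink artifact names; version 1.10.0 assumed to match the jars above).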

Execution result:
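For reference, Flink prints each retract-stream record as a (flag, row) tuple, where the flag is true for an addition and false for a retraction. Illustratively (not captured from a real run), when user 662867's count grows from 1 to 2, the console output would resemble:

(true,(662867,1))
(false,(662867,1))
(true,(662867,2))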

