[Flink Series, Part 0] Building a Real-Time Computing Platform: Beginner's Notes on FlinkSQL Jobs


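Preface

A recent requirement was to build a FlinkSQL platform, that is, to integrate FlinkSQL into our real-time computing platform. I have just reached some first results, so I wrote them up in this note.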

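Step 1: Produce a stream

The stream is produced here by a Python script, which is more concise than the Java equivalent.

Note: install the client with pip install kafka-python, not pip install kafka.

Our production Kafka cluster uses SASL/SCRAM authentication, so the client needs a few extra settings. (With SASL_PLAINTEXT the credentials are checked via SCRAM, but the traffic itself is not encrypted.)

There is a word list, data/words.txt, containing English words, one per line.

The producer below publishes one randomly chosen word every 5 seconds to the topic, as a JSON record.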

from kafka import KafkaProducer
import json
import random
import time

if __name__ == '__main__':
    producer = KafkaProducer(
        bootstrap_servers="kafka1211.slannka.com:9194",
        # keys and values are sent as UTF-8 encoded strings
        key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
        value_serializer=lambda v: v.encode('utf-8') if v is not None else None,
        # SCRAM authentication over a non-TLS connection
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-256",
        sasl_plain_username="slankkaCopyrightReserved",
        sasl_plain_password="passwordOfUsername",
        api_version=(2, 2, 1)
    )

    # Read the whole word list into memory, one word per line, skipping blank lines
    with open("data/words.txt", "r", encoding="utf-8") as textfile:
        words = [line.strip() for line in textfile if line.strip()]
    print("total words:", len(words))

    try:
        while True:
            # pick one word uniformly at random (always a valid index)
            word = random.choice(words)
            value = json.dumps({"word": word, "len": len(word)})
            print("sending:", value)
            producer.send("test_enc_putong", value)
            print("send finished..(wait 5s.)")
            time.sleep(5.0)
    except KeyboardInterrupt:
        # stop the producer with Ctrl+C
        pass
    finally:
        # flush buffered records, then close; the timeout is in seconds
        producer.flush()
        producer.close(timeout=3)
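The only contract between this producer and the Flink table in step 2 is the JSON shape of each record: a string field word and an integer field len. If you want to sanity-check records before sending them, a minimal sketch could look like the following. make_record and matches_schema are hypothetical helpers, not part of kafka-python or Flink, and the check is deliberately stricter than Flink: with default settings the json format turns a missing field into NULL instead of failing.

```python
import json

# Field names and Python types matching the WordCountTab columns
# (`word` STRING, `len` INT). Illustrative only, not a Flink API.
EXPECTED_FIELDS = {"word": str, "len": int}


def make_record(word: str) -> str:
    """Serialize one word exactly the way the producer above does."""
    return json.dumps({"word": word, "len": len(word)})


def matches_schema(payload: str) -> bool:
    """Return True if the payload is a JSON object with exactly the expected fields and types."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return False
    if not isinstance(data, dict) or set(data) != set(EXPECTED_FIELDS):
        return False
    # bool is a subclass of int in Python, so reject it explicitly for the INT column
    return all(
        isinstance(data[name], kind) and not isinstance(data[name], bool)
        for name, kind in EXPECTED_FIELDS.items()
    )


if __name__ == "__main__":
    print(make_record("flink"))                        # {"word": "flink", "len": 5}
    print(matches_schema(make_record("flink")))        # True
    print(matches_schema('{"word": null, "len": 0}'))  # False
```

Calling matches_schema on each value just before producer.send would catch, for example, a null word reaching the topic.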

Step 2: Write the FlinkSQL

create table WordCountTab (
    `word` STRING,
    `len` INT,
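    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'  -- Kafka record timestamp; remove this line (and the comma after `len` INT) if your Flink version does not support metadata columns (they need Flink 1.12+)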
) with (
    'connector' = 'kafka',
    'topic' = 'test_slankka',
    'properties.bootstrap.servers' = 'xxxxx.xxxxx.xxxxxx.com:9194',
    'properties.group.id' = 'test_flinksql_consumer',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'properties.sasl.jaas.config'= 'org.apache.kafka.common.security.scram.ScramLoginModule required username="slankkaCopyrightReserved" password="passwordOfUsername";',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.security.protocol' = 'SASL_PLAINTEXT'
);

create table WordCountSink (
   word STRING,
   len INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://mysql1211.slankkaCorps.com:3306/rtc',
   'table-name' = 'flink_sink_test',
   'username' = 'root',
   'password' = 'root'
);

INSERT INTO WordCountSink
SELECT word, len FROM WordCountTab;
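The script above consists of three statements, and one of them contains a ';' inside a string literal: the JAAS config value ends with password="...";. A platform that submits a script statement by statement (PyFlink's TableEnvironment.execute_sql, for instance, takes a single statement) therefore cannot simply split on ';'. Below is a minimal sketch of a quote- and comment-aware splitter. It assumes statements contain only single-quoted string literals, backtick-quoted identifiers and -- line comments (no /* */ block comments); split_statements is a hypothetical name, not a Flink API.

```python
from typing import List, Optional


def split_statements(script: str) -> List[str]:
    """Split a FlinkSQL script on ';' characters outside quotes and '--' comments."""
    statements: List[str] = []
    current: List[str] = []
    quote: Optional[str] = None  # active quote character (' or `), or None
    i, n = 0, len(script)
    while i < n:
        ch = script[i]
        if quote:
            current.append(ch)
            if ch == quote:
                # a doubled quote ('' or ``) is an escaped quote, not the end
                if i + 1 < n and script[i + 1] == quote:
                    current.append(script[i + 1])
                    i += 1
                else:
                    quote = None
        elif ch in ("'", "`"):
            quote = ch
            current.append(ch)
        elif script.startswith("--", i):
            # drop the line comment up to (not including) the newline
            end = script.find("\n", i)
            i = n if end == -1 else end
            continue
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
        i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements
```

On the script above this should yield three statements, with the JAAS literal kept intact; each one could then be passed to execute_sql in order.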

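Execute these statements and a Flink job is created that continuously writes <word, len> rows into MySQL. Note that the Kafka table's 'topic' must be the topic the producer writes to: the producer above sends to test_enc_putong while the table reads test_slankka, so align the two. The jdbc connector also needs the Flink JDBC connector jar and the MySQL driver on Flink's classpath, and the target table flink_sink_test must already exist in the rtc database.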
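Because the WordCountSink DDL declares no primary key, the jdbc connector runs in append mode: every incoming row becomes a new INSERT, so repeated words pile up as duplicate rows. Declaring PRIMARY KEY (word) NOT ENFORCED in the sink DDL (with a matching unique key on the MySQL table) switches it to upsert mode. The in-memory sketch below only mimics these semantics to make the difference visible; the function names are hypothetical and nothing here talks to Kafka or MySQL.

```python
import json
from typing import Dict, Iterable, List, Optional, Tuple

Row = Tuple[Optional[str], Optional[int]]


def select_word_len(messages: Iterable[str]) -> List[Row]:
    """Mimic SELECT word, len FROM WordCountTab over raw Kafka message values."""
    rows: List[Row] = []
    for payload in messages:
        record = json.loads(payload)
        # like the json format's default, a missing field becomes NULL (None)
        rows.append((record.get("word"), record.get("len")))
    return rows


def append_mode(table: List[Row], rows: Iterable[Row]) -> None:
    """No primary key: every row is inserted as a new table row."""
    table.extend(rows)


def upsert_mode(table: Dict[Optional[str], Optional[int]], rows: Iterable[Row]) -> None:
    """Primary key on word: a later row overwrites the earlier one."""
    for word, length in rows:
        table[word] = length


if __name__ == "__main__":
    incoming = [
        '{"word": "flink", "len": 5}',
        '{"word": "kafka", "len": 5}',
        '{"word": "flink", "len": 5}',
    ]
    rows = select_word_len(incoming)
    appended: List[Row] = []
    append_mode(appended, rows)
    upserted: Dict[Optional[str], Optional[int]] = {}
    upsert_mode(upserted, rows)
    print(len(appended), len(upserted))  # 3 2
```

With the DDL as written, the MySQL table behaves like appended: three rows for three messages, even though "flink" appears twice.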

