前言
最近的需求是做 FlinkSQL 平台,需要在實時計算平台上集成 FlinkSQL 功能;目前剛剛有了初步的研究成果,所以寫下這篇筆記。
第一步:編寫一個流
這裡使用 Python 編寫一個流(Kafka 生產者),比 Java 簡潔。
需要注意的是,要安裝的套件是 pip install kafka-python,不能是 pip install kafka。
這裡的生產集群啟用了 SASL/SCRAM 認證(SASL_PLAINTEXT,只做身份認證,不加密傳輸),所以配置會多一些。
有一個單詞本,words.txt就是一些英文單詞,一行一個。
這個Producer每5秒產生一個記錄,以JSON形式發布到流。
from kafka import KafkaProducer
import json
import random
import time
import sys
WORDS_PATH = "data/words.txt"
TOPIC = "test_enc_putong"
SEND_INTERVAL_SECONDS = 5.0


def parse_words(lines):
    """Return the stripped, non-empty entries of *lines*.

    Blank lines are dropped so that an empty word is never published.
    """
    return [word for word in (line.strip() for line in lines) if word]


def load_words(path=WORDS_PATH):
    """Read the one-word-per-line file at *path* and return its words."""
    with open(path, "r", encoding="utf-8") as word_file:
        return parse_words(word_file)


def build_record(word):
    """Return the JSON text ``{"word": <word>, "len": <length>}`` for *word*."""
    return json.dumps({"word": word, "len": len(word)})


def encode_utf8(value):
    """Encode *value* as UTF-8 bytes, passing ``None`` through unchanged."""
    return value.encode("utf-8") if value is not None else None


def main():
    """Publish one random word record to Kafka every five seconds.

    Runs until interrupted (Ctrl+C); the producer is always closed on exit.

    Raises:
        SystemExit: if the word list contains no usable words.
    """
    words = load_words()
    if not words:
        raise SystemExit("no words found in " + WORDS_PATH)

    producer = KafkaProducer(
        bootstrap_servers="kafka1211.slannka.com:9194",
        # The serializers must return bytes; the previous key serializer
        # returned the unbound method str.encode instead of calling it.
        key_serializer=encode_utf8,
        value_serializer=encode_utf8,
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-256",
        sasl_plain_username="slankkaCopyrightReserved",
        sasl_plain_password="passwordOfUsername",
        api_version=(2, 2, 1),
    )
    try:
        while True:
            # randrange excludes its upper bound, so the index is always
            # valid (randint(0, count) could point one past the last word).
            index = random.randrange(len(words))
            print("total: ", len(words), ", randInt: ", index)
            value = build_record(words[index])
            print("sending:", value)
            producer.send(TOPIC, value)
            print("send finished..(wait 5s.)")
            time.sleep(SEND_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        print("interrupted, shutting down producer")
    finally:
        # kafka-python takes this timeout in seconds, not milliseconds.
        producer.close(timeout=3)


if __name__ == '__main__':
    main()
第二步:編寫FLINKSQL
-- Kafka-backed source table; each record is a JSON object with the fields
-- "word" and "len".
-- Optional: to expose the Kafka record timestamp, add the following column
-- after `len` (leave it out if your Flink version rejects METADATA columns):
--     , `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
-- SQL comments start with "--" ("#" is not a comment marker), and the last
-- column must not be followed by a comma.
-- NOTE(review): 'topic' must be the topic the producer publishes to (the
-- Python producer in step one sends to "test_enc_putong") - confirm.
create table WordCountTab (
    `word` STRING,
    `len` INT
) with (
    'connector' = 'kafka',
    'topic' = 'test_slankka',
    'properties.bootstrap.servers' = 'xxxxx.xxxxx.xxxxxx.com:9194',
    'properties.group.id' = 'test_flinksql_consumer',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'properties.sasl.jaas.config'= 'org.apache.kafka.common.security.scram.ScramLoginModule required username="slankkaCopyrightReserved" password="passwordOfUsername";',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.security.protocol' = 'SASL_PLAINTEXT'
);
-- JDBC sink table: rows written here go to the MySQL table "flink_sink_test"
-- in the "rtc" database named by the JDBC URL.
CREATE TABLE WordCountSink (
    `word` STRING,
    `len`  INT
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://mysql1211.slankkaCorps.com:3306/rtc',
    'table-name' = 'flink_sink_test',
    'username'   = 'root',
    'password'   = 'root'
);
-- Copy every (word, len) record read from the Kafka source into the JDBC sink.
INSERT INTO WordCountSink
SELECT
    src.`word`,
    src.`len`
FROM WordCountTab AS src;
執行即可,會生成一個 Flink Job,這個任務會不斷地把 <word, len> 寫入 MySQL 中。