使用python編寫Spark Streaming實時處理Kafka數據的程序,需要熟悉Spark工作機制和Kafka原理。
1 配置Spark開發Kafka環境
首先點擊下載spark-streaming-kafka,下載Spark連接Kafka的代碼庫。然后把下載的代碼庫放到目錄/opt/spark/spark-2.4.0-bin-hadoop2.7/jars目錄下,命令如下:
sudo mv ~/下載/spark-streaming-kafka-0-10_2.11-2.4.0.jar /opt/spark/spark-2.4.0-bin-hadoop2.7/jars
然后在/opt/spark/spark-2.4.0-bin-hadoop2.7/jars目錄下新建kafka目錄,把/opt/kafka/kafka_2.11-0.10.2.2/libs下所有函數庫復制到/opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka目錄下
把 Kafka 相關 jar 包的路徑信息增加到 spark-env.sh,修改后的 spark-env.sh 類似如下:
export SPARK_DIST_CLASSPATH=$(/opt/hadoop/hadoop-2.7.6/bin/hadoop classpath):/opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka/*:/opt/kafka/kafka_2.11-0.10.2.2/libs/*
因為我使用的python3版本,而spark默認使用的是python2,所以介紹一下,怎么為spark設置python環境。
要改兩個地方,一個是conf目錄下的spark_env.sh:在這個文件的開頭添加:
export PYSPARK_PYTHON=/root/anaconda3/bin/python3.7
這里的python3.7是我本地的使用版本。正常來說,python3的版本對於本實驗都可以運行。
第二個地方要修改,/usr/local/spark/bin/pyspark這個文件。參照以下修改這個地方:
# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && ! $WORKS_WITH_IPYTHON ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
exit 1
else
PYSPARK_PYTHON=python
fi
fi
export PYSPARK_PYTHON
把上面的 PYSPARK_PYTHON=python改成 PYSPARK_PYTHON=python3.7。
把spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar放在/opt/kafka/kafka_2.11-0.10.2.2/libs。
2 建立pyspark項目
把執行代碼放在/opt/spark/spark-2.4.0-bin-hadoop2.7/bin
#sparkstreaming_kafka.py
"""
Created on Mon Mar 16 14:23:45 2020
@author: yoyoyo
"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
def start():
sc = SparkContext(master="local[2]", appName="PythonSparkStreaming")
ssc = StreamingContext(sc, 2)
zkQuorum = 'xx.xx.xx.xx:2181,xx.xx.xx.xx:2181,xx.xx.xx.xx:2181'
topic = {'test': 1}
groupid = "testConsumer"
kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" "))
pairs = counts.map(lambda word: (word, 1))
wordCount = pairs.reduceByKey(lambda a, b: a+b)
wordCount.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == '__main__':
start()
3 執行命令運行程序
(base) root@node3:/opt/spark/spark-2.4.0-bin-hadoop2.7/bin# spark-submit --jars /opt/kafka/kafka_2.11-0.10.2.2/libs/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar sparkstreaming_kafka.py 2>error.log
將日志保存在error.log文件中,執行命令運行程序:
(base) root@node3:/opt/spark/spark-2.4.0-bin-hadoop2.7/bin# spark-submit --jars /opt/kafka/kafka_2.11-0.10.2.2/libs/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar sparkstreaming_kafka.py > error.log 2>&1 &
