Real-Time Processing of Kafka Data with Spark Streaming


Writing a Spark Streaming program in Python that processes Kafka data in real time requires some familiarity with how Spark works and with Kafka's fundamentals.

1 Configuring Spark for Kafka Development

First, download spark-streaming-kafka, the library that connects Spark to Kafka. Then move the downloaded jar into /opt/spark/spark-2.4.0-bin-hadoop2.7/jars with the following command:

sudo mv ~/下載/spark-streaming-kafka-0-10_2.11-2.4.0.jar /opt/spark/spark-2.4.0-bin-hadoop2.7/jars
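If you prefer fetching the jars from the command line, Spark's connector artifacts are published on Maven Central; a sketch, assuming Maven Central's standard directory layout (verify the URLs before relying on them):

wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.11/2.4.0/spark-streaming-kafka-0-10_2.11-2.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.0/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar

The second jar, the 0-8 assembly, is needed later when submitting the program.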

Next, create a kafka directory under /opt/spark/spark-2.4.0-bin-hadoop2.7/jars and copy all the libraries from /opt/kafka/kafka_2.11-0.10.2.2/libs into /opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka.
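A minimal sketch of those two steps, using the paths above:

sudo mkdir /opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka
sudo cp /opt/kafka/kafka_2.11-0.10.2.2/libs/* /opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka/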

Then add the paths of the Kafka-related jars to spark-env.sh; after the change, the relevant line in spark-env.sh should look like this:

export SPARK_DIST_CLASSPATH=$(/opt/hadoop/hadoop-2.7.6/bin/hadoop classpath):/opt/spark/spark-2.4.0-bin-hadoop2.7/jars/kafka/*:/opt/kafka/kafka_2.11-0.10.2.2/libs/*

Because I use Python 3 while Spark defaults to Python 2, here is how to point Spark at a Python 3 interpreter. Two files need to be changed. The first is spark-env.sh in the conf directory; add the following at the top of the file:

export PYSPARK_PYTHON=/root/anaconda3/bin/python3.7

python3.7 here is the version installed on my machine; generally, any Python 3 version will work for this exercise.
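You can confirm that the interpreter path is valid before wiring it in:

/root/anaconda3/bin/python3.7 --version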
The second file to modify is bin/pyspark under the Spark installation directory (here, /opt/spark/spark-2.4.0-bin-hadoop2.7/bin/pyspark). Locate the following block:

# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
  if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && ! $WORKS_WITH_IPYTHON ]]; then
    echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
    exit 1
  else
    PYSPARK_PYTHON=python
  fi
fi
export PYSPARK_PYTHON

Change PYSPARK_PYTHON=python above to PYSPARK_PYTHON=python3.7.
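After the edit, the fallback branch reads:

  else
    PYSPARK_PYTHON=python3.7
  fi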

Finally, make sure spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar is placed in /opt/kafka/kafka_2.11-0.10.2.2/libs. Note that although the 0-10 connector was downloaded above, Spark's Python API (pyspark.streaming.kafka.KafkaUtils) is provided only by the 0-8 integration, so this assembly jar is the one the program actually depends on.

2 Creating the PySpark Program

Place the program file in /opt/spark/spark-2.4.0-bin-hadoop2.7/bin:

#sparkstreaming_kafka.py

"""
Created on Mon Mar 16 14:23:45 2020

@author: yoyoyo
"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def start():
    # Two local threads: one for the Kafka receiver, one for processing.
    sc = SparkContext(master="local[2]", appName="PythonSparkStreaming")
    # Process the stream in 2-second batches.
    ssc = StreamingContext(sc, 2)
    # ZooKeeper quorum of the Kafka cluster (comma-separated host:port).
    zkQuorum = 'xx.xx.xx.xx:2181,xx.xx.xx.xx:2181,xx.xx.xx.xx:2181'
    # Topic name mapped to the number of receiver threads for it.
    topic = {'test': 1}
    groupid = "testConsumer"
    # Receiver-based stream; each element is a (key, message) pair.
    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    lines = kvs.map(lambda x: x[1])  # keep only the message value
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    wordCount = pairs.reduceByKey(lambda a, b: a + b)
    wordCount.pprint()  # print each batch's word counts

    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    start()
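For reference, the same 0-8 integration also offers a receiver-less "direct" stream that talks to the Kafka brokers instead of ZooKeeper; a minimal sketch, replacing the zkQuorum/topic/groupid/createStream lines above (the broker address is a placeholder for your cluster):

    # Direct (receiver-less) variant: Spark tracks the Kafka offsets itself
    # instead of registering a consumer with ZooKeeper.
    kvs = KafkaUtils.createDirectStream(
        ssc,
        topics=['test'],
        kafkaParams={'metadata.broker.list': 'xx.xx.xx.xx:9092'})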

3 Running the Program

Submit the program with spark-submit, passing the Kafka assembly jar via --jars. Redirecting stderr to error.log keeps Spark's log output out of the console, so only the word counts are printed:

(base) root@node3:/opt/spark/spark-2.4.0-bin-hadoop2.7/bin# spark-submit --jars /opt/kafka/kafka_2.11-0.10.2.2/libs/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar sparkstreaming_kafka.py 2>error.log

To run it in the background instead, redirect all output to error.log (note that the word counts then go to the file as well) and detach with &:

(base) root@node3:/opt/spark/spark-2.4.0-bin-hadoop2.7/bin# spark-submit --jars /opt/kafka/kafka_2.11-0.10.2.2/libs/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar sparkstreaming_kafka.py > error.log 2>&1 &
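To see any output, the topic needs data. Assuming the Kafka cluster is already running and a broker listens on localhost:9092 (adjust to your brokers), you can feed the test topic from another terminal with Kafka's console producer:

/opt/kafka/kafka_2.11-0.10.2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Type a few space-separated words; every two seconds the streaming job should print a batch of (word, count) pairs.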

