Spark-python入門


Hadoop是對大數據集進行分布式計算的標准工具,這也是為什么當你穿過機場時能看到”大數據(Big Data)”廣告的原因。它已經成為大數據的操作系統,提供了包括工具和技巧在內的豐富生態系統,允許使用相對便宜的商業硬件集群進行超級計算機級別的計算。2003和2004年,兩個來自Google的觀點使Hadoop成為可能:一個分布式存儲框架(Google文件系統),在Hadoop中被實現為HDFS;一個分布式計算框架(MapReduce)。

這兩個觀點成為過去十年規模分析(scaling analytics)、大規模機器學習(machine learning),以及其他大數據應用出現的主要推動力!但是,從技術角度上講,十年是一段非常長的時間,而且Hadoop還存在很多已知限制,尤其是MapReduce。對MapReduce編程明顯是困難的。對大多數分析,你都必須用很多步驟將Map和Reduce任務串接起來。這造成類SQL的計算或機器學習需要專門的系統來進行。更糟的是,MapReduce要求每個步驟間的數據要序列化到磁盤,這意味着MapReduce作業的I/O成本很高,導致交互分析和迭代算法(iterative algorithms)開銷很大;而事實是,幾乎所有的最優化和機器學習都是迭代的

為了解決這些問題,Hadoop一直在向一種更為通用的資源管理框架轉變,即YARN(Yet Another Resource Negotiator, 又一個資源協調者)。YARN實現了下一代的MapReduce,但同時也允許應用利用分布式資源而不必采用MapReduce進行計算。通過將集群管理一般化,研究轉到分布式計算的一般化上,來擴展了MapReduce的初衷。

一:Spark簡介

官網:http://spark.apache.org/

Apache Spark™是用於大規模數據處理的統一分析引擎,是第一個脫胎於該轉變的快速、通用分布式計算范式,並且很快流行起來。Spark使用函數式編程范式擴展了MapReduce模型以支持更多計算類型,可以涵蓋廣泛的工作流,這些工作流之前被實現為Hadoop之上的特殊系統。Spark使用內存緩存來提升性能,因此進行交互式分析也足夠快速(就如同使用Python解釋器,與集群進行交互一樣)。緩存同時提升了迭代算法的性能,這使得Spark非常適合數據理論任務,特別是機器學習。

二:安裝spark

因為Python常備用來做傳統機器學習,深度學習的任務,為了它們相結合,我們使用python版spark.

安裝 pip install pyspark 

 

運行 pyspark ,還需java環境

配置完java環境,在運行pyspark

 

三:spark結合傳統機器學習做數據分析

## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt

from io import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)


## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """

    row[0] = datetime.strptime(row[0], DATE_FMT).date()
    row[5] = datetime.strptime(row[5], TIME_FMT).time()
    row[6] = float(row[6])
    row[7] = datetime.strptime(row[7], TIME_FMT).time()
    row[8] = float(row[8])
    row[9] = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])


def split(line):
    """
    Operator function for splitting a line with csv module
    py2
    reader = csv.reader(StringIO(line))
    return reader.next()
    """
    reader = csv.reader(StringIO(line))
    return next(reader)


def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes = [d[1] for d in delays]
    index = list(range(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min + 1, idx + 0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx + 0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx + 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # minimize chart junk
    plt.grid(axis='x', color='white', linestyle='-')

    plt.title('Total Minutes Delayed per Airline')
    plt.show()


## Main functionality
def main(sc):
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("data/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV Data into an RDD
    flights = sc.textFile("data/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays = flights.map(lambda f: (airline_lookup.value[f.airline],
                                    add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays = delays.reduceByKey(add).collect()
    delays = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print ("%0.0f minutes delayed\t%s" % (d[1], d[0]))

    # Show a bar chart of the delays
    plot(delays)


if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

數據集下載:https://www.dropbox.com/s/gnzztknnhrx81uv/ontime.zip?dl=0

參考:https://www.cnblogs.com/Vito2008/p/5216324.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM