財經數據(6)-Python多進程爬蟲東方財富個股盤口異動數據


1、先上個圖看下網頁版數據、mysql結構化數據

 

 

2、分析思路:

該網頁主要采用動態加載來實現的,通過刷新頁面查看URL,最終發現想要的數據,在js鏈接當中,進行頭文件分析,構造URL,完成數據獲取

數據存儲方式上,嘗試了Python單條讀寫mysql、利用Pandas構造DataFrame存儲,2種方式,通過實驗發現:通過Python讀寫mysql執行時間為:1477s,而通過Pandas讀寫

mysql執行時間為:47s,方法2速度幾乎是方法1的30倍。在於IO讀寫上,Python多線程顯得非常雞肋,具體分析可參考:https://cuiqingcai.com/3325.html

 具體代碼及數據如下:

Python讀寫Mysql

# -*- coding: utf-8 -*-
import pandas as pd
import tushare as ts
import pymysql
import time
import requests
import json
from multiprocessing import Pool
import traceback

# ====================東方財富個股盤口異動數據抓取============================================================================================================
def EMydSpider(param_list):
    # 抓取東財個股盤口異動數據:http://quote.eastmoney.com/changes

    # 創建計數器
    success, fail = 0, 0

    # 獲取當天日期
    cur_date = time.strftime("%Y%m%d", time.localtime())

    # 創建MySQL連接對象
    conn_mysql = pymysql.connect(user='root', password='123456', database='stock', charset='utf8')
    cursor = conn_mysql.cursor()

    header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3676.400 QQBrowser/10.5.3738.400"}
    url = "http://push2ex.eastmoney.com/getAllStockChanges?type=8201,8202,8193,4,32,64,8207,8209,8211,8213,8215,8204,8203,8194,8,16,128,8208,8210,8212,8214,8216"
    session = requests.Session()
    for param in param_list:
        try:
            html = json.loads(session.get(url=url, params=param, headers=header).text)
            allstock = html['data']['allstock']
            for stock in allstock:
                stk_code = stock['c']  # 股票代碼,無后綴
                stk_name = stock['n']  # 股票名稱
                chg_time = stock['tm']  # 異動時間
                chg_type = stock['t']  # 異動類型
                chg_value = stock['i']  # 異動值

                try:
                    sql = '''insert into stock_yd_list(stk_code,trade_date,chg_time,chg_type,chg_value) values('%s','%s','%s','%s','%s')''' % (stk_code, cur_date, chg_time, chg_type, chg_value)
                    cursor.execute(sql)
                    conn_mysql.commit()
                    success += 1
                    print("東方財富盤口異動,第%d條數據存儲完成......" % success)
                except:
                    conn_mysql.rollback()
                    fail += 1
                    traceback.print_exc()
                    print("東方財富盤口異動,第%d條數據存儲失敗......" % fail)
        except:
            traceback.print_exc()
            exit()

    cursor.close()
    conn_mysql.close()
    print('當天個股盤口異動數據獲取完畢,新入庫數據:%d條' % success)
    print('當天個股盤口異動數據獲取完畢,入庫失敗數據:%d條' % fail)

# ====================主函數====================================================================================================================================
if __name__=='__main__':
    print("東財異動程序開始執行")
    start = time.time()
    # 定義空列表
    param_list = []
    for page in range(0,300):
        param = {"pageindex": page, "pagesize": '64', "ut": '7eea3edcaed734bea9cbfc24409ed989', "dpt": 'wzchanges'}
        param_list.append(param)

    # 創建多進程
    pool = Pool(processes=4)

    # 開啟多進程爬取東財異動數據
    try:
        pool.map(EMydSpider, (param_list,))
    except:
        print("多進程執行error")
        traceback.print_exc()

    end = time.time()
    print('東財異動程序共執行%0.2f秒.' % ((end - start)))

執行時間:

 

Pandas讀寫Mysql

# -*- coding: utf-8 -*-
import pandas as pd
import tushare as ts
import time
import requests
import json
from sqlalchemy import create_engine
from multiprocessing import Pool
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# ====================東方財富個股盤口異動數據抓取============================================================================================================
def EMydSpider(param_list):
    # 抓取東財個股盤口異動數據:http://quote.eastmoney.com/changes

    # 獲取當天日期並創建數據庫引擎
    cur_date = time.strftime("%Y%m%d", time.localtime()); engine = create_engine('mysql://root:123456@127.0.0.1/quant?charset=utf8')

    # 創建空列表、空DataFrame,分別用於存儲html、異動數據
    html_list = []; stock_yd = pd.DataFrame()

    # 分析找到真正能請求到數據的URL
    header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3676.400 QQBrowser/10.5.3738.400"}
    url = "http://push2ex.eastmoney.com/getAllStockChanges?type=8201,8202,8193,4,32,64,8207,8209,8211,8213,8215,8204,8203,8194,8,16,128,8208,8210,8212,8214,8216"

    # 模擬發送get請求,並實例化session對象,維持會話
    session = requests.Session()
    # 禁用安全請求警告
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    for param in param_list:
        try:
            html = json.loads(session.get(url=url, params=param, headers=header).text)
            html_list.append(html)
            print("第%s頁東財個股異動數據已抓取" % (param_list.index(param) + 1))
        except Exception as spider_error:
            print("html抓取過程報錯,錯誤信息為:%s" % spider_error)

    print("--------------------------------------")
    print("開始抓取東方財富個股盤口異動網頁數據解析")
    for html in html_list:
        try:
            allstock = html['data']['allstock']
            for stock in allstock:
                code = stock['c']       # 股票代碼,無后綴
                stk_name = stock['n']   # 股票名稱
                chg_time = stock['tm']  # 異動時間
                chg_type = stock['t']   # 異動類型
                chg_value = stock['i']  # 異動值

                dict = {'symbol': code, 'trade_date': cur_date, 'chg_time': chg_time, 'chg_type': chg_type, 'chg_value': chg_value}
                stock_yd = stock_yd.append(dict, ignore_index=True)
        except Exception as parse_error:
            print("html解析過程報錯,錯誤信息為:%s" % parse_error)

    stock_yd = stock_yd[['symbol', 'trade_date', 'chg_time', 'chg_type', 'chg_value']]
    stock_yd.to_sql('disks_change', engine, if_exists='append', index = False)

    print(stock_yd)
    print("本次存儲東方財富個股異動數據%s條" % stock_yd.shape[0])


# ====================主函數====================================================================================================================================
if __name__ == '__main__':
    print("東方財富個股異動爬蟲程序開始執行")
    print("--------------------------------------")
    start = time.time()

    # 定義空列表
    param_list = []

    # 構建表單
    for page in range(0, 300):
        param = {"pageindex": page, "pagesize": '64', "ut": '7eea3edcaed734bea9cbfc24409ed989', "dpt": 'wzchanges'}
        param_list.append(param)

    # 創建線程池
    pool = Pool(processes=4)

    # 開啟多進程爬取東財異動數據
    try:
        pool.map(EMydSpider, (param_list, ))
    except Exception as error:
        print("進程執行過程報錯,錯誤信息為:%s"%error)

    end = time.time()
    print('東方財富個股異動爬蟲程序共執行%0.2f秒.' % ((end - start)))
    print("東方財富個股異動爬蟲程序執行完成")

 

  

 

 

執行時間:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM