python websocket 獲取火幣 eth 歷史行情


import zlib
import websocket
import json
import redis
import pymysql
import time

rds = redis.StrictRedis(host='10.10.6.83', port=6379, db=3, password="q123q123")
db = pymysql.connect("10.10.6.83", "root", "q123q123", "test")

# 使用cursor()方法創建一個游標對象
cursor = db.cursor()


# 火幣歷史行情數據獲取
def huobi_history_market():
    # 火幣有api可以獲取行情數據,rest api 只提供了當前時間的行情,歷史行情需要 websocket api
    url = 'wss://www.huobi.ge/-/s/pro/ws'
    for i in range(5):
        try:
            # 創建websocket 連接
            ws = websocket.create_connection(url, timeout=5)
            break
        except:
            if i == 4:
                raise Exception("火幣url可能被封了, 請求了5次都失敗,換個url試試")
    while 1:
        # 獲取上一次數據歷史數據
        from_time = get_redis_last_time()
        to_time = from_time - 60 * 300
        if from_time <= int(time.mktime(time.strptime("2017-07-28 00:00:00", '%Y-%m-%d %H:%M:%S'))):
            exit(0)
        while 1:
            send_data = '{"req":"market.ethusdt.kline.1min","symbol":"ethusdt","period":"1min","from":%d,"to":%d}' % (
            to_time, from_time)

            # 發送請求獲取數據
            ws.send(send_data)
            # 接收響應數據
            content_compress = ws.recv()
            # 數據解壓縮
            content = zlib.decompress(content_compress, 16 + zlib.MAX_WBITS)

            # 數據轉碼
            content = content.decode()
            content = json.loads(content)
            # 判斷數據是否正常獲取到
            if content.get("status") == "ok":
                data = content.get("data")
                sql = " replace into eth_info(id, open, close,low,high,amount,vol,count) values "
                values_list = []
                for i in range(len(data)):
                    value = [str(data[i]["id"]), str(data[i]["open"]), str(data[i]["close"]), str(data[i]["low"]),
                             str(data[i]["high"]), str(data[i]["amount"]), str(data[i]["vol"]), str(data[i]["count"])]
                    values_list.append("(" + ", ".join(value) + ")")

                # 拼接sql
                sql += ", ".join(values_list)
                cursor.execute(sql)
                db.commit()
                rds.set("last_time", data[0]["id"])
                break


# 獲取上一次爬取的時間
def get_redis_last_time():
    last_time = rds.get("last_time")
    if not last_time:
        last_time = get_mysql_last_time()
    if not last_time:
        last_time = get_now_time()
    if last_time:
        last_time = int(last_time)
    return last_time


def get_mysql_last_time():
    # 使用execute()方法執行SQL語句
    cursor.execute("SELECT id FROM eth_info order by id asc limit 0, 1")
    # 使用fetall()獲取全部數據
    data = cursor.fetchall()
    if not data:
        return None
    return data[0][0]


def get_now_time():
    a = time.strftime('%Y-%m-%d %H:%M:00', time.localtime(time.time() - 60))
    last_time = int(time.mktime(time.strptime(a, '%Y-%m-%d %H:%M:%S')))
    return last_time


if __name__ == '__main__':
    huobi_history_market()
    # 關閉游標和數據庫的連接
    cursor.close()
    db.close()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM