手把手教你写一个Python版的K线合成函数


在编写、使用策略时,经常会使用一些不常用的K线周期数据。然而交易所、数据源又没有提供这些周期的数据。只能通过使用已有周期的数据进行合成。合成算法已经有一个JavaScript版本了(链接),其实移植一段JavaScript代码为Python版本很简单。接下来我们一起写一个Python版本的K线合成算法。

 

JavaScript版本

function GetNewCycleRecords (sourceRecords, targetCycle) {    // K线合成函数
    var ret = []
    
    // 首先获取源K线数据的周期
    if (!sourceRecords || sourceRecords.length < 2) {
        return null
    }
    var sourceLen = sourceRecords.length
    var sourceCycle = sourceRecords[sourceLen - 1].Time - sourceRecords[sourceLen - 2].Time

    if (targetCycle % sourceCycle != 0) {
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        throw "targetCycle is not an integral multiple of sourceCycle."
    }

    if ((1000 * 60 * 60) % targetCycle != 0 && (1000 * 60 * 60 * 24) % targetCycle != 0) {
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
        throw "targetCycle cannot complete the cycle."
    }

    var multiple = targetCycle / sourceCycle


    var isBegin = false 
    var count = 0
    var high = 0 
    var low = 0 
    var open = 0
    var close = 0 
    var time = 0
    var vol = 0
    for (var i = 0 ; i < sourceLen ; i++) {
        // 获取 时区偏移数值
        var d = new Date()
        var n = d.getTimezoneOffset()

        if (((1000 * 60 * 60 * 24) - sourceRecords[i].Time % (1000 * 60 * 60 * 24) + (n * 1000 * 60)) % targetCycle == 0) {
            isBegin = true
        }

        if (isBegin) {
            if (count == 0) {
                high = sourceRecords[i].High
                low = sourceRecords[i].Low
                open = sourceRecords[i].Open
                close = sourceRecords[i].Close
                time = sourceRecords[i].Time
                vol = sourceRecords[i].Volume

                count++
            } else if (count < multiple) {
                high = Math.max(high, sourceRecords[i].High)
                low = Math.min(low, sourceRecords[i].Low)
                close = sourceRecords[i].Close
                vol += sourceRecords[i].Volume

                count++
            }

            if (count == multiple || i == sourceLen - 1) {
                ret.push({
                    High : high,
                    Low : low,
                    Open : open,
                    Close : close,
                    Time : time,
                    Volume : vol,
                })
                count = 0
            }
        }
    }

    return ret 
}

 

有JavaScript算法,对于Python其实逐行翻译移植就可以了,遇到JavaScript的内置函数,或者固有方法,对应的去Python中查找对应的方法即可,所以移植还是比较容易的。
算法逻辑完全一模一样,只是JavaScript的函数调用var n = d.getTimezoneOffset(),移植到Python时,使用Python的time库中的n = time.altzone代替。其它差异仅仅是语言语法方面的了(例如for循环的使用,布尔值的差别,逻辑与、逻辑非、逻辑或的使用差别等..)。

 

移植后的Python代码:

import time

def GetNewCycleRecords(sourceRecords, targetCycle):
    ret = []

    # 首先获取源K线数据的周期
    if not sourceRecords or len(sourceRecords) < 2 : 
        return None

    sourceLen = len(sourceRecords)
    sourceCycle = sourceRecords[-1]["Time"] - sourceRecords[-2]["Time"]

    if targetCycle % sourceCycle != 0 :
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        raise "targetCycle is not an integral multiple of sourceCycle."

    if (1000 * 60 * 60) % targetCycle != 0 and (1000 * 60 * 60 * 24) % targetCycle != 0 : 
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
        raise "targetCycle cannot complete the cycle."
    
    multiple = targetCycle / sourceCycle

    isBegin = False
    count = 0 
    barHigh = 0 
    barLow = 0 
    barOpen = 0
    barClose = 0 
    barTime = 0 
    barVol = 0 

    for i in range(sourceLen) : 
        # 获取时区偏移数值
        n = time.altzone        

        if ((1000 * 60 * 60 * 24) - (sourceRecords[i]["Time"] * 1000) % (1000 * 60 * 60 * 24) + (n * 1000)) % targetCycle == 0 :
            isBegin = True

        if isBegin : 
            if count == 0 : 
                barHigh = sourceRecords[i]["High"]
                barLow = sourceRecords[i]["Low"]
                barOpen = sourceRecords[i]["Open"]
                barClose = sourceRecords[i]["Close"]
                barTime = sourceRecords[i]["Time"]
                barVol = sourceRecords[i]["Volume"]
                count += 1
            elif count < multiple : 
                barHigh = max(barHigh, sourceRecords[i]["High"])
                barLow = min(barLow, sourceRecords[i]["Low"])
                barClose = sourceRecords[i]["Close"]
                barVol += sourceRecords[i]["Volume"]
                count += 1

            if count == multiple or i == sourceLen - 1 :
                ret.append({
                    "High" : barHigh,
                    "Low" : barLow,
                    "Open" : barOpen,
                    "Close" : barClose,
                    "Time" : barTime,
                    "Volume" : barVol,
                })
                count = 0
    
    return ret 

# 测试
def main():
    while True:
        r = exchange.GetRecords()
        r2 = GetNewCycleRecords(r, 1000 * 60 * 60 * 4)      

        ext.PlotRecords(r2, "r2")                                 
        Sleep(1000)     

 

测试

火币行情图表

回测合成4小时图表

以上代码仅作为学习参考使用,如果用于具体策略中,请根据需求修改、测试。
如有BUG或者改进建议,欢迎留言,十分感谢 o^_^o


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM