使用生成器把Kafka寫入速度提高1000倍



title: 使用生成器把Kafka寫入速度提高1000倍
toc: true
comment: true
date: 2018-04-13 21:35:09
tags: ['Python', '經驗']
category: ['Python']

通過本文你會知道Python里面什么時候用yield最合適。本文不會給你講生成器是什么,所以你需要先了解Python的yield,再來看本文。

疑惑

多年以前,當我剛剛開始學習Python協程的時候,我看到絕大多數的文章都舉了一個生產者-消費者的例子,用來表示在生產者內部可以隨時調用消費者,達到和多線程相同的效果。這里憑記憶簡單還原一下當年我看到的代碼:

import time


def consumer():
    product = None
    while True:
        if product is not None:
            print('consumer: {}'.format(product))
        product = yield None


def producer():
    c = consumer()
    next(c)
    for i in range(10):
        c.send(i)

start = time.time()
producer()
end = time.time()
print(f'直到把所有數據塞入Kafka,一共耗時:{end - start}秒')

運行效果如下圖所示。

這些文章的說法,就像統一好了口徑一樣,說這樣寫可以減少線程切換開銷,從而大大提高程序的運行效率。但是當年我始終想不明白,這種寫法與直接調用函數有什么區別,如下圖所示。

直到后來我需要操作Kafka的時候,我明白了使用yield的好處。

探索

為了便於理解,我會把實際場景做一些簡化,以方便說明事件的產生發展和解決過程。事件的起因是我需要把一些信息寫入到Kafka中,我的代碼一開始是這樣的:

import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']


def consumer(product):
    with topic.get_producer(delivery_reports=True) as producer:
        producer.produce(str(product).encode())


def feed():
    for i in range(10):
        consumer(i)


start = time.time()
feed()
end = time.time()
print(f'直到把所有數據塞入Kafka,一共耗時:{end - start}秒')

這段代碼的運行效果如下圖所示。

寫入10條數據需要100秒,這樣的龜速顯然是有問題的。問題就出在這一句代碼:

with topic.get_producer(delivery_reports=True) as producer

獲得Kafka生產者對象是一個非常耗費時間的過程,每獲取一次都需要10秒鍾才能完成。所以寫入10個數據就獲取十次生產者對象。這消耗的100秒主要就是在獲取生產者對象,而真正寫入數據的時間短到可以忽略不計。

由於生產者對象是可以復用的,於是我對代碼作了一些修改:

import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []


def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:
        for product in product_list:
            producer.produce(str(product).encode())


def feed():
    for i in range(10):
        products.append(i)
    consumer(products)


start = time.time()
feed()
end = time.time()
print(f'直到把所有數據塞入Kafka,一共耗時:{end - start}秒')

首先把所有數據存放在一個列表中,最后再一次性給consumer函數。在一個Kafka生產者對象中展開列表,再把數據一條一條塞入Kafka。這樣由於只需要獲取一次生產者對象,所以需要耗費的時間大大縮短,如下圖所示。

這種寫法在數據量小的時候是沒有問題的,但數據量一旦大起來,如果全部先放在一個列表里面的話,服務器內存就爆了。

於是我又修改了代碼。每100條數據保存一次,並清空暫存的列表:

import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']


def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:
        for product in product_list:
            producer.produce(str(product).encode())


def feed():
    products = []
    for i in range(1003):
        products.append(i)
        if len(products) >= 100:
            consumer(products)
            products = []

    if products:
        consumer(products)


start = time.time()
feed()
end = time.time()
print(f'直到把所有數據塞入Kafka,一共耗時:{end - start}秒')

由於最后一輪循環可能無法湊夠100條數據,所以feed函數里面,循環結束以后還需要判斷products列表是否為空,如果不為空,還要再消費一次。這樣的寫法,在上面這段代碼中,一共1003條數據,每100條數據獲取一次生產者對象,那么需要獲取11次生產者對象,耗時至少為110秒。

顯然,要解決這個問題,最直接的辦法就是減少獲取Kafka生產者對象的次數並最大限度復用生產者對象。如果讀者舉一反三的能力比較強,那么根據開關文件的兩種寫法:

# 寫法一
with open('test.txt', 'w', encoding='utf-8') as f:
    f.write('xxx')
    
# 寫法二
f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()

可以推測出獲取Kafka生產者對象的另一種寫法:

# 寫法二
producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()

這樣一來,只要獲取一次生產者對象並把它作為全局變量就可以一直使用了。

然而,pykafka的官方文檔中使用的是第一種寫法,通過上下文管理器with來獲得生產者對象。暫且不論第二種方式是否會報錯,只從寫法上來說,第二種方式必需要手動關閉對象。開發者經常會出現開了忘記關的情況,從而導致很多問題。而且如果中間出現了異常,使用上下文管理器的第一種方式會自動關閉生產者對象,但第二種方式仍然需要開發者手動關閉。

函數VS生成器

但是如果使用第一種方式,怎么能在一個上下文里面接收生產者傳進來的數據呢?這個時候才是yield派上用場的時候。

首先需要明白,使用yield以后,函數就變成了一個生成器。生成器與普通函數的不同之處可以通過下面兩段代碼來進行說明:

def funciton(i):
    print('進入')
    print(i)
    print('結束')

for i in range(5):
    funciton(i)

運行效果如下圖所示。

函數在被調用的時候,函數會從里面的第一行代碼一直運行到某個return或者函數的最后一行才會退出。

而生成器可以從中間開始運行,從中間跳出。例如下面的代碼:

def generator():
    print('進入')
    i = None
    while True:
        if i is not None:
            print(i)
        print('跳出')
        i = yield None

g = generator()
next(g)
for i in range(5):
    g.send(i)

運行效果如下圖所示。

從圖中可以看到,進入只打印了一次。代碼運行到i = yield None后就跳到外面,外面的數據可以通過g.send(i)的形式傳進生成器,生成器內部拿到外面傳進來的數據以后繼續執行下一輪while循環,打印出被傳進來的內容,然后到i = yield None的時候又跳出。如此反復。

所以回到最開始的Kafka問題。如果把with topic.get_producer(delivery_reports=True) as producer寫在上面這一段代碼的print('進入')這個位置上,那豈不是只需要獲取一次Kafka生產者對象,然后就可以一直使用了?

根據這個邏輯,設計如下代碼:

import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']


def consumer():
    with topic.get_producer(delivery_reports=True) as producer:
        print('init finished..')
        next_data = ''
        while True:
            if next_data:
                producer.produce(str(next_data).encode())
            next_data = yield True


def feed():
    c = consumer()
    next(c)
    for i in range(1000):
        c.send(i)

start = time.time()
feed()
end = time.time()
print(f'直到把所有數據塞入Kafka,一共耗時:{end - start}秒')

這一次直接插入1000條數據,總共只需要10秒鍾,相比於每插入一次都獲取一次Kafka生產者對象的方法,效率提高了1000倍。運行效果如下圖所示。

后記

讀者如果仔細對比第一段代碼和最后一段代碼,就會發現他們本質上是一回事。但是第一段代碼,也就是網上很多人講yield的時候舉的生產者-消費者的例子之所以會讓人覺得毫無用處,就在於他們的消費者幾乎就是秒運行,這樣看不出和函數調用的差別。而我最后這一段代碼,它的消費者分成兩個部分,第一部分是獲取Kafka生產者對象,這個過程非常耗時;第二部分是把數據通過Kafka生產者對象插入Kafka,這一部分運行速度極快。在這種情況下,使用生成器把這個消費者代碼分開,讓耗時長的部分只運行一次,讓耗時短的反復運行,這樣就能體現出生成器的優勢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM