The chunksize pitfall when reading database data with pandas.read_sql_query()


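For a recent task I needed to read 15 million rows from a database. Reading that much data in one go would blow up memory, so I looked for a way to read it in batches. pandas.read_sql_query() has a chunksize parameter that returns the result chunksize rows at a time, so I tried it. The code: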

import pandas as pd
from sqlalchemy import create_engine
import psycopg2  # PostgreSQL driver behind the "postgresql+psycopg2" URL
import json

class DB_connection(object):
    def __init__(self):
        # Connection settings (USER, PASSWORD, HOST, PORT, DATABASE) come from a JSON file
        with open('config_db.json', 'r') as load_f:
            db_config = json.load(load_f)
        # Build postgresql+psycopg2://USER:PASSWORD@HOST:PORT/DATABASE
        db_url = 'postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}'.format(**db_config)
        self.db_engine = create_engine(db_url)
        self.db_conn = self.db_engine.connect()
        self.database = db_config['DATABASE']

    def read_from_table(self):
        # With chunksize set, read_sql_query returns an iterator of DataFrames
        # (2000 rows each) instead of a single DataFrame
        data_gen = pd.read_sql_query(
            'SELECT case_id, text FROM first_case',
            self.db_conn, chunksize=2000
        )
        return data_gen

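Because pandas.read_sql_query() returns an iterator when chunksize is given, I expected the rows to be fetched lazily. Instead, the program just hung. Reading the source of read_sql_query() (the read_query method it calls is shown below) showed why: the batching is not done by the database. The full query is executed first, and only afterwards are the results handed out chunksize rows at a time through an iterator.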

    def read_query(self, sql, index_col=None, coerce_float=True,
                   parse_dates=None, params=None, chunksize=None):
        """Read SQL query into a DataFrame.

        Parameters
        ----------
        sql : string
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : boolean, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method.  The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.

        Returns
        -------
        DataFrame

        See also
        --------
        read_sql_table : Read SQL database table into a DataFrame
        read_sql

        """
        args = _convert_params(sql, params)

        result = self.execute(*args)
        columns = result.keys()

        if chunksize is not None:
            return self._query_iterator(result, chunksize, columns,
                                        index_col=index_col,
                                        coerce_float=coerce_float,
                                        parse_dates=parse_dates)
        else:
            data = result.fetchall()
            frame = _wrap_result(data, columns, index_col=index_col,
                                 coerce_float=coerce_float,
                                 parse_dates=parse_dates)
            return frame

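As the source shows, read_query first runs the SQL with execute and only then checks chunksize. Without it, fetchall() returns every row as one DataFrame; with it, _query_iterator returns a generator that calls fetchmany(chunksize) on the same result object. The chunking therefore happens after the query has already run. With psycopg2's default client-side cursor, execute() has already transferred the entire result set into client memory, so chunksize only limits how many rows are turned into a DataFrame at a time, not how many rows are held in memory. With a large result set (15 million rows here) this can still exhaust memory and make the program appear to hang. (SQLAlchemy can request a server-side cursor instead, by passing a connection created with execution_options(stream_results=True); with psycopg2 that lets chunksize actually bound memory.)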
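To make the mechanism concrete, here is a stdlib-only sketch of the fetchmany loop that, as far as I can tell from pandas' source, _query_iterator runs over the executed result (pandas additionally wraps each chunk in a DataFrame). The function name iter_chunks is hypothetical, and an in-memory SQLite table named first_case stands in for the PostgreSQL table so the sketch runs without a server.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_chunks(cursor: sqlite3.Cursor, chunksize: int) -> Iterator[List[Tuple]]:
    """Hypothetical helper: yield lists of at most `chunksize` rows
    from a cursor whose query has already been executed.

    This is the same fetchmany loop pandas uses for chunksize. Whether it
    bounds memory depends on the cursor: a client-side cursor (psycopg2's
    default) has already transferred every row by the time execute() returns.
    """
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows


if __name__ == "__main__":
    # In-memory SQLite stands in for PostgreSQL so the sketch runs anywhere
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE first_case (case_id INTEGER PRIMARY KEY, text TEXT)")
    conn.executemany(
        "INSERT INTO first_case VALUES (?, ?)",
        [(i, f"case {i}") for i in range(1, 11)],
    )
    cur = conn.execute("SELECT case_id, text FROM first_case ORDER BY case_id")
    for chunk in iter_chunks(cur, 4):
        print([row[0] for row in chunk])  # 4, 4, then 2 case_ids
```

The point of the sketch is that the query has already been sent and answered before the first chunk is yielded; the loop only controls how the rows are handed out.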

A more reliable way to read in batches is to do the batching in the SQL itself, for example with LIMIT and OFFSET:

SELECT case_id, text FROM first_case order by case_id limit 1000 offset 0

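LIMIT a OFFSET b skips the first b rows and returns the next a rows. Keeping a fixed and increasing b by a on each query reads the whole table batch by batch. The ORDER BY matters: without it, row order is not guaranteed between queries, so batches could overlap or miss rows. Also note that the database still has to step over the first b rows on every query, so later batches get slower as b grows; filtering on the last case_id seen (WHERE case_id > last_id ORDER BY case_id LIMIT a) avoids that cost.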
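The LIMIT/OFFSET loop described above could look like the following minimal sketch. The helper name read_in_batches is hypothetical; it assumes a first_case table with case_id and text columns as in the article, and uses an in-memory SQLite database for the demo, since SQLite accepts the same LIMIT ... OFFSET ... syntax as PostgreSQL. The same function should work with the SQLAlchemy connection from DB_connection, though that is untested here.

```python
import sqlite3
from typing import Iterator

import pandas as pd


def read_in_batches(conn, batch_size: int = 1000) -> Iterator[pd.DataFrame]:
    """Hypothetical helper: page through first_case with LIMIT/OFFSET.

    Each batch is a separate query, so only one batch of rows is
    transferred to the client at a time.
    """
    offset = 0
    while True:
        # int() ensures only plain integers are formatted into the SQL string
        sql = (
            "SELECT case_id, text FROM first_case ORDER BY case_id "
            f"LIMIT {int(batch_size)} OFFSET {int(offset)}"
        )
        batch = pd.read_sql_query(sql, conn)
        if batch.empty:
            break
        yield batch
        if len(batch) < batch_size:
            break  # a short batch means there are no more rows
        offset += batch_size


if __name__ == "__main__":
    # In-memory SQLite stands in for the PostgreSQL table
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE first_case (case_id INTEGER PRIMARY KEY, text TEXT)")
    conn.executemany(
        "INSERT INTO first_case VALUES (?, ?)",
        [(i, f"case {i}") for i in range(1, 11)],
    )
    for df in read_in_batches(conn, batch_size=4):
        print(df["case_id"].tolist())
```

Stopping on a short batch saves one empty query at the end; the batch.empty check still covers the case where the row count is an exact multiple of batch_size.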

