背景
前段時間在做一個算法測試,需要對源於日志的數據進行分析才能獲取到結果;日志文件較大,所以想要獲取數據的變化曲線,增量讀取是最好的方式。

網上有很多人的技術博客都是寫的用for循環readline以及一個計數器去增量讀取,假如文件很大,遍歷一次太久。而且對於很多大文件的增量讀取,如果遍歷每一行比對歷史記錄的輸出或者全都加載到內存通過歷史記錄的索引查找,是非常浪費資源的,
獲取文件句柄的基本理論中就包含指針操作。linux的文件描述符的struct里有一個f_pos的這么個屬性,里面存着文件當前讀取位置,通過這個東東經過vfs的一系列映射就會得到硬盤存儲的位置了,所以很直接,很快。
在Python中的讀取文件的方法也有類似的屬性。
具體實現
Python中相關方法的核心函數如下:
| 函數 | 作用 |
|---|---|
| tell() | 返回文件當前位置 |
| seek() | 從指定位置開始讀取信息 |
其中seek()有三種模式:
- f.seek(p,0) 移動當文件第p個字節處,絕對位置
- f.seek(p,1) 移動到相對於當前位置之后的p個字節
- f.seek(p,2) 移動到相對文章尾之后的p個字節
參考代碼:
#!/usr/bin/python
fd=open("test.txt",'r') #獲得一個句柄
for i in xrange(1,3): #讀取三行數據
fd.readline()
label=fd.tell() #記錄讀取到的位置
fd.close() #關閉文件
#再次閱讀文件
fd=open("test.txt",'r') #獲得一個句柄
fd.seek(label,0)# 把文件讀取指針移動到之前記錄的位置
fd.readline() #接着上次的位置繼續向下讀取
拓展
如何得知這個大文件行數,以及變化
我的想法:
方式1: 遍歷'\n'字符。
方式2: 開始時就在for循環中對fd.readline()計數,變化的部分(用上文說的seek、tell函數做)再用for循環fd.readline()進行統計。
如何避免文件讀取時,內存溢出
- 可以通過 read 函數的chunk關鍵字來指定每次讀區數據的大小
- 使用生成器確保只有在數據被調用時才會生成
具體方法封裝如下:
def read_in_chunks(file_path, chunk=100 * 100): # 通過chunk指定每次讀取文件的大小防止內存占用過大
file_object = open(file_path, "r")
while True:
data = file_object.read(chunk)
if not data:
file_object.close()
break
# 使用generator(生成器)使數據只有在被使用時才會迭代時占用內存
yield data
應用
20191129添加
根據博客園一個朋友的實際問題寫的一段應用代碼,解決程序運行異常、斷點再讀問題:
#! /usr/bin/python
# coding:utf-8
"""
@author:Bingo.he
@file: 20191129-file.py
@time: 2019/11/29
"""
import os
import glob
class opened(object):
def __init__(self, filename):
self.filename = filename
self.handle = open(filename)
if filename in get_read_info().keys():
self.handle.seek(get_read_info()[filename], 0)
def __enter__(self):
return self.handle
def __exit__(self, exc_type, exc_value, exc_trackback):
seek_num = self.handle.tell()
set_read_info(self.filename, seek_num)
self.handle.close()
if exc_trackback is None:
print(f"文件【{self.filename}】讀取正常退出。")
else:
print(f"文件【{self.filename}】讀取退出異常!")
def get_read_info():
"""
讀取已讀取的文件的句柄位置
:return:
"""
file_info = {}
# 如果文件不存在則創建一個空文件
if not os.path.exists("temp"):
with open("temp", 'w', encoding="utf-8") as f:
pass
return file_info
with open("temp", 'r', encoding="utf-8") as f:
datas = f.readlines()
for data in datas:
name, line = data.split("===")
file_info[name] = int(line)
return file_info
def set_read_info(filename, seek_num):
"""
設置為已經讀取的文件的句柄位置
:param filename: 文件名稱
:param seek_num: 句柄位置
:return:
"""
flag = True
with open("temp", 'r', encoding="utf-8") as f:
datas = f.readlines()
for num, data in enumerate(datas):
if filename in data:
flag = False
datas[num] = f"{filename}==={seek_num}\n"
if flag:
datas.append(f"{filename}==={seek_num}\n")
# print(datas)
with open("temp", 'w', encoding="utf-8") as f:
f.writelines(datas)
# 測試代碼
# 注:文件讀完之后,存儲在temp文件中的,第二次讀取時不會再讀,可以以刪除temp文件或者修改其中信息
pys = glob.glob("*.py") # 獲取當前目錄以Py結尾的文件
for py in pys:
with opened(py) as fp: # 默認為讀模式
for line_data in fp:
print(line_data)
