Flask Streaming Responses


Background

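Web applications often need to let users download files. For small files we usually use Flask's send_file or send_from_directory, but when the file is a large archive (>1 GiB) that approach is no longer a good fit, and we need to stream the file back to the client instead.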

Streaming Downloads

A simple implementation:

```python
import os

from flask import Response, abort


def download_file(file_path):
    # Check before streaming starts so a missing file yields a clean 404
    # instead of failing after the response has already begun.
    if not os.path.exists(file_path):
        abort(404, description="File not found.")

    def generate():
        with open(file_path, "rb") as f:
            while True:
                # Read 10 MiB at a time so the whole file never sits in memory.
                chunk = f.read(10 * 1024 * 1024)
                if not chunk:
                    break
                yield chunk

    return Response(generate(), content_type="application/octet-stream")
```

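With the Flask app running, the file downloads correctly, but the client shows only the current transfer speed. There is no total file size, so the download progress is unknown, and there is no file type either. Both can be fixed by adding headers: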
```python
response = Response(generate(), mimetype='application/gzip')
response.headers['Content-Disposition'] = 'attachment; filename={}.tar.gz'.format("download_file")
# Content-Length lets the client show the total size and the progress.
response.headers['Content-Length'] = str(os.stat(str(file_path)).st_size)
return response
```

Now the download shows the file type, the total file size, and the amount downloaded so far. Change mimetype to match the actual archive format.
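
The header logic above could be collected into a small helper. The following is a minimal sketch using only the standard library; `build_download_headers` is a hypothetical name, not part of Flask, and mapping every gzip-encoded file to `application/gzip` is an assumption about the kind of files being served.

```python
import mimetypes
import os


def build_download_headers(file_path, download_name=None):
    """Return a dict of headers describing a file download (hypothetical helper)."""
    name = download_name or os.path.basename(file_path)
    mime, encoding = mimetypes.guess_type(name)
    if encoding == "gzip":
        # guess_type() reports "x.tar.gz" as a tar file with gzip encoding;
        # for a download we want the type of the compressed file itself.
        mime = "application/gzip"
    return {
        # Fall back to a generic binary type when the extension is unknown.
        "Content-Type": mime or "application/octet-stream",
        "Content-Length": str(os.path.getsize(file_path)),
        "Content-Disposition": 'attachment; filename="{}"'.format(name),
    }
```

In a view, the result could be passed as `Response(generate(), headers=build_download_headers(file_path))`.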


Forwarding Streaming Downloads

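The approach above works when the file lives on the local node. If the product runs as a cluster and any node must be able to serve a given file from every other node, we need to forward the streaming download and relay it in real time, so that the node handling the request does not use too much memory.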

To forward a streaming request from a single node, we can use Flask's stream_with_context:

    from flask import (
        Flask,
        Response,
        stream_with_context,
    )
    import requests

    app = Flask(__name__)


    @app.route("/download/<file_path>", methods=["GET"])
    def download(file_path):
        url_prefix = "http://1.1.1.1/"
        remote_url = url_prefix + file_path
        # stream=True defers downloading the body until it is iterated.
        req = requests.get(remote_url, stream=True)
        # iter_content() defaults to 1-byte chunks, so set a larger size.
        return Response(
            stream_with_context(req.iter_content(chunk_size=1024 * 1024)),
            content_type=req.headers["content-type"],
        )


    if __name__ == "__main__":
        app.run(host="0.0.0.0", debug=True)


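When we visit http://localhost:5000/download/file_name, the app uses requests to fetch the file from the remote node 1.1.1.1 and forwards the response to the client as a stream, which completes the download.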
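
One detail worth considering when relaying is releasing the upstream connection once the download ends or the client disconnects. The sketch below is one possible way to do that, not something the original code does; `relay` is a hypothetical helper that only assumes the upstream object offers `iter_content()` and `close()`, as a `requests` response does.

```python
def relay(upstream, chunk_size=1024 * 1024):
    """Yield the upstream body chunk by chunk, then release the connection."""
    try:
        for chunk in upstream.iter_content(chunk_size=chunk_size):
            if chunk:  # skip empty chunks
                yield chunk
    finally:
        # Runs when the body is exhausted, or when the server closes the
        # generator because the client went away.
        upstream.close()
```

In the view above this could replace the bare iterator, for example `Response(stream_with_context(relay(req)), ...)`.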

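When forwarding streaming requests from multiple nodes, how do we make sure that the merged result is a valid file?
After some research we ruled out the standard library's tarfile and zipfile modules for packing and compressing, and settled on the third-party library zipstream (https://github.com/allanlei/python-zipstream).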


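zipstream accepts file content from an iterator and produces the archive incrementally as it is read. The official example, adapted here for Python 3, is: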

    import zipstream


    def iterable():
        for _ in range(10):
            yield b'this is a byte string\x01\n'


    z = zipstream.ZipFile()
    z.write_iter('my_archive_iter', iterable())

    # Iterating over the archive yields the zip data chunk by chunk.
    with open('zipfile.zip', 'wb') as f:
        for data in z:
            f.write(data)



Building on this, we combine it with single-node forwarding to request several nodes at once and compress the download on the fly:

    from urllib.parse import urlparse

    import zipstream


    @app.route("/cluster_download/<file_path>", methods=["GET"])
    def cluster_download(file_path):
        def generate(reqs):
            z = zipstream.ZipFile(mode="w", compression=zipstream.ZIP_DEFLATED)
            for req in reqs:
                # Name each archive entry after the node it came from.
                host = urlparse(req.url).hostname
                z.write_iter("%s.tar.gz" % host, req.iter_content(chunk_size=10 * 1024 * 1024))
            # Iterating the archive pulls from each upstream response lazily.
            for chunk in z:
                yield chunk

        remote_hosts = ["1.1.1.1", "2.2.2.2"]
        reqs = []
        for host in remote_hosts:
            req = requests.get("http://%s/%s" % (host, file_path), timeout=5, stream=True)
            if req.status_code == 200:
                reqs.append(req)

        response = Response(generate(reqs), mimetype="application/zip")
        response.headers["Content-Disposition"] = "attachment; filename=cluster_logs.zip"
        # No Content-Length here: the size of the zip is not known until it has
        # been generated, and the sum of the upstream sizes would not match it.
        return response

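When we visit http://localhost/cluster_download/file_name, the app first requests the file from each node in remote_hosts and adds each response to the zip archive with write_iter. The Flask Response then returns the archive's data chunks as they are produced.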


To add data produced at runtime to the zip file, we can define another generator:

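    def generate_file(content):
        # content should be bytes, like the data yielded in the zipstream example.
        yield content

    # Pass the generator object, not the function itself.
    z.write_iter("running_status", generate_file(content))
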

The final zip file then contains a file named running_status whose content is the value of content.
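
Runtime data is often a text string rather than bytes. A slightly extended generator, sketched below under the assumption that archive entries must be fed as bytes (as in the zipstream example above), encodes text as UTF-8 and yields it in slices; the `chunk_size` parameter is an addition for illustration.

```python
def generate_file(content, chunk_size=64 * 1024):
    """Yield content as byte chunks, encoding text as UTF-8 first."""
    if isinstance(content, str):
        content = content.encode("utf-8")
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
```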

Summary

This requirement comes up often in everyday work. Uploading files is similar to downloading them and can be handled with the same approach.
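
For uploads, the same chunk-by-chunk idea can be applied in the other direction. The following is a rough sketch, not taken from the original article: `save_stream` is a hypothetical helper that copies any file-like object to disk in fixed-size chunks, so the request body never has to be held in memory at once.

```python
def save_stream(stream, dest_path, chunk_size=1024 * 1024):
    """Copy a file-like object to dest_path in chunks; return bytes written."""
    written = 0
    with open(dest_path, "wb") as out:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                break
            out.write(chunk)
            written += len(chunk)
    return written
```

In a Flask view, `request.stream` is a file-like object for the raw request body, so a call might look like `save_stream(request.stream, dest_path)`, where `dest_path` is a location chosen by the application.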


Streaming Contents

 

Sometimes you want to send an enormous amount of data to the client, much more than you want to keep in memory. When you are generating the data on the fly though, how do you send that back to the client without the roundtrip to the filesystem?

The answer is by using generators and direct responses.

Basic Usage

This is a basic view function that generates a lot of CSV data on the fly. The trick is to have an inner function that uses a generator to generate data and to then invoke that function and pass it to a response object:

    from flask import Response

    @app.route('/large.csv')
    def generate_large_csv():
        def generate():
            for row in iter_all_rows():
                yield ','.join(row) + '\n'
        return Response(generate(), mimetype='text/csv')

Each yield expression is directly sent to the browser. Note though that some WSGI middlewares might break streaming, so be careful there in debug environments with profilers and other things you might have enabled.
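
Joining fields with `','.join(row)` produces broken CSV as soon as a field contains a comma, quote, or newline. One possible variation, sketched here with the standard `csv` module, reuses a single in-memory buffer and yields one properly quoted line per row; `iter_csv` is a hypothetical name.

```python
import csv
import io


def iter_csv(rows):
    """Yield one correctly quoted CSV line per row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for row in rows:
        writer.writerow(row)
        yield buffer.getvalue()
        # Reset the buffer so each yielded string holds exactly one row.
        buffer.seek(0)
        buffer.truncate(0)
```

The generator could be passed to the response in the same way, for example `Response(iter_csv(iter_all_rows()), mimetype='text/csv')`.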

Streaming from Templates

The Jinja2 template engine also supports rendering templates piece by piece. This functionality is not directly exposed by Flask because it is quite uncommon, but you can easily do it yourself:

    from flask import Response

    def stream_template(template_name, **context):
        app.update_template_context(context)
        t = app.jinja_env.get_template(template_name)
        rv = t.stream(context)
        rv.enable_buffering(5)
        return rv

    @app.route('/my-large-page.html')
    def render_large_template():
        rows = iter_all_rows()
        return Response(stream_template('the_template.html', rows=rows))

The trick here is to get the template object from the Jinja2 environment on the application and to call stream() instead of render(), which returns a stream object instead of a string. Since we’re bypassing the Flask template render functions and using the template object itself, we have to make sure to update the render context ourselves by calling update_template_context(). The template is then evaluated as the stream is iterated over. Since each time you do a yield the server will flush the content to the client, you might want to buffer up a few items in the template, which you can do with rv.enable_buffering(size). 5 is a sane default.
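
To make the effect of buffering concrete, the generator below groups several small pieces into one larger string before yielding. It is only an illustration of the idea, not how Jinja2 implements enable_buffering(); `buffered` is a hypothetical name.

```python
def buffered(chunks, size=5):
    """Join every `size` string chunks into one before yielding."""
    batch = []
    for chunk in chunks:
        batch.append(chunk)
        if len(batch) >= size:
            yield "".join(batch)
            batch = []
    # Flush whatever is left when the input ends.
    if batch:
        yield "".join(batch)
```

With fewer and larger yields, the server flushes to the client less often.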

Streaming with Context


Note that when you stream data, the request context is already gone the moment the function executes. Flask 0.9 provides you with a helper that can keep the request context around during the execution of the generator:

    from flask import stream_with_context, request, Response

    @app.route('/stream')
    def streamed_response():
        def generate():
            yield 'Hello '
            yield request.args['name']
            yield '!'
        return Response(stream_with_context(generate()))

Without the stream_with_context() function you would get a RuntimeError at that point.

    @app.route('/')
    def aws_api_route_puppet_apply(ip=None):
        output = somemethod(var1,var2,var3)
        return Response(json.dumps(output), mimetype='application/json')

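Is there a way to stream the output of a method to the browser using Flask and HTML, or do I need JavaScript?

As the documentation says, just create a generator and yield each line you want to return to the client.

If the output is 10 lines, the following sends each of the ten lines to the client as it becomes available: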

    @app.route('/')
    def aws_api_route_puppet_apply(ip=None):
        def generate():
            for row in somemethod(var1,var2,var3):
                yield row + '\n'
        return Response(generate(), mimetype='application/json')
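
Rows separated by newlines do not form a single valid JSON document, so a client cannot parse the full body with one JSON call. One common convention is newline-delimited JSON, where each line is its own JSON value. The sketch below assumes the rows are JSON-serializable; `ndjson_lines` is a hypothetical helper, and `application/x-ndjson` is a widely used but informal media type.

```python
import json


def ndjson_lines(rows):
    """Yield each row as one JSON document followed by a newline."""
    for row in rows:
        yield json.dumps(row) + "\n"
```

A view could then return `Response(ndjson_lines(somemethod(var1, var2, var3)), mimetype='application/x-ndjson')`, and the client would parse the body line by line.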

