摘要
Faust是用python開發的一個分布式流式處理框架。在一個機器學習應用中,機器學習算法可能被用於數據流實時處理的各個環節,而不是僅僅在推理階段,算法也不僅僅局限於常見的分類回歸算法,而是會根據業務需要執行一個十分差異化的任務, 例如:在我們的時序異常檢測應用中, 前處理階段的變點檢測算法。這就要求流處理框架除了具備進行常規的轉換聚合操作之外,可以支持更加強大的任意自定義邏輯和更加復雜的自定義狀態,能夠更好地與原生的python算法代碼緊密結合在一起。在主流的flink, spark streaming不能滿足我們的個性化需求時, Faust為我們提供了一個選擇.
本文將對faust框架的主要功能進行概要描述。
參考連接
https://faust.readthedocs.io/en/latest/
https://github.com/robinhood/faust
基本使用
app
faust庫的一個實例,提供了Faust的核心API,通過app可定義kafka topic、流處理器等。
>>> app = faust.App(
... 'myid',
... broker='kafka://kafka.example.com',
... store='rocksdb://',
... )
創建topic
faust以kafka作為數據傳輸和自組織管理的媒介,可以直接在faust應用中定義kafka主題。
topic = app.topic('name_of_topic')
@app.agent(topic)
async def process(stream):
async for event in stream:
...
創建table
table是Faust中的分布式鍵值對數據表,可用於保存流處理過程中的中間狀態。
transfer_counts = app.Table(
'transfer_counts',
default=int,
key_type=str,
value_type=int,
)
創建agent
agent是數據處理流中的一個基本處理單元,通過從kafka中攝取指定topic中的數據,並進行相應的處理。
import faust
app = faust.App('stream-example')
@app.agent()
async def myagent(stream):
"""Example agent."""
async for value in stream:
print(f'MYAGENT RECEIVED -- {value!r}')
yield value
if __name__ == '__main__':
app.main()
agent——分布式自組織流處理器
import faust
class Add(faust.Record):
a: int
b: int
app = faust.App('agent-example')
topic = app.topic('adding', value_type=Add)
@app.agent(topic)
async def adding(stream):
async for value in stream:
yield value.a + value.b
命令行中執行:faust -A examples.agent worker -l info,即可運行這個app。
sinks
可定義事件處理后的額外操作,比如推送告警等。一個sink可以是一個callable、異步callable、另外一個主題、另外一個agent等等。
回調函數
def mysink(value):
print(f'AGENT YIELD: {value!r}')
@app.agent(sink=[mysink])
async def myagent(stream):
async for value in stream:
yield process_value(value)
異步回調
async def mysink(value):
print(f'AGENT YIELD: {value!r}')
await asyncio.sleep(1)
@app.agent(sink=[mysink])
async def myagent(stream):
...
另外一個topic
agent_log_topic = app.topic('agent_log')
@app.agent(sink=[agent_log_topic])
async def myagent(stream):
...
另外一個agent
@app.agent()
async def agent_b(stream):
async for event in stream:
print(f'AGENT B RECEIVED: {event!r}')
@app.agent(sink=[agent_b])
async def agent_a(stream):
async for event in stream:
print(f'AGENT A RECEIVED: {event!r}')
streams
basics
stream是一個無限的異步可迭代對象,從topic中消費數據。
stream的常規使用為:
@app.agent(my_topic)
async def process(stream):
async for value in stream:
...
也可以自己創建一個stream:
stream = app.stream(my_topic) # or: my_topic.stream()
async for value in stream:
...
但是要處理自定義數據流,需要首先定義一個任務,在app啟動時執行這個任務。一個完整的例子:
import faust
class Withdrawal(faust.Record):
account: str
amount: float
app = faust.App('example-app')
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
@app.task
async def mytask():
async for w in withdrawals_topic.stream():
print(w.amount)
if __name__ == '__main__':
app.main()
processors
一個stream可以有任意多個處理器回調。
def add_default_language(value: MyModel) -> MyModel:
if not value.language:
value.language = 'US'
return value
async def add_client_info(value: MyModel) -> MyModel:
value.client = await get_http_client_info(value.account_id)
return value
s = app.stream(my_topic,
processors=[add_default_language, add_client_info])
kafka主題
每個faustworker會啟動一個kafka consumer消費數據。如果兩個agent消費了相同的主題,那么兩個agent會分別受到相同的消息,每次消息被回執,那么引用級數-1,當引用計數為0時,consumer就可以提交偏移量了。
操作
groupby
對流進行重新分區。新的流會使用一個新的中間主題,並以相應的字段作為鍵,這個新的流是agent最終迭代的流。
import faust
class Order(faust.Record):
account_id: str
product_id: str
amount: float
price: float
app = faust.App('group-by-example')
orders_topic = app.topic('orders', value_type=Order)
@app.agent(orders_topic)
async def process(orders):
async for order in orders.group_by(Order.account_id):
...
流分區的關鍵字不僅可以是數據中的字段,也可以是一個callable。
def get_order_account_id(order):
return json.loads(order)['account_id']
@app.agent(app.topic('order'))
async def process(orders):
async for order in orders.group_by(get_order_account_id):
...
take
緩存數據。
@app.agent()
async def process(stream):
async for values in stream.take(100):
assert len(values) == 100
print(f'RECEIVED 100 VALUES: {values}')
through
將流推送到一個新的topic,並迭代新的topic里的數據.
source_topic = app.topic('source-topic')
destination_topic = app.topic('destination-topic')
@app.agent()
async def process(stream):
async for value in stream.through(destination_topic):
# we are now iterating over stream(destination_topic)
print(value)
filter
過濾操作.
@app.agent()
async def process(stream):
async for value in stream.filter(lambda: v > 1000).group_by(...):
...
Channels & Topics--數據源
basics
agents迭代streams, streams迭代channels.
Models, Serialization, and Codecs
model
model用來描述數據結構, 例如:
class Point(Record, serializer='json'):
x: int
y: int
匿名agent
匿名agent不顯示地使用一個topic,而是自己創建topic,在定義好消息類型后,只需直接向該agent發送相應地消息即可.
@app.agent(key_type=Point, value_type=Point)
async def my_agent(events):
async for event in events:
print(event)
await my_agent.send(key=Point(x=10, y=20), value=Point(x=30, y=10))
schema
定義鍵值的類型和序列化反序列化器
schema = faust.Schema(
key_type=Point,
value_type=Point,
key_serializer='json',
value_serializer='json',
)
topic = app.topic('mytopic', schema=schema)
collections
model中的一個field可以是一個其他類型數據的列表.
from typing import List
import faust
class User(faust.Record):
accounts: List[Account]
支持的其他類型為: set, mapping, tuple.
tables和windowing
tables
table是Faust中的分布式內存數據表,使用kafka的changelog topic作為后端進行持久化和容錯.
table = app.Table('totals', default=int)
table的修改只能在流操作只能進行, 否則會報錯.
Co-partitioning Tables and Streams
table的任何鍵的數據只能存在於一台主機上.有狀態的流處理要求table和stream協同分區,即同一台主機處理的流和table必須共享相同的分區.因此在操作table的流迭代中需要對流重新分區.
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
country_to_total = app.Table(
'country_to_total', default=int).tumbling(10.0, expires=10.0)
withdrawals_stream = app.topic('withdrawals', value_type=Withdrawal).stream()
withdrawals_by_country = withdrawals_stream.group_by(Withdrawal.country)
@app.agent
async def process_withdrawal(withdrawals):
async for withdrawal in withdrawals.group_by(Withdrawal.country):
country_to_total[withdrawal.country] += withdrawal.amount
如果要進行的計算分別以兩個不太的字段分組,則應使用兩個不同的agent, 分別groupby.
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
user_to_total = app.Table('user_to_total', default=int)
country_to_total = app.Table(
'country_to_total', default=int).tumbling(10.0, expires=10.0)
@app.agent(withdrawals_topic)
async def find_large_user_withdrawals(withdrawals):
async for withdrawal in withdrawals:
user_to_total[withdrawal.user] += withdrawal.amount
@app.agent(withdrawals_topic)
async def find_large_country_withdrawals(withdrawals):
async for withdrawal in withdrawals.group_by(Withdrawal.country):
country_to_total[withdrawal.country] += withdrawal.amount
windowing
window的定義
from datetime import timedelta
views = app.Table('views', default=int).tumbling(
timedelta(minutes=1),
expires=timedelta(hours=1),
)
可以定義window使用的時間,包括系統時間relativ_to_now(), 當前流的處理時間relative_to_current(),相對數據中的時間字段relative_to_field().
views = app.Table('views', default=int).tumbling(...).relative_to_stream()
事件亂序
windowed table可以正確處理亂序, 只要遲到的數據在table的過期時間內.