Faust——python分布式流式處理框架


摘要

Faust是用python開發的一個分布式流式處理框架。在一個機器學習應用中,機器學習算法可能被用於數據流實時處理的各個環節,而不是僅僅在推理階段,算法也不僅僅局限於常見的分類回歸算法,而是會根據業務需要執行一個十分差異化的任務, 例如:在我們的時序異常檢測應用中, 前處理階段的變點檢測算法。這就要求流處理框架除了具備進行常規的轉換聚合操作之外,可以支持更加強大的任意自定義邏輯和更加復雜的自定義狀態,能夠更好地與原生的python算法代碼緊密結合在一起。在主流的flink, spark streaming不能滿足我們的個性化需求時, Faust為我們提供了一個選擇.

本文將對faust框架的主要功能進行概要描述。

參考連接

https://faust.readthedocs.io/en/latest/

https://github.com/robinhood/faust

基本使用

app

faust庫的一個實例,提供了Faust的核心API,通過app可定義kafka topic、流處理器等。

>>> app = faust.App(
... 'myid',
... broker='kafka://kafka.example.com',
... store='rocksdb://',
... )

創建topic

faust以kafka作為數據傳輸和自組織管理的媒介,可以直接在faust應用中定義kafka主題。

topic = app.topic('name_of_topic')
@app.agent(topic)
async def process(stream):
  async for event in stream:
    ...

創建table

table是Faust中的分布式鍵值對數據表,可用於保存流處理過程中的中間狀態。

transfer_counts = app.Table(
'transfer_counts',
default=int,
key_type=str,
value_type=int,
)

創建agent

agent是數據處理流中的一個基本處理單元,通過從kafka中攝取指定topic中的數據,並進行相應的處理。

import faust
app = faust.App('stream-example')
@app.agent()
async def myagent(stream):
  """Example agent."""
  async for value in stream:
    print(f'MYAGENT RECEIVED -- {value!r}')
    yield value
if __name__ == '__main__':
  app.main()

agent——分布式自組織流處理器

import faust

class Add(faust.Record):
  a: int
  b: int

app = faust.App('agent-example')

topic = app.topic('adding', value_type=Add)
@app.agent(topic)
async def adding(stream):
  async for value in stream:
    yield value.a + value.b

命令行中執行:faust -A examples.agent worker -l info,即可運行這個app。

sinks

可定義事件處理后的額外操作,比如推送告警等。一個sink可以是一個callable、異步callable、另外一個主題、另外一個agent等等。

回調函數

def mysink(value):
  print(f'AGENT YIELD: {value!r}')
@app.agent(sink=[mysink])
async def myagent(stream):
  async for value in stream:
    yield process_value(value)

異步回調

async def mysink(value):
  print(f'AGENT YIELD: {value!r}')
  await asyncio.sleep(1)
@app.agent(sink=[mysink])
async def myagent(stream):
  ...

另外一個topic

agent_log_topic = app.topic('agent_log')
@app.agent(sink=[agent_log_topic])
async def myagent(stream):
  ...

另外一個agent

@app.agent()
async def agent_b(stream):
  async for event in stream:
    print(f'AGENT B RECEIVED: {event!r}')
@app.agent(sink=[agent_b])
async def agent_a(stream):
  async for event in stream:
    print(f'AGENT A RECEIVED: {event!r}')

streams

basics

stream是一個無限的異步可迭代對象,從topic中消費數據。

stream的常規使用為:

@app.agent(my_topic)
async def process(stream):
   async for value in stream:
    ...

也可以自己創建一個stream:

stream = app.stream(my_topic) # or: my_topic.stream()
async for value in stream:
  ...

但是要處理自定義數據流,需要首先定義一個任務,在app啟動時執行這個任務。一個完整的例子:

import faust
class Withdrawal(faust.Record):
  account: str
  amount: float
app = faust.App('example-app')
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
@app.task
async def mytask():
  async for w in withdrawals_topic.stream():
    print(w.amount)
if __name__ == '__main__':
  app.main()

processors

一個stream可以有任意多個處理器回調。

def add_default_language(value: MyModel) -> MyModel:
  if not value.language:
    value.language = 'US'
  return value
async def add_client_info(value: MyModel) -> MyModel:
  value.client = await get_http_client_info(value.account_id)
  return value
s = app.stream(my_topic,
  processors=[add_default_language, add_client_info])

kafka主題

每個faustworker會啟動一個kafka consumer消費數據。如果兩個agent消費了相同的主題,那么兩個agent會分別受到相同的消息,每次消息被回執,那么引用級數-1,當引用計數為0時,consumer就可以提交偏移量了。

操作

groupby

對流進行重新分區。新的流會使用一個新的中間主題,並以相應的字段作為鍵,這個新的流是agent最終迭代的流。

import faust
class Order(faust.Record):
  account_id: str
  product_id: str
  amount: float
  price: float
app = faust.App('group-by-example')
orders_topic = app.topic('orders', value_type=Order)
@app.agent(orders_topic)
async def process(orders):
  async for order in orders.group_by(Order.account_id):
    ...

流分區的關鍵字不僅可以是數據中的字段,也可以是一個callable。

def get_order_account_id(order):
  return json.loads(order)['account_id']
@app.agent(app.topic('order'))
async def process(orders):
  async for order in orders.group_by(get_order_account_id):
    ...

take

緩存數據。

@app.agent()
async def process(stream):
async for values in stream.take(100):
  assert len(values) == 100
  print(f'RECEIVED 100 VALUES: {values}')

through

將流推送到一個新的topic,並迭代新的topic里的數據.

source_topic = app.topic('source-topic')
destination_topic = app.topic('destination-topic')
@app.agent()
async def process(stream):
  async for value in stream.through(destination_topic):
    # we are now iterating over stream(destination_topic)
    print(value)

filter

過濾操作.

@app.agent()
async def process(stream):
  async for value in stream.filter(lambda: v > 1000).group_by(...):
    ...

Channels & Topics--數據源

basics

agents迭代streams, streams迭代channels.

Models, Serialization, and Codecs

model

model用來描述數據結構, 例如:

class Point(Record, serializer='json'):
  x: int
  y: int

匿名agent

匿名agent不顯示地使用一個topic,而是自己創建topic,在定義好消息類型后,只需直接向該agent發送相應地消息即可.

@app.agent(key_type=Point, value_type=Point)
async def my_agent(events):
  async for event in events:
    print(event)

 

await my_agent.send(key=Point(x=10, y=20), value=Point(x=30, y=10))

schema

定義鍵值的類型和序列化反序列化器

schema = faust.Schema(
key_type=Point,
value_type=Point,
key_serializer='json',
value_serializer='json',
)
topic = app.topic('mytopic', schema=schema)

collections

model中的一個field可以是一個其他類型數據的列表.

from typing import List
import faust
class User(faust.Record):
  accounts: List[Account]

支持的其他類型為: set, mapping, tuple.

tables和windowing

tables

table是Faust中的分布式內存數據表,使用kafka的changelog topic作為后端進行持久化和容錯.

table = app.Table('totals', default=int)

table的修改只能在流操作只能進行, 否則會報錯.

Co-partitioning Tables and Streams

table的任何鍵的數據只能存在於一台主機上.有狀態的流處理要求table和stream協同分區,即同一台主機處理的流和table必須共享相同的分區.因此在操作table的流迭代中需要對流重新分區.

withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
country_to_total = app.Table(
'country_to_total', default=int).tumbling(10.0, expires=10.0)
withdrawals_stream = app.topic('withdrawals', value_type=Withdrawal).stream()
withdrawals_by_country = withdrawals_stream.group_by(Withdrawal.country)
@app.agent
async def process_withdrawal(withdrawals):
  async for withdrawal in withdrawals.group_by(Withdrawal.country):
    country_to_total[withdrawal.country] += withdrawal.amount

如果要進行的計算分別以兩個不太的字段分組,則應使用兩個不同的agent, 分別groupby.

withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)
user_to_total = app.Table('user_to_total', default=int)
country_to_total = app.Table(
'country_to_total', default=int).tumbling(10.0, expires=10.0)
@app.agent(withdrawals_topic)
async def find_large_user_withdrawals(withdrawals):
  async for withdrawal in withdrawals:
    user_to_total[withdrawal.user] += withdrawal.amount
@app.agent(withdrawals_topic)
async def find_large_country_withdrawals(withdrawals):
  async for withdrawal in withdrawals.group_by(Withdrawal.country):
    country_to_total[withdrawal.country] += withdrawal.amount

windowing

window的定義

from datetime import timedelta
views = app.Table('views', default=int).tumbling(
timedelta(minutes=1),
expires=timedelta(hours=1),
)

可以定義window使用的時間,包括系統時間relativ_to_now(), 當前流的處理時間relative_to_current(),相對數據中的時間字段relative_to_field().

views = app.Table('views', default=int).tumbling(...).relative_to_stream()

事件亂序

windowed table可以正確處理亂序, 只要遲到的數據在table的過期時間內.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM