開發者可以通過Processor接口來實現自己的自定義處理邏輯。接口提供了Process和Punctuate方法。
其中:Process方法用於處理接受到的消息
Punctuate方法指定時間間隔周期性的執行,用於處理周期數據:例如某些狀態值計算生成 新的流。
Processor接口還提供了init方法,init初始化方法可以將ProcessorContext轉存到Procesor實例中,以供Prounctute使用。
可以使用context的schedule方法實現punctute的周期性調用。
將修改后的數據轉存到下游處理節點:context.().forward
體檢當前處理節點的處理進度:context.commit.
代碼實例如下:
public class MyProcessor extends Processor {
private ProcessorContext context;
private KeyValueStore kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore) context.getStateStore("Counts");
}
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue entry = iter.next();
context.forward(entry.key, entry.value.toString());
}
iter.close();
context.commit();
}
@Override
public void close() {
this.kvStore.close();
}
};
在上邊的代碼中:
1、 init方法,定義了每秒調用punctuate方法,將名稱為count的狀態存儲結構中轉存到奔processor處理節點中。
2、 在process方法中,每接受到一條消息,將字符串進行拆分,並更新到狀態存儲中,生成新的流。
3、 在puncuate方法中,迭代本地狀態存儲並將流提交到下個處理節點進行處理。
1.1 Processor Topology(處理器拓撲)
通過Processor API定義的自定義的處理器,開發人員將使用TopologyBuilder通過連接這些處理器共同構建一個處理器拓撲。(類似於主方法)
首先,所有的源節點命名為“SOURCE”並使用addSource方法添加到拓撲中,主題“src-topic”來提供記錄(消息)。
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
.addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");
3個processor節點,使用addProcessor方法添加;這里的第一個processor是”SOURCE”節點的子節點,但是其他兩個處理器的父類。
最后,使用addSink方法將3個sink節點添加到完整的拓撲中。每個管道從不同父類處理器節點輸出到不同的topic。
1.2 本地狀態存儲
請注意,Processor API不僅限於當有消息到達時候調用process()方法,也可以保存記錄到本地狀態倉庫(如匯總或窗口連接)。利用這個特性,開發者可以使用StateStore接口定義一個狀態倉庫(Kafka Streams庫也有一些擴展的接口,如KeyValueStore)。在實際開發中,開發者通常不需要從頭開始自定義這樣的狀態倉庫,可以很簡單使用Stores工廠來設定狀態倉庫是持久化的或日志備份等。在下面的例子中,創建一個名為”Counts“的持久化的key-value倉庫,key類型String和value類型Long。
StateStoreSupplier countStore = Stores.create("Counts") .withKeys(Serdes.String()) .withValues(Serdes.Long()) .persistent() .build();
為了利用這些狀態倉庫,開發者可以在構建處理器拓撲時使用TopologyBuilder.addStateStore方法來創建本地狀態,並將它與需要訪問它的處理器節點相關聯,或者也可以通過
TopologyBuilder.connectProcessorAndStateStores將創建的狀態倉庫與現有的處理器節點連接。
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
// create the in-memory state store "COUNTS" associated with processor "PROCESS1"
.addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
.addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
// connect the state store "COUNTS" with processor "PROCESS2"
.connectProcessorAndStateStores("PROCESS2", "COUNTS");
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");
