KafkaStream低級別API


開發者可以通過Processor接口來實現自己的自定義處理邏輯。接口提供了Process和Punctuate方法。

其中:Process方法用於處理接受到的消息

Punctuate方法指定時間間隔周期性的執行,用於處理周期數據:例如某些狀態值計算生成 新的流。

Processor接口還提供了init方法,init初始化方法可以將ProcessorContext轉存到Procesor實例中,以供Prounctute使用。

可以使用context的schedule方法實現punctute的周期性調用。

將修改后的數據轉存到下游處理節點:context.().forward

體檢當前處理節點的處理進度:context.commit.

代碼實例如下:

public class MyProcessor extends Processor {

        private ProcessorContext context;

        private KeyValueStore kvStore;
        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore) context.getStateStore("Counts");
        }
        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");
            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);
                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }
        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator iter = this.kvStore.all();
            while (iter.hasNext()) {
                KeyValue entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }
            iter.close();
            context.commit();
        }
        @Override
        public void close() {
            this.kvStore.close();
        }
    };

  

在上邊的代碼中:

1、 init方法,定義了每秒調用punctuate方法,將名稱為count的狀態存儲結構中轉存到奔processor處理節點中。

2、 在process方法中,每接受到一條消息,將字符串進行拆分,並更新到狀態存儲中,生成新的流。

3、 在puncuate方法中,迭代本地狀態存儲並將流提交到下個處理節點進行處理。

1.1    Processor Topology(處理器拓撲)

 

通過Processor API定義的自定義的處理器,開發人員將使用TopologyBuilder通過連接這些處理器共同構建一個處理器拓撲。(類似於主方法)

首先,所有的源節點命名為“SOURCE”並使用addSource方法添加到拓撲中,主題“src-topic”來提供記錄(消息)。

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
.addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");

3個processor節點,使用addProcessor方法添加;這里的第一個processor是”SOURCE”節點的子節點,但是其他兩個處理器的父類。

最后,使用addSink方法將3個sink節點添加到完整的拓撲中。每個管道從不同父類處理器節點輸出到不同的topic。

1.2    本地狀態存儲

 請注意,Processor API不僅限於當有消息到達時候調用process()方法,也可以保存記錄到本地狀態倉庫(如匯總或窗口連接)。利用這個特性,開發者可以使用StateStore接口定義一個狀態倉庫(Kafka Streams庫也有一些擴展的接口,如KeyValueStore)。在實際開發中,開發者通常不需要從頭開始自定義這樣的狀態倉庫,可以很簡單使用Stores工廠來設定狀態倉庫是持久化的或日志備份等。在下面的例子中,創建一個名為”Counts“的持久化的key-value倉庫,key類型String和value類型Long。

 

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();

 

為了利用這些狀態倉庫,開發者可以在構建處理器拓撲時使用TopologyBuilder.addStateStore方法來創建本地狀態,並將它與需要訪問它的處理器節點相關聯,或者也可以通過

TopologyBuilder.connectProcessorAndStateStores將創建的狀態倉庫與現有的處理器節點連接。
  TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "src-topic")
        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
    .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS");
        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM