二、原因排查
2.1 SparkStreaming程序排查
2.2 Kafka數據驗證
2.3 查看OGG源碼
2.3.1 生成Kafka消息類
2.3.2 Kafka配置類
2.3.3 Kafka 消息發送類
2.3.4 Kafka 分區獲取方式
三、結論
一、現象
目前我們的數據是通過OGG->Kafka->Spark Streaming->HBase。由於之前我們發現HBase的列表put無法保證順序,因此改了程序,如果是在同一個SparkStreaming的批次里面對同一條數據進行操作,則寫入HBase的數據時間戳就非常相近,只會差幾毫秒,如果是不同批次則會差好幾秒。此為背景。
現在有一條數據,理應先刪除再插入,但是結果變成了先插入再刪除,結果如下
hbase(main):002:0> get 'XDGL_ACCT_PAYMENT_SCHEDULE','e5ad-***', {COLUMN=>'cf1:SQLTYPE',VERSIONS=>10}
COLUMN CELL
cf1:SQLTYPE timestamp=1498445308420, value=D
cf1:SQLTYPE timestamp=1498445301336, value=I
其中,兩條記錄的時間戳換算過來正好相差了7秒
2017-06-26 10:48:21 I
2017-06-26 10:48:28 D
很明顯這兩條數據並沒有在同一個批次得到處理,很明顯Spark獲取到數據的先后順序出了點問題。
二、原因排查
2.1 SparkStreaming程序排查
首先SparkStream接收到數據后根據數據的pos排序,然后再根據主鍵排序。從現象看,是SparkStreaming分了兩個批次才拿到,而SparkStreaming從Kafka拿數據也是順序拿的。那么出現問題的可能性就只有兩個:
1、OGG發給Kafka的數據順序是錯誤的。
2、OGG發給Kafka的數據順序是正確的,但是發到了不同的Kafka Partition。
2.2 Kafka數據驗證
為了驗證上面的兩個猜想,我把kafka的數據再次獲取出來進行分析。重點分析數據的partition、key、value。
得到的結果如下:
可以看到數據的同一個表數據寫到了不同的分區,可以看到OGG的同一分區下的數據順序是正確的。
正好說明2.1里面的第二個猜想。看來是OGG寫入的時候並沒有按照數據的表名寫入不同的分區。
在OGG 文檔
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449
中的 5.1.4 Kafka Handler Configuration 的屬性 gg.handler.kafkahandler.ProducerRecordClass 里面提到了,默認使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord這個類對表名進行分區的。如果要自定義的話需要實現CreateProducerRecord這個接口
原話是 The unit of data in Kafka - a
ProducerRecord
holds the key field with the value representing the payload. This key is used for partitioning a Kafka Producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. In order to change this key or behavior, theCreateProducerRecord
Kafka Handler Interface needs to be implemented and this property needs to be set to point to the fully qualified name of the customProducerRecord
class.
然而寫入kafka的結果卻不是這樣子的。這點讓人費解。看來我們需要查看OGG的源代碼。
2.3 查看OGG源碼
在OGG的安裝包里面有一個名叫ggjava/resources/lib/ggkafka-****.jar
的文件,我們將其導入一個工程之后就可以直接看到它的源代碼了。
2.3.1 生成Kafka消息類
我們直接查看oracle.goldengate.handler.kafka.DefaultProducerRecord
這個類
public class DefaultProducerRecord implements CreateProducerRecord {
public DefaultProducerRecord() {
}
public ProducerRecord createProducerRecord(String topicName, Tx transaction, Op operation, byte[] data, TxOpMode handlerMode) {
ProducerRecord pr;
if(handlerMode.isOperationMode()) {
pr = new ProducerRecord(topicName, operation.getTableName().getOriginalName().getBytes(), data);
} else {
pr = new ProducerRecord(topicName, (Object)null, data);
}
return pr;
}
}
這個類只返回一個ProducerRecord,這個是用於發送給Kafka的一條消息。我們先不管這個,繼續看他是如何寫給kafka的
2.3.2 Kafka配置類
首先是OGG與Kafka相關的配置類 oracle.goldengate.handler.kafka.impl.KafkaProperties
。這個類里面定義了一堆參數,我們只需要關心partitioner.class
這個參數,該參數用於定義寫入Kafka的時候獲取分區的類。很遺憾,這個類沒有該參數配置。
2.3.3 Kafka 消息發送類
這里有一個抽象類oracle.goldengate.handler.kafka.impl.AbstractKafkaProducer
,他有兩個子類,分別叫BlockingKafkaProducer
和NonBlockingKafkaProducer
(默認是NonBlockingKafkaProducer)
這兩個類都是直接將通過producer對象將record發送給了kafka。因此想要指導Kafka的分區信息還需要看Kafka是怎么獲取分區的。
2.3.4 Kafka 分區獲取方式
進入kafka的producer發送record的函數
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
發送的方法在doSend里面,里面內容很多,請看我勾出來的這兩段
由於寫入的時候都沒有對Record指定分區,因此這段代碼的partition都為空。所以代碼總會執行到 this.partitioner.partition(record.topic(), record.key(), serializedKey,record.value(), serializedValue,cluster)
該函數是kafka的Partitioner這個抽象類里面的
由於2.3.2 Kafka配置類中沒有指定分區的class,因此只會使用Kafka默認的分區類org.apache.kafka.clients.producer.internals.DefaultPartitioner
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
這里先是獲取了一個隨機值,然后再獲取了Kafka中對應topic的可用分區列表,然后根據分區數和隨機值進行取余得到分區數的值。
流程走到這里,我們基本可以得到一個結論。
- Kafka的record指定了分區,則會使用指定的分區寫入;否則進行下一個判斷;
- Kafka根據自己定義的partitioner接口進行分區,如果沒指定類,則使用默認的分區則進行下一個判斷;
- Kafka獲取record中的key進行分區,如果key不為空,則使用Hash分區,如果為空,基本上就是隨機分配分區了。
三、結論
事情到了這里,我們可以斷定,寫入分區錯亂的問題是因為gg.handler.kafkahandler.Mode
是事務模式,導致多條消息一次發送了,無法使用表名作為key,OGG就用了null作為key發送給了Kafka,最終Kafka拿到空值之后只能隨機發送給某個partition,所以才會出現這樣的問題。
最終,修改了ogg的操作模式之后可以看到,寫入的分區正常了。