spark2.4.3+kudu1.9
1 批量讀
// Batch read: load the Kudu table as a DataFrame through the "kudu" data source.
val df = spark.read
  .format("kudu")
  .option("kudu.master", "master:7051")
  .option("kudu.table", "impala::test_db.test_table")
  .load()

// Register a temporary view so the data can be queried with Spark SQL.
df.createOrReplaceTempView("tmp_table")

// Print the first 10 rows as a quick sanity check.
val preview = spark.sql("select * from tmp_table limit 10")
preview.show()
2 批量寫
import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}

// Batch write: upsert every row of the DataFrame `df` into the Kudu table.
val kuduMaster = "master:7051"
val table = "impala::test_db.test_table"
val kuduContext = new KuduContext(kuduMaster, sc)

// Spell out the two flags by name instead of passing bare booleans.
val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors = false, ignoreNull = true)
kuduContext.upsertRows(df, table, writeOptions)
3 單個讀/條件讀
// --- shell: start spark-shell with the kudu-spark connector on the classpath ---
cd $SPARK_HOME
bin/spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0

// --- spark-shell (Scala): read rows matching a predicate via the Kudu client ---
import org.apache.kudu.client.{KuduPredicate, RowResult}
import org.apache.kudu.spark.kudu.KuduContext

val kuduMaster = "master:7051"
val table = "impala::test_db.test_table"
val kuduContext = new KuduContext(kuduMaster, sc)

// Use a distinct name for the opened table: re-declaring `val table` in terms of
// itself is rejected by the REPL as a recursive value.
val kuduTable = kuduContext.syncClient.openTable(table)

// Predicate: id == "testid"
val predicate = KuduPredicate.newComparisonPredicate(
  kuduTable.getSchema().getColumn("id"),
  KuduPredicate.ComparisonOp.EQUAL,
  "testid")

val scanner = kuduContext.syncClient
  .newScannerBuilder(kuduTable)
  .addPredicate(predicate)
  .build()

try {
  // Drain every batch; honouring hasMoreRows / hasNext avoids an exception
  // when no row matches the predicate.
  while (scanner.hasMoreRows) {
    val rows = scanner.nextRows()
    while (rows.hasNext) {
      val row = rows.next()
      // Print the first column of each matching row.
      println(row.getString(0))
    }
  }
} finally {
  // Always close the scanner, even if reading fails.
  scanner.close()
}
4 單個寫
// --- shell: start spark-shell with the kudu-spark connector on the classpath ---
cd $SPARK_HOME
bin/spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0

// --- spark-shell (Scala): insert a single row via the Kudu client ---
import org.apache.kudu.client.{KuduPredicate, RowResult}
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.client.SessionConfiguration

val kuduMaster = "172.26.192.219:7051"
val kuduContext = new KuduContext(kuduMaster, sc)
val kuduClient = kuduContext.syncClient
val kuduTable = kuduClient.openTable("impala::dataone_xishaoye.tbl_order_union")
val kuduSession = kuduClient.newSession()

// Available flush modes: AUTO_FLUSH_BACKGROUND, AUTO_FLUSH_SYNC, MANUAL_FLUSH.
// (This note must sit on its own line: a trailing `//` comment on a collapsed
// one-liner would comment out every statement after it.)
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
kuduSession.setMutationBufferSpace(1000)

try {
  val insert = kuduTable.newInsert()
  val row = insert.getRow()
  // Set column 0 of the new row.
  row.addString(0, "hello")

  // Row errors are reported in the response rather than thrown, so inspect it.
  // The response may be null, hence the Option wrapper.
  val response = kuduSession.apply(insert)
  Option(response)
    .filter(_.hasRowError)
    .foreach(r => println(r.getRowError.toString))

  // With MANUAL_FLUSH an explicit flush is required:
  //kuduSession.flush
} finally {
  // Release the session once the write is done.
  kuduSession.close()
}
其他:newInsert/newUpdate/newDelete/newUpsert
5 錯誤定位
如果 apply 之後發現修改沒有生效,並且確認已經提交,則可能是寫入出錯了(這類錯誤不會拋異常),需要從 apply 返回的 OperationResponse 中取出並打印錯誤信息:
// `session` and `op` stand for an existing KuduSession and the operation to apply.
val opResponse = session.apply(op)

// The response may be null, so wrap it in Option before checking for a row error.
Option(opResponse)
  .filter(_.hasRowError)
  .foreach(resp => println(resp.getRowError.toString))
注意:一定要使用 FlushMode.AUTO_FLUSH_SYNC,因為只有在該模式下 apply 才會返回 OperationResponse,其他模式下直接返回 null,詳見以下源代碼:
org.apache.kudu.client.KuduSession
// Synchronous KuduSession.apply (decompiled library source, quoted for reference).
// - Delegates to the wrapped session's apply(), which returns a Deferred<OperationResponse>.
// - Only when the flush mode is AUTO_FLUSH_SYNC does it join the Deferred and return the
//   OperationResponse; in every other flush mode it returns null.
// - On PleaseThrottleException it waits for the exception's Deferred to complete, logs any
//   failure from that wait, and then retries the operation (the while(true) loop).
// - Any other exception is converted with KuduException.transformException and rethrown.
public OperationResponse apply(Operation operation) throws KuduException { while(true) { try { Deferred<OperationResponse> d = this.session.apply(operation); if(this.getFlushMode() == FlushMode.AUTO_FLUSH_SYNC) { return (OperationResponse)d.join(); } return null; } catch (PleaseThrottleException var5) { PleaseThrottleException ex = var5; try { ex.getDeferred().join(); } catch (Exception var4) { LOG.error("Previous batch had this exception", var4); } } catch (Exception var6) { throw KuduException.transformException(var6); } } }
參考:
https://kudu.apache.org/docs/developing.html
