使用DataStax Java驅動程序的最佳實踐


引言

如果您想開始建立自己的基於Cassandra的Java程序,歡迎!


也許您已經參加過我們精彩的DataStax Academy課程或開發者大會,又或者仔細閱讀過Cassandra Java驅動的文檔。相比示例程序,現在是時候步入真實世界、處理實際案例了。


那么問題來了:Java驅動提供了各種設置選項,在開始使用時我們需要知道哪些一般准則,以及從什么最佳實踐入手,從而輕松構建一個在生產環境中有復原力的、實時的、高性能應用呢?


這是個好問題!基於大量和您類似的Datastax客戶中所累積的經驗,本文檔將列出一些基本的准則。


在DataStax Java 驅動中有很多的旋鈕和拉桿,並且每一個都有其存在的理由。但是絕大部分的用戶不需要深入了解那些更深奧高階的功能,所以這篇文檔將會重點介紹大部分應用程序會遇到的一般情況。在這個過程中,我們會再詳細介紹提到的高級選項,但是他們確實不是一般情況。 現在就讓我們先關注一下主要情況吧。


在本文開始前,我們默認您已經對這些內容有所了解:客戶端與服務端架構的應用程序、Cassandra基礎,還有Datastax Java驅動的主要元素,例如Sessions(會話)、Statements(語句)、Results sets(結果集)、Rows(行)。附錄中提供了一些幫助您熟悉這些概念的資源。

 

最佳實踐

這一部分介紹了使用DataStax Java驅動創建一個Java程序的最佳實踐方法。部分最佳實踐的方法也可以運用到其它的開發語言中(及其關聯的DataStax驅動),但是在本文中我們將會專注於Java驅動。我們將最佳實踐的方法分為4組:總體指導、建立連接、查詢語句和查詢結果。

 

總體指導

  • 當今最佳的實踐方法是使用最新的DataStax Java驅動,即版本4.x。雖然3.x版本的驅動仍然可用,且在必要情況下依然是個不錯的選擇,本文會把重點放在4.x的驅動。

  • 使用您正在使用的Datastax Java驅動主要版本(比如4.x)中最新的細分版本(比如截止2020年8月,4.x中的4.9.0或3.x中的3.10.0)。

  • 多數據中心部署的情況下,我們建議一個應用實例僅與單獨一個數據中心掛鈎使用。換句話說,如果您要將您的應用部署到多個地區,您應該部署多個應用,每個地區分別對應一個應用。每一個應用都應將驅動程序連接到每一個當地的數據中心(如下圖所示),並且應使用Global Load Balancer(全局負載平衡器)在多個應用實例中引導流量。

    • 如果Cassandra在一個地區的數據中心無法訪問(如脫機以進行定期維護),Global Load Balancer會將流量導到其它數據區域中的應用程序實例。

  • 還有一個最佳的實踐方法是使用與數據庫查詢有關的指標來對應用程序進行監測。這個會非常有助於測試應用程序的查詢性能。更多關於驅動程序監測的指標請參考這里。在默認情況下所有的指標收集功能都是處於禁用狀態。如果您需要它們,您需要在建立連接的時候啟用。詳見下面信息。

    • 如果一個監測指標處於啟用狀態,您可以通過Session#getMetrics()方法獲得指標內容。

 

建立與數據庫的連接

用於建立與Cassandra的連接的CqlSession對象,可以通過很多不同的方式進行配置,包括通過配置文件或者編程方式建立連接——我們建議使用配置文件。當我們與Cassandra創建連接時,以下是一些最重要的考慮因素:

  • 請使用單獨的CqlSession:一個應用程序只使用一個單獨的CqlSession對象來連接到數據庫。在一些更復雜的情況下,有可能會有多個Class,每一個Class 都需要連接到一個CqlSession。在這種情況下,我們希望它們能使用同一個CqlSession,因此最好將其創建為單例模式。一個比較常見的方法是使用依賴注入框架,例如Spring。

  • 在application.conf里設定選項:將所有非默認的選項在jar包里的application.conf文件中進行定義。這個設置文件根據類型安全配置框架(Typesafe Config framework)設置參數。reference.conf文件包括了所有的默認值,同時由於application.conf是基於它衍生出來的,因此您只需在applicaiton.conf里顯式地指出任何想要復寫的值。

  • 遵循您安全性的最佳做法,同時使用適當的身份驗證和SSL選項。更多關於身份驗證SSL加密內容請參照DataStax Java驅動的文檔。

  • 在創建連接的時使用多個接觸點(Contact Points)。這樣的話,您的應用即使在單個(或多個節點)脫機時仍可與數據庫建立連接。特意選擇(或避免)種子節點並沒有任何益處。

datastax-java-driver.basic.contact-points = ["127.0.0.1:9042","localhost:9042"]
    • 只提供一個數據中心的接觸點,這個數據中心會被設為本地的數據中心(如下所示)。

    • 沒必要將一個數據中心所有的節點都設置為接觸點。當驅動程序建立初始連接后,它會發現集群中所有剩下的節點,並通過負載平衡策略與這些節點建立直接的連接。

    • 當創建CqlSession時,您可以通過編程的方式設置連接點,這會復寫application.conf中的參數。

CqlSession.builder().addContactPoint(new InetSocketAddress("127.0.0.1", 9042));
  • 使用默認的負載平衡策略:當建立連接時,使用默認參數DefaultLoadBalancingPolicy。此負載平衡策略會更有效且相對平衡地調用節點進行查詢。

  • 當建立連接時,顯式地指定使用本地的數據中心。
datastax-java-driver.basic.load-balancing-policy.local-datacenter = dc1
    • 當創建CqlSession時,您也可以通過編程的方式設置本地數據中心,這會復寫application.conf中的參數。

CqlSession.builder().withLocalDatacenter("dc1")
  • 顯式地將一致性級別設定為LOCAL_QUORUM並將默認串行一致性設為LOCAL_SERIAL。不然,默認的一致性是LOCAL_ONE,默認串行一致性級別為SERIAL,這些通常都不建議使用。

datastax-java-driver.basic.request.consistency = LOCAL_QUORUM
datastax-java-driver.basic.request.serial-consistency = LOCAL_SERIAL

 

  • 不要將默認查詢冪等性設置為true。其默認值為false,請保留它。設置為true很危險,因為一些操作是通過冪等查詢自動完成的,但事實上不是所有操作都是冪等的。因此,請針對每次查詢顯式明確地設置冪等查詢操作。

  • 請在advanced.metrics中啟用合適的指標,從而啟用指標收集功能。因為沒有一個“啟用所有指標”的選項,您必須明確指出您需要啟動的每一個指標。

    • advanced.metrics.session指定會話級(session-level)指標。下面列出了一些您可能感興趣的advanced.metrics.session.enabled中的指標:

advanced.metrics.session.enabled = [bytes-sent, bytes-received, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.queue-size, throttling.errors]

 

    • advanced.metrics.node 指定了節點級(node-level)指標,下面列出了一些您可能感興趣advanced.metrics.node.enabled:中的指標:

advanced.metrics.node.enabled = [pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams, bytes-sent, bytes-received, cql-messages, errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others, retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other, ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other, errors.connection.init, errors.connection.auth]

 

 

執行查詢

Cassandra的查詢會先創建一個Statement對象,再通過CqlSession對象執行。以下有很多種Statement種類:

  • 簡易語句(SimpleStatement): 由CQL字符串或查詢生成器(Query Builder)創建。

  • 參數化查詢語句(PreparedStatement): 可以在構建一次后被重復使用多次的語句,對於有不同參數的常見查詢而言,此種語句具有實用、高性能且更安全的優勢。

  • 綁定語句(BoundStatement): 用於對PrepareStatement查詢的單次調用,並允許用戶綁定只適用於此次調用的參數。

  • 批處理語句(BatchStatement): 封裝多個簡易語句或者綁定語句,並批量執行。

 

當執行查詢語句時,這里有一些需要注意的重要事項:

  • 請注意在DataStax Java驅動程序4.x的版本中,所有的Statements都是不可變的(immutable)。因此如果想要重新設置Statement的選項的話,您必須為這個值重新分配一個引用。

// 不正確 - bound2 的值並沒有改變。
bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
// 正確 - bound2 的值發生了改變。
bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);

 

  • 避免以編程方式發出DDL語句,除非可以確保它們不會被應用程序中的多個實例並發執行。 
    • 當集群中的節點對模式(Schema)產生分歧時,可能會因Cassandra分布式的特性出現問題。

    • 使用DDL語句時,應確保模式協議(schema agreement)達成一致。您可以通過調用Session#getExecutionInfo()方法獲取ExecutionInfo, 之后調用ExecutionInfo#isSchemaInAgreement()方法。如果返回True,那您就可以確保所有節點達成一致。

    • 您也可以在任何時候通過調用Session#checkSchemaAgreement()的方法判斷所有節點的模式是否一致。就好比,如果上述的方法返回了False,則可以進入一個循環直到Session#checkSchemaAgreement()返回為真(在發生異常之前會有一些超時響應或者計數器)。

  • 不要將CQL作為字符串執行(如通過CqlSession#execute(String)),而是為CQL字符串創建一個Simple Statement以便設置Statement的選項,如一致性級別或者冪等性。

SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
stmt1 = stmt1.setIdempotent(true);

 

  • Java驅動程序的 QueryBuilder(查詢生成器)是另一個很棒的通過編程創建CQL查詢語句的方式。 這也比與用編程方式構建CQL字符串想法的動態CQL更可取。

SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
                .columns("pkey", "x").build();

 

  • 通過CqlSession#prepare()准備所有可以重復使用的語句。

SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
PreparedStatement prepared2 = session.prepare(stmt3);
BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);

 

    • 您也可以通過按位置而不是名字來綁定變量,但是當被執行的查詢語句發生變化,使用位置綁定變量會更容易出錯,因為這就意味着您需要更注意綁定變量的位置。

SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
PreparedStatement prepared =  session.prepare(stmt2);
BoundStatement bound1 = prepared.bind();
bound1 = bound1.setInt(0, 10);
bound1 = bound1.setInt(1, 20);

 

    • 當值綁定到准備好的語句時,請使用“unset”(未設置)選項來避免任何插入NULL值的可能性。或者,您可以僅將不為空的值與變量進行綁定,因為未綁定的綁定標記(bind marker)同樣會導致該列被設定為“unset”。

BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);
bound2 = bound2.unset(1); // 為了展示需要
  • 如果一個CQL查詢確實是冪等的,請通過Statement#setIdempotent()來設置冪等的選項。這是為了能夠自動重復執行陳述語句(如:自動重試或者推測執行)。作為安全預防措施,驅動程序不會自動重新處理任何冪等設置為false(否)的語句。

bound2 = bound2.setIdempotent(true);

 

  • 謹慎使用Batch Statement(批處理語句),因為Cassandra集群的協調節點(coordinator)負責在內部處理這些查詢,發出多個查詢會增加協調節點的負荷。

    • 批處理語句的數量應該保持相對較少。 如小於20條語句。

    • 應只在用例需要時才使用logged batches(記入日志的批處理語句), 因為它們會占用更多資源。

    • 更多細節請查詢這里

BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
BoundStatement bound3 = prepared.bind(1000,2000);
BoundStatement bound4 = prepared.bind(10000,20000);
batchBuilder.addStatement(bound3);
batchBuilder.addStatement(bound4);
BatchStatement batch = batchBuilder.build();
  • 我們應該避免Light weight transactions(輕量級事務),因為它們的性能相較於一般操作會慢很多。一般情況下,會有其它可以避免Light weight transactions的方式來建構應用程序。總之,輕量級事務的存在是有原因的,但請合理使用。

    • 輕量級事務和一般事務的查詢語句一樣。

    • 請確保將默認的SERIAL的串行一致性級別復寫為LOCAL_SERIAL。

 

處理查詢和結果

 

根據查詢被執行的方式,查詢結果會通過以下幾種方式被返回:

  • Synchronous execution(同步執行):返回一個可以被同步處理的ResultSet

ResultSet resultSet = session.execute(read);

 

  • Asynchronous execution(異步執行):返回一個可以通過CompletionStage API 異步處理查詢的CompletionStage<AsyncResultSet>。

CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);

 

  • Reactive execution(響應式執行):返回一個ReactiveResultSet,它可以通過Reactive Labraries (響應庫)以響應式的方法處理結果。

ReactiveResultSet reactiveResult = session.executeReactive(bound2);

最常見的處理方式排列序可能為Synchronous > Reactive > Asynchronous.

 

以下是一些處理查詢結果的最佳實踐方法:

  • 養成在CQL查詢中指定所有需要被返回的列的習慣。比如但凡可以請避免使用 "SELECT * FROM …"這樣的查詢語句。如果表的結構將來發生變化的話,這樣做會有幫助。

  • 像其它數據庫一樣,請注意像“SELECT * FROM ks.tbl”這樣的簡單語句會返回數據庫中的所有數據。Cassandra是為儲存海量數據而設計的,這種請求很可能會返回一個含有巨量數據的結果。盡管有時候確實需要進行全表掃描,但也請盡量避免這種情況。

    • 如果確實需要全表掃描,利用好的分布式計算方法,或者像Spark這樣的分布式計算框架是一個不錯的選擇。DataStax為此開發了Spark Cassandra Connector (Cassandra Spark連接器),實現了數據掃描的最佳實踐。

  • 當處理多個語句時,利用異步執行(asynchronous execution)同時處理多條查詢。

    • 當加載數據時,這個方式可以大量縮短處理時間。

    • 當從多張表格獲取數據時,這可以減少總體延遲。

    • 確保所有異步操作已被處理並成功執行。

    • 更多詳情請參考這里。 

  • 對於INSERT、UPDATE或DELETE的操作,請確保捕獲異常以確保操作成功。

    • 如果一個查詢因為查詢超時失敗,請嘗試重新嘗試查詢,因為Cassandra分布式的特性應該可以成功。如果一個查詢超時,它會拋出一個異常(如 QueryExecutionException),您可以使用try-catch結構捕獲這個異常。

  • 一般情況下,使用驅動程序默認的分頁方法,這種方法在一頁讀取完畢時將自動獲取下一頁。

    • 響應式處理也可以自動處理分頁功能。

    • 通過CompletionStage API使用自動分頁。而使用AsynchronousResultSet這種手動處理API則需要手動分頁。

  • 避免調用 ResultSet#all(),因為這會將所有結果導入一個在內存中的列表(List),這樣會影響性能或產生內存錯誤

    • 與之相對,可以利用the ResultSet#iterator()方法迭代取出返回結果的每一行。

for (Row row : resultSet) {
    System.out.println("pkey: " + row.getInt("pkey"));
}

 

 

總結 

本文包含了許多適應大多數用例且最常見的最佳實踐。但是在某些情況下,使用其它不同的設置或方法也許確實是得當的,所以這篇文章應該只作為一個起點。我們建議可以從這些設置和實踐中入手,開始建立一個可用的應用程序。如果您發現應用程序沒有如預期一樣運行,可以考慮參考DataStax Java驅動程序文章(或者其它資源)來微調您的設置。


有了這些最佳實踐作為您的起點,您正走在通往順利搭建以Cassandra作為后台的應用程序的光明大道上。請享受這個過程吧!

 

附件 1 - 有用的鏈接

 

附件 2 - 代碼案例

以下代碼也可以在以下Github repository鏈接。 https://github.com/DataStax-Examples/ex_bestpractices

 

附件 2.1 - application.conf

datastax-java-driver {
 # Contact Points
 basic.contact-points = ["127.0.0.1:9042","localhost:9042"]

 # Local Data Center
 basic.load-balancing-policy.local-datacenter = dc1

 # Default Consistency Level
 basic.request.consistency = LOCAL_QUORUM
 basic.request.serial-consistency = LOCAL_SERIAL

 # Metrics
 advanced.metrics {
   # The session-level metrics
   session {
     enabled = [
       bytes-sent, bytes-received,
       connected-nodes,
       cql-requests, cql-client-timeouts, cql-prepared-cache-size,
       throttling.delay, throttling.queue-size, throttling.errors,
     ]
   }
   # The node-level metrics.
   node {
     enabled = [
       pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams,
       bytes-sent, bytes-received,
       cql-messages,
       errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others,
       retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other,
       ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other,
       errors.connection.init, errors.connection.auth
     ]
   }
 }
}

 

附件 2.2 - SampleApplication.java

package com.datastax.example.bestpractices;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException;
import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import reactor.core.publisher.Flux;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;

public class SampleApplication {
   public static void main(String[] args) {
       final String dc = "dc1";
       final String ks = "ks";
       final String tbl = "tbl";
       final int ddlRetries = 3;
       final int ddlRetrySleepMs = 500;
       // Create CqlSession
       CqlSession session = CqlSession.builder()
//                .withAuthCredentials("cass_user", "choose_a_better_password")  // redundant with application.conf, but showing for demonstration
//                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))     // redundant with application.conf, but showing for demonstration
               .withLocalDatacenter("dc1")
               .build();

       // Set up driver metrics and expose via JMX (as an example)
       MetricRegistry registry = session.getMetrics()
               .orElseThrow(() -> new IllegalStateException("Metrics are disabled"))
               .getRegistry();

       JmxReporter reporter =
               JmxReporter.forRegistry(registry)
                       .inDomain("com.datastax.oss.driver")
                       .build();
       reporter.start();

       // Create Keyspace
       SimpleStatement createKeyspace = SimpleStatement.newInstance("CREATE KEYSPACE IF NOT EXISTS "+ks
               +" WITH replication = {'class': 'NetworkTopologyStrategy', '" + dc + "': '1'}");
       // Synchronous
       ResultSet ddl = null;
       try {
           ddl = session.execute(createKeyspace);
       }
       catch (Exception e) {
           throw new RuntimeException("Error creating keyspace");
       }
       if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
           int retries = 0;
           while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
               try {
                   Thread.sleep(ddlRetrySleepMs);
               }
               catch (InterruptedException ie) {
                   throw new RuntimeException("Interrupted while waiting for schema agreement");
               }
               retries++;
           }
       }

       // Create Table (using helper method)
       SimpleStatement createTable = SimpleStatement.newInstance("CREATE TABLE IF NOT EXISTS "+ks+"."+tbl
               +"(pkey INT, x INT, PRIMARY KEY ((pkey)))");
       if (!executeDdl(session, createTable, ddlRetries, ddlRetrySleepMs)) {
           System.err.println("Error creating table");
           System.exit(1);
       }

       // Insert some data
       // Simple
       SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
       stmt1 = stmt1.setIdempotent(true);
       try {
           session.execute(stmt1);
       }
       catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException("Error inserting first data");
       }

       // Prepared
       // Using positional bind markers
       SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
       PreparedStatement prepared =  session.prepare(stmt2);
       BoundStatement bound1 = prepared.bind();
       bound1 = bound1.setInt(0, 10);
       bound1 = bound1.setInt(1, 20);
       // Async
       try {
           CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);
           asyncResult.toCompletableFuture().get();
       }
       catch (Exception e) {
           // process exception
       }

       // Reactive
       // Using named bind markers instead of positional
       SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
       PreparedStatement prepared2 = session.prepare(stmt3);
       BoundStatement bound2 = prepared2.bind();
       bound2 = bound2.setInt("pkey", 100);
       bound2 = bound2.setInt("x", 200);
       bound2 = bound2.unset(1); // Showing for demonstration
       bound2 = bound2.setIdempotent(true); // set idempotency
       bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
       ReactiveResultSet reactiveResult = session.executeReactive(bound2);
       Flux.from(reactiveResult).blockLast();

       // Batch
       BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
       BoundStatement bound3 = prepared.bind(1000,2000);
       BoundStatement bound4 = prepared2.bind().setInt("pkey",10000).setInt("x", 20000);
       batchBuilder.addStatement(bound3);
       batchBuilder.addStatement(bound4);
       BatchStatement batch = batchBuilder.build();
       try {
           session.execute(batch);
       }
       catch (QueryExecutionException qee) {
           // Handle query timeout - let's retry it
           try {
               session.execute(batch);
           }
           catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
               // Handle query failure
               throw new RuntimeException("Error second try inserting data");
           }

       }
       catch (QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException("Error inserting data");
       }


       // Query Builder
       SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
               .columns("pkey", "x").build();
       // Using a helper method to execute DML
       ResultSet resultSet = executeDml(session, read, "Error reading data");
       if (null != resultSet) {
           for (Row row : resultSet) {
               System.out.println("pkey: " + row.getInt("pkey") + ", x: " + row.getInt("x"));
           }
       }

       // Cleanup
       session.close();
   }

   private static boolean executeDdl(CqlSession session, Statement ddlStatement, int ddlRetries, int ddlRetrySleepMs) {
       ResultSet ddl = null;
       try {
           ddl = session.execute(ddlStatement);
       }
       catch (Exception e) {
           throw new RuntimeException("Exception while executing DDL (" + ddlStatement + ")");
       }
       if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
           int retries = 0;
           while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
               try {
                   Thread.sleep(ddlRetrySleepMs);
               }
               catch (InterruptedException ie) {
                   throw new RuntimeException("Interrupted while waiting for schema agreement");
               }
               retries++;
           }
       }
       return true;
   }

   private static ResultSet executeDml(CqlSession session, Statement query, String errorString) {
       ResultSet resultSet = null;
       try {
           resultSet = session.execute(query);
       }
       catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException(errorString);
       }
       return resultSet;
   }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM