A Few Pitfalls When Using Spring Data with Cassandra


Using Spring Data with Cassandra

Add the dependency and configuration

  • Add the dependency
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-cassandra</artifactId>
        </dependency>
  • Add the configuration
spring.data.cassandra.keyspace-name=myKeySpace
spring.data.cassandra.contact-points=myContactPoints
spring.data.cassandra.port=9042
spring.data.cassandra.schema-action=NONE

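Define the key, the data class, and the table

The example here is a counter table that records how many times a user has clicked within a given category.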

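// Imports assume the Spring Data Cassandra 2.x package layout.
import java.io.Serializable;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyClass;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;

@PrimaryKeyClass
public class UserCategoryKey implements Serializable {

    @PrimaryKeyColumn(name = "user_id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String userId;

    @PrimaryKeyColumn(name = "category_id", ordinal = 1, type = PrimaryKeyType.CLUSTERED)
    private Integer categoryId;

    public String getUserId() { return userId; }
    public Integer getCategoryId() { return categoryId; }
}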

@Table("user_category")
public class UserCategory {
    @PrimaryKey
    private UserCategoryKey key;

    @Column("click_cnt")
    private Long clickCnt;
}

The schema for creating the table:

-- Cannot set default_time_to_live on a table with counters
CREATE TABLE user_category
(
    user_id text,
    category_id int,
    click_cnt counter,
    PRIMARY KEY ((user_id), category_id)
);

Restriction:

  • In a counter table, every non-counter column must be part of the primary key.

Working with data through a repository

@Repository
public interface UserCategoryRepository extends CassandraRepository<UserCategory, UserCategoryKey> {

    @Modifying
    @Transactional
    @Query(value = "update user_category set click_cnt = click_cnt + :cnt where " +
            "user_id = :#{#key.userId} and category_id = :#{#key.categoryId}")
    public void incClickCnt(@Param("key") UserCategoryKey key, @Param("cnt") Long cnt);
}

Working with data through CassandraOperations

CassandraOperations can be injected directly:

    @Autowired
    protected CassandraOperations cassandraTemplate;
    /**
     * update user_category set click_cnt = click_cnt + 1 where user_id='user1' and category_id = 2001
     */
    public Boolean incClickCounter(UserCategoryKey key, Long cnt) {
        try {
            //update table
            Update update = QueryBuilder.update("user_category");
            //set
            update.with(QueryBuilder.incr("click_cnt", cnt));
            //where conditions
            update.where(QueryBuilder.eq("user_id", key.getUserId()))
                    .and(QueryBuilder.eq("category_id", key.getCategoryId()));
            cassandraTemplate.getCqlOperations().execute(update);
            return true;
        } catch (Exception e) {
            LOG.error("incClickCounter exception, msg:{}", e.getMessage(), e);
        }
        return false;
    }

A few pitfalls

1. Only counter columns support increment and decrement

Incrementing or decrementing a non-counter column fails with the following error:

Query; CQL [update user_category set click_cnt = click_cnt + ? where user_id = ? and category_id = ?]; Invalid operation (click_cnt = click_cnt + ?) for non counter column click_cnt; nested exception is com.datastax.driver.core.exceptions.InvalidQueryException: Invalid operation (click_cnt = click_cnt + ?) for non counter column click_cnt

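2. The bind parameter must come after the column name

The working form is click_cnt = click_cnt + :cnt. If the parameter is placed first, i.e. click_cnt = :cnt + click_cnt, as in the statement update user_category set click_cnt = :cnt + click_cnt where user_id = 'user1' and category_id = 1, the query fails. The wording of the message ("for non list column") suggests that Cassandra parses the value-first form as a list prepend, which does not apply to a counter. The error is: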

Query; CQL [update user_category set click_cnt = ? + click_cnt  where user_id = ? and category_id = ?]; Invalid operation (click_cnt = ? - click_cnt) for non list column click_cnt; nested exception is com.datastax.driver.core.exceptions.InvalidQueryException: Invalid operation (click_cnt = ? - click_cnt) for non list column click_cnt
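One way to avoid this mistake is to generate the statement from a small helper that always writes the column name before the bind marker. The sketch below is plain string building with no Cassandra dependency; CounterCql and incrementCql are hypothetical names introduced here for illustration, and the table and column names are assumed to be trusted constants rather than user input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CounterCql {

    // Build "update <table> set <counter> = <counter> + ? where k1 = ? and k2 = ?".
    // The counter column is always written before the bind marker.
    // Identifiers are concatenated as-is, so they must be trusted constants.
    public static String incrementCql(String table, String counter, List<String> keyColumns) {
        String where = keyColumns.stream()
                .map(column -> column + " = ?")
                .collect(Collectors.joining(" and "));
        return "update " + table + " set " + counter + " = " + counter + " + ? where " + where;
    }

    public static void main(String[] args) {
        System.out.println(incrementCql("user_category", "click_cnt",
                Arrays.asList("user_id", "category_id")));
    }
}
```

The resulting string uses positional ? markers, so it would suit the CqlOperations route rather than the named parameters of @Query.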

4. Parameters for counter operations must be of type long

For example, declare the method's cnt parameter as Integer:

    @Query(value = "update user_category set click_cnt = click_cnt + :cnt where user_id = 'user1' and category_id = 1")
    public void incClickCnt(@Param("cnt") Integer cnt);

This fails with:

Query; CQL [update user_category set click_cnt = click_cnt + ? where user_id=? and category_id = ?]; Expected 8 or 0 byte long (4); nested exception is com.datastax.driver.core.exceptions.InvalidQueryException: Expected 8 or 0 byte long (4)
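The numbers in the message line up with the widths of the two Java types: a counter is a 64-bit value, so the server expects 8 bytes, while an Integer bind value occupies only 4. The sketch below uses only java.nio to show the two sizes. It assumes the values are written as fixed-width integers, and the class and method names are illustrative, not part of the driver API.

```java
import java.nio.ByteBuffer;

public class CounterParamWidth {

    // Encode a 32-bit value, as an Integer parameter would be: 4 bytes.
    public static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Encode a 64-bit value, as a Long parameter would be: 8 bytes.
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(String[] args) {
        System.out.println("Integer delta: " + encodeInt(1).length + " bytes");
        System.out.println("Long delta: " + encodeLong(1L).length + " bytes");
    }
}
```

Declaring the parameter as Long (or long) in the repository method gives the 8-byte value the counter update expects.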

5. Do not wrap the IN parameter in parentheses

The working form is:

    @Query(value = "update user_category set click_cnt = click_cnt + 1 where user_id='user1' and category_id in :categoryIdList")
    public void batchUpdate(@Param("categoryIdList") List<Integer> categoryIdList);

If this is changed to category_id in (:categoryIdList), the following error is raised:

org.springframework.data.cassandra.CassandraInvalidQueryException: Query; CQL [update user_category set click_cnt=click_cnt+1 where user_id=? and category_id in (?)]; Expected 4 or 0 byte int (28); nested exception is com.datastax.driver.core.exceptions.InvalidQueryException: Expected 4 or 0 byte int (28)
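A possible reading of the number 28: inside parentheses the single ? stands for one int element (4 bytes), yet the whole list is still bound to it. Assuming the native protocol v3+ collection layout (a 4-byte element count, then a 4-byte length plus the value bytes for each element), a list of three ints comes to 4 + 3 × (4 + 4) = 28 bytes. The post does not say how many ids were passed, so the three-element list below is only a guess that happens to match. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class IntListEncoding {

    // Serialize a list<int> under the assumed protocol v3+ layout:
    // [4-byte count] then, per element, [4-byte length][4-byte value].
    public static byte[] encodeIntList(List<Integer> values) {
        int size = Integer.BYTES + values.size() * (Integer.BYTES + Integer.BYTES);
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(values.size());
        for (int value : values) {
            buffer.putInt(Integer.BYTES); // element length
            buffer.putInt(value);         // element value
        }
        return buffer.array();
    }

    public static void main(String[] args) {
        List<Integer> categoryIds = Arrays.asList(2001, 2002, 2003); // hypothetical ids
        System.out.println("list<int> payload: " + encodeIntList(categoryIds).length + " bytes");
        System.out.println("single int payload: " + Integer.BYTES + " bytes");
    }
}
```

Without the parentheses, the ? stands for the whole list, so the list-shaped payload is what the server expects.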

6. About CassandraOperations

Injecting CassandraOperations directly in a unit test raises a JmxReporter-related error, whereas injecting it in application code works fine. To use CassandraOperations in a unit test, build a Cluster manually with JmxReporter and metrics disabled.


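Finally:
I did not have time to dig into the root cause of each of these problems, so I am simply recording the pitfalls here in the hope that they save others some trouble.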

