關於什么是cassandra,可以參考:
http://blog.csdn.net/zyz511919766/article/details/38683219
比較了HBASE、mongodb 和 cassandra
1)HBASE 和 cassandra 都是列式存儲,但是 cassandra部署方便,擴展容易
2) mongodb 並不是真正的列式存儲,數據擴容比較麻煩,需要提前做好集群分區
cassandra 是以 p2p(gossip)方式實現的 bigtable,數據一致性可以通過參數配置(R+W > N):寫操作是要在所有 node 上完成,還是只需在指定個數的 node 上完成,才進行返回。
數據模型:
嘗試了cassandra的兩個client。
1. "github.com/gocql/gocql"
2. "github.com/hailocab/gocassa"
gocassa是在gocql上面的封裝,提供更方便的操作。
在用cassandra之前,建議先熟悉一下CQL,類似 SQL語句的語法。
作為一個client, 我們需要考慮的點:
1)連接池
2)批量操作
3)可能還會考慮同步操作(同時更新兩個table中的數據)
cassandra部署和使用都還算簡單,比較困難的是,要擺脫傳統的db設計范式思維,要根據后續的數據查詢來設計你的bigtable結構,方便將來的查詢。
貼上幾個相關的參考資料:
http://www.slideshare.net/yukim/cql3-in-depth (CQL相關介紹)
http://www.slideshare.net/jaykumarpatel/cassandra-data-modeling-best-practices
http://www.infoq.com/cn/articles/best-practices-cassandra-data-model-design-part2 (ebay的cassandra實踐)
然后,貼上兩個client使用示例:
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// main is a small gocql walkthrough: it opens a session against a local
// Cassandra node, compares the wall-clock cost of three ways of inserting
// 100 rows (a mixed-key unlogged batch, 100 individual inserts, and a
// same-key unlogged batch), and then runs a few simple reads.
func main() {
	// Connect to the cluster.
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Keyspace = "demo"
	cluster.Consistency = gocql.Quorum
	// Size of the connection pool; the default is 2 (NumConns connections
	// are opened to each host).
	cluster.NumConns = 3

	// The error must be checked: on failure the session is unusable and the
	// session.Pool access below would dereference nil.
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal("create session: ", err)
	}
	defer session.Close()

	time.Sleep(1 * time.Second) // Sleep so the fillPool can complete.
	fmt.Println(session.Pool.Size())

	// Unlogged batch for bulk inserts; this works best when all rows share
	// the same partition key. Here every row has a different key.
	t := time.Now()
	batch := session.NewBatch(gocql.UnloggedBatch)
	for i := 0; i < 100; i++ {
		batch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i))
	}
	if err := session.ExecuteBatch(batch); err != nil {
		fmt.Println("execute batch:", err)
	}
	bt := time.Since(t).Nanoseconds()

	// The same 100 rows, one statement at a time. Query only builds the
	// statement; Exec is what actually sends it to the cluster, so it must
	// be called for this timing to mean anything.
	t = time.Now()
	for i := 0; i < 100; i++ {
		if err := session.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i)).Exec(); err != nil {
			fmt.Println("execute insert:", err)
			break
		}
	}
	nt := time.Since(t).Nanoseconds()

	// Unlogged batch again, this time with every row in the same partition.
	t = time.Now()
	sbatch := session.NewBatch(gocql.UnloggedBatch)
	for i := 0; i < 100; i++ {
		sbatch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, "samerow", fmt.Sprintf("ip_%d", i))
	}
	if err := session.ExecuteBatch(sbatch); err != nil {
		fmt.Println("execute batch:", err)
	}
	sbt := time.Since(t).Nanoseconds()

	fmt.Println("bt:", bt, "sbt:", sbt, "nt:", nt)
	//----------output-------------------
	// ./rawtest
	// bt: 5795593 sbt: 3003774 nt: 261775
	//------------------------------------
	// NOTE: the sample output above was recorded when the individual
	// inserts were built but never executed, so its "nt" figure is not a
	// real insert timing. Re-run to get comparable numbers.

	// Insert a tweet.
	if err := session.Query(`INSERT INTO tweet (timeline, id, text) VALUES (?, ?, ?)`, "me", gocql.TimeUUID(), "hello world").Exec(); err != nil {
		log.Fatal(err)
	}

	var id gocql.UUID
	var text string

	/* Search for a specific set of records whose 'timeline' column matches
	 * the value 'me'. The secondary index that we created earlier will be
	 * used for optimizing the search */
	if err := session.Query(`SELECT id, text FROM tweet WHERE timeline = ? 
LIMIT 1`, "me").Consistency(gocql.One).Scan(&id, &text); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Tweet:", id, text)

	// List all tweets.
	iter := session.Query(`SELECT id, text FROM tweet WHERE timeline = ?`, "me").Iter()
	for iter.Scan(&id, &text) {
		fmt.Println("Tweet:", id, text)
	}
	if err := iter.Close(); err != nil {
		log.Fatal(err)
	}

	// Read one bigrow row into a generic column-name -> value map.
	query := session.Query(`SELECT * FROM bigrow where rowname = ?`, "30")
	// query := session.Query(`SELECT * FROM bigrow `)
	m := make(map[string]interface{}, 10)
	if err := query.Consistency(gocql.One).MapScan(m); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%#v", m)
}
package main

import (
	"fmt"
	"time"

	"github.com/hailocab/gocassa"
)

// Sale is the row type handed to gocassa for the "sale" table. The Id field
// is declared as the table's partition key in main.
type Sale struct {
	Id         string
	CustomerId string
	SellerId   string
	Price      int
	Created    time.Time
}

// main connects to the "not_exist_demo" keyspace on 127.0.0.1, declares the
// "sale" table, writes one Sale and reads it back by Id.
func main() {
	hosts := []string{"127.0.0.1"}
	ks, err := gocassa.ConnectToKeySpace("not_exist_demo", hosts, "", "")
	if err != nil {
		panic(err)
	}

	keys := gocassa.Keys{
		PartitionKeys: []string{"Id"},
	}
	sales := ks.Table("sale", Sale{}, keys)

	// Table creation is best effort: the result is only printed, never
	// treated as fatal.
	createErr := sales.Create()
	fmt.Println(createErr)

	// Write the first record.
	first := Sale{
		Id:         "sale-1",
		CustomerId: "customer-1",
		SellerId:   "seller-1",
		Price:      42,
		Created:    time.Now(),
	}
	if err := sales.Set(first).Run(); err != nil {
		panic(err)
	}

	// Read the same record back through its partition key.
	var got Sale
	lookup := sales.Where(gocassa.Eq("Id", "sale-1")).ReadOne(&got)
	if err := lookup.Run(); err != nil {
		panic(err)
	}
	fmt.Println(got)
}
更多配置可參考:
https://github.com/gocql/gocql/blob/master/cluster.go#L57
// ClusterConfig is a struct to configure the default cluster implementation
// of gocql. It has a variety of attributes that can be used to modify the
// behavior to fit the most common use cases. Applications that require a
// different setup must implement their own cluster.
type ClusterConfig struct {
	Hosts             []string          // addresses for the initial connections
	CQLVersion        string            // CQL version (default: 3.0.0)
	ProtoVersion      int               // version of the native protocol (default: 2)
	Timeout           time.Duration     // connection timeout (default: 600ms)
	Port              int               // port (default: 9042)
	Keyspace          string            // initial keyspace (optional)
	NumConns          int               // number of connections per host (default: 2)
	NumStreams        int               // number of streams per connection (default: max per protocol, either 128 or 32768)
	Consistency       Consistency       // default consistency level (default: Quorum)
	Compressor        Compressor        // compression algorithm (default: nil)
	Authenticator     Authenticator     // authenticator (default: nil)
	RetryPolicy       RetryPolicy       // Default retry policy to use for queries (default: 0)
	SocketKeepalive   time.Duration     // The keepalive period to use, enabled if > 0 (default: 0)
	ConnPoolType      NewPoolFunc       // The function used to create the connection pool for the session (default: NewSimplePool)
	DiscoverHosts     bool              // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
	MaxPreparedStmts  int               // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
	MaxRoutingKeyInfo int               // Sets the maximum cache size for query info about statements for each session (default: 1000)
	PageSize          int               // Default page size to use for created sessions (default: 5000)
	SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
	Discovery         DiscoveryConfig
	SslOpts           *SslOptions
	// DefaultTimestamp sends a client side timestamp for all requests which
	// overrides the timestamp at which it arrives at the server.
	// (default: true, only enabled for protocol 3 and above)
	DefaultTimestamp bool
}
連接池默認采用的是 NewSimplePool,它用的是 round robin 策略
源碼:https://github.com/gocql/gocql/blob/master/connectionpool.go#L454
ConnPoolType 是一個用來創建連接池的函數(類型為 NewPoolFunc),可以自己實現這個函數來定制化自己的策略。
數據一致性策略,通過Consistency配置,默認是Quorum(大部分節點):
// Consistency identifies a consistency level by its 16-bit numeric code.
type Consistency uint16

// Consistency levels and their numeric codes.
//
// The codes are not contiguous: 0x08 and 0x09 are not declared in this
// block, so the values are spelled out explicitly rather than generated
// with iota.
const (
	Any         Consistency = 0x00
	One         Consistency = 0x01
	Two         Consistency = 0x02
	Three       Consistency = 0x03
	Quorum      Consistency = 0x04
	All         Consistency = 0x05
	LocalQuorum Consistency = 0x06
	EachQuorum  Consistency = 0x07
	LocalOne    Consistency = 0x0A
)