目錄
前言
安裝Cassandra
安裝Cassandra數據庫
- 官網下載Cassandra壓縮包
- 解壓,並配置環境變量:
操作 | 變量名 | 變量值 |
---|---|---|
新建 | CASSANDRA_HOME | 解壓路徑 |
增加 | PATH | 解壓路徑\bin; |
安裝Python的Cassandra依賴包
pip install cassandra-driver
測試:在任意目錄的命令行中執行 `cassandra` 啟動數據庫服務,能正常啟動即說明環境變量配置成功
C:\Users\wahaha>cassandra
Python操作Cassandra
新建keyspace和table
# -*- encoding: utf-8 -*-
"""Create the ``test`` keyspace and the ``test.user`` table on a local Cassandra node."""
# Cluster is the driver's entry point for connecting to a Cassandra cluster.
from cassandra.cluster import Cluster
# DCAwareRoundRobinPolicy can be passed to Cluster(load_balancing_policy=...)
# to customize how the driver routes requests; it is not used in this script.
from cassandra.policies import DCAwareRoundRobinPolicy

# With no arguments, Cluster() connects to a node on the local machine (127.0.0.1).
cluster = Cluster()
try:
    # Open a session; every CQL statement below runs through it.
    session = cluster.connect()
    # Create the keyspace with the SimpleStrategy replica placement strategy
    # and a replication factor of 3. The Python string is double-quoted because
    # the CQL map literal itself contains single-quoted strings.
    # IF NOT EXISTS makes the script safe to re-run.
    # NOTE(review): a single local node cannot hold 3 replicas; with RF=3 only
    # consistency levels that need one replica (e.g. ONE) will succeed here.
    session.execute(
        "CREATE KEYSPACE IF NOT EXISTS test WITH replication = "
        "{'class': 'SimpleStrategy', 'replication_factor': 3};"
    )
    # Make `test` the session's default keyspace.
    session.execute('use test;')
    # Create the table. The name is keyspace-qualified, so it does not rely on USE.
    session.execute(
        'create table if not exists test.user('
        'name text primary key, age int, email varchar);'
    )
    # To drop the table instead:
    # session.execute('drop table test.user;')
finally:
    # Release connections and driver resources even if a statement failed.
    cluster.shutdown()
# Confirm the cluster has been shut down (prints True).
print(cluster.is_shutdown)
查詢keyspaces/tables/columns狀態
# -*- encoding: utf-8 -*-
"""Print schema metadata for the ``test`` keyspace and its ``user`` table."""
# Cluster is the driver's entry point for connecting to a Cassandra cluster.
from cassandra.cluster import Cluster
# DCAwareRoundRobinPolicy can customize request routing; it is not used here.
from cassandra.policies import DCAwareRoundRobinPolicy


def _show(obj):
    """Print *obj*, then a separator line."""
    print(obj)
    print('----------')


# With no arguments, Cluster() targets a node on the local machine (127.0.0.1).
cluster = Cluster()
# Connect before reading cluster.metadata below.
session = cluster.connect()

# Walk down the schema: all keyspaces -> tables of `test` -> the `user` table
# -> its columns -> the `age` column.
all_keyspaces = cluster.metadata.keyspaces
_show(all_keyspaces)
test_tables = all_keyspaces['test'].tables
_show(test_tables)
user_table = test_tables['user']
_show(user_table)
user_columns = user_table.columns
_show(user_columns)
_show(user_columns['age'])

# Close the connection and confirm it (prints True).
cluster.shutdown()
print(cluster.is_shutdown)
插入和查詢表中的數據
# -*- encoding: utf-8 -*-
"""Insert sample rows into ``test.user`` and print every row of the table."""
# Cluster is the driver's entry point for connecting to a Cassandra cluster.
from cassandra.cluster import Cluster
# DCAwareRoundRobinPolicy can customize request routing; it is not used here.
from cassandra.policies import DCAwareRoundRobinPolicy

# Sample rows as (name, age, email), matching the INSERT column order below.
users = [
    ('aaa', 21, '222@21.com'),
    ('bbb', 22, 'bbb@22.com'),
    ('ddd', 20, 'ccc@20.com'),
]

# With no arguments, Cluster() connects to a node on the local machine (127.0.0.1).
cluster = Cluster()
try:
    session = cluster.connect()
    # The driver substitutes each value for a %s placeholder, so values are
    # never concatenated into the CQL text by hand.
    insert_cql = 'insert into test.user (name, age, email) values (%s, %s, %s);'
    for user in users:
        session.execute(insert_cql, user)
    # Read the table back and print each returned row.
    rows = session.execute('select * from test.user;')
    for row in rows:
        print(row)
finally:
    # Release connections and driver resources even if a query failed.
    cluster.shutdown()
# Confirm the cluster has been shut down (prints True).
print(cluster.is_shutdown)
連接遠程數據庫
# -*- encoding: utf-8 -*-
"""Query a remote, password-protected Cassandra cluster and append the result to a CSV file."""
import os

import pandas as pd
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
# Cluster is the driver's entry point for connecting to a Cassandra cluster.
from cassandra.cluster import Cluster
# DCAwareRoundRobinPolicy can customize request routing; uncomment if needed.
# from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import SimpleStatement

# Contact points of the remote cluster -- replace with your own node IPs.
contact_points = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
# Login credentials -- replace with a real username and password.
auth_provider = PlainTextAuthProvider(username='XXX', password='XXX')
# CSV file that the query result is appended to.
csv_path = '連接遠程數據庫.csv'

cluster = Cluster(contact_points=contact_points, auth_provider=auth_provider)
try:
    session = cluster.connect()
    # Placeholder query: replace `keyspace.table` with a real keyspace and table.
    cql_str = 'select * from keyspace.table limit 5;'
    # Read at consistency level ONE (one replica's response is sufficient).
    simple_statement = SimpleStatement(cql_str, consistency_level=ConsistencyLevel.ONE)
    # timeout=None disables the driver's client-side request timeout.
    execute_result = session.execute(simple_statement, timeout=None)
    # Iterating the ResultSet (public API) yields every row and fetches further
    # pages when needed. The private `_current_rows` only holds the current page.
    result = pd.DataFrame(list(execute_result))
    # Append to the CSV, writing the header only when the file does not exist
    # yet, so repeated runs do not put header lines in the middle of the data.
    result.to_csv(csv_path, mode='a', header=not os.path.exists(csv_path))
finally:
    # Release connections and driver resources even if the query failed.
    cluster.shutdown()