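Recently I needed to look into the DynamoDB part of boto3 for work, and I picked up a few things that I will summarize here.
First, installing boto3. On a machine that already has python and pip, run: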
sudo pip install boto3
According to the official documentation, boto3 provides the following interfaces for interacting with DynamoDB:
batch_get_item()
batch_write_item()
can_paginate()
create_table()
delete_item()
delete_table()
describe_limits()
describe_table()
describe_time_to_live()
generate_presigned_url()
get_item()
get_paginator()
get_waiter()
list_tables()
list_tags_of_resource()
put_item()
query()
scan()
tag_resource()
untag_resource()
update_item()
update_table()
update_time_to_live()
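Put simply, these create, delete, query, and update tables and records. This post mainly covers the few interfaces I have used recently.
To use boto3 in python you first have to import boto3. That goes without saying. For convenience, I first wrote a configuration file in JSON format, as follows: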
{ "region_name":"xxx", "aws_access_key_id":"xxx", "aws_secret_access_key":"xxx" }
Then I wrapped everything in a class dedicated to DynamoDB operations. For now it is empty:
    class dynamodb_operation():
        pass
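It needs a method that reads a JSON file:
    # needs "import json" and "import sys" at the top of the file
    def load_json(self, path):
        try:
            with open(path) as json_file:
                data = json.load(json_file)
        except Exception as e:
            # covers both a missing file and a file that is not valid JSON
            print('ERROR: cannot load json file ' + path + '. msg: ' + str(e))
            sys.exit(-1)
        else:
            return data
Since the file being read may be missing or may not be valid JSON, I simply want the method to report an error and exit. If you would rather not exit, change the except branch.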
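If exiting is not what you want, one option is to raise an exception and let the caller decide. The following is only a sketch under my own assumptions: ConfigError, REQUIRED_KEYS, parse_config, and load_config are hypothetical names that are not part of the class above, and the code targets Python 3.

```python
import json

# Keys the configuration file shown earlier is expected to contain.
REQUIRED_KEYS = ("region_name", "aws_access_key_id", "aws_secret_access_key")


class ConfigError(Exception):
    """Hypothetical error type: the configuration is unreadable or incomplete."""


def parse_config(text):
    """Parse JSON text and check that the expected keys are present."""
    try:
        conf = json.loads(text)
    except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
        raise ConfigError("not valid JSON: %s" % exc)
    if not isinstance(conf, dict):
        raise ConfigError("top-level JSON value must be an object")
    missing = [key for key in REQUIRED_KEYS if key not in conf]
    if missing:
        raise ConfigError("missing keys: " + ", ".join(missing))
    return conf


def load_config(path):
    """Read and validate the file at path; errors are raised, not printed."""
    try:
        with open(path) as json_file:
            text = json_file.read()
    except OSError as exc:
        raise ConfigError("cannot read %s: %s" % (path, exc))
    return parse_config(text)
```

A caller can then wrap load_config() in try/except ConfigError and choose between exiting, retrying, or falling back to defaults.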
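Next, I want the class to hold a private member, client, with the connection set up as soon as the object is instantiated. Hence the following initializer:
    def __init__(self, path):
        conf = self.load_json(path)
        self.client = boto3.client(
            'dynamodb',
            region_name=conf['region_name'],
            aws_access_key_id=conf['aws_access_key_id'],
            aws_secret_access_key=conf['aws_secret_access_key'])
The keys match the configuration file above.
With this foundation in place, you can wrap whichever methods you want to use. I won't copy the official description of each method here.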
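1. List all tables in DynamoDB
    def list_all_table(self):
        page = 1
        LastEvaluationTableName = ""
        while True:
            if page == 1:
                response = self.client.list_tables()
            else:
                response = self.client.list_tables(
                    ExclusiveStartTableName=LastEvaluationTableName
                )
            TableNames = response['TableNames']
            for table in TableNames:
                print(table)
            # dict.has_key() no longer exists in Python 3; "in" works everywhere
            if 'LastEvaluatedTableName' in response:
                LastEvaluationTableName = response["LastEvaluatedTableName"]
            else:
                break
            page += 1
The list_tables() method returns at most 100 table names per call. When more tables remain, the response carries the name of the last table returned under the key "LastEvaluatedTableName", which can be passed as a parameter to the next request. Calling the method in a loop this way retrieves all table names. Once there are no more tables, the response no longer contains LastEvaluatedTableName. Here I only print the table names to the terminal; you could just as well save them.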
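To save the names instead of printing them, the same loop can be written as a generator. This is a minimal sketch, not the code I actually run: iter_table_names is a hypothetical helper, and client is assumed to be a boto3 DynamoDB client such as self.client above. As far as I know, client.get_paginator('list_tables') can do the same job.

```python
def iter_table_names(client):
    """Yield every table name, following LastEvaluatedTableName across pages."""
    kwargs = {}
    while True:
        response = client.list_tables(**kwargs)
        for name in response.get("TableNames", []):
            yield name
        last_name = response.get("LastEvaluatedTableName")
        if last_name is None:
            # No marker in the response means this was the last page.
            return
        kwargs["ExclusiveStartTableName"] = last_name
```

For example, all_tables = list(iter_table_names(client)) collects the names into a list.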
2. Get information about a table: describe_table()
    def get_table_desc_only(self, table):
        try:
            response = self.client.describe_table(TableName=table)
        except Exception as e:
            print('ERROR: no such table like ' + table)
            sys.exit(-1)
        else:
            return response["Table"]
This simply returns response["Table"] as is, with no further processing.
If I want to know the size of a table, I can do this:
    def get_table_size(self, table):
        response = self.get_table_desc_only(table)
        stastic = {}
        stastic['TableSizeBytes'] = response['TableSizeBytes']
        stastic['ItemCount'] = response['ItemCount']
        return stastic
If you want other information, and only that information, you can write a corresponding method in the same way.
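Rather than writing one method per field, a small generic helper may be enough. A sketch, assuming the input is the response["Table"] dictionary returned by describe_table(); pick_table_fields is a hypothetical name.

```python
def pick_table_fields(table_desc, fields):
    """Return only the requested top-level fields that are actually present."""
    return {name: table_desc[name] for name in fields if name in table_desc}


# Example with a hand-written description instead of a live response.
sample_desc = {
    "TableName": "demo",
    "TableStatus": "ACTIVE",
    "ItemCount": 3,
    "TableSizeBytes": 120,
}
size_info = pick_table_fields(sample_desc, ["TableSizeBytes", "ItemCount"])
```

Fields that are absent from the description are silently skipped, so the caller should check the result before relying on a key.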
3. Create a table
    def create_table(self, tablename, keySchema, attributeDefinitions, provisionedThroughput):
        table = self.client.create_table(
            TableName=tablename,
            KeySchema=keySchema,
            AttributeDefinitions=attributeDefinitions,
            ProvisionedThroughput=provisionedThroughput
        )

        # Wait until the table exists.
        self.client.get_waiter('table_exists').wait(TableName=tablename)

        response = self.client.describe_table(TableName=tablename)
        print(response)
This creates a table without indexes. Table creation takes time, which is why the get_waiter() method is used.
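The arguments to this wrapper are plain lists and dictionaries. The sketch below shows one way to build them for a table with a hash key and an optional range key. build_table_args is a hypothetical helper, the table and attribute names are made up, and the returned keys are chosen to match the wrapper's parameter names.

```python
def build_table_args(table_name, hash_key, hash_type="S",
                     range_key=None, range_type="S",
                     read_units=5, write_units=5):
    """Build keyword arguments for the create_table() wrapper shown above."""
    key_schema = [{"AttributeName": hash_key, "KeyType": "HASH"}]
    attribute_definitions = [
        {"AttributeName": hash_key, "AttributeType": hash_type},
    ]
    if range_key is not None:
        key_schema.append({"AttributeName": range_key, "KeyType": "RANGE"})
        attribute_definitions.append(
            {"AttributeName": range_key, "AttributeType": range_type})
    return {
        "tablename": table_name,
        "keySchema": key_schema,
        "attributeDefinitions": attribute_definitions,
        "provisionedThroughput": {
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    }
```

With an instance db of the class, the call would look like db.create_table(**build_table_args("demo", "uid", "N", range_key="aid", range_type="N")).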
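4. Insert data
    def put_item(self, tableName, item):
        try:
            self.client.put_item(
                TableName=tableName,
                Item=item
            )
        except Exception as e:
            print('ERROR: put item fail. msg: ' + str(e))
            sys.exit(-1)
        else:
            return
This wrapper expects a correctly formatted item in which every value is tagged with its DynamoDB type, and the keys must match the table. For example: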
{'uid':{'N':'999'},'aid':{'N':'999'},'sid':{'N':'999'},'ksid':{'N':'999'}}
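Writing the type tags by hand gets tedious, so here is a small converter from ordinary Python values to that format. It is a sketch for Python 3 that covers only a few types and deliberately leaves out floats, sets, and binary data; to_attribute_value and to_item are hypothetical names. As far as I know, boto3 ships a more complete TypeSerializer in boto3.dynamodb.types.

```python
from decimal import Decimal


def to_attribute_value(value):
    """Wrap a plain Python value in a DynamoDB type tag."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # must come before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, Decimal)):
        return {"N": str(value)}  # numbers are sent as strings
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (list, tuple)):
        return {"L": [to_attribute_value(element) for element in value]}
    if isinstance(value, dict):
        return {"M": {key: to_attribute_value(val) for key, val in value.items()}}
    raise TypeError("unsupported type: %s" % type(value).__name__)


def to_item(record):
    """Convert a flat or nested dict into an Item usable with put_item()."""
    return {key: to_attribute_value(value) for key, value in record.items()}
```

With this, to_item({'uid': 999, 'aid': 999, 'sid': 999, 'ksid': 999}) produces the item shown above.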
5. Delete a table
    def delete_table(self, table):
        try:
            self.client.delete_table(
                TableName=table
            )
        except Exception as e:
            print('ERROR: delete table ' + table + ' fail. msg: ' + str(e))
        else:
            print('delete table ' + table + ' succ')
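I won't go into the other methods. Next comes backing up and restoring tables. How far should this go? A backup should preserve the table's structure, its size, and all of its items, including indexes; a restore should be able to create an identical table and load the data back in.
First, backing up the table structure. To make restoring easier, I post-process the response of describe_table(), and modify the init method at the same time:
    def __init__(self, path):
        conf = self.load_json(path)
        self.client = boto3.client(
            'dynamodb',
            region_name=conf['region_name'],
            aws_access_key_id=conf['aws_access_key_id'],
            aws_secret_access_key=conf['aws_secret_access_key'])
        self.conf_path = path
        self.items = ['TableName', 'AttributeDefinitions', 'KeySchema',
                      'LocalSecondaryIndexes', 'GlobalSecondaryIndexes',
                      'ProvisionedThroughput', 'StreamSpecification']
items holds the parameters of the create_table() method as documented when I wrote this. Of these, TableName, AttributeDefinitions, KeySchema, and ProvisionedThroughput are required when creating a table, and the other three are optional. Likewise, those four are always present in the describe_table() response. Therefore: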
    def get_SecondaryIndexes_desc(self, content):
        result = []
        for sub_item in content:
            # keep only the fields that create_table() accepts for an index
            sub_content = {}
            sub_content['IndexName'] = sub_item['IndexName']
            sub_content['KeySchema'] = sub_item['KeySchema']
            sub_content['Projection'] = sub_item['Projection']
            result.append(sub_content)
        return result
LocalSecondaryIndexes and GlobalSecondaryIndexes are both lists, so an entry that does not exist can simply default to an empty list.
    def get_table_desc_for_create_table(self, table):
        response = self.get_table_desc_only(table)
        result = {}
        for item in self.items:
            try:
                content = response[item]
            except Exception as e:
                continue
            else:
                if item == 'TableName':
                    if content != table:
                        print('ERROR: dynamoDB get table desc error')
                        sys.exit(-1)
                    result[item] = content

                elif item == 'LocalSecondaryIndexes' or item == 'GlobalSecondaryIndexes':
                    result[item] = self.get_SecondaryIndexes_desc(content)
                    continue

                elif item == 'ProvisionedThroughput':
                    continue

                else:
                    result[item] = content
                    continue

        return json.dumps(result)
Because a table's provisioned throughput is not fixed, it is not saved. A fixed value is simply set when the table is created.
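To make the effect of this trimming concrete, here is the same idea as a standalone function applied to a hand-written description. It is a sketch only: trim_table_description and the sample data are made up for illustration, and the real describe_table() response contains more fields than shown.

```python
KEPT_FIELDS = ("TableName", "AttributeDefinitions", "KeySchema",
               "LocalSecondaryIndexes", "GlobalSecondaryIndexes",
               "StreamSpecification")
INDEX_FIELDS = ("IndexName", "KeySchema", "Projection")


def trim_table_description(table_desc):
    """Keep only what is needed to recreate the table; drop throughput and stats."""
    result = {}
    for field in KEPT_FIELDS:
        if field not in table_desc:
            continue
        value = table_desc[field]
        if field in ("LocalSecondaryIndexes", "GlobalSecondaryIndexes"):
            # Strip status, size, and throughput from every index entry.
            value = [{key: index[key] for key in INDEX_FIELDS} for index in value]
        result[field] = value
    return result


sample = {
    "TableName": "demo",
    "AttributeDefinitions": [{"AttributeName": "uid", "AttributeType": "N"}],
    "KeySchema": [{"AttributeName": "uid", "KeyType": "HASH"}],
    "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
    "ItemCount": 42,
    "GlobalSecondaryIndexes": [{
        "IndexName": "by_uid",
        "KeySchema": [{"AttributeName": "uid", "KeyType": "HASH"}],
        "Projection": {"ProjectionType": "ALL"},
        "IndexStatus": "ACTIVE",
    }],
}
trimmed = trim_table_description(sample)
```

In trimmed, ProvisionedThroughput, ItemCount, and IndexStatus are gone, while the key schema and index definitions survive.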
Correspondingly, here are the methods for restoring a table:
    def get_item_desc(self, item, content):
        try:
            result = content[item]
        except Exception as e:
            result = []
        return result

    def create_table_from_desc(self, path):
        table_desc = self.load_json(path)
        provisionedThroughput = {
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
        tableName = self.get_item_desc('TableName', table_desc)
        attributeDefinitions = self.get_item_desc('AttributeDefinitions', table_desc)
        keySchema = self.get_item_desc('KeySchema', table_desc)
        localSecondaryIndexes = self.get_item_desc('LocalSecondaryIndexes', table_desc)
        globalSecondaryIndexes = self.get_item_desc('GlobalSecondaryIndexes', table_desc)
        streamSpecification = self.get_item_desc('StreamSpecification', table_desc)

        # every global secondary index needs its own throughput setting
        if len(globalSecondaryIndexes):
            for item in globalSecondaryIndexes:
                item['ProvisionedThroughput'] = provisionedThroughput

        try:
            if len(localSecondaryIndexes):
                if len(globalSecondaryIndexes):
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        LocalSecondaryIndexes=localSecondaryIndexes,
                        GlobalSecondaryIndexes=globalSecondaryIndexes
                    )
                else:
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        LocalSecondaryIndexes=localSecondaryIndexes
                    )
            else:
                if len(globalSecondaryIndexes):
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        GlobalSecondaryIndexes=globalSecondaryIndexes
                    )
                else:
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput
                    )

        except Exception as e:
            print('ERROR: error desc like file: ' + path + '\tmsg: ' + str(e))
            sys.exit(-1)

        else:
            # Wait until the table exists.
            self.client.get_waiter('table_exists').wait(TableName=tableName)

            response = self.client.describe_table(TableName=tableName)
            print(response)
The path passed in points to the table structure saved earlier. Since the two index parameters are optional, I wrote four separate create_table() calls. I tried passing an empty list [], None, or () when an index does not exist, and each of them raised an error, so I was left with this rather clumsy approach. I do not use StreamSpecification, so it is only read here and never passed on.
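One way to avoid the four branches is to collect the arguments in a dictionary, add the index entries only when they are non-empty, and unpack the dictionary into a single call. This is a sketch, assuming client is a boto3 DynamoDB client and table_desc is the saved description already loaded into a dict; create_table_from_description is a hypothetical name.

```python
def create_table_from_description(client, table_desc, read_units=5, write_units=5):
    """Call create_table() once, passing index arguments only when they exist."""
    throughput = {"ReadCapacityUnits": read_units, "WriteCapacityUnits": write_units}
    kwargs = {
        "TableName": table_desc["TableName"],
        "KeySchema": table_desc["KeySchema"],
        "AttributeDefinitions": table_desc["AttributeDefinitions"],
        "ProvisionedThroughput": throughput,
    }
    local_indexes = table_desc.get("LocalSecondaryIndexes") or []
    global_indexes = table_desc.get("GlobalSecondaryIndexes") or []
    if local_indexes:
        kwargs["LocalSecondaryIndexes"] = local_indexes
    if global_indexes:
        # Copy each entry so the saved description itself is left untouched.
        kwargs["GlobalSecondaryIndexes"] = [
            dict(index, ProvisionedThroughput=throughput) for index in global_indexes
        ]
    return client.create_table(**kwargs)
```

Because a missing index is simply left out of kwargs, the empty-list problem never arises, and a further optional parameter would only need one more if statement.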
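Due to time constraints, I simply used dynamo-archive from GitHub as the tool for dumping and loading table data. Otherwise, implementing one yourself would also be a fine option.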
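For anyone who does want to write the dump side themselves, the core is a paginated scan(). The sketch below is not how dynamo-archive works; it is only my assumption of a minimal version. iter_items and dump_items are hypothetical names, client is assumed to be a boto3 DynamoDB client, and items with binary attributes are not handled because they cannot be passed to json.dumps() directly.

```python
import json


def iter_items(client, table_name):
    """Yield every item of a table, following LastEvaluatedKey across pages."""
    kwargs = {"TableName": table_name}
    while True:
        response = client.scan(**kwargs)
        for item in response.get("Items", []):
            yield item
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            return
        kwargs["ExclusiveStartKey"] = last_key


def dump_items(client, table_name, out_file):
    """Write one JSON document per line and return the number of items written."""
    count = 0
    for item in iter_items(client, table_name):
        out_file.write(json.dumps(item) + "\n")
        count += 1
    return count
```

Loading is then a matter of reading the file line by line, calling json.loads() on each line, and passing the result to put_item(). A full scan consumes read capacity, so a large table may need throttling that this sketch does not include.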
When deciding whether to back up a table, whether it has been used recently is one factor. This requires CloudWatch.
    def get_stastic(self, dimension):
        conf = self.load_json(self.conf_path)
        cw = boto3.client(
            'cloudwatch',
            region_name=conf['region_name'],
            aws_access_key_id=conf['aws_access_key_id'],
            aws_secret_access_key=conf['aws_secret_access_key'])

        stastic = {}
        stastic['Write'] = 0
        stastic['Read'] = 0

        # write
        table_stastic = cw.get_metric_statistics(
            Namespace='AWS/DynamoDB',
            MetricName='ConsumedWriteCapacityUnits',
            Dimensions=dimension,
            StartTime=datetime.utcnow()-timedelta(days=9),
            EndTime=datetime.utcnow(),
            Period=900,
            Statistics=['Sum', 'Maximum'],
            Unit='Count')['Datapoints']

        # sum every datapoint; the old "len(...) > 1" test dropped a lone datapoint
        for item in table_stastic:
            stastic['Write'] += int(item['Sum'])

        # read
        table_stastic = cw.get_metric_statistics(
            Namespace='AWS/DynamoDB',
            MetricName='ConsumedReadCapacityUnits',
            Dimensions=dimension,
            StartTime=datetime.utcnow()-timedelta(days=9),
            EndTime=datetime.utcnow(),
            Period=900,
            Statistics=['Sum', 'Maximum'],
            Unit='Count')['Datapoints']

        for item in table_stastic:
            stastic['Read'] += int(item['Sum'])

        return stastic

    def get_table_use(self, table_name):
        dimension = [{'Name': 'TableName', 'Value': table_name}]
        stastic = self.get_stastic(dimension)
        table = self.get_table_desc_only(table_name)
        for index in table.get('GlobalSecondaryIndexes', []):
            # skip indexes that are not ACTIVE; returning None here would
            # break check_table_is_use()
            if index['IndexStatus'] != 'ACTIVE':
                continue
            dimension = [{'Name': 'TableName', 'Value': table_name},
                         {'Name': 'GlobalSecondaryIndexName', 'Value': index['IndexName']}]
            tmp = self.get_stastic(dimension)
            stastic['Write'] += tmp['Write']
            stastic['Read'] += tmp['Read']

        return stastic

    def check_table_is_use(self, stastic):
        read = stastic['Read']
        write = stastic['Write']
        if read == 0 and write == 0:
            return False
        else:
            return True
This fetches the total read and write usage over the last 9 days, for the table itself and for each of its active global secondary indexes.
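The read and write queries differ only in the metric name, so they can share one helper. This is a sketch for Python 3 under my own assumptions: sum_consumed_capacity is a hypothetical name, cw is assumed to be a boto3 CloudWatch client, and the now parameter exists only so the time window can be pinned when testing. With 900-second periods, 9 days come to 864 datapoints, which as far as I know fits within the limit of a single get_metric_statistics() call.

```python
from datetime import datetime, timedelta, timezone


def sum_consumed_capacity(cw, metric_name, dimensions, days=9, period=900, now=None):
    """Sum the 'Sum' statistic of one DynamoDB metric over the last `days` days."""
    end = now or datetime.now(timezone.utc)
    response = cw.get_metric_statistics(
        Namespace="AWS/DynamoDB",
        MetricName=metric_name,
        Dimensions=dimensions,
        StartTime=end - timedelta(days=days),
        EndTime=end,
        Period=period,
        Statistics=["Sum"],
    )
    # An unused table simply yields no datapoints, which sums to 0.
    return sum(point["Sum"] for point in response["Datapoints"])
```

Calling it once with 'ConsumedReadCapacityUnits' and once with 'ConsumedWriteCapacityUnits' gives the two totals that decide whether the table counts as in use.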
Of course, you can also check whether a table exists:
    def check_table_is_exist(self, table):
        try:
            response = self.client.describe_table(TableName=table)
        except Exception as e:
            return 0
        else:
            return 1
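Catching every exception means that a network or permission problem also looks like "table does not exist". A stricter variant inspects the error code, as in the sketch below. table_exists is a hypothetical name, and the code assumes the raised exception carries a response dictionary with an Error and Code entry, which is how botocore's ClientError is laid out.

```python
def table_exists(client, table_name):
    """Return False only for a missing table; re-raise every other error."""
    try:
        client.describe_table(TableName=table_name)
    except Exception as exc:
        error = (getattr(exc, "response", None) or {}).get("Error", {})
        if error.get("Code") == "ResourceNotFoundException":
            return False
        raise
    return True
```

The boolean return value replaces the 0 and 1 used above, which reads more naturally in an if statement.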
Overall, the code is pretty rough QAQ
References: http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html
Source file: dynamodb_operation.py

#!/usr/bin/python
#-*- encoding: utf-8 -*-

import ast
import boto3
import json
import sys
from datetime import datetime, timedelta

class dynamodb_operation():
    def __init__(self,path):
        conf = self.load_json(path)
        self.client = boto3.client('dynamodb',region_name=conf['region_name'],aws_access_key_id=conf['aws_access_key_id'], aws_secret_access_key=conf['aws_secret_access_key'])
        self.conf_path = path
        self.items = ['TableName','AttributeDefinitions','KeySchema','LocalSecondaryIndexes','GlobalSecondaryIndexes','ProvisionedThroughput','StreamSpecification']

    def load_json(self,path):
        try:
            with open(path) as json_file:
                data = json.load(json_file)
        except Exception as e:
            # covers both a missing file and a file that is not valid JSON
            print('ERROR: cannot load json file ' + path + '. msg: ' + str(e))
            sys.exit(-1)
        else:
            return data

    def create_table(self,tablename,keySchema,attributeDefinitions,provisionedThroughput):
        table = self.client.create_table(
            TableName=tablename,
            KeySchema=keySchema,
            AttributeDefinitions=attributeDefinitions,
            ProvisionedThroughput=provisionedThroughput
        )

        # Wait until the table exists.
        self.client.get_waiter('table_exists').wait(TableName=tablename)

        response = self.client.describe_table(TableName=tablename)
        print(response)

    def get_item_desc(self,item,content):
        try:
            result = content[item]
        except Exception as e:
            result = []
        return result

    def create_table_from_desc(self,path):
        table_desc = self.load_json(path)
        provisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
        tableName = self.get_item_desc('TableName',table_desc)
        attributeDefinitions = self.get_item_desc('AttributeDefinitions',table_desc)
        keySchema = self.get_item_desc('KeySchema',table_desc)
        localSecondaryIndexes = self.get_item_desc('LocalSecondaryIndexes',table_desc)
        globalSecondaryIndexes = self.get_item_desc('GlobalSecondaryIndexes',table_desc)
        streamSpecification = self.get_item_desc('StreamSpecification',table_desc)

        if len(globalSecondaryIndexes):
            for item in globalSecondaryIndexes:
                item['ProvisionedThroughput'] = provisionedThroughput

        try:
            if len(localSecondaryIndexes):
                if len(globalSecondaryIndexes):
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        LocalSecondaryIndexes=localSecondaryIndexes,
                        GlobalSecondaryIndexes=globalSecondaryIndexes
                    )
                else:
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        LocalSecondaryIndexes=localSecondaryIndexes
                    )
            else:
                if len(globalSecondaryIndexes):
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput,
                        GlobalSecondaryIndexes=globalSecondaryIndexes
                    )
                else:
                    table = self.client.create_table(
                        TableName=tableName,
                        KeySchema=keySchema,
                        AttributeDefinitions=attributeDefinitions,
                        ProvisionedThroughput=provisionedThroughput
                    )

        except Exception as e:
            print('ERROR: error desc like file: ' + path + '\tmsg: ' + str(e))
            sys.exit(-1)

        else:
            # Wait until the table exists.
            self.client.get_waiter('table_exists').wait(TableName=tableName)

            response = self.client.describe_table(TableName=tableName)
            print(response)

    def get_table_desc_only(self,table):
        try:
            response = self.client.describe_table(TableName=table)
        except Exception as e:
            print('ERROR: no such table like ' + table)
            sys.exit(-1)
        else:
            return response["Table"]

    def check_table_is_exist(self,table):
        try:
            response = self.client.describe_table(TableName=table)
        except Exception as e:
            return 0
        else:
            return 1

    def get_SecondaryIndexes_desc(self,content):
        result = []
        for sub_item in content:
            sub_content = {}
            sub_content['IndexName'] = sub_item['IndexName']
            sub_content['KeySchema'] = sub_item['KeySchema']
            sub_content['Projection'] = sub_item['Projection']
            result.append(sub_content)
        return result

    def get_table_desc_for_create_table(self,table):
        response = self.get_table_desc_only(table)
        result = {}
        for item in self.items:
            try:
                content = response[item]
            except Exception as e:
                continue
            else:
                if item == 'TableName':
                    if content != table:
                        print('ERROR: dynamoDB get table desc error')
                        sys.exit(-1)
                    result[item] = content

                elif item == 'LocalSecondaryIndexes' or item == 'GlobalSecondaryIndexes':
                    result[item] = self.get_SecondaryIndexes_desc(content)
                    continue

                elif item == 'ProvisionedThroughput':
                    continue

                else:
                    result[item] = content
                    continue

        return json.dumps(result)

    def get_table_size(self,table):
        response = self.get_table_desc_only(table)
        stastic = {}
        stastic['TableSizeBytes'] = response['TableSizeBytes']
        stastic['ItemCount'] = response['ItemCount']
        return stastic

    def list_all_table(self):
        page=1
        LastEvaluationTableName = ""
        while True:
            if page == 1:
                response = self.client.list_tables()
            else:
                response = self.client.list_tables(
                    ExclusiveStartTableName=LastEvaluationTableName
                )
            TableNames = response['TableNames']
            for table in TableNames:
                print(table)
            # dict.has_key() no longer exists in Python 3; "in" works everywhere
            if 'LastEvaluatedTableName' in response:
                LastEvaluationTableName = response["LastEvaluatedTableName"]
            else:
                break
            page += 1

    def get_stastic(self,dimension):
        conf = self.load_json(self.conf_path)
        cw = boto3.client('cloudwatch',region_name=conf['region_name'],aws_access_key_id=conf['aws_access_key_id'], aws_secret_access_key=conf['aws_secret_access_key'])

        stastic={}
        stastic['Write'] = 0
        stastic['Read'] = 0

        # write
        table_stastic = cw.get_metric_statistics(Namespace='AWS/DynamoDB', MetricName='ConsumedWriteCapacityUnits',
                                                 Dimensions=dimension,
                                                 StartTime=datetime.utcnow()-timedelta(days=9), EndTime=datetime.utcnow(),
                                                 Period=900, Statistics=['Sum', 'Maximum'], Unit='Count')['Datapoints']

        # sum every datapoint; the old "len(...) > 1" test dropped a lone datapoint
        for item in table_stastic:
            stastic['Write'] += int(item['Sum'])

        # read
        table_stastic = cw.get_metric_statistics(Namespace='AWS/DynamoDB', MetricName='ConsumedReadCapacityUnits',
                                                 Dimensions=dimension,
                                                 StartTime=datetime.utcnow()-timedelta(days=9), EndTime=datetime.utcnow(),
                                                 Period=900, Statistics=['Sum', 'Maximum'], Unit='Count')['Datapoints']

        for item in table_stastic:
            stastic['Read'] += int(item['Sum'])

        return stastic

    def get_table_use(self,table_name):
        dimension = [{'Name': 'TableName', 'Value': table_name}]
        stastic = self.get_stastic(dimension)
        table = self.get_table_desc_only(table_name)
        for index in table.get('GlobalSecondaryIndexes', []):
            # skip indexes that are not ACTIVE; returning None here would
            # break check_table_is_use()
            if index['IndexStatus'] != 'ACTIVE':
                continue
            dimension = [{'Name': 'TableName', 'Value': table_name}, {'Name': 'GlobalSecondaryIndexName', 'Value': index['IndexName']}]
            tmp = self.get_stastic(dimension)
            stastic['Write'] += tmp['Write']
            stastic['Read'] += tmp['Read']

        return stastic

    def check_table_is_use(self,stastic):
        read = stastic['Read']
        write = stastic['Write']
        if read == 0 and write == 0:
            return False
        else:
            return True

    def delete_table(self,table):
        try:
            self.client.delete_table(
                TableName=table
            )
        except Exception as e:
            print('ERROR: delete table ' + table + ' fail. msg: ' + str(e))
        else:
            print('delete table ' + table + ' succ')

    def list_dynamodb_conf(self):
        conf = self.load_json(self.conf_path)
        print('region_name=' + '"' + conf['region_name'] + '"')
        print('aws_access_key_id=' + '"' + conf['aws_access_key_id'] + '"')
        print('aws_secret_access_key=' + '"' + conf['aws_secret_access_key'] + '"')

    def put_item(self,tableName,item):
        try:
            self.client.put_item(
                TableName=tableName,
                Item=item
            )
        except Exception as e:
            print('ERROR: put item fail. msg: ' + str(e))
            sys.exit(-1)
        else:
            return

    def put_items(self,tableName,item_path):
        # one item literal per line; literal_eval only accepts plain literals,
        # unlike eval, which would execute arbitrary code from the file
        with open(item_path) as item_file:
            for item in item_file:
                if item.strip():
                    self.put_item(tableName,ast.literal_eval(item.strip()))

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("cmd args")
        print("list_all_table")
        print("list_dynamodb_conf")
        print("get_table_desc_for_create_table table")
        print("get_table_desc_only table")
        print("get_table_size table")
        print("create_table_from_desc table_desc_file")
        print("check_table_is_exist table")
        print("get_table_use table")
        print("delete_table table password")
        print("put_item table item(json)")
        print("put_items table item_file_path")
        sys.exit(-1)

    db = dynamodb_operation('../conf/dynamoDB.conf')

    cmd = str(sys.argv[1])
    if len(sys.argv) == 2:
        if cmd == 'list_all_table':
            db.list_all_table()
        if cmd == 'list_dynamodb_conf':
            db.list_dynamodb_conf()

    if len(sys.argv) == 3:
        if cmd == 'get_table_desc_for_create_table':
            table = str(sys.argv[2])
            print(db.get_table_desc_for_create_table(table))

        if cmd == 'get_table_desc_only':
            table = str(sys.argv[2])
            print(db.get_table_desc_only(table))

        if cmd == 'check_table_is_exist':
            table = str(sys.argv[2])
            print(db.check_table_is_exist(table))

        if cmd == 'get_table_size':
            table = str(sys.argv[2])
            print(db.get_table_size(table))

        if cmd == 'create_table_from_desc':
            desc_file_path = str(sys.argv[2])
            db.create_table_from_desc(desc_file_path)

        if cmd == 'get_table_use':
            table = str(sys.argv[2])
            stastic = db.get_table_use(table)
            print(stastic)
            print(db.check_table_is_use(stastic))

    if len(sys.argv) == 4:
        if cmd == 'delete_table':
            table = str(sys.argv[2])
            password = str(sys.argv[3])
            if password == 'password':
                db.delete_table(table)
            else:
                print('ERROR: password error!')
                sys.exit(-1)

        if cmd == 'put_item':
            table = str(sys.argv[2])
            tmp = str(sys.argv[3])
            item = ast.literal_eval(tmp)
            db.put_item(table,item)

        if cmd == 'put_items':
            table = str(sys.argv[2])
            item_file_path = str(sys.argv[3])
            db.put_items(table,item_file_path)