連接ODPS
from odps import ODPS odps = ODPS('**your-access-id**', '**your-secret-access-key**', '** your-default-project**', endpoint='**your-end-point**')
your-default-project**', endpoint='**your-end-point**')
這兩個可省略,通過get_project命令獲取到某個項⽬空間
創建非分區表data_temp:table_name,data_count,time 三個字段
table = odps.create_table('data_temp','table_name string, data_count bigint, time datetime',if_not_exists=True)
操作表
t = odps.get_table('data_temp')
插入數據,使用列表的形式
records = [['lrb', 344322, '2020-04-28 14:09:37'], ['zcfzb', 4343434, '2020-04-28 14:09:37'], ['xjllb', 21142232, '2020-04-28 14:09:37'], ['test', 343242, '2020-04-28 14:09:37']]
odps.write_table('data_temp', records)
讀取表中數據
with t.open_reader() as reader: count = reader.count for record in reader: #處理每一條數據
使用dataframe
df = t.to_df()
注:dataframe是maxCompute自己的,不具備pandas的dataframe一些功能(如:to_csv)
可以轉成pandas的dataframe
pd_df = df.to_pandas()
打印如下
table_name data_count time 0 lrb 344322 2020-04-28 14:09:37 1 zcfzb 4343434 2020-04-28 14:09:37 2 xjllb 21142232 2020-04-28 14:09:37 3 test 343242 2020-04-28 14:09:37
介於此,我們可以將數據直接導入MYSQL
from sqlalchemy import create_engine connect = create_engine('mysql+pymysql://root:mysql@172.20.10.12:3306/odps?charset=utf8') pd_df.to_sql("data_sync", connect, if_exists='append', index=False)
其中data_sync是mysql對應的表名,connect是mysql連接
由於我的Mysql表沒有建立主鍵,所以當代碼重復執行后,會寫入多次。
需要注意的是:ODPS的表名和MYSQL的表名盡量保持一致,不然會報錯。解決辦法是導入mysql之前,更改dataframe列名即可。