# 直接上代碼了 (straight to the code)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 17 14:40:34 2019
odps 封裝sql成dataframe調用
odps的dataframe 是odps.df.core.DataFrame,不喜歡它的語法咯,還是用原生的dataframe
"""
import pymysql
from sqlalchemy import create_engine
import time
import pandas as pd
from odps import ODPS
from odps.models import Schema, Column, Partition
from odps.df import DataFrame
from collections import defaultdict
# Module-level ODPS client shared by every odps_* helper below.
# NOTE(review): credentials are hard-coded placeholders — move them to
# environment variables / config before real use.
odps = ODPS('username', 'password', 'data_platform',
            endpoint='http://service.odps.aliyun.com/api')
def odps_read_sql(code_sql):
    """Execute an ODPS SQL statement and return the result as a pandas DataFrame.

    Parameters
    ----------
    code_sql : str
        The SQL text to execute on the module-level ``odps`` client.

    Returns
    -------
    pandas.DataFrame
        One row per result record; columns come from the record fields.
    """
    # tunnel=True streams results through the tunnel endpoint; the reader is
    # a context manager, so it is closed even if iteration fails.
    with odps.execute_sql(code_sql).open_reader(tunnel=True) as reader:
        rows = [dict(record) for record in reader]
    return pd.DataFrame(rows)
#code_sql = """
#select * from t_demo_bx_voice limit 100
#"""
#df = odps_read_sql(code_sql)
## 直接通過讀取表名,操作數據
def odps_read_table(table_name, limit=10000):
    """Load an ODPS table into a local pandas DataFrame, guarded by a row limit.

    Parameters
    ----------
    table_name : str
        Name of the ODPS table to read.
    limit : int, optional
        Maximum number of rows to download (default 10000).

    Returns
    -------
    pandas.DataFrame or str
        The table contents when ``rows <= limit``; otherwise a warning
        string (kept for backward compatibility — callers check the type).
    """
    table = DataFrame(odps.get_table(table_name))
    # Count rows via the first column; forces server-side evaluation.
    first_col = table.dtypes.names[0]
    count = table[first_col].count()
    print('table表的行數:{count}'.format(count=count))
    if count <= limit:
        return table.to_pandas()
    # Bug fix: the old message hard-coded "1萬" regardless of `limit`.
    print("超過limit:{limit}限制行數,請重新設置".format(limit=limit))
    return '表行數超過{limit},請重新設置limit參數大小'.format(limit=limit)
#table_name = 'tmp_page_godskill_order'
#odps_read_table(table_name)
def write_database_from_odps(table_name, limit=100000):
    """Copy an ODPS table into the local MySQL database.

    Parameters
    ----------
    table_name : str
        ODPS table to read; the MySQL table gets the same name.
    limit : int, optional
        Row limit forwarded to ``odps_read_table`` (default 100000).
    """
    df = odps_read_table(table_name, limit=limit)
    # Bug fix: when the table exceeds `limit`, odps_read_table returns an
    # error *string*, which must not be handed to to_sql.
    if isinstance(df, pd.DataFrame):
        write_to_database(df, table_name)
    else:
        print('未讀取到DataFrame,跳過寫入:', df)
## 通過odps向msyql數據庫寫入數據
##table_name = 'tmp_page_godskill_order'
##write_database_from_odps(table_name)
from pyhive import hive
import pandas as pd
def hive_read_sql(sql_code):
    """Run a SQL statement on Hive and return the result as a pandas DataFrame.

    Parameters
    ----------
    sql_code : str
        HiveQL to execute; qualify table names with their database.

    Returns
    -------
    pandas.DataFrame
        Columns are taken from the cursor description.
    """
    # NOTE(review): host/username are hard-coded — consider parameterizing.
    connection = hive.Connection(host='10.111.3.61', port=10000, username='username')
    try:
        cur = connection.cursor()
        try:
            cur.execute(sql_code)
            headers = [col[0] for col in cur.description]
            df = pd.DataFrame(cur.fetchall(), columns=headers)
        finally:
            cur.close()
    finally:
        # Bug fix: the connection was previously leaked (only the cursor
        # was closed).
        connection.close()
    return df
#sql = "select ds,type,user_id from test.tmp_p2p_message_jq limit 10"
#df = hive_read_sql(sql)
#df.head()
"""
向odps寫入數據
第一步:讀取本地數據;
第二步:寫入數據
"""
def read_csv(from_path, encoding='gbk'):
    """Load a CSV file into a pandas DataFrame.

    Parameters
    ----------
    from_path : str
        Path of the CSV file to read.
    encoding : str, optional
        File encoding; defaults to ``'gbk'`` (the original hard-coded
        value) for backward compatibility.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.read_csv(from_path, encoding=encoding)
# 創建表# 寫入表
def odps_write_dataframe(df, tb_name, schema_name=None, schema_type=None, schema_comment=None):
    """Write a pandas DataFrame into an ODPS table, creating the table if needed.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to upload; rows are written via ``table.open_writer``.
    tb_name : str
        Target ODPS table name.
    schema_name : list[str], optional
        Column names; defaults to ``df``'s columns.
    schema_type : list[str], optional
        Column types; defaults to ``'string'`` for every column.
    schema_comment : list[str], optional
        Column comments; defaults to empty strings.

    Returns
    -------
    str
        A success message.
    """
    if schema_name is None:
        schema_name = df.columns.values.tolist()
    if schema_type is None:
        schema_type = ['string'] * len(schema_name)
    if schema_comment is None:
        schema_comment = [''] * len(schema_name)
    # Build Column objects directly instead of the old DataFrame.apply
    # round-trip that produced single-element lists and re-concatenated them.
    columns = [Column(name=n, type=t, comment=c)
               for n, t, c in zip(schema_name, schema_type, schema_comment)]
    # Existence check: get_table raises when the table is missing.
    try:
        existing = odps.get_table(tb_name)
        print('表已存在,表結構是', existing.schema)
        print('warning:表已存在,追加數據')
    except Exception:  # narrowed from a bare except (was swallowing SystemExit etc.)
        print('info:表不存在,創建並寫入數據')
        odps.create_table(tb_name, schema=Schema(columns=columns), if_not_exists=True)
    t = odps.get_table(tb_name)
    records = df.values.tolist()
    with t.open_writer() as writer:
        writer.write(records)  # accepts any iterable of rows
    return "數據集寫入odps成功"
#odps_write_dataframe(df =df,tb_name='tmp_market_v1123_v1')
def write_excle(data, to_file_name, path='/Users/zhoujunqing/Downloads'):
    """Save a DataFrame as an Excel file.

    Parameters
    ----------
    data : pandas.DataFrame
        Data to write to 'Sheet1' (without the index).
    to_file_name : str
        File name (e.g. ``'report.xlsx'``).
    path : str, optional
        Output directory; defaults to the original hard-coded Downloads
        directory for backward compatibility.
    """
    try:
        to_path = path + "/" + to_file_name
        # Context manager replaces the deprecated ExcelWriter.save().
        with pd.ExcelWriter(to_path, engine='xlsxwriter') as writer:
            data.to_excel(writer, 'Sheet1', index=False)
        print('EXCEL保存成功success')
    except Exception as e:
        # Previously a bare except that hid the failure reason entirely.
        print("EXCEL保存失敗", e)
def write_to_database(table, table_name,
                      engine_url="mysql+pymysql://root:password@127.0.0.1:3306/bixin?charset=utf8mb4"):
    """Write a DataFrame into a SQL database table (replacing it if present).

    Parameters
    ----------
    table : pandas.DataFrame
        Data to persist.
    table_name : str
        Destination table name.
    engine_url : str, optional
        SQLAlchemy connection URL; defaults to the original hard-coded
        local MySQL DSN for backward compatibility.
        NOTE(review): credentials belong in config/env, not source.
    """
    try:
        engine = create_engine(engine_url)
        table.to_sql(name=table_name, con=engine, if_exists='replace',
                     index=False, chunksize=10000)
        print('數據庫寫入成功')
    except Exception as e:
        # Previously a bare except that hid the failure reason entirely.
        print('數據庫寫入失敗', e)
if __name__ == "__main__":
    # Placeholder timing harness — nothing is executed between the two
    # readings yet; put real work between them.
    # perf_counter is the recommended monotonic clock for elapsed time.
    start_time = time.perf_counter()
    end_time = time.perf_counter()
    print("程序耗時%f秒." % (end_time - start_time))
# 使用方法 (Usage):
# 步驟一:將腳本添加到python的系統環境中 (Step 1: put this script on the Python path)
# 步驟二:直接使用 (Step 2: import and call the helpers)
# from py_function_tools import odps_read_sql,write_excle,write_to_database,hive_read_sql
# from py_function_tools import odps_write_dataframe,odps_read_table,write_database_from_odps
# import time
#df_v1 = odps_read_table(table_name) ## 讀取odps的Table
#df_v1 = odps_read_sql(sql) # 讀取odps數據集
#df_v2 = hive_read_sql(sql) # 讀取hive數據集,需要手動指定表所在的數據庫
#write_excle(df,file_name) #默認保存在downloads目錄下
#write_to_database(df,table_name) #存到mysql中ypp數據庫
#from pandas_dataframe_agg import dataframe_agg
#table =dataframe_agg(df,dimensions,func=func) # groupby 處理