# 直接上代码了 (straight to the code)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 17 14:40:34 2019
odps 封装sql成dataframe调用
odps的dataframe 是odps.df.core.DataFrame,不喜欢它的语法咯,还是用原生的dataframe
"""
import pymysql
from sqlalchemy import create_engine
import time
import pandas as pd
from odps import ODPS
from odps.models import Schema, Column, Partition
from odps.df import DataFrame
from collections import defaultdict
# Module-level ODPS client shared by every helper below.
# NOTE(review): 'username' / 'password' look like placeholders — fill in real
# credentials (or load them from the environment) before use.
odps = ODPS('username', 'password', 'data_platform',
endpoint='http://service.odps.aliyun.com/api')
def odps_read_sql(code_sql):
    """Run an ODPS SQL statement and return the full result set as a pandas DataFrame."""
    with odps.execute_sql(code_sql).open_reader(tunnel=True) as reader:
        # One dict per record; pandas builds the columns from the keys.
        rows = [dict(record) for record in reader]
    return pd.DataFrame(rows)
#code_sql = """
#select * from t_demo_bx_voice limit 100
#"""
#df = odps_read_sql(code_sql)
## 直接通过读取表名,操作数据
def odps_read_table(table_name, limit=10000):
    """Read a whole ODPS table into a pandas DataFrame.

    Refuses to download when the table has more than *limit* rows; in that
    case it returns a warning string instead of a DataFrame (kept for
    backward compatibility — existing callers inspect/print the result).

    :param table_name: name of the ODPS table to read
    :param limit: maximum row count to download (default 10000)
    :return: pandas.DataFrame, or a warning str when the table is too big
    """
    table = DataFrame(odps.get_table(table_name))
    col = table.dtypes.names[0]
    # BUG FIX: .count() on a PyODPS column is a *lazy* expression, so the
    # original `count <= limit` comparison never evaluated correctly (the
    # original comments flagged this as an unsolved bug). execute() forces
    # evaluation to a concrete integer.
    count = table[col].count().execute()
    print('table表的行数:{count}'.format(count=count))
    if count <= limit:
        df = table.to_pandas()
    else:
        # BUG FIX: the old message hardcoded "1万" regardless of `limit`.
        df = '表行数超过limit={limit},请重新设置limit参数大小'.format(limit=limit)
        print("超过limit:{limit}限制行数,请重新设置".format(limit=limit))
    return df
#table_name = 'tmp_page_godskill_order'
#odps_read_table(table_name)
def write_database_from_odps(table_name, limit=100000):
    """Copy an ODPS table into the local MySQL database.

    BUG FIX: odps_read_table returns a warning *string* (not a DataFrame)
    when the table exceeds `limit`; the original passed that string straight
    to the database writer. Skip the write in that case.

    :param table_name: ODPS table name; also used as the MySQL table name
    :param limit: maximum row count forwarded to odps_read_table
    """
    df = odps_read_table(table_name, limit=limit)
    if not isinstance(df, pd.DataFrame):
        print('读取失败,未写入数据库:{msg}'.format(msg=df))
        return
    write_to_database(df, table_name)
## 通过odps向mysql数据库写入数据
##table_name = 'tmp_page_godskill_order'
##write_database_from_odps(table_name)
from pyhive import hive
import pandas as pd
def hive_read_sql(sql_code, host='10.111.3.61', port=10000, username='username'):
    """Run a Hive query and return the full result set as a pandas DataFrame.

    host/port/username default to the previously hard-coded connection, so
    existing callers are unaffected; new callers may override them.

    :param sql_code: HiveQL to execute (the table's database must be qualified)
    :return: pandas.DataFrame with columns named from the cursor description
    """
    connection = hive.Connection(host=host, port=port, username=username)
    try:
        cur = connection.cursor()
        try:
            cur.execute(sql_code)
            headers = [col[0] for col in cur.description]
            df = pd.DataFrame(cur.fetchall(), columns=headers)
        finally:
            cur.close()
    finally:
        # BUG FIX: the original closed only the cursor and leaked the
        # connection on every call.
        connection.close()
    return df
#sql = "select ds,type,user_id from test.tmp_p2p_message_jq limit 10"
#df = hive_read_sql(sql)
#df.head()
"""
向odps写入数据
第一步:读取本地数据;
第二步:写入数据
"""
def read_csv(from_path):
    """Load a local GBK-encoded CSV file into a pandas DataFrame."""
    return pd.read_csv(from_path, encoding='gbk')
# 创建表# 写入表
def odps_write_dataframe(df, tb_name, schema_name=None, schema_type=None, schema_comment=None):
    """Write a pandas DataFrame into an ODPS table, creating the table first
    when it does not exist (data is appended when it does).

    :param df: pandas.DataFrame to upload
    :param tb_name: target ODPS table name
    :param schema_name: column names; defaults to df's columns
    :param schema_type: ODPS column types; defaults to all 'string'
    :param schema_comment: column comments; defaults to all empty
    :return: success message string
    """
    if schema_name is None:
        schema_name = df.columns.values.tolist()
    if schema_type is None:
        schema_type = ['string'] * len(schema_name)
    if schema_comment is None:
        schema_comment = [''] * len(schema_name)
    # Build the column definitions directly; the original detoured through a
    # throwaway DataFrame, a pandas apply(), and a list-concatenation loop.
    columns = [Column(name=n, type=t, comment=c)
               for n, t, c in zip(schema_name, schema_type, schema_comment)]
    # BUG FIX: the original used a bare try/except around get_table() as an
    # existence check, which also swallowed auth/network errors and then
    # blindly created the table. exist_table() asks the question directly.
    if odps.exist_table(tb_name):
        print('表已存在,表结构是', odps.get_table(tb_name).schema)
        print('warning:表已存在,追加数据')
    else:
        print('info:表不存在,创建并写入数据')
        odps.create_table(tb_name, schema=Schema(columns=columns), if_not_exists=True)
    t = odps.get_table(tb_name)
    records = df.values.tolist()
    with t.open_writer() as writer:
        writer.write(records)  # writer accepts any iterable of rows
    return "数据集写入odps成功"
#odps_write_dataframe(df =df,tb_name='tmp_market_v1123_v1')
def write_excle(data, to_file_name):
    """Save a DataFrame to an .xlsx file in the Downloads directory.

    NOTE(review): the output directory is hard-coded to one user's machine —
    consider making it a parameter.

    :param data: pandas.DataFrame to save
    :param to_file_name: file name (placed under the hard-coded path)
    """
    try:
        path = '/Users/zhoujunqing/Downloads'
        to_path = path + "/" + to_file_name
        # The context manager saves and closes the workbook on exit,
        # replacing the manual writer.save() call.
        with pd.ExcelWriter(to_path, engine='xlsxwriter') as writer:
            data.to_excel(writer, 'Sheet1', index=False)
        print('EXCEL保存成功success')
    except Exception as e:
        # BUG FIX: the original bare `except:` printed a fixed message and
        # discarded the actual cause; keep best-effort behavior but show it.
        print("EXCEL保存失败", e)
def write_to_database(table, table_name):
    """Write a DataFrame into the local MySQL 'bixin' database, replacing
    any existing table of the same name.

    :param table: pandas.DataFrame to persist
    :param table_name: target MySQL table name
    """
    try:
        engine = create_engine(
            "mysql+pymysql://root:password@127.0.0.1:3306/bixin?charset=utf8mb4")
        table.to_sql(name=table_name, con=engine, if_exists='replace',
                     index=False, chunksize=10000)
        print('数据库写入成功')
    except Exception as e:
        # BUG FIX: the original bare `except :` hid the failure cause
        # entirely; keep best-effort behavior but surface the error.
        print('数据库写入失败', e)
if __name__ == "__main__":
    start_time = time.time()  # start timestamp
    # NOTE(review): nothing is executed between the two timestamps, so the
    # elapsed time printed below is always ~0 — presumably a timing skeleton
    # meant to wrap real calls; confirm intent.
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
# 使用方法 (Usage)
# 步骤一:将脚本添加到python的系统环境中 (Step 1: put this script on the Python path)
# 步骤二:直接使用 (Step 2: import and use directly)
# from py_function_tools import odps_read_sql,write_excle,write_to_database,hive_read_sql
# from py_function_tools import odps_write_dataframe,odps_read_table,write_database_from_odps
# import time
#df_v1 = odps_read_table(table_name) ## 读取odps的Table
#df_v1 = odps_read_sql(sql) # 读取odps数据集
#df_v2 = hive_read_sql(sql) # 读取hive数据集,需要手动指定表所在的数据库
#write_excle(df,file_name) #默认保存在downloads目录下
#write_to_database(df,table_name) #存到mysql中ypp数据库
#from pandas_dataframe_agg import dataframe_agg
#table =dataframe_agg(df,dimensions,func=func) # groupby 处理