python大數據分析代碼案例


 

#查詢用戶余額代碼案例

 

import sys

import MySQLdb

import pandas as pd

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.14',
    'dbport': 3306,
    'dbname': 'HBAODB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

def getusercoin(userid):
    """Return the ``(ID, COINREFER)`` row for *userid* from its CHARCOIN table.

    The table suffix is ``userid % 10`` (CHARCOIN0 .. CHARCOIN9). Returns
    ``(None, None)`` when no row matches or the query failed, instead of
    raising IndexError, so callers can test ``row[0] is not None``.
    """
    uid = int(userid)
    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (uid % 10, uid)
    ret = sql_select(reqsql)
    if not ret:
        return (None, None)
    return ret[0]

def getall(userlist):
    """Show the coin balance (COINREFER / 100) of each user in *userlist* via Spark.

    Users without a CHARCOIN row (or with a NULL COINREFER) are listed with a
    balance of 0. Requires a global ``spark`` session (pyspark shell/notebook).
    """
    rows = []
    for userid in userlist:
        coins = getusercoin(userid)
        if coins[0] is not None and coins[1] is not None:
            rows.append((str(coins[0]), coins[1] / 100.0))
        else:
            rows.append((str(userid), 0.0))
    # Build the frame once instead of growing it row by row with .loc.
    userdata = pd.DataFrame(rows, columns=('userid', 'coin'))
    df = spark.createDataFrame(userdata)
    df.show(50)

 

 

 

#用戶消費查詢代碼案例

 

import sys

import MySQLdb

import pandas as pd

import datetime

import time

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.12',
    'dbport': 3306,
    'dbname': 'JIESUANDB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

# User RMB consumption.
def getuserconsume(userid, startday):
    """Return the user's summed consumption on *startday*, divided by 100.

    Sums DUBIOPNUM in DUBIJIESUANTONGJI_<yymmdd> for DUBIOPTYPE=2 and
    OPTYPE in (1, 4, 17, 25). Returns 0 when nothing matched or the query failed.
    """
    strdate = startday.strftime("%y%m%d")
    # Gifts + guardian + song requests + stickers.
    # Column is DUBIOPTYPE (as in getusercharge and the table dump); the original had a DBIOPTYPE typo.
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from DUBIJIESUANTONGJI_%s where CONSUMERID=%u AND DUBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    # SUM() with no matching rows yields one row whose sum is NULL.
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

# User recharges.
def getusercharge(userid, startday):
    """Return the user's summed recharge amount on *startday*, divided by 100.

    Sums DUBIOPNUM in DUBIJIESUANTONGJI_<yymmdd> for DUBIOPTYPE=1 and
    OPTYPE in (1016, 1020, 1021). Returns 0 when nothing matched or the query failed.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from DUBIJIESUANTONGJI_%s where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    # SUM() with no matching rows yields one row whose sum is NULL.
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

# User's remaining RMB balance for the day.
def getusercurcoin(userid, startday):
    """Return the user's latest CURRENTNUM on *startday*, divided by 100.

    Takes the most recent row by OPTIME from DUBIJIESUANTONGJI_<yymmdd>.
    Returns 0 when the user has no row that day, the value is NULL, or the
    query failed.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,CURRENTNUM from DUBIJIESUANTONGJI_%s where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

def getconsume(userid=3101011990,
               startdate=datetime.date(2017, 1, 1),
               enddate=datetime.date(2017, 2, 2)):
    """Print a per-day charge / consumption / balance table for one user.

    Args:
        userid: User to report on (defaults to the original hard-coded user).
        startdate: First day of the report (inclusive).
        enddate: Last day of the report (inclusive).
    """
    # Number of days in the inclusive range.
    datelen = (enddate - startdate).days + 1
    delta = datetime.timedelta(days=1)
    rows = []
    for i in range(datelen):
        startday = startdate + delta * i
        consume_coin = getuserconsume(userid, startday)
        charge = getusercharge(userid, startday)
        dayleftcoin = getusercurcoin(userid, startday)
        rows.append((startday.strftime("%Y-%m-%d"), str(userid), charge, consume_coin, dayleftcoin))
    userdata = pd.DataFrame(rows, columns=('date', 'userid', 'charge', 'consume', 'dayleftcoin'))
    print(userdata.tail(100))
    return


getconsume()

 

 

 

 

 

#查詢用戶機器ID 代碼案例

 

import sys

import MySQLdb

import pandas as pd

import datetime

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.15',
    'dbport': 3306,
    'dbname': 'JIQIDB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

def getusermid(userid, months):
    """Return the (USERID, MACHINEID) rows, grouped by MACHINEID, for one user.

    Args:
        userid: User to look up.
        months: Month string (e.g. "%Y%m") used as part of the table name.

    Returns:
        Rows from LOGINHISTORY<months><userid % 50>, or '' on a MySQL error.
    """
    uid = int(userid)
    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months, uid % 50, uid)
    print(reqsql)
    return sql_select(reqsql)

def getall(userlist):
    """Show, via Spark, every MACHINEID each user in *userlist* logged in with this month.

    Requires a global ``spark`` session (pyspark shell/notebook).
    """
    months = datetime.date.today().strftime("%Y%m")
    rows = []
    for userid in userlist:
        # Each row is (USERID, MACHINEID); '' on query error iterates as empty.
        for row in getusermid(userid, months):
            rows.append((str(row[0]), str(row[1])))
    userdata = pd.DataFrame(rows, columns=('USERID', 'MACHINEID'))
    df = spark.createDataFrame(userdata)
    df.show(1000)

 

 

 

 

#人民幣統計代碼案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write

 

def fromDayToDay(startdate, datelen, func):
    """Call ``func(day, day + 1 day)`` for *datelen* consecutive days from *startdate*.

    Args:
        startdate: First day (a ``datetime.date``).
        datelen: Number of days to process; 0 or negative processes none.
        func: Callback taking ``(startday, endday)``.
    """
    delta = datetime.timedelta(days=1)
    for i in range(datelen):
        startday = startdate + delta * i
        func(startday, startday + delta)
    return

def fromDayToEndDay(startdate, datelen, endday, func):
    """Call ``func(day, endday)`` for *datelen* consecutive days from *startdate*.

    Unlike fromDayToDay, the second callback argument is the same fixed
    *endday* on every call.
    """
    delta = datetime.timedelta(days=1)
    for i in range(datelen):
        func(startdate + delta * i, endday)
    return

 

# Fetch RMB (billing) data.
def saveDayPackageData(startday, endday):
    """Dump the billing table DUBIJIESUANTONGJI_<yymmdd> for *startday* to parquet.

    The file /home/haoren/logstatis/billdata<YYYYMMDD>.parq is written only
    when the table has rows. *endday* is accepted to fit the fromDayToDay
    callback signature but is not used by the query.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.12',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'JIESUANDB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    strdate = startday.strftime("%y%m%d")
    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM DUBIJIESUANTONGJI_%s" % (strdate)
    print(sql)
    try:
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write("/home/haoren/logstatis/billdata" + strday + ".parq", pddf)
    return

 

def savePackageData(startday=datetime.date(2016, 12, 28),
                    endday=datetime.date(2016, 12, 28)):
    """Dump billing data to parquet for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch package (billing) data one day at a time.
    fromDayToDay(startday, datelen, saveDayPackageData)

# Fetch WiFi phone registration data.
def saveDayWifiPhoneRegData(startday, endday):
    """Dump NEW_WEB_USER ids with TYPE=17 and TIME before *endday* to parquet.

    *endday* is converted to a local-time Unix timestamp. The file
    /home/haoren/logstatis/wifiphonereg<YYYYMMDD of startday>.parq is written
    only when rows were returned.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.15',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'AADB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    tsend = time.mktime(endday.timetuple())
    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)
    print(sql)
    try:
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write("/home/haoren/logstatis/wifiphonereg" + strday + ".parq", pddf)
    return

 

def saveWifiPhoneReg(startday=datetime.date(2016, 12, 1),
                     endday=datetime.date(2016, 12, 1)):
    """Dump WiFi phone registration data for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch registration data one day at a time.
    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

# NOTE(review): these lookup tables look abridged in the published listing; verify
# them against the full OPTYPE / OPDETAIL code lists.
OPTypeName = {
    0: "會員",
    1: "道具",
}

OpDetailName19 = {
    1: "購物保存收益",
    2: "下注和返注",
    3: "發紅包",
    4: "搶紅包",
}

OpDetailName22 = {
    1: "活動1收益到總賬號",
    2: "活動2收益到總賬號",
    3: "活動3收益到總賬號",
}

OpDetailName23 = {
    0: "購買會員",
    1: "購買道具",
    2: "掃雷",
}

# Detail-name tables keyed by operation type.
_OP_DETAIL_TABLES = {
    19: OpDetailName19,
    22: OpDetailName22,
    23: OpDetailName23,
}


def _to_text(value):
    """Return *value* as text: UTF-8-decode byte strings, pass text through.

    Python 2 ``str`` literals are bytes and get decoded (as the original
    ``.decode('utf8')`` did); on Python 3 they are already text.
    """
    if isinstance(value, bytes):
        return value.decode('utf8')
    return value


def getOpTypeName(func):
    """Return the display name for operation type *func*, or "" if unknown."""
    name = OPTypeName.get(func)
    if name is None:
        return ""
    return _to_text(name)


def getOpDetailName(func, detail):
    """Return the display name of *detail* under operation type *func*, or "".

    For type 19, details strictly between 10000 and 30000 map to "包裹回滾"
    and those strictly between 50000 and 60000 map to "紅包接龍"; all other
    values are looked up in the per-type table.
    """
    if func == 19 and detail is not None:
        if 10000 < detail < 30000:
            return _to_text("包裹回滾")
        if 50000 < detail < 60000:
            return _to_text("紅包接龍")
    table = _OP_DETAIL_TABLES.get(func)
    if table is None:
        return ""
    name = table.get(detail)
    if name is None:
        return ""
    return _to_text(name)

 

def getDayPackageData(startday, endday):
    """Load one day's billing parquet into Spark and register it as view 'billdata'.

    Adds OPTYPENAME and DETAILNAME columns via Python UDFs and shows the first
    rows after each step. Requires a global ``spark`` session; *endday* is
    unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    df.show(10)
    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    df2.show(10)
    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    df.show(10)
    df.createOrReplaceTempView('billdata')
    return

def getPackageData(startday=datetime.date(2016, 12, 28),
                   endday=datetime.date(2016, 12, 28)):
    """Load billing data into Spark for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch package (billing) data one day at a time.
    fromDayToDay(startday, datelen, getDayPackageData)
    print('getPackageData finish')

 

# Fetch recharge data.
def getChargeInfo(startday, endday):
    """Dump rows of USERCONSUMPTIONRECORD0..19 with startday < TIME < endday to parquet.

    Day boundaries are converted to local-time Unix timestamps. Output goes to
    /home/haoren/logstatis/register<YYYYMMDD>.parq, written only when rows exist.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.14',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'BAOIMDB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    tsstart = time.mktime(startday.timetuple())
    tsend = time.mktime(endday.timetuple())
    frames = []
    try:
        for i in range(0, 20):
            sql = "SELECT * FROM USERCONSUMPTIONRECORD%d where TIME > %d AND TIME < %d" % (i, tsstart, tsend)
            print(sql)
            pddf = pd.read_sql(sql, con=mysql_cn)
            if len(pddf.index) > 0:
                frames.append(pddf)
    finally:
        mysql_cn.close()
    # DataFrame.append was removed in pandas 2.0 and is quadratic when looped;
    # concatenate all non-empty shards once instead.
    regdata = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    print(regdata.tail(5))
    if len(regdata.index) > 0:
        print(regdata.describe())
        write("/home/haoren/logstatis/register" + strday + ".parq", regdata)
    return

def pudf(x):
    """Return the operation-type name for a row-like *x* exposing an ``OPTYPE`` attribute."""
    return getOpTypeName(x.OPTYPE)

def getMergeData(strday):
    """Join one day's billing and WiFi-registration parquet files in pandas.

    Rows are inner-joined on CONSUMERID == USERID and annotated with
    OPTYPENAME and DETAILNAME. The result is converted to a Spark DataFrame
    (global ``spark``), its first rows are shown, and it is returned.
    """
    dfbill = ParquetFile("/home/haoren/logstatis/billdata" + strday + ".parq").to_pandas()
    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg" + strday + ".parq").to_pandas()
    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')
    # Column-wise comprehensions instead of DataFrame.apply(axis=1): avoids a Series
    # per row and still yields a 1-D column when the merge is empty.
    tempdf['OPTYPENAME'] = [getOpTypeName(t) for t in tempdf['OPTYPE']]
    tempdf['DETAILNAME'] = [getOpDetailName(t, d)
                            for t, d in zip(tempdf['OPTYPE'], tempdf['OPDETAIL'])]
    df = spark.createDataFrame(tempdf)
    df.show(10)
    return df

def analyzeDayBillData(startday, endday):
    """Join one day's billing data with WiFi registrations in Spark and save it.

    Inner-joins on CONSUMERID == USERID and writes
    /home/haoren/logstatis/analyze<YYYYMMDD>.parq. Requires a global ``spark``
    session; *endday* is unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg" + strday + ".parq")
    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)
    df3.show(10)
    # NOTE(review): the default save mode errors if the output path already exists;
    # use .mode('overwrite') if reruns for the same day are expected.
    df3.write.parquet("/home/haoren/logstatis/analyze" + strday + ".parq")
    return

def analyzeDayBillData2(startday, endday):
    """Build and show the merged billing/registration data for one day (pandas path).

    getMergeData already adds OPTYPENAME/DETAILNAME; the code that followed the
    early ``return`` in the original was unreachable and has been removed.
    *endday* is unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    getMergeData(strday)
    return

def analyzeBillData(startday=datetime.date(2016, 12, 28),
                    endday=datetime.date(2016, 12, 28)):
    """Run the merged billing analysis for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    fromDayToDay(startday, datelen, analyzeDayBillData2)
    print('analyzeBillData finish')


savePackageData()

getPackageData()

# Other entry points, disabled in the original script:
# saveWifiPhoneReg()
# analyzeBillData()

 

#查詢用戶余額代碼案例

 

import sys

import MySQLdb

import pandas as pd

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.14',
    'dbport': 3306,
    'dbname': 'HBAODB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

def getusercoin(userid):
    """Return the ``(ID, COINREFER)`` row for *userid* from its CHARCOIN table.

    The table suffix is ``userid % 10`` (CHARCOIN0 .. CHARCOIN9). Returns
    ``(None, None)`` when no row matches or the query failed, instead of
    raising IndexError, so callers can test ``row[0] is not None``.
    """
    uid = int(userid)
    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (uid % 10, uid)
    ret = sql_select(reqsql)
    if not ret:
        return (None, None)
    return ret[0]

def getall(userlist):
    """Show the coin balance (COINREFER / 100) of each user in *userlist* via Spark.

    Users without a CHARCOIN row (or with a NULL COINREFER) are listed with a
    balance of 0. Requires a global ``spark`` session (pyspark shell/notebook).
    """
    rows = []
    for userid in userlist:
        coins = getusercoin(userid)
        if coins[0] is not None and coins[1] is not None:
            rows.append((str(coins[0]), coins[1] / 100.0))
        else:
            rows.append((str(userid), 0.0))
    # Build the frame once instead of growing it row by row with .loc.
    userdata = pd.DataFrame(rows, columns=('userid', 'coin'))
    df = spark.createDataFrame(userdata)
    df.show(50)

 

 

 

#用戶消費查詢代碼案例

 

import sys

import MySQLdb

import pandas as pd

import datetime

import time

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.12',
    'dbport': 3306,
    'dbname': 'JIESUANDB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

# User RMB consumption.
def getuserconsume(userid, startday):
    """Return the user's summed consumption on *startday*, divided by 100.

    Sums DUBIOPNUM in DUBIJIESUANTONGJI_<yymmdd> for DUBIOPTYPE=2 and
    OPTYPE in (1, 4, 17, 25). Returns 0 when nothing matched or the query failed.
    """
    strdate = startday.strftime("%y%m%d")
    # Gifts + guardian + song requests + stickers.
    # Column is DUBIOPTYPE (as in getusercharge and the table dump); the original had a DBIOPTYPE typo.
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from DUBIJIESUANTONGJI_%s where CONSUMERID=%u AND DUBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    # SUM() with no matching rows yields one row whose sum is NULL.
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

# User recharges.
def getusercharge(userid, startday):
    """Return the user's summed recharge amount on *startday*, divided by 100.

    Sums DUBIOPNUM in DUBIJIESUANTONGJI_<yymmdd> for DUBIOPTYPE=1 and
    OPTYPE in (1016, 1020, 1021). Returns 0 when nothing matched or the query failed.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from DUBIJIESUANTONGJI_%s where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    # SUM() with no matching rows yields one row whose sum is NULL.
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

# User's remaining RMB balance for the day.
def getusercurcoin(userid, startday):
    """Return the user's latest CURRENTNUM on *startday*, divided by 100.

    Takes the most recent row by OPTIME from DUBIJIESUANTONGJI_<yymmdd>.
    Returns 0 when the user has no row that day, the value is NULL, or the
    query failed.
    """
    strdate = startday.strftime("%y%m%d")
    reqsql = "select CONSUMERID,CURRENTNUM from DUBIJIESUANTONGJI_%s where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))
    print(reqsql)
    ret = sql_select(reqsql)
    print(ret)
    if ret and ret[0][1] is not None:
        return float(ret[0][1]) / 100.0
    return 0

def getconsume(userid=3101011990,
               startdate=datetime.date(2017, 1, 1),
               enddate=datetime.date(2017, 2, 2)):
    """Print a per-day charge / consumption / balance table for one user.

    Args:
        userid: User to report on (defaults to the original hard-coded user).
        startdate: First day of the report (inclusive).
        enddate: Last day of the report (inclusive).
    """
    # Number of days in the inclusive range.
    datelen = (enddate - startdate).days + 1
    delta = datetime.timedelta(days=1)
    rows = []
    for i in range(datelen):
        startday = startdate + delta * i
        consume_coin = getuserconsume(userid, startday)
        charge = getusercharge(userid, startday)
        dayleftcoin = getusercurcoin(userid, startday)
        rows.append((startday.strftime("%Y-%m-%d"), str(userid), charge, consume_coin, dayleftcoin))
    userdata = pd.DataFrame(rows, columns=('date', 'userid', 'charge', 'consume', 'dayleftcoin'))
    print(userdata.tail(100))
    return


getconsume()

 

 

 

 

 

#查詢用戶機器ID 代碼案例

 

import sys

import MySQLdb

import pandas as pd

import datetime

 

optmap = {
    'dbuser': 'aduser',
    # NOTE(review): credentials are hard-coded in source; consider loading them from config/env.
    'dbpass': '123654',
    'dbhost': '192.168.10.15',
    'dbport': 3306,
    'dbname': 'JIQIDB',
}


def sql_select(reqsql):
    """Execute *reqsql* against the ``optmap`` database and return all rows.

    Args:
        reqsql: Complete SQL statement to run.

    Returns:
        The rows from ``cursor.fetchall()``, or ``''`` if a MySQL error
        occurred (the error code and message are printed).
    """
    db_conn = None
    try:
        # ``db=`` already selects the schema, so no separate "use" statement is needed.
        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'],
                                  host=optmap['dbhost'], port=optmap['dbport'],
                                  db=optmap['dbname'])
        db_cursor = db_conn.cursor()
        try:
            db_cursor.execute(reqsql)
            return db_cursor.fetchall()
        finally:
            db_cursor.close()
    except MySQLdb.Error as e:
        print("Mysql ERROR %d:%s" % (e.args[0], e.args[1]))
        return ''
    finally:
        # Always release the connection (the original never actually called close()).
        if db_conn is not None:
            db_conn.close()

def getusermid(userid, months):
    """Return the (USERID, MACHINEID) rows, grouped by MACHINEID, for one user.

    Args:
        userid: User to look up.
        months: Month string (e.g. "%Y%m") used as part of the table name.

    Returns:
        Rows from LOGINHISTORY<months><userid % 50>, or '' on a MySQL error.
    """
    uid = int(userid)
    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months, uid % 50, uid)
    print(reqsql)
    return sql_select(reqsql)

def getall(userlist):
    """Show, via Spark, every MACHINEID each user in *userlist* logged in with this month.

    Requires a global ``spark`` session (pyspark shell/notebook).
    """
    months = datetime.date.today().strftime("%Y%m")
    rows = []
    for userid in userlist:
        # Each row is (USERID, MACHINEID); '' on query error iterates as empty.
        for row in getusermid(userid, months):
            rows.append((str(row[0]), str(row[1])))
    userdata = pd.DataFrame(rows, columns=('USERID', 'MACHINEID'))
    df = spark.createDataFrame(userdata)
    df.show(1000)

 

 

 

 

#人民幣統計代碼案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write

 

def fromDayToDay(startdate, datelen, func):
    """Call ``func(day, day + 1 day)`` for *datelen* consecutive days from *startdate*.

    Args:
        startdate: First day (a ``datetime.date``).
        datelen: Number of days to process; 0 or negative processes none.
        func: Callback taking ``(startday, endday)``.
    """
    delta = datetime.timedelta(days=1)
    for i in range(datelen):
        startday = startdate + delta * i
        func(startday, startday + delta)
    return

def fromDayToEndDay(startdate, datelen, endday, func):
    """Call ``func(day, endday)`` for *datelen* consecutive days from *startdate*.

    Unlike fromDayToDay, the second callback argument is the same fixed
    *endday* on every call.
    """
    delta = datetime.timedelta(days=1)
    for i in range(datelen):
        func(startdate + delta * i, endday)
    return

 

# Fetch RMB (billing) data.
def saveDayPackageData(startday, endday):
    """Dump the billing table DUBIJIESUANTONGJI_<yymmdd> for *startday* to parquet.

    The file /home/haoren/logstatis/billdata<YYYYMMDD>.parq is written only
    when the table has rows. *endday* is accepted to fit the fromDayToDay
    callback signature but is not used by the query.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.12',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'JIESUANDB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    strdate = startday.strftime("%y%m%d")
    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM DUBIJIESUANTONGJI_%s" % (strdate)
    print(sql)
    try:
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write("/home/haoren/logstatis/billdata" + strday + ".parq", pddf)
    return

 

def savePackageData(startday=datetime.date(2016, 12, 28),
                    endday=datetime.date(2016, 12, 28)):
    """Dump billing data to parquet for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch package (billing) data one day at a time.
    fromDayToDay(startday, datelen, saveDayPackageData)

# Fetch WiFi phone registration data.
def saveDayWifiPhoneRegData(startday, endday):
    """Dump NEW_WEB_USER ids with TYPE=17 and TIME before *endday* to parquet.

    *endday* is converted to a local-time Unix timestamp. The file
    /home/haoren/logstatis/wifiphonereg<YYYYMMDD of startday>.parq is written
    only when rows were returned.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.15',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'AADB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    tsend = time.mktime(endday.timetuple())
    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)
    print(sql)
    try:
        pddf = pd.read_sql(sql, con=mysql_cn)
    finally:
        mysql_cn.close()
    print(pddf.head(5))
    if len(pddf.index) > 0:
        print(pddf.describe())
        write("/home/haoren/logstatis/wifiphonereg" + strday + ".parq", pddf)
    return

 

def saveWifiPhoneReg(startday=datetime.date(2016, 12, 1),
                     endday=datetime.date(2016, 12, 1)):
    """Dump WiFi phone registration data for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch registration data one day at a time.
    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

# NOTE(review): these lookup tables look abridged in the published listing; verify
# them against the full OPTYPE / OPDETAIL code lists.
OPTypeName = {
    0: "會員",
    1: "道具",
}

OpDetailName19 = {
    1: "購物保存收益",
    2: "下注和返注",
    3: "發紅包",
    4: "搶紅包",
}

OpDetailName22 = {
    1: "活動1收益到總賬號",
    2: "活動2收益到總賬號",
    3: "活動3收益到總賬號",
}

OpDetailName23 = {
    0: "購買會員",
    1: "購買道具",
    2: "掃雷",
}

# Detail-name tables keyed by operation type.
_OP_DETAIL_TABLES = {
    19: OpDetailName19,
    22: OpDetailName22,
    23: OpDetailName23,
}


def _to_text(value):
    """Return *value* as text: UTF-8-decode byte strings, pass text through.

    Python 2 ``str`` literals are bytes and get decoded (as the original
    ``.decode('utf8')`` did); on Python 3 they are already text.
    """
    if isinstance(value, bytes):
        return value.decode('utf8')
    return value


def getOpTypeName(func):
    """Return the display name for operation type *func*, or "" if unknown."""
    name = OPTypeName.get(func)
    if name is None:
        return ""
    return _to_text(name)


def getOpDetailName(func, detail):
    """Return the display name of *detail* under operation type *func*, or "".

    For type 19, details strictly between 10000 and 30000 map to "包裹回滾"
    and those strictly between 50000 and 60000 map to "紅包接龍"; all other
    values are looked up in the per-type table.
    """
    if func == 19 and detail is not None:
        if 10000 < detail < 30000:
            return _to_text("包裹回滾")
        if 50000 < detail < 60000:
            return _to_text("紅包接龍")
    table = _OP_DETAIL_TABLES.get(func)
    if table is None:
        return ""
    name = table.get(detail)
    if name is None:
        return ""
    return _to_text(name)

 

def getDayPackageData(startday, endday):
    """Load one day's billing parquet into Spark and register it as view 'billdata'.

    Adds OPTYPENAME and DETAILNAME columns via Python UDFs and shows the first
    rows after each step. Requires a global ``spark`` session; *endday* is
    unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    df.show(10)
    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))
    df2.show(10)
    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))
    df.show(10)
    df.createOrReplaceTempView('billdata')
    return

def getPackageData(startday=datetime.date(2016, 12, 28),
                   endday=datetime.date(2016, 12, 28)):
    """Load billing data into Spark for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    # Fetch package (billing) data one day at a time.
    fromDayToDay(startday, datelen, getDayPackageData)
    print('getPackageData finish')

 

# Fetch recharge data.
def getChargeInfo(startday, endday):
    """Dump rows of USERCONSUMPTIONRECORD0..19 with startday < TIME < endday to parquet.

    Day boundaries are converted to local-time Unix timestamps. Output goes to
    /home/haoren/logstatis/register<YYYYMMDD>.parq, written only when rows exist.
    """
    # Database connection parameters.
    dbconfig = {'host': '192.168.10.14',
                'port': 3306,
                'user': 'user',
                'passwd': '123654',
                'db': 'BAOIMDB',
                'charset': 'utf8'}
    # The configured charset was never passed to connect() in the original.
    mysql_cn = MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],
                               user=dbconfig['user'], passwd=dbconfig['passwd'],
                               db=dbconfig['db'], charset=dbconfig['charset'])
    strday = startday.strftime("%Y%m%d")
    tsstart = time.mktime(startday.timetuple())
    tsend = time.mktime(endday.timetuple())
    frames = []
    try:
        for i in range(0, 20):
            sql = "SELECT * FROM USERCONSUMPTIONRECORD%d where TIME > %d AND TIME < %d" % (i, tsstart, tsend)
            print(sql)
            pddf = pd.read_sql(sql, con=mysql_cn)
            if len(pddf.index) > 0:
                frames.append(pddf)
    finally:
        mysql_cn.close()
    # DataFrame.append was removed in pandas 2.0 and is quadratic when looped;
    # concatenate all non-empty shards once instead.
    regdata = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    print(regdata.tail(5))
    if len(regdata.index) > 0:
        print(regdata.describe())
        write("/home/haoren/logstatis/register" + strday + ".parq", regdata)
    return

def pudf(x):
    """Return the operation-type name for a row-like *x* exposing an ``OPTYPE`` attribute."""
    return getOpTypeName(x.OPTYPE)

def getMergeData(strday):
    """Join one day's billing and WiFi-registration parquet files in pandas.

    Rows are inner-joined on CONSUMERID == USERID and annotated with
    OPTYPENAME and DETAILNAME. The result is converted to a Spark DataFrame
    (global ``spark``), its first rows are shown, and it is returned.
    """
    dfbill = ParquetFile("/home/haoren/logstatis/billdata" + strday + ".parq").to_pandas()
    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg" + strday + ".parq").to_pandas()
    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')
    # Column-wise comprehensions instead of DataFrame.apply(axis=1): avoids a Series
    # per row and still yields a 1-D column when the merge is empty.
    tempdf['OPTYPENAME'] = [getOpTypeName(t) for t in tempdf['OPTYPE']]
    tempdf['DETAILNAME'] = [getOpDetailName(t, d)
                            for t, d in zip(tempdf['OPTYPE'], tempdf['OPDETAIL'])]
    df = spark.createDataFrame(tempdf)
    df.show(10)
    return df

def analyzeDayBillData(startday, endday):
    """Join one day's billing data with WiFi registrations in Spark and save it.

    Inner-joins on CONSUMERID == USERID and writes
    /home/haoren/logstatis/analyze<YYYYMMDD>.parq. Requires a global ``spark``
    session; *endday* is unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    df = spark.read.load("/home/haoren/logstatis/billdata" + strday + ".parq")
    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg" + strday + ".parq")
    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)
    df3.show(10)
    # NOTE(review): the default save mode errors if the output path already exists;
    # use .mode('overwrite') if reruns for the same day are expected.
    df3.write.parquet("/home/haoren/logstatis/analyze" + strday + ".parq")
    return

def analyzeDayBillData2(startday, endday):
    """Build and show the merged billing/registration data for one day (pandas path).

    getMergeData already adds OPTYPENAME/DETAILNAME; the code that followed the
    early ``return`` in the original was unreachable and has been removed.
    *endday* is unused (fromDayToDay callback signature).
    """
    strday = startday.strftime("%Y%m%d")
    print(strday + '人民幣數據')
    getMergeData(strday)
    return

def analyzeBillData(startday=datetime.date(2016, 12, 28),
                    endday=datetime.date(2016, 12, 28)):
    """Run the merged billing analysis for every day from *startday* to *endday* inclusive."""
    datelen = (endday - startday).days + 1
    fromDayToDay(startday, datelen, analyzeDayBillData2)
    print('analyzeBillData finish')


savePackageData()

getPackageData()

# Other entry points, disabled in the original script:
# saveWifiPhoneReg()
# analyzeBillData()

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM