1. Since I was more familiar with Hive, I first wrote the job with spark.sql and reused some Hive functions. For example, unix_timestamp(regexp_replace(time, '[\\x2b]', ' ')) is meant to replace the '+' in the table's time column with a space and then convert the result into a Unix timestamp. However, when the original HQL-style query was run through Spark, the data was not actually converted. The Hive-style version was:
val df1 = spark.sql(s"select distinct a.userkey as userkey from (select userkey, unix_timestamp(regexp_replace(time, '[\\x2b]', ' '))-login_time as long_time from newsapp.stats_log where a.long_time>3*60*60 ")
The converted data looks like this (nTime is the time string with '+' replaced by a space, uTime is its Unix timestamp):
+--------------------+-------------------+----------+-------------------+----------+
|             userkey|               time|login_time|              nTime|     uTime|
+--------------------+-------------------+----------+-------------------+----------+
|27d96ed42c2d41e09...|                   |1478153310|                   |      null|
|4363156a27b24bcbb...|2017-03-15+07:30:44|1399678984|2017-03-15 07:30:44|1489534244|
|     863049030304334|2017-03-15+07:30:52|1476608445|2017-03-15 07:30:52|1489534252|
|     861078037971755|2017-03-15+07:30:43|1488000946|2017-03-15 07:30:43|1489534243|
|b6084aeae5e24b2c9...|2017-03-15+07:31:02|1459428271|2017-03-15 07:31:02|1489534262|
|     868403023720201|2017-03-15+07:30:52|1488952518|2017-03-15 07:30:52|1489534252|
|     860124030093303|2017-03-15+07:30:34|1457489851|2017-03-15 07:30:34|1489534234|
|     863427023112338|2017-03-15+07:31:01|1462081635|2017-03-15 07:31:01|1489534261|
|    3d2c1c5c1c0975a4|2017-03-15+07:31:02|1486479330|2017-03-15 07:31:02|1489534262|
|9e70934c619f97bd3...|2017-03-15+07:30:42|1443790657|2017-03-15 07:30:42|1489534242|
|     863454038662610|2017-03-15+07:30:53|1481668023|2017-03-15 07:30:53|1489534253|
|d34d8045100af55ab...|2017-03-15+07:30:49|1432129118|2017-03-15 07:30:49|1489534249|
|     352709084558295|2017-03-15+07:30:59|1486655767|2017-03-15 07:30:59|1489534259|
|fac6e4fdd3a54809a...|2017-03-15+07:29:34|1483451836|2017-03-15 07:29:34|1489534174|
|     866530029849022|2017-03-15+07:30:51|1476436247|2017-03-15 07:30:51|1489534251|
2. The two transformations were therefore implemented with Spark as follows:
val df1 = spark.sql(s"select userkey, time,login_time from newsapp.stats_log where date like '$date2%' and cast(publish_id as string) RLIKE '^[^378].*?' ") val shapeTime = udf((time:String) => { time.replaceAll("\\+"," ") }) val shapeUnix = udf((time:String) => { unix_timestamp(col(time)) }) val df2 = df1.withColumn("nTime",shapeTime(col("time"))) df2.withColumn("uTime",unix_timestamp(col("nTime"))).createTempView(s"temp1_$date2") val df_online = spark.sql(s"select distinct a.userkey as userkey from (select userkey, utime-login_time as long_time from temp1_$date2 ) a where a.long_time>3*60*60 ")