小白終於進入了職場,從事大數據方面的工作!
分到項目組了,搬磚的時候遇到了一個這樣的問題。
要求:用spark實現oracle的存儲過程中計算部分。
坑:由於報表中包含了一個ID字段,其要求是不同的區域擁有不同的區域ID,且ID在數據庫表中的屬性為主鍵。Oracle的存儲過程中采用的是自定義序列,采用發號的形式實現ID唯一且符合區域特性。
填坑過程:
方法一:sql.functions 中monotonically_increasing_id
。
采用import org.apache.spark.sql.functions.中的
monotonically_increasing_id函數。
使用demo如下:
//從數據庫中加載表TEST_EMP進入內存,並且取ENAME和EMPNO兩列
val dfEmp=sqlContext.read.options(conUtil.con("TEST_EMP"))
.format("jdbc").load()
.select("ENAME","EMPNO")
val test =dfEmp
.withColumn("TEST_NO",monotonically_increasing_id)
//向oracle中寫數據,這個函數的使用前提是需要確定表"EMP_TMP"存在。且向這張表寫入數據的時候最好字段進行對應,如果列多余數據庫中的列數則會出現參數過多的錯誤。
JdbcUtils.saveTable(test, url, "EMP_TMP", properties)
//代碼結果如下所示,在數據庫中生成了一個從0開始自增的列
ENAME | EMPNO | TEST_NO |
SMITH | 7369 | 0 |
ALLEN | 7499 | 1 |
WARD | 7521 | 2 |
JONES | 7566 | 3 |
這個方法有一個缺點:序列是從0開始的,monotonically_increasing_id函數無法接受參數,所以我們無法用其根據我們的業務進行指定序列。
所以,有一個想法於是去看了一下該方法的源碼,發下如下特點:
首先看到函數的定義def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
深入查看MonotonicallyIncreasingID() ,具體源碼如下:
private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
/**
* Record ID within each partition. By being transient, count's value is reset to 0 every time
* we serialize and deserialize and initialize it.
*/
@transient private[this] var count: Long = _
@transient private[this] var partitionMask: Long = _
override protected def initInternal(): Unit = {
count = 0L
partitionMask = TaskContext.getPartitionId().toLong << 33
}
override def nullable: Boolean = false
override def dataType: DataType = LongType
override protected def evalInternal(input: InternalRow): Long = {
val currentCount = count
count += 1
partitionMask + currentCount
}
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
val countTerm = ctx.freshName("count")
val partitionMaskTerm = ctx.freshName("partitionMask")
ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;")
ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm,
s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;")
ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.value} = $partitionMaskTerm + $countTerm;
$countTerm++;
"""
}
}
我們可以發現這個類中重寫了父類的initInternal()方法,指定了初始值count=0L,enmm這樣子的話我們可不可以通過復寫該類中的初始值來滿足我們的業務需求
override protected def initInternal(): Unit = {
count = 0L
partitionMask = TaskContext.getPartitionId().toLong << 33
}
(別想太多,一個業務涉及那么多序列,總不能用一次改一次吧,當然如果技術過硬,自己寫一套方法以及類,用來接收參數1:序列起始值,參數2:序列終止值。當前技術不夠且加班 導致這個想法涼涼)
方法二:rdd算子中的zipWithIndex()方法
代碼demo如下:
val dfEmp=sqlContext.read.options(conUtil.con("TEST_EMP"))
.format("jdbc").load()
.select("ENAME","EMPNO")
//對讀取的dfEmp進行schema加列操作,增加一列且指定列數據類型
val schma=dfEmp.schema.add(StructField("TEST_NO",LongType))
val temp=dfEmp.rdd.zipWithIndex()
//可以在row中指定我們自己業務需求的序列初始值
val changed= temp.map(t => Row.merge(t._1, Row(t._2+340000000)))
val in=sqlContext.createDataFrame(changed,schma)
JdbcUtils.saveTable(in, url, "EMP_TMP", properties)
結果如下所示:
ENAME | EMPNO | TEST_NO |
SMITH | 7369 | 300000000 |
ALLEN | 7499 | 300000001 |
WARD | 7521 | 300000002 |
到此,入職的第一個坑填好了!貌似方法二還能夠用zipWithUniqueId()方法進行實現,由於時間不夠就沒有一一的嘗試了,如果各位小伙伴們有空可以嘗試一下!
同時,如果小伙伴們有更加好的方法,求分享!求指導!感謝!!!!!
歡迎留言!!!!