Recently another requirement came up: import Oracle tables into the Hive warehouse.
Two points are worth noting when using Spark to read Oracle into Hive:
1. When the data volume is small, you can read directly with spark.read.jdbc(orclUrl, table_name, orclProperties); efficiency is not a concern and the job finishes quickly.
2. When the data volume is large, use the overload spark.read.jdbc(orclUrl, table_name, predicates, orclProperties) to read by partition; it reads multiple partitions concurrently according to the given predicate conditions. The principle is that each predicate is appended as a WHERE clause to the SQL sent to Oracle. For example, if id in Oracle ranges from 1 to 10, splitting it into "where id >= 1 and id <= 5" and "where id >= 6 and id <= 10" yields two partitions read by two tasks at the same time (see the sketch after this list).
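As a hedged illustration of the two call styles, assuming a SparkSession named spark is already in scope (as in spark-shell); the URL, credentials, and table names below are placeholders, not from the original post:

import java.util.Properties

// Placeholder connection details -- adjust for your environment.
val orclUrl = "jdbc:oracle:thin:@//db-host:1521/ORCL"
val orclProperties = new Properties()
orclProperties.put("user", "scott")
orclProperties.put("password", "tiger")

// 1. Small table: a single connection, a single partition.
val smallDf = spark.read.jdbc(orclUrl, "SMALL_TABLE", orclProperties)

// 2. Large table: one WHERE predicate per partition, fetched in parallel.
val predicates = Array(
  "id >= 1 and id <= 5",
  "id >= 6 and id <= 10")
val largeDf = spark.read.jdbc(orclUrl, "LARGE_TABLE", predicates, orclProperties)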
The relevant source (Spark 2.2.0) follows; note the official doc comment:
/**
 * Construct a `DataFrame` representing the database table accessible via JDBC URL
 * url named table using connection properties. The `predicates` parameter gives a list
 * expressions suitable for inclusion in WHERE clauses; each one defines one partition
 * of the `DataFrame`.
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param predicates Condition in the where clause for each partition.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included. "fetchsize" can be used to control the
 *                             number of rows per fetch.
 * @since 1.4.0
 */
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame = {
  assertNoSpecifiedSchema("jdbc")
  // connectionProperties should override settings in extraOptions.
  val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
  val options = new JDBCOptions(url, table, params)
  val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
    JDBCPartition(part, i) : Partition
  }
  val relation = JDBCRelation(parts, options)(sparkSession)
  sparkSession.baseRelationToDataFrame(relation)
}
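In practice the predicate strings are usually generated rather than written by hand. A minimal helper sketch, assuming a numeric id column whose lower and upper bounds are already known (the column name and bounds are illustrative):

// Split [lower, upper] into numPartitions contiguous ranges and render each
// range as a WHERE-clause predicate over the given column.
def idRangePredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Array[String] = {
  val span = upper - lower + 1
  val step = math.max(1L, math.ceil(span.toDouble / numPartitions).toLong)
  (lower to upper by step).map { start =>
    val end = math.min(start + step - 1, upper)
    s"$column >= $start and $column <= $end"
  }.toArray
}

// idRangePredicates("id", 1, 10, 2)
// => Array("id >= 1 and id <= 5", "id >= 6 and id <= 10")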
In real-world use we found that Spark does not handle Oracle's DATE type well. For example, a value such as 2018-10-10 23:00 ends up as just 2018-10-10 after being loaded into the Hive table; the hours and minutes are gone. Oracle's DATE actually stores time-of-day down to the second, but by default it is read as Spark's DateType, which keeps only the date part.
A workable fix is to override the JDBC dialect (written in Java here) and map Oracle's DATE to TimestampType; the code is as follows:
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import scala.Option;

import java.sql.Types;

public class OracleDateTypeInit {
    public static void oracleInit() {
        JdbcDialect dialect = new JdbcDialect() {
            // Decide whether this dialect applies, i.e. whether the URL points at Oracle.
            @Override
            public boolean canHandle(String url) {
                return url.startsWith("jdbc:oracle");
            }

            // Type conversion when reading from Oracle: map DATE to TimestampType
            // so the time-of-day part survives.
            @Override
            public Option<DataType> getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
                if (sqlType == Types.DATE && typeName.equals("DATE") && size == 0)
                    return Option.apply(DataTypes.TimestampType);
                return Option.empty();
            }

            // Type conversion when writing to Oracle.
            @Override
            public Option<JdbcType> getJDBCType(DataType dt) {
                if (DataTypes.StringType.sameType(dt)) {
                    return Option.apply(new JdbcType("VARCHAR2(255)", Types.VARCHAR));
                } else if (DataTypes.BooleanType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(1)", Types.NUMERIC));
                } else if (DataTypes.IntegerType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(10)", Types.NUMERIC));
                } else if (DataTypes.LongType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19)", Types.NUMERIC));
                } else if (DataTypes.DoubleType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.FloatType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19,4)", Types.NUMERIC));
                } else if (DataTypes.ShortType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(5)", Types.NUMERIC));
                } else if (DataTypes.ByteType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(3)", Types.NUMERIC));
                } else if (DataTypes.BinaryType.sameType(dt)) {
                    return Option.apply(new JdbcType("BLOB", Types.BLOB));
                } else if (DataTypes.TimestampType.sameType(dt)) {
                    return Option.apply(new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.DateType.sameType(dt)) {
                    return Option.apply(new JdbcType("DATE", Types.DATE));
                } else if (DataTypes.createDecimalType().sameType(dt)) {
                    // Unbounded decimal; Spark's built-in dialects render
                    // DecimalType.Fixed(precision, scale) as
                    // NUMBER(precision,scale) with java.sql.Types.NUMERIC.
                    return Option.apply(new JdbcType("NUMBER(38,4)", Types.NUMERIC));
                }
                return Option.empty();
            }
        };

        // Register the dialect so Spark picks it up for matching URLs.
        JdbcDialects.registerDialect(dialect);
    }
}
To use it, just call the init method before reading:

// Dates read from Oracle otherwise keep only the year, month, and day; register
// the dialect so DATE columns are converted to TimestampType.
OracleDateTypeInit.oracleInit()
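Putting it together, a minimal end-to-end sketch (the connection details, table names, and target Hive database are placeholders):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object OracleToHive {
  def main(args: Array[String]): Unit = {
    // Register the custom dialect before any JDBC read, so Oracle DATE
    // columns arrive as TimestampType instead of a truncated DateType.
    OracleDateTypeInit.oracleInit()

    val spark = SparkSession.builder()
      .appName("oracle-to-hive")
      .enableHiveSupport() // required for saveAsTable into Hive
      .getOrCreate()

    val orclUrl = "jdbc:oracle:thin:@//db-host:1521/ORCL" // placeholder
    val props = new Properties()
    props.put("user", "scott") // placeholder credentials
    props.put("password", "tiger")

    // Two predicates -> two partitions read in parallel, as described above.
    val predicates = Array("id >= 1 and id <= 5", "id >= 6 and id <= 10")
    val df = spark.read.jdbc(orclUrl, "SRC_TABLE", predicates, props)

    // Hours and minutes survive in the former Oracle DATE columns.
    df.write.mode("overwrite").saveAsTable("ods.src_table")
  }
}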