SparkSQL JDBC data source


Running in local mode, with the MySQL driver added to pom.xml and the MySQL server already up. When writing, the format("jdbc").save() approach threw a "does not allow create table as select" exception, so I went to the official documentation and found the jdbc() method instead; I tested it and it works fine. Note that the Properties used below is java.util.Properties.
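
For reference, the write call that raised the exception looked roughly like the following (a sketch reusing the ds and options variables from the Java example below; whether it fails this way depends on the Spark version in use):

java

// On the Spark version used here this path fails with
// "... does not allow create table as select"
ds.write().format("jdbc").options(options).mode(SaveMode.Append).save();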

 

java

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class Demo {
    private static SparkSession session = SparkSession.builder().appName("demo").master("local").getOrCreate();

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://127.0.0.1:3306/studentmanage");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("dbtable", "studentmanage.admin");
        options.put("user", "root");
        options.put("password", "root");

        // read from MySQL through the JDBC data source
        Dataset<Row> dataset = session.read().format("jdbc").options(options).load();
        dataset.show();

        // build some sample rows
        List<Row> list = new ArrayList<Row>();
        Row row1 = RowFactory.create("tele", "123", "male", "China", 1, "admin");
        Row row2 = RowFactory.create("wyc", "123", "male", "China", 1, "admin");
        Row row3 = RowFactory.create("xxx", "123", "male", "China", 1, "admin");
        list.add(row1);
        list.add(row2);
        list.add(row3);

        // write: build the schema for the rows above
        StructType schema = DataTypes
                .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                        DataTypes.createStructField("pwd", DataTypes.StringType, false),
                        DataTypes.createStructField("sex", DataTypes.StringType, false),
                        DataTypes.createStructField("nation", DataTypes.StringType, false),
                        DataTypes.createStructField("status", DataTypes.IntegerType, false),
                        DataTypes.createStructField("type", DataTypes.StringType, false)));

        Dataset<Row> ds = session.createDataFrame(list, schema);

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "root");

        // write with DataFrameWriter.jdbc(); alternatively, iterate over the dataset
        // and write with plain JDBC or DbUtils (see the sketch after this block)
        ds.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/studentmanage", "admin",
                connectionProperties);

        session.stop();
    }
}
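
As the comment in the write section above mentions, the rows can also be written by iterating over the Dataset with plain JDBC (or a helper such as DbUtils). Below is a minimal sketch of that idea, assuming the same admin table and column order as in the example; writeWithPlainJdbc is just a hypothetical helper name, and the java.sql imports are needed in addition to the Spark ones.

java

// Hypothetical helper: write ds into the admin table with plain JDBC,
// opening one connection per partition and batching the inserts.
private static void writeWithPlainJdbc(Dataset<Row> ds) {
    ds.toJavaRDD().foreachPartition(rows -> {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/studentmanage", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO admin(name, pwd, sex, nation, status, type) VALUES (?, ?, ?, ?, ?, ?)")) {
            while (rows.hasNext()) {
                Row row = rows.next();
                ps.setString(1, row.getString(0));
                ps.setString(2, row.getString(1));
                ps.setString(3, row.getString(2));
                ps.setString(4, row.getString(3));
                ps.setInt(5, row.getInt(4));
                ps.setString(6, row.getString(5));
                ps.addBatch();
            }
            ps.executeBatch();
        }
    });
}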

scala

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField}

object Demo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("demo").master("local").getOrCreate()

    val options = Map[String, String](
      ("url", "jdbc:mysql://127.0.0.1:3306/studentmanage"),
      ("driver", "com.mysql.jdbc.Driver"),
      ("dbtable", "studentmanage.admin"),
      ("user", "root"),
      ("password", "root"))

    // read from MySQL through the JDBC data source
    val df = session.read.options(options).format("jdbc").load()
    df.show()

    // write
    val arrBuffer = Array(Row("yeye", "123", "male", "us", 1, "admin")).toBuffer

    val schema = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("pwd", DataTypes.StringType, false),
      StructField("sex", DataTypes.StringType, false),
      StructField("nation", DataTypes.StringType, false),
      StructField("status", DataTypes.IntegerType, false),
      StructField("type", DataTypes.StringType, false)))

    // createDataFrame with an explicit schema expects a java.util.List[Row]
    // (or an RDD[Row]), hence the .asJava conversion
    val result = session.createDataFrame(arrBuffer.asJava, schema)

    val properties = new Properties
    properties.put("user", "root")
    properties.put("password", "root")

    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/studentmanage", "admin", properties)

    session.stop()
  }
}

 

