用的本地模式,pom.xml中添加了mysql驱动包,mysql已经开启,写入的时候发现用format("jdbc").save()的方式发现会有does not allow create table as select的异常,于是去官方文档上发现了使用jdbc()的方式,测试
正常,说明下Properties是java.util.Properties
java
1 public class Demo { 2 private static SparkSession session = SparkSession.builder().appName("demo").master("local").getOrCreate(); 3
4 public static void main(String[] args) { 5 Map<String, String> options = new HashMap<>(); 6 options.put("url", "jdbc:mysql://127.0.0.1:3306/studentmanage"); 7 options.put("driver", "com.mysql.jdbc.Driver"); 8 options.put("dbtable", "studentmanage.admin"); 9 options.put("user", "root"); 10 options.put("password", "root"); 11
12 // 读取
13 Dataset<Row> dataset = session.read().format("jdbc").options(options).load(); 14 dataset.show(); 15
16 // 创建数据
17 List<Row> list = new ArrayList<Row>(); 18 Row row1 = RowFactory.create("tele", "123", "male", "China", 1, "admin"); 19 Row row2 = RowFactory.create("wyc", "123", "male", "China", 1, "admin"); 20 Row row3 = RowFactory.create("xxx", "123", "male", "China", 1, "admin"); 21 list.add(row1); 22 list.add(row2); 23 list.add(row3); 24
25 // 写入
26 StructType schema = DataTypes 27 .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false), 28 DataTypes.createStructField("pwd", DataTypes.StringType, false), 29 DataTypes.createStructField("sex", DataTypes.StringType, false), 30 DataTypes.createStructField("nation", DataTypes.StringType, false), 31 DataTypes.createStructField("status", DataTypes.IntegerType, false), 32 DataTypes.createStructField("type", DataTypes.StringType, false))); 33
34 Dataset<Row> ds = session.createDataFrame(list, schema); 35
36 Properties connectionProperties = new Properties(); 37 connectionProperties.put("user", "root"); 38 connectionProperties.put("password", "root"); 39
40 // 也可以对dataset进行遍历使用原生的jdbc或者dbutils等进行写入
41 ds.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/studentmanage", "admin", 42 connectionProperties); 43
44 session.stop(); 45 } 46 }
scala
1 object Demo { 2 def main(args: Array[String]): Unit = { 3 val session = SparkSession.builder().appName("demo").master("local").getOrCreate() 4
5 val options = Map[String, String]( 6 ("url", "jdbc:mysql://127.0.0.1:3306/studentmanage"), 7 ("driver", "com.mysql.jdbc.Driver"), 8 ("dbtable", "studentmanage.admin"), 9 ("user", "root"), 10 ("password", "root")) 11
12 //读取
13 val df = session.read.options(options).format("jdbc").load() 14
15 df.show() 16
17 //写入
18 val arrBuffer = Array(Row("yeye", "123", "male", "us", 1, "admin")).toBuffer 19
20 val schema = DataTypes.createStructType(Array( 21 StructField("name", DataTypes.StringType, false), 22 StructField("pwd", DataTypes.StringType, false), 23 StructField("sex", DataTypes.StringType, false), 24 StructField("nation", DataTypes.StringType, false), 25 StructField("status", DataTypes.IntegerType, false), 26 StructField("type", DataTypes.StringType, false))) 27
28 val result = session.createDataFrame(arrBuffer, schema) 29
30 val properties = new Properties 31 properties.put("user", "root") 32 properties.put("password", "root") 33
34 result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/studentmanage", "admin", properties) 35
36 session.stop 37 } 38 }