import java.io.Serializable;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class Main implements Serializable {

    private static final long serialVersionUID = -8513279306224995844L;

    private static final String MYSQL_USERNAME = "demo";
    private static final String MYSQL_PWD = "demo";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";

    // Local-mode Spark context that uses all available cores
    private static final JavaSparkContext sc =
            new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local[*]"));
    private static final SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {
        // Sample data-frame loaded from a JSON file
        DataFrame usersDf = sqlContext.read().json("users.json");

        // Save data-frame to MySQL (or any other JDBC supported database)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", MYSQL_USERNAME);
        connectionProperties.put("password", MYSQL_PWD);

        // Write the data-frame to the "users" table over JDBC, appending to existing rows
        usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
    }
}
To make the write easy to test, we need a JSON file with one object per line (the layout Spark's JSON reader expects), similar to the following:
{"id":994,"name":"Betty","email":"bsmithrl@simplemachines.org","city":"Eláteia","country":"Greece","ip":"9.19.204.44"}
{"id":995,"name":"Anna","email":"alewisrm@canalblog.com","city":"Shangjing","country":"China","ip":"14.207.119.126"}
{"id":996,"name":"David","email":"dgarrettrn@japanpost.jp","city":"Tsarychanka","country":"Ukraine","ip":"111.252.63.159"}
{"id":997,"name":"Heather","email":"hgilbertro@skype.com","city":"Koilás","country":"Greece","ip":"29.57.181.250"}
{"id":998,"name":"Diane","email":"ddanielsrp@statcounter.com","city":"Mapiripán","country":"Colombia","ip":"19.205.181.99"}
{"id":999,"name":"Philip","email":"pfullerrq@reuters.com","city":"El Cairo","country":"Colombia","ip":"210.248.121.194"}
{"id":1000,"name":"Maria","email":"mfordrr@shop-pro.jp","city":"Karabash","country":"Russia","ip":"224.21.41.52"}
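If you would rather generate the test file than type it by hand, something like the following could work. It is only a sketch: the class name UsersJsonWriter and its methods are hypothetical (not part of the original program), it uses nothing but the Java standard library, and it assumes the field values contain no quotes or backslashes, so it does no JSON escaping.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes a small users.json with one JSON object per line.
public class UsersJsonWriter {

    // Formats one record as a single-line JSON object.
    // Assumption: the string values need no escaping (no quotes or backslashes).
    public static String toJsonLine(int id, String name, String email,
                                    String city, String country, String ip) {
        return "{\"id\":" + id
                + ",\"name\":\"" + name + "\""
                + ",\"email\":\"" + email + "\""
                + ",\"city\":\"" + city + "\""
                + ",\"country\":\"" + country + "\""
                + ",\"ip\":\"" + ip + "\"}";
    }

    // Writes the records to the target file as UTF-8, one record per line.
    public static void write(Path target, List<String> lines) {
        try {
            Files.write(target, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                toJsonLine(999, "Philip", "pfullerrq@reuters.com",
                        "El Cairo", "Colombia", "210.248.121.194"),
                toJsonLine(1000, "Maria", "mfordrr@shop-pro.jp",
                        "Karabash", "Russia", "224.21.41.52"));
        // Creates users.json in the current working directory
        write(Paths.get("users.json"), lines);
    }
}
```

Running the main method once in the launch directory leaves a two-record users.json there for the Spark job to read.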
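When the file is read, users.json must be in the same directory as the jar. More precisely, the relative path is resolved against the working directory, so launch the job from the directory that contains both. The test runs in local mode: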
DataFrame usersDf = sqlContext.read().json("users.json");
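A missing input file is an easy mistake to make here, so it may help to check where the relative path actually points before starting Spark. The sketch below is an assumption-laden illustration: InputPathCheck is a hypothetical class name, and it assumes that in local mode Spark resolves "users.json" against the same working directory as the JVM.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-flight check, not part of the original program.
public class InputPathCheck {

    // Resolves a relative file name against the JVM's current working directory.
    public static Path resolve(String fileName) {
        return Paths.get(fileName).toAbsolutePath().normalize();
    }

    public static void main(String[] args) {
        Path input = resolve("users.json");
        if (!Files.isRegularFile(input)) {
            // Fail early with the full path so the wrong directory is obvious
            System.err.println("Input file not found: " + input);
            System.exit(1);
        }
        System.out.println("Reading from " + input);
    }
}
```

The same check could be placed at the top of Main.main so the job stops with a clear message instead of a Spark stack trace.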
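Pay particular attention to mode(SaveMode.Append) in this code: it makes every run append its rows to the existing table. Without it, the writer falls back to its default mode, SaveMode.ErrorIfExists, and every run against an existing table fails with: Exception in thread "main" java.lang.RuntimeException: Table users already exists.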
usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);