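There are roughly three ways to import data into Neo4j:
1. Read an external file into memory, import the nodes with CREATE statements, and then create the relationships.
2. Read a CSV file with LOAD CSV.
3. Load the data directly from a relational database into Neo4j over JDBC.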
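The import scenario used for the tests is as follows:
Each record has four fields: card number | user name | target card number | transfer amount. Card numbers increase sequentially from 0 to 9999, and user names are "user" followed by the same number (user0 to user9999). Each card transfers money to the next card, and the transfer amount also increases with the sequence.
All of the tests below build the same data in memory: 10,000 records, that is, 10,000 nodes and 10,000 relationships.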
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

private static final Integer DATA_SIZE = 10000;

private static List<Map<String, Object>> generateData() {
    List<Map<String, Object>> datas = new ArrayList<>(DATA_SIZE);
    for (int i = 0; i < DATA_SIZE; i++) {
        Map<String, Object> tmpMap = new HashMap<>();
        tmpMap.put("cardNum", i);
        tmpMap.put("userName", "user" + i);
        // each card transfers money to the next card; the last card wraps around to card 0
        tmpMap.put("transferCardNum", (i + 1) % DATA_SIZE);
        tmpMap.put("transferAmount", i);
        datas.add(tmpMap);
    }
    return datas;
}
```
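Because the target card is (i + 1) modulo the data size, the transfers form a single ring: card 9999 transfers back to card 0, so every relationship has both endpoints among the 10,000 nodes. The sketch below is a hypothetical typed variant of generateData() that makes the ring explicit and checks that every transfer target exists. The names TransferRow and RingData are illustrative and do not come from the original.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical typed version of one row produced by generateData()
final class TransferRow {
    final int cardNum;
    final String userName;
    final int transferCardNum;
    final int transferAmount;

    TransferRow(int cardNum, String userName, int transferCardNum, int transferAmount) {
        this.cardNum = cardNum;
        this.userName = userName;
        this.transferCardNum = transferCardNum;
        this.transferAmount = transferAmount;
    }
}

final class RingData {
    // Builds size rows; card i transfers to card (i + 1) % size, so the last card wraps to 0
    static List<TransferRow> generate(int size) {
        List<TransferRow> rows = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            rows.add(new TransferRow(i, "user" + i, (i + 1) % size, i));
        }
        return rows;
    }

    // True when every transfer target is one of the generated cards,
    // i.e. every relationship can find both of its endpoint nodes
    static boolean targetsAllExist(List<TransferRow> rows) {
        Set<Integer> cards = new HashSet<>();
        for (TransferRow r : rows) {
            cards.add(r.cardNum);
        }
        for (TransferRow r : rows) {
            if (!cards.contains(r.transferCardNum)) {
                return false;
            }
        }
        return true;
    }
}
```

Taking the modulus by the size rather than a fixed 10000 keeps the ring closed for any data size.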
0. Add the following dependencies to the pom

```xml
<!-- Neo4j APIs -->
<dependency>
    <groupId>org.neo4j.driver</groupId>
    <artifactId>neo4j-java-driver</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j</artifactId>
    <version>3.3.4</version>
</dependency>
```
1. Importing an external file into Neo4j
Reading the file itself is skipped here; the 10,000 records are simulated in memory instead.
```java
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils; // commons-collections (collections4 has the same method)
import org.apache.commons.lang3.time.StopWatch;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;

/**
 * Tests inserting the data by hand and maintaining the relationships by hand
 */
private static void insertNeo4jTest() {
    // build the data; it is the same data that goes into the PG database
    List<Map<String, Object>> datas = generateData();
    try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
         Session session = driver.session()) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        // create the nodes one statement at a time; the quotes in the template store every value as a string
        String createCQLTemplate = "create (n:transferDetail {cardNum: '$cardNum', userName: '$userName', "
                + "transferCardNum: '$transferCardNum', transferAmount: '$transferAmount'})";
        datas.forEach(data -> {
            String createCQL = createCQLTemplate.replace("$cardNum", MapUtils.getString(data, "cardNum"))
                    .replace("$userName", MapUtils.getString(data, "userName"))
                    .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
                    .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
            session.run(createCQL);
        });
        System.out.println("Nodes inserted, elapsed: " + stopWatch.getTime() + " ms");

        // create the relationships one statement at a time: card -> the card it transfers to
        String mergeCQLTemplate = "match (a:transferDetail{cardNum: '$cardNum1'}), (b:transferDetail{cardNum: '$cardNum2'}) "
                + "MERGE (a)-[:TRANSFER{transferAmount: '$transferAmount'}]->(b)";
        datas.forEach(data -> {
            String mergeCQL = mergeCQLTemplate.replace("$cardNum1", MapUtils.getString(data, "cardNum"))
                    .replace("$cardNum2", MapUtils.getString(data, "transferCardNum"))
                    .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
            session.run(mergeCQL);
        });
        stopWatch.stop();
        // the StopWatch was not reset, so this figure is the total of both phases
        System.out.println("Relationships created, elapsed: " + stopWatch.getTime() + " ms");
    }
}
```
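The code above splices values into the Cypher text with String.replace and sends one statement per row, which means one round trip and one auto-commit transaction per row, and a value containing a single quote would break the statement. A likely contributor to the slow relationship phase is also that transferDetail.cardNum has no index in this test (unlike the other two tests, which create a uniqueness constraint first), so each MATCH has to scan the label. A common alternative is to pass the rows as a parameter and let UNWIND expand them in batches. The sketch below only prepares the statements and the batches with the JDK; the Neo4j Java driver call it would feed, session.run(String, Map<String, Object>), appears in a comment. The class name and the batch size are assumptions. Unlike the original, the values stay integers here, so the node creation and the MATCH must both use the same type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class UnwindImport {
    // Index on the lookup property (Neo4j 3.5 syntax); create it before the relationship pass
    static final String CREATE_INDEX = "CREATE INDEX ON :transferDetail(cardNum)";

    // Creates one node per element of the $rows parameter
    static final String CREATE_NODES =
            "UNWIND $rows AS row "
            + "CREATE (n:transferDetail {cardNum: row.cardNum, userName: row.userName, "
            + "transferCardNum: row.transferCardNum, transferAmount: row.transferAmount})";

    // Links each card to its transfer target; run only after all nodes exist
    static final String CREATE_RELATIONSHIPS =
            "UNWIND $rows AS row "
            + "MATCH (a:transferDetail {cardNum: row.cardNum}), "
            + "(b:transferDetail {cardNum: row.transferCardNum}) "
            + "MERGE (a)-[:TRANSFER {transferAmount: row.transferAmount}]->(b)";

    // Splits rows into consecutive sublists of at most batchSize elements
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    // Wraps one batch as the parameter map expected by the statements above, e.g.
    //   for (List<Map<String, Object>> batch : partition(datas, 1000)) {
    //       session.run(CREATE_NODES, params(batch));   // Neo4j Java driver
    //   }
    static Map<String, Object> params(List<Map<String, Object>> batch) {
        return Collections.<String, Object>singletonMap("rows", batch);
    }
}
```

With a batch size of 1000, each phase becomes 10 statements instead of 10,000.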
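The logic of insertNeo4jTest is simple: build 10,000 records in memory -> CREATE them in Neo4j -> create the relationships by hand. The timings were:
Nodes inserted, elapsed: 50976 ms
Relationships created, elapsed: 285111 ms
In total the run takes a little under 5 minutes (the StopWatch is not reset between the two prints, so the second figure is the cumulative total). The node count can then be checked in Neo4j: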
```cypher
MATCH (n:transferDetail) RETURN count(n)
```
2. Testing LOAD CSV
1. Add the following dependency to the pom:

```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.3</version>
</dependency>
```

2. Test method: generate a CSV file, then import it with LOAD CSV
```java
import java.io.FileWriter;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils; // commons-collections (collections4 has the same method)
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.lang3.time.StopWatch;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;

private static void loadCSVtest() throws Exception {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // 1. write the same data to a CSV file in Neo4j's import directory
    List<Map<String, Object>> datas = generateData();
    try (CSVPrinter printer = CSVFormat.RFC4180
            .withHeader("cardnum", "username", "transfercardnum", "transferamount")
            .print(new FileWriter("E:\\neo4j3.5\\neo4j-community-3.5.5\\import\\transfer.csv"))) {
        for (Map<String, Object> data : datas) {
            printer.printRecord(MapUtils.getString(data, "cardNum"), MapUtils.getString(data, "userName"),
                    MapUtils.getString(data, "transferCardNum"), MapUtils.getString(data, "transferAmount"));
        }
    }
    System.out.println("CSV file written, elapsed: " + stopWatch.getTime() + "ms");

    // 2. load the CSV into Neo4j
    try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
         Session session = driver.session()) {
        // remove the nodes left over from a previous run
        String deleteCQL = "match (n:transferdetail) detach delete n";
        session.run(deleteCQL);
        System.out.println("Neo4j cleared, elapsed: " + stopWatch.getTime() + "ms");

        // the uniqueness constraint is backed by an index, which the relationship MATCH uses
        String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
        session.run(constraintCQL);

        // one node per CSV line
        String createCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as line "
                + "create(n:transferdetail{cardnum:line.cardnum, username:line.username, "
                + "transfercardnum:line.transfercardnum, transferamount:line.transferamount})";
        session.run(createCQL);

        // read the file again and link each card to its transfer target
        String relateCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as row "
                + "match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) "
                + "merge (n)-[:transfer{transferamount:row.transferamount}]->(m)";
        session.run(relateCQL);
        stopWatch.stop();
        System.out.println("LOAD CSV import finished, elapsed: " + stopWatch.getTime() + "ms");
    }
}
```
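For reference, transfer.csv as written above starts with the header line and then holds one comma-separated record per row; commons-csv's RFC4180 format ends every record with CRLF and quotes a field only when it needs to. If you would rather not pull in commons-csv for data this simple, a JDK-only formatter following the same RFC 4180 rules could look like the sketch below. The class name is hypothetical. In practice each formatted record would be written to a FileWriter on Neo4j's import directory, which is where LOAD CSV resolves file:/// URLs by default.

```java
import java.util.List;

final class SimpleCsvWriter {
    // Quotes a field only when it contains a comma, double quote, CR or LF (RFC 4180);
    // embedded double quotes are doubled
    static String escape(String field) {
        boolean needsQuotes = field.indexOf(',') >= 0 || field.indexOf('"') >= 0
                || field.indexOf('\r') >= 0 || field.indexOf('\n') >= 0;
        if (!needsQuotes) {
            return field;
        }
        return '"' + field.replace("\"", "\"\"") + '"';
    }

    // Joins the fields into one record terminated with CRLF, as RFC 4180 specifies
    static String formatRecord(List<String> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(escape(fields.get(i)));
        }
        return sb.append("\r\n").toString();
    }
}
```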
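Result of the LOAD CSV test: it is very fast, far faster than creating the nodes and then the relationships by hand:
CSV file written, elapsed: 269ms
Neo4j cleared, elapsed: 2730ms
LOAD CSV import finished, elapsed: 3070ms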
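LOAD CSV hands every field over as a string, so in the import above cardnum, transfercardnum and transferamount end up as string properties. That still works because both the node creation and the relationship MATCH compare strings, but if numeric properties are wanted, each field can be converted with toInteger(), and the MATCH has to convert in the same way. For files much larger than this one, Neo4j 3.5 also accepts a USING PERIODIC COMMIT prefix so that the import commits every N rows. The sketch below builds both statements; the class name and the commit size used in the check are illustrative assumptions.

```java
final class LoadCsvCypher {
    // Node import with numeric conversion; periodicCommit <= 0 means no prefix.
    // fileUrl is assumed not to contain a single quote.
    static String createNodes(String fileUrl, int periodicCommit) {
        return prefix(periodicCommit)
                + "LOAD CSV WITH HEADERS FROM '" + fileUrl + "' AS line "
                + "CREATE (:transferdetail {cardnum: toInteger(line.cardnum), username: line.username, "
                + "transfercardnum: toInteger(line.transfercardnum), "
                + "transferamount: toInteger(line.transferamount)})";
    }

    // Relationship import; the MATCH converts the same way, so integers are compared with integers
    static String createRelationships(String fileUrl, int periodicCommit) {
        return prefix(periodicCommit)
                + "LOAD CSV WITH HEADERS FROM '" + fileUrl + "' AS row "
                + "MATCH (n:transferdetail {cardnum: toInteger(row.cardnum)}), "
                + "(m:transferdetail {cardnum: toInteger(row.transfercardnum)}) "
                + "MERGE (n)-[:transfer {transferamount: toInteger(row.transferamount)}]->(m)";
    }

    private static String prefix(int periodicCommit) {
        return periodicCommit > 0 ? "USING PERIODIC COMMIT " + periodicCommit + " " : "";
    }
}
```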
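3. Testing loading data from an RDBMS
This test uses APOC to load the data over JDBC, from a PostgreSQL (PG) database.
1. First download the APOC plugin:
https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases
2. Put the downloaded jar and the PG JDBC driver jar into the %neo4j%\plugins directory.
3. Edit neo4j.conf in the %neo4j-community-3.5.5%\conf folder and append the following settings: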
```properties
dbms.security.procedures.unrestricted=apoc.*
apoc.export.file.enabled=true
```
4. Restart the Neo4j server.
5. Check the APOC version; if a version is returned, the plugin is installed correctly:
```cypher
return apoc.version()
```
6. Change the program to load the data from JDBC with APOC
(1) Add the PG driver
```xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.2</version>
</dependency>
```
(2) Write the test method
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils; // commons-collections (collections4 has the same method)
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.springframework.util.Assert;

public static void insertFromPGTest() throws Exception {
    // 1. build the data
    List<Map<String, Object>> datas = generateData();

    // 2. insert it into PG
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Connection connection = null;
    try {
        Class.forName("org.postgresql.Driver");
        connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:5432/qlq_test", "postgres", "postgres");
    } catch (Exception e) {
        e.printStackTrace();
    }
    Assert.notNull(connection, "Connection failed");
    Statement statement = connection.createStatement();
    // empty the table
    String truncateSQL = "truncate table transferdetail";
    statement.execute(truncateSQL);
    System.out.println("PG table truncated, elapsed: " + stopWatch.getTime() + "ms");
    // insert all rows with a single multi-row INSERT
    List<String> insertSQLs = new ArrayList<>();
    String valueSQL = "('$cardNum', '$userName', '$transferCardNum', '$transferAmount')";
    datas.forEach(data -> insertSQLs.add(valueSQL.replace("$cardNum", MapUtils.getString(data, "cardNum"))
            .replace("$userName", MapUtils.getString(data, "userName"))
            .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
            .replace("$transferAmount", MapUtils.getString(data, "transferAmount"))));
    String sql = "insert into transferdetail(cardNum, userName, transferCardNum, transferAmount) values "
            + StringUtils.join(insertSQLs, ",");
    statement.execute(sql);
    System.out.println("PG rows inserted, elapsed: " + stopWatch.getTime() + "ms");
    statement.close();
    connection.close();

    // 3. copy the PG table into Neo4j with APOC
    try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
         Session session = driver.session()) {
        String deleteCQL = "match (n:transferdetail) detach delete n";
        session.run(deleteCQL);
        System.out.println("Neo4j cleared, elapsed: " + stopWatch.getTime() + "ms");
        String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
        session.run(constraintCQL);

        // connect to PostgreSQL and create one node per row. apoc.load.jdbc runs inside the Neo4j
        // server, so this URL must reach the same PG database as above. PG folds unquoted column
        // names to lower case, hence row.cardnum, row.username, ...
        String createCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"create(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount})\",{batchSize:10000,iterateList:true})";
        session.run(createCQL);

        // read the table again and link every card to its transfer target.
        // Variants also tried: MERGE both endpoint nodes and CREATE the relationship in a single
        // pass, or MATCH + CREATE instead of MERGE for the relationship.
        String relateCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)\",{batchSize:10000,iterateList:true})";
        session.run(relateCQL);
        stopWatch.stop();
        System.out.println("APOC import finished, elapsed: " + stopWatch.getTime() + "ms");
    }
}
```
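The nested escaping (a quoted SQL string inside an inner Cypher string inside the outer Cypher string inside a Java string) is the hardest part of this code to get right. Because procedure arguments can be Cypher parameters, one option is to pass both inner statements as parameters and hand the JDBC URL and the SQL to them through the params entry of apoc.periodic.iterate's config, so nothing needs escaping. This is a sketch under the assumption that your APOC version makes those params visible to both inner statements, as the APOC documentation describes. It is not the code that produced the timings below. The class name is illustrative, and the statements would be sent with session.run(CALL_ITERATE, parameters(url, ...)) from the Neo4j Java driver, where url is your own connection string.

```java
import java.util.HashMap;
import java.util.Map;

final class ApocJdbcImport {
    // Outer query: every argument arrives as a Cypher parameter, so no quotes need escaping
    static final String CALL_ITERATE =
            "CALL apoc.periodic.iterate($iterate, $action, "
            + "{batchSize: 10000, iterateList: true, params: {url: $url, sql: $sql}})";

    // Streams rows from the relational table; $url and $sql come from the params config entry
    static final String ITERATE = "CALL apoc.load.jdbc($url, $sql) YIELD row RETURN row";

    // Same node creation as createCQL above
    static final String CREATE_NODES =
            "CREATE (n:transferdetail {cardnum: row.cardnum, username: row.username, "
            + "transfercardnum: row.transfercardnum, transferamount: row.transferamount})";

    // Same relationship creation as relateCQL above
    static final String CREATE_RELATIONSHIPS =
            "MATCH (n:transferdetail {cardnum: row.cardnum}), "
            + "(m:transferdetail {cardnum: row.transfercardnum}) "
            + "MERGE (n)-[:transfer {transferamount: row.transferamount}]->(m)";

    // Parameter map for one run of CALL_ITERATE with the given action statement
    static Map<String, Object> parameters(String jdbcUrl, String action) {
        Map<String, Object> params = new HashMap<>();
        params.put("iterate", ITERATE);
        params.put("action", action);
        params.put("url", jdbcUrl);
        params.put("sql", "SELECT * FROM transferdetail");
        return params;
    }
}
```

Running CALL_ITERATE twice, once with CREATE_NODES and once with CREATE_RELATIONSHIPS, mirrors createCQL and relateCQL.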
Results of the APOC test:
PG table truncated, elapsed: 546ms
PG rows inserted, elapsed: 976ms
Neo4j cleared, elapsed: 2578ms
APOC import finished, elapsed: 3013ms
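Conclusion: loading over JDBC and LOAD CSV perform about the same, and both are far faster than creating the nodes and maintaining the relationships by hand.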
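None of the three test methods resets its StopWatch, so every printed figure is the time elapsed since the method started, and differences between consecutive figures isolate each phase. Using only the numbers reported above: for the manual approach, node creation took 50976 ms and relationship creation 285111 - 50976 = 234135 ms. For LOAD CSV, the constraint plus both LOAD CSV statements took 3070 - 2730 = 340 ms. For APOC, the constraint plus both apoc.periodic.iterate calls took 3013 - 2578 = 435 ms. The two bulk approaches therefore spend well under a second on the Neo4j import, against nearly five minutes for the statement-per-row approach. The small helper below (the name SplitTimes is illustrative) does this subtraction.

```java
final class SplitTimes {
    // Converts cumulative stopwatch readings (ms) into the duration of each phase
    static long[] phases(long... cumulative) {
        long[] out = new long[cumulative.length];
        long previous = 0;
        for (int i = 0; i < cumulative.length; i++) {
            out[i] = cumulative[i] - previous;
            previous = cumulative[i];
        }
        return out;
    }
}
```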
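Addendum: importing into Neo4j 4.1
When importing with APOC on Neo4j 4.1, the import failed with an error saying the JDBC driver could not be found, even though the JDBC driver jar was in the plugins directory. I did not find the cause; the workaround I ended up with was to use the general-purpose driver package provided by the official neo4j-contrib project, available here: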
https://github.com/neo4j-contrib/neo4j-jdbc/releases
APOC documentation: https://neo4j.com/labs/apoc/4.1/
APOC on GitHub: https://github.com/neo4j-contrib/neo4j-apoc-procedures