Three Ways to Import Data into Neo4j


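  There are roughly three ways to import data into Neo4j:

1. Read an external file into memory, create the nodes with CREATE statements, and then build the relationships.

2. Read a CSV file with LOAD CSV.

3. Load the data directly from a relational database over JDBC.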

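The test scenario is as follows.

Each record has four fields: card number | user name | target card number | transfer amount. Card numbers increase from 0 to 9999, and user names are "user" + 0 to 9999. Each card transfers money to the next card (the last card, 9999, wraps around to card 0). The transfer amount also increases, since it equals the card number.

  All examples below build the same data in memory: 10,000 records, i.e. 10,000 nodes and 10,000 relationships.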

    private static final int DATA_SIZE = 10000;

    /**
     * Builds DATA_SIZE records; card i transfers the amount i to card (i + 1) % DATA_SIZE.
     */
    private static List<Map<String, Object>> generateData() {
        List<Map<String, Object>> datas = new ArrayList<>(DATA_SIZE);
        for (int i = 0; i < DATA_SIZE; i++) {
            Map<String, Object> tmpMap = new HashMap<>();
            tmpMap.put("cardNum", i);
            tmpMap.put("userName", "user" + i);
            tmpMap.put("transferCardNum", (i + 1) % DATA_SIZE); // each card transfers to the next one; the last wraps to 0
            tmpMap.put("transferAmount", i);
            datas.add(tmpMap);
        }
        return datas;
    }
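The `(i + 1) % DATA_SIZE` step turns the 10,000 rows into 10,000 nodes and 10,000 relationships. Every card is the target of exactly one transfer, and card 9999 wraps around to card 0, so the transfers form a single cycle. The JDK-only sketch below checks that property in memory. The class name `TransferRingCheck` and the method `formsSingleCycle` are hypothetical and are not part of the original test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: verifies that the generated transfers form one cycle over all cards.
public class TransferRingCheck {
    private static final int DATA_SIZE = 10000;

    // Same shape as generateData() in the article.
    static List<Map<String, Object>> generateData() {
        List<Map<String, Object>> datas = new ArrayList<>(DATA_SIZE);
        for (int i = 0; i < DATA_SIZE; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("cardNum", i);
            row.put("userName", "user" + i);
            row.put("transferCardNum", (i + 1) % DATA_SIZE);
            row.put("transferAmount", i);
            datas.add(row);
        }
        return datas;
    }

    // True if following transfers from card 0 visits every card exactly once and returns to card 0.
    static boolean formsSingleCycle(List<Map<String, Object>> datas) {
        Map<Integer, Integer> next = new HashMap<>();
        for (Map<String, Object> row : datas) {
            next.put((Integer) row.get("cardNum"), (Integer) row.get("transferCardNum"));
        }
        Set<Integer> seen = new HashSet<>();
        Integer current = 0;
        // Stop when the chain breaks (null) or a card is reached for the second time.
        while (current != null && seen.add(current)) {
            current = next.get(current);
        }
        return current != null && current == 0 && seen.size() == datas.size();
    }

    public static void main(String[] args) {
        List<Map<String, Object>> datas = generateData();
        System.out.println("rows: " + datas.size());
        System.out.println("card 9999 transfers to: " + datas.get(DATA_SIZE - 1).get("transferCardNum"));
        System.out.println("single cycle: " + formsSingleCycle(datas));
    }
}
```

If any target were wrong, for example a card skipping its neighbour, the walk would return to card 0 early or stop, and the check would return false.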

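0. Add the following dependencies to the pom. The code below also uses helper classes that are not listed here: StopWatch, MapUtils and StringUtils from Apache Commons, and Assert from Spring.

        <!-- Neo4j APIs -->
        <dependency>
            <groupId>org.neo4j.driver</groupId>
            <artifactId>neo4j-java-driver</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.neo4j</groupId>
            <artifactId>neo4j</artifactId>
            <version>3.3.4</version>
        </dependency>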

1. Importing an external file into Neo4j

  Reading the file is skipped here; the 10,000 records are simulated in memory instead.

    /**
     * Creates the nodes with one CREATE per record, then builds the relationships with one MERGE per record.
     */
    private static void insertNeo4jTest() {
        // Build the data (the same data that is stored in the PG database in section 3)
        List<Map<String, Object>> datas = generateData();

        Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
        Session session = driver.session();

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        // Create the nodes one statement at a time. The values are spliced into the query text
        // inside quotes, so every property is stored as a string.
        String createCQLTemplate = "create (n:transferDetail {cardNum: '$cardNum', userName: '$userName', transferCardNum: '$transferCardNum', transferAmount: '$transferAmount'})";
        datas.forEach(data -> {
            String createCQL = createCQLTemplate.replace("$cardNum", MapUtils.getString(data, "cardNum"))
                    .replace("$userName", MapUtils.getString(data, "userName"))
                    .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
                    .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
            session.run(createCQL);
        });
        System.out.println("Nodes inserted, elapsed: " + stopWatch.getTime() + " ms");
        // Build the relationships one MERGE at a time. The stopwatch is not reset,
        // so the next reading includes the node-creation time above.
        String mergeCQLTemplate = "match (a:transferDetail{cardNum: '$cardNum1'}), (b:transferDetail{cardNum: '$cardNum2'}) MERGE(a)-[:TRANSFER{transferAmount: '$transferAmount'}]->(b)";
        datas.forEach(data -> {
            String mergeCQL = mergeCQLTemplate.replace("$cardNum1", MapUtils.getString(data, "cardNum"))
                    .replace("$cardNum2", MapUtils.getString(data, "transferCardNum"))
                    .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
            session.run(mergeCQL);
        });
        stopWatch.stop();
        System.out.println("Relationships created, elapsed: " + stopWatch.getTime() + " ms");

        // Close resources
        session.close();
        driver.close();
    }

  The logic is simple: build 10,000 records in memory -> CREATE them in Neo4j -> build the relationships manually. Measured times:

Nodes inserted, elapsed: 50976 ms
Relationships created, elapsed: 285111 ms

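The whole run took just under 5 minutes. The stopwatch is never reset, so the second reading is cumulative: the relationship phase alone took about 285111 - 50976 = 234135 ms. A likely reason this phase dominates is that no index exists on cardNum, so each MATCH has to scan all transferDetail nodes. To check the data in Neo4j: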

MATCH (n:transferDetail) RETURN count(n)
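Below is one possible way to speed up the manual approach. It is a sketch under stated assumptions and has not been measured. The rows are passed as Cypher parameters, `UNWIND` handles a whole batch per statement, and an index on `cardNum` is created before the relationship step. The class `UnwindBatchSketch`, its constants and the batch size of 1000 are hypothetical. The code only builds the statements and parameter maps and does not connect to Neo4j. Unlike the article's templates, parameters keep the integer values as integers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: batched, parameterized version of the section 1 import. Builds statements only.
public class UnwindBatchSketch {
    // Neo4j 3.5 syntax; lets each MATCH in REL_CQL look nodes up by cardNum instead of scanning them all.
    static final String INDEX_CQL = "CREATE INDEX ON :transferDetail(cardNum)";

    // One statement creates every node in the $rows list.
    static final String NODE_CQL =
            "UNWIND $rows AS row "
          + "CREATE (:transferDetail {cardNum: row.cardNum, userName: row.userName, "
          + "transferCardNum: row.transferCardNum, transferAmount: row.transferAmount})";

    // One statement links every row in $rows to the card it transfers to.
    static final String REL_CQL =
            "UNWIND $rows AS row "
          + "MATCH (a:transferDetail {cardNum: row.cardNum}), (b:transferDetail {cardNum: row.transferCardNum}) "
          + "MERGE (a)-[:TRANSFER {transferAmount: row.transferAmount}]->(b)";

    // Splits rows into parameter maps {rows: [...]} holding at most batchSize rows each.
    static List<Map<String, Object>> toBatches(List<Map<String, Object>> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Map<String, Object>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            Map<String, Object> params = new HashMap<>();
            // Copy the slice so the batch does not depend on later changes to rows.
            params.put("rows", new ArrayList<>(rows.subList(from, to)));
            batches.add(params);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("cardNum", i);
            row.put("userName", "user" + i);
            row.put("transferCardNum", (i + 1) % 10000);
            row.put("transferAmount", i);
            rows.add(row);
        }
        System.out.println(toBatches(rows, 1000).size() + " batches of up to 1000 rows");
    }
}
```

With the 4.0 Java driver, you would run `INDEX_CQL` once. You would then send each batch with `session.run(UnwindBatchSketch.NODE_CQL, batch)`, followed by the same loop for `REL_CQL`. This uses 10 round trips per phase instead of 10,000.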

2. Testing LOAD CSV

1. Add the following dependency to the pom:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.3</version>
        </dependency>

2. Test method: generate a CSV file, then import it with LOAD CSV

    private static void loadCSVtest() throws Exception {
        // 1. Write the same data to a local CSV file. It goes into the Neo4j server's import
        //    directory, which is where 'file:///transfer.csv' below is resolved.
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        Appendable fileWriter = new FileWriter("E:\\neo4j3.5\\neo4j-community-3.5.5\\import\\transfer.csv");
        CSVPrinter printer = CSVFormat.RFC4180.withHeader("cardnum", "username", "transfercardnum", "transferamount").print(fileWriter);
        List<Map<String, Object>> datas = generateData();
        datas.forEach(data -> {
            try {
                printer.printRecord(MapUtils.getString(data, "cardNum"), MapUtils.getString(data, "userName"), MapUtils.getString(data, "transferCardNum"), MapUtils.getString(data, "transferAmount"));
            } catch (IOException e) {
                // Fail loudly instead of silently dropping a row
                throw new UncheckedIOException(e);
            }
        });
        printer.close(); // also closes the underlying FileWriter
        System.out.println("CSV file written, elapsed: " + stopWatch.getTime() + " ms");

        // 2. LOAD CSV into Neo4j
        Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
        Session session = driver.session();
        // Labels are case-sensitive: this does not touch the transferDetail nodes from section 1
        String deleteCQL = "match (n:transferdetail) detach delete n";
        session.run(deleteCQL);
        System.out.println("Neo4j cleared, elapsed: " + stopWatch.getTime() + " ms");
        // The unique constraint is backed by an index on cardnum, which the MATCH in relateCQL can use
        String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
        session.run(constraintCQL);
        String createCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as line create(n:transferdetail{cardnum:line.cardnum, username:line.username, transfercardnum:line.transfercardnum, transferamount:line.transferamount})";
        session.run(createCQL);
        String relateCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as row match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)";
        session.run(relateCQL);
        stopWatch.stop();
        System.out.println("LOAD CSV import done, elapsed: " + stopWatch.getTime() + " ms");
        session.close();
        driver.close();
    }

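Result: this is far faster than creating the nodes and building the relationships by hand. The readings are again cumulative, so the 3070 ms total also includes writing the CSV file and clearing the database.

CSV file written, elapsed: 269 ms
Neo4j cleared, elapsed: 2730 ms
LOAD CSV import done, elapsed: 3070 ms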
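Two details of the LOAD CSV version are worth knowing. First, LOAD CSV returns every field as a string, so `cardnum` and the other numeric columns are stored as strings unless converted with `toInteger()`. Second, on Neo4j 3.x, `USING PERIODIC COMMIT` commits every N rows, which keeps memory use bounded for larger files; it only works in an auto-commit transaction such as a plain `session.run`. The sketch below is a hypothetical variant and was not measured. It writes the same file with the JDK only (no commons-csv) and holds the converted statements. The class name, the method name and the commit size of 1000 are assumptions. Once the nodes store integers, the MATCH must convert as well; otherwise the string `'1'` will not match the integer `1`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical variant of section 2: JDK-only CSV writer plus LOAD CSV statements that convert types.
public class LoadCsvSketch {
    // toInteger() stores the numeric columns as integers; USING PERIODIC COMMIT (Neo4j 3.x) commits every 1000 rows.
    static final String NODE_CQL =
            "USING PERIODIC COMMIT 1000 "
          + "LOAD CSV WITH HEADERS FROM 'file:///transfer.csv' AS line "
          + "CREATE (:transferdetail {cardnum: toInteger(line.cardnum), username: line.username, "
          + "transfercardnum: toInteger(line.transfercardnum), transferamount: toInteger(line.transferamount)})";

    // The MATCH converts as well, so it compares integers with integers.
    static final String REL_CQL =
            "USING PERIODIC COMMIT 1000 "
          + "LOAD CSV WITH HEADERS FROM 'file:///transfer.csv' AS row "
          + "MATCH (n:transferdetail {cardnum: toInteger(row.cardnum)}), "
          + "(m:transferdetail {cardnum: toInteger(row.transfercardnum)}) "
          + "MERGE (n)-[:transfer {transferamount: toInteger(row.transferamount)}]->(m)";

    // Writes the same four columns as the commons-csv version. The values contain no commas,
    // quotes or line breaks, so no quoting is needed here.
    static void writeCsv(Path file, int size) {
        try (Writer out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            out.write("cardnum,username,transfercardnum,transferamount\n");
            for (int i = 0; i < size; i++) {
                out.write(i + ",user" + i + "," + ((i + 1) % size) + "," + i + "\n");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("transfer", ".csv");
        writeCsv(file, 10000);
        System.out.println(Files.readAllLines(file).size() + " lines written to " + file);
        Files.delete(file);
    }
}
```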

3. Testing a load from an RDBMS

  Here APOC is used to load the data over JDBC, from a PostgreSQL database.

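1. First download the APOC plugin (pick the release that matches your Neo4j version):

https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases

2. Put the downloaded jar and the PostgreSQL JDBC driver jar into the %neo4j%\plugins directory.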

3. Edit neo4j.conf in the %neo4j-community-3.5.5%\conf folder and append the following settings:

dbms.security.procedures.unrestricted=apoc.*
apoc.export.file.enabled=true

4. Restart the Neo4j server.

5. Check the APOC version. If a version is returned, the plugin is installed:

return apoc.version()

6. Change the program to load the data over JDBC with APOC

(1) Add the PostgreSQL driver

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.2</version>
        </dependency>

(2) Write the test method

    public static void insertFromPGTest() throws Exception {
        // 1. Build the data
        List<Map<String, Object>> datas = generateData();

        // 2. Insert it into PostgreSQL
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        Connection connection = null;
        try {
            Class.forName("org.postgresql.Driver");
            connection = DriverManager
                    .getConnection("jdbc:postgresql://127.0.0.1:5432/qlq_test",
                            "postgres", "postgres");
        } catch (Exception e) {
            e.printStackTrace();
        }
        Assert.notNull(connection, "Connection failed");
        try (java.sql.Statement statement = connection.createStatement()) {
            // Empty the table
            String truncateSQL = "truncate table transferdetail";
            statement.execute(truncateSQL);
            System.out.println("PG table truncated, elapsed: " + stopWatch.getTime() + " ms");
            // Insert all rows with a single multi-row INSERT
            List<String> insertSQLs = new ArrayList<>();
            String valueSQL = "('$cardNum', '$userName', '$transferCardNum', '$transferAmount')";
            datas.forEach(data -> {
                insertSQLs.add(valueSQL.replace("$cardNum", MapUtils.getString(data, "cardNum"))
                        .replace("$userName", MapUtils.getString(data, "userName"))
                        .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
                        .replace("$transferAmount", MapUtils.getString(data, "transferAmount")));
            });
            String sql = "insert into transferdetail(cardNum, userName, transferCardNum, transferAmount) values ";
            sql += StringUtils.join(insertSQLs, ",");
            statement.execute(sql);
            System.out.println("PG insert done, elapsed: " + stopWatch.getTime() + " ms");
        } finally {
            connection.close();
        }

        // 3. Transfer the data from PostgreSQL to Neo4j.
        // The JDBC URL in the APOC calls must point at the same database the rows were inserted into above.
        // PostgreSQL folds unquoted column names to lower case, which is why the rows are read as row.cardnum etc.
        Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
        Session session = driver.session();
        String deleteCQL = "match (n:transferdetail) detach delete n";
        session.run(deleteCQL);
        System.out.println("Neo4j cleared, elapsed: " + stopWatch.getTime() + " ms");
        String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
        session.run(constraintCQL);
        // apoc.periodic.iterate streams the rows returned by apoc.load.jdbc and runs the second
        // statement on them in transactions of batchSize rows: first the nodes, then the relationships.
        String createCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"create(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount})\",{batchSize:10000,iterateList:true})";
        session.run(createCQL);
        String relateCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)\",{batchSize:10000,iterateList:true})";
        session.run(relateCQL);
        stopWatch.stop();
        System.out.println("APOC import done, elapsed: " + stopWatch.getTime() + " ms");

        session.close();
        driver.close();
    }

Test results:

PG table truncated, elapsed: 546 ms
PG insert done, elapsed: 976 ms
Neo4j cleared, elapsed: 2578 ms
APOC import done, elapsed: 3013 ms
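Section 3 builds both the SQL and the Cypher by string concatenation, and the APOC calls carry the database password in the query text. The sketch below shows two common alternatives, under assumptions:

- a JDBC batch insert with `?` placeholders;
- an `apoc.periodic.iterate` call whose JDBC URL and SQL are passed in through its `params` option.

It assumes neo4j.conf defines a JDBC key such as `apoc.jdbc.pg.url=jdbc:postgresql://...`, which APOC lets `apoc.load.jdbc` use in place of a full URL. The key `pg`, the class name and the method names are hypothetical. The article does not show the table definition, so the column types that `insertBatch` binds are an assumption too.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: parameterized alternatives to the string-built SQL and Cypher in section 3.
public class JdbcImportSketch {
    // One placeholder per column; the JDBC driver handles quoting and escaping.
    static final String INSERT_SQL =
            "INSERT INTO transferdetail(cardNum, userName, transferCardNum, transferAmount) VALUES (?, ?, ?, ?)";

    // $url and $sql are query parameters; the params option hands them to both inner statements.
    static final String APOC_CQL =
            "CALL apoc.periodic.iterate("
          + "'CALL apoc.load.jdbc($url, $sql) YIELD row RETURN row', "
          + "'CREATE (:transferdetail {cardnum: row.cardnum, username: row.username, "
          + "transfercardnum: row.transfercardnum, transferamount: row.transferamount})', "
          + "{batchSize: 10000, params: {url: $url, sql: $sql}})";

    // Sends all rows in one JDBC batch; assumes the columns accept the Integer/String values from generateData().
    static int[] insertBatch(Connection connection, List<Map<String, Object>> rows) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            for (Map<String, Object> row : rows) {
                ps.setObject(1, row.get("cardNum"));
                ps.setObject(2, row.get("userName"));
                ps.setObject(3, row.get("transferCardNum"));
                ps.setObject(4, row.get("transferAmount"));
                ps.addBatch();
            }
            return ps.executeBatch();
        }
    }

    // Driver parameters for APOC_CQL; "pg" assumes neo4j.conf contains apoc.jdbc.pg.url=...
    static Map<String, Object> apocParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("url", "pg");
        params.put("sql", "SELECT * FROM transferdetail");
        return params;
    }

    public static void main(String[] args) {
        // No database needed here: print what would be sent.
        System.out.println(INSERT_SQL);
        System.out.println(APOC_CQL);
        System.out.println(apocParams());
    }
}
```

With the driver, the node step would be `session.run(JdbcImportSketch.APOC_CQL, JdbcImportSketch.apocParams())`. The relationship step follows the same pattern, using the article's MATCH ... MERGE statement as the second argument. The PostgreSQL JDBC driver also has a `reWriteBatchedInserts=true` connection option, which rewrites such batches into multi-row INSERTs.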

 

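  In short, loading over JDBC and LOAD CSV perform about the same here: about 3.0 s and 3.1 s in total, each including its own preparation steps. Both are far faster than creating the nodes and maintaining the relationships by hand, which took about 285 s.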

 

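Addendum: importing with Neo4j 4.1

  With Neo4j 4.1, the APOC import failed with an error saying the JDBC driver could not be found, even though the driver jar was in the plugins directory. I did not find the cause. What finally worked was an official general-purpose driver package, available at:

https://github.com/neo4j-contrib/neo4j-jdbc/releases

APOC documentation: https://neo4j.com/labs/apoc/4.1/

APOC on GitHub: https://github.com/neo4j-contrib/neo4j-apoc-procedures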

