Neo4j 導入 Nebula Graph 的實踐總結


摘要: 主要介紹如何通過官方 ETL 工具 Exchange 將業務線上數據從 Neo4j 直接導入到 Nebula Graph 以及在導入過程中遇到的問題和優化方法。

本文首發於 Nebula 論壇:https://discuss.nebula-graph.com.cn/t/topic/2044

Neo4j 導入 Nebula Graph 的實踐總結

1 背景

隨着業務數據量不斷增長,業務對圖數據庫在線數據實時更新寫入和查詢的效率要求也不斷增加。Neo4j 存在明顯性能不足,Neo4j 社區開源版本只支持單機部署,擴展能力存在比較大的問題,無法滿足讀寫性能的線性擴展以及讀寫分離的業務需求,並且開源版本 Neo4j 對點和邊的總數據量也有限制;而 Neo4j 企業版因果集群也存在單機主節點 Cypher 實時寫入的性能瓶頸。

相比於 Neo4j,Nebula Graph 最大的特色便是采用 shared-nothing 分布式的架構,無單主寫入瓶頸問題,讀寫支持線性擴展,擅長處理千億節點、萬億條邊的超大規模數據集。

本文主要介紹如何通過官方 ETL 工具 Exchange 將業務線上數據從 Neo4j 直接導入到 Nebula Graph 以及在導入過程中遇到的問題和優化方法。其中絕大部分問題都已經通過論壇發帖的方式得到社區的支持和解決,本文會結合問題進行逐一列舉。

2 部署環境

系統環境:

  • CPU name:Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
  • CPU Cores:40
  • Memory Size:376 GB
  • Disk:HDD
  • System:CentOS Linux release 7.4.1708 (Core)

軟件環境:

  • Neo4j:3.4 版本,五節點因果集群
  • Nebula Graph:
  • Exchange:nebula-java v1.1.0 源碼編譯 jar 包
  • 數倉環境:
    • hadoop-2.7.4
    • spark-2.3.1

注意:單台機器部署 Nebula 多節點的端口分配:每個 storage 還會將用戶配置的端口號 + 1的端口作為內部使用。請參考論壇帖子 nebula從neo4j導入數據出現Get UUID Failed錯誤

3 全量 & 增量數據導入

3.1 全量導入

根據 Neo4j 點和邊的屬性信息創建 Nebula Graph 的 Tag 和 Edge 結構,這里需要注意一點,業務可能會根據不同需求只在部分點和邊上增加 Neo4j 點和邊的屬性信息,其他點和邊對應的屬性為 NULL,所以需要先跟業務明確一下點和邊的全部屬性信息,避免遺漏屬性。Nebula Graph 的 Schema 信息類似 MySQL,支持 Create 和 Alter 添加屬性,並且所有的 Tag 和 Edge 的元數據信息是一致的。

1、Nebula Graph 創建 Tag 和 Edge

# 示例
# 創建圖空間,10 個分區,3 個 storage 副本。
CREATE SPACE test(partition_num=10,replica_factor=3);
# 選擇圖空間 test
USE test;
# 創建標簽 tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# 創建標簽 tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# 創建邊類型 edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);

2、Exchange 導入配置文件

  • Exchange 配置目前不支持 bolt+routing 的方式連接neo4j,如果是因果集群,可以選擇一個從節點進行 bolt 方式直連讀取數據,減少集群壓力。
  • 我們業務的 Neo4j 數據點和邊的 vid 是 string 類型,Nebula v1.x 版本還不支持 string 直接當做 vid(v2.0支持),考慮到官方文檔中的描述:“當點數量到達十億級別時,用 hash 函數生成 vid 有一定的沖突概率。因此 Nebula Graph 提供 UUID 函數來避免大量點時的 vid 沖突。” 選擇了uuid() 作為轉化函數,但是導入效率要比 hash 低,而且 uuid() 在未來版本可能存在兼容問題。
  • partition: 是指 Exchange 從 Neo4j 拉取數據的分頁個數。
  • batch: 是指批量插入 Nebula 的 batch 大小。
{
  # Spark relation config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["xxx.xxx.xxx.xx:3699"]
      meta:["xxx.xxx.xxx.xx:45500"]
    }
    user: user
    pswd: password
    space: test

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  
  # Processing tags
  tags: [
    # Loading tag from neo4j
    {
      name: tagA
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
      fields: [vid, field-a0, field-a1, field-a2]
      nebula.fields: [vid, field-a0, field-a1, field-a2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
    # Loading tag from neo4j
    {
      name: tagB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
      fields: [vid, field-b0, field-b1, field-b2]
      nebula.fields: [vid, field-b0, field-b1, field-b2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]

  # Processing edges
  edges: [
   # Loading edges from neo4j
    {
      name: edgeAB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)"
      fields: [vid, field-e0, field-e1, field-e2]
      nebula.fields: [vid, field-e0, field-e1, field-e2]
      source: {
        field: a.vid
        policy: "uuid"
      }
      target: {
        field: b.vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]
}

3、執行導入命令

nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &

4、查看導入 Nebula Graph 的數據量

./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB

注意:Nebula 1.x 版本目前還只能用 db_dump 統計,2.0 會支持 nGQL 命令的方式統計數量。

3.2 增量導入

增量數據導入主要是通過 Neo4j 內部點和邊的自增 id() 進行切割,在導入配置文件 exec 項執行 Neo4j Cypher 語句時增加 id() 范圍限制,但前提是需要業務停掉刪數據操作,因為增量導入時,如果之前的數據被刪除后 Neo4j 會復用 id(),這會導致復用 id() 的增量數據導入時查詢不到造成數據丟失。當然業務如果有條件支持 Neo4j Nebula 雙寫的話,增量導入就不會出現這種問題

exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"

請參考論壇帖子 neo4j到nebula如何做增量導入

3.3 導入問題及解決

使用 Exchange 導入過程中遇到兩個問題,及時的得到官方 @nicole 的支持和解決,具體請參考下面兩個帖子:

問題 1:Exchange 不支持「換行回車」等特殊字符的轉義。如下 string 數據中帶有回車,在拼接 insert 語句插入時會因為換行導致插入失敗。

Neo4j 導入 Nebula Graph 的實踐總結

PR:https://github.com/vesoft-inc/nebula-java/pull/203 已經合入 exchange v1.0 分支

問題 2:Exchange 不支持屬性為 NULL 的數據導入。前文 3.1 中提到,業務可能會根據不同需求為某些點和邊增加屬性,這時其他點和邊屬性則是 NULL,這樣在使用 Exchange 導入時會報錯。

Neo4j 導入 Nebula Graph 的實踐總結

參考帖子 2 給出的修改建議解決:修改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue,增加 NULL 類型的轉化值。

case NullType => {
  fieldTypeMap(field) match {
    case StringType => ""
    case IntegerType => 0
    case LongType => 0L
    case DoubleType => 0.0
    case BooleanType => false
  }
}

4 導入效率優化

關於導入效率的優化,請參考下面兩個帖子:

優化 1:通過適當增加導入配置中的 partition 和 batch 值,提升導入效率。
優化 2:如果是 string 類型做 vid 的話,1.x 版本盡量使用 hash() 函數轉化,2.0 版本會支持 string id 類型;如果是int類型做vid的話,可以直接使用,不用轉化效率更高。
優化 3:官方建議 spark-submit 提交命令 master 配置改為 yarn-cluster, 若不使用 yarn,可配置成 spark://ip:port;我們是通過 spark-submit --master "local[16]" 的方式增加 spark 並發,導入效率比使用 "local" 提升 4 倍+,測試環境單機三節點 HDD 盤 IO 峰值能到 200-300 MB/s。但在指定 --master "local[16]" 並發導入時遇到 hadoop 緩存問題,采用增加 hdfs 配置 fs.hdfs.impl.disable.cache=true 后重啟 hadoop 解決。具體請參考第二個帖子。

5 總結

使用 Exchange 從 Neo4j 導入 Nebula Graph 過程中遇到一些問題,通過積極與社區進行溝通得到了官方 @nicole 及其他小伙伴的快速響應和大力支持,這一點在 Neo4j 導入 Nebula Graph 的實踐過程中起到了十分關鍵的作用,感謝社區的大力支持。期待支持 openCypher 的 Nebula Graph 2.0。

6 參考鏈接

  1. https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
  2. https://github.com/vesoft-inc/nebula-java/tree/v1.0
  3. https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
  4. http://arganzheng.life/hadoop-filesystem-closed-exception.html

推薦閱讀


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM