注1:版權所有,轉載必須注明出處。
注2:此手冊根據2018年10月JanusGraph的官方文檔進行整理,后續會陸續更新新知識
注3:本博文原始地址,兩個博客由同一博主管理。
JanusGraph作為Titan的繼承者,汲取了Titan和TinkerPop框架諸多的優點,成為了當前熱門的圖數據庫領域的佼佼者,然后,JanusGraph的中文文檔卻相對匱乏, 成為了JanusGraph入門學習者的障礙之一。本文以JanusGraph中文文檔為基礎,結合作者個人的思考,總計了JanusGraph中文文檔中的關鍵點,希望給大家帶來幫助。
- Intro
- Basics
- 后端存儲(Storage Backends)
- 后端索引(Index Backends)
- 高級功能
Intro
JanusGraph is a graph database engine. JanusGraph itself is focused on compact graph serialization, rich graph data modeling, and efficient query execution. In addition, JanusGraph utilizes Hadoop for graph analytics and batch graph processing. JanusGraph implements robust, modular interfaces for data persistence, data indexing, and client access. JanusGraph’s modular architecture allows it to interoperate with a wide range of storage, index, and client technologies; it also eases the process of extending JanusGraph to support new ones.
總的來說,JanusGraph的一大特性是和其他組件的交互能力強,比如像存儲的Hbase, full-text search的elasticsearch等,也可以很方便地和graphx等olap引擎交互。
Broadly speaking, applications can interact with JanusGraph in two ways:
Embed JanusGraph inside the application executing Gremlin queries directly against the graph within the same JVM. Query execution, JanusGraph’s caches, and transaction handling all happen in the same JVM as the application while data retrieval from the storage backend may be local or remote.
Interact with a local or remote JanusGraph instance by submitting Gremlin queries to the server. JanusGraph natively supports the Gremlin Server component of the Apache TinkerPop stack.
應用程序可以通過兩種方式使用JanusGraph,
- 直接把JanusGraph集成到自己的系統中,在同一個JVM中執行Query
- 通過將query submit到JanusGraph的instance來執行
架構圖:
Basics
Configuration
- 要啟動一個Janus Instance,必須要提供一個configuration文件。
- configuration里面至少要說明storage backend是什么,參考此。
- 如果需要高級功能(e.g full-text search, geo search, or range queries),就需要一個indexing backend。
一個基於Cassandra+ElasticSearch的配置例子:
storage.backend=cql
storage.hostname=localhost
index.search.backend=elasticsearch
index.search.hostname=100.100.101.1, 100.100.101.2
index.search.elasticsearch.client-only=true
數據模型(Schema)
數據的Schema是JanusGraph的一個很重要的部分,數據schema主要定義在三個元素上:頂點,邊和屬性。聽起來可能有點繞口,schema是干什么呢?其實就是去定義頂點的label,邊的label的Property的key有哪些,再通俗點講,就是有哪些類型的點(比如 god
和 human
),有哪些類型的邊(比如 trade
和 friends
),然后點和邊的屬性列表都有哪些key(比如 human
有一個property key是name
和 age
)。
需要注意的幾點:
- schema可以顯示地定義也不可以不顯示地定義,但還是建議提前定義好。
- Schema可以在數據庫使用的過程中更改和進化,這樣並不會影響數據庫正常的服務。
Vertex label
- vertice label描述的是vertice的語義,不過vertice label是optional的,用來區分不同類型的vertice,比如
user
和product
。 - 利用
makeVertexLabel(String).make()
來創建vertice label - vertice label必須保持全局唯一性
下面是個創建vertex label的例子:
mgmt = graph.openManagement()
person = mgmt.makeVertexLabel('person').make()
mgmt.commit()
// Create a labeled vertex
person = graph.addVertex(label, 'person')
// Create an unlabeled vertex
v = graph.addVertex()
graph.tx().commit()
注意,需要顯示地調用 builder
的 make()
函數來完成vertex label的定義。
Edge label
- Edge label主要描述的是relationship的語義(比如friends)
- 在一個事務中,用
makeEdgeLabel(String)
來定義一個edge label,注意,edge label必須是唯一的,這個方法會返回一個builder
,這個builder
可以用來獲取這種edge label的多度關系multiplicity,這個指標限定了每對(pair)之間edge最大的數量。multiplicity
包含的類型有:multi, simple, many2one, one2many, one2one (結合數據庫E-R model這個概念不難理解)。- 默認的
multiplicity
是MULTI
。
下面是創建edge label的一個例子:
mgmt = graph.openManagement()
follow = mgmt.makeEdgeLabel('follow').multiplicity(MULTI).make()
mother = mgmt.makeEdgeLabel('mother').multiplicity(MANY2ONE).make()
mgmt.commit()
注意,需要顯示地調用 builder
的 make()
函數來完成edge label的定義。
Property keys
屬性Property定義在頂點和邊上,當然也可以用來定義其他東西,property是 key-value
對的形式。舉個例子,'name' = 'John'
這就可以看做一個屬性,它定義了 name
這個屬性,這個屬性的value是 'John'。
通過 makePropertyKey(String)
方法來常見Property key。屬性名應該盡可能避免使用空格和特殊字符。
屬性需要關注兩點:
- Property Key Data Type
每個屬性都有自己的數據類型,也就是說它的value必須是某種支持的數據類型,可以通過 dataType(Class)
方法來定義數據類型。JanusGraph會保證加入進來的屬性數據都滿足該數據類型。
屬性數據類型可以申明為 Object.class
,但這並不推薦,最好控制好數據類型,一來可以節省空間,二來可以讓JanusGraph來幫我們檢查導入的數據類型是不是符合要求。
數據類型必須使用具體的類,不能是接口或者抽象類。JanusGraph使用的是完全相當去判斷數據類型是否符合,所以使用子類的數據去導入到父類的屬性中也是不會成功的。
- Property Key Cardinality
用 cardinality(Cardinality)
方法來定義某一個頂點的某個屬性的基底(cardinality)。
基底有三種情況:
- SINGLE:每一個元素對於這個key只能有一個value,比如
birthDate
就是一個single基底,因為每個人最多只能有一個生日。 - LIST:每個元素對於這個key可以有任意數量的值,比如我們建模傳感器(sensor),其有一個屬性是
sensorRecords
,那么,對於這個屬性,可能有一系列的值。注意,LIST是允許有重復元素的。 - SET: 與LIST相同,不同的是,SET不允許有重復的元素。
默認的cardinality是single。注意,對於邊屬性來說,property key的基底始終是single。
下面是在定義property key的時候定義對應的cardinality的例子:
mgmt = graph.openManagement()
birthDate = mgmt.makePropertyKey('birthDate').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
sensorReading = mgmt.makePropertyKey('sensorReading').dataType(Double.class).cardinality(Cardinality.LIST).make()
mgmt.commit()
Relation types
Edge label和property key共同地被稱為relation type。
可以通過 containsRelationType()
方法來檢測relation type是否存在。
下面是個例子:
mgmt = graph.openManagement()
if (mgmt.containsRelationType('name'))
name = mgmt.getPropertyKey('name')
mgmt.getRelationTypes(EdgeLabel.class)
mgmt.commit()
改變schema的元素(Changing Schema Elements)
一旦我們創建好Vertex label, edge label和property key后,就不能更改了,我們唯一能改的是schema的名字,比如下面這個例子:
mgmt = graph.openManagement()
place = mgmt.getPropertyKey('place')
mgmt.changeName(place, 'location')
mgmt.commit()
上面這個例子中,我們把property key的名字從place改成了location。
Schema Constraints
多個property可以綁定到同一個vertex或者edge上面,用 JanusGraphManagement.addProperties(VertexLabel, PropertyKey...)
方法:
// 例子1
mgmt = graph.openManagement()
person = mgmt.makeVertexLabel('person').make()
name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
birthDate = mgmt.makePropertyKey('birthDate').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
mgmt.addProperties(person, name, birthDate)
mgmt.commit()
// 例子2
mgmt = graph.openManagement()
follow = mgmt.makeEdgeLabel('follow').multiplicity(MULTI).make()
name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
mgmt.addProperties(follow, name)
mgmt.commit()
有一種很通俗的定義關系的方法:
mgmt = graph.openManagement()
person = mgmt.makeVertexLabel('person').make()
company = mgmt.makeVertexLabel('company').make()
works = mgmt.makeEdgeLabel('works').multiplicity(MULTI).make()
mgmt.addConnection(works, person, company)
mgmt.commit()
這里,用到是 addConnection()
方法。
Gremlin
Gremlin是JanusGraph默認的query language。
Gremlin用來從JanusGraph里面查詢數據,修改數據,Gremlin是一種traversal查詢語言,可以很方便地查詢此類查詢:
從小明開始,遍歷到他的父親,再從他的父親遍歷到他父親的父親,然后返回他父親的父親的名字
Gremlin是Apache TinkerPop項目的一部分。
關於Gremlin這里就不多做介紹了,汪文星的文檔里做了很詳細的說明。
更多關於JanusGraph中Gremlin相關的說明,參考這個文檔。
JanusGraph Server
- JanusGrpah Server其實就是Gremlin server
- 兩種交互方式:分別是webSocket和HTTP
使用方式
使用預先打好的包
這種方式主要是用來學習使用JanusGraph,生產環境下不建議用這種配置。
./bin/janusgraph.sh start
就可以啟動了,會自動fork cassandra的包,elasticsearch的包,gremlin-server的包,並連接到對應的服務器
$ bin/janusgraph.sh start
Forking Cassandra...
Running `nodetool statusthrift`.. OK (returned exit status 0 and printed string "running").
Forking Elasticsearch...
Connecting to Elasticsearch (127.0.0.1:9300)... OK (connected to 127.0.0.1:9300).
Forking Gremlin-Server...
Connecting to Gremlin-Server (127.0.0.1:8182)... OK (connected to 127.0.0.1:8182).
Run gremlin.sh to connect.
- 使用完畢后關系JanusGraph,使用
./bin/janusgraph.sh stop
命令就可以了
$ bin/janusgraph.sh stop
Killing Gremlin-Server (pid 59687)...
Killing Elasticsearch (pid 59494)...
Killing Cassandra (pid 58809)...
使用WebSocket的方式
上面pre-package的方式其實是使用的內置的backend和index backend(在這個例子里面,分別是cassandra和elasticsearch),其實我們也可以把JanusGraph連接到自己的HBase等Backend。
使用Http的方式
配置方式和使用WebSocket的方式基本相同,可以用Curl命令來test服務的可用性:
curl -XPOST -Hcontent-type:application/json -d '{"gremlin":"g.V().count()"}' http://[IP for JanusGraph server host]:8182
同時使用WebSocket和Http的方式
修改gremlin-server.yaml文件,更改channelizer為:
channelizer: org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer
一些高級用法
還有一些高級用法,比如認證(Authentication over HTTP),具體這里就不細說了,可以參考官方文檔。
服務部署
- JanusGraph Server本身是個服務器服務,這個服務背后很多Backends(es, Hbase等等),客戶端應用(application)主要通過Gremlin查詢語句和JanusGraph instance交互,JanusGraph instance交互然后和對應的后端交互來執行對應的query。
- 沒有master JanusGraph server的概念,application可以連接任何JanusGraph instance,當然,也可以用負載均衡的方法來分流到不同的JanusGraph instance。
- 各個JanusGraph instance之間並不相互交互。
部署方式:
- 方式1:每個server上起一個JanusGraph Instance,並在同一個server起對應的backend和index
- 方式2:JanusServe和backend server, index server分離
- 方法3:直接embedded到客戶端appplication中,相當於引入了一個jar包
ConfiguredGraphFactory
這個概念比較難以理解。
ConfiguredGraphFactory
是一個singleton,和JanusGraphFactory
一樣。
它們提供了一套API(methods,方法)來動態地操作服務器上的圖。
在gremlin-console下我們可以直接用這個接口去操作圖,如下:
gremlin> :remote connect tinkerpop.server conf/remote.yaml
==>Configured localhost/127.0.0.1:8182
gremlin> :remote console
==>All scripts will now be sent to Gremlin Server - [localhost/127.0.0.1:8182] - type ':remote console' to return to local mode
gremlin> ConfiguredGraphFactory.open("graph1")
==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
gremlin> graph1
==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
gremlin> graph1_traversal
==>graphtraversalsource[standardjanusgraph[cassandrathrift:[127.0.0.1]], standard]
先來談一談 JanusGraphFactory
,它是我們在gremlin-console里面操作一個圖的時候的entry-point,每當我們訪問一個圖的時候,系統就為我們創建了一個Configuration
類的實例。
可以將這個東西和spark-shell里面的sparkSession做類比來方便理解。
ConfiguredGraphFactory
不太一樣,它也是我們訪問、操作一個圖的時候的entry-point,但配置是通過另一個singleton來實現的,叫ConfigurationManagementGraph
。
ConfigurationManagementGraph
使我們可以很方便地管理圖的配置。
就像上面例子一樣,我們可以通過下面兩種方法來訪問一個圖:
ConfiguredGraphFactory.create("graphName")
或者
ConfiguredGraphFactory.open("graphName")
可以通過下面的方法來羅列所有配置好了的圖。配置好是指之前有用ConfigurationManagementGraph
的API配置過:
ConfiguredGraphFactory.getGraphNames()
用下面的放來來drop一個graph database:
ConfiguredGraphFactory.drop("graphName")
如果想使用ConfiguredGraphFactory
這個接口,比如在啟動前JanusGraph server前配置好。修改gremlin-server.yaml文件,在graphs這個section下面,添加一行:
graphs: {
ConfigurationManagementGraph: conf/JanusGraph-configurationmanagement.properties
}
在這個例子中,ConfigurationManagementGraph
這個graph便是使用位於onf/JanusGraph-configurationmanagement.properties
下的配置文件來配置,下面是配置文件的一個例子:
gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cql
graph.graphname=ConfigurationManagementGraph
storage.hostname=127.0.0.1
具體ConfigurationManagementGraph
怎么用呢?下面是一個例子(在gremlin-console下):
map = new HashMap<String, Object>();
map.put("storage.backend", "cql");
map.put("storage.hostname", "127.0.0.1");
map.put("graph.graphname", "graph1");
ConfiguredGraphFactory.createConfiguration(new MapConfiguration(map));
// then access the graph
ConfiguredGraphFactory.open("graph1");
graph.graphname
這個屬性指定了上述配置是針對哪張graph的。
Multi-node
- 如果我們希望在JanusGraph server啟動后去動態地創建圖,就要用到上面章節提到的
ConfiguredGraphFactory
,這要求JanusGraph集群的每個server的配置文件里面都聲明了使用JanusGraphManager
和ConfigurationManagementGraph
,具體方法,見這個鏈接。 - 為了保證graph的consistency,每個server node都要使用
ConfiguredGraphFactory
(保持集群上每個node server的配置一致性)。
Indexing
- 支持兩類graph indexing: Graph Index和Vertex-centric Index。
- graph index包含兩類:Composite Index和Mixed Index。
Composite Index
通過下面的方法創建Composite Index:
graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
mgmt.commit()
//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
mgmt.commit()
Composite Index方式不需要特殊地去配置底層的存儲引擎(比如cassandra),主要的底層存儲引擎都支持這種方式。
通過Composite Index可以來保證Property key的唯一性,用下面這種方法:
graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
mgmt.buildIndex('byNameUnique', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.commit()
//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameUnique').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameUnique"), SchemaAction.REINDEX).get()
mgmt.commit()
通過下面的方式來創建Mixed Index:
graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('nameAndAge', Vertex.class).addKey(name).addKey(age).buildMixedIndex("search")
mgmt.commit()
//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'nameAndAge').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("nameAndAge"), SchemaAction.REINDEX).get()
mgmt.commit()
Mixed Index方式需要特殊地去配置底層的存儲引擎(比如cassandra)的索引。
Mixed Index比Composite Index更加靈活,但是對於含有相等關系的謂語關系的查詢效率更慢。
buildMixedIndex
方法的參數string要和properties文件中配置的一致,比如:
index.search.backend
這個配置中間的部分search
就是buildMixedIndex
方法的參數。
有了Mixed Index,這面這些query就支持用索引來加速了:
g.V().has('name', textContains('hercules')).has('age', inside(20, 50))
g.V().has('name', textContains('hercules'))
g.V().has('age', lt(50))
g.V().has('age', outside(20, 50))
g.V().has('age', lt(50).or(gte(60)))
g.V().or(__.has('name', textContains('hercules')), __.has('age', inside(20, 50)))
總結兩種Graph Index的區別:
- Use a composite index for exact match index retrievals. Composite indexes do not require configuring or operating an external index system and are often significantly faster than mixed indexes.
a. As an exception, use a mixed index for exact matches when the number of distinct values for query constraint is relatively small or if one value is expected to be associated with many elements in the graph (i.e. in case of low selectivity).
- Use a mixed indexes for numeric range, full-text or geo-spatial indexing. Also, using a mixed index can speed up the order().by() queries.
Vertex-centric Indexes
graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
time = mgmt.getPropertyKey('time')
rating = mgmt.makePropertyKey('rating').dataType(Double.class).make()
battled = mgmt.getEdgeLabel('battled')
mgmt.buildEdgeIndex(battled, 'battlesByRatingAndTime', Direction.OUT, Order.decr, rating, time)
mgmt.commit()
//Wait for the index to become available
ManagementSystem.awaitRelationIndexStatus(graph, 'battlesByRatingAndTime', 'battled').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getRelationIndex(battled, 'battlesByRatingAndTime'), SchemaAction.REINDEX).get()
mgmt.commit()
注意上面的Index是有順序的,先對rating做index,再對time做index,因此:
h = g.V().has('name', 'hercules').next()
g.V(h).outE('battled').property('rating', 5.0) //Add some rating properties
g.V(h).outE('battled').has('rating', gt(3.0)).inV() // query-1
g.V(h).outE('battled').has('rating', 5.0).has('time', inside(10, 50)).inV() // query-2
g.V(h).outE('battled').has('time', inside(10, 50)).inV() // query-3
上述3個query中,前兩個被加速了,但第三沒並沒有。
JanusGraph automatically builds vertex-centric indexes per edge label and property key. That means, even with thousands of incident battled edges, queries like g.V(h).out('mother') or g.V(h).values('age') are efficiently answered by the local index.
Transaction 事務
- JanusGraph具有事務安全性,可以在多個並行的線程中同時使用。
通常,使用:
graph.V(...) and graph.tx().commit()
這樣的ThreadLocal
接口來執行一次事務。
事務處理的一個例子
根據TinkerPop框架事務機制的描述,每一個線程在它執行第一個操作的時候開啟一個事務:
graph = JanusGraphFactory.open("berkeleyje:/tmp/janusgraph")
juno = graph.addVertex() //Automatically opens a new transaction
juno.property("name", "juno")
graph.tx().commit() //Commits transaction
在上面這個例子中,addVertex()
函數執行的時候,事務被開啟,然后當我們顯示執行graph.tx().commit()
的時候,事務關閉。
當事務還沒有完成,卻調用了graph.close()
關閉了數據庫,那么這個事務的后期行為是不得而知的,一般情況下,事務應該會被回滾。但執行close()
的線程所對應的事務會被順利地commit。
事務處理范圍(scope)
根據TinkerPop框架事務機制的描述,每一個線程在它執行第一個操作的時候開啟一個事務,所有的圖操作元素(頂點,邊,類型等變量等)均和該事務自動綁定了,當我們使用commit()
或者rollback()
關閉/回滾一個事務的時候,這些圖操作元素就會失效,但是,頂點和類型會被自動地轉移到下一個事務中,如下面的例子:
graph = JanusGraphFactory.open("berkeleyje:/tmp/janusgraph")
juno = graph.addVertex() //Automatically opens a new transaction
graph.tx().commit() //Ends transaction
juno.property("name", "juno") //Vertex is automatically transitioned
但是,邊不能自動轉移到下一個事務中,需要顯式地刷新,如下面的例子:
e = juno.addEdge("knows", graph.addVertex())
graph.tx().commit() //Ends transaction
e = g.E(e).next() //Need to refresh edge
e.property("time", 99)
事務失敗(failure)
當我們在創建一個事務,並做一系列操作的時候,應該事先考慮到事務失敗的可能性(IO exceptions, network errors, machine crashes or resource unavailability...),所以推薦用下面的方式處理異常:
try {
if (g.V().has("name", name).iterator().hasNext())
throw new IllegalArgumentException("Username already taken: " + name)
user = graph.addVertex()
user.property("name", name)
graph.tx().commit()
} catch (Exception e) {
//Recover, retry, or return error message
println(e.getMessage())
}
上面的例子描述了一個注冊功能,先檢查名字存不存在,如果不存在,則創建該user,然后commit。
如果事務失敗了,會拋出JanusGraphException
異常,事務失敗有很多種可能,JanusGraph區分了兩種常見的failure:
- potentially temporary failure
- potentially temporary failure主要與是IO異常(IO hiccups (e.g. network timeouts))和資源可用情況(resource unavailability)有關。
- JanusGrpah會自動去嘗試從temporary failure中恢復,重新嘗試commit事務,retry的次數可以配置。
- permanent failure
- permanent failure主要與完全的連接失敗,硬件故障和鎖掙脫有關(complete connection loss, hardware failure or lock contention)。
- 鎖的爭奪,比如兩個人同時同時以"juno"的用戶名去注冊,其中一個事務必然失敗。根據事務的語義,可以通過重試失敗的事務來嘗試從鎖爭奪中恢復(比如使用另一個用戶名)。
- 一般有兩種典型的情形
PermanentLockingException(Local lock contention)
:另一個線程已經被賦予了競爭鎖PermanentLockingException(Expected value mismatch for X: expected=Y vs actual=Z)
:Tips: The verification that the value read in this transaction is the same as the one in the datastore after applying for the lock failed. In other words, another transaction modified the value after it had been read and modified.
多線程事務(Multi-Threaded Transactions)
多線程事務指的是同一個事務可以充分利用機器的多核架構來多線程執行,見TinkerPop關於threaded transaction的描述。
可以通過createThreadedTx()
方法來創建一個線程獨立的事務:
threadedGraph = graph.tx().createThreadedTx(); // Create a threaded transaction
threads = new Thread[10];
for (int i=0; i<threads.length; i++) {
threads[i]=new Thread({
println("Do something with 'threadedGraph''");
});
threads[i].start();
}
for (int i=0; i<threads.length; i++) threads[i].join();
threadedGraph.tx().commit();
createThreadedTx()
方法返回了一個新的Graph
對象,tx()
對象在創建線程的時候,並沒有為每個線程創建一個事務,也就是說,所有的線程都運行在同一個事務中,這樣我們就實現了threaded transaction。
JanusGraph可以支持數百個線程運行在同一個事務中。
通過createThreadedTx()
接口可以很輕松的創建並行算法(Concurrent Algorithms),尤其是那些適用於並行計算的算法。
createThreadedTx()
接口的另一個應用是創建嵌套式的事務(Nested Transactions),具體的例子見這里,這對於那些long-time running的事務尤其有作用。
事務處理的一些常見問題
再次強調一下,JanusGraph的邏輯是,當我們對一個graph
進行第一次操作的時候,事務就自動被打開了,我們並不需要去手動的創建一個事務,除非我們希望創建一個multi-threaded transaction。
每個事務都要顯式地用commit()
和rollback()
方法去關閉。
一個事務在開始的時候,就會去維持它的狀態,在多線程應用的環境中,可能會出現不可預知的問題,比如下面這個例子:
v = g.V(4).next() // Retrieve vertex, first action automatically starts transaction
g.V(v).bothE()
>> returns nothing, v has no edges
//thread is idle for a few seconds, another thread adds edges to v
g.V(v).bothE()
>> still returns nothing because the transactional state from the beginning is maintained
這種情況在客戶端應用端很常見,server會維持多個線程去相應服務器的請求。比較好的一個習慣是,在沒完成一部分工作后,就去顯式地terminate任務,如下面這個例子:
v = g.V(4).next() // Retrieve vertex, first action automatically starts transaction
g.V(v).bothE()
graph.tx().commit()
//thread is idle for a few seconds, another thread adds edges to v
g.V(v).bothE()
>> returns the newly added edge
graph.tx().commit()
多線程事務(Multi-Threaded Transactions)還可以通過newTransaction()
方法來創建,但要注意的是,在該事務中創建的點和邊只在該事務中可用,事務被關閉以后,訪問這些點和邊就會拋出異常,如果還想使用這些點和邊怎么辦?答案是顯式地去刷新這些點和邊,如下面這個例子:
g.V(existingVertex)
g.E(existingEdge)
事務的相關配置(configuration)
JanusGraph.buildTransaction()
也可以啟動一個多線程的事務,因此它和上面提到的newTransaction()
方法其實是一樣的功能,只不過buildTransaction()
方法提供了附加的配置選項。
buildTransaction()
會返回一個TransactionBuilder
實例,TransactionBuilder
實例可以配置選項,這里就詳述了。
配置好后,接着可以調用start()
方法來啟動一個線程,這樣會返回一個JanusGraphTransaction
實例。
緩存機制(JanusGraph Cache)
Transaction-level caching
通過graph.buildTransaction().setVertexCacheSize(int)
可以來設置事務的緩存大小(cache.tx-cache-size)。
當一個事務被打開的時候,維持了兩類緩存:
- Vertex cache
- Index cache
Vertex cache
Vertex cache主要包含了事務中使用到的點,以及這些點的鄰接點列表(adjacency list)的一個子集,這個子集包括了在同一個事務中被取出的該點的鄰接點。所以,heap中vertex cache使用的空間不僅與事務中存在的頂點數有關,還與這些點的鄰接點數目有關。
Vertex cache中能維持的最多的頂點數等於transaction cache size。Vertex cache能顯著地提高iteractive traversal的速度。當然,如果同樣的vertex在后面並沒有被重新訪問,那vertex cache就沒有起到作用。
Index cache
如果在一個事務中,前面部分的query用到了某個索引(index),那么后面使用這個query的時候,獲取結果的速度會大大加快,這就是index cache的作用,當然,如果后面並沒有再使用相同的index和相同的query,那Index query的作用也就沒有體現了。
Index cache中的每個條目都會被賦予一個權重,這個權重等與 2 + result set size
,整個index cache的權重不會超過transaction cache size的一半。
Database-level caching
Database-level caching實現了多個transaction之間cache的共享,從空間利用率上來說,database-lvel caching更經濟,但訪問速度比transaction-level稍慢。
Database-level cache不會在一個事務結束后馬上失效,這使得處於多個事務中的讀操作速度顯著地提高。
Cache Expiration Time
Cache expiration time通過 cache.db-cache-time
(單位為:milliseconds)參數來調整。
這里有一個trick,如果我們只啟動了一個JanusGraph instance,因為沒有另一個instance去改變玩我們的圖(graph),cache expiration time便可以設置為 0
,這樣的話,cache就會無限期保留處於cache中的元素(除非因為空間不足被頂替)。
如果我們啟動了多個JanusGraph實例,這個時間應該被設置成當一個instance修改了圖,另一個instance能夠看到修改所需要等待的最大時間。如果希望修改被其他的instance迅速能夠看到,那么應該停止使用Database-level cache。允許的時延越長,cache的效率越高。
當然,一個JanusGraph instance始終能夠馬上看到它自己做出的修改。
Cache Size
Cache size通過 cache.db-cache-size
參數來控制Database-level的cache能使用多少heap space,cache越大, 效率越高,但由此帶來的GC性能問題卻不容小覷。
cache size也可以配置成百分比(占整個剩余的heap空間的比例)。
注意cache size的配置是排他性的,也就是說cache會獨占你配置的空間,其他資源(e.g. Gremlin Server, embedded Cassandra, etc)也需要heap spave,所以不要把cache size配置得太過分,否則可能會引起out of memory錯誤或者GC問題。
Clean Up Wait Time
還有一個需要注意的參數是 cache.db-cache-clean-wait
,當一個頂點被修改后,所有與該頂點有關的Database-level的cache都會失效並且會被驅逐。JanusGraph會從底層的storage backend重新fetch新的頂點數據,並re-populate緩存。
cache.db-cache-clean-wait
這個參數可以控制,cache會等待 cache.db-cache-clean-wait
milliseconds時間再repopulate新的cache。
Storage Backend Caching
底層的storage backend也會有自己的cache策略,這個就要參考對應的底層存儲的文檔了。
事務日志(Transaction Log)
可以啟動記錄事務的變化日志,日志可以用來在后期進行處理,或者作為一個record備份。
在啟動一個事務的時候指定是否需要采集日志:
tx = graph.buildTransaction().logIdentifier('addedPerson').start()
u = tx.addVertex(label, 'human')
u.property('name', 'proteros')
u.property('age', 36)
tx.commit()
這里有個 log processor
的概念,其實就是內置的日志監聽處理器。例如,可以統計在一個transaction里面,某一個label下被添加的頂點的數目。
其他常見問題
- Accidental type creation
默認地,當遇到一個新的type的時候,janusGrpah會自動地去創建property keys和邊的label。對於schema這個問題,還是建議用戶自己去定義schema,而不要使用自動發現schema的方式,可以在配置文件里面如下申明關閉自動infer schema的功能:
schema.default = none
創建type的操作也不宜放在不同的線程中,因為這會引起不可知的后果,最好放到一個batch操作中把所有的type都事先創建好,然后再去做其他的圖操作。
- Custom Class Datatype
可以自己去創建class,作為value的類別。
- Transactional Scope for Edges
邊應該先取出來,再操作,每個transaction都是有scope的,如果超出了這個scope,去訪問之前的邊,會報錯。
- Ghost Vertices
這個概念比較新奇,指的是:我們在一個事務中刪除了一個vertex,卻同時在另一個transaction中修改了它。這種情況下,這個vertex還是會依然存在的,我們稱這種vertex為ghost vertex。
對於這個問題的解決辦法最好是暫時允許ghost vertices,然后定期地寫腳本去刪除它們。
- Debug-level Logging Slows Execution
Log level如果被設置成了 DEBUG
,輸出可能會很大,日志中會包括一個query如何被編譯、優化和執行的過程,這會顯著地影響處理的性能,在生產環境下,不建議使用 DEBUG
level,建議是用 INFO
level。
- Elasticsearch OutOfMemoryException
當有很多客戶端連接到Elasticsearch的時候,可能會報 OutOfMemoryException
異常,通常這不是一個內存溢出的問題,而是OS不允許運行ES的用戶起太多的進程。
可以通過調整運行es的用戶可運行的進程數(number of allowed processes)也許可以解決這個問題。
- Dropping a Database
刪除一個graph instance,可以使用:
JanusGraphFactory.drop(graph)
接口
graph = JanusGraphFactory.open('path/to/configuration.properties')
JanusGraphFactory.drop(graph);
ConfiguredGraphFactory
接口
graph = ConfiguredGraphFactory.open('example')
ConfiguredGraphFactory.drop('example');
Note:
0.3.0
以前的版本除了需要執行上面的命令,還需要顯示地調用 JanusGraphFactory.close(graph)
和 ConfiguredGraphFactory.close("example")
來關閉這個graph,以防cache中還存在這個graph,造成錯誤。
技術上的限制(Technical Limitations)
這個部分可以理解JanusGrpah還存在的一些可改進或者無法改進的地方。
設計上的限制
下面這些缺陷是JanusGraph天然設計上的一些缺陷,這些缺陷短期內是得不到解決的。
- Size Limitation
JanusGraph can store up to a quintillion edges (2^60) and half as many vertices. That limitation is imposed by JanusGraph’s id scheme.
- DataType Definitions
當我們用 dataType(Class)
接口去定義property key的數據類型的時候,JanusGraph會enforce該key的所有屬性都嚴格是該類型。這是嚴格的類型判斷,子類也不可行。比如,如果我們定義的數據類型是Number.class
,使用的卻是Integer
或者Long
型,這種情況大多數情況下會報錯。
用Object.class
或許可以解決這個問題,比較靈活,但帶來的問題也顯而易見,那就是性能上會降低,同時數據類型的check也會失效。
- Type Definitions cannot be changed
Edge label, vertex label和property key一旦創建就不能改變了,當然,type可以被重命名,新的類型也可以在runtime中創建,所以schema是支持envolving的。
- 保留的關鍵字(Reserved Keywords)
下面是JanusGraph保留的關鍵字,不要使用這些關鍵字作為變量名、函數名等。
- vertex
- element
- edge
- property
- label
- key
臨時的缺陷
下面這些缺陷在將來的release中會被逐漸解決。
- Limited Mixed Index Support
Mixed Index只支持JanusGraph所支持的數據類型(Data type)的一個子集,Mixed Index目前也不支持 SET
和 LIST
作為基數(cardinality)的property key。
- Batch Loading Speed
可以通過下面的configuration來開啟 batch loading mode
:
Name | Description | Datatype | Default Value | Mutability |
---|---|---|---|---|
storage.batch-loading | Whether to enable batch loading into the storage backend | Boolean | false | LOCAL |
這個trick其實並沒有使用底層的backend的batch loading技術。
另一個限制是,如果同時向一個頂點導入數百萬條邊,這種情況下很可能failure,我們稱這種loading為 supernode loading
,這種loading之所以失敗是受到了后端存儲的限制,具體這里就不細數了。
后端存儲(Storage Backends)
Apache Cassandra
JanusGraph后端通過四種方式來支持整合Cassandra:
- cql - CQL based driver (推薦使用)
- cassandrathrift - JanusGraph’s Thrift connection pool driver (v2.3以后退休了,也不建議使用)
- cassandra - Astyanax driver. The Astyanax project is retired.
- embeddedcassandra - Embedded driver for running Cassandra and JanusGraph within the same JVM(測試可以,但生產環境不建議使用這種方式)
Local Server Mode
Cassandra可以作為一個standalone的數據庫與JanusGraph一樣運行在localhost,在這種情況下,JanusGraph和Cassandra之間通過socket通信。
通過下面的步驟配置JanusGraph on Cassandra:
- Download Cassandra, unpack it, and set filesystem paths in
conf/cassandra.yaml
andconf/log4j-server.properties
- Connecting Gremlin Server to Cassandra using the default configuration files provided in the pre-packaged distribution requires that Cassandra Thrift is enabled. To enable Cassandra Thrift open conf/cassandra.yaml and update start_rpc: false to start_rpc: true. If Cassandra is already running Thrift can be started manually with bin/nodetool enablethrift. the Thrift status can be verified with bin/nodetool statusthrift.
- Start Cassandra by invoking bin/cassandra -f on the command line in the directory where Cassandra was unpacked. Read output to check that Cassandra started successfully.
Now, you can create a Cassandra JanusGraph as follows
JanusGraph g = JanusGraphFactory.build().
set("storage.backend", "cql").
set("storage.hostname", "127.0.0.1").
open();
Local Container Mode
通過docker安裝Cassandra,去release界面看一下JanusGraph版本測試通過的Cassandra版本,使用下面的docker命令運行Cassandra:
docker run --name jg-cassandra -d -e CASSANDRA_START_RPC=true -p 9160:9160 \
-p 9042:9042 -p 7199:7199 -p 7001:7001 -p 7000:7000 cassandra:3.11
Note: Port 9160 is used for the Thrift client API. Port 9042 is for CQL native clients. Ports 7000, 7001 and 7099 are for inter-node communication.
Remote Server Mode
集群模式下,有一個Cassandra集群,然后所有的JanusGraph的instance通過socket的方式去讀寫Cassandra集群,客戶端應用程序也可以和JanusGraph的實例運行在同一個JVM中。
舉個例子,我們啟動了一個Cassandra的集群,其中一個機器的IP是77.77.77.77
,我們可以通過以下方式連接:
JanusGraph graph = JanusGraphFactory.build().
set("storage.backend", "cql").
set("storage.hostname", "77.77.77.77").
open();
Remote Server Mode with Gremlin Server
可以在JanusGraph server外面包上一層Gremlin server,這樣,不僅可以和JanusGraph server交互,也可以和Gremlin server交互。
通過:
bin/gremlin-server.sh
啟動Gremlin server,然后通過 bin/gremlin.sh
打開Gremlin的終端,然后運行:
:plugin use tinkerpop.server
:remote connect tinkerpop.server conf/remote.yaml
:> g.addV()
便可以了。
JanusGraph Embedded Mode
Cassandra也可以整合到JanusGraph中,在這種情況下,JanusGraph和Cassandra運行在同一個JVM中,本次通過進程間通信而不是網絡傳輸信息,這種情況通過在performance上有很大的幫助。
如果想使用Cassandra的embedded mode,需要配置embeddedcassandra
作為存儲后端。
這種embedded模式推薦通過Gremlin server來暴露JanusGraph。
需要注意的是,embedded方式需要GC調優。
Apache HBase
Local Server Mode
- 從此處下載一個HBase的stable release。
- Start HBase by invoking the
start-hbase.sh
script in the bin directory inside the extracted HBase directory. To stop HBase, usestop-hbase.sh
.
然后通過:
JanusGraph graph = JanusGraphFactory.build()
.set("storage.backend", "hbase")
.open();
連接到HBase。
Remote Server Mode
集群模式,JanusGraph的實例通過socket與HBase建立連接,並進行讀寫操作。
假設我們啟動了一個HBase並使用zookeeper作為協調器,zk的IP是 77.77.77.77
, 77.77.77.78
和77.77.77.79
,那么我們通過下面的方式連接到HBase:
JanusGraph g = JanusGraphFactory.build()
.set("storage.backend", "hbase")
.set("storage.hostname", "77.77.77.77, 77.77.77.78, 77.77.77.79")
.open();
Remote Server Mode with Gremlin Server
和Cassandra章節講的一樣,我們可以在JanusGraph server外面再包一層Gremlin server:
http://gremlin-server.janusgraph.machine1/mygraph/vertices/1
http://gremlin-server.janusgraph.machine2/mygraph/tp/gremlin?script=g.v(1).out('follows').out('created')
Gremlin-server的配置文件也要做響應的修改,下面是個例子:
...
graphs: {
g: conf/janusgraph-hbase.properties
}
scriptEngines: {
gremlin-groovy: {
plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/empty-sample.groovy]}}}}
...
HBase的配置
在配置說明章節,有一系列配置選項,要注意 storage.hbase.table
參數,默認table的名字是janusgraph
。
Gloabl Graph Operations
JanusGraph over HBase 支持全局的點和邊的遍歷,但這種情況下,會把所有的點和邊導入到內存中,可能會報 OutOfMemoryException
錯誤。可以使用 Gremlin-hadoop 的方式去遍歷。
InMemory Storage Backend
JanusGraph支持使用純內存,可以通過下面的屬性配置:
storage.backend=inmemory
可以在Gremlin-console直接打開內存中的圖:
graph = JanusGraphFactory.build().set('storage.backend', 'inmemory').open()
這種情況下,如果關閉該圖或者停止graph的進程,圖的所有數據都會丟失。這種模式也只支持單點模式,不支持多個JanusGraph的圖實例共享。
這種存儲策略不適合在生產中使用。
后端索引(Index Backends)
查詢謂語和數據類型
比較謂語
- eq (equal)
- neq (not equal)
- gt (greater than)
- gte (greater than or equal)
- lt (less than)
- lte (less than or equal)
文本操作謂語(Text Predicate)
主要可以用這些operator做full-text search,常見的有兩類:
- String中以詞為粒度的
- textContains
- textContainsPrefix
- textContainsRegex
- textContainsFuzzy
- 以整個String為粒度的
- textPrefix
- textRegex
- textFuzzy
區間操作謂語(Geo Predicate)
區間操作謂語包括:
- geoIntersect
- geoWithin
- geoDisjoint
- geoContains
查詢樣例
g.V().has("name", "hercules")
// 2) Find all vertices with an age greater than 50
g.V().has("age", gt(50))
// or find all vertices between 1000 (inclusive) and 5000 (exclusive) years of age and order by increasing age
g.V().has("age", inside(1000, 5000)).order().by("age", incr)
// which returns the same result set as the following query but in reverse order
g.V().has("age", inside(1000, 5000)).order().by("age", decr)
// 3) Find all edges where the place is at most 50 kilometers from the given latitude-longitude pair
g.E().has("place", geoWithin(Geoshape.circle(37.97, 23.72, 50)))
// 4) Find all edges where reason contains the word "loves"
g.E().has("reason", textContains("loves"))
// or all edges which contain two words (need to chunk into individual words)
g.E().has("reason", textContains("loves")).has("reason", textContains("breezes"))
// or all edges which contain words that start with "lov"
g.E().has("reason", textContainsPrefix("lov"))
// or all edges which contain words that match the regular expression "br[ez]*s" in their entirety
g.E().has("reason", textContainsRegex("br[ez]*s"))
// or all edges which contain words similar to "love"
g.E().has("reason", textContainsFuzzy("love"))
// 5) Find all vertices older than a thousand years and named "saturn"
g.V().has("age", gt(1000)).has("name", "saturn")
數據類型(Data Type Support)
Composite index可以支持任何類型的index,mixed index只支持下面的數據類型:
- Byte
- Short
- Integer
- Long
- Float
- Double
- String
- Geoshape
- Date
- Instant
- UUID
Geoshape Data Type
只有mixed indexes
支持Geoshape Data Type,支持的數據類型有point, circle, box, line, polygon, multi-point, multi-line 和 multi-polygon。
集合類型(Collections)
如果使用 Elasticsearch
,可以索引cardinality為 SET
或者 LIST
的屬性,如下面的例子:
mgmt = graph.openManagement()
nameProperty = mgmt.makePropertyKey("names").dataType(String.class).cardinality(Cardinality.SET).make()
mgmt.buildIndex("search", Vertex.class).addKey(nameProperty, Mapping.STRING.asParameter()).buildMixedIndex("search")
mgmt.commit()
//Insert a vertex
person = graph.addVertex()
person.property("names", "Robert")
person.property("names", "Bob")
graph.tx().commit()
//Now query it
g.V().has("names", "Bob").count().next() //1
g.V().has("names", "Robert").count().next() //1
索引參數和全局搜索
當我們定義一個Mixed index的時候,每一個被添加到索引中的property key都有一系列參數可以設置。
Full-Text Search
全局索引,這是一個很重要的功能。當我們去索引字符串類型的property key的時候,我們可以選擇從character層面后者text層面去索引,這需要改變 mapping
參數。
當我們從text層面去索引的時候,字符串會被tokenize成bag of words,用戶便可以去query是否包含一個或多個詞,這叫做 full-text search。
當我們從char層面去索引的時候,string會直接和char串做match,不會有futher analysis后者tokenize操作。這可以方便我們去查找是否包含某個字符序列,這也叫做 string search。
下面分開講:
Full-Text Search
默認地,string會使用text層面的索引,可以通過下面的方式顯示地去創建:
mgmt = graph.openManagement()
summary = mgmt.makePropertyKey('booksummary').dataType(String.class).make()
mgmt.buildIndex('booksBySummary', Vertex.class).addKey(summary, Mapping.TEXT.asParameter()).buildMixedIndex("search")
mgmt.commit()
可以看到,這和普通的創建索引唯一的一個區別是我們在調用 addKey()
方法的時候,多添加了一個 Mapping.TEXT
映射參數。
前面我們提到過,如果是使用text層面的index,JanusGraph會自己去維護一個bag of words,JanusGraph默認的tokenization方案是:它會使用非字母數字的字段去split,然后會移除到所有小於2個字符的token。
當我們使用text層面的index的時候,只有全局索引的謂語才真正用到了我們創建的索引,包括textContains
方法,textContainsPrefix
方法,textContainsRegex
方法和textContainsFuzzy
方法,注意,full-text search是case-insensitive的,下面是具體的例子:
import static org.janusgraph.core.attribute.Text.*
g.V().has('booksummary', textContains('unicorns'))
g.V().has('booksummary', textContainsPrefix('uni'))
g.V().has('booksummary', textContainsRegex('.*corn.*'))
g.V().has('booksummary', textContainsFuzzy('unicorn'))
String Search
首先要明確的是,string search會把數據load到內存中,這其實是非常costly的。
可以通過下面的方式去顯示地創建string search:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('bookname').dataType(String.class).make()
mgmt.buildIndex('booksBySummary', Vertex.class).addKey(name, Mapping.STRING.asParameter()).buildMixedIndex("search")
mgmt.commit()
這種 bookname
會按照 as-is
的方式去分析,包括stop word和no-letter character。
當我們使用string層面的index的時候,只有下面的謂語才真正用到了我們創建的索引,包括eq
,neq
、textPrefix
、textRegex
和textFuzzy
。注意,string search是case-insensitive的,下面是具體的例子:
import static org.apache.tinkerpop.gremlin.process.traversal.P.*
import static org.janusgraph.core.attribute.Text.*
g.V().has('bookname', eq('unicorns'))
g.V().has('bookname', neq('unicorns'))
g.V().has('bookname', textPrefix('uni'))
g.V().has('bookname', textRegex('.*corn.*'))
g.V().has('bookname', textFuzzy('unicorn'))
同時使用text和string層面的full-text search
如果我們使用elasticsearch作為后端,這樣就可以用所有的謂語去做精確或者模糊的查詢了。
通過下面的方式創建這種叫做 Mapping.TEXTSTRING
的full-text search方案:
mgmt = graph.openManagement()
summary = mgmt.makePropertyKey('booksummary').dataType(String.class).make()
mgmt.buildIndex('booksBySummary', Vertex.class).addKey(summary, Mapping.TEXTSTRING.asParameter()).buildMixedIndex("search")
mgmt.commit()
Geo Mapping
默認地,JanusGraph支持索引點(point)的索引,並且去查詢circle或者box類型的property,如果想索引一個非-點類型的property,需要使用 Mapping.PREFIX_TREE
:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('border').dataType(Geoshape.class).make()
mgmt.buildIndex('borderIndex', Vertex.class).addKey(name, Mapping.PREFIX_TREE.asParameter()).buildMixedIndex("search")
mgmt.commit()
Direct Index Query
可以直接向index backend發送query,下面是個例子:
ManagementSystem mgmt = graph.openManagement();
PropertyKey text = mgmt.makePropertyKey("text").dataType(String.class).make();
mgmt.buildIndex("vertexByText", Vertex.class).addKey(text).buildMixedIndex("search");
mgmt.commit();
// ... Load vertices ...
for (Result<Vertex> result : graph.indexQuery("vertexByText", "v.text:(farm uncle berry)").vertices()) {
System.out.println(result.getElement() + ": " + result.getScore());
}
需要指明兩個元素:
- 想要查詢的index backend的index名字,在上面的例子中是
vertexByText
。 - 查詢語句,在上面的例子中是
v.text:(farm uncle berry)
。
Elasticsearch
Running elasticsearch
下載包里面本身包含兼容的elasticsearch的distribution,通過:
elasticsearch/bin/elasticsearch
來運行elasticsearch。要注意的是,es不能使用root運行。
ES配置
JanusGraph支持通過HTTP客戶端連接到正在運行的ES集群。
在配置文件中,Elasticsearch client需要通過下面這一行指明:
index.search.backend=elasticsearch
通過 index.[X].hostname
指明某一個或者一系列es的實例的地址:
index.search.backend=elasticsearch
index.search.hostname=10.0.0.10:9200
可以通過下面的方式綁定要一段連續的IP:PORT對:
index.search.backend=elasticsearch
index.search.hostname=10.0.0.10, 10.0.0.20:7777
REST Client
Rest client可以通過 index.[X].bulk-refresh
參數控制改變多久能被索引到。
REST Client 既可以配置成HTTP的方式也可以配置成HTTPS的方式。
HTTPS authentification
可以通過 index.[X].elasticsearch.ssl.enabled 開啟HTTP的SSL支持。注意,這可能需要修改 index.[X].port 參數,因為ES的HTTPS服務的端口號可能和通常意義的REST API端口(9200)不一樣。
HTTP authentification
可以通過配置 index.[X].elasticsearch.http.auth.basic.realm
參數來通過HTTP協議做認證。
index.search.elasticsearch.http.auth.type=basic
index.search.elasticsearch.http.auth.basic.username=httpuser
index.search.elasticsearch.http.auth.basic.password=httppassword
tips:
可以自己實現class來實現認證:
index.search.elasticsearch.http.auth.custom.authenticator-class=fully.qualified.class.Name
index.search.elasticsearch.elasticsearch.http.auth.custom.authenticator-args=arg1,arg2,...
自己實現的class必須實現 org.janusgraph.diskstorage.es.rest.util.RestClientAuthenticator
接口。
高級功能
Advanced Schema
Static Vertex
Vertex label可以定義為static的,一旦創建,就不能修改了。
mgmt = graph.openManagement()
tweet = mgmt.makeVertexLabel('tweet').setStatic().make()
mgmt.commit()
Edge and Vertex TTL
邊和頂點可以配置對應的time-to-live(TTL)
,這個概念有點類似於數據庫中的臨時表的概念,用這種方式創建的點和邊在使用一段時間以后會被自動移除掉。
Edge TTL
mgmt = graph.openManagement()
visits = mgmt.makeEdgeLabel('visits').make()
mgmt.setTTL(visits, Duration.ofDays(7))
mgmt.commit()
需要注意的是,這種方法后端數據庫必須支持cell level TTL,目前只有Cassandra和HBase支持。
Property TTL
mgmt = graph.openManagement()
sensor = mgmt.makePropertyKey('sensor').cardinality(Cardinality.LIST).dataType(Double.class).make()
mgmt.setTTL(sensor, Duration.ofDays(21))
mgmt.commit()
Vertex TTL
mgmt = graph.openManagement()
tweet = mgmt.makeVertexLabel('tweet').setStatic().make()
mgmt.setTTL(tweet, Duration.ofHours(36))
mgmt.commit()
Undirected Edges
mgmt = graph.openManagement()
mgmt.makeEdgeLabel('author').unidirected().make()
mgmt.commit()
這種undirected edge只能通過out-going的方向去遍歷,這有點像萬維網。
Eventually-Consistent Storage Backends
底層數據的最終一致性問題。
Eventually consistent storage backend有哪些?Apache Cassandra 或者 Apache HBase其實都是這種數據庫類型。
數據的一致性
通過 JanusGraphManagement.setConsistency(element, ConsistencyModifier.LOCK)
方法去定義數據的一致性問題, 如下面的例子:
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
mgmt.commit()
使用鎖其實開銷還是很大的,在對數據一致性要求不高的情形,最好不用鎖,讓后期數據庫自己在讀操作中去解決數據一致性問題。
當有兩個事務同時對一個元素進行寫操作的時候,怎么辦呢?我們可以先讓寫操作成功,然后后期再去解決一致性問題,具體有兩種思路解決這個問題:
- Forking Edges
思想就是,每一個事務fork一個對應的要修改的edge,再根據時間戳去在后期修改。
下面是個例子:
mgmt = graph.openManagement()
related = mgmt.makeEdgeLabel('related').make()
mgmt.setConsistency(related, ConsistencyModifier.FORK)
mgmt.commit()
這里,我們創建了一個edge label,叫做 related
,然后我們把一致性屬性設置成了 ConsistencyModifier.FORK
。
這個策略只對MULTI類別的邊適用。其他的multiplicity並不適用,因為其它multiplicity顯式地應用了鎖。
Failure & Recovery
失敗和恢復,主要是兩個部分:
- 事務的失敗和恢復
- 實例的宕機和恢復
事務的失敗和恢復
事務如果在調用 commit()
之前失敗,是可以恢復的。commit()
之前的改變也會被回滾。
有時候,數據persist到存儲系統的過程成功了,但創建index的的過程卻失敗了。這種情況下,該事務會被認為成功了,因為底層存儲才是source of graph。
但這樣會帶來數據和索引的不一致性。JanusGraph維護了一份 transaction write-ahead log
,對應的有兩個參數可以調整:
tx.log-tx = true
tx.max-commit-time = 10000
如果一個事務的persistance過程超過了 max-commit-time
,JanusGrpah會嘗試從中恢復。與此同時,另外有一個進程去掃描維護好的這份log,去identify那些只成功了一半的事務。建議使用另一台機器專門去做失敗恢復,運行:
recovery = JanusGraphFactory.startTransactionRecovery(graph, startTime, TimeUnit.MILLISECONDS);
transaction write-ahead log
本身也有維護成本,因為涉及到大量的寫操作。transaction write-ahead log
自動維護的時間是2天,2天前的數據會被自動刪除。
對於這樣的系統,如何 fine tune log system 也是需要仔細考慮的因素。
實例的恢復
如果某個JanusGraph instance宕機了,其他的實例應該不能受影響。如果涉及到schema相關的操作,比如創建索引,這就需要不同instance保持協作了,JanusGraph會自動地去維護一份running instance的列表,如果某一個實例被意外關閉了,創建索引的操作就會失敗。
在這種情況下,有一個方案是去手動地remove某一個實例:
mgmt = graph.openManagement()
mgmt.getOpenInstances() //all open instances
==>7f0001016161-dunwich1(current)
==>7f0001016161-atlantis1
mgmt.forceCloseInstance('7f0001016161-atlantis1') //remove an instance
mgmt.commit()
但這樣做有數據不一致的風險,應該盡量少使用這種方式。
索引的管理
重新索引
一般來講,我們在創建schema的時候,就應該把索引建立好,如果事先沒有創建好索引,就需要重新索引了。
可以通過兩種方式來執行重索引:
- MapReduce
- JanusGraphManagement
具體的代碼可以參考:https://docs.janusgraph.org/latest/index-admin.html
刪除索引
刪除索引分兩步:
- JanusGraph通知所有其他的實例,說明索引即將被刪除,索引便會標記成
DISABLED
狀態,此時JanusGraph便會停止使用該索引去回答查詢,或者更新索引,索引相關的底層數據還保留但會被忽略。 - 根據索引是屬於composite索引還是mixed索引,如果是composite索引,可以直接用
JanusGraphManagement
或者MapReduce
去刪除,如果是mixed索引就比較麻煩了,因為這涉及到后端存儲的索引,所以需要手動地去后端drop掉對應的索引。
重建索引的相關問題v
當一個索引剛剛被建立,就執行重索引的時候,可能會報如下錯誤:
The index mixedExample is in an invalid state and cannot be indexed.
The following index keys have invalid status: desc has status INSTALLED
(status must be one of [REGISTERED, ENABLED])
這是因為建立索引后,索引信息會被慢慢地廣播到集群中其他的Instances,這需要一定的時間,所以,最好不要在索引剛剛建立以后就去執行重索引任務。
大規模導入(Bulk Loading)
大規模導入需要的配置
通過 storage.batch-loading
參數來支持 Bulk loading。
如果打開了 Builk loading,最好關閉自動創建schema的功能(schema.default = none
)。因為automatic type creation會不斷地check來保證數據的一致性和完整性,對於Bulk loading的場合,這或許是不需要的。
另外一個需要關注的參數是 ids.block-size
,可以通過增大這個參數來減少id獲取過程的數量(id block acquisition process),但這會造成大量的id浪費,這個參數需要根據每台機器添加的頂點的數量來做調整,默認值已經比較合理了,如果不行,可以適當地增大這個數值(10倍,100倍,比如)。
對於這個參數,有個技巧:Rule of thumb: Set ids.block-size to the number of vertices you expect to add per JanusGraph instance per hour.
Note:要保證所有JanusGraph instance這個參數的配置都一樣,如果需要調整這個參數,最好先關閉所有的instance,調整好后再上線。
如果有多個實例,這些實例在不斷地分配id,可能會造成沖突問題,有時候甚至會報出異常,一般來說,對於這個問題,可以調整下面幾個參數:
ids.authority.wait-time
:單位是milliseconds,id pool mamager在等待id block應用程序獲得底層存儲所需要等待的時間,這個時間越短,越容易出問題。
Rule of thumb: Set this to the sum of the 95th percentile read and write times measured on the storage backend cluster under load. Important: This value should be the same across all JanusGraph instances.
ids.renew-timeout
:單位是milliseconds,JanusGraph 的 id pool manager 在獲取新一個id之前會等待的總時間。
Rule of thumb: Set this value to be as large feasible to not have to wait too long for unrecoverable failures. The only downside of increasing it is that JanusGraph will try for a long time on an unavailable storage backend cluster.
還有一些需要注意的讀寫參數:
storage.buffer-size
:我們執行很多query的時候,JanusGraph會把它們封裝成一個個的小batch,然后推送到后端的存儲執行,當我們在短時間內執行大量的寫操作的時候,后端存儲可能承受不了這么大的壓力。在這種情況下,我們可以增大這個buffer參數,但與此相對的代價是每秒中可以發送的request數量會減小。這個參數不建議在用事務的方式導入數據的時候進行修改。
storage.read-attempts
和 storage.write-attempts
參數,這個參數指的是每個推送到后端的batch會被嘗試多少次(直至認為這個batch fail),如果希望在導數據的時候支持 high load,最好調大這幾個參數。
storage.attempt-wait
參數指定了JanusGraph在重新執行一次失敗的操作之前會等待的時間(millisecond),這個值越大,后端能抗住的load越高。
Graph Partitioning
分區策略,主要是兩種:
- Edge Cut
砍邊策略,經常一起遍歷到的點盡量放在同一個機器上。
- Vertex Cut
砍點策略。砍邊策略的目的是減小通信量,砍點策略主要是為了處理hotspot問題(超級點問題),比如有的點,入度非常大,這種情況下,用鄰接表的方式+砍邊的方式存儲的話,勢必造成某一個分區上某一個點的存儲量過大(偏移),這個時候,利用砍點策略,把這種點均勻地分布到不同的partition上面就顯得很重要了。
一個典型的場景是 User
和 Product
的關系,product
可能只有幾千個,但用戶卻有上百萬個,這種情況下,product
最好就始終砍點策略。
對與分區這個問題,如果數據量小,就用隨機分區(默認的)就好,如果數據量過大,就要好好地去fine tune分區的策略了。
JanusGraph with TinkerPop’s Hadoop-Gremlin
JanusGraph和TinkerPop的Hadoop框架的整合問題。JanusGraph和Apache Spark還有Hadoop的整合主要是依靠社區的力量。