Netty-based session implementation in the Gremlin driver and server


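Gremlin Server needs two guarantees to implement sessions:

  • each session is bound to its own list of variables (bindings);
  • every request in a session must run on the same thread of the same server process. This is required by the ThreadLocal mechanism behind TinkerPop graph transactions.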
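
The second requirement can be illustrated with a minimal sketch. The `TX` variable below is a hypothetical stand-in for a graph's thread-bound transaction state, not TinkerPop code: state stored in a ThreadLocal is visible only to the thread that wrote it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalTxDemo {
    // Hypothetical stand-in for thread-bound transaction state.
    static final ThreadLocal<String> TX = new ThreadLocal<>();

    /** Opens a "transaction" on the session thread, then reads it from that thread and from another one. */
    static String[] run() throws Exception {
        ExecutorService sessionThread = Executors.newSingleThreadExecutor();
        ExecutorService otherThread = Executors.newSingleThreadExecutor();
        try {
            Runnable open = () -> TX.set("tx-1");
            Callable<String> read = TX::get;
            sessionThread.submit(open).get();
            String sameThread = sessionThread.submit(read).get();
            String foreignThread = otherThread.submit(read).get();
            return new String[] { sameThread, String.valueOf(foreignThread) };
        } finally {
            sessionThread.shutdown();
            otherThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        // prints "same thread: tx-1, other thread: null"
        System.out.println("same thread: " + seen[0] + ", other thread: " + seen[1]);
    }
}
```

A request that lands on a different thread would not see the open transaction, which is why a session has to stay on one thread.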

 

1. SessionOpProcessor.java maintains a map from session id to Session. Each Session holds its own Bindings, which are the bindings of the Java ScriptEngine.

protected static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();

http://docs.oracle.com/javase/7/docs/api/javax/script/ScriptEngine.html#eval(java.lang.String,%20javax.script.Bindings)
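
A minimal sketch of such a registry, assuming a simplified, hypothetical `Session` class that holds nothing but an id and a `javax.script.Bindings` instance (the real Session carries more state):

```java
import java.util.concurrent.ConcurrentHashMap;
import javax.script.Bindings;
import javax.script.SimpleBindings;

final class SessionRegistrySketch {
    /** Hypothetical, simplified session: only the id and its variable bindings. */
    static final class Session {
        final String id;
        final Bindings bindings = new SimpleBindings();
        Session(String id) { this.id = id; }
    }

    static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();

    /** Returns the session for the id, creating it atomically on first use. */
    static Session getSession(String sessionId) {
        return sessions.computeIfAbsent(sessionId, Session::new);
    }

    public static void main(String[] args) {
        getSession("s1").bindings.put("x", 1);
        System.out.println(getSession("s1").bindings.get("x"));         // prints 1
        System.out.println(getSession("s2").bindings.containsKey("x")); // prints false
    }
}
```

Variables set in one session are visible to later requests with the same session id and invisible to every other session.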

 

2. On the Gremlin driver side, each session is pinned to a single host: Client.java

        /**
         * Randomly choose an available {@link Host} to bind the session too and initialize the {@link ConnectionPool}.
         */
        @Override
        protected void initializeImplementation() {
            // chooses an available host at random
            final List<Host> hosts = cluster.allHosts()
                    .stream().filter(Host::isAvailable).collect(Collectors.toList());
            if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
            Collections.shuffle(hosts);
            final Host host = hosts.get(0);
            connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
        }
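
The pinning idea can be sketched without the driver. `Host` here is a hypothetical stand-in with just an address and an availability flag, and `hostFor` is an illustrative name: the host is chosen once at initialization and reused for every request in the session. In the snippet above, the two `Optional.of(1)` arguments appear to override the minimum and maximum pool size, so the session would also use a single connection.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class PinnedSessionSketch {
    /** Hypothetical stand-in for the driver's Host. */
    static final class Host {
        final String address;
        final boolean available;
        Host(String address, boolean available) {
            this.address = address;
            this.available = available;
        }
    }

    private final Host pinned;

    /** Picks one available host at random, once, when the session client is created. */
    PinnedSessionSketch(List<Host> allHosts) {
        List<Host> hosts = allHosts.stream().filter(h -> h.available).collect(Collectors.toList());
        if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
        Collections.shuffle(hosts);
        this.pinned = hosts.get(0);
    }

    /** Every request in the session is routed to the host chosen at construction time. */
    Host hostFor(String request) {
        return pinned;
    }

    public static void main(String[] args) {
        List<Host> cluster = Arrays.asList(
                new Host("10.0.0.1", true), new Host("10.0.0.2", false), new Host("10.0.0.3", true));
        PinnedSessionSketch session = new PinnedSessionSketch(cluster);
        System.out.println(session.hostFor("x = 1") == session.hostFor("x + 1")); // prints true
    }
}
```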

 

3. Each session has its own single-thread executor, which guarantees single-threaded execution: Session.java

    /**
     * By binding the session to run ScriptEngine evaluations in a specific thread, each request will respect
     * the ThreadLocal nature of Graph implementations.
     */
    private final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactoryWorker);
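
A small sketch of the effect, assuming a hypothetical thread factory (the thread name is made up for this example): every task submitted to a single-thread executor is served by the same worker thread, one task at a time.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class SessionExecutorSketch {
    // Hypothetical thread factory; the name is chosen for this example only.
    static final ThreadFactory FACTORY = r -> new Thread(r, "session-worker");

    /** Submits several requests and records which thread served each one. */
    static Set<String> threadsUsed(int requests) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(FACTORY);
        try {
            Set<String> names = new LinkedHashSet<>();
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            for (int i = 0; i < requests; i++) {
                names.add(executor.submit(whoAmI).get());
            }
            return names;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(threadsUsed(5)); // prints [session-worker]
    }
}
```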

 

4. Ensure that response encoding runs on the session's thread:

GremlinResponseFrameEncoder.java
// if the request came in on a session then the serialization must occur in that same thread, except
// in the case of an error where we can free the session executor from having to do that job. the
// problem here is that if the session executor is used in the case of an error and the executor is
// blocked by parallel requests then there is no thread available to serialize the result and send
// back the response as the workers get all tied up behind the session executor.
if (null == session || !o.getStatus().getCode().isSuccess())
     serialized = new Frame(serializer.serializeResponseAsBinary(o, ctx.alloc()));
else
     serialized = new Frame(session.getExecutor().submit(() -> serializer.serializeResponseAsBinary(o, ctx.alloc())).get());
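
The branching above can be sketched in isolation. This is a simplified illustration, not the encoder itself: `serialize` is a hypothetical helper, and the "serialization" just tags the payload with the name of the thread that did the work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EncodeOnSessionThreadSketch {
    /**
     * Successful session responses are serialized on the session executor;
     * sessionless responses and errors are serialized on the calling thread.
     */
    static String serialize(ExecutorService sessionExecutor, boolean success, String payload) throws Exception {
        Callable<String> work = () -> Thread.currentThread().getName() + ":" + payload;
        if (sessionExecutor == null || !success) {
            return work.call();
        }
        return sessionExecutor.submit(work).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService session = Executors.newSingleThreadExecutor(r -> new Thread(r, "session-worker"));
        try {
            System.out.println(serialize(session, true, "result")); // prints session-worker:result
            System.out.println(serialize(session, false, "error")); // runs on the calling thread
            System.out.println(serialize(null, true, "no-session")); // runs on the calling thread
        } finally {
            session.shutdown();
        }
    }
}
```

Serializing errors on the calling thread means an error response does not have to wait behind requests already queued on the session executor.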

5. Ensure that the Gremlin script runs on the session's thread:

SessionOpProcessor.java, evalOp()
evalOpInternal(context, session::getGremlinExecutor, getBindingMaker(session).apply(context));

Session.java: the session's own GremlinExecutor uses the session's executor

    private GremlinExecutor.Builder initializeGremlinExecutor() {
        final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
                .scriptEvaluationTimeout(settings.scriptEvaluationTimeout)
                .afterTimeout(b -> {
                    graphManager.rollbackAll();
                    this.bindings.clear();
                    this.bindings.putAll(b);
                })
                .afterSuccess(b -> {
                    this.bindings.clear();
                    this.bindings.putAll(b);
                })
                .enabledPlugins(new HashSet<>(settings.plugins))
                .globalBindings(graphManager.getAsBindings())
                .executorService(executor)
                .scheduledExecutorService(scheduledExecutorService);
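
The combined effect of `executorService(executor)` and the `afterSuccess` callback can be approximated with a sketch. This is a simulation under stated assumptions, not GremlinExecutor: the "script" is a plain `Consumer` over a map, and the class and method names are hypothetical. Each evaluation runs on the session's single thread, and the bindings it leaves behind replace the session's bindings only when it succeeds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class SessionEvalSketch {
    private final Map<String, Object> bindings = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Runs a stand-in "script" on the session thread against a working copy of the bindings. */
    void eval(Consumer<Map<String, Object>> script) throws Exception {
        Runnable task = () -> {
            Map<String, Object> working = new HashMap<>(bindings);
            script.accept(working);
            // Mirrors afterSuccess: keep the post-evaluation bindings for the next request.
            bindings.clear();
            bindings.putAll(working);
        };
        // If the script throws, get() rethrows and the session bindings stay unchanged.
        executor.submit(task).get();
    }

    Object get(String name) {
        return bindings.get(name);
    }

    void close() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SessionEvalSketch session = new SessionEvalSketch();
        session.eval(b -> b.put("x", 1));
        session.eval(b -> b.put("y", (Integer) b.get("x") + 1));
        System.out.println(session.get("y")); // prints 2
        session.close();
    }
}
```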

 

