Flink Standalone启动RpcService


Flink通过akka进行数据通信,在集群启动jobmanager的时候,需要启动JobManager通信的RPC服务。代码主要在

org.apache.flink.runtime.entrypoint.ClusterEntrypoint中runCluster->initializeServices方法中实现:
//在配置的bindAddress和portRange上面,启动JobManager一个RPC的服务端
//主要用来与task manager进行通信,和任务状态查询,任务Graph接口等。
commonRpcService = createRpcService(configuration, bindAddress, portRange);
createRpcService方法调用AkkaRpcServiceUtils中的createRpcService方法。
private RpcService createRpcService(Configuration configuration,String bindAddress, String portRange) throws Exception {
        return AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
}

在AkkaRpcServiceUtils的createRpcService方法中,实现了RPCService的创建

/**
     * Utility method to create RPC service from configuration and hostname, port.
     *
     * @param hostname   The hostname/address that describes the TaskManager's data location.
     * @param portRangeDefinition   The port range to start TaskManager on.
     * @param configuration                 The configuration for the TaskManager.
     * @return   The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
     * @throws IOException      Thrown, if the actor system can not bind to the address
     * @throws Exception      Thrown is some other error occurs while creating akka actor system
     */
    public static RpcService createRpcService(
            String hostname,
            String portRangeDefinition,
            Configuration configuration) throws Exception {
        final ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, LOG);
        return instantiateAkkaRpcService(configuration, actorSystem);
    }
/**
     * Starts an ActorSystem with the given configuration listening at the address/ports.
     * @param configuration The Flink configuration
     * @param listeningAddress The address to listen at.
     * @param portRangeDefinition The port range to choose a port from.
     * @param logger The logger to output log information.
     * @return The ActorSystem which has been started
     * @throws Exception Thrown when actor system cannot be started in specified port range
     */
    public static ActorSystem startActorSystem(
        Configuration configuration,
        String listeningAddress,
        String portRangeDefinition,
        Logger logger) throws Exception {
        return startActorSystem(
            configuration,
            listeningAddress,
            portRangeDefinition,
            logger,
            ForkJoinExecutorConfiguration.fromConfiguration(configuration));
    }

 

/**
     * Starts an ActorSystem with the given configuration listening at the address/ports.
     *
     * @param configuration The Flink configuration
     * @param listeningAddress The address to listen at.
     * @param portRangeDefinition The port range to choose a port from.
     * @param logger The logger to output log information.
     * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
     * @return The ActorSystem which has been started
     * @throws Exception Thrown when actor system cannot be started in specified port range
     */
    public static ActorSystem startActorSystem(
            Configuration configuration,
            String listeningAddress,
            String portRangeDefinition,
            Logger logger,
            @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
        return startActorSystem(
            configuration,
            AkkaUtils.getFlinkActorSystemName(),
            listeningAddress,
            portRangeDefinition,
            logger,
            actorSystemExecutorConfiguration);
    }

 

 1 /**
 2      * Starts an ActorSystem with the given configuration listening at the address/ports.
 3      *
 4      * @param configuration The Flink configuration
 5      * @param actorSystemName Name of the started {@link ActorSystem}
 6      * @param listeningAddress The address to listen at.
 7      * @param portRangeDefinition The port range to choose a port from.
 8      * @param logger The logger to output log information.
 9      * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
10      * @return The ActorSystem which has been started
11      * @throws Exception Thrown when actor system cannot be started in specified port range
12      */
13     public static ActorSystem startActorSystem(
14             Configuration configuration,
15             String actorSystemName,
16             String listeningAddress,
17             String portRangeDefinition,
18             Logger logger,
19             @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
20 
21         // parse port range definition and create port iterator
22         Iterator<Integer> portsIterator;
23         try {
24             portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
25         } catch (Exception e) {
26             throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
27         }
28 
29         while (portsIterator.hasNext()) {
30             final int port = portsIterator.next();
31 
32             try {
33                 return startActorSystem(
34                     configuration,
35                     actorSystemName,
36                     listeningAddress,
37                     port,
38                     logger,
39                     actorSystemExecutorConfiguration);
40             }
41             catch (Exception e) {
42                 // we can continue to try if this contains a netty channel exception
43                 Throwable cause = e.getCause();
44                 if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
45                         cause instanceof java.net.BindException)) {
46                     throw e;
47                 } // else fall through the loop and try the next port
48             }
49         }
50 
51         // if we come here, we have exhausted the port range
52         throw new BindException("Could not start actor system on any port in port range "
53             + portRangeDefinition);
54     }

 

 1 /**
 2      * Starts an Actor System at a specific port.
 3      * @param configuration The Flink configuration.
 4      * @param actorSystemName Name of the started {@link ActorSystem}
 5      * @param listeningAddress The address to listen at.
 6      * @param listeningPort The port to listen at.
 7      * @param logger the logger to output log information.
 8      * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
 9      * @return The ActorSystem which has been started.
10      * @throws Exception
11      */
12     public static ActorSystem startActorSystem(
13         Configuration configuration,
14         String actorSystemName,
15         String listeningAddress,
16         int listeningPort,
17         Logger logger,
18         ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
19 
20         String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort);
21         logger.info("Trying to start actor system at {}", hostPortUrl);
22 
23         try {
24             Config akkaConfig = AkkaUtils.getAkkaConfig(
25                 configuration,
26                 new Some<>(new Tuple2<>(listeningAddress, listeningPort)),
27                 actorSystemExecutorConfiguration.getAkkaConfig());
28 
29             logger.debug("Using akka configuration\n {}", akkaConfig);
30 
31 
32             ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
33 
34             logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
35             return actorSystem;
36         }
37         catch (Throwable t) {
38             if (t instanceof ChannelException) {
39                 Throwable cause = t.getCause();
40                 if (cause != null && t.getCause() instanceof BindException) {
41                     throw new IOException("Unable to create ActorSystem at address " + hostPortUrl +
42                         " : " + cause.getMessage(), t);
43                 }
44             }
45             throw new Exception("Could not create actor system", t);
46         }
47     }

重点看一下getAkkaConfig这个方法,通过传递的地址和端口信息,创建akka的配置信息,主要是akka的provider这这部分定义,如果是(hostname,port)这种元组的方式进行传递,那么就会生成

akka.remote.RemoteActorRefProvider。
 /**
    * Creates an akka config with the provided configuration values. If the listening address is
    * specified, then the actor system will listen on the respective address.
    *
    * @param configuration instance containing the user provided configuration values
    * @param externalAddress optional tuple of bindAddress and port to be reachable at.
    *                        If None is given, then an Akka config for local actor system
    *                        will be returned
    * @param executorConfig config defining the used executor by the default dispatcher
    * @return Akka config
    */
  @throws(classOf[UnknownHostException])
  def getAkkaConfig(configuration: Configuration,
                    externalAddress: Option[(String, Int)],
                    executorConfig: Config): Config = {
    val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)

    externalAddress match {

      case Some((hostname, port)) =>

        val remoteConfig = getRemoteAkkaConfig(configuration,
          // the wildcard IP lets us bind to all network interfaces
          NetUtils.getWildcardIPAddress, port,
          hostname, port)

        remoteConfig.withFallback(defaultConfig)

      case None =>
        defaultConfig
    }
  }

通过创建的actorsystemName、akkaConfig,调用RobustActorSystem.create创建ActorSystem

/**
    * Creates an actor system with the given akka config.
    *
    * @param akkaConfig configuration for the actor system
    * @return created actor system
    */
  def createActorSystem(actorSystemName: String, akkaConfig: Config): ActorSystem = {
    // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
    RobustActorSystem.create(actorSystemName, akkaConfig)
  }
akka.actor.RobustActorSystem
def internalApply(
      name: String,
      setup: ActorSystemSetup,
      uncaughtExceptionHandler: Option[UncaughtExceptionHandler]): RobustActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    var a = setup.get[BootstrapSetup]
      .flatMap(_.actorRefProvider).map(_.identifier)

    new RobustActorSystem(
      name,
      appConfig,
      cl,
      defaultEC,
      None,
      setup,
      uncaughtExceptionHandler).start()
  }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM