Flink通过Akka实现JobManager、TaskManager等组件之间的RPC通信（实际的数据交换走的是Netty网络栈，而不是Akka）。在集群启动JobManager时，需要先启动JobManager用于通信的RPC服务。相关代码主要在
org.apache.flink.runtime.entrypoint.ClusterEntrypoint的runCluster -> initializeServices方法中实现：
//在配置的bindAddress和portRange上面,启动JobManager一个RPC的服务端 //主要用来与task manager进行通信,和任务状态查询,任务Graph接口等。 commonRpcService = createRpcService(configuration, bindAddress, portRange);
createRpcService方法调用AkkaRpcServiceUtils中的createRpcService方法。
/**
 * Starts the entrypoint's RPC service by delegating to {@link AkkaRpcServiceUtils}.
 *
 * @param configuration the Flink configuration
 * @param bindAddress the address the RPC service listens at
 * @param portRange the port range definition from which the listening port is chosen
 * @return the started RPC service
 * @throws Exception if the underlying actor system cannot be started
 */
private RpcService createRpcService(
        Configuration configuration,
        String bindAddress,
        String portRange) throws Exception {
    // AkkaRpcServiceUtils takes (hostname, portRange, configuration) -- note the different argument order.
    final RpcService rpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
    return rpcService;
}
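AkkaRpcServiceUtils的createRpcService方法实现了RpcService的创建：先通过BootstrapTools.startActorSystem在指定地址和端口范围上启动ActorSystem，再调用instantiateAkkaRpcService基于该ActorSystem创建RpcService。startActorSystem经过几层重载，最终会逐个尝试端口范围内的端口，直到在某个端口上启动成功为止：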
/**
 * Utility method to create an RPC service from the configuration, a hostname and a port range.
 *
 * <p>First starts an actor system listening at {@code hostname} on a port taken from
 * {@code portRangeDefinition}, then hands that actor system to {@code instantiateAkkaRpcService}.
 *
 * @param hostname The hostname/address the actor system listens at.
 * @param portRangeDefinition The port range from which the listening port is chosen.
 * @param configuration The Flink configuration.
 * @return The rpc service which is used to start and connect to RpcEndpoints.
 * @throws IOException Thrown, if the actor system can not bind to the address
 * @throws Exception Thrown if some other error occurs while creating the akka actor system
 */
public static RpcService createRpcService(
        String hostname,
        String portRangeDefinition,
        Configuration configuration) throws Exception {
    return instantiateAkkaRpcService(
        configuration,
        BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, LOG));
}
/**
 * Starts an ActorSystem listening at the given address, on a port chosen from the port range.
 *
 * <p>Convenience overload: the executor configuration is derived from {@code configuration}
 * via {@code ForkJoinExecutorConfiguration.fromConfiguration}.
 *
 * @param configuration The Flink configuration
 * @param listeningAddress The address to listen at.
 * @param portRangeDefinition The port range to choose a port from.
 * @param logger The logger to output log information.
 * @return The ActorSystem which has been started
 * @throws Exception Thrown when actor system cannot be started in specified port range
 */
public static ActorSystem startActorSystem(
        Configuration configuration,
        String listeningAddress,
        String portRangeDefinition,
        Logger logger) throws Exception {
    final ActorSystemExecutorConfiguration executorConfiguration =
        ForkJoinExecutorConfiguration.fromConfiguration(configuration);

    return startActorSystem(configuration, listeningAddress, portRangeDefinition, logger, executorConfiguration);
}
/**
 * Starts an ActorSystem listening at the given address, on a port chosen from the port range.
 *
 * <p>Uses the actor system name returned by {@code AkkaUtils.getFlinkActorSystemName()}.
 *
 * @param configuration The Flink configuration
 * @param listeningAddress The address to listen at.
 * @param portRangeDefinition The port range to choose a port from.
 * @param logger The logger to output log information.
 * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
 * @return The ActorSystem which has been started
 * @throws Exception Thrown when actor system cannot be started in specified port range
 */
public static ActorSystem startActorSystem(
        Configuration configuration,
        String listeningAddress,
        String portRangeDefinition,
        Logger logger,
        @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
    final String actorSystemName = AkkaUtils.getFlinkActorSystemName();

    return startActorSystem(
        configuration,
        actorSystemName,
        listeningAddress,
        portRangeDefinition,
        logger,
        actorSystemExecutorConfiguration);
}
1 /** 2 * Starts an ActorSystem with the given configuration listening at the address/ports. 3 * 4 * @param configuration The Flink configuration 5 * @param actorSystemName Name of the started {@link ActorSystem} 6 * @param listeningAddress The address to listen at. 7 * @param portRangeDefinition The port range to choose a port from. 8 * @param logger The logger to output log information. 9 * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor 10 * @return The ActorSystem which has been started 11 * @throws Exception Thrown when actor system cannot be started in specified port range 12 */ 13 public static ActorSystem startActorSystem( 14 Configuration configuration, 15 String actorSystemName, 16 String listeningAddress, 17 String portRangeDefinition, 18 Logger logger, 19 @Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { 20 21 // parse port range definition and create port iterator 22 Iterator<Integer> portsIterator; 23 try { 24 portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition); 25 } catch (Exception e) { 26 throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition); 27 } 28 29 while (portsIterator.hasNext()) { 30 final int port = portsIterator.next(); 31 32 try { 33 return startActorSystem( 34 configuration, 35 actorSystemName, 36 listeningAddress, 37 port, 38 logger, 39 actorSystemExecutorConfiguration); 40 } 41 catch (Exception e) { 42 // we can continue to try if this contains a netty channel exception 43 Throwable cause = e.getCause(); 44 if (!(cause instanceof org.jboss.netty.channel.ChannelException || 45 cause instanceof java.net.BindException)) { 46 throw e; 47 } // else fall through the loop and try the next port 48 } 49 } 50 51 // if we come here, we have exhausted the port range 52 throw new BindException("Could not start actor system on any port in port range " 53 + portRangeDefinition); 54 }
1 /** 2 * Starts an Actor System at a specific port. 3 * @param configuration The Flink configuration. 4 * @param actorSystemName Name of the started {@link ActorSystem} 5 * @param listeningAddress The address to listen at. 6 * @param listeningPort The port to listen at. 7 * @param logger the logger to output log information. 8 * @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor 9 * @return The ActorSystem which has been started. 10 * @throws Exception 11 */ 12 public static ActorSystem startActorSystem( 13 Configuration configuration, 14 String actorSystemName, 15 String listeningAddress, 16 int listeningPort, 17 Logger logger, 18 ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception { 19 20 String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort); 21 logger.info("Trying to start actor system at {}", hostPortUrl); 22 23 try { 24 Config akkaConfig = AkkaUtils.getAkkaConfig( 25 configuration, 26 new Some<>(new Tuple2<>(listeningAddress, listeningPort)), 27 actorSystemExecutorConfiguration.getAkkaConfig()); 28 29 logger.debug("Using akka configuration\n {}", akkaConfig); 30 31 32 ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig); 33 34 logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem)); 35 return actorSystem; 36 } 37 catch (Throwable t) { 38 if (t instanceof ChannelException) { 39 Throwable cause = t.getCause(); 40 if (cause != null && t.getCause() instanceof BindException) { 41 throw new IOException("Unable to create ActorSystem at address " + hostPortUrl + 42 " : " + cause.getMessage(), t); 43 } 44 } 45 throw new Exception("Could not create actor system", t); 46 } 47 }
重点看一下getAkkaConfig这个方法：它根据传入的地址和端口信息创建Akka的配置，其中最主要的是Akka的provider这部分定义。如果以(hostname, port)元组的方式传入外部地址（即Some((hostname, port))），那么生成的配置中provider为
akka.remote.RemoteActorRefProvider；如果传入的是None，则只返回本地actor system的配置。
/**
 * Builds the Akka config for an actor system from the Flink configuration.
 *
 * The basic Akka config (falling back to `executorConfig`) is always included. When an external
 * address is given, a remote config is layered on top of it: the actor system binds to the
 * wildcard IP on the given port, and the given hostname/port are passed on as the address to be
 * reachable at.
 *
 * @param configuration instance containing the user provided configuration values
 * @param externalAddress optional tuple of hostname and port to be reachable at.
 *                        If None is given, then an Akka config for a local actor system
 *                        will be returned
 * @param executorConfig config defining the used executor by the default dispatcher
 * @return Akka config
 */
@throws(classOf[UnknownHostException])
def getAkkaConfig(
    configuration: Configuration,
    externalAddress: Option[(String, Int)],
    executorConfig: Config): Config = {
  val baseConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)

  externalAddress.fold(baseConfig) { case (hostname, port) =>
    // the wildcard IP lets us bind to all network interfaces
    getRemoteAkkaConfig(
      configuration,
      NetUtils.getWildcardIPAddress,
      port,
      hostname,
      port).withFallback(baseConfig)
  }
}
利用actorSystemName和上面生成的akkaConfig，调用RobustActorSystem.create创建ActorSystem：
/**
 * Creates a [[RobustActorSystem]] with the given name and akka config.
 *
 * Before creating it, the Netty internal logger factory is switched to slf4j.
 *
 * @param actorSystemName name of the actor system to create
 * @param akkaConfig configuration for the actor system
 * @return created actor system
 */
def createActorSystem(actorSystemName: String, akkaConfig: Config): ActorSystem = {
  // Use slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
  val nettyLoggerFactory = new Slf4JLoggerFactory
  InternalLoggerFactory.setDefaultFactory(nettyLoggerFactory)

  RobustActorSystem.create(actorSystemName, akkaConfig)
}
akka.actor.RobustActorSystem中的internalApply方法：
/**
 * Creates and starts a [[RobustActorSystem]].
 *
 * Class loader, application config and default execution context are taken from the
 * [[BootstrapSetup]] in `setup` when present. Otherwise the class loader falls back to
 * `findClassLoader()`, the config to `ConfigFactory.load(cl)`, and the execution context to None.
 *
 * @param name name of the actor system
 * @param setup actor system setup, optionally carrying a [[BootstrapSetup]]
 * @param uncaughtExceptionHandler optional handler passed on to the created actor system
 * @return the started actor system
 */
def internalApply(
    name: String,
    setup: ActorSystemSetup,
    uncaughtExceptionHandler: Option[UncaughtExceptionHandler]): RobustActorSystem = {
  val bootstrapSettings = setup.get[BootstrapSetup]
  val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
  val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
  val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

  new RobustActorSystem(
    name,
    appConfig,
    cl,
    defaultEC,
    None, // NOTE(review): presumably "no custom guardian props" -- confirm against the constructor
    setup,
    uncaughtExceptionHandler).start()
}