RPC Environment (RpcEnv)

Caution

RPC Environment (aka RpcEnv) is an environment for RpcEndpoints to process messages. A RPC Environment manages the entire lifecycle of RpcEndpoints:

  • registers (sets up) endpoints (by name or uri)

  • routes incoming messages to them

  • stops them

A RPC Environment is defined by the name, host, and port. It can also be controlled by a security manager.

The only implementation of RPC Environment is Netty-based implementation. Read the section RpcEnvFactory.

RpcEndpoints define how to handle messages (what functions to execute given a message). RpcEndpoints register (with a name or uri) to RpcEnv to receive messages from RpcEndpointRefs.

rpcenv endpoints.png
Figure 1. RpcEnvironment with RpcEndpoints and RpcEndpointRefs

RpcEndpointRefs can be looked up by name or uri (because different RpcEnvs may have different naming schemes).

org.apache.spark.rpc package contains the machinery for RPC communication in Spark.

setupEndpoint Method

Caution
FIXME

awaitTermination Method

Caution
FIXME

RpcEnvFactory

Spark comes with (private[spark] trait) RpcEnvFactory which is the factory contract to create a RPC Environment.

An RpcEnvFactory implementation has a single method create(config: RpcEnvConfig): RpcEnv that returns a RpcEnv for a given RpcEnvConfig.

There are two RpcEnvFactory implementations in Spark:

  • netty using org.apache.spark.rpc.netty.NettyRpcEnvFactory. This is the default factory for RpcEnv as of Spark 1.6.0-SNAPSHOT.

  • akka using org.apache.spark.rpc.akka.AkkaRpcEnvFactory

You can choose an RPC implementation to use by spark.rpc (default: netty). The setting can be one of the two short names for the known RpcEnvFactories - netty or akka - or a fully-qualified class name of your custom factory (including Netty-based and Akka-based implementations).

$ ./bin/spark-shell --conf spark.rpc=netty

$ ./bin/spark-shell --conf spark.rpc=org.apache.spark.rpc.akka.AkkaRpcEnvFactory

RpcEndpoint

RpcEndpoint defines how to handle messages (what functions to execute given a message). RpcEndpoints live inside RpcEnv after being registered by a name.

A RpcEndpoint can be registered to one and only one RpcEnv.

The lifecycle of a RpcEndpoint is onStart, receive and onStop in sequence.

receive can be called concurrently.

Tip
If you want receive to be thread-safe, use ThreadSafeRpcEndpoint.

onError method is called for any exception thrown.

ThreadSafeRpcEndpoint

ThreadSafeRpcEndpoint is a marker RpcEndpoint that does nothing by itself but tells…​

Caution
FIXME What is marker?
Note
ThreadSafeRpcEndpoint is a private[spark] trait.

RpcEndpointRef

A RpcEndpointRef is a reference for a RpcEndpoint in a RpcEnv.

It is serializable entity and so you can send it over a network or save it for later use (it can however be deserialized using the owning RpcEnv only).

A RpcEndpointRef has an address (a Spark URL), and a name.

You can send asynchronous one-way messages to the corresponding RpcEndpoint using send method.

You can send a semi-synchronous message, i.e. "subscribe" to be notified when a response arrives, using ask method. You can also block the current calling thread for a response using askWithRetry method.

  • spark.rpc.numRetries (default: 3) - the number of times to retry connection attempts.

  • spark.rpc.retry.wait (default: 3s) - the number of milliseconds to wait on each retry.

It also uses lookup timeouts.

RpcAddress

RpcAddress is the logical address for an RPC Environment, with hostname and port.

RpcAddress is encoded as a Spark URL, i.e. spark://host:port.

RpcEndpointAddress

RpcEndpointAddress is the logical address for an endpoint registered to an RPC Environment, with RpcAddress and name.

It is in the format of spark://[name]@[rpcAddress.host]:[rpcAddress.port].

Stopping RpcEndpointRef — stop Method

stop(endpoint: RpcEndpointRef): Unit
Caution
FIXME

Endpoint Lookup Timeout

When a remote endpoint is resolved, a local RPC environment connects to the remote one. It is called endpoint lookup. To configure the time needed for the endpoint lookup you can use the following settings.

It is a prioritized list of lookup timeout properties (the higher on the list, the more important):

Their value can be a number alone (seconds) or any number with time suffix, e.g. 50s, 100ms, or 250us. See Settings.

Ask Operation Timeout

Ask operation is when a RPC client expects a response to a message. It is a blocking operation.

You can control the time to wait for a response using the following settings (in that order):

Their value can be a number alone (seconds) or any number with time suffix, e.g. 50s, 100ms, or 250us. See Settings.

Exceptions

When RpcEnv catches uncaught exceptions, it uses RpcCallContext.sendFailure to send exceptions back to the sender, or logging them if no such sender or NotSerializableException.

If any error is thrown from one of RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.

Client Mode = is this an executor or the driver?

When an RPC Environment is initialized as part of the initialization of the driver or executors (using RpcEnv.create), clientMode is false for the driver and true for executors.

RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver)

Refer to Client Mode in Netty-based RpcEnv for the implementation-specific details.

RpcEnvConfig

RpcEnvConfig is a placeholder for an instance of SparkConf, the name of the RPC Environment, host and port, a security manager, and clientMode.

create Factory Methods

create(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean = false): RpcEnv

create(
  name: String,
  bindAddress: String,
  advertiseAddress: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager,
  clientMode: Boolean): RpcEnv

You can create a RPC Environment using the helper method RpcEnv.create.

It assumes that you have a RpcEnvFactory with an empty constructor so that it can be created via Reflection that is available under spark.rpc setting.

Settings

spark.rpc

spark.rpc (default: netty since Spark 1.6.0-SNAPSHOT) - the RPC implementation to use. See RpcEnvFactory.

spark.rpc.lookupTimeout

spark.rpc.lookupTimeout (default: 120s) - the default timeout to use for RPC remote endpoint lookup. Refer to Endpoint Lookup Timeout.

spark.network.timeout

spark.network.timeout (default: 120s) - the default network timeout to use for RPC remote endpoint lookup.

It is used as a fallback value for spark.rpc.askTimeout.

Other

  • spark.rpc.numRetries (default: 3) - the number of attempts to send a message and receive a response from a remote endpoint.

  • spark.rpc.retry.wait (default: 3s) - the time to wait on each retry.

  • spark.rpc.askTimeout (default: 120s) - the default timeout to use for RPC ask operations. Refer to Ask Operation Timeout.

Others

The Worker class calls startRpcEnvAndEndpoint with the following configuration options:

  • host

  • port

  • webUiPort

  • cores

  • memory

  • masters

  • workDir

It starts sparkWorker[N] where N is the identifier of a worker.

results matching ""

    No results matching ""