How to provision an EMR cluster using On-Demand and Spot EC2 Instances
In this tutorial I’m going to explain how to provision an Amazon EMR cluster using EC2 Spot Instances.
⚠️ WARNING ⚠️: as soon as your EC2 instances are running, your Amazon account will be billed.
You can find the full Java source code in this Gist.
Introduction
Amazon EMR is the AWS solution for Big Data using open source tools such as Apache Spark. In a matter of minutes you can provision and run clusters with up to thousands of nodes.
In general, Big Data processing requires a lot of computational resources, so running big clusters can be very expensive. EC2 Spot Instances provide an easy way to reduce costs. As the official documentation states:
Amazon EC2 Spot Instances are spare EC2 compute capacity in the AWS Cloud that are available to you at savings of up to 90% off compared to On-Demand prices.
However, Spot Instances can be terminated at any time. This can cause your application to crash, lose data, or produce corrupted results.
An effective way to solve this problem is to mix On-Demand and Spot Instances in the same cluster.
Cluster nodes configuration
In general, your EMR cluster can have three types of nodes: master, core and task.
From the official documentation:
Master nodes
The master node manages the cluster and typically runs master components of distributed applications.
Core nodes
Core nodes are managed by the master node. Core nodes run the Data Node daemon to coordinate data storage as part of the Hadoop Distributed File System (HDFS). They also run the Task Tracker daemon and perform other parallel computation tasks on data that installed applications require.
Task nodes
You can use task nodes to add power to perform parallel computation tasks on data, such as Hadoop MapReduce tasks and Spark executors. Task nodes don’t run the Data Node daemon, nor do they store data in HDFS.
And also:
Because Spot Instances are often used to run task nodes, Amazon EMR has default functionality for scheduling YARN jobs so that running jobs do not fail when task nodes running on Spot Instances are terminated. Amazon EMR does this by allowing application master processes to run only on core nodes. The application master process controls running jobs and needs to stay alive for the life of the job.
Depending on your workload and application type, Amazon suggests different configurations. You should first read Cluster configuration guidelines and best practices, and then Instance configurations for application scenarios.
In this tutorial, I’m going to explain how to provision a simple cluster with the following configuration:
- one On-Demand master node
- two On-Demand core nodes
- many (!) Spot task nodes
To do this, I will use the Amazon EMR Java API (full source code here).
Choosing instance types
Which instance types to use is an important choice that should be made carefully. In general, it depends on the type of your application and workload, and on your budget.
Besides, when choosing Spot Instances, you should also take into account the availability in your region. Instances with low availability will be terminated more often, so many jobs will fail and your application will require more time (and more money).
The full list of EC2 instance types (with description) is here: Amazon EC2 Instance Types. However, not all instances are supported by EMR. Check the full list here: Instance types supported by EMR.
To choose which Spot Instances to use, the following page provides detailed information about types of Spot Instances and their frequency of interruption in your region: Spot Instance Advisor.
For additional information about prices, check Amazon EC2 Spot Instances Pricing for current prices in your region. For Spot Instances in particular, the best type is usually a trade-off between frequency of interruption and price. The Spot Instance pricing history is also very helpful.
Cluster provisioning
Configuration
First of all, I suggest creating a first test EMR cluster by hand. This will help you create what you will need later, in particular a key pair, the default EMR roles and a subnet.
We also have to choose the EMR version to use and an S3 bucket in which to store logs.
String emrVersion = "emr-5.30.1";
String logFolder = "s3://your-bucket/emrlogs/";
String subnetID = "your-subnet-id";
String keyPairName = "your-emr-key-pair";
Configuring instance types
According to what I’ve explained above, for this example I choose the following instances for master and core nodes:
String masterInstanceType = "m5.xlarge";
String coreInstanceType = "i3.2xlarge";
As mentioned above, these will be On-Demand Instances.
When configuring Spot Instances, you have to set their weighted capacity, i.e., how much weight each instance type has when creating the Instance Fleet. In this example I choose the following instance types with their respective weights:
SpotInstance[] taskFleetInstances = new SpotInstance[] {
    new SpotInstance("r4.2xlarge", 8),
    new SpotInstance("r4.4xlarge", 16)
};
Note: SpotInstance is just a bean I created to store this information.
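The bean itself is not shown in this article, so here is a minimal sketch of what it could look like. The field and getter names (instanceType, weightedCapacity) are my assumptions, chosen only to match the two constructor arguments used above; the version in the Gist may differ.

```java
/** Hypothetical sketch of the SpotInstance bean: an instance type plus its weight. */
public class SpotInstance {
    private final String instanceType;  // EC2 instance type, e.g. "r4.2xlarge"
    private final int weightedCapacity; // units this type counts toward the fleet capacity

    public SpotInstance(String instanceType, int weightedCapacity) {
        if (instanceType == null || instanceType.isEmpty()) {
            throw new IllegalArgumentException("instanceType must not be empty");
        }
        if (weightedCapacity <= 0) {
            throw new IllegalArgumentException("weightedCapacity must be positive");
        }
        this.instanceType = instanceType;
        this.weightedCapacity = weightedCapacity;
    }

    public String getInstanceType() {
        return instanceType;
    }

    public int getWeightedCapacity() {
        return weightedCapacity;
    }

    public static void main(String[] args) {
        // Same values as in the article: the weight equals the number of vCPUs.
        SpotInstance[] taskFleetInstances = new SpotInstance[] {
            new SpotInstance("r4.2xlarge", 8),
            new SpotInstance("r4.4xlarge", 16)
        };
        for (SpotInstance instance : taskFleetInstances) {
            System.out.println(instance.getInstanceType() + " -> " + instance.getWeightedCapacity());
        }
    }
}
```

Rejecting an empty type or a non-positive weight in the constructor is a design choice of this sketch, so that a typo shows up before any request is sent to AWS.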
Let me explain how weighted capacity works. In this example the weight of r4.2xlarge is 8 and that of r4.4xlarge is 16. These numbers are arbitrary (they could be, for example, 1 and 2), but here I use the number of vCPUs of each instance type as its weight. I find this easy to understand and less error-prone. It is also the default when creating a cluster in the AWS console.
Later, when you create your “instance fleet” (see below) you choose a total capacity. EC2 will fill the specified total capacity with Spot Instances according to their weights. So, for example, if you request a total capacity of 32, you may get 2 r4.2xlarge instances and 1 r4.4xlarge, or 4 r4.2xlarge, and so on.
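To make the arithmetic concrete, the following sketch lists every combination of the two instance types whose weights add up exactly to a target capacity. It is only an illustration of the weights: the class and method names are hypothetical, and the actual mix is decided by AWS at provisioning time, not by this calculation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: enumerates the exact ways two weights can fill a target capacity. */
public class WeightedCapacityDemo {

    /**
     * Returns every pair {countA, countB} such that
     * countA * weightA + countB * weightB == target.
     */
    public static List<int[]> exactFills(int weightA, int weightB, int target) {
        if (weightA <= 0 || weightB <= 0 || target < 0) {
            throw new IllegalArgumentException("weights must be positive and target non-negative");
        }
        List<int[]> fills = new ArrayList<>();
        for (int countA = 0; countA * weightA <= target; countA++) {
            int remaining = target - countA * weightA;
            // The rest of the capacity must be a whole number of type-B instances.
            if (remaining % weightB == 0) {
                fills.add(new int[] {countA, remaining / weightB});
            }
        }
        return fills;
    }

    public static void main(String[] args) {
        // Weights from the article: r4.2xlarge = 8, r4.4xlarge = 16, total capacity = 32.
        for (int[] fill : exactFills(8, 16, 32)) {
            System.out.println(fill[0] + " x r4.2xlarge + " + fill[1] + " x r4.4xlarge");
        }
    }
}
```

With the weights above and a total capacity of 32 there are three exact combinations: two r4.4xlarge, two r4.2xlarge plus one r4.4xlarge, or four r4.2xlarge.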
Basic cluster provisioning
First of all you create an AmazonElasticMapReduce object using your role credentials, and then you set up cluster steps, i.e., “actions” that your cluster will perform. You need at least one step, your “main” step. In this example, the main step is an Apache Spark application, deployed as a JAR file on S3.
String jarPath = "s3://your-bucket/your-job.jar";
List<String> argsList = new ArrayList<>();
argsList.add("arg1");
argsList.add("arg2");
StepConfig mainStep = new StepConfig()
    .withName("your-main-step")
    .withActionOnFailure("TERMINATE_JOB_FLOW")
    .withHadoopJarStep(new HadoopJarStepConfig().withJar(jarPath).withArgs(argsList));
Note also that I configure an optional debugging step.
Task nodes
When creating Spot Instances, you are actually making a bid. In this example I choose a maximum price as a percentage of the full price of On-Demand Instances:
double spotPricePerc = 0.4;
that is, at most 40% of the full price. Note that Spot prices can be up to 80% lower than the full price.
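As a quick sanity check on what this fraction means, the sketch below turns it into a maximum hourly price and into a percentage. The On-Demand price of 1.00 USD per hour is a made-up placeholder, not a real AWS price, and the class and method names are hypothetical. The percentage conversion is included because, as far as I know, the EMR API field BidPriceAsPercentageOfOnDemandPrice is expressed as a number such as 20 for 20%, so a fraction like 0.4 would need to be converted before being passed to it; how the full source code does this is not shown here.

```java
/** Illustrative only: relates a Spot price fraction to a maximum price and a percentage. */
public class SpotPriceDemo {

    /** Maximum hourly Spot price for a given On-Demand hourly price and fraction (0.4 = 40%). */
    public static double maxSpotPrice(double onDemandHourlyPrice, double fraction) {
        if (onDemandHourlyPrice <= 0 || fraction <= 0) {
            throw new IllegalArgumentException("price and fraction must be positive");
        }
        return onDemandHourlyPrice * fraction;
    }

    /** Converts a fraction (0.4) into a percentage value (40.0). */
    public static double toPercentage(double fraction) {
        return fraction * 100.0;
    }

    public static void main(String[] args) {
        double spotPricePerc = 0.4;              // same value as in the article
        double hypotheticalOnDemandPrice = 1.00; // placeholder, NOT a real AWS price

        System.out.printf("Maximum Spot price: %.2f USD/hour%n",
            maxSpotPrice(hypotheticalOnDemandPrice, spotPricePerc));
        System.out.printf("As a percentage of On-Demand: %.1f%%%n",
            toPercentage(spotPricePerc));
    }
}
```

Any Spot price above the computed maximum means the request is not fulfilled at that moment, which is why a low fraction saves money but can make task nodes harder to obtain.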
I haven’t mentioned yet that you can mix Spot and On-Demand Instances in a single instance fleet. For task nodes, you may choose to have a few On-Demand nodes always available. This depends on your application. In many scenarios I think the best choice is to allocate all task nodes as Spot Instances, selecting types with a low frequency of interruption if possible.
InstanceFleetConfig taskFleetConfig =
    new InstanceFleetConfig().withName("Task").withInstanceFleetType(InstanceFleetType.TASK)
        .withTargetSpotCapacity(tasksSpotCapacity)
        .withLaunchSpecifications(
            new InstanceFleetProvisioningSpecifications()
                .withSpotSpecification(
                    new SpotProvisioningSpecification()
                        .withAllocationStrategy("capacity-optimized")
                        .withTimeoutDurationMinutes(15)
                        .withTimeoutAction(SpotProvisioningTimeoutAction.TERMINATE_CLUSTER)))
        .withInstanceTypeConfigs(taskFleetInstanceConfigs);
if (tasksOnDemandCapacity > 0) {
    System.out.println(
        "Tasks capacity ON DEMAND = " + tasksOnDemandCapacity + ", SPOT = " + tasksSpotCapacity);
    taskFleetConfig = taskFleetConfig.withTargetOnDemandCapacity(tasksOnDemandCapacity);
}
I configure an instance fleet with a capacity of tasksSpotCapacity units of Spot Instances, and an optional capacity of tasksOnDemandCapacity units of On-Demand Instances.
The capacity-optimized allocation strategy chooses Spot Instance types according to their availability, which lowers the probability that your instances will be terminated. The default strategy instead chooses instances by price (the bigger the discount, the better).
I also choose to wait at most 15 minutes for the bid to be fulfilled. If the timeout expires first, cluster provisioning fails and the cluster is terminated.
Final cluster configuration
In the last step I configure the other two node types, master and core, and I create the request object to be passed to the API:
RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("your-job-name")
    .withReleaseLabel(emrVersion).withApplications(new Application().withName("Spark"))
    .withSteps(enableDebugging, mainStep)
    .withLogUri(logFolder)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withInstances(
        new JobFlowInstancesConfig()
            .withEc2SubnetId(subnetID)
            .withEc2KeyName(keyPairName)
            .withKeepJobFlowAliveWhenNoSteps(false)
            .withInstanceFleets(
                new InstanceFleetConfig().withName("Master")
                    .withInstanceFleetType(InstanceFleetType.MASTER)
                    .withInstanceTypeConfigs(
                        new InstanceTypeConfig().withInstanceType(masterInstanceType))
                    .withTargetOnDemandCapacity(1),
                new InstanceFleetConfig().withName("Core")
                    .withInstanceFleetType(InstanceFleetType.CORE)
                    .withInstanceTypeConfigs(
                        new InstanceTypeConfig().withInstanceType(coreInstanceType))
                    .withTargetOnDemandCapacity(2), // two core nodes, as listed in the configuration above
                taskFleetConfig));
Finally, invoke the API:
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println("The cluster ID is " + result.getJobFlowId());