Getting More Out of Spark
As our journey to understand the customer better than anyone else continues, so too does the need for bigger and better sciences. Thanks to our migration to distributed technologies like Spark, the data processing capabilities available to us now allow for the juggling of much larger data sets in a fraction of the time. Despite this increased potential, blindly throwing terabytes of data at Hadoop will rarely yield the performance the technology promises, and will more than likely land you in a hot seat with one of our friendly Hadoop administrators. Fear not, however: in this short article I hope to show you a few tips and tricks that you can use in your Spark jobs to help manage the processing of large data sets.
The first thing to consider when running Spark jobs is the YARN configuration. As much as I wish I could give you magic numbers that make every job run smoothly, the right settings for executors, cores, shuffle partitions, memory, and more depend on the complexity of the program, the size of the incoming data, the size of the cluster, and other factors. With that said, here are a few general guidelines for some of the most important configurations you might set:
- spark.sql.shuffle.partitions – The number of shuffle partitions for your program. This represents the total number of buckets that your data will be split into when performing any operations that require a shuffle across the network. A good starting point is the size of your incoming data divided by 250 MB; for example, roughly 400 partitions for 100 GB of input.
- num-executors – The total number of executors that will be allocated to your program at runtime. Although it might be tempting to think bumping this number up will automatically lead to an increase in speed, sometimes adding more executors can actually degrade the performance of your program. Be conscious of other teams who are also using the cluster, and start testing your jobs with around 10 executors before scaling to higher numbers.
- executor-cores – The number of cores that will be allocated to your Spark job per executor at runtime. Typically, this number should be set somewhere between 2 and 6 cores, depending mostly on the type of operations you are performing in your Spark job. As with the number of executors, try to start small with two cores before increasing.
- spark.yarn.executor.memoryOverhead – The amount of overhead memory allocated for every executor spun up at runtime (in Spark 2.3 and later, this setting is named spark.executor.memoryOverhead). If you start to find several of your executors dropping connection in the middle of your jobs or see error messages saying YARN has killed the executor for exceeding memory limits, try bumping up the executor memory overhead to somewhere between 4 and 8 GB.
- spark.yarn.driver.memoryOverhead – The amount of reserved space on the driver for managing any overhead costs associated with your job – very similar to the executor memory overhead. Generally speaking, the more complex your program and the more executors you spin up for your job, the more you should increase your driver overhead.
- driver-memory – The amount of memory reserved for the main execution of your program. Typically, your driver memory does not need to be very large unless you are collecting any data back to your driver or are doing a lot of calculations outside of the Spark libraries and functions available to your program. If YARN often kills your driver for exceeding memory limits, a good rule of thumb is to increase the memory to 6 GB, and then continue to increment by 2 GB should you continue to get failures.
- executor-memory – The total amount of memory allocated to each executor at runtime for your job, similar to driver-memory. Again, start small with roughly 6 GB of allocated memory per executor, then increase from there as needed.
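To make these starting points concrete, here is a minimal Python sketch that assembles a conservative set of spark-submit arguments from the guidelines above. The helper name `starting_spark_submit_args` and its parameters are hypothetical (they are not part of Spark), and the numbers are only the starting values suggested in this article, not recommendations for any particular cluster.

```python
import math


def starting_spark_submit_args(input_size_gb, executor_overhead_mb=None,
                               target_partition_mb=250):
    """Build a conservative starting list of spark-submit arguments.

    Hypothetical helper: the defaults mirror the rules of thumb in this
    article and should be tuned for your own job and cluster.
    """
    # Shuffle partitions: incoming data size divided by ~250 MB, at least 1.
    shuffle_partitions = max(
        1, math.ceil(input_size_gb * 1024 / target_partition_mb)
    )
    args = [
        "--num-executors", "10",    # start around 10 executors
        "--executor-cores", "2",    # start small, usually 2 to 6
        "--executor-memory", "6g",  # roughly 6 GB per executor
        "--conf", f"spark.sql.shuffle.partitions={shuffle_partitions}",
    ]
    # Only add overhead once YARN starts killing executors for memory.
    # The value is passed as a plain number of megabytes.
    if executor_overhead_mb is not None:
        args += [
            "--conf",
            f"spark.yarn.executor.memoryOverhead={executor_overhead_mb}",
        ]
    return args
```

You could join the returned list with spaces and place it between `spark-submit` and your application file, then adjust one value at a time as you test.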
Another thing to consider when running Spark jobs is the structure of your programs. In general, there are many simple things that you can do programmatically to help reduce the amount of data being carried throughout your program, the amount of data that has to travel across the network, the amount of data shuffling that has to occur, etc. Here are a few tips for improving the efficiency of your Spark programs:
- Apply selects and filters to the data as soon as possible. Waiting until later in your program to filter dataframes or drop columns can be highly inefficient; the extra time it takes to carry that data through shuffle operations across the network quickly adds up.
- When writing larger tables out to HDFS, partition by join/aggregation keys. If the incoming data has already been partitioned by whatever columns you will be using for joining dataframes or calculating aggregations, Spark may be able to skip the expensive shuffle operation (in practice, this generally requires the table to be bucketed on those keys rather than only split into partition directories). The more reads to be performed downstream on the data, the more time you will save in skipped shuffles. In addition, partitioning by columns that will often be used for filtering data will allow Spark to use partition pruning to quickly filter out any unnecessary data.
- When joining very large datasets with small datasets, use broadcast joins to cut down on unnecessary network shuffling and, in turn, boost the speed of your program. A broadcast join is a special type of join that works by taking the smaller dataframe and copying it to every executor. By doing this, Spark no longer needs to shuffle the data in both dataframes across the network, because each executor can simply use its own (full) copy of the smaller dataframe to join with its piece of the larger dataset.
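The mechanics of a broadcast join can be illustrated without a cluster. The following is a plain-Python sketch of the idea, not Spark's implementation: the function name `broadcast_join` and the list-of-tuples data layout are assumptions made for illustration. In PySpark itself, the usual way to request this behavior is to wrap the smaller dataframe in `pyspark.sql.functions.broadcast` inside the join.

```python
def broadcast_join(large_partitions, small_rows):
    """Inner-join each partition of a large dataset with a small dataset.

    large_partitions: list of partitions, each a list of (key, value) rows.
    small_rows: list of (key, value) rows, small enough to copy everywhere.
    """
    # "Broadcast" step: build one lookup table from the small dataset.
    # In Spark, every executor would receive its own full copy of it.
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition is joined locally against the lookup table, so
        # no rows from the large side move between partitions.
        joined = [
            (key, left, right)
            for key, left in partition
            for right in lookup.get(key, [])
        ]
        joined_partitions.append(joined)
    return joined_partitions
```

Note that the large side keeps its original partitioning throughout; only the small side is copied, which is why this approach only pays off when that side comfortably fits in each executor's memory.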
The last thing we will cover is skew. Skewed data is perhaps one of the biggest challenges facing Spark users today because of the way Spark divides the workload between executors in a job. For most aggregations and joins that require data to be shuffled across the network, Spark will apply a simple hashing algorithm to the group-by or join key. The hashing algorithm creates a deterministic number based upon the current record’s key that indicates which shuffle partition this record should be assigned to. As each record is allocated to a shuffle partition, it will then be moved to whichever executor is responsible for that shuffle partition.
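As a rough illustration of this assignment step, the sketch below maps keys to shuffle partitions with a deterministic hash. It is a simplified model: Spark uses its own hash function internally, and `zlib.crc32` is only a stand-in chosen here because it gives the same result on every run. The function names are hypothetical.

```python
import zlib


def shuffle_partition_for(key, num_partitions):
    """Return the shuffle partition index a key would be assigned to."""
    # Deterministic hash of the key, reduced to the range of partitions.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many records land in each shuffle partition."""
    sizes = [0] * num_partitions
    for key in keys:
        sizes[shuffle_partition_for(key, num_partitions)] += 1
    return sizes
```

Because the partition depends only on the key, every record with the same key ends up in the same partition, no matter how many records share that key.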
After all the data has been shuffled, all records with the same key will be assigned to the same shuffle partition (and the same executor) to help speed up the job of aggregating or joining. Although this approach has many positives, dividing the data by key only works well if you have close to an even number of records for every distinct key. Say, for example, half of your data has the same value as its key, and the rest of your data is evenly split among another 300 keys. If you were to run any type of aggregation or join using that key, half of your dataset is going to end up in a single executor, and that executor is going to take significantly longer to finish than the other executors (if it finishes at all). To get around this problem, the Digital Engineering Team (we call ourselves The Jetsons) has developed a tool to take two incoming dataframes and create a configurable number of “salted” keys that will help split up the data into more evenly sized buckets; if you run into this problem, you may want to consider creating a similar utility.
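If you want to experiment with the idea, here is a minimal plain-Python sketch of key salting. It is not the tool described above; the function names, the fixed random seed, and the list-of-tuples layout are all assumptions made for illustration. The skewed side gets a random salt appended to each key, and the other side is replicated once per salt value so that every salted key still finds its match in the join.

```python
import random
from collections import Counter


def salt_skewed_side(rows, num_salts, seed=0):
    """Append a random salt in [0, num_salts) to the key of every row."""
    rng = random.Random(seed)  # seeded so the example is repeatable
    return [((key, rng.randrange(num_salts)), value) for key, value in rows]


def replicate_other_side(rows, num_salts):
    """Copy every row once per salt value so salted keys still match."""
    return [
        ((key, salt), value)
        for key, value in rows
        for salt in range(num_salts)
    ]


def largest_key_group(rows):
    """Return the number of rows sharing the most common key."""
    return max(Counter(key for key, _ in rows).values())
```

With the example from this article (half the rows sharing one key and the rest spread over 300 keys), comparing `largest_key_group` before and after salting shows the hot key being split into up to `num_salts` smaller groups. The trade-off is that the other side of the join grows by a factor of `num_salts`, so the number of salts is worth keeping configurable.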
Though using Spark is sometimes a challenge, effective tuning and program organization can go a long way in improving the experience. I hope these tips help you to make your own workflows more efficient.