Tuning Geocoding in Spark: Advanced Topics
This knowledge article dives a little deeper into decisions and calculations made in the previous Tuning Geocoding in Spark article.
Distributing Data (Sorting, Shuffling, Repartitioning)
There are two main problems to solve while geocoding: distributing data across the cluster and distributing computation evenly across the cluster.
Distributing data across the cluster
This problem is not unique to geocoding and arises in most Spark applications. By default,
Spark will not distribute the 1.25GB input file across the cluster while geocoding.
To solve this, call repartition(int n) on the Spark DataFrame before geocoding, where n
is the number of partitions to split the data into. This is already implemented in the
geocode driver and is exposed through the --num-partitions option. At a minimum, this
should be set to num-executors * executor-cores. For us that is 47 * 4 = 188.
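The minimum partition count above is simple arithmetic, sketched here in Python. The helper name min_partitions is hypothetical and is not part of the geocode driver; it only reproduces the num-executors * executor-cores rule from this article.

```python
def min_partitions(num_executors: int, executor_cores: int) -> int:
    """Lower bound for --num-partitions: one partition per available task slot."""
    if num_executors < 1 or executor_cores < 1:
        raise ValueError("num_executors and executor_cores must be positive")
    return num_executors * executor_cores


# Cluster shape used in this article: 47 executors with 4 cores each.
n = min_partitions(num_executors=47, executor_cores=4)
print(n)  # 188

# With a PySpark DataFrame `df` (not created here), the equivalent call
# would be: df = df.repartition(n)
```

Setting fewer partitions than this leaves some cores with no task to run.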
Distributing geocoding computation evenly across the cluster
Not all countries geocode at the same speed. If a Spark executor only has countries that
geocode quickly, it will finish geocoding and sit idle for a long time while the other
executors are still working. This underutilizes the cluster and can leave a Spark
application waiting on just a few tasks. To solve this problem we want to shuffle our
input data. Luckily, this is done implicitly when we call repartition(int n) to solve
the problem above. There is no benefit to randomly sorting the data before calling repartition.
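To illustrate why spreading records matters, here is a small simulation in plain Python. It does not use Spark. The per-record costs and the two-country workload are made up, and round-robin assignment is used as a simplified stand-in for the redistribution that repartition performs.

```python
def partition_costs(costs, num_partitions, round_robin):
    """Return the total geocoding cost that lands in each partition."""
    totals = [0] * num_partitions
    chunk = -(-len(costs) // num_partitions)  # ceiling division
    for i, cost in enumerate(costs):
        # Round-robin spreads neighbours apart; contiguous chunks keep them together.
        target = i % num_partitions if round_robin else i // chunk
        totals[target] += cost
    return totals


# Made-up workload: 1000 records from a "fast" country (cost 1 each)
# followed by 1000 records from a "slow" country (cost 10 each).
costs = [1] * 1000 + [10] * 1000

contiguous = partition_costs(costs, 4, round_robin=False)
spread = partition_costs(costs, 4, round_robin=True)

print(contiguous)  # [500, 500, 5000, 5000]
print(spread)      # [2750, 2750, 2750, 2750]
```

In the contiguous case two partitions finish early and wait on the two slow ones. Once the records are spread out, every partition carries the same load.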
Spark Executor Size
Choosing the correct number of CPUs and amount of memory to give an executor is very important for using Spark efficiently and without errors. Since our geocoding instance is shared across the tasks in an executor, we take advantage of that and work with multi-core executors. Here are the steps that got us to our executor size:
1. Determine the minimum amount of memory required to avoid failures
We want a reliable Spark job that never has any task failures. We achieved this through trial and error by starting our job with 3-core executors with 1GB of memory and increasing the memory until there were no task failures. For this data we saw task failures until we got to 13GB of memory. If your data is only distributed across a couple of countries, the memory requirement will be lower.
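The trial-and-error loop for this step can be sketched as follows. This is only an illustration: job_succeeds is a hypothetical callback that would launch the Spark job with the given executor memory and report whether it finished without task failures. The fake version below simply encodes the 13GB threshold observed for this data.

```python
from typing import Callable


def find_min_memory_gb(job_succeeds: Callable[[int], bool],
                       start_gb: int = 1, max_gb: int = 64) -> int:
    """Return the smallest memory size (in GB) for which the job has no task failures."""
    for memory_gb in range(start_gb, max_gb + 1):
        if job_succeeds(memory_gb):
            return memory_gb
    raise RuntimeError(f"job still failing at {max_gb}GB")


# Stand-in for a real run: pretend tasks fail below 13GB, as observed for this data.
def fake_job_succeeds(memory_gb: int) -> bool:
    return memory_gb >= 13


print(find_min_memory_gb(fake_job_succeeds))  # 13
```

In practice each call to job_succeeds is a full geocoding run, so larger steps than 1GB may be worth considering to reduce the number of trials.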
2. Increase the minimum memory for a potential performance boost
Now that we have our minimum memory requirement, it is possible we will see better performance if we increase the memory further. Again by trial and error, we increased the memory until we saw no improvement. We saw minor improvements when we increased to 15GB of memory, and degradation started to occur at 20GB of memory.
3. Determine optimal CPU count
Now that we know our memory requirement, we determine how many cores each executor should have. By trial and error, we started with executors having 2 cores and 15GB of memory and increased by 1 core until we saw no improvement. At 3 cores we saw a noticeable improvement and at 7 cores we saw a degradation in performance. A minor peak in performance was observed at 4 cores.
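The "increase until we see no improvement" search used for cores (and for the memory increase in step 2) can be sketched like this. The function name and the run_seconds callback are hypothetical, and the run times below are made-up values shaped only to peak at 4 cores. They are not measurements from our cluster.

```python
from typing import Callable


def increase_until_no_improvement(run_seconds: Callable[[int], float],
                                  start: int, stop: int) -> int:
    """Raise a setting one step at a time; return the last value that improved run time."""
    best_value = start
    best_time = run_seconds(start)
    for value in range(start + 1, stop + 1):
        elapsed = run_seconds(value)
        if elapsed >= best_time:
            break  # no improvement, stop searching
        best_value, best_time = value, elapsed
    return best_value


# Made-up run times (seconds) per core count, shaped so the best value is 4 cores.
fake_times = {2: 900.0, 3: 700.0, 4: 680.0, 5: 690.0, 6: 700.0, 7: 800.0}

print(increase_until_no_improvement(fake_times.__getitem__, start=2, stop=7))  # 4
```

Stopping at the first non-improving step keeps the number of full runs low, at the cost of possibly missing a later improvement.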
Finalize
After following these steps we arrived at 4 cores and 15GB of memory for our executor size. Any executor size between 3 cores/13GB of memory and 6 cores/20GB of memory will be very comparable in performance.
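As a rough sketch, the final sizing maps onto spark-submit options as shown below. The jar name is a placeholder, and the argument list is incomplete: a real invocation of the geocode driver will need its own input and output arguments, which are not shown here.

```python
def spark_submit_args(num_executors: int, executor_cores: int,
                      executor_memory_gb: int, app_jar: str) -> list:
    """Build a spark-submit argument list for the chosen executor size."""
    return [
        "spark-submit",
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", f"{executor_memory_gb}G",
        app_jar,
        # Geocode driver option from this article: one partition per task slot.
        "--num-partitions", str(num_executors * executor_cores),
    ]


# "geocode-driver.jar" is a placeholder name, not the real artifact.
args = spark_submit_args(47, 4, 15, "geocode-driver.jar")
print(" ".join(args))
```

Options placed before the jar are read by spark-submit itself, while anything after the jar is passed through to the application.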
Additional Geocoding Options
Using S3 for input or output
When using EMR it is very common to use S3 paths for your input and output. This is OK and adds no performance penalty.
Using --combine
By default Spark will split your output into many small files, typically one per
partition. If you want a single file, the geocoding driver supports the --combine
flag, which will repartition the output into 1 file. Using that flag with this Spark
application added only 1 minute to the total run time.