The Sprinting Pachyderm: Improving Runtime Performance of Your Big Data Application


Disclaimer: No elephants were harmed while writing this blog post!

Big Data applications have become ubiquitous in software development. With treasure troves of data being collected by companies, there is a constant need to derive business insight from that data quickly, or risk losing its temporal context. Companies typically enable this quick pace by decoupling the tasks of setting up and maintaining infrastructure and platform administration from application software development, allowing application developers to focus purely on solving business problems.

On the Experimentation team at eBay, we compute numerous statistical measures at massive scale, for over 100 metrics across tens of dimensions. To get an idea of the scale, try sampling (pun intended!) these approximate numbers — we support more than 250 experiments simultaneously, generate about 30 TB per month, and process around 20 TB every day, running more than 300 Map/Reduce (MR) jobs every day.

While application developers tend to focus their attention on adding functionality, the poor runtime performance of their jobs can come as a rude surprise.

In this blog, I present some of the levers that a developer can employ to mitigate performance issues of Big Data applications.

Here I will assume that there exists a data processing and storage platform, such as a Hadoop cluster; that MR applications are developed using a high-level library such as Scoobi (a Scala library); and that jobs are scheduled using an enterprise scheduler such as Automic (UC4). That said, some of these techniques are applicable across variants of this architecture.

Design

When you design MR applications, some key elements need to be factored into the design phase. Here are some of my guiding principles for designing MR applications.

  • Store intermediate and final data with the express purpose of assisting with convenient processing in subsequent stages. It might be obvious to store intermediate data so that it is convenient for the MR job to process it, but the same advantage may be applicable if the final output data is also stored conveniently, because yet another stage may be added as an enhancement in the future.
  • Create self-sufficient logical units (folders) of storage. Here self-sufficiency simply means denormalization. Self-sufficiency allows for a better possibility of parallelism, and better parallelism translates to better amortization of processing costs across a suite of business use cases (or an application as a whole). The trade-off is data duplication, but storage is cheap. Accepting higher storage requirements is a good alternative to absorbing higher processing cost, which translates to poor application performance and therefore affects the business.
  • To mitigate the higher storage requirements, have a robust purging and archival policy.
  • Design should be flexible enough to accommodate iterative changes later, applied as layers. That is, design should be such that a smaller layer of design change can be easily added on top of the existing design. For example, we could later add a design layer to group the logical units referred to earlier and apply processing on the groups.
  • Start by setting a self-constrained SLA for each MR application. Continuously try to improve processing performance to be less than the SLA. Over time, the SLA will settle on a realistic time. Allow a breach of the SLA only if there is a corresponding increase in input.
  • Use the right tool for the right job. Sometimes it may be necessary to use a library or a tool such as Hive, Spark, or Tez that is not standard for the application being developed because of inherent advantages of the tool, such as joins or in-memory performance. Using the right tool helps prevent re-inventing the wheel if there is a tool already available to use but not yet part of the development environment.

Combiners

A seasoned MR application developer knows the advantages of using Combiners: among other things, they help reduce shuffle time, reduce network congestion, and prevent OOM errors on the reducer side. But what if it’s not easy to write Combine operations for records? In real-world Big Data applications, records have complex structures with attributes such as Arrays and Maps. Combined records form new records that need to be combined again, and care should be taken that the end result of combining n records is the same across multiple executions of the job, regardless of the order in which the Combine operations occur. That is, Combine operations must not violate associativity.

Here is a great methodology to employ in such cases. Perform Combine operations to generate only partial aggregations and form the map output, and then have the final reduce phase perform the originally intended aggregation for a given reduce key.

For example, suppose every record contains a map of attributes with string keys and double values, and our intended aggregation is the variance of those double values. A seemingly natural method is to use Combiners to generate map-local variance values and then use a Reducer to compute the variance across all the map-local variances for a given key. But comparing that with the variance generated without Combiners for the same key, we find that the values are not the same: a variance of variances is not the variance of the whole series. The following example illustrates the concept.

Formula for Variance = ∑(Xi − XM)² / n

Where:

  • Xi is a value in the series for i in the range between 1 to n
  • XM is the mean of all the values in the series
  • n is the total number of values in the series

We can rewrite the formula as ((X1 − XM)² + (X2 − XM)² + (X3 − XM)² + … + (Xn − XM)²) / n

We can further expand this to (X1² + X2² + X3² + … + Xn² + n·XM² − 2XM(X1 + X2 + X3 + … + Xn)) / n

There are now three parts of the above formula that can independently become candidates for Combine functions.

  • X1 + X2 + X3 + … + Xn  (sum of values)
  • 1 + 1 + 1 + … n  (number of values)
  • X1² + X2² + X3² + … + Xn²  (sum of squares of values)

The above three parts form the components necessary to perform the final variance calculation in the final Reduce phase. These three parts (or partial aggregations) are computed in the Combiners and written out as part of the map output. Note that XM can be computed by dividing the first part (the sum of values) by the second (the number of values) and then substituting it into the variance formula. This methodology can be applied to many other complex computations, as long as the computation can be broken down into partial components/aggregations. I refer to this as “staggered computation” in MR jobs and have found it to be highly performant; a minimal sketch follows.
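
As an illustration only (the names VarParts, toParts, mergeParts, and variance below are hypothetical, not from the original application), here is a minimal Scala sketch of the staggered variance computation: the Combiner only merges partial aggregates (sum, count, sum of squares), and the final Reduce step derives the variance from them.

case class VarParts(sum: Double, count: Long, sumSq: Double)

object StaggeredVariance {
  // Map side: one record's value becomes a trivial partial aggregate.
  def toParts(x: Double): VarParts = VarParts(x, 1L, x * x)

  // Combine step: merging partial aggregates is associative and commutative,
  // so Combiners can run any number of times, in any order.
  def mergeParts(a: VarParts, b: VarParts): VarParts =
    VarParts(a.sum + b.sum, a.count + b.count, a.sumSq + b.sumSq)

  // Final Reduce step: derive the variance from the accumulated parts.
  def variance(p: VarParts): Double = {
    val mean = p.sum / p.count
    p.sumSq / p.count - mean * mean
  }
}

For instance, variance(mergeParts(toParts(1.0), toParts(3.0))) evaluates to 1.0, which matches the variance of the series {1, 3}.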

Caching

MR applications can be very simple in nature, such as appending static text to every row of input, or they can be very complex, with a lot of control flow, filters, and so on. Sometimes many objects end up being created just for the lifetime of processing a single record, discarded, and then recreated while processing the next record. Such applications offer plenty of opportunity to cache and reuse objects. Reusing cached objects reduces allocation churn and garbage-collection overhead, relieving the application of memory pressure.
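
As a hedged illustration (the RecordEnricher class and enrichKey helper are hypothetical), the sketch below caches expensive-to-construct objects once per task instead of recreating them for every record:

import java.text.SimpleDateFormat
import java.util.Date

class RecordEnricher {
  // Constructed once per task (for example, in a DoFn setup()) and reused for
  // every record, instead of being re-created inside the per-record path.
  private val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  private val keyBuilder = new StringBuilder(64)

  def enrichKey(dimension: String, epochMillis: Long): String = {
    keyBuilder.setLength(0) // reuse the buffer rather than allocating a new one
    keyBuilder.append(dimension).append('|').append(tsFormat.format(new Date(epochMillis)))
    keyBuilder.toString
  }
}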

Platform and cluster changes

Hierarchical queues

Hierarchical queues allow for the cluster to be composed of a hierarchy of queues, where each child queue is dedicated to a tenant. If hierarchical queues are enabled on the cluster, they can help alleviate the issue of resource hogging by some tenants on the cluster and allow for more available clusters per tenant. This in turn allows for better predictability of your job’s run times and also for better capacity management of the queue resources.
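
As a minimal sketch (the queue name below is hypothetical and depends entirely on how the cluster’s queue hierarchy is configured), a job can be pointed at its dedicated child queue via the standard Hadoop property:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Submit this job to a dedicated child queue owned by the tenant.
conf.set("mapreduce.job.queuename", "root.experimentation.reporting")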

Scheduler

Hadoop clusters these days are multi-tenant, so job performance depends heavily on the prevailing workload. Analyze the load patterns on the queue/cluster and look for spiky usage patterns. Ideally, we want to remove spikes from the usage patterns and smooth the curve of usage. This is not an easy task since spikes show up at different times of the day, based on the number of tenants and the number of jobs per tenant. However, once quiet periods of load patterns are identified, the Scheduler that submits the jobs can be tuned to leverage the resources available during the quiet period. This is called job throttling or dynamic scheduling, where jobs are submitted not all at once, but a few at a time. Slowly increase the threshold after observing consistent results.

This solution will work only after consistent load patterns are observed. If the load pattern cannot be nailed down to a generic form, then this solution will have no effect on job performance. After the job is submitted, there is no way to control the rate at which resources (mappers/reducers) are allocated to it, which means throttling can be done only at the time of job submission. In enterprise schedulers such as UC4 by Automic, jobs can be queued and submitted dynamically by writing simple UC4 scripts and by using a nifty parameter called maxParallel, which sets the upper limit on the number of jobs run in parallel.
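
UC4 scripting itself is out of scope here, but the idea behind maxParallel can be sketched generically. In the hypothetical Scala snippet below, a semaphore caps how many jobs are in flight at once; submitJob is a stand-in for whatever actually launches an MR job in your environment.

import java.util.concurrent.{Executors, Semaphore, TimeUnit}

object ThrottledSubmitter {
  // Cap on concurrently running jobs, analogous in spirit to UC4's maxParallel.
  private val maxParallel = new Semaphore(4)
  private val pool = Executors.newCachedThreadPool()

  def submitAll(jobs: Seq[String], submitJob: String => Unit): Unit = {
    jobs.foreach { job =>
      maxParallel.acquire()              // block until a slot frees up
      pool.submit(new Runnable {
        def run(): Unit =
          try submitJob(job)
          finally maxParallel.release()  // free the slot when the job finishes
      })
    }
    pool.shutdown()
    pool.awaitTermination(12, TimeUnit.HOURS)
  }
}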

File Formats

Hadoop supports various file formats, such as Text, CSV, and SequenceFile, and anyone can add support for a new file format by implementing the corresponding InputFormat/OutputFormat interfaces and SerDes. When choosing a file format for Hadoop applications, the key factors to consider are performance, compression, and schema evolution. The aforementioned formats all have performance, compression, and schema-evolution disadvantages for large datasets.

Schema evolution

In practice, Big Data applications evolve over time, and so do their data structures/schemas: fields get added, removed, and renamed frequently as the application evolves. Data output by one application may be consumed by many other applications or end users. Data files in the above formats say nothing about their own schema, so every read has to parse the file to discover the underlying schema, which hurts the overall performance of consuming such data.

Compression

While compressing data can prove advantageous for the storage and transit of data, these advantages must be evaluated against the increased processing cost of decompression. That is, there is a trade-off between I/O-bound and CPU-bound compression codecs. There are various codecs, such as gzip, bzip2, LZO, and Snappy, that can be used in conjunction with various output file formats. Choose a compression type that allows for splittable input, that is, partial processing of the data. Test the various combinations of codecs and output file formats to find out which combination works best for the storage, transit, and processing of data in your MR application. Enable compression for both intermediate (map) output and final output to get a sense of the runtime performance advantage.
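
As a minimal sketch (assuming Snappy is installed on the cluster and that your job-submission path honors these standard Hadoop properties), intermediate and final output compression can be enabled roughly as follows:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Compress intermediate map output to cut shuffle I/O.
conf.setBoolean("mapreduce.map.output.compress", true)
conf.set("mapreduce.map.output.compress.codec",
  "org.apache.hadoop.io.compress.SnappyCodec")
// Compress the final job output to shrink the storage footprint.
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.SnappyCodec")
conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")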

File format performance

File formats have an impact on the read/write performance of data, which also matters for overall application performance even though it is not directly related to processing performance. If you have more than a few Hive applications, or many users querying Hive tables over the processed data, there are file formats that optimize such use cases. More recent file formats, such as RC (Row Columnar), ORC (Optimized Row Columnar), and Parquet, move in this direction.

All things considered, I recommend using the Avro format, which offers great advantages in schema evolution, supports compression (at the block level), allows for input splitting, and hence is better for overall performance.
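
To illustrate the schema-evolution point (the record and field names here are hypothetical), a new field can be added to an Avro schema with a default value, so that readers using the new schema can still consume files written with the old one:

import org.apache.avro.Schema

// Version 2 of a hypothetical metric record: "dimension" is a new field whose
// default value lets new readers process files written with version 1.
val schemaV2 = new Schema.Parser().parse("""
{
  "type": "record",
  "name": "MetricRecord",
  "fields": [
    {"name": "metricName", "type": "string"},
    {"name": "value",      "type": "double"},
    {"name": "dimension",  "type": ["null", "string"], "default": null}
  ]
}
""")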

Monitoring and Profiling

Active monitoring of running jobs is essential to understanding their performance characteristics. If there is no monitoring in place, it takes an inordinate amount of time post facto just to understand why a particular job failed. Needless to say, profiling an MR application is critical to understanding its behavior.

Counters

Counters are a convenient way to monitor an MR application. They can be used to detect skews, anomalies, and failures in use cases, and even to measure certain business events. However, counting adds some overhead to processing and should be used judiciously. Counters are updated not only at the task level but also at the overall application level, and Hadoop itself restricts the number of user-defined counters (to about 120 by default). Well-chosen counters help developers understand the profile of the application in terms of memory and CPU. In Scoobi, a simple construct allows us to look up and increment a counter representing business events:

aDList.parallelDo(new DoFn[GenericData.Record, GenericData.Record] {
  def setup() {}
  def process(input: GenericData.Record, emitter: Emitter[GenericData.Record]) {
    …..
    …..
    // Record a business event against the (groupName, name) counter.
    emitter.incrementCounter(groupName, name, increment)
  }
  def cleanup(emitter: Emitter[GenericData.Record]) {}
})

Auto remediation

MR jobs can fail for a variety of reasons, but the failures can be classified into two categories: failures due to the platform or Hadoop cluster, and failures due to data states. If a failure is due to the Hadoop cluster, chances are it was caused by some transient condition, such as a data node holding a replica being unreachable or a network glitch making the NameNode briefly unavailable. It does not make sense to manually restart jobs every time such a failure occurs. Auto-remediation is a key part of making the schedule of jobs tolerant to failures of the cluster. On a scheduler such as UC4, it is quite easy to detect the failure condition and auto-restart and resubmit the failed job. We would, however, need to cap the number of restarts at a reasonable threshold and send out alerts if remediation fails.
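
Scheduler-specific restart logic aside, the core idea can be sketched generically. In the hypothetical snippet below, runJob launches a job and reports success, and alert stands in for whatever paging or email hook is in place:

def runWithRemediation(runJob: () => Boolean,
                       alert: String => Unit,
                       maxRestarts: Int = 3): Boolean = {
  var attempt = 0
  var succeeded = runJob()
  // Restart only up to a capped threshold; beyond that, page a human.
  while (!succeeded && attempt < maxRestarts) {
    attempt += 1
    succeeded = runJob()
  }
  if (!succeeded) alert(s"Job failed after $maxRestarts restarts")
  succeeded
}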

Processing cost

Applications are typically just a series of computations. By wrapping such computations with timing functions, it is possible to estimate the breakdown of processing costs for all those computations. Further, the processing costs can be saved in user-defined counters to be able to look at the numbers as aggregates. This allows us to optimize specific functions for better performance. Below are two timing functions that capture the processing costs of other computations represented as functions.

object PerformanceUtil {
  // Times fn and records the elapsed nanoseconds in a user-defined counter.
  def timingFn[T](fn: => T, counters: Counters, counterGroupName: String, counterName: String): T = {
    val start = System.nanoTime()
    val output = fn
    val end = System.nanoTime()
    println("Time taken = " + (end - start) + "ns")
    counters.incrementCounter(counterGroupName, counterName, (end - start))
    output
  }

  // Times fn and returns the result along with the elapsed nanoseconds.
  def timingFn[T](fn: => T): (T, Long) = {
    val start = System.nanoTime()
    val output = fn
    val end = System.nanoTime()
    println("Time taken = " + (end - start) + "ns")
    (output, end - start)
  }
}
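
Usage is then a one-line wrap around any computation; recordFromLine and line below are hypothetical stand-ins for whatever the job actually does:

// Wrap a computation to get both its result and its cost in nanoseconds.
val (record, elapsedNanos) = PerformanceUtil.timingFn(recordFromLine(line))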

Data Skew

It does not take long after you begin writing MR applications to hit the last-reducer problem: one or a few reducers stuck doing a disproportionate amount of processing. More often than not, this is due to skewness in your data.

This is a hard problem to solve, especially since skewness is observed on the Reducer side. Skew can occur either in the number of Reducer groups or in the data distribution per Reducer group. Here the Reducer group is nothing but a fragment of the data characterized by a single Reducer key.

Skew in the number of Reducer groups is relatively easier to solve: have the Mappers emit a different (salted) key once a threshold on the number of keys is breached, so that a Reducer that would otherwise observe few Reducer groups picks up some of the other groups to process. This absorbs a little extra processing cost on the Mapper side, but it is worth trying if the observed runtime is significantly better. A simple hash partitioner can be used whenever this threshold is breached, as in the sketch below.
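
As a hedged sketch of the idea (the salt factor and the two-stage aggregation are illustrative, not the original implementation), a hot key can be spread across several Reducer groups by appending a salt derived from a hash, with a second, much smaller pass merging the partial results:

// Spread records for a hot key across `salts` reducer groups.
def saltKey(key: String, recordHash: Int, salts: Int): String =
  key + "#" + (math.abs(recordHash) % salts)

// First pass reduces each salted group to a partial aggregate;
// a second pass strips the salt and merges the partials per original key.
def stripSalt(saltedKey: String): String = saltedKey.takeWhile(_ != '#')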

The skew due to data distribution across Reducer groups is a much tougher problem to solve. Here data skew is observed when one or a few Reducer groups have a large number of records to process compared with most of the Reducer groups. This type of skew is harder to solve because the framework is agnostic to the logic of the operation being performed in the Reducer. One solution is to break up the Reducer workload into two or more stages in such a way as to mitigate the issue. This approach, however, might require the developer to rework the logic of the Reducer, especially in the non-final stages.

In other situations, if you are joining two datasets where one is heavily skewed and the other is not, you might benefit from using Block Joins. As the name suggests, a Block Join splits the skewed dataset into blocks and replicates the non-skewed dataset into every block. The overall processing cost is higher, but it is spread more evenly across compute nodes, and Block Joins also prevent OOM errors on hot compute nodes.
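
A rough sketch of the key manipulation behind a Block Join (illustrative only, not the original implementation): records on the skewed side each get a random block id, while every record on the small side is emitted once per block id so that each block can be joined locally.

import scala.util.Random

val numBlocks = 8

// Skewed (large) side: each record goes to exactly one block.
def skewedSideKey(joinKey: String): (String, Int) =
  (joinKey, Random.nextInt(numBlocks))

// Non-skewed (small) side: each record is replicated into every block.
def smallSideKeys(joinKey: String): Seq[(String, Int)] =
  (0 until numBlocks).map(b => (joinKey, b))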

Tuning Parameters

There is a definite advantage in tuning job parameters to get better job performance. However, good parameter values are very specific to the job being tuned; values that work well for one job may have an adverse effect on another. Let’s look at some of the parameters whose tuning may easily result in faster runtimes (a combined configuration sketch follows the list).

  • mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize
     
    The above two parameters can be tuned to bound the workload of a single mapper within a range. Tuning this range results in a corresponding change in the number of mappers allocated to the job. For large applications, it is good to cap the maximum split size (mapreduce.input.fileinputformat.split.maxsize) at 128 MB.
     
    More mappers mean better parallelism and likely faster runtimes. However, this should be balanced against the prevailing load on the cluster: if the configured number of maps cannot be accommodated in one pass, the job’s overall runtime will increase, annulling the effect of the smaller runtimes of individual maps.
     
  • scoobi.mapreduce.reducers.min and scoobi.mapreduce.reducers.max
     
    Sometimes, due to skewness in the data, many reduce groups end up being processed by the same Reducer. In such cases, it may be beneficial to increase the number of available Reducers so that the reduce groups are better distributed. Scoobi allows this via the parameters scoobi.mapreduce.reducers.min and scoobi.mapreduce.reducers.max.
     
  • mapreduce.task.io.sort.factor, mapreduce.task.io.sort.mb, and mapreduce.map.sort.spill.percent
     
    Before partitions are written to files as map output, the map outputs are sorted in memory, and the above three parameters influence this sorting behavior. The sort itself uses the QuickSort algorithm by default. mapreduce.task.io.sort.factor is the number of streams that may be merged simultaneously. mapreduce.task.io.sort.mb is the size of the in-memory buffer used for sorting the map outputs; in practice, we have seen good performance when this is set to about 1/4 of the total memory of the map task.
     
    mapreduce.map.sort.spill.percent is a value between 0 and 1 representing the fraction of the sort buffer at which spilling to disk begins. The default is 0.8 (but it depends on individual cluster deployments).
     
  • mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.java.opts, and mapreduce.reduce.java.opts
     
    mapreduce.map.memory.mb is the amount of memory allocated to each map task and should be tuned much like the heap of a regular Java application; mapreduce.reduce.memory.mb is the equivalent parameter for reduce tasks. mapreduce.map.java.opts and mapreduce.reduce.java.opts allow additional JVM options to be passed, such as those related to GC.
     
  • dfs.block.size
     
    Represents the block size that output files are broken into; each block resides on one data node. It is desirable for mappers to run data-local, meaning that all the data needed by a mapper is present on one single node. For this reason, it is better to keep dfs.block.size >= mapred.min.split.size and dfs.block.size <= mapred.max.split.size. If min.split.size is significantly larger than dfs.block.size, mappers would have to read data from other blocks over the network, which adds to the runtime of the job.
     
  • mapreduce.reduce.shuffle.parallelcopies
     
    This is the number of parallel copies a reducer is allowed to request, that is, the number of map outputs that can be fetched by a reducer simultaneously. Tune this to see which setting helps reduce the shuffle time for the job.
     
  • dfs.replication, dfs.client.block.write.locateFollowingBlock.retries, and dfs.client.block.write.retries

    dfs.replication indicates the number of replicas to be stored on the cluster. If it makes sense, reduce the number of replicas so that the overall storage footprint goes down. The trade-off is that jobs may fail if the data nodes holding these fewer replicas are simultaneously unavailable.
     
    dfs.client.block.write.locateFollowingBlock.retries indicates the number of retries that a job client must make to locate a suitable end-point data node before deciding to fail the job during writes.

    dfs.client.block.write.retries indicates the number of retries that a job must make while writing the output of the job.

  • mapred.compress.map.output and mapred.map.output.compression.codec
     
    It can be beneficial to compress the map output so that it occupies a smaller storage footprint. This also provides the advantage of a faster shuffle over the network. mapred.compress.map.output is a boolean value that toggles compression of the map output on or off, and mapred.map.output.compression.codec selects the codec. The LZO codec offers reasonable performance, even though its compression is not great compared with others such as Snappy. However, YMMV.
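
Tying a few of these together, here is a minimal sketch of applying such settings on a job Configuration. The values are illustrative starting points, not recommendations; the right numbers depend entirely on your job and cluster.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Bound mapper workload: cap splits at 128 MB (value in bytes).
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024)
// Sort buffer and spill threshold for map-side sorting.
conf.setInt("mapreduce.task.io.sort.mb", 512)
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f)
// Task memory and JVM options (heap kept below the container size).
conf.setInt("mapreduce.map.memory.mb", 2048)
conf.set("mapreduce.map.java.opts", "-Xmx1638m")
// Parallel fetches during the shuffle.
conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10)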

Summary

Performance engineering of an MR application is a long, drawn-out affair. It is never a finished task, since the application profile, input data, data size, and other characteristics change over time. The purpose of this post is to provide a direction for designing and developing MR applications with performance specifically in focus. There are plenty of other ideas, parameters, and tricks across languages and tools out there to learn, implement, and refine through practical observation. Go ahead and create your own Sprinting Pachyderm!