Optimizing Joins in MR via Lookup Service

Rohit Kochar

Rohit Kochar on September 01, 2015

At InMobi, we process terabytes of data to produce understandable reports that our customers query and analyse. Most of our raw data contains business keys that need to be joined with another set of data, generally referred to as dimension data, to produce enriched, easy-to-understand reports.

The problem at hand can be summarized as:

  • Table A, a.k.a. the Fact Table => huge set of data (100+ GB)
  • Table B, a.k.a. the Dimension Table => relatively small set of data (1-2 GB)
  • R = A ⋈ B => required result (A joined with B)

Types of Joins

There are 2 types of joins in the MapReduce (MR) paradigm:

  • Fragment replicate joins (map-side joins)
  • Reduce side joins

Fragment replicate joins are performed by loading one data set into the memory of every mapper and streaming the other data set through the map phase, performing a lookup on the join key.

In reduce side joins, both data sets are partitioned and streamed to reducers, where the actual join takes place.

A major constraint with fragment replicate joins is that one data set has to be small enough to be held in memory. However, with reduce side joins, if data sets are skewed, a few reducers get overwhelmed with the input, impacting overall performance.
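To make the first pattern concrete, here is a minimal sketch of a fragment replicate (map-side) join as a Hadoop mapper. The file name, record layout, and the assumption that the dimension file is shipped via the distributed cache are illustrative only, not the exact jobs described in this post.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-side join sketch: the small dimension table is loaded into memory once
// per mapper (setup), and each fact record is enriched by a hash lookup.
public class MapSideJoinMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Map<String, String> dimension = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // "dimension.tsv" is assumed to be made available to every mapper via
        // the distributed cache; format: dimKey<TAB>dimValue
        try (BufferedReader reader = new BufferedReader(new FileReader("dimension.tsv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2);
                dimension.put(parts[0], parts[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fact record assumed as: dimKey<TAB>metrics...
        String[] fields = value.toString().split("\t", 2);
        if (fields.length < 2) {
            return; // skip malformed records in this sketch
        }
        String dimValue = dimension.getOrDefault(fields[0], "UNKNOWN");
        context.write(new Text(fields[0] + "\t" + dimValue + "\t" + fields[1]),
                NullWritable.get());
    }
}
```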

Our Initial Approach

When we started with big data processing at InMobi, our dimension data set was very small. As such, we chose to use map side joins. We loaded the entire dimension into memory at the beginning of each mapper, and performed a map side join.

The advantages of this approach were that we could handle skewed joins and get the required parallelism based on the input size. Also, we could perform star joins in a single job instead of a multi-phase MR job (which a reduce side join would have required).

This approach works well when map tasks are fat, that is, when the cost of loading the dimension data in the setup phase is amortized by the actual work done in the mapper.

Over time, we observed that the size of the dimension data kept increasing. This meant that we needed more memory per map task and more time to load data into memory, impacting performance severely.

With the growth of our network, fact table sizes also grew, leading to more mappers being required. This increased the number of times the same dimension data was read per job, leading to a suboptimal use of network bandwidth.

To counter this, we developed a central lookup service called the Dimension store.

Dimension Store

The Dimension store is a central lookup service that keeps all data in memory and is also backed up by disk. It sustains a high read throughput of 100K reads/sec.

The Dimension store is accompanied by a client library that provides an interface to query the store and takes care of client-side caching, avoiding repeated network calls to the lookup service.
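The client library is internal, but its caching behaviour can be sketched roughly as follows. The class name, the RemoteLookup interface, and the use of a Guava LoadingCache are assumptions for illustration; the real library may be built differently.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Hypothetical client wrapper: lookups hit a local bounded cache first and
// fall back to the remote Dimension store only on a miss.
public class DimensionStoreClient {

    // Stand-in for the real network client that talks to the Dimension store.
    public interface RemoteLookup {
        Map<String, Object> fetch(String dimension, String key);
    }

    private final LoadingCache<String, Map<String, Object>> cache;

    public DimensionStoreClient(RemoteLookup remote, String dimension, long maxEntries) {
        this.cache = CacheBuilder.newBuilder()
                .maximumSize(maxEntries)               // bounded client-side cache
                .expireAfterWrite(1, TimeUnit.HOURS)   // eventually refresh stale entries
                .build(new CacheLoader<String, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> load(String key) {
                        // Network call to the Dimension store, only on a cache miss.
                        return remote.fetch(dimension, key);
                    }
                });
    }

    public Map<String, Object> lookup(String key) {
        return cache.getUnchecked(key);
    }
}
```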

An ETL job that reads data from various systems of record and uploads it to the Dimension store runs at a fixed interval. At load time, it also finds records that have been modified and maintains their history in the central server. This ETL job is pluggable and can read data from multiple sources.

This service is centrally deployed, ensuring that there is only one source of truth for all processing across the organization, and leading to good data quality.

Joins using Dimension Store

  • Instead of a local cache, the Dimension store is used for joins in the mapper.
  • 99.5% of lookups are satisfied from the local client cache.
  • The cache size is 1% to 30% of the corresponding dimension table size.
  • Jobs take 30% to 40% less time.
  • Joins can be processed in real time.

We modified our existing MapReduce jobs to use Dimension store as the lookup service instead of the local hashmap. This saved a lot of resources by reducing the overall HDFS reads as well as the memory requirement of each job.
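Roughly, the modified mapper looks like the sketch below: the per-mapper hashmap is gone, and each record is enriched through the caching lookup client. The DimensionLookup interface and the createClient factory are hypothetical stand-ins for the real client library.

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of the modified mapper: instead of building a local hashmap of the
// whole dimension table in setup(), each record triggers a lookup through the
// caching client, and most lookups never leave the mapper's JVM.
public class LookupJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Minimal, hypothetical stand-in for the client library's lookup interface.
    public interface DimensionLookup {
        // Returns the dimension row as <column name, value>, or null if absent.
        Map<String, Object> lookup(String key);
    }

    private DimensionLookup dimension;

    @Override
    protected void setup(Context context) {
        dimension = createClient(context.getConfiguration());
    }

    // Hypothetical factory: in the real job, the client is built from the job
    // configuration (store hosts, dimension name, cache size); wiring omitted.
    private DimensionLookup createClient(Configuration conf) {
        throw new UnsupportedOperationException("wire up the real Dimension store client here");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fact record assumed as: dimKey<TAB>metrics...
        String[] fields = value.toString().split("\t", 2);
        if (fields.length < 2) {
            return; // skip malformed records in this sketch
        }
        Map<String, Object> row = dimension.lookup(fields[0]);
        // Left outer join behaviour: a missing dimension row degrades to a
        // default value instead of dropping the fact record.
        Object name = (row != null) ? row.get("name") : "UNKNOWN";
        context.write(new Text(fields[0] + "\t" + name + "\t" + fields[1]),
                NullWritable.get());
    }
}
```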

Typical time series data like ad network events exhibits a high degree of temporal and spatial locality in its key event dimensions. According to stats from our data jobs, on average only about 10% of the total dimension data eventually gets cached at the client, yet we see a cache hit rate of 99.5% with this small cache.

The gain in time is attributed to two major things:

  • The map setup time is reduced, as the data is loaded into the Dimension store by a separate job.
  • More memory is available to the mapper, hence less time is spent on memory management.

Using map side joins also means that the cost of a left outer join is minimal, whereas in traditional reduce side joins it is an expensive operation. Also, traditional map side joins can only be implemented in batch mode, whereas joins using the Lookup service can also be done in real-time processing.

Observed Improvements on a Real Job

Parameter                       | New Job              | Existing Job
--------------------------------|----------------------|------------------------
Average map time                | 731 secs (12.2 mins) | 1312 secs (21.9 mins)
Total time by all mappers       | 41 mins, 55 secs     | 1 hr, 34 mins, 10 secs
Total time by Pig (all MR jobs) | 1 hr, 5 mins         | 2 hrs, 11 mins

Cache Stats:

Dimension Lookup | Total Elements in DB | Cache Size | Cache Hit | Cache Size / Total Size
-----------------|----------------------|------------|-----------|------------------------
Dimension1       | 542K                 | 11K        | 99.75%    | 2%
Dimension2       | 272K+143K+143K=558K  | 9K         | 99.94%    | 1.6%
Dimension3       | 2590K                | 113K       | 97.51%    | 4.3%
Dimension4       | 32K                  | 10K        | 99.90%    | 31%
Dimension5       | 514                  | 432        | 99.98%    | 84.04%

Technologies Evaluated for Dimension Store Server

While evaluating stacks for Dimension store, our major requirements were:

  • High read throughput of ~70 to 80K reads/sec (this is proportional to the number of concurrent mappers that might use the dimension store in the cluster)
  • Low latency reads of ~1 ms
  • Schema aware data store. That is, a value in the key-value store must be a combination of the column name and its corresponding value.
  • Decent write throughput.

Given these requirements, it was clear we needed a store that could serve data purely out of memory. Hence, we selected the following three technologies for evaluation:

  • HSQLDB => in-memory/in-process relational database
  • Redis => in-memory key-value store, also referred to as a data structure store
  • Aerospike => in-memory, disk-backed key-value store

HSQLDB

[Figure: HSQLDB benchmark charts (latency and throughput)]

  • Using HSQLDB as a standalone in-memory process queried over JDBC, we were able to achieve a throughput of 50K queries/sec with a mean latency of ~8 ms.
  • Our test queries were a mix of queries involving 0, 1, and 2 joins (a representative query is sketched below).
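For a sense of the setup, a representative lookup query over JDBC might look like the sketch below. The schema, data, and query are illustrative, not the actual dimension tables used in the benchmark.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Rough shape of the HSQLDB evaluation: an in-memory database queried over
// plain JDBC with point lookups involving 0-2 joins.
public class HsqldbLookupSketch {
    public static void main(String[] args) throws SQLException {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:hsqldb:mem:dimstore", "SA", "")) {
            // Illustrative schema and data, created in-memory for the example.
            try (Statement ddl = conn.createStatement()) {
                ddl.execute("CREATE TABLE dim2 (id BIGINT PRIMARY KEY, region VARCHAR(64))");
                ddl.execute("CREATE TABLE dim1 (id BIGINT PRIMARY KEY, name VARCHAR(64), d2_id BIGINT)");
                ddl.execute("INSERT INTO dim2 VALUES (7, 'APAC')");
                ddl.execute("INSERT INTO dim1 VALUES (42, 'campaign-x', 7)");
            }
            // A one-join point lookup, the kind of query used during benchmarking.
            String sql = "SELECT d1.name, d2.region FROM dim1 d1 "
                       + "JOIN dim2 d2 ON d1.d2_id = d2.id WHERE d1.id = ?";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setLong(1, 42L);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " / " + rs.getString(2));
                    }
                }
            }
        }
    }
}
```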

For

  • HSQLDB met our requirement of being schema aware with the added advantage of native support for joins.

Against

  • Its read throughput was well below our requirement, and its latencies were high.

During our benchmarking we also found that querying a non-indexed column of HSQLDB could bring down the DB, which was a serious limitation.

Due to the lack of explicit monitoring support or high availability, and because of the above limitations, we decided to try other technologies.

Redis

[Figure: Redis benchmark charts (latency and throughput)]

  • Using a single-node Redis setup, we achieved a read throughput of ~70K queries/sec with a latency of ~1-2 ms.
  • We used the same hardware and queries for comparing all stores.

For

  • Redis met our expectation of high read throughput and low latency.

Against

  • Redis did not have native support for sharding or high availability.
  • Redis is also a completely in-memory store. This meant that in the event of a crash, we would have to build its state from an external source, which would be cumbersome.
  • Although Redis supports multiple data structures like List, Map, and so on, it does not support a tuple. We would need to perform the workaround of storing a map of <column name, column value> against each key (sketched after this list). This would lead to an explosion in memory requirements, because the column names would be repeated for every key.
  • Redis does not support secondary indexes. This could be challenging on the few occasions when we query the same data via different keys.
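For illustration, the column-map workaround mentioned above would look roughly like this with the Jedis client; the key layout and column names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

import redis.clients.jedis.Jedis;

// Workaround sketch: since Redis has no tuple type, each dimension row is
// stored as a hash of <column name, column value>, repeating the column names
// for every key.
public class RedisRowSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Map<String, String> row = new HashMap<>();
            row.put("name", "campaign-x");
            row.put("region", "APAC");

            // Key layout "dimension1:<id>" is an assumption for illustration.
            jedis.hmset("dimension1:42", row);

            Map<String, String> fetched = jedis.hgetAll("dimension1:42");
            System.out.println(fetched.get("name") + " / " + fetched.get("region"));
        }
    }
}
```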

Aerospike (Community Edition)

[Figure: Aerospike benchmark charts (throughput and latency)]

Using a single-node Aerospike setup with data held entirely in memory, we achieved a read throughput of 120K queries/sec with latency of less than 1 ms.

Aerospike met our requirements of high read throughput, low latency, being schema aware, and having good support for monitoring and deploying the cluster.

Like Redis, Aerospike supports Lists and Maps, and it additionally has native support for tuples, secondary indexes, and disk persistence.
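As a rough illustration of why this fits the schema-aware requirement, here is a minimal sketch using the Aerospike Java client, where each dimension column becomes a named bin. The namespace, set, and bin names are made up for the example.

```java
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;

// Bins give Aerospike the "schema aware" property we needed: each dimension
// column is a named bin instead of an opaque blob.
public class AerospikeRowSketch {
    public static void main(String[] args) {
        AerospikeClient client = new AerospikeClient("localhost", 3000);
        try {
            Key key = new Key("dimstore", "dimension1", 42L);

            // Write one dimension row as two bins (column name -> value).
            client.put(null, key,
                    new Bin("name", "campaign-x"),
                    new Bin("region", "APAC"));

            // Point lookup: the kind of read a mapper issues on a cache miss.
            Record record = client.get(null, key);
            System.out.println(record.getString("name") + " / " + record.getString("region"));
        } finally {
            client.close();
        }
    }
}
```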

With the performance of an in-memory cache combined with the durability of a persistent, indexed data store, Aerospike became our choice for the Dimension store's server.

Conclusion

With the increase in the need for real-time data processing and the growth in overall data size at our organization, it became natural for us to move away from traditional fragment and replicate joins to more optimized joins using the Dimension store.