
From ETL to ELT

The Obligatory “About Us”

Yieldmo’s mission is to elevate the digital advertising experience for consumers, advertisers, and publishers by harnessing the power of the massive data set that lives behind our consumer-friendly formats. That data set goes way beyond clicks: it captures the countless micro-interactions consumers have with our ads, such as gestures, swipes, scrolls, tilts, and time spent. On any given day, we collect over a billion data points, adding up to a processing payload of roughly 280 GB per day.

With Great Data comes Great Scaling Problems

Starting in late December 2017, Yieldmo saw a huge increase in the number of ad requests it received, driven largely by new partnerships, new features, and revisions to its business model. Our ETL (Extract Transform Load) platform started showing cracks. The average latency before data became query-able went from 5 minutes to 45 minutes, and at its worst it exceeded 9 hours. Yikes! Cue late-night pipeline patches and a myriad of operational issues.

When our phones auto-updated our ‘Favorite Contacts’ from our significant others to the on-call alerting software, we decided enough was enough.

In this post, I will detail how switching our data persistence strategy from ETL to ELT (Extract Load Transform) has prepared us for ad traffic growth of roughly 20x our current ingest load, on the same hardware, while cutting our average latency from 45 minutes (at 2x traffic) down to 15 seconds. I will also cover some other design considerations that nudged us in the general direction of ELT, the prototypes we evaluated along the way, and how we settled on our final selection.

Existing Infrastructure and Shortcomings

The following diagram briefly outlines our architecture:

Every ad request, and its corresponding user engagement, is captured by a series of web servers. These web servers encode the details of the interaction as protobuf (Protocol Buffers) and persist them into a series of topics in our Kafka cluster. We used a data transformation tool called Pentaho to read this data from Kafka (one consumer process per topic), apply business rules (such as fraud detection), and normalize the protobuf data into CSV files that could subsequently be loaded into Snowflake (via S3).
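
To make this concrete, here is a rough, simplified sketch of what each per-topic job in the old pipeline effectively did. The topic, bucket, protobuf message, and fraud check below are hypothetical stand-ins for illustration, not our production names:

    # Simplified sketch of the legacy ETL flow: one consumer process per topic,
    # transform first, then load. All names here are hypothetical placeholders.
    import csv
    import io

    import boto3
    from kafka import KafkaConsumer

    from ad_events_pb2 import AdEngagement   # hypothetical generated protobuf class


    def looks_fraudulent(event) -> bool:
        """Hypothetical stand-in for the fraud rules applied in-flight."""
        return event.time_spent_ms < 0


    consumer = KafkaConsumer("ad-engagements",               # one topic per process
                             bootstrap_servers=["kafka:9092"],
                             group_id="etl-ad-engagements")
    s3 = boto3.client("s3")

    buffer = io.StringIO()
    writer = csv.writer(buffer)

    for count, message in enumerate(consumer, start=1):
        event = AdEngagement()
        event.ParseFromString(message.value)                 # Extract: decode protobuf

        if looks_fraudulent(event):                          # Transform: business rules
            continue                                         # applied before persisting

        # Transform: flatten only the fields this job knows about; any new
        # protobuf fields are silently dropped until this code is updated.
        writer.writerow([event.ad_id, event.gesture, event.time_spent_ms])

        if count % 10_000 == 0:                              # Load: ship a CSV chunk to S3,
            s3.put_object(Bucket="etl-staging-bucket",       # Snowflake copies it from there
                          Key=f"ad-engagements/chunk-{count}.csv",
                          Body=buffer.getvalue())
            buffer = io.StringIO()
            writer = csv.writer(buffer)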

At this point, I imagine a few limitations scream out to the data engineers in the house:

  1. Transform data before persisting? What if we introduce a bug in the transformation logic?
  2. Do all the messages for a topic really need to flow through a SINGLE process on one box?
  3. Protobuf is an extensible data format. What if more fields are added upstream? Wouldn’t that mean these new fields would be lost until the transformation logic was updated (a maintainability challenge)?

Having to serially process all the data for a topic was a consequence of applying business rules during transformation. A side effect was that the recovery pipeline (used in case of any catastrophic failure) could not run in parallel with the standard production pipeline. It also meant there was no scale-out strategy: if the volume of data quadrupled (likely to happen before the end of the year), we would struggle to keep up.

Design Goals for a New System

After being repeatedly hazed by the existing system, we decided it was time to re-architect. We set the following mandatory goals for any new strategy we evaluated:

  1. Should be able to scale to 10x the ad requests with ease
  2. Pipeline should be able to scale horizontally
  3. It should be possible to run a recovery pipeline in parallel with the production pipeline
  4. Easy to code review / unit test

We also set the following criteria as good / great to have:

  1. Low latency from Kafka to Snowflake
  2. Negligible mean time to recovery
  3. Push-button horizontal scaling for data pipeline infrastructure

Candidates Evaluated

Given the weaknesses listed above, one of the first things that became apparent to us was the immediate need to switch from an ETL (Extract Transform Load) system to an ELT (Extract Load Transform) system. This would mean that data would be persisted to a data store as is before we applied any business logic / transformations to it. We evaluated many prototypes including:

  1. A simple Kafka high level consumer deployed to EC2 instances as necessary
  2. Kafka S3 Connector
  3. Amazon Kinesis
  4. Spark streaming on a cluster set-up using either Spark-EC2 / Flintrock
  5. Spark streaming on Amazon EMR

After extensive research, and given the nature of our constraints, we finally settled on Spark streaming on Amazon EMR. Here is a quick summary that sheds some light on the decision-making criteria.

Simple Kafka Consumer

From a transition perspective, our simplest option would perhaps have been to leverage the Kafka high level consumer API and deploy a new set of consumers on the EC2 instances where our existing ETL tool (Pentaho) was installed. This option had the following pros:

  1. We could simply repurpose the existing hardware
  2. Zero additional software licensing costs
  3. We could continue leveraging these boxes to run processes other than ETL

However, it also came with the following disadvantages:

  1. Scaling out would be a non-trivial exercise involving steps such as provisioning new EC2 instances, installing Java, and deploying the ELT code. In short, it would not be a push-button scale-out strategy.
  2. Since we were not using an open source scheduler (such as Airflow or Luigi), we would need to purchase licenses for each hardware instance.

Estimated Cost of Operation:

6 * M4.2XL (Kafka Cluster)  + 3 * M4.2XL (Kafka Consumer Instance) ~ $$
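
For reference, a minimal sketch of this option, with transformation deferred entirely to the warehouse, might look like the following; the topic, bucket, and batching interval are illustrative assumptions rather than our actual settings:

    # Sketch of the plain-consumer ELT option: no decoding and no business rules
    # in-flight; raw protobuf payloads are batched and landed in S3 as-is
    # (base64-encoded, one record per line) for Snowflake to transform later.
    import base64
    import time

    import boto3
    from kafka import KafkaConsumer

    consumer = KafkaConsumer("ad-engagements",              # hypothetical topic
                             bootstrap_servers=["kafka:9092"],
                             group_id="elt-raw-loader")
    s3 = boto3.client("s3")

    batch, batch_started = [], time.time()
    for message in consumer:
        batch.append(base64.b64encode(message.value))       # keep the payload untouched

        if time.time() - batch_started >= 15:               # flush every ~15 seconds
            s3.put_object(Bucket="elt-raw-bucket",
                          Key=f"ad-engagements/{int(batch_started)}.b64",
                          Body=b"\n".join(batch))
            batch, batch_started = [], time.time()

The catch, as noted above, is operational: every increase in capacity means hand-provisioning another EC2 instance and deploying this process to it.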

Spark Streaming on EC2 using either Spark-EC2 / Flintrock

Spark streaming had always been a strong contender to replace our existing ETL infrastructure. It was extremely attractive that adding new nodes to the cluster would trigger a rebalance of the Kafka partitions amongst the individual consumers without any manual intervention (push-button scaling!). Leveraging spark-ec2 or Flintrock to deploy the cluster would have the following advantages:

  1. Provisioning a cluster is an extremely simple exercise given the rich API

  2. Zero software licensing costs (or so I believe)

  3. Scaling out and scaling in are relatively simple (Flintrock, for instance, provides commands to add or remove slave nodes from a running cluster)

However,

  1. At the time of our evaluation, it did not support launching a Spark cluster into a VPC
  2. It required additional tuning post-setup to work as advertised
  3. There was no out-of-the-box support for auto-scaling the cluster
  4. Spark-ec2 was no longer being actively maintained
  5. The author of flintrock did not recommend using his tool to deploy long running production clusters

Estimated Cost of Operation:

6 * M4.2XL (Kafka Cluster)  + 
1 * M4.2XL  (Spark master node) +3 * M4.2XL  (Spark slave nodes) ~ $$
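
Regardless of whether the cluster is self-managed on EC2 or runs on EMR (next section), the streaming job itself stays roughly the same. Below is a minimal PySpark Structured Streaming sketch of the “Load” step: raw Kafka payloads are base64-encoded and landed in S3 in small micro-batches, with all transformation deferred to Snowflake. Broker addresses, topic, and bucket paths are placeholders, and the job assumes the spark-sql-kafka connector is on the classpath:

    # Minimal PySpark Structured Streaming sketch: Kafka -> S3, no transformation.
    # Submit with the spark-sql-kafka connector on the classpath (e.g. --packages).
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("kafka-to-s3-raw-loader")
             .getOrCreate())

    # Read raw records from Kafka; `value` stays as untouched protobuf bytes.
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")  # placeholders
           .option("subscribe", "ad-engagements")                         # placeholder topic
           .option("startingOffsets", "latest")
           .load())

    # Base64-encode the payload so it survives a text-friendly landing format.
    # Decoding and business rules happen later, inside Snowflake.
    encoded = raw.selectExpr("base64(value) AS payload",
                             "topic", "partition", "offset", "timestamp")

    # Micro-batches land directly in S3; Snowflake picks them up from there.
    query = (encoded.writeStream
             .format("json")
             .option("path", "s3://elt-raw-bucket/ad-engagements/")
             .option("checkpointLocation", "s3://elt-raw-bucket/checkpoints/ad-engagements/")
             .trigger(processingTime="15 seconds")
             .start())

    query.awaitTermination()

Because the Kafka source splits work by partition, adding executors to the cluster spreads those partitions across more consumers automatically, which is the push-button scaling behaviour described above.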

Spark streaming on Amazon EMR

Given the limitations of the systems listed above, and the luck we have had with managed services such as Snowflake and Looker in the past, we decided to give EMR a chance. It won the battle on the following counts:

  1. Provisioning a cluster was an extremely simple task

  2. Scaling in and scaling out were push-button tasks.

  3. EMR also supports auto-scaling based on CloudWatch metrics, meaning it can add new nodes to the cluster if the streaming process is ever running low on resources (a rough sketch of such a policy follows this list)

  4. Amazon EMR provides fast Amazon S3 connectivity using the Amazon EMR File System (EMRFS). In our testing, we found writes to S3 to be much more reliable when using EMR vs. a self-provisioned cluster on EC2.
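
As a rough illustration of point 3, an auto-scaling rule can be attached to a core or task instance group with a few lines of boto3. The cluster ID, instance group ID, capacities, and thresholds below are placeholders rather than our production settings:

    # Sketch: attach an EMR auto-scaling rule that adds a node when YARN memory
    # headroom drops. All IDs and thresholds are placeholder values.
    import boto3

    emr = boto3.client("emr")

    emr.put_auto_scaling_policy(
        ClusterId="j-XXXXXXXXXXXXX",             # placeholder cluster id
        InstanceGroupId="ig-XXXXXXXXXXXXX",      # placeholder core/task instance group id
        AutoScalingPolicy={
            "Constraints": {"MinCapacity": 3, "MaxCapacity": 12},
            "Rules": [{
                "Name": "scale-out-on-low-memory",
                "Description": "Add a node when available YARN memory runs low",
                "Action": {
                    "SimpleScalingPolicyConfiguration": {
                        "AdjustmentType": "CHANGE_IN_CAPACITY",
                        "ScalingAdjustment": 1,
                        "CoolDown": 300,
                    }
                },
                "Trigger": {
                    "CloudWatchAlarmDefinition": {
                        "MetricName": "YARNMemoryAvailablePercentage",
                        "Namespace": "AWS/ElasticMapReduce",
                        "ComparisonOperator": "LESS_THAN",
                        "Threshold": 15.0,
                        "EvaluationPeriods": 1,
                        "Period": 300,
                        "Statistic": "AVERAGE",
                        "Unit": "PERCENT",
                    }
                },
            }],
        },
    )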

Estimated Cost of Operation:

6 * M4.2XL (Kafka Cluster) +
1 * M4.2XL (Spark master node) + 3 * M4.2XL (Spark slave nodes) +
EMR surcharge on 4 * M4.2XL (close to nothing) ~ $$

Honorable Mention - Amazon Kinesis

We have had great success with an Amazon Kinesis pipeline in a different workflow here at Yieldmo, and hence it was also a contender. It has all the great features that come with a managed service, such as auto-scaling and fault tolerance. However, we realized it would be a non-trivial exercise to get this to work because:

  1. Upstream data was propagated and maintained as protobufs. This would require pre-processing using Lambda, as Kinesis cannot infer a protobuf schema (see the sketch at the end of this section)
  2. There were other teams downstream that relied on data being available in Kafka (in protobuf format at that)

In short, a lot of teams would need to make changes, and the clock was ticking.

Estimated Cost of Operation:

Nature of data: (2,500 records/sec @ 3 KB/record) + (6,000 records/sec @ 1 KB/record)
Kinesis Data Streams + Kinesis Firehose ~ $$$$
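
For completeness, the Lambda pre-processing mentioned in point 1 would have looked roughly like the following: a Kinesis Data Firehose transformation function that decodes each protobuf record into JSON before delivery. The generated ad_events_pb2 module and its AdEngagement message are hypothetical stand-ins:

    # Sketch of a Kinesis Data Firehose transformation Lambda: decode protobuf
    # payloads into JSON so downstream tooling can work with a readable schema.
    # The generated protobuf module is a hypothetical stand-in.
    import base64

    from google.protobuf.json_format import MessageToJson

    from ad_events_pb2 import AdEngagement          # hypothetical generated class


    def handler(event, context):
        output = []
        for record in event["records"]:
            try:
                engagement = AdEngagement()
                engagement.ParseFromString(base64.b64decode(record["data"]))
                payload = MessageToJson(engagement) + "\n"
                output.append({
                    "recordId": record["recordId"],
                    "result": "Ok",
                    "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
                })
            except Exception:                       # malformed record: flag it for Firehose
                output.append({
                    "recordId": record["recordId"],
                    "result": "ProcessingFailed",
                    "data": record["data"],
                })
        return {"records": output}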

Following is a summary table of all the candidates we evaluated against our decision-making criteria.

** Requires pre-processing using Lambda, as Kinesis cannot infer protobuf schema

The resulting architecture looks like this:

Results

The switch in architecture to Spark Streaming resulted in the following improvements: the average latency from Kafka to Snowflake dropped from 45 minutes (at 2x traffic) to roughly 15 seconds, and the same hardware now gives us headroom for roughly 20x our current ingest load.

Summary

To summarize, switching from ETL to ELT helped us design a distributed pipeline that is set to scale with the growth of the business. The infrastructure is a lot simpler to monitor for anomalies, and it requires little to no maintenance once set up.

Posted by: Rohit Mathews, Big Data Architect