As with the legendary Gordian Knot, a problem that seems too thorny to solve sometimes just needs to be approached from a different perspective. In the fast-paced world of data especially, it’s tempting to devise a complex system when a simple one works just as well.
A few months ago, the analytics team at JW Player started breaking the usage computation out of our larger daily pipeline. This usage data needed to be more timely to help our customers avoid overage charges or unexpected loss of service. We called the new pipeline Usage-Mini because it runs more frequently, on smaller batches of data, as they arrive.
As part of splitting out the usage aggregation, we redesigned the pipeline and made one key decision that has led to huge improvements in performance, monitoring, and stability: we changed our definition of a batch from a fixed time interval to a fixed size. What’s the big deal about that? Keep reading to understand the impact of this simple choice.
A tale of two batches
Until now, every one of our batch pipelines operated on data within a fixed time interval, whether that’s monthly, daily, or every 15 minutes. Any given batch is defined by a start and end time. With our new pipeline, we redefined a batch to be an arbitrary set of data of a fixed size.
The diagram below shows how fixing the size, rather than the time interval, changes the set of files included in each batch. We make each run more consistent, at the expense of a little overhead.
The difference between what a batch includes under a fixed-interval scheme versus a fixed-size scheme, for a sample of six files.
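To make the comparison concrete, here is a minimal sketch of the two grouping schemes. The file names, arrival times, 15-minute window, and batch size of two are all made up for illustration; they are not taken from our pipeline.

```python
from datetime import datetime, timedelta
from itertools import groupby

EPOCH = datetime(1970, 1, 1)


def fixed_interval_batches(files, interval):
    """Group (name, arrival_time) pairs by the time window they arrived in."""
    ordered = sorted(files, key=lambda f: f[1])

    def window(item):
        # Index of the interval-wide window that contains the arrival time.
        return (item[1] - EPOCH) // interval

    return [[name for name, _ in group] for _, group in groupby(ordered, key=window)]


def fixed_size_batches(files, batch_size):
    """Group files in arrival order into batches of exactly batch_size files.

    Leftover files that do not yet fill a batch are held back for a later run.
    """
    names = [name for name, _ in sorted(files, key=lambda f: f[1])]
    full = len(names) - len(names) % batch_size
    return [names[i:i + batch_size] for i in range(0, full, batch_size)]


# Hypothetical sample: six files arriving unevenly over about half an hour.
T0 = datetime(2024, 1, 1)
SAMPLE = [
    ("a", T0 + timedelta(minutes=1)),
    ("b", T0 + timedelta(minutes=3)),
    ("c", T0 + timedelta(minutes=4)),
    ("d", T0 + timedelta(minutes=16)),
    ("e", T0 + timedelta(minutes=31)),
    ("f", T0 + timedelta(minutes=33)),
]

print(fixed_interval_batches(SAMPLE, timedelta(minutes=15)))
# [['a', 'b', 'c'], ['d'], ['e', 'f']]
print(fixed_size_batches(SAMPLE, 2))
# [['a', 'b'], ['c', 'd'], ['e', 'f']]
```

With fixed intervals the batches hold three, one, and two files; with a fixed size every batch holds exactly two, whatever the arrival pattern.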
How We Create Batches
Our pipeline processes nginx log lines that come from a cluster of web servers. These servers receive around 40,000 pings per second from web clients around the world using JW Player. Each ping is logged, and once a log file reaches a certain size, it’s uploaded to Amazon S3.
We run a cron job every minute to check whether the number of unprocessed files has reached our batch size threshold. When it has, we create a new batch. The batch metadata is stored in two MySQL tables: MiniBatch contains all the batch names, and MiniBatchFile contains all the filenames. The two are joined by a foreign key, BatchID. We also create a Manifest File listing all the filenames in the batch, analogous to a flight manifest. Our batch processing job (MapReduce) reads the Manifest File directly, so it has no database dependencies. The output of the MapReduce job is parsed logs structured in Avro format. Finally, a Hive job aggregates the parsed logs to produce the account-level roll-ups. For Usage-Mini, we keep track of the total video plays and ad impressions served by each publisher.
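The batch-creation step might look roughly like the sketch below. It is not our production code: it uses an in-memory-friendly SQLite connection as a stand-in for MySQL so the example is self-contained, and the column names other than BatchID, the `mini-000001` naming scheme, the threshold of four, and the `maybe_create_batch` helper are all assumptions made for illustration.

```python
import sqlite3

BATCH_SIZE = 4  # hypothetical threshold; the real value is not given here

# Table names and the BatchID key follow the description above;
# the remaining columns are assumed for the sake of the example.
SCHEMA = """
CREATE TABLE IF NOT EXISTS MiniBatch (
    BatchID   INTEGER PRIMARY KEY AUTOINCREMENT,
    BatchName TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS MiniBatchFile (
    FileName TEXT PRIMARY KEY,
    BatchID  INTEGER NOT NULL REFERENCES MiniBatch(BatchID)
);
"""


def maybe_create_batch(conn, uploaded_files, batch_size=BATCH_SIZE):
    """Create one batch if enough unprocessed files exist.

    Returns the manifest text (one filename per line), or None when the
    threshold has not been reached yet.
    """
    assigned = {row[0] for row in conn.execute("SELECT FileName FROM MiniBatchFile")}
    pending = sorted(f for f in uploaded_files if f not in assigned)
    if len(pending) < batch_size:
        return None

    chosen = pending[:batch_size]
    with conn:  # one transaction: the batch row and its files commit together
        cur = conn.execute("INSERT INTO MiniBatch (BatchName) VALUES ('')")
        batch_id = cur.lastrowid
        conn.execute(
            "UPDATE MiniBatch SET BatchName = ? WHERE BatchID = ?",
            (f"mini-{batch_id:06d}", batch_id),
        )
        conn.executemany(
            "INSERT INTO MiniBatchFile (FileName, BatchID) VALUES (?, ?)",
            [(name, batch_id) for name in chosen],
        )
    # The caller would write this text wherever the MapReduce job reads it.
    return "\n".join(chosen) + "\n"
```

Run once a minute, a function like this is a no-op until enough files have accumulated, and files left over after a batch is cut simply wait for the next one.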
So what makes this new batch scheme better? At first, it seems like more work to create the Manifest File. Earlier I mentioned we saw improvements in three areas – performance, monitoring, and stability.
For those who haven’t figured it out yet, the major advantage of fixing the input size is that the system becomes easier to understand and tune. Our web traffic, much like the rest of the Internet, ebbs and flows, so a time-based batch scheme means we have to either dynamically adjust our hardware allocation or provision for the worst-case scenario. With fixed-time batches we had opted for the latter, which meant our AWS bill was higher than it needed to be.
In contrast, allocating hardware for a fixed-size batch is trivial. Our MapReduce cluster has exactly as many cores as there are files in a batch. Thus all the mapper tasks run in parallel, and we fully utilize the cluster. And as our traffic ramps up in the future, we can easily scale out by increasing our batch size and allocating just as many cores.
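A back-of-the-envelope model shows why matching cores to batch size works so well. The sketch assumes one mapper task per log file, one task per core at a time, and roughly equal task durations; the 64-core cluster and the file counts are hypothetical.

```python
import math


def mapper_waves(num_files, cores):
    """Number of sequential rounds of mapper tasks needed to cover all files."""
    return math.ceil(num_files / cores)


def utilization(num_files, cores):
    """Fraction of available core-slots that actually do work during the map phase."""
    waves = mapper_waves(num_files, cores)
    return num_files / (waves * cores) if waves else 0.0


CORES = 64  # hypothetical cluster size

# Fixed-size batch: cores == files, so one wave and no idle cores.
print(mapper_waves(64, CORES), utilization(64, CORES))    # 1 1.0

# Fixed-interval batches: the file count follows traffic.
print(mapper_waves(20, CORES), utilization(20, CORES))    # 1 0.3125
print(mapper_waves(100, CORES), utilization(100, CORES))  # 2 0.78125
```

In this model a quiet interval leaves most of the cluster idle, a busy one spills into a second wave and takes about twice as long, and only the fixed-size batch hits one wave at full utilization every time.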
Not only is our MapReduce cluster utilized more efficiently, but the runtimes of our batches are also far more consistent. In the case of our Periodic Pipeline, which runs on 15-minute intervals, the runtime varied by over 400% between the average and the longest durations. This inconsistency made it very hard to monitor the pipeline and alert on stalled jobs. After switching to fixed-size batches, we found the runtime varied by only 30%. Most of the remaining deviation comes from our log files not all being the same size.
Actual Datadog metrics of the run-times of our batch pipelines that are interval-based (left) and fixed-size (right). Which one would you choose?
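One way to see why consistency matters for alerting is to compute the spread between the average and the longest run, then derive a stall threshold from it. The sketch below is an illustration only: the formula for spread, the `headroom` factor, and the sample runtimes are assumptions, not our actual Datadog monitors or measurements.

```python
from statistics import mean


def runtime_spread(runtimes):
    """How much longer the longest run is than the average, as a fraction."""
    avg = mean(runtimes)
    return (max(runtimes) - avg) / avg


def stall_threshold(runtimes, headroom=1.5):
    """Alert only when a run exceeds the longest normal run by some headroom."""
    return max(runtimes) * headroom


# Hypothetical runtimes in minutes.
interval_based = [4, 5, 4, 6, 31]
fixed_size = [9, 10, 11, 10, 12]

for label, runs in (("interval", interval_based), ("fixed-size", fixed_size)):
    print(label, f"spread={runtime_spread(runs):.0%}", f"alert after {stall_threshold(runs)} min")
```

The wider the spread, the looser the threshold has to be to avoid false alarms, and the longer a genuinely stalled job goes unnoticed.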
With a time-based approach, all the log files belonging to a given time interval must arrive on S3 before that interval is processed. If one of our ping nodes uploads its log files late, we need to rerun that entire interval and overwrite the output. It’s a messy, error-prone solution, especially when downstream pipelines rely on the output.
Our fixed-size solution avoids this issue by adding new files to a batch as soon as they arrive. Since we’re only computing aggregates, it doesn’t matter whether log files uploaded during the same interval end up in the same batch.
There is an edge case, however: files for the previous day that are received after midnight. All of our data is aggregated on a daily basis through a compaction job. This cron script simply aggregates all the batches to get the total usage data for the day. We hold off running the compaction job until 1:00 a.m. to leave room for delayed files, but if one arrives after that, we have to rerun the compaction job and overwrite the daily aggregate.
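Because plays and ad impressions are simple sums, the daily total is the same no matter how files were split across batches. The sketch below illustrates that property of the compaction step; the account names, counts, and the in-memory dictionaries are hypothetical stand-ins for the real Hive roll-ups.

```python
from collections import defaultdict


def compact(batch_rollups):
    """Sum per-account (plays, ad_impressions) across all batches for one day."""
    totals = defaultdict(lambda: [0, 0])
    for rollup in batch_rollups:
        for account, (plays, ads) in rollup.items():
            totals[account][0] += plays
            totals[account][1] += ads
    return {account: tuple(counts) for account, counts in totals.items()}


# The same day's data, split into batches in two different ways.
split_a = [{"acme": (10, 2)}, {"acme": (5, 1), "globex": (7, 0)}]
split_b = [{"acme": (10, 2)}, {"acme": (5, 1)}, {"globex": (7, 0)}]

print(compact(split_a))                      # {'acme': (15, 3), 'globex': (7, 0)}
print(compact(split_a) == compact(split_b))  # True
```

A late file only matters if it arrives after compaction has already run, in which case rerunning `compact` over the full set of batches yields the corrected daily aggregate.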
By adjusting a fundamental premise of our batch pipeline, we were able to get big wins for a little extra overhead. Usage-Mini has been running in production since January, and we’ve already seen the difference in performance and cost savings. We’re working on migrating all of our daily batch pipelines over to this fixed-size approach.
As a startup matures, there comes a point when engineers have to go back and rewrite legacy code. It’s important to revisit underlying assumptions in the design and use the benefit of hindsight to validate them or find a better way.