How I scaled Amazon’s Load Generator to run on 1000s of machines

Moving from a single-host process to a distributed system

Carlos Arguelles
10 min read · Oct 21, 2023

July 10, 2019. As Taylor Swift was giving her very first live performance of “You Need To Calm Down” at New York’s Hammerstein Ballroom, I was nothing but calm. This was being livestreamed to all 200 million Amazon Prime members across the globe. As the concert progressed without a glitch, I was delighted. Taylor Swift is charming, of course, but the main reason I was delighted was that a technology I had created while working at Amazon had been used to validate that we could handle millions of happy Swifties, and it was working flawlessly. Behind the scenes, I had been collaborating for months with some really awesome engineers from Prime Video to make it so.

I’ve written the story of how I created and grew the infrastructure that Amazon uses for Load and Performance testing (“TPSGenerator”), but I never really wrote about how I scaled it.

My first version was pretty scrappy, and was just single-host. You’d run it as a command line tool, specifying how many transactions per second (“TPS”) you wanted, and TPSGenerator would spawn potentially thousands of threads, happily consuming all the hardware resources you were willing to sacrifice. In a futile attempt to avoid a distributed systems problem, I profiled every line of code and optimized it to the nth degree. On a regular machine, it could generate a few hundred thousand TPS, which was sufficient for a lot of services out there.

But Cyber Monday was coming, and that was the busiest shopping day of the year. As the date grew closer, more teams started asking me how they could generate more throughput than a single machine could, to validate that their services were going to handle the peak.

Amazon culture was extremely scrappy and pragmatic, and my first few answers were something like this: “hmm, scale vertically, get a bigger machine,” or “well, you can ssh to several machines and run TPSGenerator on each machine.” These solutions were definitely scrappy and unsatisfying (to me and to my customers). I knew I needed something better than that.

The “ssh to N machines and run a process on each one” method was not great, but it did work well in the “happy” case. Imagine you wanted to generate 40,000 TPS, and each machine could only generate 10,000 TPS. You could ssh to 4 machines and kick off a TPSGenerator process on each one. The system under test would receive 40,000 TPS of synthetic traffic from these machines.

This broke, of course, when a particular machine went down or went rogue, because the other machines weren’t aware of the problem, and you’d end up applying less load than you intended.

It also broke when a particular machine wasn’t able to generate as much load as the others, whether because it had different hardware specs, a faulty network card, or some random process running in the background.

There was a plethora of additional problems: How could I know how many hosts I needed? How could I emergency stop a process running remotely on potentially hundreds or thousands of machines? What if they didn’t respond? And the toil to launch and monitor the entire test was massive.

As I thought deeper about the space, the main problem that made this optimization difficult was that TPSGenerator was going to be executing arbitrary customer code at high throughput. Not all transactions are created equal. Some are bound by CPU, some by network bandwidth. Some execute fast (milliseconds), some slow (seconds). How do you optimize a distributed system when you don’t know the data patterns you’re optimizing for?

My aha moment was when I realized I could apply one of the oldest Computer Science techniques to this problem: divide and conquer.

I could decompose the task of “run 40,000 TPS for 20 minutes” into 400 smaller tasks of “run 100 TPS for 20 minutes” running concurrently. And I could further decompose that task of “run 100 TPS for 20 minutes” into 20 smaller tasks of “run 100 TPS for 1 minute” running sequentially.

So rather than thinking about the big task of “run 40,000 TPS for 20 minutes”, I needed to be thinking about how to orchestrate 8,000 tiny tasks of “run 100 TPS for 1 minute”.
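To make the arithmetic concrete, here is a minimal Python sketch of that decomposition. It is an illustration only, not TPSGenerator's actual code: the `Task` type, the `decompose` function, and the assumption that the total TPS divides evenly by the per-task TPS are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    lane: int    # which of the concurrent "lanes" this task belongs to
    minute: int  # which one-minute slice of the test it covers
    tps: int     # load this single task should generate


def decompose(total_tps: int, minutes: int, tps_per_task: int = 100) -> list[Task]:
    """Split 'total_tps for N minutes' into one-minute tasks of tps_per_task each."""
    if total_tps % tps_per_task != 0:
        raise ValueError("this sketch assumes total_tps is a multiple of tps_per_task")
    lanes = total_tps // tps_per_task  # tasks that run concurrently within a minute
    return [
        Task(lane=lane, minute=minute, tps=tps_per_task)
        for minute in range(minutes)   # sequential one-minute slices
        for lane in range(lanes)
    ]


tasks = decompose(total_tps=40_000, minutes=20)
first_minute_tps = sum(t.tps for t in tasks if t.minute == 0)
print(len(tasks))        # 8000 tasks in total (400 lanes x 20 minutes)
print(first_minute_tps)  # 40000 TPS requested during any single minute
```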

A Controller → Queue → Worker architecture was a perfect fit.

You asked a Controller to orchestrate the running of 40,000 TPS for 20 minutes. The Controller would decompose it into the smaller tasks, and was responsible for placing messages in a queue at the rate at which they needed to be running. A bunch of Workers would be listening to the queue, and dequeue and execute jobs. The Workers were the ones actually generating the synthetic test load and hitting the System Under Test.
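The shape of this architecture can be sketched in a single process, using Python's standard `queue.Queue` as a stand-in for the real queue and threads as stand-ins for Worker hosts. Everything here is an assumption for illustration: the message format, the `controller` and `worker` functions, and the `None` sentinel used to stop the workers. A real Controller would also pace its writes in wall-clock time rather than enqueue everything at once.

```python
import queue
import threading


def controller(work_queue: queue.Queue, total_tps: int, minutes: int,
               tps_per_message: int = 100) -> None:
    """Enqueue one message per (minute, slice) of the requested load."""
    for minute in range(minutes):
        for _ in range(total_tps // tps_per_message):
            work_queue.put({"minute": minute, "tps": tps_per_message})


def worker(work_queue: queue.Queue, generated: list, lock: threading.Lock) -> None:
    """Dequeue and 'execute' messages until a None sentinel arrives."""
    while True:
        message = work_queue.get()
        if message is None:
            return
        with lock:
            # Stand-in for actually generating load against the system under test.
            generated.append(message["tps"])


work_queue: queue.Queue = queue.Queue()
generated: list = []
lock = threading.Lock()

workers = [threading.Thread(target=worker, args=(work_queue, generated, lock))
           for _ in range(4)]
for w in workers:
    w.start()

controller(work_queue, total_tps=1_000, minutes=2)
for _ in workers:
    work_queue.put(None)  # one sentinel per worker, queued behind the real work
for w in workers:
    w.join()

print(len(generated), sum(generated))  # 20 messages, 2000 TPS-minutes of work
```

Note that the Controller never references a Worker: it only knows about the queue, which is the decoupling the design relies on.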

This was very simple, but extremely powerful.

Decoupling Controller from Worker came with a lot of benefits. The Controller didn’t need to know or care who was actually doing the work. It just knew that it was placing work in a queue and, if everything was OK, there was “something” dequeuing this work as soon as it was enqueued. The queue size was a critical piece of data: if the queue size was growing, the Controller knew there weren’t enough workers, or there was a fatal underlying problem, so it could pull off an emergency stop by deleting the queue. Without a queue telling them what to do, the workers would stop immediately.
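One simple way to turn "the queue size is growing" into a decision is to look for several consecutive increases in sampled queue depth. The sketch below is an assumption about how such a check could look, not the original logic; the function name and the `max_growing_samples` parameter are hypothetical.

```python
def should_emergency_stop(depth_samples: list[int], max_growing_samples: int = 3) -> bool:
    """Return True once queue depth has strictly grown for
    max_growing_samples consecutive samples."""
    growing = 0
    for previous, current in zip(depth_samples, depth_samples[1:]):
        growing = growing + 1 if current > previous else 0
        if growing >= max_growing_samples:
            return True
    return False


healthy = [0, 2, 1, 0, 3, 0]   # depth fluctuates but keeps draining
backlog = [0, 5, 12, 30, 80]   # nobody is keeping up with the Controller

print(should_emergency_stop(healthy))  # False
print(should_emergency_stop(backlog))  # True
```

Requiring several consecutive increases avoids reacting to a momentary blip, at the cost of a slightly slower reaction to a real backlog.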

What if we didn’t have enough Workers? We could simply add more Worker hosts at any time. In the next version, I made the Controller smarter by having it talk to the EC2 Auto Scaling group to increase the number of hosts reactively if it saw the queue size growing. In the version after that, I played around with doing it proactively: if the Controller knew it was going to increase the load a minute from now, it could make that request now so that the additional hardware was ready to go when it was needed.
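The proactive idea boils down to sizing the fleet for the highest load in the near future rather than the current load. Here is a small sketch under stated assumptions: the load schedule is a list of per-minute TPS targets, every host has the same hypothetical capacity `tps_per_host`, and the actual call to the Auto Scaling service is left out entirely.

```python
import math


def hosts_needed(target_tps: int, tps_per_host: int) -> int:
    """Smallest number of hosts that can generate target_tps."""
    return math.ceil(target_tps / tps_per_host)


def desired_hosts(schedule: list[int], minute: int, tps_per_host: int,
                  lookahead_minutes: int = 1) -> int:
    """Size the fleet for the peak load between now and lookahead_minutes from now.

    schedule[i] is the target TPS during minute i.
    """
    window = schedule[minute: minute + lookahead_minutes + 1]
    return hosts_needed(max(window), tps_per_host)


schedule = [10_000, 10_000, 40_000, 40_000]

print(desired_hosts(schedule, minute=0, tps_per_host=10_000))  # 1
print(desired_hosts(schedule, minute=1, tps_per_host=10_000))  # 4, requested a minute early
print(desired_hosts(schedule, minute=1, tps_per_host=10_000, lookahead_minutes=0))  # 1
```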

What happened if a Worker died? Nothing bad. Other workers would just pick up the slack. In fact, Amazon SQS had a feature that gave me some extra resilience. When you dequeue a message from SQS, it doesn’t actually remove it, it just sets its Visibility flag to false, so nobody else can dequeue it. Under normal operation, once the Worker finishes processing a job successfully, it calls SQS again to actually delete it. If a Worker dies and doesn’t call SQS to delete the message, after a timeout, SQS sets the Visibility flag back to true and that message will be read and executed by a different Worker.
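The receive, hide, delete cycle can be simulated with a toy in-memory queue. This is not the SQS API: the `VisibilityQueue` class and its methods are hypothetical, and the clock is passed in explicitly so the behavior is easy to follow. In SQS itself, the hiding is governed by a visibility timeout on the received message.

```python
class VisibilityQueue:
    """Toy in-memory queue that mimics receive/delete with a visibility timeout."""

    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self._messages: dict[int, str] = {}            # message id -> body
        self._invisible_until: dict[int, float] = {}   # message id -> time it reappears
        self._next_id = 0

    def send(self, body: str) -> int:
        message_id = self._next_id
        self._next_id += 1
        self._messages[message_id] = body
        self._invisible_until[message_id] = 0.0
        return message_id

    def receive(self, now: float):
        """Return (id, body) of the first visible message and hide it, or None."""
        for message_id, body in self._messages.items():
            if self._invisible_until[message_id] <= now:
                self._invisible_until[message_id] = now + self.visibility_timeout
                return message_id, body
        return None

    def delete(self, message_id: int) -> None:
        self._messages.pop(message_id, None)
        self._invisible_until.pop(message_id, None)


q = VisibilityQueue(visibility_timeout=30.0)
q.send("run 100 TPS for 1 minute")

first = q.receive(now=0.0)     # Worker A takes the job, then dies before deleting it
hidden = q.receive(now=10.0)   # the job is still invisible to everyone else
retry = q.receive(now=31.0)    # timeout elapsed: Worker B receives the same job
q.delete(retry[0])             # Worker B finishes and deletes it for good
gone = q.receive(now=100.0)

print(first, hidden, retry, gone)
```

One consequence worth keeping in mind: a job can be executed more than once if a Worker is merely slow rather than dead, so this gives at-least-once rather than exactly-once execution.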

How did this architecture handle the problem of heterogeneous fleets? That is, what about the case of a Worker not being able to generate as much load as all the other Workers?

I actually went through 3 rounds with this. In the first version, I simply punted it, because I had 2–3 months to have something up and running, so I had to make some hard tradeoffs given an aggressive timeline. You’d just run as many Workers as cores in the machine, so it was pretty simplistic. But it became more clear that I needed the workers to self-regulate and they had to be smarter about how many of these messages they could process at the same time.

So I wrote a little “traffic light” as part of the Worker code that read from the Queue. It was responsible for a Worker pausing and resuming reading from the queue. There were a number of background threads that monitored things like CPU usage, memory usage, network bandwidth usage, disk usage, etc. When any of these hardware resources went above a customizable threshold (e.g. 80% CPU), the Worker paused reading from the queue until usage dipped below the threshold again. (AWS Auto Scaling can also scale the number of hosts up or down based on hardware resources, and I used that, but I also wanted a finer-grained ability to scale the number of threads on each host.)
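A rough sketch of such a traffic light follows. It assumes each resource is exposed as a function returning utilization between 0.0 and 1.0, and it polls those functions on demand instead of using background threads. The `TrafficLight` class, the sampler functions, and the thresholds are all hypothetical.

```python
from typing import Callable, Mapping


class TrafficLight:
    """Green while every monitored resource is below its threshold."""

    def __init__(self, samplers: Mapping[str, Callable[[], float]],
                 thresholds: Mapping[str, float]) -> None:
        self.samplers = samplers
        self.thresholds = thresholds

    def red_resources(self) -> list[str]:
        """Names of the resources currently at or above their threshold."""
        return [name for name, limit in self.thresholds.items()
                if self.samplers[name]() >= limit]

    def is_green(self) -> bool:
        return not self.red_resources()


# Fake readings standing in for real CPU and memory probes.
readings = {"cpu": 0.55, "memory": 0.40}
light = TrafficLight(
    samplers={name: (lambda name=name: readings[name]) for name in readings},
    thresholds={"cpu": 0.80, "memory": 0.90},
)

print(light.is_green())       # True: the Worker keeps reading from the queue
readings["cpu"] = 0.93
print(light.is_green())       # False: the Worker pauses
print(light.red_resources())  # ['cpu']
readings["cpu"] = 0.60
print(light.is_green())       # True: the Worker resumes
```

A Worker loop would check `is_green()` before each read from the queue and sleep briefly while the light is red.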

This second round worked surprisingly well and empowered Workers to be pretty smart about continuously self-tuning how many messages they were processing at a given time.

What I didn’t realize right away was that not all bottlenecks are hardware related. For example, most of Amazon at the time used Apache Commons MultiThreadedHttpConnectionManager, which had a DEFAULT_MAX_TOTAL_CONNECTIONS of 20. Amazon’s Service Framework, Coral, used it under the hood for all auto-generated clients, so this default was everywhere in the company. And most engineers didn’t even realize they had it. So they would ask TPSGenerator to generate thousands of TPS, but unknowingly had this arbitrary limit in their own code of 20 concurrent connections, so subsequent connections would just queue up. It was frustrating because engineers troubleshooting this would see that the machine was not utilizing a lot of hardware resources, yet it wasn’t generating a lot of traffic either.

The epiphany I had for the third version of the self-tuning code was that I actually could have a catch-all heuristic here. In the happy case, the number of transactions finished had the exact same slope as the number of transactions started. But in the case of a bottleneck that wasn’t associated with a hardware resource, I knew the slope of transactions finished was going to be significantly lower than the slope of transactions started, and that gave me a hint that the system was in trouble.
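One way to express this heuristic is to compare the growth rates of two cumulative counters sampled at the same instants. The sketch below is an assumption, not the original algorithm: the function names and the `min_ratio` cutoff of 0.9 are made up for illustration.

```python
def slope(counts: list[int]) -> float:
    """Average increase per sample of a cumulative counter."""
    if len(counts) < 2:
        raise ValueError("need at least two samples to compute a slope")
    return (counts[-1] - counts[0]) / (len(counts) - 1)


def looks_bottlenecked(started: list[int], finished: list[int],
                       min_ratio: float = 0.9) -> bool:
    """True when transactions finish noticeably slower than they start."""
    started_slope = slope(started)
    if started_slope <= 0:
        return False  # nothing is being started, so there is nothing to compare
    return slope(finished) / started_slope < min_ratio


started = [0, 100, 200, 300, 400]    # cumulative transactions started, one sample per second
healthy = [0, 98, 199, 300, 398]     # finished keeps pace with started
throttled = [0, 20, 40, 60, 80]      # e.g. capped at 20 concurrent connections

print(looks_bottlenecked(started, healthy))    # False
print(looks_bottlenecked(started, throttled))  # True
```

The appeal of this signal is that it needs no knowledge of what the bottleneck is, whether a connection pool limit, a lock, or anything else that makes work pile up.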

At that point, I had built a system that was hugely scalable, and could dynamically self-regulate on how many hosts to use, and how many threads each one of these hosts should be running.

But it wasn’t all smooth sailing. I ran into a thousand little problems along the way.

One major decision I made was to sacrifice some amount of precision to achieve high scale. Designing distributed systems is all about making tradeoffs given competing non-functional priorities. In single-host mode, TPSGenerator could be 100% accurate. You wanted to generate 23,476.76 TPS for 15 seconds? You got exactly 23,476.76 TPS for 15 seconds. But in multi-host mode, being able to achieve a load in the millions of TPS was more important than that exact precision. A lot of the mechanisms I had in place to self-regulate had several seconds of delay, so you couldn’t run at 4,000,007 TPS for 20 seconds then switch to running at 4,000,009 TPS for 10 seconds. It was optimized for longer running times and large throughputs at the expense of some precision. Tradeoffs in distributed systems!

I initially had hard-coded the TPS requested per queue message to be 100 for simplicity. But say I wanted to generate a load of 40 million TPS: that would have resulted in the Controller writing 400,000 messages to SQS every minute, or almost 7,000 writes per second. So the Controller calculated the TPS-per-message as a function of the overall desired TPS, so that its write rate wasn’t so aggressive.
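The arithmetic, plus one possible way to pick the TPS-per-message, can be sketched as follows. The formula is an assumption: it caps the Controller at a hypothetical budget of 6,000 messages per minute and never goes below the original 100 TPS per message. The actual function used is not described in this post.

```python
import math


def tps_per_message(total_tps: int, max_messages_per_minute: int = 6_000,
                    minimum: int = 100) -> int:
    """Grow the per-message TPS so one minute of load fits in the message budget."""
    return max(minimum, math.ceil(total_tps / max_messages_per_minute))


def writes_per_second(total_tps: int, per_message: int) -> float:
    """Queue writes per second needed to enqueue one minute of load every minute."""
    messages_per_minute = math.ceil(total_tps / per_message)
    return messages_per_minute / 60


fixed = writes_per_second(40_000_000, 100)
print(round(fixed))  # 6667 writes per second with a hard-coded 100 TPS per message

scaled = tps_per_message(40_000_000)
print(scaled)                                 # 6667 TPS per message
print(writes_per_second(40_000_000, scaled))  # 100.0 writes per second

print(tps_per_message(40_000))  # 100: small tests keep the original granularity
```

The tradeoff is granularity: bigger messages mean fewer queue writes, but each Worker takes on load in larger chunks.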

I had to be careful about the Workers overwhelming SQS as well, so I switched them to long-polling. I also added a small random jitter on start, to avoid the thundering herd problem of thousands of workers hitting the queue all at once.
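Startup jitter is simple to sketch: each Worker sleeps for a random fraction of a small window before its first read. The function name and the 5-second window below are assumptions, and the random generator is seeded only to keep the example reproducible. Long-polling is not shown; in SQS it corresponds to setting a wait time on the receive call so that an empty queue does not trigger a tight loop of requests.

```python
import random


def startup_delay(max_jitter_seconds: float, rng: random.Random) -> float:
    """Random delay in [0, max_jitter_seconds] to wait before the first queue read."""
    return rng.uniform(0.0, max_jitter_seconds)


rng = random.Random(42)  # seeded only so this example is reproducible
delays = [startup_delay(5.0, rng) for _ in range(1000)]

# Instead of 1000 workers hitting the queue in the same instant,
# their first reads are spread across the 5-second window.
print(min(delays) >= 0.0, max(delays) <= 5.0)
```

In a real Worker this would be followed by `time.sleep(delay)` before entering the polling loop.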

And lastly, the Elephant in the Room. The entire architecture had a single point of failure: the Controller. I played around with having a second Controller essentially in shadow mode, keeping track of the active Controller via heartbeats. This got me in the business of leader election, gossip protocols, and consensus algorithms like Raft and Paxos. In the end, I was not satisfied with the amount of complexity that this introduced given how infrequently this was an actual problem. Sometimes, the solution to a problem brings more problems than the one it fixes.

Metrics were a major pain point. In single-host mode, TPSGenerator kept track of things like transactions executed every second, how many passed and failed, and latency percentiles per minute. It could then decide to abort the test if the error rate was too high, or latency spiked significantly above expectations. But in a distributed world, the Controller didn’t have any metrics, the Workers did. How could the Controller access these metrics to make those decisions?

I briefly considered a push or pull mechanism where the Controller could request metrics from the Workers, or the Workers could send their metrics to the Controller.

I disliked that for two simple reasons. One, it was re-coupling components I had very specifically decoupled. But most importantly, this could turn the Controller into a bottleneck if it was receiving millions of metrics per minute. The Controller was already a single-point-of-failure so I didn’t want to push my luck there.

Once again, designing distributed systems is all about tradeoffs between competing requirements. I went for an architecture where the Workers pushed their metrics to Amazon CloudWatch, and the Controller could fetch them from there. The downside was the delay: if the Workers sent their metrics directly to the Controller, it could make near real-time decisions about the state of the world. But with CloudWatch as an intermediary, there was a potential multi-minute delay, taking into account potentially late-arriving metrics from stray hosts. It was a tradeoff I begrudgingly accepted to avoid a guaranteed bottleneck.
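The pattern of Workers writing to a shared store while the Controller reads aggregates from it can be sketched with an in-memory stand-in. Nothing below is the CloudWatch API: the `MetricsStore` class, the metric names, and the 5% error-rate threshold are hypothetical.

```python
from collections import defaultdict


class MetricsStore:
    """In-memory stand-in for a shared metrics service: Workers write, the Controller reads."""

    def __init__(self) -> None:
        self._totals: dict = defaultdict(int)  # (minute, metric name) -> summed value

    def put(self, minute: int, name: str, value: int) -> None:
        self._totals[(minute, name)] += value

    def get(self, minute: int, name: str) -> int:
        return self._totals.get((minute, name), 0)


def should_abort(store: MetricsStore, minute: int, max_error_rate: float = 0.05) -> bool:
    """Controller-side check on fleet-wide totals for one minute."""
    passed = store.get(minute, "passed")
    failed = store.get(minute, "failed")
    total = passed + failed
    return total > 0 and failed / total > max_error_rate


store = MetricsStore()

# Two Workers report minute 0: 150 failures out of 12,000 transactions (1.25%).
store.put(0, "passed", 5_900); store.put(0, "failed", 100)
store.put(0, "passed", 5_950); store.put(0, "failed", 50)

# Minute 1: 1,900 failures out of 12,000 transactions (about 15.8%).
store.put(1, "passed", 5_000); store.put(1, "failed", 1_000)
store.put(1, "passed", 5_100); store.put(1, "failed", 900)

print(should_abort(store, minute=0))  # False
print(should_abort(store, minute=1))  # True
```

Because the Controller reads a minute only after it has been reported, a real version would evaluate a minute or more behind the present, which is the delay described above.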

There were a million additional little tweaks and heuristics that we added to the algorithm and the product over the course of many years. But the bones are still the same today. I left Amazon in 2020, but it’s fun to think that this algorithm runs on thousands of machines in Amazon data centers even today, at millions of TPS. 🥂

Written by Carlos Arguelles

Hi! I'm a Senior Principal Engineer (L8) at Amazon. In the last 26 years, I've worked at Google and Microsoft as well.
