Our team recently encountered a thorny issue in a distributed system using Spring Boot 3, JDK 21, and AWS ECS. The system processes messages from several SQS queues, with a scheduled task running every 10 seconds to dequeue and process messages. While it worked perfectly under light load or when external services were responsive, things fell apart under heavier load and slower response times.
Here’s a breakdown of the problem and how we solved it.
The Problem: A Backlog That Never Ends
The scheduled task is designed to loop over SQS queue URLs and process up to 10 messages per queue in parallel:
- Scheduled Task: Runs every 10 seconds using @Scheduled(fixedDelay = 10000). For each queue URL, it submits a task to a thread pool (asyncTaskExecutor) that calls dequeue(queueUrl).
- Dequeue Logic:
  - Retrieves up to 10 messages from SQS.
  - Submits each message to another thread pool (asyncTaskExecutorB) for processing.
  - Processing involves calling doWork(), which can take 50 ms to 10 seconds, and then deleting the message from the queue.
List<Message> messages = sqsClient.dequeue(queueUrl, 10, ...);
for (Message message : messages) {
    asyncTaskExecutorB.submit(() -> {
        doWork();
        deleteMessage(message);
    });
}
- The Bottleneck:
  - Thread Contention: With a core pool size of 5 for asyncTaskExecutorB, slow doWork() executions quickly consumed all available threads.
  - Visibility Timeout Breaches: Messages that weren’t deleted before their visibility timeout were re-queued, leading to duplicate processing.
  - Message Backlog Growth: With new messages arriving at ~500 per minute across queues, the backlog grew faster than it could be processed.
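A back-of-the-envelope calculation shows why five threads cannot keep up. The sketch below is illustrative only: the class and method names are hypothetical, the average doWork() durations are made-up sample values, and it assumes the pool stays at its core size of 5 with every thread permanently busy.

```java
public class BacklogEstimate {

    /** Upper bound on messages processed per minute by a fixed number of worker threads. */
    public static double maxThroughputPerMinute(int workerThreads, double avgDoWorkSeconds) {
        return 60.0 * workerThreads / avgDoWorkSeconds;
    }

    /** Net backlog growth per minute; a positive value means the queue keeps growing. */
    public static double backlogGrowthPerMinute(double arrivalsPerMinute,
                                                int workerThreads,
                                                double avgDoWorkSeconds) {
        return arrivalsPerMinute - maxThroughputPerMinute(workerThreads, avgDoWorkSeconds);
    }

    public static void main(String[] args) {
        double arrivals = 500.0; // from the article: ~500 messages per minute
        int threads = 5;         // from the article: core pool size of asyncTaskExecutorB
        // Hypothetical average doWork() durations inside the 50 ms to 10 s range.
        for (double avgSeconds : new double[] {0.05, 0.6, 2.0, 10.0}) {
            System.out.printf("avg doWork %.2fs -> capacity %.0f/min, backlog growth %.0f/min%n",
                    avgSeconds,
                    maxThroughputPerMinute(threads, avgSeconds),
                    backlogGrowthPerMinute(arrivals, threads, avgSeconds));
        }
    }
}
```

Under these assumptions the break-even point is an average doWork() time of about 0.6 seconds. Anything slower than that means arrivals outpace processing and the backlog grows every minute.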
The Solution: Backpressure and Smarter Scheduling
To address this issue, we needed to introduce backpressure into the system while keeping the processing pipeline efficient and resilient. Here’s what we did:
1. Controlled Concurrency with Semaphore
We replaced the unconstrained submission of tasks to asyncTaskExecutorB with a Semaphore to control the number of concurrent tasks across all queues. This ensured we didn’t overload the thread pool.
private final Semaphore semaphore = new Semaphore(50); // Limit to 50 concurrent tasks
for (Message message : messages) {
    semaphore.acquire(); // Block if limit is reached
    asyncTaskExecutorB.submit(() -> {
        try {
            doWork();
            deleteMessage(message);
        } finally {
            semaphore.release(); // Release semaphore when task completes
        }
    });
}
Why it works:
The semaphore ensures that no more than 50 messages are in flight at once. When the limit is reached, the dequeue loop blocks instead of queuing more work, which prevents thread starvation and keeps the overall system load in check.
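The bound can be checked in isolation. The following is a minimal, self-contained sketch and not our production code: the class name, the fixed pool of 16 threads standing in for asyncTaskExecutorB, and the 5 ms sleep standing in for doWork() are all assumptions. It also releases the permit if the executor rejects the task, a case the snippet above does not cover.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo {

    /** Submits taskCount tasks with at most maxInFlight unfinished at once; returns the peak observed. */
    public static int runAndMeasurePeak(int maxInFlight, int taskCount) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        ExecutorService pool = Executors.newFixedThreadPool(16); // stand-in for asyncTaskExecutorB
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // blocks the submitting thread once the limit is reached
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    pool.submit(() -> {
                        try {
                            Thread.sleep(5); // placeholder for doWork() and deleteMessage(message)
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            inFlight.decrementAndGet();
                            permits.release();
                            done.countDown();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // The task never ran, so give the permit back before propagating.
                    inFlight.decrementAndGet();
                    permits.release();
                    throw e;
                }
            }
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak in-flight tasks: " + runAndMeasurePeak(4, 40));
    }
}
```

Because the counter is incremented only after a permit is acquired and decremented before it is released, the reported peak can never exceed the permit count.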
2. Dynamic Visibility Timeout
We dynamically set the visibility timeout for each batch of messages based on the observed processing times of doWork().
- Added metrics to measure the average time spent in doWork().
- Adjusted the visibility timeout for dequeued messages to averageDoWorkTime * 2, ensuring enough buffer to complete processing.
int visibilityTimeout = Math.max((int) (averageDoWorkTime * 2), 30); // Minimum of 30 seconds
List<Message> messages = sqsClient.dequeue(queueUrl, 10, visibilityTimeout, ...);
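One way to produce averageDoWorkTime is a small running-average tracker. The sketch below is an assumption about how such a metric could be kept, not the exact code we run: the class and method names are hypothetical, and it uses a simple cumulative mean. The 30-second floor comes from the snippet above; the 43,200-second ceiling is the SQS maximum visibility timeout of 12 hours.

```java
import java.time.Duration;

public class VisibilityTimeoutEstimator {
    private static final int MIN_SECONDS = 30;     // floor used in the article's snippet
    private static final int MAX_SECONDS = 43_200; // SQS maximum visibility timeout (12 hours)

    private long totalMillis;
    private long samples;

    /** Records one observed doWork() duration. */
    public synchronized void record(Duration doWorkTime) {
        totalMillis += doWorkTime.toMillis();
        samples++;
    }

    /** Returns twice the average observed duration in seconds, clamped to [30, 43200]. */
    public synchronized int timeoutSeconds() {
        if (samples == 0) {
            return MIN_SECONDS; // no data yet, fall back to the floor
        }
        double averageSeconds = (totalMillis / (double) samples) / 1000.0;
        long doubled = (long) Math.ceil(averageSeconds * 2);
        return (int) Math.max(MIN_SECONDS, Math.min(MAX_SECONDS, doubled));
    }

    public static void main(String[] args) {
        VisibilityTimeoutEstimator estimator = new VisibilityTimeoutEstimator();
        estimator.record(Duration.ofSeconds(20));
        estimator.record(Duration.ofSeconds(40));
        System.out.println("Visibility timeout: " + estimator.timeoutSeconds() + "s");
    }
}
```

Two caveats on the design: the visibility clock starts when SQS hands the message over, so time spent waiting in an executor queue counts against the timeout, and an average hides slow outliers. A high percentile of doWork() time may be a safer input than the mean.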
3. Rate-Limiting the Scheduled Task
We introduced rate-limiting to the @Scheduled task using token buckets to prevent it from overwhelming the system during high load.
private final RateLimiter rateLimiter = RateLimiter.create(50); // 50 tasks per second
for (String queueUrl : queueUrls) {
    rateLimiter.acquire(); // Blocks if rate limit is exceeded
    asyncTaskExecutor.submit(() -> dequeue(queueUrl));
}
Why it works:
This approach spreads out the load, allowing downstream systems and thread pools to keep up without saturating resources.
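For readers unfamiliar with token buckets, here is a minimal sketch of the idea in plain Java. It is not the RateLimiter class used in the snippet above (which looks like Guava's): the TokenBucket class and its tryAcquire method are hypothetical, the clock value is passed in explicitly so the behavior is easy to follow, and it returns false instead of blocking when the bucket is empty.

```java
public class TokenBucket {
    private final double capacity;        // maximum burst size
    private final double tokensPerSecond; // steady-state refill rate
    private double tokens;
    private long lastRefillNanos;

    public TokenBucket(double capacity, double tokensPerSecond, long startNanos) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = startNanos;
    }

    /** Takes one token if available; nowNanos would normally be System.nanoTime(). */
    public synchronized boolean tryAcquire(long nowNanos) {
        long elapsedNanos = Math.max(0L, nowNanos - lastRefillNanos);
        // Refill in proportion to elapsed time, never beyond capacity.
        tokens = Math.min(capacity, tokens + (elapsedNanos / 1_000_000_000.0) * tokensPerSecond);
        lastRefillNanos = Math.max(lastRefillNanos, nowNanos);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(2, 1, 0L); // burst of 2, refill 1 token per second
        System.out.println(bucket.tryAcquire(0L));             // bucket starts full
        System.out.println(bucket.tryAcquire(0L));             // second token of the burst
        System.out.println(bucket.tryAcquire(0L));             // bucket is now empty
        System.out.println(bucket.tryAcquire(1_500_000_000L)); // 1.5 s later, refilled
    }
}
```

With a non-blocking variant like this, the scheduled task could simply skip a queue for the current cycle when no token is available, rather than holding the scheduler thread.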
4. Adopting Java 21 Virtual Threads
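With Java 21, we experimented with virtual threads to replace asyncTaskExecutorB. Virtual threads dramatically reduce thread contention because they are not tied one-to-one to OS threads: a virtual thread that blocks on I/O is unmounted from its carrier thread, which is then free to run other tasks.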
// close() at the end of the try block waits for all submitted tasks to finish
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (Message message : messages) {
        executor.submit(() -> {
            doWork();
            deleteMessage(message);
        });
    }
}
Impact:
- Simplified thread management.
- Eliminated thread starvation, as virtual threads scale with the workload.
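One caveat: a virtual-thread-per-task executor has no pool size, so it provides no backpressure by itself. Downstream services still need protection, and a semaphore remains the usual tool for that. The sketch below is a hedged illustration, not our production code: the class name, the limit of 4, and the 5 ms sleep standing in for doWork() are all assumptions. It requires JDK 21.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadBackpressure {

    /** Runs taskCount tasks on virtual threads, at most maxConcurrent at a time; returns the peak observed. */
    public static int runAndMeasurePeak(int maxConcurrent, int taskCount) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // close() at the end of try-with-resources waits for all submitted tasks.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    permits.acquireUninterruptibly(); // only the virtual thread parks here
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        sleepQuietly(5); // placeholder for doWork() and deleteMessage(message)
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                });
            }
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent tasks: " + runAndMeasurePeak(4, 50));
    }
}
```

Here the permit is acquired inside the task, which is cheap with virtual threads but means every message is handed to the executor immediately. For SQS messages whose visibility timeout is already ticking, acquiring the permit before submitting, as in the semaphore section, may be the better fit.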
Results: A Resilient System Under Load
After these changes, the system showed marked improvements:
- No More Visibility Timeout Breaches: With dynamic timeouts and limited concurrency, messages were consistently deleted before reappearing in the queue.
- Stable Backlog: The semaphore and rate limiter prevented unchecked growth, ensuring messages were processed at a sustainable pace.
- Improved Throughput: Virtual threads increased the overall efficiency, handling more messages with fewer resources.
Lessons Learned
- Backpressure is Critical: Unbounded task submission is a recipe for disaster. Adding controls like semaphores and rate limiters keeps the system stable.
- Observe and Adapt: Monitoring processing times allowed us to dynamically adjust visibility timeouts, preventing unnecessary retries.
- Modern Java Rocks: Java 21’s virtual threads are a game-changer for high-concurrency workloads. If you haven’t tried them, now is the time.
Final Thoughts
Distributed systems are inherently complex, especially when dealing with unpredictable workloads and latencies. By introducing backpressure, fine-tuning visibility timeouts, and embracing modern Java features, we turned an unsustainable system into one that gracefully handles both peaks and troughs of traffic. If you’re facing similar challenges, we hope this approach inspires your solution.