From 1da0f41381e0b1742a52b6fdcc3384e2d8bc9f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ilkka=20Sepp=C3=A4l=C3=A4?= Date: Sat, 18 May 2024 08:54:56 +0300 Subject: [PATCH] docs: update queue-based load leveling --- queue-load-leveling/README.md | 346 +++++++----------- .../com/iluwatar/queue/load/leveling/App.java | 2 +- .../queue/load/leveling/ServiceExecutor.java | 4 +- .../queue/load/leveling/TaskGenerator.java | 2 +- .../load/leveling/TaskGenSrvExeTest.java | 5 +- 5 files changed, 133 insertions(+), 226 deletions(-) diff --git a/queue-load-leveling/README.md b/queue-load-leveling/README.md index 49c37e623..e5091d971 100644 --- a/queue-load-leveling/README.md +++ b/queue-load-leveling/README.md @@ -1,267 +1,148 @@ --- -title: Queue based load leveling -category: Concurrency +title: Queue-Based Load Leveling +category: Resilience language: en tag: - - Decoupling - - Performance - - Cloud distributed + - Asynchronous + - Buffering + - Decoupling + - Fault tolerance + - Messaging + - Scalability + - Synchronization + - Thread management --- +## Also known as + +* Load Leveling +* Message Queuing + ## Intent -Use a queue that acts as a buffer between a task and a service that it invokes in order to smooth -intermittent heavy loads that may otherwise cause the service to fail or the task to time out. -This pattern can help to minimize the impact of peaks in demand on availability and responsiveness -for both the task and the service. + +Queue-Based Load Leveling aims to manage the load in a system by using a queue to level the workload between producers and consumers, ensuring that heavy loads are handled smoothly without overwhelming the system. ## Explanation -Real world example -> A Microsoft Azure web role stores data by using a separate storage service. If a large number of instances of the web -> role run concurrently, it is possible that the storage service could be overwhelmed and be unable to respond to requests -> quickly enough to prevent these requests from timing out or failing. +Real-world example + +> Imagine a popular restaurant with a limited number of kitchen staff (consumers) and a large number of customers placing orders (producers). During peak hours, if all customers were served immediately, the kitchen would be overwhelmed, leading to long wait times and potential mistakes in orders. To manage this, the restaurant implements a queue-based load leveling system using a ticketing machine. +> +> When customers place orders, they receive a ticket number and their order is placed in a queue. The kitchen staff then processes orders one at a time in the order they were received. This ensures that the kitchen can handle the workload at a manageable pace, preventing overload and maintaining service quality. Customers wait comfortably knowing their order is in line and will be handled efficiently, even during the busiest times. In plain words -> Makes resource-load balanced by ensuring an intermediate data structure like queue that makes bridge -> between service-takers and service-givers. Where both takers and givers are running asynchronously and -> service-takers can tolerate some amount of delay to get feedback. -> + +> Queue-Based Load Leveling is a design pattern that uses a queue to manage and balance the workload between producers and consumers, preventing system overload and ensuring smooth processing. Wikipedia says -> In computing, load balancing is the process of distributing a set of tasks over a set of resources -> (computing units), with the aim of making their overall processing more efficient. Load balancing can -> optimize the response time and avoid unevenly overloading some compute nodes while other compute nodes -> are left idle. +> Message Queues are essential components for inter-process communication (IPC) and inter-thread communication, using queues to manage the passing of messages. They help in decoupling producers and consumers, allowing asynchronous processing, which is a key aspect of the Queue-Based Load Leveling pattern. **Programmatic Example** -TaskGenerator implements Task, runnable interfaces. Hence, It runs asynchronously. +The Queue-Based Load Leveling pattern helps to manage high-volume, sporadic bursts of tasks that can overwhelm a system. It uses a queue as a buffer to hold tasks, decoupling the task generation from task processing. The tasks are then processed at a manageable rate. + +First, let's look at the `MessageQueue` and `Message` classes. The `MessageQueue` acts as a buffer, storing messages until they are retrieved by the `ServiceExecutor`. The `Message` represents the tasks to be processed. ```java -/** - * Task Interface. - */ -public interface Task { - void submit(Message msg); +public class Message { + // Message details +} + +public class MessageQueue { + private Queue queue; + + public MessageQueue() { + queue = new LinkedList<>(); + } + + // Method to add a message to the queue + public void addMessage(Message message) { + queue.add(message); + } + + // Method to retrieve a message from the queue + public Message getMessage() { + return queue.poll(); + } } ``` -It submits tasks to ServiceExecutor to serve tasks. + +Next, we have the `TaskGenerator` class. This class represents the task producers. It generates tasks and submits them to the `MessageQueue`. + ```java -/** - * TaskGenerator class. Each TaskGenerator thread will be a Worker which submit's messages to the - * queue. We need to mention the message count for each of the TaskGenerator threads. - */ -@Slf4j -public class TaskGenerator implements Task, Runnable { +public class TaskGenerator implements Runnable { + private MessageQueue msgQueue; + private int taskCount; - // MessageQueue reference using which we will submit our messages. - private final MessageQueue msgQueue; - - // Total message count that a TaskGenerator will submit. - private final int msgCount; - - // Parameterized constructor. - public TaskGenerator(MessageQueue msgQueue, int msgCount) { + public TaskGenerator(MessageQueue msgQueue, int taskCount) { this.msgQueue = msgQueue; - this.msgCount = msgCount; + this.taskCount = taskCount; } - /** - * Submit messages to the Blocking Queue. - */ - public void submit(Message msg) { - try { - this.msgQueue.submitMsg(msg); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } - - /** - * Each TaskGenerator thread will submit all the messages to the Queue. After every message - * submission TaskGenerator thread will sleep for 1 second. - */ + @Override public void run() { - var count = this.msgCount; - - try { - while (count > 0) { - var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName(); - this.submit(new Message(statusMsg)); - - LOGGER.info(statusMsg); - - // reduce the message count. - count--; - - // Make the current thread to sleep after every Message submission. - Thread.sleep(1000); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); + for (int i = 0; i < taskCount; i++) { + Message message = new Message(); // Create a new message + msgQueue.addMessage(message); // Add the message to the queue } } } ``` -It also implements runnable interface and run asynchronously. It retrieves tasks one by one -from blockingQueue to serve. -```java -/** - * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and - * process them. - */ -@Slf4j -public class ServiceExecutor implements Runnable { - private final MessageQueue msgQueue; +The `ServiceExecutor` class represents the task consumer. It retrieves tasks from the `MessageQueue` and processes them. + +```java +public class ServiceExecutor implements Runnable { + private MessageQueue msgQueue; public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; } - /** - * The ServiceExecutor thread will retrieve each message and process it. - */ - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - var msg = msgQueue.retrieveMsg(); - - if (null != msg) { - LOGGER.info(msg.toString() + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve .. "); - } - - Thread.sleep(1000); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } -} -``` - -BlockingQueue data-structure is used in MessageQueue class for acting buffer -between TaskGenerator to ServiceExecutor. - -```java -public class MessageQueue { - - private final BlockingQueue blkQueue; - - // Default constructor when called creates Blocking Queue object. - public MessageQueue() { - this.blkQueue = new ArrayBlockingQueue<>(1024); - } - - /** - * All the TaskGenerator threads will call this method to insert the Messages in to the Blocking - * Queue. - */ - public void submitMsg(Message msg) { - try { - if (null != msg) { - blkQueue.add(msg); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } - - /** - * All the messages will be retrieved by the ServiceExecutor by calling this method and process - * them. Retrieves and removes the head of this queue, or returns null if this queue is empty. - */ - public Message retrieveMsg() { - try { - return blkQueue.poll(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - return null; - } -} -``` -TaskGenerator submit message object to ServiceExecutor for serving. -```java -/** - * Message class with only one parameter. - */ -@Getter -@RequiredArgsConstructor -public class Message { - private final String msg; - @Override - public String toString() { - return msg; + public void run() { + while (true) { + Message message = msgQueue.getMessage(); // Retrieve a message from the queue + if (message != null) { + // Process the message + } else { + // No more messages to process + break; + } + } } } ``` -To simulate the situation ExecutorService is used here. ExecutorService automatically provides a pool of threads and -an API for assigning tasks to it. + +Finally, we have the `App` class which sets up the `TaskGenerator` and `ServiceExecutor` threads and submits them to an `ExecutorService`. + ```java public class App { - - //Executor shut down time limit. - private static final int SHUTDOWN_TIME = 15; - - /** - * Program entry point. - * - * @param args command line args - */ public static void main(String[] args) { + var msgQueue = new MessageQueue(); - // An Executor that provides methods to manage termination and methods that can - // produce a Future for tracking progress of one or more asynchronous tasks. - ExecutorService executor = null; + final var taskRunnable1 = new TaskGenerator(msgQueue, 5); + final var taskRunnable2 = new TaskGenerator(msgQueue, 1); + final var taskRunnable3 = new TaskGenerator(msgQueue, 2); - try { - // Create a MessageQueue object. - var msgQueue = new MessageQueue(); + final var srvRunnable = new ServiceExecutor(msgQueue); - LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads."); + ExecutorService executor = Executors.newFixedThreadPool(2); + executor.submit(taskRunnable1); + executor.submit(taskRunnable2); + executor.submit(taskRunnable3); + executor.submit(srvRunnable); - // Create three TaskGenerator threads. Each of them will submit different number of jobs. - final var taskRunnable1 = new TaskGenerator(msgQueue, 5); - final var taskRunnable2 = new TaskGenerator(msgQueue, 1); - final var taskRunnable3 = new TaskGenerator(msgQueue, 2); - - // Create e service which should process the submitted jobs. - final var srvRunnable = new ServiceExecutor(msgQueue); - - // Create a ThreadPool of 2 threads and - // submit all Runnable task for execution to executor.. - executor = Executors.newFixedThreadPool(2); - executor.submit(taskRunnable1); - executor.submit(taskRunnable2); - executor.submit(taskRunnable3); - - // submitting serviceExecutor thread to the Executor service. - executor.submit(srvRunnable); - - // Initiates an orderly shutdown. - LOGGER.info("Initiating shutdown." - + " Executor will shutdown only after all the Threads are completed."); - executor.shutdown(); - - // Wait for SHUTDOWN_TIME seconds for all the threads to complete - // their tasks and then shut down the executor and then exit. - if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { - LOGGER.info("Executor was shut down and Exiting."); - executor.shutdownNow(); - } - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } + executor.shutdown(); } } ``` -The console output +In this example, the `TaskGenerator` threads generate tasks at a variable rate and submit them to the `MessageQueue`. The `ServiceExecutor` retrieves the tasks from the queue and processes them at its own pace, preventing the system from being overwhelmed by peak loads. + +Running the application produces the following console output: + ``` [main] INFO App - Submitting TaskGenerators and ServiceExecutor threads. [main] INFO App - Initiating shutdown. Executor will shutdown only after all the Threads are completed. @@ -290,18 +171,45 @@ The console output ``` ## Class diagram -![alt text](./etc/queue-load-leveling.gif "queue-load-leveling") + +![Queue-Based Load Leveling](./etc/queue-load-leveling.gif "Queue-Based Load Leveling") ## Applicability -* This pattern is ideally suited to any type of application that uses services that may be subject to overloading. -* This pattern might not be suitable if the application expects a response from the service with minimal latency. +* When there are variable workloads, and you need to ensure that peak loads do not overwhelm the system +* In distributed systems where tasks are produced at a different rate than they are consumed +* For decoupling producers and consumers in an asynchronous messaging system -## Tutorials -* [Queue-Based Load Leveling Pattern](http://java-design-patterns.com/blog/queue-load-leveling/) +## Known Uses +* Amazon Web Services (AWS) Simple Queue Service (SQS) +* RabbitMQ +* Java Message Service (JMS) in enterprise Java applications + +## Consequences + +Benefits: + +* Decouples the producers and consumers, allowing each to operate at its own pace +* Increases system resilience and fault tolerance by preventing overload conditions +* Enhances scalability by allowing more consumers to be added to handle increased load + +Trade-offs: + +* Adds complexity to the system architecture +* May introduce latency as messages need to be queued and dequeued +* Requires additional components (queues) to be managed and monitored + +## Related Patterns + +* Asynchronous Messaging: Queue-Based Load Leveling uses asynchronous messaging to decouple producers and consumers +* [Circuit Breaker](https://java-design-patterns.com/patterns/circuit-breaker/): Often used in conjunction with Queue-Based Load Leveling to prevent system overloads by temporarily halting message processing +* [Producer-Consumer](https://java-design-patterns.com/patterns/producer-consumer/): Queue-Based Load Leveling is a specific application of the Producer-Consumer pattern where the queue serves as the intermediary +* [Retry](https://java-design-patterns.com/patterns/retry/): Works with Queue-Based Load Leveling to handle transient failures by retrying failed operations ## Credits -* [Queue-Based Load Leveling pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/queue-based-load-leveling) -* [Load-Balancing](https://www.wikiwand.com/en/Load_balancing_(computing)) +* [Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems](https://amzn.to/3y6yv1z) +* [Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions](https://amzn.to/3WcFVui) +* [Patterns of Enterprise Application Architecture](https://amzn.to/3WfKBPR) +* [Queue-Based Load Leveling - Microsoft](https://docs.microsoft.com/en-us/azure/architecture/patterns/queue-based-load-leveling) diff --git a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java index 1f6f84cde..7042ff7b7 100644 --- a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java +++ b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java @@ -90,7 +90,7 @@ public class App { final var srvRunnable = new ServiceExecutor(msgQueue); // Create a ThreadPool of 2 threads and - // submit all Runnable task for execution to executor.. + // submit all Runnable task for execution to executor executor = Executors.newFixedThreadPool(2); executor.submit(taskRunnable1); executor.submit(taskRunnable2); diff --git a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index a9e1e0271..02530042b 100644 --- a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -48,7 +48,7 @@ public class ServiceExecutor implements Runnable { var msg = msgQueue.retrieveMsg(); if (null != msg) { - LOGGER.info(msg.toString() + " is served."); + LOGGER.info(msg + " is served."); } else { LOGGER.info("Service Executor: Waiting for Messages to serve .. "); } @@ -59,4 +59,4 @@ public class ServiceExecutor implements Runnable { LOGGER.error(e.getMessage()); } } -} \ No newline at end of file +} diff --git a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java index 73babb9d0..9b1407277 100644 --- a/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java +++ b/queue-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java @@ -27,7 +27,7 @@ package com.iluwatar.queue.load.leveling; import lombok.extern.slf4j.Slf4j; /** - * TaskGenerator class. Each TaskGenerator thread will be a Worker which submit's messages to the + * TaskGenerator class. Each TaskGenerator thread will be a Worker which submits messages to the * queue. We need to mention the message count for each of the TaskGenerator threads. */ @Slf4j diff --git a/queue-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java b/queue-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java index f8667a528..0a03bc560 100644 --- a/queue-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java +++ b/queue-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java @@ -24,10 +24,9 @@ */ package com.iluwatar.queue.load.leveling; -import org.junit.jupiter.api.Test; - import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; /** * Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by