feat: Add Microservice Pattern, Log aggregation (#2690) (#2719)

* feat: Add Microservice Pattern, Log aggregation.

Related: #2690

* docs: Add javaDoc for public methods.

Related: #2690

---------

Co-authored-by: Ilkka Seppälä <iluwatar@users.noreply.github.com>
This commit is contained in:
YongHwan Kwon
2023-12-27 15:06:55 +09:00
committed by GitHub
parent 5df1fb6f13
commit cd2dbb72d7
12 changed files with 593 additions and 0 deletions
@@ -0,0 +1,53 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
/**
* The main application class responsible for demonstrating the log aggregation mechanism. Creates
* services, generates logs, aggregates, and finally displays the logs.
*/
public class App {
/**
* The entry point of the application.
*
* @param args Command line arguments.
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public static void main(String[] args) throws InterruptedException {
final CentralLogStore centralLogStore = new CentralLogStore();
final LogAggregator aggregator = new LogAggregator(centralLogStore, LogLevel.INFO);
final LogProducer serviceA = new LogProducer("ServiceA", aggregator);
final LogProducer serviceB = new LogProducer("ServiceB", aggregator);
serviceA.generateLog(LogLevel.INFO, "This is an INFO log from ServiceA");
serviceB.generateLog(LogLevel.ERROR, "This is an ERROR log from ServiceB");
serviceA.generateLog(LogLevel.DEBUG, "This is a DEBUG log from ServiceA");
aggregator.stop();
centralLogStore.displayLogs();
}
}
@@ -0,0 +1,63 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;
/**
* A centralized store for logs. It collects logs from various services and stores them.
* This class is thread-safe, ensuring that logs from different services are safely stored
* concurrently without data races.
*/
@Slf4j
public class CentralLogStore {
private final ConcurrentLinkedQueue<LogEntry> logs = new ConcurrentLinkedQueue<>();
/**
* Stores the given log entry into the central log store.
*
* @param logEntry The log entry to store.
*/
public void storeLog(LogEntry logEntry) {
if (logEntry == null) {
LOGGER.error("Received null log entry. Skipping.");
return;
}
logs.offer(logEntry);
}
/**
* Displays all logs currently stored in the central log store.
*/
public void displayLogs() {
LOGGER.info("----- Centralized Logs -----");
for (LogEntry logEntry : logs) {
LOGGER.info(
logEntry.getTimestamp() + " [" + logEntry.getLevel() + "] " + logEntry.getMessage());
}
}
}
@@ -0,0 +1,120 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/**
* Responsible for collecting and buffering logs from different services.
* Once the logs reach a certain threshold or after a certain time interval,
* they are flushed to the central log store. This class ensures logs are collected
* and processed asynchronously and efficiently, providing both an immediate collection
* and periodic flushing.
*/
@Slf4j
public class LogAggregator {
private static final int BUFFER_THRESHOLD = 3;
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicInteger logCount = new AtomicInteger(0);
/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
}
/**
* Collects a given log entry, and filters it by the defined log level.
*
* @param logEntry The log entry to collect.
*/
public void collectLog(LogEntry logEntry) {
if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
}
if (logEntry.getLevel().compareTo(minLogLevel) < 0) {
LOGGER.debug("Log level below threshold. Skipping.");
return;
}
buffer.offer(logEntry);
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
}
}
/**
* Stops the log aggregator service and flushes any remaining logs to
* the central log store.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
flushBuffer();
}
private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
}
}
private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
@@ -0,0 +1,42 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Represents a single log entry, capturing essential details like the service name,
* log level, message, and the timestamp when the log was generated.
*/
@Data
@AllArgsConstructor
public class LogEntry {
private String serviceName;
private LogLevel level;
private String message;
private LocalDateTime timestamp;
}
@@ -0,0 +1,38 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
/**
* Enum representing different log levels.
* Defines the severity of a log message, helping in filtering and prioritization.
* <ul>
* <li>DEBUG: Detailed information, typically of interest only when diagnosing problems.</li>
* <li>INFO: Confirmation that things are working as expected.</li>
* <li>ERROR: Indicates a problem that needs attention.</li>
* </ul>
*/
public enum LogLevel {
DEBUG, INFO, ERROR
}
@@ -0,0 +1,54 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Represents a service that produces logs.
* The logs are generated based on certain activities or events within the service.
* Once a log is generated, it's passed on to the aggregator for further processing.
*/
@AllArgsConstructor
@Slf4j
public class LogProducer {
private String serviceName;
private LogAggregator aggregator;
/**
* Generates a log entry with the given log level and message.
*
* @param level The level of the log.
* @param message The message of the log.
*/
public void generateLog(LogLevel level, String message) {
final LogEntry logEntry = new LogEntry(serviceName, level, message, LocalDateTime.now());
LOGGER.info("Producing log: " + logEntry.getMessage());
aggregator.collectLog(logEntry);
}
}
@@ -0,0 +1,80 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2023 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.time.LocalDateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class LogAggregatorTest {
@Mock
private CentralLogStore centralLogStore;
private LogAggregator logAggregator;
@BeforeEach
void setUp() {
logAggregator = new LogAggregator(centralLogStore, LogLevel.INFO);
}
@Test
void whenThreeInfoLogsAreCollected_thenCentralLogStoreShouldStoreAllOfThem() {
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 2"));
verifyNoInteractionsWithCentralLogStore();
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Sample log message 3"));
verifyCentralLogStoreInvokedTimes(3);
}
@Test
void whenDebugLogIsCollected_thenNoLogsShouldBeStored() {
logAggregator.collectLog(createLogEntry(LogLevel.DEBUG, "Sample debug log message"));
verifyNoInteractionsWithCentralLogStore();
}
private static LogEntry createLogEntry(LogLevel logLevel, String message) {
return new LogEntry("ServiceA", logLevel, message, LocalDateTime.now());
}
private void verifyNoInteractionsWithCentralLogStore() {
verify(centralLogStore, times(0)).storeLog(any());
}
private void verifyCentralLogStoreInvokedTimes(int times) {
verify(centralLogStore, times(times)).storeLog(any());
}
}