feat: Adding backpressure design pattern #3233 (#3249)

* Initial commit backpressure

temp

* Adding backpressure pattern #3233

* Fix test case #3233

* Fix formatting #3233

* Changes after review #3233

* Fix sonar error in leaderfollowers module #3233
This commit is contained in:
Sanura Hettiarachchi
2025-04-12 07:46:49 +02:00
committed by GitHub
parent cd224ea5fe
commit 0b83b6dfd1
11 changed files with 663 additions and 0 deletions
@@ -0,0 +1,76 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import java.util.concurrent.CountDownLatch;
/**
* The Backpressure pattern is a flow control mechanism. It allows a consumer to signal to a
* producer to slow down or stop sending data when it's overwhelmed.
* <li>Prevents memory overflow, CPU thrashing, and resource exhaustion.
* <li>Ensures fair usage of resources in distributed systems.
* <li>Avoids buffer bloat and latency spikes. Key concepts of this design paradigm involves
* <li>Publisher/Producer: Generates data.
* <li>Subscriber/Consumer: Receives and processes data.
*
* <p>In this example we will create a {@link Publisher} and a {@link Subscriber}. Publisher
* will emit a stream of integer values with a predefined delay. Subscriber takes 500 ms to
* process one integer. Since the subscriber can't process the items fast enough we apply
* backpressure to the publisher so that it will request 10 items first, process 5 items and
* request for the next 5 again. After processing 5 items subscriber will keep requesting for
* another 5 until the stream ends.
*/
public class App {
protected static CountDownLatch latch;
/**
* Program entry point.
*
* @param args command line args
*/
public static void main(String[] args) throws InterruptedException {
/*
* This custom subscriber applies backpressure:
* - Has a processing delay of 0.5 milliseconds
* - Requests 10 items initially
* - Process 5 items and request for the next 5 items
*/
Subscriber sub = new Subscriber();
// slow publisher emit 15 numbers with a delay of 200 milliseconds
Publisher.publish(1, 17, 200).subscribe(sub);
latch = new CountDownLatch(1);
latch.await();
sub = new Subscriber();
// fast publisher emit 15 numbers with a delay of 1 millisecond
Publisher.publish(1, 17, 1).subscribe(sub);
latch = new CountDownLatch(1);
latch.await();
}
}
@@ -0,0 +1,44 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import java.time.Duration;
import reactor.core.publisher.Flux;
/** This class is the publisher that generates the data stream. */
public class Publisher {
/**
* On message method will trigger when the subscribed event is published.
*
* @param start starting integer
* @param count how many integers to emit
* @param delay delay between each item in milliseconds
* @return a flux stream of integers
*/
public static Flux<Integer> publish(int start, int count, int delay) {
return Flux.range(start, count).delayElements(Duration.ofMillis(delay)).log();
}
}
@@ -0,0 +1,64 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
/** This class is the custom subscriber that subscribes to the data stream. */
@Slf4j
public class Subscriber extends BaseSubscriber<Integer> {
@Override
protected void hookOnSubscribe(@NonNull Subscription subscription) {
request(10); // request 10 items initially
}
@Override
protected void hookOnNext(@NonNull Integer value) {
processItem();
LOGGER.info("process({})", value);
if (value % 5 == 0) {
// request for the next 5 items after processing first 5
request(5);
}
}
@Override
protected void hookOnComplete() {
App.latch.countDown();
}
private void processItem() {
try {
Thread.sleep(500); // simulate slow processing
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}
@@ -0,0 +1,37 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import org.junit.jupiter.api.Test;
public class AppTest {
@Test
void shouldExecuteApplicationWithoutException() {
assertDoesNotThrow(() -> App.main(new String[] {}));
}
}
@@ -0,0 +1,60 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.LoggerFactory;
public class LoggerExtension implements BeforeEachCallback, AfterEachCallback {
private final ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
private final Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
@Override
public void afterEach(ExtensionContext extensionContext) {
listAppender.stop();
listAppender.list.clear();
logger.detachAppender(listAppender);
}
@Override
public void beforeEach(ExtensionContext extensionContext) {
logger.addAppender(listAppender);
listAppender.start();
}
public List<String> getFormattedMessages() {
return listAppender.list.stream()
.map(ILoggingEvent::getFormattedMessage)
.collect(Collectors.toList());
}
}
@@ -0,0 +1,51 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import static com.iluwatar.backpressure.Publisher.publish;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class PublisherTest {
@Test
public void testPublish() {
Flux<Integer> flux = publish(1, 3, 200);
StepVerifier.withVirtualTime(() -> flux)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(200))
.expectNext(1)
.expectNoEvent(Duration.ofSeconds(200))
.expectNext(2)
.expectNoEvent(Duration.ofSeconds(200))
.expectNext(3)
.verifyComplete();
}
}
@@ -0,0 +1,54 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.backpressure;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class SubscriberTest {
@RegisterExtension public LoggerExtension loggerExtension = new LoggerExtension();
@Test
public void testSubscribe() throws InterruptedException {
Subscriber sub = new Subscriber();
Publisher.publish(1, 8, 100).subscribe(sub);
App.latch = new CountDownLatch(1);
App.latch.await();
String result = String.join(",", loggerExtension.getFormattedMessages());
assertTrue(
result.endsWith(
"onSubscribe(FluxConcatMapNoPrefetch."
+ "FluxConcatMapNoPrefetchSubscriber),request(10),onNext(1),process(1),onNext(2),"
+ "process(2),onNext(3),process(3),onNext(4),process(4),onNext(5),process(5),request(5),"
+ "onNext(6),process(6),onNext(7),process(7),onNext(8),process(8),onComplete()"));
}
}