From 9e0fed2a88c6dfe71d9bad8c592ef41ff5b2c0e2 Mon Sep 17 00:00:00 2001 From: godotg Date: Mon, 12 Dec 2022 15:59:51 +0800 Subject: [PATCH] del[net]: remove the rarely used load balancing policy shortest-time-consumer --- .../AbstractConsumerLoadBalancer.java | 3 - .../ShortestTimeConsumerLoadBalancer.java | 92 ------------------- .../net/session/manager/ISessionManager.java | 2 +- .../net/session/manager/SessionManager.java | 2 +- .../zfoo/net/session/model/AttributeType.java | 4 +- .../com/zfoo/net/session/model/Session.java | 7 +- .../zfoo/net/core/provider/ProviderTest.java | 24 +---- .../consumer_shortest_time_config.xml | 31 ------- 8 files changed, 10 insertions(+), 155 deletions(-) delete mode 100644 net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java delete mode 100644 net/src/test/resources/provider/consumer_shortest_time_config.xml diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index 399521cf..47727be3 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -42,9 +42,6 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan case "consistent-hash": balancer = ConsistentHashConsumerLoadBalancer.getInstance(); break; - case "shortest-time": - balancer = ShortestTimeConsumerLoadBalancer.getInstance(); - break; default: throw new RuntimeException(StringUtils.format("无法识别负载均衡器[{}]", loadBalancer)); } diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java deleted file mode 100644 index c59a9cb0..00000000 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ShortestTimeConsumerLoadBalancer.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (C) 2020 The zfoo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.zfoo.net.consumer.balancer; - -import com.zfoo.net.router.attachment.SignalAttachment; -import com.zfoo.net.session.model.AttributeType; -import com.zfoo.net.session.model.Session; -import com.zfoo.protocol.IPacket; -import com.zfoo.protocol.ProtocolManager; -import com.zfoo.protocol.exception.RunException; -import com.zfoo.scheduler.util.TimeUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 最少时间调用负载均衡器,优先选择调用时间最短的session - * - * @author godotg - * @version 3.0 - */ -public class ShortestTimeConsumerLoadBalancer extends AbstractConsumerLoadBalancer { - - private static final ShortestTimeConsumerLoadBalancer INSTANCE = new ShortestTimeConsumerLoadBalancer(); - - private ShortestTimeConsumerLoadBalancer() { - } - - public static ShortestTimeConsumerLoadBalancer getInstance() { - return INSTANCE; - } - - @Override - public Session loadBalancer(IPacket packet, Object argument) { - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); - var sessions = getSessionsByModule(module); - - if (sessions.isEmpty()) { - throw new RunException("一致性hash负载均衡[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module); - } - - var sortedSessions = sessions.stream() - .sorted((a, b) -> { - var aMap = (Map) a.getAttribute(AttributeType.RESPONSE_TIME); - var bMap = (Map) b.getAttribute(AttributeType.RESPONSE_TIME); - if (aMap == null) { - return -1; - } else if (bMap == null) { - return 1; - } else { - var aTime = aMap.get(packet.protocolId()); - var bTime = bMap.get(packet.protocolId()); - if (aTime == null) { - return -1; - } else if (bTime == null) { - return 1; - } else { - return (aTime > bTime) ? 1 : -1; - } - } - }).findFirst(); - return sortedSessions.get(); - } - - @Override - public void beforeLoadBalancer(Session session, IPacket packet, SignalAttachment attachment) { - // 因为要通过最短响应时间来路由分发消息,这里使用更精确的时间 - attachment.setTimestamp(TimeUtils.currentTimeMillis()); - } - - @Override - public void afterLoadBalancer(Session session, IPacket packet, SignalAttachment attachment) { - var map = (Map) session.getAttribute(AttributeType.RESPONSE_TIME); - if (map == null) { - map = new ConcurrentHashMap<>(); - session.putAttribute(AttributeType.RESPONSE_TIME, map); - } - map.put(packet.protocolId(), TimeUtils.currentTimeMillis() - attachment.getTimestamp()); - } - -} diff --git a/net/src/main/java/com/zfoo/net/session/manager/ISessionManager.java b/net/src/main/java/com/zfoo/net/session/manager/ISessionManager.java index 762e67a5..905fe5df 100644 --- a/net/src/main/java/com/zfoo/net/session/manager/ISessionManager.java +++ b/net/src/main/java/com/zfoo/net/session/manager/ISessionManager.java @@ -18,7 +18,7 @@ import com.zfoo.net.session.model.Session; import java.util.Map; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public interface ISessionManager { diff --git a/net/src/main/java/com/zfoo/net/session/manager/SessionManager.java b/net/src/main/java/com/zfoo/net/session/manager/SessionManager.java index 7032bf73..42f3f785 100644 --- a/net/src/main/java/com/zfoo/net/session/manager/SessionManager.java +++ b/net/src/main/java/com/zfoo/net/session/manager/SessionManager.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class SessionManager implements ISessionManager { diff --git a/net/src/main/java/com/zfoo/net/session/model/AttributeType.java b/net/src/main/java/com/zfoo/net/session/model/AttributeType.java index f8967866..8d495819 100644 --- a/net/src/main/java/com/zfoo/net/session/model/AttributeType.java +++ b/net/src/main/java/com/zfoo/net/session/model/AttributeType.java @@ -14,7 +14,7 @@ package com.zfoo.net.session.model; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public enum AttributeType { @@ -25,8 +25,6 @@ public enum AttributeType { */ CONSUMER, - RESPONSE_TIME, - /** * session的uid */ diff --git a/net/src/main/java/com/zfoo/net/session/model/Session.java b/net/src/main/java/com/zfoo/net/session/model/Session.java index 9d0f268c..6356e9ed 100644 --- a/net/src/main/java/com/zfoo/net/session/model/Session.java +++ b/net/src/main/java/com/zfoo/net/session/model/Session.java @@ -13,6 +13,7 @@ package com.zfoo.net.session.model; +import com.zfoo.net.consumer.registry.RegisterVO; import com.zfoo.protocol.util.StringUtils; import io.netty.channel.Channel; @@ -23,7 +24,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** - * @author jaysunxiao + * @author godotg * @version 3.0 */ public class Session implements Closeable { @@ -40,6 +41,10 @@ public class Session implements Closeable { /** * Session附带的属性参数 */ + private RegisterVO consumerAttribute = null; + + private long uid = Long.MIN_VALUE; + private Map attributes = new EnumMap<>(AttributeType.class); diff --git a/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java b/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java index 1a5b6113..2624f04d 100644 --- a/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java +++ b/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java @@ -40,7 +40,7 @@ public class ProviderTest { * RPC教程: * 1.首先必须保证启动zookeeper * 2.启动服务提供者,startProvider0,startProvider1,startProvider2 - * 3.启动服务消费者,startSyncRandomConsumer,startAsyncRandomConsumer,startConsistentSessionConsumer,startShortestTimeConsumer + * 3.启动服务消费者,startSyncRandomConsumer,startAsyncRandomConsumer,startConsistentSessionConsumer * 4.每个消费者都是通过不同的策略消费,注意区别 */ @Test @@ -127,26 +127,4 @@ public class ProviderTest { ThreadUtils.sleep(Long.MAX_VALUE); } - /** - * 最短时间的消费方式 - */ - @Test - public void startShortestTimeConsumer() { - var context = new ClassPathXmlApplicationContext("provider/consumer_shortest_time_config.xml"); - SessionUtils.printSessionInfo(); - - var ask = new ProviderMessAsk(); - ask.setMessage("Hello, this is the consumer!"); - var atomicInteger = new AtomicInteger(0); - - for (int i = 0; i < 1000; i++) { - ThreadUtils.sleep(3000); - NetContext.getConsumer().asyncAsk(ask, ProviderMessAnswer.class, null).whenComplete(answer -> { - logger.info("消费者请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(answer)); - }); - } - - ThreadUtils.sleep(Long.MAX_VALUE); - } - } diff --git a/net/src/test/resources/provider/consumer_shortest_time_config.xml b/net/src/test/resources/provider/consumer_shortest_time_config.xml deleted file mode 100644 index 6894c171..00000000 --- a/net/src/test/resources/provider/consumer_shortest_time_config.xml +++ /dev/null @@ -1,31 +0,0 @@ - - - - - - - - - - - - - - - - - -