del[net]: remove the rarely used load balancing policy shortest-time-consumer

This commit is contained in:
godotg
2022-12-12 15:59:51 +08:00
parent 04ff16027f
commit 9e0fed2a88
8 changed files with 10 additions and 155 deletions
@@ -42,9 +42,6 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
case "consistent-hash":
balancer = ConsistentHashConsumerLoadBalancer.getInstance();
break;
case "shortest-time":
balancer = ShortestTimeConsumerLoadBalancer.getInstance();
break;
default:
throw new RuntimeException(StringUtils.format("无法识别负载均衡器[{}]", loadBalancer));
}
@@ -1,92 +0,0 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.consumer.balancer;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.scheduler.util.TimeUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 最少时间调用负载均衡器,优先选择调用时间最短的session
*
* @author godotg
* @version 3.0
*/
public class ShortestTimeConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
private static final ShortestTimeConsumerLoadBalancer INSTANCE = new ShortestTimeConsumerLoadBalancer();
private ShortestTimeConsumerLoadBalancer() {
}
public static ShortestTimeConsumerLoadBalancer getInstance() {
return INSTANCE;
}
@Override
public Session loadBalancer(IPacket packet, Object argument) {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var sessions = getSessionsByModule(module);
if (sessions.isEmpty()) {
throw new RunException("一致性hash负载均衡[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module);
}
var sortedSessions = sessions.stream()
.sorted((a, b) -> {
var aMap = (Map<Short, Long>) a.getAttribute(AttributeType.RESPONSE_TIME);
var bMap = (Map<Short, Long>) b.getAttribute(AttributeType.RESPONSE_TIME);
if (aMap == null) {
return -1;
} else if (bMap == null) {
return 1;
} else {
var aTime = aMap.get(packet.protocolId());
var bTime = bMap.get(packet.protocolId());
if (aTime == null) {
return -1;
} else if (bTime == null) {
return 1;
} else {
return (aTime > bTime) ? 1 : -1;
}
}
}).findFirst();
return sortedSessions.get();
}
@Override
public void beforeLoadBalancer(Session session, IPacket packet, SignalAttachment attachment) {
// 因为要通过最短响应时间来路由分发消息,这里使用更精确的时间
attachment.setTimestamp(TimeUtils.currentTimeMillis());
}
@Override
public void afterLoadBalancer(Session session, IPacket packet, SignalAttachment attachment) {
var map = (Map<Short, Long>) session.getAttribute(AttributeType.RESPONSE_TIME);
if (map == null) {
map = new ConcurrentHashMap<>();
session.putAttribute(AttributeType.RESPONSE_TIME, map);
}
map.put(packet.protocolId(), TimeUtils.currentTimeMillis() - attachment.getTimestamp());
}
}
@@ -18,7 +18,7 @@ import com.zfoo.net.session.model.Session;
import java.util.Map;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public interface ISessionManager {
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class SessionManager implements ISessionManager {
@@ -14,7 +14,7 @@
package com.zfoo.net.session.model;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public enum AttributeType {
@@ -25,8 +25,6 @@ public enum AttributeType {
*/
CONSUMER,
RESPONSE_TIME,
/**
* session的uid
*/
@@ -13,6 +13,7 @@
package com.zfoo.net.session.model;
import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.protocol.util.StringUtils;
import io.netty.channel.Channel;
@@ -23,7 +24,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author jaysunxiao
* @author godotg
* @version 3.0
*/
public class Session implements Closeable {
@@ -40,6 +41,10 @@ public class Session implements Closeable {
/**
* Session附带的属性参数
*/
private RegisterVO consumerAttribute = null;
private long uid = Long.MIN_VALUE;
private Map<AttributeType, Object> attributes = new EnumMap<>(AttributeType.class);
@@ -40,7 +40,7 @@ public class ProviderTest {
* RPC教程:
* 1.首先必须保证启动zookeeper
* 2.启动服务提供者,startProvider0startProvider1startProvider2
* 3.启动服务消费者,startSyncRandomConsumerstartAsyncRandomConsumerstartConsistentSessionConsumerstartShortestTimeConsumer
* 3.启动服务消费者,startSyncRandomConsumerstartAsyncRandomConsumerstartConsistentSessionConsumer
* 4.每个消费者都是通过不同的策略消费,注意区别
*/
@Test
@@ -127,26 +127,4 @@ public class ProviderTest {
ThreadUtils.sleep(Long.MAX_VALUE);
}
/**
* 最短时间的消费方式
*/
@Test
public void startShortestTimeConsumer() {
var context = new ClassPathXmlApplicationContext("provider/consumer_shortest_time_config.xml");
SessionUtils.printSessionInfo();
var ask = new ProviderMessAsk();
ask.setMessage("Hello, this is the consumer!");
var atomicInteger = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
ThreadUtils.sleep(3000);
NetContext.getConsumer().asyncAsk(ask, ProviderMessAnswer.class, null).whenComplete(answer -> {
logger.info("消费者请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(answer));
});
}
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -1,31 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:net="http://www.zfoo.com/schema/net"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.zfoo.com/schema/net
http://www.zfoo.com/schema/net-1.0.xsd">
<context:property-placeholder location="classpath:deploy-dev.properties"/>
<context:component-scan base-package="com.zfoo"/>
<net:config id="applicationNameTest" protocol-location="protocol.xml">
<net:registry center="${registry.center}" user="${registry.user}" password="${registry.password}">
<net:address name="${registry.address.name}" url="${registry.address.url}"/>
</net:registry>
<net:consumers>
<net:consumer protocol-module="providerTest" load-balancer="shortest-time" consumer="myProvider1"/>
</net:consumers>
</net:config>
</beans>