diff --git a/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java b/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java index c03ec864..24139ff2 100644 --- a/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java +++ b/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java @@ -70,6 +70,7 @@ public class ConfigManager implements IConfigManager { for (var providerModule : providerConfig.getModules()) { var module = ProtocolManager.moduleByModuleName(providerModule.getName()); AssertionUtils.isTrue(module != null, "服务提供者[name:{}]在协议文件中不存在", providerModule.getName()); + module.setGroup(providerModule.getGroup()); providerModules.add(module); } providerConfig.setModules(providerModules); @@ -82,6 +83,7 @@ public class ConfigManager implements IConfigManager { for (var providerModule : consumerConfig.getModules()) { var module = ProtocolManager.moduleByModuleName(providerModule.getName()); AssertionUtils.isTrue(module != null, "消费者[name:{}]在协议文件中不存在", providerModule.getName()); + module.setGroup(providerModule.getGroup()); consumerModules.add(module); } consumerConfig.setModules(consumerModules); diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index d467fc66..aa1096f7 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -44,6 +44,9 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan case "shortest-time": balancer = ShortestTimeConsumerLoadBalancer.getInstance(); break; + case "fixed": + balancer = FixedConsumerLoadBalancer.getInstance(); + break; default: throw new RuntimeException(StringUtils.format("无法识别负载均衡器[{}]", loadBalancer)); } diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/FixedConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/FixedConsumerLoadBalancer.java new file mode 100644 index 00000000..32992970 --- /dev/null +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/FixedConsumerLoadBalancer.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.zfoo.net.consumer.balancer; + +import com.zfoo.net.consumer.registry.RegisterVO; +import com.zfoo.net.session.model.AttributeType; +import com.zfoo.net.session.model.Session; +import com.zfoo.protocol.IPacket; +import com.zfoo.protocol.ProtocolManager; +import com.zfoo.protocol.exception.RunException; + +/** + * 根据grouId获取固定服务器 + * + * @author jaysunxiao + * @version 3.0 + */ +public class FixedConsumerLoadBalancer extends AbstractConsumerLoadBalancer { + + private static final FixedConsumerLoadBalancer INSTANCE = new FixedConsumerLoadBalancer(); + + private FixedConsumerLoadBalancer() { + } + + public static FixedConsumerLoadBalancer getInstance() { + return INSTANCE; + } + + @Override + public Session loadBalancer(IPacket packet, Object argument) { + var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var sessions = getSessionsByModule(module); + + if (sessions.isEmpty()) { + throw new RunException("获取固定服务器失败[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module); + } + + int group = Integer.valueOf(argument.toString()); + for (var session : sessions) { + var registerVO = (RegisterVO)session.getAttribute(AttributeType.CONSUMER); + var isPresent = registerVO.getProviderConfig().getModules().stream().filter(it -> it.getName().equals(module.getName()) && it.getGroup() == group).findAny().isPresent(); + if (!isPresent) { + continue; + } + return session; + } + throw new RunException("一获取固定服务器失败[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module); + } + +} diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java index 6f58c999..3edb8d44 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java @@ -99,7 +99,7 @@ public class RegisterVO { var modules = Arrays.stream(moduleSplits) .map(it -> it.trim()) .map(it -> it.split(StringUtils.HYPHEN)) - .map(it -> new ProtocolModule(Byte.parseByte(it[0]), it[1], it[2])) + .map(it -> new ProtocolModule(Byte.parseByte(it[0]), it[1], it[2], Integer.parseInt(it[3]))) .collect(Collectors.toList()); return modules; } @@ -129,7 +129,7 @@ public class RegisterVO { builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var providerModules = providerConfig.getModules().stream() - .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getId(), it.getName(), ProtocolModule.versionNumToStr(it.getVersion()))) + .map(it -> joinWith(StringUtils.HYPHEN, it)) .collect(Collectors.toList()); builder.append(StringUtils.format("provider:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray()))); @@ -139,7 +139,7 @@ public class RegisterVO { builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var consumerModules = consumerConfig.getModules().stream() - .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getId(), it.getName(), ProtocolModule.versionNumToStr(it.getVersion()))) + .map(it -> joinWith(StringUtils.HYPHEN, it)) .collect(Collectors.toList()); builder.append(StringUtils.format("consumer:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray()))); @@ -148,6 +148,9 @@ public class RegisterVO { return builder.toString(); } + public String joinWith(String sep, ProtocolModule module) { + return StringUtils.joinWith(sep, module.getId(), module.getName(), ProtocolModule.versionNumToStr(module.getVersion()), module.getGroup()); + } public String getId() { return id; diff --git a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java index 1d541a60..2ae74a03 100644 --- a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java +++ b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java @@ -185,6 +185,7 @@ public class NetDefinitionParser implements BeanDefinitionParser { var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz); builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("name"))); + builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("group"))); modules.add(new BeanDefinitionHolder(builder.getBeanDefinition(), StringUtils.format("{}.{}{}", clazz.getCanonicalName(), param, i))); } diff --git a/net/src/main/resources/net-1.0.xsd b/net/src/main/resources/net-1.0.xsd index be4352e1..6bded9d0 100644 --- a/net/src/main/resources/net-1.0.xsd +++ b/net/src/main/resources/net-1.0.xsd @@ -46,6 +46,7 @@ + diff --git a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java index d67e4da9..311e4693 100644 --- a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java +++ b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java @@ -32,6 +32,8 @@ public class ProtocolModule { */ private int version; + private int group; + private transient int hash; @@ -46,8 +48,17 @@ public class ProtocolModule { this.perfectHash(); } - public ProtocolModule(String name) { + public ProtocolModule(byte id, String name, String version, int group) { + this.id = id; this.name = name; + this.version = versionStrToNum(version); + this.group = group; + this.perfectHash(); + } + + public ProtocolModule(String name, String group) { + this.name = name; + this.group = Integer.parseInt(group); } public static void assertVersion(String version) { @@ -103,6 +114,14 @@ public class ProtocolModule { this.version = version; } + public int getGroup() { + return group; + } + + public void setGroup(int group) { + this.group = group; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -112,7 +131,10 @@ public class ProtocolModule { return false; } ProtocolModule module = (ProtocolModule) o; - return id == module.id && version == module.version; + if (group == 0 || module.group == 0) { + return id == module.id && version == module.version; + } + return id == module.id && version == module.version && group == module.group; } @Override @@ -122,6 +144,6 @@ public class ProtocolModule { @Override public String toString() { - return StringUtils.format("[id:{}][name:{}][version:{}][hash:{}]", id, name, version, hash); + return StringUtils.format("[id:{}][name:{}][version:{}][group:{}][hash:{}]", id, name, version, group, hash); } }