添加服务器分组功能

This commit is contained in:
DESKTOP-RBQROQC\SHICHANG11
2022-06-29 12:07:14 +08:00
parent 2210763f09
commit 3a42fdcf0b
7 changed files with 99 additions and 6 deletions
@@ -70,6 +70,7 @@ public class ConfigManager implements IConfigManager {
for (var providerModule : providerConfig.getModules()) {
var module = ProtocolManager.moduleByModuleName(providerModule.getName());
AssertionUtils.isTrue(module != null, "服务提供者[name:{}]在协议文件中不存在", providerModule.getName());
module.setGroup(providerModule.getGroup());
providerModules.add(module);
}
providerConfig.setModules(providerModules);
@@ -82,6 +83,7 @@ public class ConfigManager implements IConfigManager {
for (var providerModule : consumerConfig.getModules()) {
var module = ProtocolManager.moduleByModuleName(providerModule.getName());
AssertionUtils.isTrue(module != null, "消费者[name:{}]在协议文件中不存在", providerModule.getName());
module.setGroup(providerModule.getGroup());
consumerModules.add(module);
}
consumerConfig.setModules(consumerModules);
@@ -44,6 +44,9 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
case "shortest-time":
balancer = ShortestTimeConsumerLoadBalancer.getInstance();
break;
case "fixed":
balancer = FixedConsumerLoadBalancer.getInstance();
break;
default:
throw new RuntimeException(StringUtils.format("无法识别负载均衡器[{}]", loadBalancer));
}
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.consumer.balancer;
import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.exception.RunException;
/**
* 根据grouId获取固定服务器
*
* @author jaysunxiao
* @version 3.0
*/
public class FixedConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
private static final FixedConsumerLoadBalancer INSTANCE = new FixedConsumerLoadBalancer();
private FixedConsumerLoadBalancer() {
}
public static FixedConsumerLoadBalancer getInstance() {
return INSTANCE;
}
@Override
public Session loadBalancer(IPacket packet, Object argument) {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var sessions = getSessionsByModule(module);
if (sessions.isEmpty()) {
throw new RunException("获取固定服务器失败[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module);
}
int group = Integer.valueOf(argument.toString());
for (var session : sessions) {
var registerVO = (RegisterVO)session.getAttribute(AttributeType.CONSUMER);
var isPresent = registerVO.getProviderConfig().getModules().stream().filter(it -> it.getName().equals(module.getName()) && it.getGroup() == group).findAny().isPresent();
if (!isPresent) {
continue;
}
return session;
}
throw new RunException("一获取固定服务器失败[protocolId:{}]参数[argument:{}],没有服务提供者提供服务[module:{}]", packet.protocolId(), argument, module);
}
}
@@ -99,7 +99,7 @@ public class RegisterVO {
var modules = Arrays.stream(moduleSplits)
.map(it -> it.trim())
.map(it -> it.split(StringUtils.HYPHEN))
.map(it -> new ProtocolModule(Byte.parseByte(it[0]), it[1], it[2]))
.map(it -> new ProtocolModule(Byte.parseByte(it[0]), it[1], it[2], Integer.parseInt(it[3])))
.collect(Collectors.toList());
return modules;
}
@@ -129,7 +129,7 @@ public class RegisterVO {
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var providerModules = providerConfig.getModules().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getId(), it.getName(), ProtocolModule.versionNumToStr(it.getVersion())))
.map(it -> joinWith(StringUtils.HYPHEN, it))
.collect(Collectors.toList());
builder.append(StringUtils.format("provider:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray())));
@@ -139,7 +139,7 @@ public class RegisterVO {
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var consumerModules = consumerConfig.getModules().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getId(), it.getName(), ProtocolModule.versionNumToStr(it.getVersion())))
.map(it -> joinWith(StringUtils.HYPHEN, it))
.collect(Collectors.toList());
builder.append(StringUtils.format("consumer:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray())));
@@ -148,6 +148,9 @@ public class RegisterVO {
return builder.toString();
}
public String joinWith(String sep, ProtocolModule module) {
return StringUtils.joinWith(sep, module.getId(), module.getName(), ProtocolModule.versionNumToStr(module.getVersion()), module.getGroup());
}
public String getId() {
return id;
@@ -185,6 +185,7 @@ public class NetDefinitionParser implements BeanDefinitionParser {
var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("name")));
builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("group")));
modules.add(new BeanDefinitionHolder(builder.getBeanDefinition(), StringUtils.format("{}.{}{}", clazz.getCanonicalName(), param, i)));
}
+1
View File
@@ -46,6 +46,7 @@
<xsd:complexType name="moduleAttributeType">
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="group" type="xsd:string" default="0"/>
</xsd:complexType>
@@ -32,6 +32,8 @@ public class ProtocolModule {
*/
private int version;
private int group;
private transient int hash;
@@ -46,8 +48,17 @@ public class ProtocolModule {
this.perfectHash();
}
public ProtocolModule(String name) {
public ProtocolModule(byte id, String name, String version, int group) {
this.id = id;
this.name = name;
this.version = versionStrToNum(version);
this.group = group;
this.perfectHash();
}
public ProtocolModule(String name, String group) {
this.name = name;
this.group = Integer.parseInt(group);
}
public static void assertVersion(String version) {
@@ -103,6 +114,14 @@ public class ProtocolModule {
this.version = version;
}
public int getGroup() {
return group;
}
public void setGroup(int group) {
this.group = group;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -112,7 +131,10 @@ public class ProtocolModule {
return false;
}
ProtocolModule module = (ProtocolModule) o;
return id == module.id && version == module.version;
if (group == 0 || module.group == 0) {
return id == module.id && version == module.version;
}
return id == module.id && version == module.version && group == module.group;
}
@Override
@@ -122,6 +144,6 @@ public class ProtocolModule {
@Override
public String toString() {
return StringUtils.format("[id:{}][name:{}][version:{}][hash:{}]", id, name, version, hash);
return StringUtils.format("[id:{}][name:{}][version:{}][group:{}][hash:{}]", id, name, version, group, hash);
}
}