mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-19 13:27:10 +00:00
fix[zk]: zooKeeper reconnects without a registered consumer
This commit is contained in:
@@ -200,9 +200,6 @@ public class ZookeeperRegistry implements IRegistry {
|
||||
case RECONNECTED:
|
||||
// 检查3个持久化节点,不存在就创建
|
||||
createZookeeperRootPath();
|
||||
// 如果自己是服务提供者,则注册自己
|
||||
// 如果自己是消费者,则创建连接到所有的自己关心的服务提供者
|
||||
initZookeeper();
|
||||
break;
|
||||
|
||||
default:
|
||||
@@ -223,73 +220,80 @@ public class ZookeeperRegistry implements IRegistry {
|
||||
* 检查 /zfoo /zfoo/provider /zfoo/consumer 这3个“持久化”节点,不存在就创建
|
||||
*/
|
||||
private void createZookeeperRootPath() {
|
||||
try {
|
||||
// /zfoo
|
||||
// 创建zookeeper的根路径
|
||||
var rootStat = curator.checkExists().forPath(ROOT_PATH);
|
||||
// 根节点不存在
|
||||
if (Objects.isNull(rootStat)) {
|
||||
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
|
||||
var builder = curator.create();
|
||||
builder.creatingParentsIfNeeded();
|
||||
// 检查zk连接授权
|
||||
if (registryConfig.hasZookeeperAuthor()) {
|
||||
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
|
||||
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
|
||||
builder.withACL(aclList);
|
||||
}
|
||||
// 根节点是持久化节点
|
||||
builder.withMode(CreateMode.PERSISTENT);
|
||||
// 真正创建根节点
|
||||
builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter()));
|
||||
} else {
|
||||
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
|
||||
// 读取根节点上的数据
|
||||
var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH);
|
||||
// 把根节点数据从二进制转string字符串
|
||||
var rootPathData = StringUtils.bytesToString(bytes);
|
||||
|
||||
// 检查zookeeper根节点的内容
|
||||
if (!rootPathData.equals(registryConfig.getCenter())) {
|
||||
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter()));
|
||||
}
|
||||
|
||||
// 检查zookeeper根节点的权限
|
||||
if (registryConfig.hasZookeeperAuthor()) {
|
||||
try {
|
||||
var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH);
|
||||
AssertionUtils.notEmpty(providerRootPathAclList);
|
||||
AssertionUtils.isTrue(providerRootPathAclList.size() == 1);
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
// /zfoo
|
||||
// 创建zookeeper的根路径
|
||||
var rootStat = curator.checkExists().forPath(ROOT_PATH);
|
||||
// 根节点不存在
|
||||
if (Objects.isNull(rootStat)) {
|
||||
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
|
||||
var builder = curator.create();
|
||||
builder.creatingParentsIfNeeded();
|
||||
// 检查zk连接授权
|
||||
if (registryConfig.hasZookeeperAuthor()) {
|
||||
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
|
||||
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
|
||||
AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0)));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e)));
|
||||
builder.withACL(aclList);
|
||||
}
|
||||
// 根节点是持久化节点
|
||||
builder.withMode(CreateMode.PERSISTENT);
|
||||
// 真正创建根节点
|
||||
builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter()));
|
||||
} else {
|
||||
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
|
||||
// 读取根节点上的数据
|
||||
var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH);
|
||||
// 把根节点数据从二进制转string字符串
|
||||
var rootPathData = StringUtils.bytesToString(bytes);
|
||||
|
||||
// 检查zookeeper根节点的内容
|
||||
if (!rootPathData.equals(registryConfig.getCenter())) {
|
||||
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter()));
|
||||
}
|
||||
|
||||
// 检查zookeeper根节点的权限
|
||||
if (registryConfig.hasZookeeperAuthor()) {
|
||||
try {
|
||||
var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH);
|
||||
AssertionUtils.notEmpty(providerRootPathAclList);
|
||||
AssertionUtils.isTrue(providerRootPathAclList.size() == 1);
|
||||
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
|
||||
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
|
||||
AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0)));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e)));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
// /zfoo/provider
|
||||
// 检查服务提供者节点,不存在则创建
|
||||
var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH);
|
||||
if (Objects.isNull(providerStat)) {
|
||||
curator.create()
|
||||
.withMode(CreateMode.PERSISTENT)
|
||||
.forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
|
||||
// /zfoo/provider
|
||||
// 检查服务提供者节点,不存在则创建
|
||||
var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH);
|
||||
if (Objects.isNull(providerStat)) {
|
||||
curator.create()
|
||||
.withMode(CreateMode.PERSISTENT)
|
||||
.forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
// /zfoo/consumer
|
||||
// 检查消费者节点,不存在则创建
|
||||
var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH);
|
||||
if (Objects.isNull(consumerStat)) {
|
||||
curator.create()
|
||||
.withMode(CreateMode.PERSISTENT)
|
||||
.forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
|
||||
}
|
||||
|
||||
// /zfoo/consumer
|
||||
// 检查消费者节点,不存在则创建
|
||||
var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH);
|
||||
if (Objects.isNull(consumerStat)) {
|
||||
curator.create()
|
||||
.withMode(CreateMode.PERSISTENT)
|
||||
.forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
|
||||
// 如果自己是服务提供者,则注册自己
|
||||
// 如果自己是消费者,则创建连接到所有的自己关心的服务提供者
|
||||
initZookeeper();
|
||||
} catch (Throwable t) {
|
||||
logger.error("Zookeeper failed to create zookeeper root path, wait [{}] seconds to recreate", RETRY_SECONDS, t);
|
||||
SchedulerBus.schedule(() -> createZookeeperRootPath(), RETRY_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void startProviderCache() {
|
||||
@@ -352,9 +356,8 @@ public class ZookeeperRegistry implements IRegistry {
|
||||
|
||||
// 自己是消费者,则连接所有自己关心的服务提供者
|
||||
initConsumerCache();
|
||||
} catch (Exception e) {
|
||||
//
|
||||
logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, e);
|
||||
} catch (Throwable t) {
|
||||
logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, t);
|
||||
SchedulerBus.schedule(() -> initZookeeper(), RETRY_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
});
|
||||
@@ -490,23 +493,21 @@ public class ZookeeperRegistry implements IRegistry {
|
||||
session.setConsumerRegister(providerCache);
|
||||
logger.info("Consumer starts consuming the provider:[{}]", providerCache);
|
||||
EventBus.post(ConsumerStartEvent.valueOf(providerCache, session));
|
||||
// 将自己的消费者消息写到 /consumer 的临时节点下
|
||||
updateConsumerData();
|
||||
} catch (Throwable t) {
|
||||
logger.error("[consumer:{}] failed to start, wait [{}] seconds to recheck consumer", providerCache, RETRY_SECONDS, t);
|
||||
recheckFlag = true;
|
||||
}
|
||||
}
|
||||
|
||||
// 将自己的消费者消息写到 /consumer 的临时节点下
|
||||
updateConsumerData();
|
||||
|
||||
if (recheckFlag) {
|
||||
SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateConsumerData() {
|
||||
// 将自己的消费者消息写到 /consumer 的临时节点下
|
||||
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister();
|
||||
var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString();
|
||||
var list = new ArrayList<String>();
|
||||
NetContext.getSessionManager().forEachClientSession(session -> {
|
||||
var consumerAttribute = session.getConsumerRegister();
|
||||
@@ -519,6 +520,14 @@ public class ZookeeperRegistry implements IRegistry {
|
||||
}
|
||||
list.add(consumerAttribute.toProviderSimple());
|
||||
});
|
||||
|
||||
if (CollectionUtils.isEmpty(list)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 将自己的消费者消息写到 /consumer 的临时节点下
|
||||
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister();
|
||||
var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString();
|
||||
addData(path, StringUtils.bytes(JsonUtils.object2String(list)), CreateMode.EPHEMERAL);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user