From 28d2ac6078d2fc723faed7a69b2db39ac4f609f0 Mon Sep 17 00:00:00 2001 From: godotg Date: Mon, 1 Jul 2024 12:24:25 +0800 Subject: [PATCH] ref[version]: The gvs() and svs() methods have been removed from the orm and replaced with @Version annotations --- .../main/java/com/zfoo/orm/anno/Version.java | 29 +++++++++++ .../java/com/zfoo/orm/cache/EntityCache.java | 48 +++++++++++++----- .../zfoo/orm/cache/version/CacheVersion.java | 50 +++++++++++++++++++ .../cache/version/CacheVersionDefault.java | 23 +++++++++ .../zfoo/orm/cache/version/ICacheVersion.java | 48 ++++++++++++++++++ .../java/com/zfoo/orm/manager/OrmManager.java | 35 ++++--------- .../main/java/com/zfoo/orm/model/IEntity.java | 21 -------- .../java/com/zfoo/orm/entity/UserEntity.java | 17 +++++-- .../zfoo/protocol/util/ReflectionUtils.java | 10 +++- 9 files changed, 216 insertions(+), 65 deletions(-) create mode 100644 orm/src/main/java/com/zfoo/orm/anno/Version.java create mode 100644 orm/src/main/java/com/zfoo/orm/cache/version/CacheVersion.java create mode 100644 orm/src/main/java/com/zfoo/orm/cache/version/CacheVersionDefault.java create mode 100644 orm/src/main/java/com/zfoo/orm/cache/version/ICacheVersion.java diff --git a/orm/src/main/java/com/zfoo/orm/anno/Version.java b/orm/src/main/java/com/zfoo/orm/anno/Version.java new file mode 100644 index 00000000..9fc59d07 --- /dev/null +++ b/orm/src/main/java/com/zfoo/orm/anno/Version.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.zfoo.orm.anno; + +import org.springframework.aot.hint.annotation.Reflective; + +import java.lang.annotation.*; + +/** + * 一个文档的写入到数据库的version版本,只有使用@EntityCache注解时才有用 + * + * @author godotg + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +@Reflective +public @interface Version { +} diff --git a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java index 427c79c7..4f3301be 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java @@ -18,15 +18,22 @@ import com.mongodb.client.model.Filters; import com.mongodb.client.model.ReplaceOneModel; import com.zfoo.event.manager.EventBus; import com.zfoo.orm.OrmContext; +import com.zfoo.orm.anno.Version; import com.zfoo.orm.cache.persister.IOrmPersister; import com.zfoo.orm.cache.persister.PNode; +import com.zfoo.orm.cache.version.CacheVersion; +import com.zfoo.orm.cache.version.CacheVersionDefault; +import com.zfoo.orm.cache.version.ICacheVersion; import com.zfoo.orm.model.EntityDef; import com.zfoo.orm.model.IEntity; import com.zfoo.orm.query.Page; +import com.zfoo.protocol.collection.ArrayUtils; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.exception.RunException; import com.zfoo.protocol.model.Pair; import com.zfoo.protocol.util.AssertionUtils; +import com.zfoo.protocol.util.FieldUtils; +import com.zfoo.protocol.util.ReflectionUtils; import com.zfoo.protocol.util.ThreadUtils; import com.zfoo.scheduler.manager.SchedulerBus; import com.zfoo.scheduler.util.LazyCache; @@ -54,7 +61,20 @@ public class EntityCache, E extends IEntity> imple private final LazyCache> cache; + private ICacheVersion cacheVersion = new CacheVersionDefault<>(); + + public EntityCache(EntityDef entityDef) { + var clazz = entityDef.getClazz(); + // 创建CacheVersion + var versionFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Version.class); + if (ArrayUtils.isNotEmpty(versionFields)) { + var filed = versionFields[0]; + var getMethod = ReflectionUtils.getMethodByNameInPOJOClass(clazz, FieldUtils.fieldToGetMethod(clazz, filed)); + var setMethod = ReflectionUtils.getMethodByNameInPOJOClass(clazz, FieldUtils.fieldToSetMethod(clazz, filed), filed.getType()); + cacheVersion = new CacheVersion<>(filed.getName(), getMethod, setMethod); + } + var removeCallback = new BiConsumer>, LazyCache.RemovalCause>() { @Override public void accept(Pair> pair, LazyCache.RemovalCause removalCause) { @@ -77,11 +97,11 @@ public class EntityCache, E extends IEntity> imple public void run() { var collection = OrmContext.getOrmManager().getCollection(entityClass); - var version = entity.gvs(); - entity.svs(version + 1); + var version = cacheVersion.gvs(entity); + cacheVersion.svs(entity, version + 1); - var filter = entity.gvs() > 0 - ? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version)) + var filter = cacheVersion.gvs(entity) > 0 + ? Filters.and(Filters.eq("_id", entity.id()), Filters.eq(cacheVersion.versionField(), version)) : Filters.eq("_id", entity.id()); var result = collection.replaceOne(filter, entity); if (result.getModifiedCount() <= 0) { @@ -316,11 +336,11 @@ public class EntityCache, E extends IEntity> imple var batchList = currentUpdateList.stream() .map(it -> { - var version = it.gvs(); - it.svs(version + 1); + var version = cacheVersion.gvs(it); + cacheVersion.svs(it, version + 1); - var filter = it.gvs() > 0 - ? Filters.and(Filters.eq("_id", it.id()), Filters.eq("vs", version)) + var filter = cacheVersion.gvs(it) > 0 + ? Filters.and(Filters.eq("_id", it.id()), Filters.eq(cacheVersion.versionField(), version)) : Filters.eq("_id", it.id()); return new ReplaceOneModel<>(filter, it); @@ -367,27 +387,29 @@ public class EntityCache, E extends IEntity> imple } // 如果没有版本号,则直接更新数据库 - if (entity.gvs() <= 0) { + var entityVersion = cacheVersion.gvs(entity); + var dbEntityVersion = cacheVersion.gvs(dbEntity); + if (entityVersion <= 0) { OrmContext.getAccessor().update(entity); continue; } // 如果版本号相同,说明已经更新到 - if (dbEntity.gvs() == entity.gvs()) { + if (dbEntityVersion == entityVersion) { continue; } // 如果数据库版本号较小,说明缓存的数据是最新的,直接写入数据库 - if (dbEntity.gvs() < entity.gvs()) { + if (dbEntityVersion < entityVersion) { OrmContext.getAccessor().update(entity); continue; } // 如果数据库版本号较大,说明缓存的数据不是最新的,直接清除缓存,下次重新加载 - if (dbEntity.gvs() > entity.gvs()) { + if (dbEntityVersion > entityVersion) { cache.remove(id); load(id); - logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntity.gvs(), entity.gvs()); + logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntityVersion, entityVersion); continue; } } diff --git a/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersion.java b/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersion.java new file mode 100644 index 00000000..ae928a59 --- /dev/null +++ b/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersion.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.zfoo.orm.cache.version; + +import com.zfoo.orm.model.IEntity; +import com.zfoo.protocol.util.ReflectionUtils; + +import java.lang.reflect.Method; + +/** + * @author godotg + */ +public class CacheVersion, E extends IEntity> implements ICacheVersion { + + private String versionFiled; + private Method getVersionMethod; + private Method setVersionMethod; + + public CacheVersion(String versionFiled, Method getVersionMethod, Method setVersionMethod) { + this.versionFiled = versionFiled; + this.getVersionMethod = getVersionMethod; + this.setVersionMethod = setVersionMethod; + } + + @Override + public String versionField() { + return versionFiled; + } + + @Override + public long gvs(E entity) { + return (long) ReflectionUtils.invokeMethod(entity, getVersionMethod); + } + + @Override + public void svs(E entity, long vs) { + ReflectionUtils.invokeMethod(entity, setVersionMethod, vs); + } + +} diff --git a/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersionDefault.java b/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersionDefault.java new file mode 100644 index 00000000..6bd2a375 --- /dev/null +++ b/orm/src/main/java/com/zfoo/orm/cache/version/CacheVersionDefault.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.zfoo.orm.cache.version; + +import com.zfoo.orm.model.IEntity; + +/** + * @author godotg + */ +public class CacheVersionDefault, E extends IEntity> implements ICacheVersion { + + +} diff --git a/orm/src/main/java/com/zfoo/orm/cache/version/ICacheVersion.java b/orm/src/main/java/com/zfoo/orm/cache/version/ICacheVersion.java new file mode 100644 index 00000000..eaf3bd3e --- /dev/null +++ b/orm/src/main/java/com/zfoo/orm/cache/version/ICacheVersion.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.zfoo.orm.cache.version; + + +import com.zfoo.orm.model.IEntity; + +/** + * @author godotg + */ +public interface ICacheVersion, E extends IEntity> { + + default String versionField() { + return "version"; + } + + /** + * 一个文档的写入到数据库的version版本,version的get和set方法 + *

+ * 写入一条数据到数据库的时候会对比当前entity的vs和数据库的vs是不是一样,不是一样的话无法写入,一致的话就写入数据并且让vs自增+1. + * 主要是为了防止多个服务器去操作同一条数据,保证在分布式环境的数据一致性;简单的说就是一个数据版本号的简单实现,版本号一致才能写入数据。 + * 在分布式环境中,会存在有状态服务器,比如网关路由一个用户或玩家的数据到a服务器,这个时候你加了一个b服务器,有可能下一条数据就被路由到b。 + * 虽然用了一致性hash的负载均衡算法,但是一样有概率会让某些消息路由到不同服务器。 + * 这个时候版本号就可以保证只有一台服务器可以对数据库做操作,不用担心多个服务器去操作数据。 + * 这是一个容错的操作,真实环境下很少发生。为了高性能必须要把服务器做成有状态的,这个容错操作就是最后一道保证数据一致的方案。 + * mongodb更新的时候是原子的,并发更新同一条数据只有一条数据会写入,第二条数据写入的时候版本号已经不一致了,所以老版本号的数据无法写入。 + * 写入的时候只要发现版本号不一致,就让缓存失效,重新读取数据库最新的数据。 + *

+ * 分布式环境下,要想服务器性能好,就要做成有状态的,有状态就要遇到这种多服对同一条数据写入问题,这是一个千古难以解决的问题,进退两难。 + */ + default long gvs(E entity) { + return 0L; + } + + default void svs(E entity, long vs) { + } + +} diff --git a/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java b/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java index 6cb77861..86aa037e 100644 --- a/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java +++ b/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java @@ -407,7 +407,7 @@ public class OrmManager implements IOrmManager { // 校验id字段和id()方法的格式,一个Entity类只能有一个@Id注解 var idFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Id.class); AssertionUtils.isTrue(ArrayUtils.isNotEmpty(idFields) && idFields.length == 1 - , "The Entity[{}] must have only one Id annotation (if it is indeed marked with an Id annotation, be careful not to use the Stored Id annotation)", clazz.getSimpleName()); + , "The Entity[{}] must have only one @Id annotation (if it is indeed marked with an Id annotation, be careful not to use the Stored Id annotation)", clazz.getSimpleName()); var idField = idFields[0]; // idField必须用private修饰 AssertionUtils.isTrue(Modifier.isPrivate(idField.getModifiers()), "The id of the Entity[{}] must be private", clazz.getSimpleName()); @@ -450,32 +450,15 @@ public class OrmManager implements IOrmManager { AssertionUtils.isTrue(idFiledValue.equals(idMethodReturnValue), "The return value id [field:{}] of the Entity[{}] and the return value id [method:{}] are not equal, please check whether the id() method is implemented correctly" , clazz.getSimpleName(), idFiledValue, idMethodReturnValue); - // 校验gvs()方法和svs()方法的格式 - var gvsMethodOptional = Arrays.stream(ReflectionUtils.getAllMethods(clazz)) - .filter(it -> it.getName().equals("gvs")) - .filter(it -> it.getParameterCount() <= 0) - .findFirst(); - - var svsMethodOptional = Arrays.stream(ReflectionUtils.getAllMethods(clazz)) - .filter(it -> it.getName().equals("svs")) - .filter(it -> it.getParameterCount() == 1) - .filter(it -> it.getParameterTypes()[0].equals(long.class)) - .findFirst(); - // gvs和svs要实现都实现,不实现都不实现 - if (gvsMethodOptional.isEmpty() || svsMethodOptional.isEmpty()) { - // 实体类Entity的gvs和svs方法要实现都实现,不实现都不实现 - AssertionUtils.isTrue(gvsMethodOptional.isEmpty() && svsMethodOptional.isEmpty() - , "The gvs and svs methods of the Entity[{}] should be implemented together", clazz.getSimpleName()); - return; + // @Version标识的字段必须是long类型 + var versionFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Version.class); + if (ArrayUtils.isNotEmpty(versionFields)) { + AssertionUtils.isTrue(versionFields.length == 1,"The Entity[{}] must have only one @Version annotation", clazz.getSimpleName()); + var versionField = versionFields[0]; + // idField必须用private修饰 + AssertionUtils.isTrue(Modifier.isPrivate(versionField.getModifiers()), "The version of the Entity[{}] must be private", clazz.getSimpleName()); + AssertionUtils.isTrue(versionField.getType().equals(long.class), "The version type of the Entity[{}] must be long", clazz.getSimpleName()); } - - var gvsMethod = gvsMethodOptional.get(); - var svsMethod = svsMethodOptional.get(); - var vsValue = RandomUtils.randomLong(); - ReflectionUtils.invokeMethod(entityInstance, svsMethod, vsValue); - var gvsReturnValue = ReflectionUtils.invokeMethod(entityInstance, gvsMethod); - // 实体类Entity的gvs方法和svs方法定义格式不正确 - AssertionUtils.isTrue(gvsReturnValue.equals(vsValue), "The gvs and svs methods of the Entity[{}] are not correctly", clazz.getSimpleName()); } private static final Set> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class diff --git a/orm/src/main/java/com/zfoo/orm/model/IEntity.java b/orm/src/main/java/com/zfoo/orm/model/IEntity.java index 1a7c6a20..0a8f10db 100644 --- a/orm/src/main/java/com/zfoo/orm/model/IEntity.java +++ b/orm/src/main/java/com/zfoo/orm/model/IEntity.java @@ -24,27 +24,6 @@ public interface IEntity> { */ PK id(); - /** - * 一个文档的写入到数据库的version版本,version的get和set方法 - *

- * 写入一条数据到数据库的时候会对比当前entity的vs和数据库的vs是不是一样,不是一样的话无法写入,一致的话就写入数据并且让vs自增+1. - * 主要是为了防止多个服务器去操作同一条数据,保证在分布式环境的数据一致性;简单的说就是一个数据版本号的简单实现,版本号一致才能写入数据。 - * 在分布式环境中,会存在有状态服务器,比如网关路由一个用户或玩家的数据到a服务器,这个时候你加了一个b服务器,有可能下一条数据就被路由到b。 - * 虽然用了一致性hash的负载均衡算法,但是一样有概率会让某些消息路由到不同服务器。 - * 这个时候版本号就可以保证只有一台服务器可以对数据库做操作,不用担心多个服务器去操作数据。 - * 这是一个容错的操作,真实环境下很少发生。为了高性能必须要把服务器做成有状态的,这个容错操作就是最后一道保证数据一致的方案。 - * mongodb更新的时候是原子的,并发更新同一条数据只有一条数据会写入,第二条数据写入的时候版本号已经不一致了,所以老版本号的数据无法写入。 - * 写入的时候只要发现版本号不一致,就让缓存失效,重新读取数据库最新的数据。 - *

- * 分布式环境下,要想服务器性能好,就要做成有状态的,有状态就要遇到这种多服对同一条数据写入问题,这是一个千古难以解决的问题,进退两难。 - */ - default long gvs() { - return 0L; - } - - default void svs(long vs) { - } - /** * 判空:由于查询不存在时缓存中也会有一份,因此判断为空需要根据实际类型才能决定 * diff --git a/orm/src/test/java/com/zfoo/orm/entity/UserEntity.java b/orm/src/test/java/com/zfoo/orm/entity/UserEntity.java index f0290a0b..49c75db3 100644 --- a/orm/src/test/java/com/zfoo/orm/entity/UserEntity.java +++ b/orm/src/test/java/com/zfoo/orm/entity/UserEntity.java @@ -13,10 +13,7 @@ package com.zfoo.orm.entity; -import com.zfoo.orm.anno.EntityCache; -import com.zfoo.orm.anno.Id; -import com.zfoo.orm.anno.Index; -import com.zfoo.orm.anno.IndexText; +import com.zfoo.orm.anno.*; import com.zfoo.orm.model.IEntity; import java.util.List; @@ -50,6 +47,9 @@ public class UserEntity implements IEntity { @Index(ascending = false, unique = false) private List l; + @Version + private long version; + public UserEntity() { } @@ -132,6 +132,14 @@ public class UserEntity implements IEntity { this.l = l; } + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + @Override public String toString() { return "UserEntity{" + @@ -143,6 +151,7 @@ public class UserEntity implements IEntity { ", e='" + e + '\'' + ", f='" + f + '\'' + ", l=" + l + + ", version=" + version + '}'; } } diff --git a/protocol/src/main/java/com/zfoo/protocol/util/ReflectionUtils.java b/protocol/src/main/java/com/zfoo/protocol/util/ReflectionUtils.java index 3d4fdea8..6160b7af 100644 --- a/protocol/src/main/java/com/zfoo/protocol/util/ReflectionUtils.java +++ b/protocol/src/main/java/com/zfoo/protocol/util/ReflectionUtils.java @@ -173,6 +173,14 @@ public abstract class ReflectionUtils { return ArrayUtils.listToArray(list, Method.class); } + public static Method getMethodByNameInPOJOClass(Class clazz, String methodName, Class... parameterTypes) { + try { + return clazz.getDeclaredMethod(methodName, parameterTypes); + } catch (NoSuchMethodException e) { + throw new IllegalStateException(StringUtils.format("[class:{}] has no [method:{}] exception", clazz, methodName), e); + } + } + /** * Attempt to get all Methods on the supplied class. @@ -210,7 +218,7 @@ public abstract class ReflectionUtils { } } - public static T newInstance(Constructor constructor, Object ... initargs) { + public static T newInstance(Constructor constructor, Object... initargs) { try { return constructor.newInstance(initargs); } catch (Exception e) {