mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-21 06:24:38 +00:00
ref[version]: The gvs() and svs() methods have been removed from the orm and replaced with @Version annotations
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.orm.anno;
|
||||
|
||||
import org.springframework.aot.hint.annotation.Reflective;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 一个文档的写入到数据库的version版本,只有使用@EntityCache注解时才有用
|
||||
*
|
||||
* @author godotg
|
||||
*/
|
||||
@Documented
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target({ElementType.FIELD})
|
||||
@Reflective
|
||||
public @interface Version {
|
||||
}
|
||||
+35
-13
@@ -18,15 +18,22 @@ import com.mongodb.client.model.Filters;
|
||||
import com.mongodb.client.model.ReplaceOneModel;
|
||||
import com.zfoo.event.manager.EventBus;
|
||||
import com.zfoo.orm.OrmContext;
|
||||
import com.zfoo.orm.anno.Version;
|
||||
import com.zfoo.orm.cache.persister.IOrmPersister;
|
||||
import com.zfoo.orm.cache.persister.PNode;
|
||||
import com.zfoo.orm.cache.version.CacheVersion;
|
||||
import com.zfoo.orm.cache.version.CacheVersionDefault;
|
||||
import com.zfoo.orm.cache.version.ICacheVersion;
|
||||
import com.zfoo.orm.model.EntityDef;
|
||||
import com.zfoo.orm.model.IEntity;
|
||||
import com.zfoo.orm.query.Page;
|
||||
import com.zfoo.protocol.collection.ArrayUtils;
|
||||
import com.zfoo.protocol.collection.CollectionUtils;
|
||||
import com.zfoo.protocol.exception.RunException;
|
||||
import com.zfoo.protocol.model.Pair;
|
||||
import com.zfoo.protocol.util.AssertionUtils;
|
||||
import com.zfoo.protocol.util.FieldUtils;
|
||||
import com.zfoo.protocol.util.ReflectionUtils;
|
||||
import com.zfoo.protocol.util.ThreadUtils;
|
||||
import com.zfoo.scheduler.manager.SchedulerBus;
|
||||
import com.zfoo.scheduler.util.LazyCache;
|
||||
@@ -54,7 +61,20 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
|
||||
private final LazyCache<PK, PNode<E>> cache;
|
||||
|
||||
private ICacheVersion<PK, E> cacheVersion = new CacheVersionDefault<>();
|
||||
|
||||
|
||||
public EntityCache(EntityDef entityDef) {
|
||||
var clazz = entityDef.getClazz();
|
||||
// 创建CacheVersion
|
||||
var versionFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Version.class);
|
||||
if (ArrayUtils.isNotEmpty(versionFields)) {
|
||||
var filed = versionFields[0];
|
||||
var getMethod = ReflectionUtils.getMethodByNameInPOJOClass(clazz, FieldUtils.fieldToGetMethod(clazz, filed));
|
||||
var setMethod = ReflectionUtils.getMethodByNameInPOJOClass(clazz, FieldUtils.fieldToSetMethod(clazz, filed), filed.getType());
|
||||
cacheVersion = new CacheVersion<>(filed.getName(), getMethod, setMethod);
|
||||
}
|
||||
|
||||
var removeCallback = new BiConsumer<Pair<PK, PNode<E>>, LazyCache.RemovalCause>() {
|
||||
@Override
|
||||
public void accept(Pair<PK, PNode<E>> pair, LazyCache.RemovalCause removalCause) {
|
||||
@@ -77,11 +97,11 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
public void run() {
|
||||
var collection = OrmContext.getOrmManager().getCollection(entityClass);
|
||||
|
||||
var version = entity.gvs();
|
||||
entity.svs(version + 1);
|
||||
var version = cacheVersion.gvs(entity);
|
||||
cacheVersion.svs(entity, version + 1);
|
||||
|
||||
var filter = entity.gvs() > 0
|
||||
? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version))
|
||||
var filter = cacheVersion.gvs(entity) > 0
|
||||
? Filters.and(Filters.eq("_id", entity.id()), Filters.eq(cacheVersion.versionField(), version))
|
||||
: Filters.eq("_id", entity.id());
|
||||
var result = collection.replaceOne(filter, entity);
|
||||
if (result.getModifiedCount() <= 0) {
|
||||
@@ -316,11 +336,11 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
|
||||
var batchList = currentUpdateList.stream()
|
||||
.map(it -> {
|
||||
var version = it.gvs();
|
||||
it.svs(version + 1);
|
||||
var version = cacheVersion.gvs(it);
|
||||
cacheVersion.svs(it, version + 1);
|
||||
|
||||
var filter = it.gvs() > 0
|
||||
? Filters.and(Filters.eq("_id", it.id()), Filters.eq("vs", version))
|
||||
var filter = cacheVersion.gvs(it) > 0
|
||||
? Filters.and(Filters.eq("_id", it.id()), Filters.eq(cacheVersion.versionField(), version))
|
||||
: Filters.eq("_id", it.id());
|
||||
|
||||
return new ReplaceOneModel<>(filter, it);
|
||||
@@ -367,27 +387,29 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
}
|
||||
|
||||
// 如果没有版本号,则直接更新数据库
|
||||
if (entity.gvs() <= 0) {
|
||||
var entityVersion = cacheVersion.gvs(entity);
|
||||
var dbEntityVersion = cacheVersion.gvs(dbEntity);
|
||||
if (entityVersion <= 0) {
|
||||
OrmContext.getAccessor().update(entity);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 如果版本号相同,说明已经更新到
|
||||
if (dbEntity.gvs() == entity.gvs()) {
|
||||
if (dbEntityVersion == entityVersion) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 如果数据库版本号较小,说明缓存的数据是最新的,直接写入数据库
|
||||
if (dbEntity.gvs() < entity.gvs()) {
|
||||
if (dbEntityVersion < entityVersion) {
|
||||
OrmContext.getAccessor().update(entity);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 如果数据库版本号较大,说明缓存的数据不是最新的,直接清除缓存,下次重新加载
|
||||
if (dbEntity.gvs() > entity.gvs()) {
|
||||
if (dbEntityVersion > entityVersion) {
|
||||
cache.remove(id);
|
||||
load(id);
|
||||
logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntity.gvs(), entity.gvs());
|
||||
logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntityVersion, entityVersion);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.orm.cache.version;
|
||||
|
||||
import com.zfoo.orm.model.IEntity;
|
||||
import com.zfoo.protocol.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
/**
|
||||
* @author godotg
|
||||
*/
|
||||
public class CacheVersion<PK extends Comparable<PK>, E extends IEntity<PK>> implements ICacheVersion<PK, E> {
|
||||
|
||||
private String versionFiled;
|
||||
private Method getVersionMethod;
|
||||
private Method setVersionMethod;
|
||||
|
||||
public CacheVersion(String versionFiled, Method getVersionMethod, Method setVersionMethod) {
|
||||
this.versionFiled = versionFiled;
|
||||
this.getVersionMethod = getVersionMethod;
|
||||
this.setVersionMethod = setVersionMethod;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String versionField() {
|
||||
return versionFiled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long gvs(E entity) {
|
||||
return (long) ReflectionUtils.invokeMethod(entity, getVersionMethod);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void svs(E entity, long vs) {
|
||||
ReflectionUtils.invokeMethod(entity, setVersionMethod, vs);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.orm.cache.version;
|
||||
|
||||
import com.zfoo.orm.model.IEntity;
|
||||
|
||||
/**
|
||||
* @author godotg
|
||||
*/
|
||||
public class CacheVersionDefault<PK extends Comparable<PK>, E extends IEntity<PK>> implements ICacheVersion<PK, E> {
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.orm.cache.version;
|
||||
|
||||
|
||||
import com.zfoo.orm.model.IEntity;
|
||||
|
||||
/**
|
||||
* @author godotg
|
||||
*/
|
||||
public interface ICacheVersion<PK extends Comparable<PK>, E extends IEntity<PK>> {
|
||||
|
||||
default String versionField() {
|
||||
return "version";
|
||||
}
|
||||
|
||||
/**
|
||||
* 一个文档的写入到数据库的version版本,version的get和set方法
|
||||
* <p>
|
||||
* 写入一条数据到数据库的时候会对比当前entity的vs和数据库的vs是不是一样,不是一样的话无法写入,一致的话就写入数据并且让vs自增+1.
|
||||
* 主要是为了防止多个服务器去操作同一条数据,保证在分布式环境的数据一致性;简单的说就是一个数据版本号的简单实现,版本号一致才能写入数据。
|
||||
* 在分布式环境中,会存在有状态服务器,比如网关路由一个用户或玩家的数据到a服务器,这个时候你加了一个b服务器,有可能下一条数据就被路由到b。
|
||||
* 虽然用了一致性hash的负载均衡算法,但是一样有概率会让某些消息路由到不同服务器。
|
||||
* 这个时候版本号就可以保证只有一台服务器可以对数据库做操作,不用担心多个服务器去操作数据。
|
||||
* 这是一个容错的操作,真实环境下很少发生。为了高性能必须要把服务器做成有状态的,这个容错操作就是最后一道保证数据一致的方案。
|
||||
* mongodb更新的时候是原子的,并发更新同一条数据只有一条数据会写入,第二条数据写入的时候版本号已经不一致了,所以老版本号的数据无法写入。
|
||||
* 写入的时候只要发现版本号不一致,就让缓存失效,重新读取数据库最新的数据。
|
||||
* <p>
|
||||
* 分布式环境下,要想服务器性能好,就要做成有状态的,有状态就要遇到这种多服对同一条数据写入问题,这是一个千古难以解决的问题,进退两难。
|
||||
*/
|
||||
default long gvs(E entity) {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
default void svs(E entity, long vs) {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -407,7 +407,7 @@ public class OrmManager implements IOrmManager {
|
||||
// 校验id字段和id()方法的格式,一个Entity类只能有一个@Id注解
|
||||
var idFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Id.class);
|
||||
AssertionUtils.isTrue(ArrayUtils.isNotEmpty(idFields) && idFields.length == 1
|
||||
, "The Entity[{}] must have only one Id annotation (if it is indeed marked with an Id annotation, be careful not to use the Stored Id annotation)", clazz.getSimpleName());
|
||||
, "The Entity[{}] must have only one @Id annotation (if it is indeed marked with an Id annotation, be careful not to use the Stored Id annotation)", clazz.getSimpleName());
|
||||
var idField = idFields[0];
|
||||
// idField必须用private修饰
|
||||
AssertionUtils.isTrue(Modifier.isPrivate(idField.getModifiers()), "The id of the Entity[{}] must be private", clazz.getSimpleName());
|
||||
@@ -450,32 +450,15 @@ public class OrmManager implements IOrmManager {
|
||||
AssertionUtils.isTrue(idFiledValue.equals(idMethodReturnValue), "The return value id [field:{}] of the Entity[{}] and the return value id [method:{}] are not equal, please check whether the id() method is implemented correctly"
|
||||
, clazz.getSimpleName(), idFiledValue, idMethodReturnValue);
|
||||
|
||||
// 校验gvs()方法和svs()方法的格式
|
||||
var gvsMethodOptional = Arrays.stream(ReflectionUtils.getAllMethods(clazz))
|
||||
.filter(it -> it.getName().equals("gvs"))
|
||||
.filter(it -> it.getParameterCount() <= 0)
|
||||
.findFirst();
|
||||
|
||||
var svsMethodOptional = Arrays.stream(ReflectionUtils.getAllMethods(clazz))
|
||||
.filter(it -> it.getName().equals("svs"))
|
||||
.filter(it -> it.getParameterCount() == 1)
|
||||
.filter(it -> it.getParameterTypes()[0].equals(long.class))
|
||||
.findFirst();
|
||||
// gvs和svs要实现都实现,不实现都不实现
|
||||
if (gvsMethodOptional.isEmpty() || svsMethodOptional.isEmpty()) {
|
||||
// 实体类Entity的gvs和svs方法要实现都实现,不实现都不实现
|
||||
AssertionUtils.isTrue(gvsMethodOptional.isEmpty() && svsMethodOptional.isEmpty()
|
||||
, "The gvs and svs methods of the Entity[{}] should be implemented together", clazz.getSimpleName());
|
||||
return;
|
||||
// @Version标识的字段必须是long类型
|
||||
var versionFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Version.class);
|
||||
if (ArrayUtils.isNotEmpty(versionFields)) {
|
||||
AssertionUtils.isTrue(versionFields.length == 1,"The Entity[{}] must have only one @Version annotation", clazz.getSimpleName());
|
||||
var versionField = versionFields[0];
|
||||
// idField必须用private修饰
|
||||
AssertionUtils.isTrue(Modifier.isPrivate(versionField.getModifiers()), "The version of the Entity[{}] must be private", clazz.getSimpleName());
|
||||
AssertionUtils.isTrue(versionField.getType().equals(long.class), "The version type of the Entity[{}] must be long", clazz.getSimpleName());
|
||||
}
|
||||
|
||||
var gvsMethod = gvsMethodOptional.get();
|
||||
var svsMethod = svsMethodOptional.get();
|
||||
var vsValue = RandomUtils.randomLong();
|
||||
ReflectionUtils.invokeMethod(entityInstance, svsMethod, vsValue);
|
||||
var gvsReturnValue = ReflectionUtils.invokeMethod(entityInstance, gvsMethod);
|
||||
// 实体类Entity的gvs方法和svs方法定义格式不正确
|
||||
AssertionUtils.isTrue(gvsReturnValue.equals(vsValue), "The gvs and svs methods of the Entity[{}] are not correctly", clazz.getSimpleName());
|
||||
}
|
||||
|
||||
private static final Set<Class<?>> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class
|
||||
|
||||
@@ -24,27 +24,6 @@ public interface IEntity<PK extends Comparable<PK>> {
|
||||
*/
|
||||
PK id();
|
||||
|
||||
/**
|
||||
* 一个文档的写入到数据库的version版本,version的get和set方法
|
||||
* <p>
|
||||
* 写入一条数据到数据库的时候会对比当前entity的vs和数据库的vs是不是一样,不是一样的话无法写入,一致的话就写入数据并且让vs自增+1.
|
||||
* 主要是为了防止多个服务器去操作同一条数据,保证在分布式环境的数据一致性;简单的说就是一个数据版本号的简单实现,版本号一致才能写入数据。
|
||||
* 在分布式环境中,会存在有状态服务器,比如网关路由一个用户或玩家的数据到a服务器,这个时候你加了一个b服务器,有可能下一条数据就被路由到b。
|
||||
* 虽然用了一致性hash的负载均衡算法,但是一样有概率会让某些消息路由到不同服务器。
|
||||
* 这个时候版本号就可以保证只有一台服务器可以对数据库做操作,不用担心多个服务器去操作数据。
|
||||
* 这是一个容错的操作,真实环境下很少发生。为了高性能必须要把服务器做成有状态的,这个容错操作就是最后一道保证数据一致的方案。
|
||||
* mongodb更新的时候是原子的,并发更新同一条数据只有一条数据会写入,第二条数据写入的时候版本号已经不一致了,所以老版本号的数据无法写入。
|
||||
* 写入的时候只要发现版本号不一致,就让缓存失效,重新读取数据库最新的数据。
|
||||
* <p>
|
||||
* 分布式环境下,要想服务器性能好,就要做成有状态的,有状态就要遇到这种多服对同一条数据写入问题,这是一个千古难以解决的问题,进退两难。
|
||||
*/
|
||||
default long gvs() {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
default void svs(long vs) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 判空:由于查询不存在时缓存中也会有一份,因此判断为空需要根据实际类型才能决定
|
||||
*
|
||||
|
||||
@@ -13,10 +13,7 @@
|
||||
|
||||
package com.zfoo.orm.entity;
|
||||
|
||||
import com.zfoo.orm.anno.EntityCache;
|
||||
import com.zfoo.orm.anno.Id;
|
||||
import com.zfoo.orm.anno.Index;
|
||||
import com.zfoo.orm.anno.IndexText;
|
||||
import com.zfoo.orm.anno.*;
|
||||
import com.zfoo.orm.model.IEntity;
|
||||
|
||||
import java.util.List;
|
||||
@@ -50,6 +47,9 @@ public class UserEntity implements IEntity<Long> {
|
||||
@Index(ascending = false, unique = false)
|
||||
private List<Integer> l;
|
||||
|
||||
@Version
|
||||
private long version;
|
||||
|
||||
public UserEntity() {
|
||||
}
|
||||
|
||||
@@ -132,6 +132,14 @@ public class UserEntity implements IEntity<Long> {
|
||||
this.l = l;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public void setVersion(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UserEntity{" +
|
||||
@@ -143,6 +151,7 @@ public class UserEntity implements IEntity<Long> {
|
||||
", e='" + e + '\'' +
|
||||
", f='" + f + '\'' +
|
||||
", l=" + l +
|
||||
", version=" + version +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,14 @@ public abstract class ReflectionUtils {
|
||||
return ArrayUtils.listToArray(list, Method.class);
|
||||
}
|
||||
|
||||
public static Method getMethodByNameInPOJOClass(Class<?> clazz, String methodName, Class<?>... parameterTypes) {
|
||||
try {
|
||||
return clazz.getDeclaredMethod(methodName, parameterTypes);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new IllegalStateException(StringUtils.format("[class:{}] has no [method:{}] exception", clazz, methodName), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Attempt to get all Methods on the supplied class.
|
||||
@@ -210,7 +218,7 @@ public abstract class ReflectionUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T newInstance(Constructor<T> constructor, Object ... initargs) {
|
||||
public static <T> T newInstance(Constructor<T> constructor, Object... initargs) {
|
||||
try {
|
||||
return constructor.newInstance(initargs);
|
||||
} catch (Exception e) {
|
||||
|
||||
Reference in New Issue
Block a user