ref[orm]: persist entity in same update thread

This commit is contained in:
godotg
2024-06-25 12:44:21 +08:00
parent b23ded4fbf
commit ddd292b6d2
7 changed files with 77 additions and 52 deletions
@@ -46,7 +46,7 @@ public class OrmContext implements ApplicationListener<ApplicationContextEvent>,
private IOrmManager ormManager;
private boolean stop = false;
private volatile boolean stop = false;
public static ApplicationContext getApplicationContext() {
return instance.applicationContext;
+41 -21
View File
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -205,18 +206,14 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
public void updateNow(E entity) {
var cachePnode = fetchCachePnode(entity, true);
OrmContext.getAccessor().update(cachePnode.getEntity());
var now = TimeUtils.now();
cachePnode.setWriteToDbTime(now);
cachePnode.setModifiedTime(now);
cachePnode.resetTime(TimeUtils.now());
}
@Override
public void updateUnsafeNow(E entity) {
var cachePnode = fetchCachePnode(entity, false);
OrmContext.getAccessor().update(cachePnode.getEntity());
var now = TimeUtils.now();
cachePnode.setWriteToDbTime(now);
cachePnode.setModifiedTime(now);
cachePnode.resetTime(TimeUtils.now());
}
@Override
@@ -238,9 +235,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
if (pnode.getModifiedTime() == pnode.getWriteToDbTime()) {
return;
}
var currentTime = TimeUtils.currentTimeMillis();
pnode.setWriteToDbTime(currentTime);
pnode.setModifiedTime(currentTime);
pnode.resetTime(TimeUtils.currentTimeMillis());
var updateList = new ArrayList<E>();
updateList.add(pnode.getEntity());
doPersist(updateList, entityClass);
@@ -252,21 +247,46 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
@SuppressWarnings("unchecked")
var entityClass = (Class<E>) entityDef.getClazz();
var updateList = new ArrayList<E>();
var currentTime = TimeUtils.currentTimeMillis();
cache.forEach(new BiConsumer<PK, PNode<E>>() {
@Override
public void accept(PK pk, PNode<E> pnode) {
var entity = pnode.getEntity();
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
pnode.setWriteToDbTime(currentTime);
pnode.setModifiedTime(currentTime);
updateList.add(entity);
if (entityDef.hasUnsafeCollection()) {
// key为threadId
var updateMap = new HashMap<Long, List<E>>();
cache.forEach(new BiConsumer<PK, PNode<E>>() {
@Override
public void accept(PK pk, PNode<E> pnode) {
var entity = pnode.getEntity();
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
pnode.resetTime(currentTime);
var updateList = updateMap.computeIfAbsent(pnode.getThreadId(), it -> new ArrayList<>());
updateList.add(entity);
}
}
});
for(var entry : updateMap.entrySet()) {
var threadId = entry.getKey();
var updateList = entry.getValue();
var executor = ThreadUtils.executorByThreadId(threadId);
if (executor == null) {
EventBus.asyncExecute(entityClass.hashCode(), () -> doPersist(updateList, entityClass));
} else {
executor.execute(() -> doPersist(updateList, entityClass));
}
}
});
doPersist(updateList, entityClass);
} else {
var updateList = new ArrayList<E>();
cache.forEach(new BiConsumer<PK, PNode<E>>() {
@Override
public void accept(PK pk, PNode<E> pnode) {
var entity = pnode.getEntity();
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
pnode.resetTime(currentTime);
updateList.add(entity);
}
}
});
EventBus.asyncExecute(entityClass.hashCode(), () -> doPersist(updateList, entityClass));
}
}
private void doPersist(List<E> updateList, Class<E> entityClass) {
@@ -75,10 +75,8 @@ public class CronOrmPersister extends AbstractOrmPersister {
if (!OrmContext.isStop()) {
SchedulerBus.schedule(() -> {
if (!OrmContext.isStop()) {
EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> {
entityCaches.persistAll();
schedulePersist();
});
entityCaches.persistAll();
schedulePersist();
}
}, delay, TimeUnit.MILLISECONDS);
}
+5 -3
View File
@@ -37,10 +37,12 @@ public class PNode<E extends IEntity<?>> {
public PNode(E entity) {
this.entity = entity;
resetTime(TimeUtils.now());
}
var currentTime = TimeUtils.now();
this.writeToDbTime = currentTime;
this.modifiedTime = currentTime;
public void resetTime(long timestamp) {
this.writeToDbTime = timestamp;
this.modifiedTime = timestamp;
}
public E getEntity() {
@@ -44,7 +44,7 @@ public class TimeOrmPersister extends AbstractOrmPersister {
public void start() {
SchedulerBus.scheduleAtFixedRate(() -> {
if (!OrmContext.isStop()) {
EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll());
entityCaches.persistAll();
}
}, rate, TimeUnit.MILLISECONDS);
}
@@ -328,9 +328,10 @@ public class OrmManager implements IOrmManager {
public EntityDef parserEntityDef(Class<? extends IEntity<?>> clazz) {
if (!GraalVmUtils.isGraalVM()) {
analyze(clazz);
}
checkEntity(clazz);
// 校验entity格式
var hasUnsafeCollection = hasUnsafeCollection(clazz);
var cacheStrategies = ormConfig.getCaches();
var persisterStrategies = ormConfig.getPersisters();
@@ -378,19 +379,16 @@ public class OrmManager implements IOrmManager {
indexTextDefMap.put(field.getName(), indexTextDef);
}
return EntityDef.valueOf(idField, clazz, cacheSize, expireMillisecond, persisterStrategy, indexDefMap, indexTextDefMap);
return EntityDef.valueOf(idField, clazz, hasUnsafeCollection, cacheSize, expireMillisecond, persisterStrategy, indexDefMap, indexTextDefMap);
}
private void analyze(Class<?> clazz) {
private void checkEntity(Class<?> clazz) {
// 是否实现了IEntity接口
AssertionUtils.isTrue(IEntity.class.isAssignableFrom(clazz), "The entity:[{}] annotated by the [{}] annotation does not implement the interface [{}]"
, com.zfoo.orm.anno.EntityCache.class.getName(), clazz.getCanonicalName(), IEntity.class.getCanonicalName());
// 实体类Entity必须被注解EntityCache标注
AssertionUtils.notNull(clazz.getAnnotation(com.zfoo.orm.anno.EntityCache.class), "The Entity[{}] must be annotated with the annotation [{}].", clazz.getCanonicalName(), com.zfoo.orm.anno.EntityCache.class.getName());
// 校验entity格式
checkEntity(clazz);
// 校验id字段和id()方法的格式,一个Entity类只能有一个@Id注解
var idFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Id.class);
AssertionUtils.isTrue(ArrayUtils.isNotEmpty(idFields) && idFields.length == 1
@@ -465,9 +463,11 @@ public class OrmManager implements IOrmManager {
AssertionUtils.isTrue(gvsReturnValue.equals(vsValue), "The gvs and svs methods of the Entity[{}] are not correctly", clazz.getSimpleName());
}
private static final Set<Class<?>> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class, Set.class, HashSet.class, TreeSet.class, Map.class, HashMap.class, TreeMap.class);
private static final Set<Class<?>> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class
, Set.class, HashSet.class, TreeSet.class,
Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class);
private void checkEntity(Class<?> clazz) {
private boolean hasUnsafeCollection(Class<?> clazz) {
// 是否为一个简单的javabean,为了防止不同层对象混用造成潜在的并发问题,特别是网络层和po层混用
ReflectionUtils.assertIsPojoClass(clazz);
// 不能是泛型类
@@ -484,8 +484,9 @@ public class OrmManager implements IOrmManager {
var filedList = ReflectionUtils.notStaticAndTransientFields(clazz);
var hasUnsafeCollection = false;
for (var field : filedList) {
var hasUnsafeCollection = false;
// entity必须包含属性的get和set方法
FieldUtils.fieldToGetMethod(clazz, field);
@@ -549,13 +550,11 @@ public class OrmManager implements IOrmManager {
} else if (ObjectId.class.isAssignableFrom(fieldType)) {
// do nothing
} else {
checkEntity(fieldType);
hasUnsafeCollection |= hasUnsafeCollection(fieldType);
}
if (hasUnsafeCollection) {
logger.warn("class[{}] field:[{}] is not concurrent collection, use concurrent collection instead or not use @EntityCache annotation", clazz.getSimpleName(), field.getName());
}
}
return hasUnsafeCollection;
}
@@ -591,8 +590,7 @@ public class OrmManager implements IOrmManager {
// ORM不支持集合嵌套数组类型
throw new RunException("ORMs do not support the combination of arrays and collections with the [type:{}] type", type);
} else {
checkEntity(clazz);
return true;
return hasUnsafeCollection(clazz);
}
}
throw new RunException("[type:{}] is incorrect", type);
@@ -28,6 +28,8 @@ public class EntityDef {
private Class<? extends IEntity<?>> clazz;
private boolean hasUnsafeCollection;
private int cacheSize;
private long expireMillisecond;
@@ -38,11 +40,13 @@ public class EntityDef {
private Map<String, IndexTextDef> indexTextDefMap;
public static EntityDef valueOf(Field idField, Class<? extends IEntity<?>> clazz, int cacheSize, long expireMillisecond
public static EntityDef valueOf(Field idField, Class<? extends IEntity<?>> clazz, boolean hasUnsafeCollection, int cacheSize, long expireMillisecond
, PersisterStrategy persisterStrategy, Map<String, IndexDef> indexDefMap, Map<String, IndexTextDef> indexTextDefMap) {
var entityDef = new EntityDef();
entityDef.idField = idField;
entityDef.clazz = clazz;
entityDef.hasUnsafeCollection = hasUnsafeCollection;
entityDef.cacheSize = cacheSize;
entityDef.expireMillisecond = expireMillisecond;
entityDef.persisterStrategy = persisterStrategy;
@@ -51,10 +55,6 @@ public class EntityDef {
return entityDef;
}
public Class<? extends IEntity<?>> getClazz() {
return clazz;
}
public IEntity<?> newEmptyEntity() {
var entity = ReflectionUtils.newInstance(clazz);
return entity;
@@ -67,6 +67,13 @@ public class EntityDef {
ReflectionUtils.setField(idFields[0], entity, id);
return entity;
}
public Class<? extends IEntity<?>> getClazz() {
return clazz;
}
public boolean hasUnsafeCollection() {
return hasUnsafeCollection;
}
public int getCacheSize() {
return cacheSize;