From ddd292b6d2013f6d599bbbf99b53cddc6fba9e8f Mon Sep 17 00:00:00 2001 From: godotg Date: Tue, 25 Jun 2024 12:44:21 +0800 Subject: [PATCH] ref[orm]: persist entity in same update thread --- .../main/java/com/zfoo/orm/OrmContext.java | 2 +- .../java/com/zfoo/orm/cache/EntityCache.java | 62 ++++++++++++------- .../orm/cache/persister/CronOrmPersister.java | 6 +- .../com/zfoo/orm/cache/persister/PNode.java | 8 ++- .../orm/cache/persister/TimeOrmPersister.java | 2 +- .../java/com/zfoo/orm/manager/OrmManager.java | 32 +++++----- .../java/com/zfoo/orm/model/EntityDef.java | 17 +++-- 7 files changed, 77 insertions(+), 52 deletions(-) diff --git a/orm/src/main/java/com/zfoo/orm/OrmContext.java b/orm/src/main/java/com/zfoo/orm/OrmContext.java index 6ef7a66f..2e729d25 100644 --- a/orm/src/main/java/com/zfoo/orm/OrmContext.java +++ b/orm/src/main/java/com/zfoo/orm/OrmContext.java @@ -46,7 +46,7 @@ public class OrmContext implements ApplicationListener, private IOrmManager ormManager; - private boolean stop = false; + private volatile boolean stop = false; public static ApplicationContext getApplicationContext() { return instance.applicationContext; diff --git a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java index bc8aacfc..30e06ff7 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -205,18 +206,14 @@ public class EntityCache, E extends IEntity> imple public void updateNow(E entity) { var cachePnode = fetchCachePnode(entity, true); OrmContext.getAccessor().update(cachePnode.getEntity()); - var now = TimeUtils.now(); - cachePnode.setWriteToDbTime(now); - cachePnode.setModifiedTime(now); + cachePnode.resetTime(TimeUtils.now()); } @Override public void updateUnsafeNow(E entity) { var cachePnode = fetchCachePnode(entity, false); OrmContext.getAccessor().update(cachePnode.getEntity()); - var now = TimeUtils.now(); - cachePnode.setWriteToDbTime(now); - cachePnode.setModifiedTime(now); + cachePnode.resetTime(TimeUtils.now()); } @Override @@ -238,9 +235,7 @@ public class EntityCache, E extends IEntity> imple if (pnode.getModifiedTime() == pnode.getWriteToDbTime()) { return; } - var currentTime = TimeUtils.currentTimeMillis(); - pnode.setWriteToDbTime(currentTime); - pnode.setModifiedTime(currentTime); + pnode.resetTime(TimeUtils.currentTimeMillis()); var updateList = new ArrayList(); updateList.add(pnode.getEntity()); doPersist(updateList, entityClass); @@ -252,21 +247,46 @@ public class EntityCache, E extends IEntity> imple @SuppressWarnings("unchecked") var entityClass = (Class) entityDef.getClazz(); - var updateList = new ArrayList(); var currentTime = TimeUtils.currentTimeMillis(); - cache.forEach(new BiConsumer>() { - @Override - public void accept(PK pk, PNode pnode) { - var entity = pnode.getEntity(); - if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) { - pnode.setWriteToDbTime(currentTime); - pnode.setModifiedTime(currentTime); - updateList.add(entity); + + if (entityDef.hasUnsafeCollection()) { + // key为threadId + var updateMap = new HashMap>(); + cache.forEach(new BiConsumer>() { + @Override + public void accept(PK pk, PNode pnode) { + var entity = pnode.getEntity(); + if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) { + pnode.resetTime(currentTime); + var updateList = updateMap.computeIfAbsent(pnode.getThreadId(), it -> new ArrayList<>()); + updateList.add(entity); + } + } + }); + for(var entry : updateMap.entrySet()) { + var threadId = entry.getKey(); + var updateList = entry.getValue(); + var executor = ThreadUtils.executorByThreadId(threadId); + if (executor == null) { + EventBus.asyncExecute(entityClass.hashCode(), () -> doPersist(updateList, entityClass)); + } else { + executor.execute(() -> doPersist(updateList, entityClass)); } } - }); - - doPersist(updateList, entityClass); + } else { + var updateList = new ArrayList(); + cache.forEach(new BiConsumer>() { + @Override + public void accept(PK pk, PNode pnode) { + var entity = pnode.getEntity(); + if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) { + pnode.resetTime(currentTime); + updateList.add(entity); + } + } + }); + EventBus.asyncExecute(entityClass.hashCode(), () -> doPersist(updateList, entityClass)); + } } private void doPersist(List updateList, Class entityClass) { diff --git a/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java b/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java index 47938ee4..efa4dfb5 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java @@ -75,10 +75,8 @@ public class CronOrmPersister extends AbstractOrmPersister { if (!OrmContext.isStop()) { SchedulerBus.schedule(() -> { if (!OrmContext.isStop()) { - EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> { - entityCaches.persistAll(); - schedulePersist(); - }); + entityCaches.persistAll(); + schedulePersist(); } }, delay, TimeUnit.MILLISECONDS); } diff --git a/orm/src/main/java/com/zfoo/orm/cache/persister/PNode.java b/orm/src/main/java/com/zfoo/orm/cache/persister/PNode.java index 432e079a..fb65df46 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/persister/PNode.java +++ b/orm/src/main/java/com/zfoo/orm/cache/persister/PNode.java @@ -37,10 +37,12 @@ public class PNode> { public PNode(E entity) { this.entity = entity; + resetTime(TimeUtils.now()); + } - var currentTime = TimeUtils.now(); - this.writeToDbTime = currentTime; - this.modifiedTime = currentTime; + public void resetTime(long timestamp) { + this.writeToDbTime = timestamp; + this.modifiedTime = timestamp; } public E getEntity() { diff --git a/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java b/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java index 4efe6cae..7cbc7dee 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java @@ -44,7 +44,7 @@ public class TimeOrmPersister extends AbstractOrmPersister { public void start() { SchedulerBus.scheduleAtFixedRate(() -> { if (!OrmContext.isStop()) { - EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll()); + entityCaches.persistAll(); } }, rate, TimeUnit.MILLISECONDS); } diff --git a/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java b/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java index 8f61abdf..13694117 100644 --- a/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java +++ b/orm/src/main/java/com/zfoo/orm/manager/OrmManager.java @@ -328,9 +328,10 @@ public class OrmManager implements IOrmManager { public EntityDef parserEntityDef(Class> clazz) { - if (!GraalVmUtils.isGraalVM()) { - analyze(clazz); - } + checkEntity(clazz); + + // 校验entity格式 + var hasUnsafeCollection = hasUnsafeCollection(clazz); var cacheStrategies = ormConfig.getCaches(); var persisterStrategies = ormConfig.getPersisters(); @@ -378,19 +379,16 @@ public class OrmManager implements IOrmManager { indexTextDefMap.put(field.getName(), indexTextDef); } - return EntityDef.valueOf(idField, clazz, cacheSize, expireMillisecond, persisterStrategy, indexDefMap, indexTextDefMap); + return EntityDef.valueOf(idField, clazz, hasUnsafeCollection, cacheSize, expireMillisecond, persisterStrategy, indexDefMap, indexTextDefMap); } - private void analyze(Class clazz) { + private void checkEntity(Class clazz) { // 是否实现了IEntity接口 AssertionUtils.isTrue(IEntity.class.isAssignableFrom(clazz), "The entity:[{}] annotated by the [{}] annotation does not implement the interface [{}]" , com.zfoo.orm.anno.EntityCache.class.getName(), clazz.getCanonicalName(), IEntity.class.getCanonicalName()); // 实体类Entity必须被注解EntityCache标注 AssertionUtils.notNull(clazz.getAnnotation(com.zfoo.orm.anno.EntityCache.class), "The Entity[{}] must be annotated with the annotation [{}].", clazz.getCanonicalName(), com.zfoo.orm.anno.EntityCache.class.getName()); - // 校验entity格式 - checkEntity(clazz); - // 校验id字段和id()方法的格式,一个Entity类只能有一个@Id注解 var idFields = ReflectionUtils.getFieldsByAnnoInPOJOClass(clazz, Id.class); AssertionUtils.isTrue(ArrayUtils.isNotEmpty(idFields) && idFields.length == 1 @@ -465,9 +463,11 @@ public class OrmManager implements IOrmManager { AssertionUtils.isTrue(gvsReturnValue.equals(vsValue), "The gvs and svs methods of the Entity[{}] are not correctly", clazz.getSimpleName()); } - private static final Set> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class, Set.class, HashSet.class, TreeSet.class, Map.class, HashMap.class, TreeMap.class); + private static final Set> unsafeCollections = Set.of(List.class, ArrayList.class, LinkedList.class + , Set.class, HashSet.class, TreeSet.class, + Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class); - private void checkEntity(Class clazz) { + private boolean hasUnsafeCollection(Class clazz) { // 是否为一个简单的javabean,为了防止不同层对象混用造成潜在的并发问题,特别是网络层和po层混用 ReflectionUtils.assertIsPojoClass(clazz); // 不能是泛型类 @@ -484,8 +484,9 @@ public class OrmManager implements IOrmManager { var filedList = ReflectionUtils.notStaticAndTransientFields(clazz); + var hasUnsafeCollection = false; + for (var field : filedList) { - var hasUnsafeCollection = false; // entity必须包含属性的get和set方法 FieldUtils.fieldToGetMethod(clazz, field); @@ -549,13 +550,11 @@ public class OrmManager implements IOrmManager { } else if (ObjectId.class.isAssignableFrom(fieldType)) { // do nothing } else { - checkEntity(fieldType); + hasUnsafeCollection |= hasUnsafeCollection(fieldType); } - if (hasUnsafeCollection) { - logger.warn("class[{}] field:[{}] is not concurrent collection, use concurrent collection instead or not use @EntityCache annotation", clazz.getSimpleName(), field.getName()); - } } + return hasUnsafeCollection; } @@ -591,8 +590,7 @@ public class OrmManager implements IOrmManager { // ORM不支持集合嵌套数组类型 throw new RunException("ORMs do not support the combination of arrays and collections with the [type:{}] type", type); } else { - checkEntity(clazz); - return true; + return hasUnsafeCollection(clazz); } } throw new RunException("[type:{}] is incorrect", type); diff --git a/orm/src/main/java/com/zfoo/orm/model/EntityDef.java b/orm/src/main/java/com/zfoo/orm/model/EntityDef.java index bdf97bb3..d986f524 100644 --- a/orm/src/main/java/com/zfoo/orm/model/EntityDef.java +++ b/orm/src/main/java/com/zfoo/orm/model/EntityDef.java @@ -28,6 +28,8 @@ public class EntityDef { private Class> clazz; + private boolean hasUnsafeCollection; + private int cacheSize; private long expireMillisecond; @@ -38,11 +40,13 @@ public class EntityDef { private Map indexTextDefMap; - public static EntityDef valueOf(Field idField, Class> clazz, int cacheSize, long expireMillisecond + + public static EntityDef valueOf(Field idField, Class> clazz, boolean hasUnsafeCollection, int cacheSize, long expireMillisecond , PersisterStrategy persisterStrategy, Map indexDefMap, Map indexTextDefMap) { var entityDef = new EntityDef(); entityDef.idField = idField; entityDef.clazz = clazz; + entityDef.hasUnsafeCollection = hasUnsafeCollection; entityDef.cacheSize = cacheSize; entityDef.expireMillisecond = expireMillisecond; entityDef.persisterStrategy = persisterStrategy; @@ -51,10 +55,6 @@ public class EntityDef { return entityDef; } - public Class> getClazz() { - return clazz; - } - public IEntity newEmptyEntity() { var entity = ReflectionUtils.newInstance(clazz); return entity; @@ -67,6 +67,13 @@ public class EntityDef { ReflectionUtils.setField(idFields[0], entity, id); return entity; } + public Class> getClazz() { + return clazz; + } + + public boolean hasUnsafeCollection() { + return hasUnsafeCollection; + } public int getCacheSize() { return cacheSize;