diff --git a/orm/src/main/java/com/zfoo/orm/accessor/IAccessor.java b/orm/src/main/java/com/zfoo/orm/accessor/IAccessor.java index d41af234..4f015bb4 100644 --- a/orm/src/main/java/com/zfoo/orm/accessor/IAccessor.java +++ b/orm/src/main/java/com/zfoo/orm/accessor/IAccessor.java @@ -13,6 +13,7 @@ package com.zfoo.orm.accessor; +import com.zfoo.orm.cache.persister.PNode; import com.zfoo.orm.model.IEntity; import org.springframework.lang.Nullable; @@ -33,6 +34,8 @@ public interface IAccessor { , E extends IEntity> void batchUpdate(List entities); + , E extends IEntity> void batchUpdateNode(List> entities); + , E extends IEntity> boolean delete(E entity); , E extends IEntity> boolean delete(PK pk, Class entityClazz); diff --git a/orm/src/main/java/com/zfoo/orm/accessor/MongodbAccessor.java b/orm/src/main/java/com/zfoo/orm/accessor/MongodbAccessor.java index 6a364473..f580086c 100644 --- a/orm/src/main/java/com/zfoo/orm/accessor/MongodbAccessor.java +++ b/orm/src/main/java/com/zfoo/orm/accessor/MongodbAccessor.java @@ -17,8 +17,10 @@ import com.mongodb.client.model.BulkWriteOptions; import com.mongodb.client.model.Filters; import com.mongodb.client.model.ReplaceOneModel; import com.zfoo.orm.OrmContext; +import com.zfoo.orm.cache.persister.PNode; import com.zfoo.orm.model.IEntity; import com.zfoo.protocol.collection.CollectionUtils; +import com.zfoo.scheduler.util.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +104,37 @@ public class MongodbAccessor implements IAccessor { } } + @Override + public , E extends IEntity> void batchUpdateNode(List> nodes) { + if (CollectionUtils.isEmpty(nodes)) { + return; + } + + try { + @SuppressWarnings("unchecked") + var entityClazz = (Class) nodes.get(0).getClass(); + var collection = OrmContext.getOrmManager().getCollection(entityClazz); + List entities = nodes.stream().map(PNode::getEntity).toList(); + var batchList = entities.stream() + .map(it -> new ReplaceOneModel(Filters.eq("_id", it.id()), it)) + .toList(); + + var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false)); + + //设置修改时间 + long currentTime = TimeUtils.currentTimeMillis(); + nodes.forEach(k->k.resetTime(currentTime)); + + if (result.getMatchedCount() != entities.size()) { + // 在数据库的批量更新操作中需要更新的数量和最终更新的数量不相同 + logger.warn("database:[{}] update size:[{}] not equal with matched size:[{}](some entity of id not exist in database)" + , entityClazz.getSimpleName(), entities.size(), result.getMatchedCount()); + } + } catch (Throwable t) { + logger.error("batchUpdate unknown exception", t); + } + } + @Override public , E extends IEntity> boolean delete(E entity) { @SuppressWarnings("unchecked") diff --git a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java index a6177a2f..1bcc85ff 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java @@ -77,7 +77,6 @@ public class EntityCache, E extends IEntity> imple var updateList = removes.stream() .map(it -> it.v) .filter(it -> !noNeedUpdate(it)) - .map(it -> it.getEntity()) .toList(); EventBus.asyncExecute(clazz.hashCode(), () -> doPersist(updateList)); } @@ -228,8 +227,7 @@ public class EntityCache, E extends IEntity> imple if (noNeedUpdate(pnode)) { return; } - pnode.resetTime(TimeUtils.currentTimeMillis()); - doPersist(List.of(pnode.getEntity())); + doPersist(List.of(pnode)); } private boolean noNeedUpdate(PNode pnode) { @@ -247,7 +245,7 @@ public class EntityCache, E extends IEntity> imple } else { var currentTime = TimeUtils.currentTimeMillis(); // key为threadId - var updateMap = new HashMap>(); + var updateMap = new HashMap>>(); var initSize = caches.size() >> 2; caches.forEach(new BiConsumer>() { @Override @@ -257,7 +255,7 @@ public class EntityCache, E extends IEntity> imple } pnode.resetTime(currentTime); var updateList = updateMap.computeIfAbsent(pnode.getThreadId(), it -> new ArrayList<>(initSize)); - updateList.add(pnode.getEntity()); + updateList.add(pnode); } }); var count = 0; @@ -278,7 +276,7 @@ public class EntityCache, E extends IEntity> imple @Override public void persistAllBlock() { var currentTime = TimeUtils.currentTimeMillis(); - var updateList = new ArrayList(caches.size()); + var updateList = new ArrayList>(caches.size()); caches.forEach(new BiConsumer>() { @Override public void accept(PK pk, PNode pnode) { @@ -286,13 +284,13 @@ public class EntityCache, E extends IEntity> imple return; } pnode.resetTime(currentTime); - updateList.add(pnode.getEntity()); + updateList.add(pnode); } }); doPersist(updateList); } - private void doPersist(List updateList) { + private void doPersist(List> updateList) { // 执行更新 if (updateList.isEmpty()) { return; @@ -305,17 +303,17 @@ public class EntityCache, E extends IEntity> imple } } - private void doPersistNoVersion(List updateList) { + private void doPersistNoVersion(List> updateList) { var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size()); var maxPageSize = page.totalPage(); for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) { page.setPage(currentPage); var currentUpdateList = page.currentPageList(updateList); - OrmContext.getAccessor().batchUpdate(currentUpdateList); + OrmContext.getAccessor().batchUpdateNode(currentUpdateList); } } - private void doPersistWithVersion(List updateList) { + private void doPersistWithVersion(List> updateList) { var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size()); var maxPageSize = page.totalPage(); var versionFiledName = wrapper.versionFieldName(); @@ -323,10 +321,10 @@ public class EntityCache, E extends IEntity> imple for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) { page.setPage(currentPage); var currentUpdateList = page.currentPageList(updateList); + List entities = currentUpdateList.stream().map(PNode::getEntity).toList(); try { var collection = OrmContext.getOrmManager().getCollection(clazz).withWriteConcern(WriteConcern.ACKNOWLEDGED); - - var batchList = currentUpdateList.stream() + var batchList = entities.stream() .map(it -> { var version = wrapper.gvs(it); wrapper.svs(it, version + 1); @@ -336,6 +334,9 @@ public class EntityCache, E extends IEntity> imple .toList(); var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false)); + + long currentTime = TimeUtils.currentTimeMillis(); + currentUpdateList.forEach(node->node.resetTime(currentTime)); if (result.getMatchedCount() == batchList.size()) { continue; } @@ -347,7 +348,7 @@ public class EntityCache, E extends IEntity> imple } catch (Throwable t) { logger.error("doPersistWithVersion(): [{}] batch update unknown error and try ", clazz.getSimpleName(), t); } - persistAndCompareVersion(currentUpdateList); + persistAndCompareVersion(entities); } }