perf[orm]: persist finish, reset modifiedTime & writeToDbTime

This commit is contained in:
awake
2024-10-21 14:08:42 +08:00
parent dc1a196714
commit 6e76170f95
3 changed files with 51 additions and 14 deletions
@@ -13,6 +13,7 @@
package com.zfoo.orm.accessor;
import com.zfoo.orm.cache.persister.PNode;
import com.zfoo.orm.model.IEntity;
import org.springframework.lang.Nullable;
@@ -33,6 +34,8 @@ public interface IAccessor {
<PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdate(List<E> entities);
<PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdateNode(List<PNode<PK,E>> entities);
<PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(E entity);
<PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(PK pk, Class<E> entityClazz);
@@ -17,8 +17,10 @@ import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.zfoo.orm.OrmContext;
import com.zfoo.orm.cache.persister.PNode;
import com.zfoo.orm.model.IEntity;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.scheduler.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +104,37 @@ public class MongodbAccessor implements IAccessor {
}
}
@Override
public <PK extends Comparable<PK>, E extends IEntity<PK>> void batchUpdateNode(List<PNode<PK,E>> nodes) {
if (CollectionUtils.isEmpty(nodes)) {
return;
}
try {
@SuppressWarnings("unchecked")
var entityClazz = (Class<E>) nodes.get(0).getClass();
var collection = OrmContext.getOrmManager().getCollection(entityClazz);
List<E> entities = nodes.stream().map(PNode::getEntity).toList();
var batchList = entities.stream()
.map(it -> new ReplaceOneModel<E>(Filters.eq("_id", it.id()), it))
.toList();
var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));
//设置修改时间
long currentTime = TimeUtils.currentTimeMillis();
nodes.forEach(k->k.resetTime(currentTime));
if (result.getMatchedCount() != entities.size()) {
// 在数据库的批量更新操作中需要更新的数量和最终更新的数量不相同
logger.warn("database:[{}] update size:[{}] not equal with matched size:[{}](some entity of id not exist in database)"
, entityClazz.getSimpleName(), entities.size(), result.getMatchedCount());
}
} catch (Throwable t) {
logger.error("batchUpdate unknown exception", t);
}
}
@Override
public <PK extends Comparable<PK>, E extends IEntity<PK>> boolean delete(E entity) {
@SuppressWarnings("unchecked")
+15 -14
View File
@@ -77,7 +77,6 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
var updateList = removes.stream()
.map(it -> it.v)
.filter(it -> !noNeedUpdate(it))
.map(it -> it.getEntity())
.toList();
EventBus.asyncExecute(clazz.hashCode(), () -> doPersist(updateList));
}
@@ -228,8 +227,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
if (noNeedUpdate(pnode)) {
return;
}
pnode.resetTime(TimeUtils.currentTimeMillis());
doPersist(List.of(pnode.getEntity()));
doPersist(List.of(pnode));
}
private boolean noNeedUpdate(PNode<PK, E> pnode) {
@@ -247,7 +245,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
} else {
var currentTime = TimeUtils.currentTimeMillis();
// key为threadId
var updateMap = new HashMap<Long, List<E>>();
var updateMap = new HashMap<Long, List<PNode<PK, E>>>();
var initSize = caches.size() >> 2;
caches.forEach(new BiConsumer<PK, PNode<PK, E>>() {
@Override
@@ -257,7 +255,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
}
pnode.resetTime(currentTime);
var updateList = updateMap.computeIfAbsent(pnode.getThreadId(), it -> new ArrayList<>(initSize));
updateList.add(pnode.getEntity());
updateList.add(pnode);
}
});
var count = 0;
@@ -278,7 +276,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
@Override
public void persistAllBlock() {
var currentTime = TimeUtils.currentTimeMillis();
var updateList = new ArrayList<E>(caches.size());
var updateList = new ArrayList<PNode<PK, E>>(caches.size());
caches.forEach(new BiConsumer<PK, PNode<PK, E>>() {
@Override
public void accept(PK pk, PNode<PK, E> pnode) {
@@ -286,13 +284,13 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
return;
}
pnode.resetTime(currentTime);
updateList.add(pnode.getEntity());
updateList.add(pnode);
}
});
doPersist(updateList);
}
private void doPersist(List<E> updateList) {
private void doPersist(List<PNode<PK,E>> updateList) {
// 执行更新
if (updateList.isEmpty()) {
return;
@@ -305,17 +303,17 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
}
}
private void doPersistNoVersion(List<E> updateList) {
private void doPersistNoVersion(List<PNode<PK,E>> updateList) {
var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size());
var maxPageSize = page.totalPage();
for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
page.setPage(currentPage);
var currentUpdateList = page.currentPageList(updateList);
OrmContext.getAccessor().batchUpdate(currentUpdateList);
OrmContext.getAccessor().batchUpdateNode(currentUpdateList);
}
}
private void doPersistWithVersion(List<E> updateList) {
private void doPersistWithVersion(List<PNode<PK,E>> updateList) {
var page = Page.valueOf(1, DEFAULT_BATCH_SIZE, updateList.size());
var maxPageSize = page.totalPage();
var versionFiledName = wrapper.versionFieldName();
@@ -323,10 +321,10 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
page.setPage(currentPage);
var currentUpdateList = page.currentPageList(updateList);
List<E> entities = currentUpdateList.stream().map(PNode::getEntity).toList();
try {
var collection = OrmContext.getOrmManager().getCollection(clazz).withWriteConcern(WriteConcern.ACKNOWLEDGED);
var batchList = currentUpdateList.stream()
var batchList = entities.stream()
.map(it -> {
var version = wrapper.gvs(it);
wrapper.svs(it, version + 1);
@@ -336,6 +334,9 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
.toList();
var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));
long currentTime = TimeUtils.currentTimeMillis();
currentUpdateList.forEach(node->node.resetTime(currentTime));
if (result.getMatchedCount() == batchList.size()) {
continue;
}
@@ -347,7 +348,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
} catch (Throwable t) {
logger.error("doPersistWithVersion(): [{}] batch update unknown error and try ", clazz.getSimpleName(), t);
}
persistAndCompareVersion(currentUpdateList);
persistAndCompareVersion(entities);
}
}