mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-28 16:22:28 +00:00
chore[orm]: warning log
This commit is contained in:
+66
-78
@@ -83,8 +83,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
: Filters.eq("_id", entity.id());
|
||||
var result = collection.replaceOne(filter, entity);
|
||||
if (result.getModifiedCount() <= 0) {
|
||||
logger.warn("移除[removalCause:{}]缓存时,更新数据库[{}]中的实体主键[pk:{}]的文档异常"
|
||||
, removalCause, entityDef.getClazz().getSimpleName(), entity.id());
|
||||
logger.error("onRemoval(): update entity to db failed when remove [{}] [pk:{}] by [removalCause:{}]", entityClass.getSimpleName(), entity.id(), removalCause);
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -125,12 +124,12 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
try {
|
||||
return cache.get(pk).getEntity();
|
||||
} catch (Exception e) {
|
||||
logger.error("数据库[{}]缓存[pk:{}]加载发生exception异常", entityDef.getClazz().getSimpleName(), pk, e);
|
||||
logger.error("load(): [{}] load [pk:{}] exception", entityDef.getClazz().getSimpleName(), pk, e);
|
||||
} catch (Throwable t) {
|
||||
logger.error("数据库[{}]缓存[pk:{}]加载发生error异常", entityDef.getClazz().getSimpleName(), pk, t);
|
||||
logger.error("load(): [{}] load [pk:{}] error", entityDef.getClazz().getSimpleName(), pk, t);
|
||||
}
|
||||
|
||||
logger.warn("数据库[{}]无法加载缓存[pk:{}],返回默认值", entityDef.getClazz().getSimpleName(), pk);
|
||||
logger.warn("[{}] can not load [pk:{}] and use default entity to replace it", entityDef.getClazz().getSimpleName(), pk);
|
||||
@SuppressWarnings("unchecked")
|
||||
var entity = (E) entityDef.newEntity(pk);
|
||||
var pnode = new PNode<E>(entity);
|
||||
@@ -151,7 +150,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
|
||||
// 比较地址是否相等
|
||||
if (entity != cachePnode.getEntity()) {
|
||||
throw new RunException("cache entity [id:{}] not equal with update entity [id:{}]", cachePnode.getEntity().id(), id);
|
||||
throw new RunException("fetchCachePnode(): cache entity [id:{}] not equal with update entity [id:{}]", cachePnode.getEntity().id(), id);
|
||||
}
|
||||
|
||||
if (safe) {
|
||||
@@ -163,9 +162,9 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
} else {
|
||||
var pnodeThread = ThreadUtils.findThread(pnodeThreadId);
|
||||
if (pnodeThread == null) {
|
||||
logger.warn("[{}][id:{}]有并发写风险,第一次更新的线程[threadId:{}],第2次更新的线程[threadId:{}]", entity.getClass().getSimpleName(), entity.id(), pnodeThreadId, currentThreadId);
|
||||
logger.warn("[{}][id:{}] concurrent write warning, first update [threadId:{}], second update [threadId:{}]", entity.getClass().getSimpleName(), entity.id(), pnodeThreadId, currentThreadId);
|
||||
} else {
|
||||
logger.warn("[{}][id:{}]有并发写风险,第一次更新的线程[threadId:{}][threadName:{}],第2次更新的线程[threadId:{}][threadName:{}]"
|
||||
logger.warn("[{}][id:{}] concurrent write warning, first update [threadId:{}][threadName:{}], second update [threadId:{}][threadName:{}]"
|
||||
, entity.getClass().getSimpleName(), entity.id(), pnodeThreadId, pnodeThread.getName(), currentThreadId, Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
@@ -211,80 +210,68 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
// 游戏中80%都是执行更新的操作,这样做会极大的提高更新速度
|
||||
@Override
|
||||
public void persistAll() {
|
||||
try {
|
||||
var allPnodes = cache.asMap().values();
|
||||
@SuppressWarnings("unchecked")
|
||||
var entityClass = (Class<E>) entityDef.getClazz();
|
||||
|
||||
if (allPnodes.isEmpty()) {
|
||||
return;
|
||||
var updateList = new ArrayList<E>();
|
||||
var currentTime = TimeUtils.currentTimeMillis();
|
||||
for (var pnode : cache.asMap().values()) {
|
||||
var entity = pnode.getEntity();
|
||||
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
|
||||
pnode.setWriteToDbTime(currentTime);
|
||||
pnode.setModifiedTime(currentTime);
|
||||
updateList.add(entity);
|
||||
continue;
|
||||
}
|
||||
|
||||
var updateList = new ArrayList<E>();
|
||||
var currentTime = TimeUtils.currentTimeMillis();
|
||||
for (var pnode : allPnodes) {
|
||||
var entity = pnode.getEntity();
|
||||
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
|
||||
pnode.setWriteToDbTime(currentTime);
|
||||
pnode.setModifiedTime(currentTime);
|
||||
updateList.add(entity);
|
||||
if (currentTime - pnode.getModifiedTime() >= entityDef.getExpireMillisecond()) {
|
||||
invalidate(pnode.getEntity().id());
|
||||
}
|
||||
}
|
||||
|
||||
// 执行更新
|
||||
if (updateList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
var page = Page.valueOf(1, BATCH_SIZE, updateList.size());
|
||||
var maxPageSize = page.totalPage();
|
||||
|
||||
for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
|
||||
page.setPage(currentPage);
|
||||
var currentUpdateList = page.currentPageList(updateList);
|
||||
try {
|
||||
var collection = OrmContext.getOrmManager().getCollection(entityClass).withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
||||
|
||||
var batchList = currentUpdateList.stream()
|
||||
.map(it -> {
|
||||
var version = it.gvs();
|
||||
it.svs(version + 1);
|
||||
|
||||
var filter = it.gvs() > 0
|
||||
? Filters.and(Filters.eq("_id", it.id()), Filters.eq("vs", version))
|
||||
: Filters.eq("_id", it.id());
|
||||
|
||||
return new ReplaceOneModel<>(filter, it);
|
||||
})
|
||||
.toList();
|
||||
|
||||
var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));
|
||||
if (result.getModifiedCount() == batchList.size()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currentTime - pnode.getModifiedTime() >= entityDef.getExpireMillisecond()) {
|
||||
invalidate(pnode.getEntity().id());
|
||||
}
|
||||
// mostly because the document that needs to be updated is the same as the document in the database
|
||||
// 开始执行容错操作(大部分原因都是因为需要更新的文档和数据库的文档相同)
|
||||
logger.warn("persistAll(): [{}] batch update [{}] not equal to final update [{}], and try to use persistAllAndCompare() to update every single entity."
|
||||
, entityClass.getSimpleName(), currentUpdateList.size(), result.getModifiedCount());
|
||||
} catch (Throwable t) {
|
||||
logger.error("persistAll(): [{}] batch update unknown error and try ", entityClass.getSimpleName(), t);
|
||||
}
|
||||
|
||||
// 执行更新
|
||||
if (updateList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
var page = Page.valueOf(1, BATCH_SIZE, updateList.size());
|
||||
var maxPageSize = page.totalPage();
|
||||
|
||||
for (var currentPage = 1; currentPage <= maxPageSize; currentPage++) {
|
||||
page.setPage(currentPage);
|
||||
var currentUpdateList = page.currentPageList(updateList);
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
var entityClass = (Class<E>) entityDef.getClazz();
|
||||
var collection = OrmContext.getOrmManager().getCollection(entityClass).withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
||||
|
||||
var batchList = currentUpdateList.stream()
|
||||
.map(it -> {
|
||||
var version = it.gvs();
|
||||
it.svs(version + 1);
|
||||
|
||||
var filter = it.gvs() > 0
|
||||
? Filters.and(Filters.eq("_id", it.id()), Filters.eq("vs", version))
|
||||
: Filters.eq("_id", it.id());
|
||||
|
||||
return new ReplaceOneModel<>(filter, it);
|
||||
})
|
||||
.toList();
|
||||
|
||||
var result = collection.bulkWrite(batchList, new BulkWriteOptions().ordered(false));
|
||||
if (result.getModifiedCount() == batchList.size()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.warn("在数据库[{}]的批量更新操作中需要更新的数量[{}]和最终更新的数量[{}]不相同,开始执行容错操作(大部分原因都是因为需要更新的文档和数据库的文档相同)"
|
||||
, entityDef.getClazz().getSimpleName(), currentUpdateList.size(), result.getModifiedCount());
|
||||
persistAllAndCompare(currentUpdateList);
|
||||
} catch (Throwable t) {
|
||||
logger.error("数据库[{}]批量更新操作未知异常,开始执行容错操作", entityDef.getClazz().getSimpleName(), t);
|
||||
persistAllAndCompare(currentUpdateList);
|
||||
}
|
||||
}
|
||||
|
||||
updateList.clear();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("数据库持久化器[{}]的持久化过程中exception异常退出", entityDef.getClazz().getSimpleName(), e);
|
||||
} catch (Throwable t) {
|
||||
logger.error("数据库持久化器[{}]的持久化过程中throwable异常退出", entityDef.getClazz().getSimpleName(), t);
|
||||
} finally {
|
||||
persistAllAndCompare(currentUpdateList);
|
||||
}
|
||||
|
||||
updateList.clear();
|
||||
}
|
||||
|
||||
private void persistAllAndCompare(List<E> updateList) {
|
||||
@@ -292,11 +279,11 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
return;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
var entityClass = (Class<E>) entityDef.getClazz();
|
||||
var ids = updateList.stream().map(it -> it.id()).toList();
|
||||
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
var entityClass = (Class<E>) entityDef.getClazz();
|
||||
var dbList = OrmContext.getQuery(entityClass).in("_id", ids).queryAll();
|
||||
var dbMap = dbList.stream().collect(Collectors.toMap(key -> key.id(), value -> value));
|
||||
for (var entity : updateList) {
|
||||
@@ -334,7 +321,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.error("数据库[{}]容错操作异常,", entityDef.getClazz().getSimpleName(), t);
|
||||
logger.error("persistAllAndCompare(): [{}] unknown error", entityClass.getSimpleName(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,7 +348,8 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
|
||||
@Override
|
||||
public String recordStatus() {
|
||||
var stats = cache.stats();
|
||||
return StringUtils.format("数据库[{}]缓存命中率[hitRate:{}],命中次数[hitCount:{}],加载次数[loadCount:{}],加载新值的平均时间秒[averageLoadPenalty:{}],缓存项被回收的总数[evictionCount:{}]"
|
||||
// 缓存命中率/命中次数/加载次数/加载新值的平均时间秒/缓存项被回收的总数
|
||||
return StringUtils.format("database [{}] [hitRate:{}] [hitCount:{}] [loadCount:{}] [averageLoadPenalty:{}] [evictionCount:{}]"
|
||||
, entityDef.getClazz().getSimpleName(), stats.hitRate(), stats.hitCount(), stats.loadCount(), stats.averageLoadPenalty() / TimeUtils.NANO_PER_SECOND, stats.evictionCount());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user