ref[EntityCache]: use LazyCache instead of Caffeine

This commit is contained in:
godotg
2024-03-26 10:45:03 +08:00
parent 6b62bda613
commit 18531c09ef
4 changed files with 85 additions and 110 deletions
+64 -96
View File
@@ -12,7 +12,6 @@
package com.zfoo.orm.cache;
import com.github.benmanes.caffeine.cache.*;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
@@ -23,21 +22,18 @@ import com.zfoo.orm.cache.persister.PNode;
import com.zfoo.orm.model.EntityDef;
import com.zfoo.orm.model.IEntity;
import com.zfoo.orm.query.Page;
import com.zfoo.orm.util.LazyCache;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.protocol.model.Pair;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.ThreadUtils;
import com.zfoo.scheduler.util.TimeUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -52,58 +48,43 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
private final EntityDef entityDef;
private final LoadingCache<PK, PNode<E>> cache;
private final LazyCache<PK, PNode<E>> cache;
public EntityCache(EntityDef entityDef) {
var removeCallback = new BiConsumer<Pair<PK, PNode<E>>, LazyCache.RemovalCause>() {
@Override
public void accept(Pair<PK, PNode<E>> pair, LazyCache.RemovalCause removalCause) {
if (removalCause == LazyCache.RemovalCause.EXPLICIT) {
return;
}
var pk = pair.getKey();
var pnode = pair.getValue();
if (pnode.getWriteToDbTime() == pnode.getModifiedTime()) {
return;
}
// 缓存失效之前,将数据写入数据库
var entity = pnode.getEntity();
@SuppressWarnings("unchecked")
var entityClass = (Class<E>) entityDef.getClazz();
var collection = OrmContext.getOrmManager().getCollection(entityClass);
var version = entity.gvs();
entity.svs(version + 1);
var filter = entity.gvs() > 0
? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version))
: Filters.eq("_id", entity.id());
var result = collection.replaceOne(filter, entity);
if (result.getModifiedCount() <= 0) {
// 移除缓存时,更新数据库中的实体文档异常
logger.error("onRemoval(): update entity to db failed when remove [{}] [pk:{}] by [removalCause:{}]", entityClass.getSimpleName(), entity.id(), removalCause);
}
}
};
this.entityDef = entityDef;
this.cache = Caffeine.newBuilder()
.expireAfterAccess(entityDef.getExpireMillisecond(), TimeUnit.MILLISECONDS)
.maximumSize(entityDef.getCacheSize())
.initialCapacity(CollectionUtils.comfortableCapacity(entityDef.getCacheSize()))
//.recordStats() // 开启统计信息开关,cache.stats()获取统计信息
.removalListener(new RemovalListener<PK, PNode<E>>() {
@Override
public void onRemoval(@Nullable PK pk, @Nullable PNode<E> pnode, @NonNull RemovalCause removalCause) {
if (pnode.getWriteToDbTime() == pnode.getModifiedTime()) {
return;
}
// 缓存失效之前,将数据写入数据库
var entity = pnode.getEntity();
@SuppressWarnings("unchecked")
var entityClass = (Class<E>) entityDef.getClazz();
var collection = OrmContext.getOrmManager().getCollection(entityClass);
var version = entity.gvs();
entity.svs(version + 1);
var filter = entity.gvs() > 0
? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version))
: Filters.eq("_id", entity.id());
var result = collection.replaceOne(filter, entity);
if (result.getModifiedCount() <= 0) {
// 移除缓存时,更新数据库中的实体文档异常
logger.error("onRemoval(): update entity to db failed when remove [{}] [pk:{}] by [removalCause:{}]", entityClass.getSimpleName(), entity.id(), removalCause);
}
}
})
.build(new CacheLoader<PK, PNode<E>>() {
@Override
public @Nullable PNode<E> load(@NonNull PK pk) {
@SuppressWarnings("unchecked")
var entity = (E) OrmContext.getAccessor().load(pk, (Class<IEntity<?>>) entityDef.getClazz());
// 如果数据库中不存在则给一个默认值
if (entity == null) {
@SuppressWarnings("unchecked")
var newEntity = (E) entityDef.newEntity(pk);
return new PNode<E>(newEntity);
}
return new PNode<E>(entity);
}
});
this.cache = new LazyCache<>(entityDef.getCacheSize(), entityDef.getExpireMillisecond(), 1 * TimeUtils.MILLIS_PER_SECOND, removeCallback);
if (CollectionUtils.isNotEmpty(entityDef.getIndexDefMap())) {
// indexMap
@@ -122,19 +103,21 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
@Override
public E load(PK pk) {
AssertionUtils.notNull(pk);
try {
return cache.get(pk).getEntity();
} catch (Exception e) {
logger.error("load(): [{}] load [pk:{}] exception", entityDef.getClazz().getSimpleName(), pk, e);
} catch (Throwable t) {
logger.error("load(): [{}] load [pk:{}] error", entityDef.getClazz().getSimpleName(), pk, t);
var pnode = cache.get(pk);
if (pnode != null) {
return pnode.getEntity();
}
// 数据库无法加载缓存,返回默认值
logger.warn("[{}] can not load [pk:{}] and use default entity to replace it", entityDef.getClazz().getSimpleName(), pk);
@SuppressWarnings("unchecked")
var entity = (E) entityDef.newEntity(pk);
var pnode = new PNode<E>(entity);
var entity = (E) OrmContext.getAccessor().load(pk, (Class<IEntity<?>>) entityDef.getClazz());
// 如果数据库中不存在则给一个默认值
if (entity == null) {
// 数据库无法加载缓存,返回默认值
logger.warn("[{}] can not load [pk:{}] and use default entity to replace it", entityDef.getClazz().getSimpleName(), pk);
entity = (E) entityDef.newEntity(pk);
}
pnode = new PNode<>(entity);
cache.put(pk, pnode);
return entity;
}
@@ -144,7 +127,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
*/
private PNode<E> fetchCachePnode(E entity, boolean safe) {
var id = entity.id();
var cachePnode = cache.getIfPresent(id);
var cachePnode = cache.get(id);
if (cachePnode == null) {
cachePnode = new PNode<>(entity);
cache.put(entity.id(), cachePnode);
@@ -207,7 +190,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
// 游戏业务中,操作最频繁的是update,不是insertdeletequery
// 所以这边并不考虑
AssertionUtils.notNull(pk);
cache.invalidate(pk);
cache.remove(pk);
}
// 游戏中80%都是执行更新的操作,这样做会极大的提高更新速度
@@ -218,14 +201,17 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
var updateList = new ArrayList<E>();
var currentTime = TimeUtils.currentTimeMillis();
for (var pnode : cache.asMap().values()) {
var entity = pnode.getEntity();
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
pnode.setWriteToDbTime(currentTime);
pnode.setModifiedTime(currentTime);
updateList.add(entity);
cache.forEach(new BiConsumer<PK, PNode<E>>() {
@Override
public void accept(PK pk, PNode<E> pnode) {
var entity = pnode.getEntity();
if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) {
pnode.setWriteToDbTime(currentTime);
pnode.setModifiedTime(currentTime);
updateList.add(entity);
}
}
}
});
// 执行更新
if (updateList.isEmpty()) {
@@ -289,7 +275,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
var dbEntity = dbMap.get(id);
if (dbEntity == null) {
cache.invalidate(entity.id());
cache.remove(entity.id());
logger.warn("[database:{}] not found entity [id:{}]", entityClass.getSimpleName(), id);
continue;
}
@@ -313,7 +299,7 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
// 如果数据库版本号较大,说明缓存的数据不是最新的,直接清除缓存,下次重新加载
if (dbEntity.gvs() > entity.gvs()) {
cache.invalidate(id);
cache.remove(id);
load(id);
logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntity.gvs(), entity.gvs());
continue;
@@ -324,32 +310,14 @@ public class EntityCache<PK extends Comparable<PK>, E extends IEntity<PK>> imple
}
}
@Override
public List<E> allPresentCaches() {
var allPnodes = cache.asMap().values();
if (allPnodes.isEmpty()) {
return Collections.emptyList();
}
return allPnodes.stream().map(it -> it.getEntity()).toList();
}
@Override
public void forEach(BiConsumer<PK, E> biConsumer) {
cache.asMap().forEach((pk, pNode) -> biConsumer.accept(pk, pNode.getEntity()));
cache.forEach((pk, pnode) -> biConsumer.accept(pk, pnode.getEntity()));
}
@Override
public long size() {
return cache.estimatedSize();
}
@Override
public String recordStatus() {
var stats = cache.stats();
// 缓存命中率/命中次数/加载次数/加载新值的平均时间秒/缓存项被回收的总数
return StringUtils.format("database [{}] [hitRate:{}] [hitCount:{}] [loadCount:{}] [averageLoadPenalty:{}] [evictionCount:{}]"
, entityDef.getClazz().getSimpleName(), stats.hitRate(), stats.hitCount(), stats.loadCount(), stats.averageLoadPenalty() / TimeUtils.NANO_PER_SECOND, stats.evictionCount());
return cache.size();
}
}
-11
View File
@@ -15,7 +15,6 @@ package com.zfoo.orm.cache;
import com.zfoo.orm.model.IEntity;
import org.springframework.lang.NonNull;
import java.util.List;
import java.util.function.BiConsumer;
/**
@@ -64,18 +63,8 @@ public interface IEntityCache<PK extends Comparable<PK>, E extends IEntity<PK>>
*/
void persistAll();
/**
* 获取所有存在的缓存对象
*/
List<E> allPresentCaches();
void forEach(BiConsumer<PK, E> biConsumer);
long size();
/**
* 统计缓存命中率
*/
String recordStatus();
}
+21
View File
@@ -54,6 +54,27 @@ public class LazyCacheTest {
System.out.println(lazyCache.get(5));
}
@Test
public void expire1Test() {
var lazyCache = new LazyCache<Integer, String>(10, 10 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback);
lazyCache.put(1, "a");
lazyCache.put(2, "b");
lazyCache.put(3, "c");
lazyCache.put(4, "d");
lazyCache.put(5, "e");
for (int i = 0; i < 11; i++) {
lazyCache.get(1);
lazyCache.get(2);
ThreadUtils.sleep(1 * TimeUtils.MILLIS_PER_SECOND);
}
System.out.println(lazyCache.get(1));
System.out.println(lazyCache.get(2));
System.out.println(lazyCache.get(3));
System.out.println(lazyCache.get(4));
System.out.println(lazyCache.get(5));
}
@Test
public void batchTest() {
var lazyCache = new LazyCache<Integer, String>(1_0000, 10 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback);
-3
View File
@@ -13,7 +13,6 @@
package com.zfoo.orm.cache;
import com.zfoo.orm.OrmContext;
import com.zfoo.protocol.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
@@ -39,8 +38,6 @@ public class OrmTest {
userEntityCaches.update(entity);
}
OrmContext.getOrmManager().getAllEntityCaches().forEach(it -> System.out.println(it.recordStatus()));
ThreadUtils.sleep(Long.MAX_VALUE);
}