diff --git a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java index 0d900a6b..8c3b6a94 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java @@ -12,7 +12,6 @@ package com.zfoo.orm.cache; -import com.github.benmanes.caffeine.cache.*; import com.mongodb.WriteConcern; import com.mongodb.client.model.BulkWriteOptions; import com.mongodb.client.model.Filters; @@ -23,21 +22,18 @@ import com.zfoo.orm.cache.persister.PNode; import com.zfoo.orm.model.EntityDef; import com.zfoo.orm.model.IEntity; import com.zfoo.orm.query.Page; +import com.zfoo.orm.util.LazyCache; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.exception.RunException; +import com.zfoo.protocol.model.Pair; import com.zfoo.protocol.util.AssertionUtils; -import com.zfoo.protocol.util.StringUtils; import com.zfoo.protocol.util.ThreadUtils; import com.zfoo.scheduler.util.TimeUtils; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -52,58 +48,43 @@ public class EntityCache, E extends IEntity> imple private final EntityDef entityDef; - private final LoadingCache> cache; + private final LazyCache> cache; public EntityCache(EntityDef entityDef) { + var removeCallback = new BiConsumer>, LazyCache.RemovalCause>() { + @Override + public void accept(Pair> pair, LazyCache.RemovalCause removalCause) { + if (removalCause == LazyCache.RemovalCause.EXPLICIT) { + return; + } + + var pk = pair.getKey(); + var pnode = pair.getValue(); + if (pnode.getWriteToDbTime() == pnode.getModifiedTime()) { + return; + } + + // 缓存失效之前,将数据写入数据库 + var entity = pnode.getEntity(); + @SuppressWarnings("unchecked") + var entityClass = (Class) entityDef.getClazz(); + var collection = OrmContext.getOrmManager().getCollection(entityClass); + + var version = entity.gvs(); + entity.svs(version + 1); + + var filter = entity.gvs() > 0 + ? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version)) + : Filters.eq("_id", entity.id()); + var result = collection.replaceOne(filter, entity); + if (result.getModifiedCount() <= 0) { + // 移除缓存时,更新数据库中的实体文档异常 + logger.error("onRemoval(): update entity to db failed when remove [{}] [pk:{}] by [removalCause:{}]", entityClass.getSimpleName(), entity.id(), removalCause); + } + } + }; this.entityDef = entityDef; - - this.cache = Caffeine.newBuilder() - .expireAfterAccess(entityDef.getExpireMillisecond(), TimeUnit.MILLISECONDS) - .maximumSize(entityDef.getCacheSize()) - .initialCapacity(CollectionUtils.comfortableCapacity(entityDef.getCacheSize())) - //.recordStats() // 开启统计信息开关,cache.stats()获取统计信息 - .removalListener(new RemovalListener>() { - @Override - public void onRemoval(@Nullable PK pk, @Nullable PNode pnode, @NonNull RemovalCause removalCause) { - if (pnode.getWriteToDbTime() == pnode.getModifiedTime()) { - return; - } - - // 缓存失效之前,将数据写入数据库 - var entity = pnode.getEntity(); - @SuppressWarnings("unchecked") - var entityClass = (Class) entityDef.getClazz(); - var collection = OrmContext.getOrmManager().getCollection(entityClass); - - var version = entity.gvs(); - entity.svs(version + 1); - - var filter = entity.gvs() > 0 - ? Filters.and(Filters.eq("_id", entity.id()), Filters.eq("vs", version)) - : Filters.eq("_id", entity.id()); - var result = collection.replaceOne(filter, entity); - if (result.getModifiedCount() <= 0) { - // 移除缓存时,更新数据库中的实体文档异常 - logger.error("onRemoval(): update entity to db failed when remove [{}] [pk:{}] by [removalCause:{}]", entityClass.getSimpleName(), entity.id(), removalCause); - } - } - }) - .build(new CacheLoader>() { - @Override - public @Nullable PNode load(@NonNull PK pk) { - @SuppressWarnings("unchecked") - var entity = (E) OrmContext.getAccessor().load(pk, (Class>) entityDef.getClazz()); - - // 如果数据库中不存在则给一个默认值 - if (entity == null) { - @SuppressWarnings("unchecked") - var newEntity = (E) entityDef.newEntity(pk); - return new PNode(newEntity); - } - - return new PNode(entity); - } - }); + this.cache = new LazyCache<>(entityDef.getCacheSize(), entityDef.getExpireMillisecond(), 1 * TimeUtils.MILLIS_PER_SECOND, removeCallback); if (CollectionUtils.isNotEmpty(entityDef.getIndexDefMap())) { // indexMap @@ -122,19 +103,21 @@ public class EntityCache, E extends IEntity> imple @Override public E load(PK pk) { AssertionUtils.notNull(pk); - try { - return cache.get(pk).getEntity(); - } catch (Exception e) { - logger.error("load(): [{}] load [pk:{}] exception", entityDef.getClazz().getSimpleName(), pk, e); - } catch (Throwable t) { - logger.error("load(): [{}] load [pk:{}] error", entityDef.getClazz().getSimpleName(), pk, t); + var pnode = cache.get(pk); + if (pnode != null) { + return pnode.getEntity(); } - // 数据库无法加载缓存,返回默认值 - logger.warn("[{}] can not load [pk:{}] and use default entity to replace it", entityDef.getClazz().getSimpleName(), pk); @SuppressWarnings("unchecked") - var entity = (E) entityDef.newEntity(pk); - var pnode = new PNode(entity); + var entity = (E) OrmContext.getAccessor().load(pk, (Class>) entityDef.getClazz()); + + // 如果数据库中不存在则给一个默认值 + if (entity == null) { + // 数据库无法加载缓存,返回默认值 + logger.warn("[{}] can not load [pk:{}] and use default entity to replace it", entityDef.getClazz().getSimpleName(), pk); + entity = (E) entityDef.newEntity(pk); + } + pnode = new PNode<>(entity); cache.put(pk, pnode); return entity; } @@ -144,7 +127,7 @@ public class EntityCache, E extends IEntity> imple */ private PNode fetchCachePnode(E entity, boolean safe) { var id = entity.id(); - var cachePnode = cache.getIfPresent(id); + var cachePnode = cache.get(id); if (cachePnode == null) { cachePnode = new PNode<>(entity); cache.put(entity.id(), cachePnode); @@ -207,7 +190,7 @@ public class EntityCache, E extends IEntity> imple // 游戏业务中,操作最频繁的是update,不是insert,delete,query // 所以这边并不考虑 AssertionUtils.notNull(pk); - cache.invalidate(pk); + cache.remove(pk); } // 游戏中80%都是执行更新的操作,这样做会极大的提高更新速度 @@ -218,14 +201,17 @@ public class EntityCache, E extends IEntity> imple var updateList = new ArrayList(); var currentTime = TimeUtils.currentTimeMillis(); - for (var pnode : cache.asMap().values()) { - var entity = pnode.getEntity(); - if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) { - pnode.setWriteToDbTime(currentTime); - pnode.setModifiedTime(currentTime); - updateList.add(entity); + cache.forEach(new BiConsumer>() { + @Override + public void accept(PK pk, PNode pnode) { + var entity = pnode.getEntity(); + if (pnode.getModifiedTime() != pnode.getWriteToDbTime()) { + pnode.setWriteToDbTime(currentTime); + pnode.setModifiedTime(currentTime); + updateList.add(entity); + } } - } + }); // 执行更新 if (updateList.isEmpty()) { @@ -289,7 +275,7 @@ public class EntityCache, E extends IEntity> imple var dbEntity = dbMap.get(id); if (dbEntity == null) { - cache.invalidate(entity.id()); + cache.remove(entity.id()); logger.warn("[database:{}] not found entity [id:{}]", entityClass.getSimpleName(), id); continue; } @@ -313,7 +299,7 @@ public class EntityCache, E extends IEntity> imple // 如果数据库版本号较大,说明缓存的数据不是最新的,直接清除缓存,下次重新加载 if (dbEntity.gvs() > entity.gvs()) { - cache.invalidate(id); + cache.remove(id); load(id); logger.warn("[database:{}] document of entity [id:{}] version [{}] is greater than cache [vs:{}]", entityClass.getSimpleName(), id, dbEntity.gvs(), entity.gvs()); continue; @@ -324,32 +310,14 @@ public class EntityCache, E extends IEntity> imple } } - @Override - public List allPresentCaches() { - var allPnodes = cache.asMap().values(); - - if (allPnodes.isEmpty()) { - return Collections.emptyList(); - } - return allPnodes.stream().map(it -> it.getEntity()).toList(); - } - @Override public void forEach(BiConsumer biConsumer) { - cache.asMap().forEach((pk, pNode) -> biConsumer.accept(pk, pNode.getEntity())); + cache.forEach((pk, pnode) -> biConsumer.accept(pk, pnode.getEntity())); } @Override public long size() { - return cache.estimatedSize(); - } - - @Override - public String recordStatus() { - var stats = cache.stats(); - // 缓存命中率/命中次数/加载次数/加载新值的平均时间秒/缓存项被回收的总数 - return StringUtils.format("database [{}] [hitRate:{}] [hitCount:{}] [loadCount:{}] [averageLoadPenalty:{}] [evictionCount:{}]" - , entityDef.getClazz().getSimpleName(), stats.hitRate(), stats.hitCount(), stats.loadCount(), stats.averageLoadPenalty() / TimeUtils.NANO_PER_SECOND, stats.evictionCount()); + return cache.size(); } } diff --git a/orm/src/main/java/com/zfoo/orm/cache/IEntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/IEntityCache.java index b1fff5cb..334fc1ed 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/IEntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/IEntityCache.java @@ -15,7 +15,6 @@ package com.zfoo.orm.cache; import com.zfoo.orm.model.IEntity; import org.springframework.lang.NonNull; -import java.util.List; import java.util.function.BiConsumer; /** @@ -64,18 +63,8 @@ public interface IEntityCache, E extends IEntity> */ void persistAll(); - /** - * 获取所有存在的缓存对象 - */ - List allPresentCaches(); - void forEach(BiConsumer biConsumer); long size(); - /** - * 统计缓存命中率 - */ - String recordStatus(); - } diff --git a/orm/src/test/java/com/zfoo/orm/cache/LazyCacheTest.java b/orm/src/test/java/com/zfoo/orm/cache/LazyCacheTest.java index ca574547..09ec63c5 100644 --- a/orm/src/test/java/com/zfoo/orm/cache/LazyCacheTest.java +++ b/orm/src/test/java/com/zfoo/orm/cache/LazyCacheTest.java @@ -54,6 +54,27 @@ public class LazyCacheTest { System.out.println(lazyCache.get(5)); } + @Test + public void expire1Test() { + var lazyCache = new LazyCache(10, 10 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback); + + lazyCache.put(1, "a"); + lazyCache.put(2, "b"); + lazyCache.put(3, "c"); + lazyCache.put(4, "d"); + lazyCache.put(5, "e"); + for (int i = 0; i < 11; i++) { + lazyCache.get(1); + lazyCache.get(2); + ThreadUtils.sleep(1 * TimeUtils.MILLIS_PER_SECOND); + } + System.out.println(lazyCache.get(1)); + System.out.println(lazyCache.get(2)); + System.out.println(lazyCache.get(3)); + System.out.println(lazyCache.get(4)); + System.out.println(lazyCache.get(5)); + } + @Test public void batchTest() { var lazyCache = new LazyCache(1_0000, 10 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback); diff --git a/orm/src/test/java/com/zfoo/orm/cache/OrmTest.java b/orm/src/test/java/com/zfoo/orm/cache/OrmTest.java index 2f1d62ef..386f8c60 100644 --- a/orm/src/test/java/com/zfoo/orm/cache/OrmTest.java +++ b/orm/src/test/java/com/zfoo/orm/cache/OrmTest.java @@ -13,7 +13,6 @@ package com.zfoo.orm.cache; -import com.zfoo.orm.OrmContext; import com.zfoo.protocol.util.ThreadUtils; import org.junit.Ignore; import org.junit.Test; @@ -39,8 +38,6 @@ public class OrmTest { userEntityCaches.update(entity); } - OrmContext.getOrmManager().getAllEntityCaches().forEach(it -> System.out.println(it.recordStatus())); - ThreadUtils.sleep(Long.MAX_VALUE); }