Merge pull request #33 from Yuao-github/main

feat[event] 将同异步处理事件的控制 由事件发布者下放到观察者
This commit is contained in:
godotg
2022-12-01 17:52:40 +08:00
committed by GitHub
7 changed files with 53 additions and 18 deletions
@@ -25,6 +25,7 @@ public class MainTest {
public void testTemplate() {
System.out.println(StringUtils.MULTIPLE_HYPHENS);
System.out.println("XX测试:");
System.out.println("新加测试");
System.out.println("**********");
System.out.println(StringUtils.MULTIPLE_HYPHENS);
}
@@ -49,8 +49,14 @@ public abstract class EventBus {
private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];
private static final CopyOnWriteHashMapLongObject<ExecutorService> threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE);
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new HashMap<>();
/**
* 同步事件映射
*/
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMapSync = new HashMap<>();
/**
* 异步事件映射
*/
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMapAsync = new HashMap<>();
static {
for (int i = 0; i < executors.length; i++) {
@@ -83,14 +89,20 @@ public abstract class EventBus {
return thread;
}
}
/**
* 同步抛出一个事件,会在当前线程中运行
* 处理事件
*/
public static void submit(IEvent event) {
syncSubmit(event);
asyncSubmit(event);
}
/**
* 同步抛出一个事件,会在当前线程中运行(只有同步观察者会处理事件)
*
* @param event 需要抛出的事件
*/
public static void syncSubmit(IEvent event) {
var list = receiverMap.get(event.getClass());
var list = receiverMapSync.get(event.getClass());
if (CollectionUtils.isEmpty(list)) {
return;
}
@@ -99,12 +111,12 @@ public abstract class EventBus {
/**
* 异步抛出一个事件,事件不在同一个线程中处理
* 异步抛出一个事件,事件不在同一个线程中处理(只有异步观察者会处理事件)
*
* @param event 需要抛出的事件
*/
public static void asyncSubmit(IEvent event) {
var list = receiverMap.get(event.getClass());
var list = receiverMapAsync.get(event.getClass());
if (CollectionUtils.isEmpty(list)) {
return;
}
@@ -145,10 +157,16 @@ public abstract class EventBus {
}
/**
* 注册事件及其对应观察者
* 注册事件及其对应同异步观察者
*/
public static void registerEventReceiver(Class<? extends IEvent> eventType, IEventReceiver receiver) {
receiverMap.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver);
public static void registerEventReceiver(Class<? extends IEvent> eventType, IEventReceiver receiver,boolean asyncFlag) {
if(asyncFlag==true) {
receiverMapAsync.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver);
}
else
{
receiverMapSync.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver);
}
}
public static Executor threadExecutor(long currentThreadId) {
@@ -0,0 +1,9 @@
package com.zfoo.event.model.anno;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface AsyncExecute {
}
@@ -14,6 +14,7 @@
package com.zfoo.event.schema;
import com.zfoo.event.manager.EventBus;
import com.zfoo.event.model.anno.AsyncExecute;
import com.zfoo.event.model.anno.EventReceiver;
import com.zfoo.event.model.event.IEvent;
import com.zfoo.event.model.vo.EnhanceUtils;
@@ -82,8 +83,12 @@ public class EventRegisterProcessor implements BeanPostProcessor {
var receiverDefinition = new EventReceiverDefinition(bean, method, eventClazz);
var enhanceReceiverDefinition = EnhanceUtils.createEventReceiver(receiverDefinition);
//异步执行标志,false表示同步执行,true表示异步执行
boolean asyncFlag=false;
if(method.isAnnotationPresent(AsyncExecute.class))
asyncFlag=true;
// key:class类型 value:观察者 注册Event的receiverMap中
EventBus.registerEventReceiver(eventClazz, enhanceReceiverDefinition);
EventBus.registerEventReceiver(eventClazz, enhanceReceiverDefinition,asyncFlag);
}
} catch (Throwable t) {
throw new RuntimeException(t);
@@ -36,11 +36,11 @@ public class ApplicationTest {
// 参考MyController1中的标准注册方法
// 抛出同步事件,事件会被当前线程立刻执行,注意日志打印的线程号
EventBus.syncSubmit(MyNoticeEvent.valueOf("同步事件"));
// 抛出异步事件,事件会被不会立刻执行,注意日志打印的线程号
EventBus.asyncSubmit(MyNoticeEvent.valueOf("异步事件"));
// EventBus.syncSubmit(MyNoticeEvent.valueOf("同步事件"));
//
// // 抛出异步事件,事件会被不会立刻执行,注意日志打印的线程号
// EventBus.asyncSubmit(MyNoticeEvent.valueOf("异步事件"));
EventBus.submit(MyNoticeEvent.valueOf("处理事件"));
// 睡眠3秒,等待异步事件执行完
ThreadUtils.sleep(3000);
}
@@ -29,7 +29,7 @@ public class MyController1 {
@EventReceiver
public void onMyNoticeEvent(MyNoticeEvent event) {
logger.info("方法1收到事件:" + event.getMessage());
logger.info("方法1同步执行事件:" + event.getMessage());
}
}
@@ -13,6 +13,7 @@
package com.zfoo.event;
import com.zfoo.event.model.anno.AsyncExecute;
import com.zfoo.event.model.anno.EventReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,9 +31,10 @@ public class MyController2 {
/**
* 同一个事件可以被重复注册和接受
*/
@AsyncExecute
@EventReceiver
public void onMyNoticeEvent(MyNoticeEvent event) {
logger.info("方法2收到事件:" + event.getMessage());
logger.info("方法2异步执行事件:" + event.getMessage());
}
}