diff --git a/boot/src/test/java/com/zfoo/boot/MainTest.java b/boot/src/test/java/com/zfoo/boot/MainTest.java index c03d5538..99a191f0 100644 --- a/boot/src/test/java/com/zfoo/boot/MainTest.java +++ b/boot/src/test/java/com/zfoo/boot/MainTest.java @@ -25,6 +25,7 @@ public class MainTest { public void testTemplate() { System.out.println(StringUtils.MULTIPLE_HYPHENS); System.out.println("XX测试:"); + System.out.println("新加测试"); System.out.println("**********"); System.out.println(StringUtils.MULTIPLE_HYPHENS); } diff --git a/event/src/main/java/com/zfoo/event/manager/EventBus.java b/event/src/main/java/com/zfoo/event/manager/EventBus.java index c5be0d33..14db8582 100644 --- a/event/src/main/java/com/zfoo/event/manager/EventBus.java +++ b/event/src/main/java/com/zfoo/event/manager/EventBus.java @@ -49,8 +49,14 @@ public abstract class EventBus { private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE]; private static final CopyOnWriteHashMapLongObject threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE); - - private static final Map, List> receiverMap = new HashMap<>(); + /** + * 同步事件映射 + */ + private static final Map, List> receiverMapSync = new HashMap<>(); + /** + * 异步事件映射 + */ + private static final Map, List> receiverMapAsync = new HashMap<>(); static { for (int i = 0; i < executors.length; i++) { @@ -83,14 +89,20 @@ public abstract class EventBus { return thread; } } - /** - * 同步抛出一个事件,会在当前线程中运行 + * 处理事件 + */ + public static void submit(IEvent event) { + syncSubmit(event); + asyncSubmit(event); + } + /** + * 同步抛出一个事件,会在当前线程中运行(只有同步观察者会处理事件) * * @param event 需要抛出的事件 */ public static void syncSubmit(IEvent event) { - var list = receiverMap.get(event.getClass()); + var list = receiverMapSync.get(event.getClass()); if (CollectionUtils.isEmpty(list)) { return; } @@ -99,12 +111,12 @@ public abstract class EventBus { /** - * 异步抛出一个事件,事件不在同一个线程中处理 + * 异步抛出一个事件,事件不在同一个线程中处理(只有异步观察者会处理事件) * * @param event 需要抛出的事件 */ public static void asyncSubmit(IEvent event) { - var list = receiverMap.get(event.getClass()); + var list = receiverMapAsync.get(event.getClass()); if (CollectionUtils.isEmpty(list)) { return; } @@ -145,10 +157,16 @@ public abstract class EventBus { } /** - * 注册事件及其对应观察者 + * 注册事件及其对应同异步观察者 */ - public static void registerEventReceiver(Class eventType, IEventReceiver receiver) { - receiverMap.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver); + public static void registerEventReceiver(Class eventType, IEventReceiver receiver,boolean asyncFlag) { + if(asyncFlag==true) { + receiverMapAsync.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver); + } + else + { + receiverMapSync.computeIfAbsent(eventType, it -> new LinkedList<>()).add(receiver); + } } public static Executor threadExecutor(long currentThreadId) { diff --git a/event/src/main/java/com/zfoo/event/model/anno/AsyncExecute.java b/event/src/main/java/com/zfoo/event/model/anno/AsyncExecute.java new file mode 100644 index 00000000..431d0d9d --- /dev/null +++ b/event/src/main/java/com/zfoo/event/model/anno/AsyncExecute.java @@ -0,0 +1,9 @@ +package com.zfoo.event.model.anno; + +import java.lang.annotation.*; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +public @interface AsyncExecute { +} diff --git a/event/src/main/java/com/zfoo/event/schema/EventRegisterProcessor.java b/event/src/main/java/com/zfoo/event/schema/EventRegisterProcessor.java index f91a3602..8e89499d 100644 --- a/event/src/main/java/com/zfoo/event/schema/EventRegisterProcessor.java +++ b/event/src/main/java/com/zfoo/event/schema/EventRegisterProcessor.java @@ -14,6 +14,7 @@ package com.zfoo.event.schema; import com.zfoo.event.manager.EventBus; +import com.zfoo.event.model.anno.AsyncExecute; import com.zfoo.event.model.anno.EventReceiver; import com.zfoo.event.model.event.IEvent; import com.zfoo.event.model.vo.EnhanceUtils; @@ -82,8 +83,12 @@ public class EventRegisterProcessor implements BeanPostProcessor { var receiverDefinition = new EventReceiverDefinition(bean, method, eventClazz); var enhanceReceiverDefinition = EnhanceUtils.createEventReceiver(receiverDefinition); + //异步执行标志,false表示同步执行,true表示异步执行 + boolean asyncFlag=false; + if(method.isAnnotationPresent(AsyncExecute.class)) + asyncFlag=true; // key:class类型 value:观察者 注册Event的receiverMap中 - EventBus.registerEventReceiver(eventClazz, enhanceReceiverDefinition); + EventBus.registerEventReceiver(eventClazz, enhanceReceiverDefinition,asyncFlag); } } catch (Throwable t) { throw new RuntimeException(t); diff --git a/event/src/test/java/com/zfoo/event/ApplicationTest.java b/event/src/test/java/com/zfoo/event/ApplicationTest.java index f64a1339..b63cb995 100644 --- a/event/src/test/java/com/zfoo/event/ApplicationTest.java +++ b/event/src/test/java/com/zfoo/event/ApplicationTest.java @@ -36,11 +36,11 @@ public class ApplicationTest { // 参考MyController1中的标准注册方法 // 抛出同步事件,事件会被当前线程立刻执行,注意日志打印的线程号 - EventBus.syncSubmit(MyNoticeEvent.valueOf("同步事件")); - - // 抛出异步事件,事件会被不会立刻执行,注意日志打印的线程号 - EventBus.asyncSubmit(MyNoticeEvent.valueOf("异步事件")); - +// EventBus.syncSubmit(MyNoticeEvent.valueOf("同步事件")); +// +// // 抛出异步事件,事件会被不会立刻执行,注意日志打印的线程号 +// EventBus.asyncSubmit(MyNoticeEvent.valueOf("异步事件")); + EventBus.submit(MyNoticeEvent.valueOf("处理事件")); // 睡眠3秒,等待异步事件执行完 ThreadUtils.sleep(3000); } diff --git a/event/src/test/java/com/zfoo/event/MyController1.java b/event/src/test/java/com/zfoo/event/MyController1.java index d7c97ad1..47195780 100644 --- a/event/src/test/java/com/zfoo/event/MyController1.java +++ b/event/src/test/java/com/zfoo/event/MyController1.java @@ -29,7 +29,7 @@ public class MyController1 { @EventReceiver public void onMyNoticeEvent(MyNoticeEvent event) { - logger.info("方法1收到事件:" + event.getMessage()); + logger.info("方法1同步执行事件:" + event.getMessage()); } } diff --git a/event/src/test/java/com/zfoo/event/MyController2.java b/event/src/test/java/com/zfoo/event/MyController2.java index 8fc67e11..238abc60 100644 --- a/event/src/test/java/com/zfoo/event/MyController2.java +++ b/event/src/test/java/com/zfoo/event/MyController2.java @@ -13,6 +13,7 @@ package com.zfoo.event; +import com.zfoo.event.model.anno.AsyncExecute; import com.zfoo.event.model.anno.EventReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,9 +31,10 @@ public class MyController2 { /** * 同一个事件可以被重复注册和接受 */ + @AsyncExecute @EventReceiver public void onMyNoticeEvent(MyNoticeEvent event) { - logger.info("方法2收到事件:" + event.getMessage()); + logger.info("方法2异步执行事件:" + event.getMessage()); } }