diff --git a/net/src/main/java/com/zfoo/net/dispatcher/manager/PacketDispatcher.java b/net/src/main/java/com/zfoo/net/dispatcher/manager/PacketDispatcher.java index 9390711f..84f4c84c 100644 --- a/net/src/main/java/com/zfoo/net/dispatcher/manager/PacketDispatcher.java +++ b/net/src/main/java/com/zfoo/net/dispatcher/manager/PacketDispatcher.java @@ -34,7 +34,6 @@ import com.zfoo.net.session.model.Session; import com.zfoo.net.task.TaskManager; import com.zfoo.net.task.model.ReceiveTask; import com.zfoo.protocol.IPacket; -import com.zfoo.protocol.exception.ExceptionUtils; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; import com.zfoo.util.math.HashUtils; @@ -220,23 +219,21 @@ public class PacketDispatcher implements IPacketDispatcher { clientAttachment.getResponseFuture() .completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) - .thenApply(response -> { - if (response == null) { - throw new NetTimeOutException(StringUtils.format("asyncRequest timeout exception, ask:[{}], attachment:[{}]" - , JsonUtils.object2String(packet), JsonUtils.object2String(clientAttachment))); + .thenApply(answer -> { + if (answer == null) { + throw new NetTimeOutException(StringUtils.format("async ask [{}] timeout exception", packet.getClass().getSimpleName())); } - if (response.protocolId() == Error.errorProtocolId()) { - throw new ErrorResponseException((Error) response); + if (answer.protocolId() == Error.errorProtocolId()) { + throw new ErrorResponseException((Error) answer); } - if (answerClass != null && answerClass != response.getClass()) { - throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]" - , answerClass, response.getClass().getName())); + if (answerClass != null && answerClass != answer.getClass()) { + throw new UnexpectedProtocolException("client expect protocol:[{}], but found protocol:[{}]", answerClass, answer.getClass().getName()); } - return response; + return answer; }) - .whenCompleteAsync((responsePacket, throwable) -> { + .whenCompleteAsync((answer, throwable) -> { try { PacketSignal.removeSignalAttachment(clientAttachment); @@ -247,20 +244,18 @@ public class PacketDispatcher implements IPacketDispatcher { // 如果有异常的话,whenCompleteAsync的下一个thenAccept不会执行 if (throwable != null) { - var exceptionCallback = asyncAnswer.getExceptionCallback(); - if (exceptionCallback != null) { - exceptionCallback.accept(throwable); + var notCompleteCallback = asyncAnswer.getNotCompleteCallback(); + if (notCompleteCallback != null) { + notCompleteCallback.run(); } - logger.error(ExceptionUtils.getMessage(throwable)); return; } // 异步返回,回调业务逻辑 - asyncAnswer.setFuturePacket((T) responsePacket); + asyncAnswer.setFuturePacket((T) answer); asyncAnswer.consume(); - } catch (Exception exception) { - logger.error("consume response error requestPacket:[{}] and responsePacket:[{}]", - JsonUtils.object2String(packet), JsonUtils.object2String(responsePacket), exception); + } catch (Throwable throwable1) { + logger.error("异步回调方法[ask:{}][answer:{}]错误", packet.getClass().getSimpleName(), answer.getClass().getSimpleName(), throwable1); } finally { if (serverSignalPacketAttachment != null) { serverReceiveSignalPacketAttachment.set(null); diff --git a/net/src/main/java/com/zfoo/net/dispatcher/model/answer/AsyncAnswer.java b/net/src/main/java/com/zfoo/net/dispatcher/model/answer/AsyncAnswer.java index 2b2d96ae..ac7e94c5 100644 --- a/net/src/main/java/com/zfoo/net/dispatcher/model/answer/AsyncAnswer.java +++ b/net/src/main/java/com/zfoo/net/dispatcher/model/answer/AsyncAnswer.java @@ -14,6 +14,7 @@ package com.zfoo.net.dispatcher.model.answer; import com.zfoo.net.packet.model.SignalPacketAttachment; +import com.zfoo.net.task.model.SafeRunnable; import com.zfoo.protocol.IPacket; import java.util.ArrayList; @@ -33,7 +34,7 @@ public class AsyncAnswer implements IAsyncAnswer { private Runnable askCallback; - private Consumer exceptionCallback; + private SafeRunnable notCompleteCallback; @Override @@ -49,8 +50,9 @@ public class AsyncAnswer implements IAsyncAnswer { } @Override - public void exceptionally(Consumer exceptionCallback) { - this.exceptionCallback = exceptionCallback; + public IAsyncAnswer notComplete(SafeRunnable notCompleteCallback) { + this.notCompleteCallback = notCompleteCallback; + return this; } public void consume() { @@ -81,7 +83,7 @@ public class AsyncAnswer implements IAsyncAnswer { this.askCallback = askCallback; } - public Consumer getExceptionCallback() { - return exceptionCallback; + public SafeRunnable getNotCompleteCallback() { + return notCompleteCallback; } } diff --git a/net/src/main/java/com/zfoo/net/dispatcher/model/answer/IAsyncAnswer.java b/net/src/main/java/com/zfoo/net/dispatcher/model/answer/IAsyncAnswer.java index 49456fe3..514cf710 100644 --- a/net/src/main/java/com/zfoo/net/dispatcher/model/answer/IAsyncAnswer.java +++ b/net/src/main/java/com/zfoo/net/dispatcher/model/answer/IAsyncAnswer.java @@ -13,6 +13,7 @@ package com.zfoo.net.dispatcher.model.answer; +import com.zfoo.net.task.model.SafeRunnable; import com.zfoo.protocol.IPacket; import java.util.function.Consumer; @@ -31,8 +32,8 @@ public interface IAsyncAnswer { void whenComplete(Consumer consumer); /** - * 当异步种有异常会回调的方法 + * 没有执行成功的回调的方法 */ - void exceptionally(Consumer exceptionCallback); + IAsyncAnswer notComplete(SafeRunnable notCompleteCallback); } diff --git a/net/src/main/java/com/zfoo/net/dispatcher/model/exception/UnexpectedProtocolException.java b/net/src/main/java/com/zfoo/net/dispatcher/model/exception/UnexpectedProtocolException.java index 726e4b4d..7b477c15 100644 --- a/net/src/main/java/com/zfoo/net/dispatcher/model/exception/UnexpectedProtocolException.java +++ b/net/src/main/java/com/zfoo/net/dispatcher/model/exception/UnexpectedProtocolException.java @@ -13,14 +13,31 @@ package com.zfoo.net.dispatcher.model.exception; +import com.zfoo.protocol.exception.RunException; + /** * @author jaysunxiao * @version 3.0 */ -public class UnexpectedProtocolException extends RuntimeException { +public class UnexpectedProtocolException extends RunException { - public UnexpectedProtocolException(String s) { - super(s); + public UnexpectedProtocolException(Throwable cause) { + super(cause); } + public UnexpectedProtocolException(String message) { + super(message); + } + + public UnexpectedProtocolException(String template, Object... args) { + super(template, args); + } + + public UnexpectedProtocolException(Throwable cause, String message) { + super(cause, message); + } + + public UnexpectedProtocolException(Throwable cause, String template, Object... args) { + super(cause, template, args); + } }