perf[net]: exceptionCallback使用更加安全的SafeRunnable

This commit is contained in:
jaysunxiao
2021-08-23 22:43:46 +08:00
parent cc47ff6568
commit d2ca3d0d3f
4 changed files with 45 additions and 30 deletions
@@ -34,7 +34,6 @@ import com.zfoo.net.session.model.Session;
import com.zfoo.net.task.TaskManager;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.math.HashUtils;
@@ -220,23 +219,21 @@ public class PacketDispatcher implements IPacketDispatcher {
clientAttachment.getResponseFuture()
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)
.thenApply(response -> {
if (response == null) {
throw new NetTimeOutException(StringUtils.format("asyncRequest timeout exception, ask:[{}], attachment:[{}]"
, JsonUtils.object2String(packet), JsonUtils.object2String(clientAttachment)));
.thenApply(answer -> {
if (answer == null) {
throw new NetTimeOutException(StringUtils.format("async ask [{}] timeout exception", packet.getClass().getSimpleName()));
}
if (response.protocolId() == Error.errorProtocolId()) {
throw new ErrorResponseException((Error) response);
if (answer.protocolId() == Error.errorProtocolId()) {
throw new ErrorResponseException((Error) answer);
}
if (answerClass != null && answerClass != response.getClass()) {
throw new UnexpectedProtocolException(StringUtils.format("client expect protocol:[{}], but found protocol:[{}]"
, answerClass, response.getClass().getName()));
if (answerClass != null && answerClass != answer.getClass()) {
throw new UnexpectedProtocolException("client expect protocol:[{}], but found protocol:[{}]", answerClass, answer.getClass().getName());
}
return response;
return answer;
})
.whenCompleteAsync((responsePacket, throwable) -> {
.whenCompleteAsync((answer, throwable) -> {
try {
PacketSignal.removeSignalAttachment(clientAttachment);
@@ -247,20 +244,18 @@ public class PacketDispatcher implements IPacketDispatcher {
// 如果有异常的话,whenCompleteAsync的下一个thenAccept不会执行
if (throwable != null) {
var exceptionCallback = asyncAnswer.getExceptionCallback();
if (exceptionCallback != null) {
exceptionCallback.accept(throwable);
var notCompleteCallback = asyncAnswer.getNotCompleteCallback();
if (notCompleteCallback != null) {
notCompleteCallback.run();
}
logger.error(ExceptionUtils.getMessage(throwable));
return;
}
// 异步返回,回调业务逻辑
asyncAnswer.setFuturePacket((T) responsePacket);
asyncAnswer.setFuturePacket((T) answer);
asyncAnswer.consume();
} catch (Exception exception) {
logger.error("consume response error requestPacket:[{}] and responsePacket:[{}]",
JsonUtils.object2String(packet), JsonUtils.object2String(responsePacket), exception);
} catch (Throwable throwable1) {
logger.error("异步回调方法[ask:{}][answer:{}]错误", packet.getClass().getSimpleName(), answer.getClass().getSimpleName(), throwable1);
} finally {
if (serverSignalPacketAttachment != null) {
serverReceiveSignalPacketAttachment.set(null);
@@ -14,6 +14,7 @@
package com.zfoo.net.dispatcher.model.answer;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.net.task.model.SafeRunnable;
import com.zfoo.protocol.IPacket;
import java.util.ArrayList;
@@ -33,7 +34,7 @@ public class AsyncAnswer<T extends IPacket> implements IAsyncAnswer<T> {
private Runnable askCallback;
private Consumer<Throwable> exceptionCallback;
private SafeRunnable notCompleteCallback;
@Override
@@ -49,8 +50,9 @@ public class AsyncAnswer<T extends IPacket> implements IAsyncAnswer<T> {
}
@Override
public void exceptionally(Consumer<Throwable> exceptionCallback) {
this.exceptionCallback = exceptionCallback;
public IAsyncAnswer<T> notComplete(SafeRunnable notCompleteCallback) {
this.notCompleteCallback = notCompleteCallback;
return this;
}
public void consume() {
@@ -81,7 +83,7 @@ public class AsyncAnswer<T extends IPacket> implements IAsyncAnswer<T> {
this.askCallback = askCallback;
}
public Consumer<Throwable> getExceptionCallback() {
return exceptionCallback;
public SafeRunnable getNotCompleteCallback() {
return notCompleteCallback;
}
}
@@ -13,6 +13,7 @@
package com.zfoo.net.dispatcher.model.answer;
import com.zfoo.net.task.model.SafeRunnable;
import com.zfoo.protocol.IPacket;
import java.util.function.Consumer;
@@ -31,8 +32,8 @@ public interface IAsyncAnswer<T extends IPacket> {
void whenComplete(Consumer<T> consumer);
/**
* 当异步种有异常会回调的方法
* 没有执行成功的回调的方法
*/
void exceptionally(Consumer<Throwable> exceptionCallback);
IAsyncAnswer<T> notComplete(SafeRunnable notCompleteCallback);
}
@@ -13,14 +13,31 @@
package com.zfoo.net.dispatcher.model.exception;
import com.zfoo.protocol.exception.RunException;
/**
* @author jaysunxiao
* @version 3.0
*/
public class UnexpectedProtocolException extends RuntimeException {
public class UnexpectedProtocolException extends RunException {
public UnexpectedProtocolException(String s) {
super(s);
public UnexpectedProtocolException(Throwable cause) {
super(cause);
}
public UnexpectedProtocolException(String message) {
super(message);
}
public UnexpectedProtocolException(String template, Object... args) {
super(template, args);
}
public UnexpectedProtocolException(Throwable cause, String message) {
super(cause, message);
}
public UnexpectedProtocolException(Throwable cause, String template, Object... args) {
super(cause, template, args);
}
}