mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-22 22:26:06 +00:00
perf[net]: 优化同步或异步的调用控制器
This commit is contained in:
@@ -31,8 +31,8 @@ public class PacketSignal {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PacketSignal.class);
|
||||
|
||||
// equal with 16383
|
||||
private static final int SIGNAL_MASK = 0B00000000_00000000_00111111_11111111;
|
||||
// equal with 32767
|
||||
private static final int SIGNAL_MASK = 0B00000000_00000000_01111111_11111111;
|
||||
|
||||
private static AtomicReferenceArray<SignalPacketAttachment> signalPacketArray = new AtomicReferenceArray<>(SIGNAL_MASK + 1);
|
||||
|
||||
@@ -45,9 +45,11 @@ public class PacketSignal {
|
||||
var packetId = packetAttachment.getPacketId();
|
||||
var hash = packetId & SIGNAL_MASK;
|
||||
|
||||
if (!signalPacketArray.compareAndSet(hash, null, packetAttachment)) {
|
||||
signalPacketAttachmentMap.put(packetId, packetAttachment);
|
||||
if (signalPacketArray.compareAndSet(hash, null, packetAttachment)) {
|
||||
return;
|
||||
}
|
||||
|
||||
signalPacketAttachmentMap.put(packetId, packetAttachment);
|
||||
}
|
||||
|
||||
public static SignalPacketAttachment removeSignalAttachment(SignalPacketAttachment packetAttachment) {
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.net.dispatcher;
|
||||
|
||||
import com.zfoo.protocol.util.JsonUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* 同步或异步的调用控制器
|
||||
*
|
||||
* @author jaysunxiao
|
||||
* @version 3.0
|
||||
*/
|
||||
public class PacketSignalArray {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PacketSignalArray.class);
|
||||
|
||||
// equal with 16383
|
||||
private static final int SIGNAL_MASK = 0B00000000_00000000_01111111_11111111;
|
||||
|
||||
private static AtomicReferenceArray<Integer> signalPacketArray = new AtomicReferenceArray<>(SIGNAL_MASK + 1);
|
||||
|
||||
/**
|
||||
* Session控制同步或异步的附加包,key:packetId
|
||||
*/
|
||||
private static Map<Integer, Integer> signalPacketAttachmentMap = new ConcurrentHashMap<>(1000);
|
||||
|
||||
public static void addSignalAttachment(int packetId) {
|
||||
var hash = packetId & SIGNAL_MASK;
|
||||
|
||||
if (signalPacketArray.compareAndSet(hash, null, packetId)) {
|
||||
return;
|
||||
}
|
||||
// logger.info("add [packetId:{}] [oldPacketId:{}]", packetId, signalPacketArray.get(hash));
|
||||
signalPacketAttachmentMap.put(packetId, packetId);
|
||||
}
|
||||
|
||||
|
||||
public static void removeSignalAttachment(int packetId) {
|
||||
var hash = packetId & SIGNAL_MASK;
|
||||
var oldPacketId = signalPacketArray.get(hash);
|
||||
|
||||
if (oldPacketId != null && oldPacketId == packetId && signalPacketArray.compareAndSet(hash, oldPacketId, null)) {
|
||||
return;
|
||||
}
|
||||
// logger.info("remove [packetId:{}] [oldPacketId:{}]", packetId, oldPacketId);
|
||||
signalPacketAttachmentMap.remove(packetId);
|
||||
}
|
||||
|
||||
public static void status() {
|
||||
var count = 0;
|
||||
for (int i = 0; i < SIGNAL_MASK + 1; i++) {
|
||||
var value = signalPacketArray.get(i);
|
||||
if (value != null) {
|
||||
logger.info("signalPacketArray has attachment [index:{}][count:{}][value:{}]", i, ++count, JsonUtils.object2String(value));
|
||||
}
|
||||
}
|
||||
|
||||
signalPacketAttachmentMap.forEach((key, value) -> {
|
||||
logger.info("signalPacketAttachmentMap has attachment [key:{}][value:{}]", key, JsonUtils.object2String(value));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright (C) 2020 The zfoo Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.zfoo.net.dispatcher;
|
||||
|
||||
import com.zfoo.protocol.util.JsonUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 同步或异步的调用控制器
|
||||
*
|
||||
* @author jaysunxiao
|
||||
* @version 3.0
|
||||
*/
|
||||
public class PacketSignalMap {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PacketSignalMap.class);
|
||||
|
||||
private static Map<Integer, Integer> signalPacketAttachmentMap = new ConcurrentHashMap<>(1_0000);
|
||||
|
||||
public static void addSignalAttachment(int packetId) {
|
||||
signalPacketAttachmentMap.put(packetId, packetId);
|
||||
}
|
||||
|
||||
|
||||
public static void removeSignalAttachment(int packetId) {
|
||||
signalPacketAttachmentMap.remove(packetId);
|
||||
}
|
||||
|
||||
public static void status() {
|
||||
signalPacketAttachmentMap.forEach((key, value) -> {
|
||||
logger.info("signalPacketAttachmentMap has attachment [key:{}][value:{}]", key, JsonUtils.object2String(value));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,11 +14,12 @@ package com.zfoo.net.dispatcher;
|
||||
|
||||
import com.zfoo.event.manager.EventBus;
|
||||
import com.zfoo.net.dispatcher.manager.PacketSignal;
|
||||
import com.zfoo.net.packet.model.SignalPacketAttachment;
|
||||
import com.zfoo.scheduler.util.TimeUtils;
|
||||
import com.zfoo.util.ThreadUtils;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@@ -31,47 +32,90 @@ public class PacketSignalTest {
|
||||
|
||||
private AtomicInteger atomicInteger = new AtomicInteger(0);
|
||||
|
||||
private int executorSize = EventBus.EXECUTORS_SIZE / 2;
|
||||
private int count = 100_0000;
|
||||
private int totalIndex = 10;
|
||||
|
||||
@Test
|
||||
public void test() throws InterruptedException {
|
||||
// 预热
|
||||
addAndRemove();
|
||||
arrayTest();
|
||||
mapTest();
|
||||
|
||||
ThreadUtils.sleep(3000);
|
||||
|
||||
arrayTest();
|
||||
mapTest();
|
||||
System.out.println(atomicInteger.get());
|
||||
}
|
||||
|
||||
public void arrayTest() throws InterruptedException {
|
||||
var startTime = TimeUtils.currentTimeMillis();
|
||||
|
||||
var countDownLatch = new CountDownLatch(EventBus.EXECUTORS_SIZE);
|
||||
for (int i = 0; i < EventBus.EXECUTORS_SIZE; i++) {
|
||||
var countDownLatch = new CountDownLatch(executorSize);
|
||||
for (var i = 0; i < executorSize; i++) {
|
||||
EventBus.asyncExecute(i).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
addAndRemove();
|
||||
addAndRemoveArray();
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
countDownLatch.await();
|
||||
|
||||
System.out.println(TimeUtils.currentTimeMillis() - startTime);
|
||||
PacketSignal.status();
|
||||
System.out.println(TimeUtils.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
public void addAndRemove() {
|
||||
for (int count = 0; count < 1_0000; count++) {
|
||||
var startIndex = atomicInteger.incrementAndGet();
|
||||
var endIndex = 0;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
endIndex = atomicInteger.incrementAndGet();
|
||||
public void mapTest() throws InterruptedException {
|
||||
var startTime = TimeUtils.currentTimeMillis();
|
||||
|
||||
var countDownLatch = new CountDownLatch(executorSize);
|
||||
for (int i = 0; i < executorSize; i++) {
|
||||
EventBus.asyncExecute(i).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
addAndRemoveMap();
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
countDownLatch.await();
|
||||
PacketSignalMap.status();
|
||||
System.out.println(TimeUtils.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
public void addAndRemoveArray() {
|
||||
var list = new ArrayList<Integer>(totalIndex);
|
||||
for (var i = 0; i < count; i++) {
|
||||
list.clear();
|
||||
for (var j = 0; j < totalIndex; j++) {
|
||||
var index = atomicInteger.incrementAndGet();
|
||||
list.add(index);
|
||||
PacketSignalArray.addSignalAttachment(index);
|
||||
}
|
||||
for (int i = startIndex; i < endIndex; i++) {
|
||||
var attachment = new SignalPacketAttachment();
|
||||
attachment.setPacketId(i);
|
||||
PacketSignal.addSignalAttachment(attachment);
|
||||
}
|
||||
for (int i = startIndex; i < endIndex; i++) {
|
||||
PacketSignal.removeSignalAttachment(i);
|
||||
|
||||
for (var index : list) {
|
||||
PacketSignalArray.removeSignalAttachment(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void addAndRemoveMap() {
|
||||
var list = new ArrayList<Integer>(totalIndex);
|
||||
for (var i = 0; i < count; i++) {
|
||||
list.clear();
|
||||
for (var j = 0; j < totalIndex; j++) {
|
||||
var index = atomicInteger.incrementAndGet();
|
||||
list.add(index);
|
||||
PacketSignalMap.addSignalAttachment(index);
|
||||
}
|
||||
|
||||
for (var index : list) {
|
||||
PacketSignalMap.removeSignalAttachment(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user