test[python]: test python serialization

This commit is contained in:
godotg
2023-08-27 10:13:08 +08:00
parent 20d73fe7e6
commit 6083a284cd
9 changed files with 16424 additions and 0 deletions
+96
View File
@@ -0,0 +1,96 @@
# encoding: UTF-8
from unittest import TestCase
import struct
from pyProtocol import ProtocolManager
from pyProtocol import ByteBuffer
def print_bytearray(array):
signed_byte_array = struct.unpack('b' * len(array), array)
for byte in signed_byte_array:
print(byte) # 以字节形式输出每个字节的值
class ByteBufferTestCase(TestCase):
def test_complex_object(self):
# 打开文件并读取内容
with open('NormalObject.bytes', 'rb') as file:
content = file.read()
# 将内容转为bytearray
byte_array = bytearray(content)
buffer = ByteBuffer.ByteBuffer()
buffer.writeBytes(byte_array)
obj = ProtocolManager.read(buffer)
buffer.clear()
ProtocolManager.write(buffer, obj)
newObj = ProtocolManager.read(buffer)
bytes = buffer.toBytes()
# 打印bytearray
print(byte_array)
pass
def test_bytearray(self):
buffer = bytearray(10)
for byte in buffer:
print(byte) # 以字节形式输出每个字节的值
pass
def test_buffer_raw_int(self):
byteBuffer = ByteBuffer.ByteBuffer()
byteBuffer.writeRawInt(2147483647)
self.assertEqual(byteBuffer.readRawInt(), 2147483647)
def test_buffer_int(self):
byteBuffer = ByteBuffer.ByteBuffer()
byteBuffer.writeInt(2147483647)
self.assertEqual(byteBuffer.readInt(), 2147483647)
byteBuffer.writeInt(-2147483648)
self.assertEqual(byteBuffer.readInt(), -2147483648)
pass
def test_buffer_long(self):
byteBuffer = ByteBuffer.ByteBuffer()
byteBuffer.writeLong(9223372036854775807)
self.assertEqual(byteBuffer.readLong(), 9223372036854775807)
print_bytearray(byteBuffer.buffer)
byteBuffer.writeLong(-9223372036854775808)
self.assertEqual(byteBuffer.readLong(), -9223372036854775808)
pass
def test_buffer(self):
byteBuffer = ByteBuffer.ByteBuffer()
print(byteBuffer.writeOffset)
print(byteBuffer.readOffset)
print(byteBuffer.getCapacity())
byteBuffer.writeBool(True)
self.assertEqual(byteBuffer.readBool(), True)
byteBuffer.writeByte(-100)
self.assertEqual(byteBuffer.readByte(), -100)
byteBuffer.writeShort(100)
self.assertEqual(byteBuffer.readShort(), 100)
byteBuffer.writeInt(9999)
self.assertEqual(byteBuffer.readInt(), 9999)
byteBuffer.writeInt(-9999)
self.assertEqual(byteBuffer.readInt(), -9999)
byteBuffer.writeLong(9999)
self.assertEqual(byteBuffer.readLong(), 9999)
byteBuffer.writeLong(-9999)
self.assertEqual(byteBuffer.readLong(), -9999)
byteBuffer.writeFloat(99.9)
self.assertTrue(abs(byteBuffer.readFloat() - 99.9) < 0.001)
str = "Hello World!你好"
byteBuffer.writeString(str)
self.assertEqual(byteBuffer.readString(), str)
charStr = "A"
byteBuffer.writeChar(charStr)
self.assertEqual(byteBuffer.readChar(), charStr)
print("----------------------------------------------------------------")
print_bytearray(byteBuffer.buffer)
@@ -0,0 +1,907 @@
import struct
from . import ProtocolManager
maxSize = 655537
maxInt = 2147483647
minInt = -2147483648
maxLong = 9223372036854775807
minLong = -9223372036854775808
empty_str = ""
class ByteBuffer():
writeOffset = 0
readOffset = 0
buffer = bytearray(256)
def setWriteOffset(self, writeOffset):
if writeOffset > len(self.buffer):
raise ValueError("index out of bounds exception:readerIndex:" + str(self.readOffset) +
", writerIndex:" + str(self.writeOffset) +
"(expected:0 <= readerIndex <= writerIndex <= capacity:" + str(len(self.buffer)))
self.writeOffset = writeOffset
def setReadOffset(self, readOffset):
if readOffset > self.writeOffset:
raise ValueError("index out of bounds exception:readerIndex:" + str(self.readOffset) +
", writerIndex:" + str(self.writeOffset) +
"(expected:0 <= readerIndex <= writerIndex <= capacity:" + str(len(self.buffer)))
self.readOffset = readOffset
def isReadable(self):
return self.writeOffset > self.readOffset
pass
def getCapacity(self):
return len(self.buffer) - self.writeOffset
def ensureCapacity(self, minCapacity):
while (minCapacity - self.getCapacity() > 0):
newSize = len(self.buffer) * 2
if newSize > maxSize:
raise ValueError("out of memory error")
# auto grow capacity
self.buffer.extend(bytearray(len(self.buffer)))
def clear(self):
self.writeOffset = 0
self.readOffset = 0
self.buffer = bytearray(256)
pass
def toBytes(self):
return self.buffer[0:self.writeOffset]
def writeBool(self, value):
if value:
self.writeByte(1)
else:
self.writeByte(0)
def readBool(self):
if self.readByte():
return True
else:
return False
def writeByte(self, value):
self.ensureCapacity(1)
struct.pack_into('>b', self.buffer, self.writeOffset, value)
self.writeOffset += 1
def readByte(self):
value = struct.unpack_from('>b', self.buffer, self.readOffset)[0]
self.readOffset += 1
return value
def writeBytes(self, value):
length = len(value)
self.ensureCapacity(length)
self.buffer[self.writeOffset:length] = value[:]
self.writeOffset += length
def writeUByte(self, value):
self.ensureCapacity(1)
struct.pack_into('>B', self.buffer, self.writeOffset, value)
self.writeOffset += 1
def readUByte(self):
value = struct.unpack_from('>B', self.buffer, self.readOffset)[0]
self.readOffset += 1
return value
def writeShort(self, value):
self.ensureCapacity(2)
struct.pack_into('>h', self.buffer, self.writeOffset, value)
self.writeOffset += 2
def readShort(self):
value = struct.unpack_from('>h', self.buffer, self.readOffset)[0]
self.readOffset += 2
return value
def writeRawInt(self, value):
self.ensureCapacity(4)
struct.pack_into('>i', self.buffer, self.writeOffset, value)
self.writeOffset += 4
def readRawInt(self):
value = struct.unpack_from('>i', self.buffer, self.readOffset)[0]
self.readOffset += 4
return value
def writeInt(self, value):
if not (minInt <= value <= maxInt):
raise ValueError('value must range between minInt:-2147483648 and maxInt:2147483647')
value = (value << 1) ^ (value >> 31)
self.ensureCapacity(5)
if value >> 7 == 0:
self.writeUByte(value)
return
if value >> 14 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte((value >> 7) & 0x7F)
return
if value >> 21 == 0:
self.writeUByte((value & 0x7F) | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte((value >> 14) & 0x7F)
return
if value >> 28 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte((value >> 21) & 0x7F)
return
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte((value >> 28) & 0x7F)
pass
def readInt(self):
b = self.readUByte()
value = b & 0x7F
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 7
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 14
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 21
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 28
return (value >> 1) ^ -(value & 1)
def writeLong(self, value):
if not (minLong <= value <= maxLong):
raise ValueError('value must range between minLong:-9223372036854775808 and maxLong:9223372036854775807')
value = (value << 1) ^ (value >> 63)
self.ensureCapacity(9)
if value >> 7 == 0:
self.writeUByte(value)
return
if value >> 14 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte((value >> 7) & 0x7F)
return
if value >> 21 == 0:
self.writeUByte((value & 0x7F) | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte((value >> 14) & 0x7F)
return
if value >> 28 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte((value >> 21) & 0x7F)
return
if value >> 35 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte((value >> 28) & 0x7F)
return
if value >> 42 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte(((value >> 28) & 0x7F | 0x80))
self.writeUByte((value >> 35) & 0x7F)
return
if value >> 49 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte(((value >> 28) & 0x7F | 0x80))
self.writeUByte(((value >> 35) & 0x7F | 0x80))
self.writeUByte((value >> 42) & 0x7F)
return
if value >> 56 == 0:
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte(((value >> 28) & 0x7F | 0x80))
self.writeUByte(((value >> 35) & 0x7F | 0x80))
self.writeUByte(((value >> 42) & 0x7F | 0x80))
self.writeUByte((value >> 49) & 0x7F)
return
self.writeUByte(value & 0x7F | 0x80)
self.writeUByte(((value >> 7) & 0x7F | 0x80))
self.writeUByte(((value >> 14) & 0x7F | 0x80))
self.writeUByte(((value >> 21) & 0x7F | 0x80))
self.writeUByte(((value >> 28) & 0x7F | 0x80))
self.writeUByte(((value >> 35) & 0x7F | 0x80))
self.writeUByte(((value >> 42) & 0x7F | 0x80))
self.writeUByte(((value >> 49) & 0x7F | 0x80))
self.writeUByte(value >> 56)
pass
def readLong(self):
b = self.readUByte()
value = b & 0x7F
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 7
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 14
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 21
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 28
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 35
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 42
if (b & 0x80) != 0:
b = self.readUByte()
value |= (b & 0x7F) << 49
if (b & 0x80) != 0:
b = self.readUByte()
value |= b << 56
return (value >> 1) ^ -(value & 1)
def writeFloat(self, value):
self.ensureCapacity(4)
struct.pack_into('>f', self.buffer, self.writeOffset, value)
self.writeOffset += 4
def readFloat(self):
value = struct.unpack_from('>f', self.buffer, self.readOffset)[0]
self.readOffset += 4
return value
def writeDouble(self, value):
self.ensureCapacity(8)
struct.pack_into('>d', self.buffer, self.writeOffset, value)
self.writeOffset += 8
def readDouble(self):
value = struct.unpack_from('>d', self.buffer, self.readOffset)[0]
self.readOffset += 8
return value
def writeChar(self, value):
if len(value) == 0:
self.writeInt(0)
return
self.writeString(value[0])
def readChar(self):
return self.readString()
def writeString(self, value):
if len(value) == 0:
self.writeInt(0)
return
byte_array = bytearray(value, 'utf-8')
length = len(byte_array)
self.ensureCapacity(5 + length)
self.writeInt(length)
self.buffer[self.writeOffset:self.writeOffset + length] = byte_array[:]
self.writeOffset += length
def readString(self):
length = self.readInt()
if (length <= 0):
return empty_str
byte_array = self.buffer[self.readOffset:self.readOffset + length]
# 使用utf-8编码进行解码
value = byte_array.decode('utf-8')
self.readOffset += length
return value
def writePacketFlag(self, value):
if value is None:
self.writeBool(False)
else:
self.writeBool(True)
pass
def writePacket(self, packet, protocolId):
protocolRegistration = ProtocolManager.getProtocol(protocolId)
protocolRegistration.write(self, packet)
def readPacket(self, protocolId):
protocolRegistration = ProtocolManager.getProtocol(protocolId)
return protocolRegistration.read(self)
def writeBooleanArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeBool(element)
pass
def readBooleanArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readBool())
return array
def writeByteArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeByte(element)
pass
def readByteArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readByte())
return array
def writeShortArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeShort(element)
pass
def readShortArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readShort())
return array
def writeIntArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeInt(element)
pass
def readIntArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readInt())
return array
def writeLongArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeLong(element)
pass
def readLongArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readLong())
return array
def writeFloatArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeFloat(element)
pass
def readFloatArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readFloat())
return array
def writeDoubleArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeDouble(element)
pass
def readDoubleArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readDouble())
return array
def writeCharArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeChar(element)
pass
def readCharArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readChar())
return array
def writeStringArray(self, array):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writeString(element)
pass
def readStringArray(self):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readString())
return array
def writePacketArray(self, array, protocolId):
if array is None:
self.writeInt(0)
else:
self.writeInt(len(array))
for element in array:
self.writePacket(element, protocolId)
pass
def readPacketArray(self, protocolId):
array = []
size = self.readInt()
if size > 0:
for index in range(size):
array.append(self.readPacket(protocolId))
return array
def writeIntIntMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeInt(key)
self.writeInt(value)
pass
def readIntIntMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readInt()
value = self.readInt()
map[key] = value
return map
def writeIntLongMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeInt(key)
self.writeLong(value)
pass
def readIntLongMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readInt()
value = self.readLong()
map[key] = value
return map
def writeIntStringMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeInt(key)
self.writeString(value)
pass
def readIntStringMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readInt()
value = self.readString()
map[key] = value
return map
def writeIntPacketMap(self, map, protocolId):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeInt(key)
self.writePacket(value, protocolId)
pass
def readIntPacketMap(self, protocolId):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readInt()
value = self.readPacket(protocolId)
map[key] = value
return map
def writeLongIntMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeLong(key)
self.writeInt(value)
pass
def readLongIntMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readLong()
value = self.readInt()
map[key] = value
return map
def writeLongLongMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeLong(key)
self.writeLong(value)
pass
def readLongLongMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readLong()
value = self.readLong()
map[key] = value
return map
def writeLongStringMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeLong(key)
self.writeString(value)
pass
def readLongStringMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readLong()
value = self.readString()
map[key] = value
return map
def writeLongPacketMap(self, map, protocolId):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeLong(key)
self.writePacket(value, protocolId)
pass
def readLongPacketMap(self, protocolId):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readLong()
value = self.readPacket(protocolId)
map[key] = value
return map
def writeStringIntMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeString(key)
self.writeInt(value)
pass
def readStringIntMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readString()
value = self.readInt()
map[key] = value
return map
def writeStringLongMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeString(key)
self.writeLong(value)
pass
def readStringLongMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readString()
value = self.readLong()
map[key] = value
return map
def writeStringStringMap(self, map):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeString(key)
self.writeString(value)
pass
def readStringStringMap(self):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readString()
value = self.readString()
map[key] = value
return map
def writeStringPacketMap(self, map, protocolId):
if map is None:
self.writeInt(0)
else:
self.writeInt(len(map))
for key, value in map.items():
self.writeString(key)
self.writePacket(value, protocolId)
pass
def readStringPacketMap(self, protocolId):
map = {}
size = self.readInt()
if size > 0:
for index in range(size):
key = self.readString()
value = self.readPacket(protocolId)
map[key] = value
return map
def writeBooleanSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeBool(element)
pass
def readBooleanSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readBool())
return value
def writeByteSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeByte(element)
pass
def readByteSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readByte())
return value
def writeShortSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeShort(element)
pass
def readShortSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readShort())
return value
def writeIntSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeInt(element)
pass
def readIntSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readInt())
return value
def writeLongSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeLong(element)
pass
def readLongSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readLong())
return value
def writeFloatSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeFloat(element)
pass
def readFloatSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readFloat())
return value
def writeDoubleSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeDouble(element)
pass
def readDoubleSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readDouble())
return value
def writeCharSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeChar(element)
pass
def readCharValue(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readChar())
return value
def writeStringSet(self, value):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writeString(element)
pass
def readStringSet(self):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readString())
return value
def writePacketSet(self, value, protocolId):
if value is None:
self.writeInt(0)
else:
self.writeInt(len(value))
for element in value:
self.writePacket(element, protocolId)
pass
def readPacketSet(self, protocolId):
value = set()
size = self.readInt()
if size > 0:
for index in range(size):
value.add(self.readPacket(protocolId))
return value
@@ -0,0 +1,411 @@
# 复杂的对象,包括了各种复杂的结构,数组,List,Set,Map
class ComplexObject:
# byte类型,最简单的整形
a = 0 # byte
# byte的包装类型,优先使用基础类型,包装类型会有装箱拆箱
aa = 0 # byte
# 数组类型
aaa = [] # byte[]
aaaa = [] # byte[]
b = 0 # short
bb = 0 # short
bbb = [] # short[]
bbbb = [] # short[]
c = 0 # int
cc = 0 # int
ccc = [] # int[]
cccc = [] # int[]
d = 0 # long
dd = 0 # long
ddd = [] # long[]
dddd = [] # long[]
e = 0.0 # float
ee = 0.0 # float
eee = [] # float[]
eeee = [] # float[]
f = 0.0 # double
ff = 0.0 # double
fff = [] # double[]
ffff = [] # double[]
g = False # bool
gg = False # bool
ggg = [] # bool[]
gggg = [] # bool[]
h = "" # char
hh = "" # char
hhh = [] # char[]
hhhh = [] # char[]
jj = "" # string
jjj = [] # string[]
kk = None # ObjectA
kkk = [] # ObjectA[]
l = [] # List<int>
ll = [] # List<List<List<int>>>
lll = [] # List<List<ObjectA>>
llll = [] # List<string>
lllll = [] # List<Dictionary<int, string>>
m = {} # Dictionary<int, string>
mm = {} # Dictionary<int, ObjectA>
mmm = {} # Dictionary<ObjectA, List<int>>
mmmm = {} # Dictionary<List<List<ObjectA>>, List<List<List<int>>>>
mmmmm = {} # Dictionary<List<Dictionary<int, string>>, HashSet<Dictionary<int, string>>>
s = {} # HashSet<int>
ss = {} # HashSet<HashSet<List<int>>>
sss = {} # HashSet<HashSet<ObjectA>>
ssss = {} # HashSet<string>
sssss = {} # HashSet<Dictionary<int, string>>
# 如果要修改协议并且兼容老协议,需要加上Compatible注解,按照增加的顺序添加order
myCompatible = 0 # int
myObject = None # ObjectA
def protocolId(self):
return 100
@classmethod
def write(cls, buffer, packet):
if buffer.writePacketFlag(packet):
return
buffer.writeByte(packet.a)
buffer.writeByte(packet.aa)
buffer.writeByteArray(packet.aaa)
buffer.writeByteArray(packet.aaaa)
buffer.writeShort(packet.b)
buffer.writeShort(packet.bb)
buffer.writeShortArray(packet.bbb)
buffer.writeShortArray(packet.bbbb)
buffer.writeInt(packet.c)
buffer.writeInt(packet.cc)
buffer.writeIntArray(packet.ccc)
buffer.writeIntArray(packet.cccc)
buffer.writeLong(packet.d)
buffer.writeLong(packet.dd)
buffer.writeLongArray(packet.ddd)
buffer.writeLongArray(packet.dddd)
buffer.writeFloat(packet.e)
buffer.writeFloat(packet.ee)
buffer.writeFloatArray(packet.eee)
buffer.writeFloatArray(packet.eeee)
buffer.writeDouble(packet.f)
buffer.writeDouble(packet.ff)
buffer.writeDoubleArray(packet.fff)
buffer.writeDoubleArray(packet.ffff)
buffer.writeBool(packet.g)
buffer.writeBool(packet.gg)
buffer.writeBooleanArray(packet.ggg)
buffer.writeBooleanArray(packet.gggg)
buffer.writeChar(packet.h)
buffer.writeChar(packet.hh)
buffer.writeCharArray(packet.hhh)
buffer.writeCharArray(packet.hhhh)
buffer.writeString(packet.jj)
buffer.writeStringArray(packet.jjj)
buffer.writePacket(packet.kk, 102)
buffer.writePacketArray(packet.kkk, 102)
buffer.writeIntArray(packet.l)
if packet.ll is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.ll))
for element0 in packet.ll:
if element0 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(element0))
for element1 in element0:
buffer.writeIntArray(element1)
if packet.lll is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.lll))
for element2 in packet.lll:
buffer.writePacketArray(element2, 102)
buffer.writeStringArray(packet.llll)
if packet.lllll is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.lllll))
for element3 in packet.lllll:
buffer.writeIntStringMap(element3)
buffer.writeIntStringMap(packet.m)
buffer.writeIntPacketMap(packet.mm, 102)
if packet.mmm is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.mmm))
for key4 in packet.mmm:
value5 = packet.mmm[key4]
buffer.writePacket(key4, 102)
buffer.writeIntArray(value5)
if packet.mmmm is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.mmmm))
for key6 in packet.mmmm:
value7 = packet.mmmm[key6]
if key6 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(key6))
for element8 in key6:
buffer.writePacketArray(element8, 102)
if value7 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(value7))
for element9 in value7:
if element9 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(element9))
for element10 in element9:
buffer.writeIntArray(element10)
if packet.mmmmm is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.mmmmm))
for key11 in packet.mmmmm:
value12 = packet.mmmmm[key11]
if key11 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(key11))
for element13 in key11:
buffer.writeIntStringMap(element13)
if value12 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(value12))
for element14 in value12:
buffer.writeIntStringMap(element14)
buffer.writeIntSet(packet.s)
if packet.ss is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.ss))
for element15 in packet.ss:
if element15 is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(element15))
for element16 in element15:
buffer.writeIntArray(element16)
if packet.sss is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.sss))
for element17 in packet.sss:
buffer.writePacketSet(element17, 102)
buffer.writeStringSet(packet.ssss)
if packet.sssss is None:
buffer.writeInt(0)
else:
buffer.writeInt(len(packet.sssss))
for element18 in packet.sssss:
buffer.writeIntStringMap(element18)
buffer.writeInt(packet.myCompatible)
buffer.writePacket(packet.myObject, 102)
pass
@classmethod
def read(cls, buffer):
if not buffer.readBool():
return None
packet = ComplexObject()
result19 = buffer.readByte()
packet.a = result19
result20 = buffer.readByte()
packet.aa = result20
array21 = buffer.readByteArray()
packet.aaa = array21
array22 = buffer.readByteArray()
packet.aaaa = array22
result23 = buffer.readShort()
packet.b = result23
result24 = buffer.readShort()
packet.bb = result24
array25 = buffer.readShortArray()
packet.bbb = array25
array26 = buffer.readShortArray()
packet.bbbb = array26
result27 = buffer.readInt()
packet.c = result27
result28 = buffer.readInt()
packet.cc = result28
array29 = buffer.readIntArray()
packet.ccc = array29
array30 = buffer.readIntArray()
packet.cccc = array30
result31 = buffer.readLong()
packet.d = result31
result32 = buffer.readLong()
packet.dd = result32
array33 = buffer.readLongArray()
packet.ddd = array33
array34 = buffer.readLongArray()
packet.dddd = array34
result35 = buffer.readFloat()
packet.e = result35
result36 = buffer.readFloat()
packet.ee = result36
array37 = buffer.readFloatArray()
packet.eee = array37
array38 = buffer.readFloatArray()
packet.eeee = array38
result39 = buffer.readDouble()
packet.f = result39
result40 = buffer.readDouble()
packet.ff = result40
array41 = buffer.readDoubleArray()
packet.fff = array41
array42 = buffer.readDoubleArray()
packet.ffff = array42
result43 = buffer.readBool()
packet.g = result43
result44 = buffer.readBool()
packet.gg = result44
array45 = buffer.readBooleanArray()
packet.ggg = array45
array46 = buffer.readBooleanArray()
packet.gggg = array46
result47 = buffer.readChar()
packet.h = result47
result48 = buffer.readChar()
packet.hh = result48
array49 = buffer.readCharArray()
packet.hhh = array49
array50 = buffer.readCharArray()
packet.hhhh = array50
result51 = buffer.readString()
packet.jj = result51
array52 = buffer.readStringArray()
packet.jjj = array52
result53 = buffer.readPacket(102)
packet.kk = result53
array54 = buffer.readPacketArray(102)
packet.kkk = array54
list55 = buffer.readIntArray()
packet.l = list55
result56 = []
size58 = buffer.readInt()
if size58 > 0:
for index57 in range(size58):
result59 = []
size61 = buffer.readInt()
if size61 > 0:
for index60 in range(size61):
list62 = buffer.readIntArray()
result59.append(list62)
result56.append(result59)
packet.ll = result56
result63 = []
size65 = buffer.readInt()
if size65 > 0:
for index64 in range(size65):
list66 = buffer.readPacketArray(102)
result63.append(list66)
packet.lll = result63
list67 = buffer.readStringArray()
packet.llll = list67
result68 = []
size70 = buffer.readInt()
if size70 > 0:
for index69 in range(size70):
map71 = buffer.readIntStringMap()
result68.append(map71)
packet.lllll = result68
map72 = buffer.readIntStringMap()
packet.m = map72
map73 = buffer.readIntPacketMap(102)
packet.mm = map73
result74 = {}
size75 = buffer.readInt()
if size75 > 0:
for index76 in range(size75):
result77 = buffer.readPacket(102)
list78 = buffer.readIntArray()
result74[result77] = list78
packet.mmm = result74
result79 = {}
size80 = buffer.readInt()
if size80 > 0:
for index81 in range(size80):
result82 = []
size84 = buffer.readInt()
if size84 > 0:
for index83 in range(size84):
list85 = buffer.readPacketArray(102)
result82.append(list85)
result86 = []
size88 = buffer.readInt()
if size88 > 0:
for index87 in range(size88):
result89 = []
size91 = buffer.readInt()
if size91 > 0:
for index90 in range(size91):
list92 = buffer.readIntArray()
result89.append(list92)
result86.append(result89)
result79[result82] = result86
packet.mmmm = result79
result93 = {}
size94 = buffer.readInt()
if size94 > 0:
for index95 in range(size94):
result96 = []
size98 = buffer.readInt()
if size98 > 0:
for index97 in range(size98):
map99 = buffer.readIntStringMap()
result96.append(map99)
result100 = []
size102 = buffer.readInt()
if size102 > 0:
for index101 in range(size102):
map103 = buffer.readIntStringMap()
result100.append(map103)
result93[result96] = result100
packet.mmmmm = result93
set104 = buffer.readIntSet()
packet.s = set104
result105 = []
size107 = buffer.readInt()
if size107 > 0:
for index106 in range(size107):
result108 = []
size110 = buffer.readInt()
if size110 > 0:
for index109 in range(size110):
list111 = buffer.readIntArray()
result108.append(list111)
result105.append(result108)
packet.ss = result105
result112 = []
size114 = buffer.readInt()
if size114 > 0:
for index113 in range(size114):
set115 = buffer.readPacketSet(102)
result112.append(set115)
packet.sss = result112
set116 = buffer.readStringSet()
packet.ssss = set116
result117 = []
size119 = buffer.readInt()
if size119 > 0:
for index118 in range(size119):
map120 = buffer.readIntStringMap()
result117.append(map120)
packet.sssss = result117
if not buffer.isReadable():
return packet
pass
result121 = buffer.readInt()
packet.myCompatible = result121
if not buffer.isReadable():
return packet
pass
result122 = buffer.readPacket(102)
packet.myObject = result122
return packet
@@ -0,0 +1,92 @@
class NormalObject:
a = 0 # byte
aaa = [] # byte[]
b = 0 # short
c = 0 # int
d = 0 # long
e = 0.0 # float
f = 0.0 # double
g = False # bool
jj = "" # string
kk = None # ObjectA
l = [] # List<int>
ll = [] # List<long>
lll = [] # List<ObjectA>
llll = [] # List<string>
m = {} # Dictionary<int, string>
mm = {} # Dictionary<int, ObjectA>
s = {} # HashSet<int>
ssss = {} # HashSet<string>
def protocolId(self):
return 101
@classmethod
def write(cls, buffer, packet):
if buffer.writePacketFlag(packet):
return
buffer.writeByte(packet.a)
buffer.writeByteArray(packet.aaa)
buffer.writeShort(packet.b)
buffer.writeInt(packet.c)
buffer.writeLong(packet.d)
buffer.writeFloat(packet.e)
buffer.writeDouble(packet.f)
buffer.writeBool(packet.g)
buffer.writeString(packet.jj)
buffer.writePacket(packet.kk, 102)
buffer.writeIntArray(packet.l)
buffer.writeLongArray(packet.ll)
buffer.writePacketArray(packet.lll, 102)
buffer.writeStringArray(packet.llll)
buffer.writeIntStringMap(packet.m)
buffer.writeIntPacketMap(packet.mm, 102)
buffer.writeIntSet(packet.s)
buffer.writeStringSet(packet.ssss)
pass
@classmethod
def read(cls, buffer):
if not buffer.readBool():
return None
packet = NormalObject()
result0 = buffer.readByte()
packet.a = result0
array1 = buffer.readByteArray()
packet.aaa = array1
result2 = buffer.readShort()
packet.b = result2
result3 = buffer.readInt()
packet.c = result3
result4 = buffer.readLong()
packet.d = result4
result5 = buffer.readFloat()
packet.e = result5
result6 = buffer.readDouble()
packet.f = result6
result7 = buffer.readBool()
packet.g = result7
result8 = buffer.readString()
packet.jj = result8
result9 = buffer.readPacket(102)
packet.kk = result9
list10 = buffer.readIntArray()
packet.l = list10
list11 = buffer.readLongArray()
packet.ll = list11
list12 = buffer.readPacketArray(102)
packet.lll = list12
list13 = buffer.readStringArray()
packet.llll = list13
map14 = buffer.readIntStringMap()
packet.m = map14
map15 = buffer.readIntPacketMap(102)
packet.mm = map15
set16 = buffer.readIntSet()
packet.s = set16
set17 = buffer.readStringSet()
packet.ssss = set17
return packet
@@ -0,0 +1,32 @@
class ObjectA:
a = 0 # int
m = {} # Dictionary<int, string>
objectB = None # ObjectB
def protocolId(self):
return 102
@classmethod
def write(cls, buffer, packet):
if buffer.writePacketFlag(packet):
return
buffer.writeInt(packet.a)
buffer.writeIntStringMap(packet.m)
buffer.writePacket(packet.objectB, 103)
pass
@classmethod
def read(cls, buffer):
if not buffer.readBool():
return None
packet = ObjectA()
result0 = buffer.readInt()
packet.a = result0
map1 = buffer.readIntStringMap()
packet.m = map1
result2 = buffer.readPacket(103)
packet.objectB = result2
return packet
@@ -0,0 +1,24 @@
class ObjectB:
flag = False # bool
def protocolId(self):
return 103
@classmethod
def write(cls, buffer, packet):
if buffer.writePacketFlag(packet):
return
buffer.writeBool(packet.flag)
pass
@classmethod
def read(cls, buffer):
if not buffer.readBool():
return None
packet = ObjectB()
result0 = buffer.readBool()
packet.flag = result0
return packet
@@ -0,0 +1,30 @@
from . import VeryBigObject
from . import ComplexObject
from . import NormalObject
from . import ObjectA
from . import ObjectB
from . import SimpleObject
protocols = {}
protocols[0] = VeryBigObject.VeryBigObject
protocols[100] = ComplexObject.ComplexObject
protocols[101] = NormalObject.NormalObject
protocols[102] = ObjectA.ObjectA
protocols[103] = ObjectB.ObjectB
protocols[104] = SimpleObject.SimpleObject
def getProtocol(protocolId):
return protocols[protocolId]
def write(buffer, packet):
protocolId = packet.protocolId()
buffer.writeShort(protocolId)
protocol = protocols[protocolId]
protocol.write(buffer, packet)
def read(buffer):
protocolId = buffer.readShort()
protocol = protocols[protocolId]
packet = protocol.read(buffer)
return packet
@@ -0,0 +1,28 @@
class SimpleObject:
c = 0 # int
g = False # bool
def protocolId(self):
return 104
@classmethod
def write(cls, buffer, packet):
if buffer.writePacketFlag(packet):
return
buffer.writeInt(packet.c)
buffer.writeBool(packet.g)
pass
@classmethod
def read(cls, buffer):
if not buffer.readBool():
return None
packet = SimpleObject()
result0 = buffer.readInt()
packet.c = result0
result1 = buffer.readBool()
packet.g = result1
return packet
File diff suppressed because it is too large Load Diff