perf[golang]: go的zfoo服务器,使用zfoo协议

This commit is contained in:
godotg
2022-09-15 21:40:34 +08:00
parent a68c948498
commit de37816d5c
14 changed files with 107 additions and 162 deletions
+13 -8
View File
@@ -163,15 +163,20 @@ func (byteBuffer *ByteBuffer) ReadShort() int16 {
return value
}
func (byteBuffer *ByteBuffer) WriteRawInt32(intValue int32) {
byteBuffer.WriteUByte(byte(intValue >> 24))
byteBuffer.WriteUByte(byte(intValue >> 16))
byteBuffer.WriteUByte(byte(intValue >> 8))
byteBuffer.WriteUByte(byte(intValue))
func (byteBuffer *ByteBuffer) WriteRawInt32(value int32) {
byteBuffer.EnsureCapacity(4)
var bytesBuffer = bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, value)
var byteArray = bytesBuffer.Bytes()
byteBuffer.WriteUBytes(byteArray)
}
func (byteBuffer *ByteBuffer) ReadRawInt32() int32 {
return int32(uint32(byteBuffer.ReadUByte())<<24 | uint32(byteBuffer.ReadUByte())<<16 | uint32(byteBuffer.ReadUByte())<<8 | uint32(byteBuffer.ReadUByte()))
var byteArray = byteBuffer.ReadUBytes(4)
bytesBuffer := bytes.NewBuffer(byteArray)
var value int32
binary.Read(bytesBuffer, binary.BigEndian, &value)
return value
}
func (byteBuffer *ByteBuffer) WriteInt(intValue int) {
@@ -444,7 +449,7 @@ func (byteBuffer *ByteBuffer) ReadPacket(protocolId int16) any {
// -------------------------------------------------IProtocolRegistration-------------------------------------------------
type IProtocolRegistration interface {
protocolId() int16
ProtocolId() int16
write(buffer *ByteBuffer, packet any)
@@ -459,7 +464,7 @@ func GetProtocol(protocolId int16) IProtocolRegistration {
}
func Write(buffer *ByteBuffer, packet any) {
var protocolId = packet.(IProtocolRegistration).protocolId()
var protocolId = packet.(IProtocolRegistration).ProtocolId()
buffer.WriteShort(protocolId)
var protocolRegistration = GetProtocol(protocolId)
protocolRegistration.write(buffer, packet)
+11 -11
View File
@@ -6,7 +6,7 @@ type Message struct {
Module int8
}
func (protocol Message) protocolId() int16 {
func (protocol Message) ProtocolId() int16 {
return 100
}
@@ -41,7 +41,7 @@ type Error struct {
Module int
}
func (protocol Error) protocolId() int16 {
func (protocol Error) ProtocolId() int16 {
return 101
}
@@ -74,7 +74,7 @@ type Heartbeat struct {
}
func (protocol Heartbeat) protocolId() int16 {
func (protocol Heartbeat) ProtocolId() int16 {
return 102
}
@@ -97,7 +97,7 @@ type Ping struct {
}
func (protocol Ping) protocolId() int16 {
func (protocol Ping) ProtocolId() int16 {
return 103
}
@@ -120,7 +120,7 @@ type Pong struct {
Time int64
}
func (protocol Pong) protocolId() int16 {
func (protocol Pong) ProtocolId() int16 {
return 104
}
@@ -148,7 +148,7 @@ type PairLong struct {
Value int64
}
func (protocol PairLong) protocolId() int16 {
func (protocol PairLong) ProtocolId() int16 {
return 111
}
@@ -179,7 +179,7 @@ type PairString struct {
Value string
}
func (protocol PairString) protocolId() int16 {
func (protocol PairString) ProtocolId() int16 {
return 112
}
@@ -210,7 +210,7 @@ type PairLS struct {
Value string
}
func (protocol PairLS) protocolId() int16 {
func (protocol PairLS) ProtocolId() int16 {
return 113
}
@@ -242,7 +242,7 @@ type TripleLong struct {
Right int64
}
func (protocol TripleLong) protocolId() int16 {
func (protocol TripleLong) ProtocolId() int16 {
return 114
}
@@ -277,7 +277,7 @@ type TripleString struct {
Right string
}
func (protocol TripleString) protocolId() int16 {
func (protocol TripleString) ProtocolId() int16 {
return 115
}
@@ -312,7 +312,7 @@ type TripleLSS struct {
Right string
}
func (protocol TripleLSS) protocolId() int16 {
func (protocol TripleLSS) ProtocolId() int16 {
return 116
}
+2 -2
View File
@@ -4,7 +4,7 @@ type GatewayToProviderRequest struct {
Message string
}
func (protocol GatewayToProviderRequest) protocolId() int16 {
func (protocol GatewayToProviderRequest) ProtocolId() int16 {
return 5000
}
@@ -31,7 +31,7 @@ type GatewayToProviderResponse struct {
Message string
}
func (protocol GatewayToProviderResponse) protocolId() int16 {
func (protocol GatewayToProviderResponse) ProtocolId() int16 {
return 5001
}
+2 -2
View File
@@ -4,7 +4,7 @@ type JProtobufHelloRequest struct {
Message string
}
func (protocol JProtobufHelloRequest) protocolId() int16 {
func (protocol JProtobufHelloRequest) ProtocolId() int16 {
return 1500
}
@@ -31,7 +31,7 @@ type JProtobufHelloResponse struct {
Message string
}
func (protocol JProtobufHelloResponse) protocolId() int16 {
func (protocol JProtobufHelloResponse) ProtocolId() int16 {
return 1501
}
+2 -2
View File
@@ -4,7 +4,7 @@ type JsonHelloRequest struct {
Message string
}
func (protocol JsonHelloRequest) protocolId() int16 {
func (protocol JsonHelloRequest) ProtocolId() int16 {
return 1600
}
@@ -31,7 +31,7 @@ type JsonHelloResponse struct {
Message string
}
func (protocol JsonHelloResponse) protocolId() int16 {
func (protocol JsonHelloResponse) ProtocolId() int16 {
return 1601
}
+2 -2
View File
@@ -4,7 +4,7 @@ type TcpHelloRequest struct {
Message string
}
func (protocol TcpHelloRequest) protocolId() int16 {
func (protocol TcpHelloRequest) ProtocolId() int16 {
return 1300
}
@@ -31,7 +31,7 @@ type TcpHelloResponse struct {
Message string
}
func (protocol TcpHelloResponse) protocolId() int16 {
func (protocol TcpHelloResponse) ProtocolId() int16 {
return 1301
}
+2 -2
View File
@@ -4,7 +4,7 @@ type UdpHelloRequest struct {
Message string
}
func (protocol UdpHelloRequest) protocolId() int16 {
func (protocol UdpHelloRequest) ProtocolId() int16 {
return 1200
}
@@ -31,7 +31,7 @@ type UdpHelloResponse struct {
Message string
}
func (protocol UdpHelloResponse) protocolId() int16 {
func (protocol UdpHelloResponse) ProtocolId() int16 {
return 1201
}
+15 -43
View File
@@ -13,53 +13,25 @@
package znet
import (
"bytes"
"encoding/binary"
protocol "gonet/goProtocol"
)
// Encode from Packet to []byte
func Encode(msg *Packet) ([]byte, error) {
buffer := new(bytes.Buffer)
err := binary.Write(buffer, binary.LittleEndian, msg.length)
if err != nil {
return nil, err
}
err = binary.Write(buffer, binary.LittleEndian, msg.protocolId)
if err != nil {
return nil, err
}
err = binary.Write(buffer, binary.LittleEndian, msg.data)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
func Encode(packet any) *protocol.ByteBuffer {
var buffer = new(protocol.ByteBuffer)
buffer.WriteRawInt32(0)
protocol.Write(buffer, packet)
var writeOffset = buffer.WriteOffset()
buffer.SetWriteOffset(0)
buffer.WriteRawInt32(int32(writeOffset - 4))
buffer.SetWriteOffset(writeOffset)
return buffer
}
// Decode from []byte to Packet
func Decode(data []byte) (*Packet, error) {
bufReader := bytes.NewReader(data)
dataSize := len(data)
// 读取消息ID
var protocolId int16
err := binary.Read(bufReader, binary.LittleEndian, &protocolId)
if err != nil {
return nil, err
}
// 读取数据
dataBufLength := dataSize - 2 - 4
dataBuf := make([]byte, dataBufLength)
err = binary.Read(bufReader, binary.LittleEndian, &dataBuf)
if err != nil {
return nil, err
}
message := &Packet{}
message.length = int32(dataSize)
message.protocolId = protocolId
message.data = dataBuf
return message, nil
func Decode(data []byte) any {
var buffer = new(protocol.ByteBuffer)
buffer.WriteUBytes(data)
var packet = protocol.Read(buffer)
return packet
}
+9 -22
View File
@@ -26,10 +26,10 @@ type Conn struct {
sid string
rawConn net.Conn
sendCh chan []byte
messageCh chan any
done chan error
hbTimer *time.Timer
name string
messageCh chan *Packet
hbInterval time.Duration
hbTimeout time.Duration
}
@@ -45,7 +45,7 @@ func NewConn(c net.Conn, hbInterval time.Duration, hbTimeout time.Duration) *Con
rawConn: c,
sendCh: make(chan []byte, 100),
done: make(chan error),
messageCh: make(chan *Packet, 100),
messageCh: make(chan any, 100),
hbInterval: hbInterval,
hbTimeout: hbTimeout,
}
@@ -68,12 +68,8 @@ func (c *Conn) Close() {
// SendMessage send message
func (c *Conn) SendMessage(msg *Packet) error {
pkg, err := Encode(msg)
if err != nil {
return err
}
c.sendCh <- pkg
var buffer = Encode(msg)
c.sendCh <- buffer.ToBytes()
return nil
}
@@ -135,32 +131,23 @@ func (c *Conn) readCoroutine(ctx context.Context) {
bufReader := bytes.NewReader(buf)
var dataSize int32
err = binary.Read(bufReader, binary.LittleEndian, &dataSize)
err = binary.Read(bufReader, binary.BigEndian, &dataSize)
if err != nil {
c.done <- err
continue
}
// 读取数据
databuf := make([]byte, dataSize)
_, err = io.ReadFull(c.rawConn, databuf)
var bytes = make([]byte, dataSize)
_, err = io.ReadFull(c.rawConn, bytes)
if err != nil {
c.done <- err
continue
}
// 解码
msg, err := Decode(databuf)
if err != nil {
c.done <- err
continue
}
if msg.protocolId == MsgHeartbeat {
continue
}
c.messageCh <- msg
var packet = Decode(bytes)
c.messageCh <- packet
}
}
}
+12 -16
View File
@@ -21,7 +21,7 @@ import (
// SocketService struct
type SocketService struct {
onMessage func(*Session, *Packet)
onMessage func(*Session, any)
onConnect func(*Session)
onDisconnect func(*Session, error)
sessions *sync.Map
@@ -33,30 +33,26 @@ type SocketService struct {
stopCh chan error
}
// NewSocketService create a new socket service
func NewSocketService(laddr string) (*SocketService, error) {
// Server create a new socket service
func Server(laddr string) *SocketService {
l, err := net.Listen("tcp", laddr)
listen, _ := net.Listen("tcp", laddr)
if err != nil {
return nil, err
}
s := &SocketService{
server := &SocketService{
sessions: &sync.Map{},
stopCh: make(chan error),
hbInterval: 0 * time.Second,
hbTimeout: 0 * time.Second,
laddr: laddr,
status: STInited,
listener: l,
listener: listen,
}
return s, nil
return server
}
// RegMessageHandler register message handler
func (s *SocketService) RegMessageHandler(handler func(*Session, *Packet)) {
func (s *SocketService) RegMessageHandler(handler func(*Session, any)) {
s.onMessage = handler
}
@@ -70,8 +66,8 @@ func (s *SocketService) RegDisconnectHandler(handler func(*Session, error)) {
s.onDisconnect = handler
}
// Serv Start socket service
func (s *SocketService) Serv() {
// Start Start socket service
func (s *SocketService) Start() {
s.status = STRunning
ctx, cancel := context.WithCancel(context.Background())
@@ -134,9 +130,9 @@ func (s *SocketService) connectHandler(ctx context.Context, c net.Conn) {
}
return
case msg := <-conn.messageCh:
case packet := <-conn.messageCh:
if s.onMessage != nil {
s.onMessage(session, msg)
s.onMessage(session, packet)
}
}
}
+22 -42
View File
@@ -12,43 +12,30 @@
package znet
import (
protocol "gonet/goProtocol"
"fmt"
protocol "gonet/goProtocol"
"net"
"testing"
"time"
)
func TestService(t *testing.T) {
host := "127.0.0.1:18787"
host := "127.0.0.1:9000"
ss, err := NewSocketService(host)
if err != nil {
return
}
server, _ := Server(host)
server.RegMessageHandler(HandleMessage)
server.RegConnectHandler(HandleConnect)
server.RegDisconnectHandler(HandleDisconnect)
// ss.SetHeartBeat(5*time.Second, 30*time.Second)
server.Start()
ss.RegMessageHandler(HandleMessage)
ss.RegConnectHandler(HandleConnect)
ss.RegDisconnectHandler(HandleDisconnect)
go NewClientConnect()
timer := time.NewTimer(time.Second * 1)
go func() {
<-timer.C
ss.Stop("stop service")
t.Log("service stoped")
}()
t.Log("service running on " + host)
ss.Serv()
// clientTest()
}
func HandleMessage(s *Session, msg *Packet) {
fmt.Println("receive protocolId:", msg)
fmt.Println("receive data:", string(msg.data))
func HandleMessage(s *Session, packet any) {
fmt.Println("receive packet")
fmt.Println(packet)
}
func HandleDisconnect(s *Session, err error) {
@@ -59,26 +46,19 @@ func HandleConnect(s *Session) {
fmt.Println(s.conn.GetName() + " connected.")
}
func NewClientConnect() {
host := "127.0.0.1:18787"
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return
}
func clientTest() {
host := "127.0.0.1:9000"
tcpAddr, _ := net.ResolveTCPAddr("tcp", host)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return
}
conn, _ := net.DialTCP("tcp", nil, tcpAddr)
var buffer = new(protocol.ByteBuffer)
var packet = new(protocol.TcpHelloRequest)
packet.Message = "Hello, This is Golang Client"
fmt.Println("send message")
msg := NewMessage(1, []byte("Hello Zero!"))
data, err := Encode(msg)
if err != nil {
return
}
conn.Write(data)
var buffer = Encode(packet)
conn.Write(buffer.ToBytes())
time.Sleep(time.Millisecond * 5000)
}
+13 -8
View File
@@ -163,15 +163,20 @@ func (byteBuffer *ByteBuffer) ReadShort() int16 {
return value
}
func (byteBuffer *ByteBuffer) WriteRawInt32(intValue int32) {
byteBuffer.WriteUByte(byte(intValue >> 24))
byteBuffer.WriteUByte(byte(intValue >> 16))
byteBuffer.WriteUByte(byte(intValue >> 8))
byteBuffer.WriteUByte(byte(intValue))
func (byteBuffer *ByteBuffer) WriteRawInt32(value int32) {
byteBuffer.EnsureCapacity(4)
var bytesBuffer = bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, value)
var byteArray = bytesBuffer.Bytes()
byteBuffer.WriteUBytes(byteArray)
}
func (byteBuffer *ByteBuffer) ReadRawInt32() int32 {
return int32(uint32(byteBuffer.ReadUByte())<<24 | uint32(byteBuffer.ReadUByte())<<16 | uint32(byteBuffer.ReadUByte())<<8 | uint32(byteBuffer.ReadUByte()))
var byteArray = byteBuffer.ReadUBytes(4)
bytesBuffer := bytes.NewBuffer(byteArray)
var value int32
binary.Read(bytesBuffer, binary.BigEndian, &value)
return value
}
func (byteBuffer *ByteBuffer) WriteInt(intValue int) {
@@ -444,7 +449,7 @@ func (byteBuffer *ByteBuffer) ReadPacket(protocolId int16) any {
// -------------------------------------------------IProtocolRegistration-------------------------------------------------
type IProtocolRegistration interface {
protocolId() int16
ProtocolId() int16
write(buffer *ByteBuffer, packet any)
@@ -459,7 +464,7 @@ func GetProtocol(protocolId int16) IProtocolRegistration {
}
func Write(buffer *ByteBuffer, packet any) {
var protocolId = packet.(IProtocolRegistration).protocolId()
var protocolId = packet.(IProtocolRegistration).ProtocolId()
buffer.WriteShort(protocolId)
var protocolRegistration = GetProtocol(protocolId)
protocolRegistration.write(buffer, packet)
@@ -4,7 +4,7 @@ type {} struct {
{}
}
func (protocol {}) protocolId() int16 {
func (protocol {}) ProtocolId() int16 {
return {}
}
@@ -4,7 +4,7 @@ type {} struct {
{}
}
func (protocol {}) protocolId() int16 {
func (protocol {}) ProtocolId() int16 {
return {}
}