Next, let’s make a simple upgrade to Zinx. Currently, all server requests’ data is stored in a single Request object. The current structure of Request is as follows:

type Request struct {
conn ziface.IConnection // the connection established with the client
data []byte // the data requested by the client
}

As we can see, the Request currently uses a []byte to receive all the data without any length or message type information. This approach lacks comprehensive data management. In order to better represent the data format, we need to define a custom message type and encapsulate all the messages within it.

5.1 Creating the Message Encapsulation Type

Create a file named imessage.go in the zinx/ziface directory. This file serves as the abstraction layer for message encapsulation. In this file, define the IMessage interface as follows:

package ziface

// IMessage is an abstract interface that encapsulates a message
type IMessage interface {
GetDataLen() uint32 // Get the length of the message data segment
GetMsgId() uint32 // Get the message ID
GetData() []byte // Get the message content
SetMsgId(uint32) // Set the message ID
SetData([]byte) // Set the message content
SetDataLen(uint32) // Set the length of the message data segment
}

The IMessage interface defines six methods, including getters and setters for message length, message ID, and message content. From the interface, we can see that a Message should have three elements: ID, length, and data.

Next, create the implementation of the Message struct in the message.go file in the zinx/znet directory:

package znet

type Message struct {
Id uint32 // ID of the message
DataLen uint32 // Length of the message
Data []byte // Content of the message
}

Next, provide the constructor method for the Message struct:

// Create a new Message package
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
Id: id,
DataLen: uint32(len(data)),
Data: data,
}
}

Implement the related getter and setter methods as follows:

// Get the length of the message data segment
func (msg *Message) GetDataLen() uint32 {
return msg.DataLen
}

// Get the message ID
func (msg *Message) GetMsgId() uint32 {
return msg.Id
}
// Get the message content
func (msg *Message) GetData() []byte {
return msg.Data
}
// Set the length of the message data segment
func (msg *Message) SetDataLen(len uint32) {
msg.DataLen = len
}
// Set the message ID
func (msg *Message) SetMsgId(msgId uint32) {
msg.Id = msgId
}
// Set the message content
func (msg *Message) SetData(data []byte) {
msg.Data = data
}

The Message struct serves as a placeholder for future encapsulation optimizations. It also provides an initialization method, NewMsgPackage(), to create a message package.
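As a quick illustration of how the constructor and setters behave, here is a minimal standalone sketch. It copies the Message struct, NewMsgPackage(), and two setters from the listings above into a main package so it can run on its own. One detail it makes visible: SetData() replaces only the payload, so DataLen keeps its old value until SetDataLen() is called.

```go
package main

import "fmt"

// Message is a local copy of the struct from zinx/znet/message.go.
type Message struct {
	Id      uint32
	DataLen uint32
	Data    []byte
}

// NewMsgPackage derives DataLen from the payload, as in the listing above.
func NewMsgPackage(id uint32, data []byte) *Message {
	return &Message{Id: id, DataLen: uint32(len(data)), Data: data}
}

// SetData replaces the payload only; it does not touch DataLen.
func (msg *Message) SetData(data []byte) { msg.Data = data }

// SetDataLen sets the recorded length of the payload.
func (msg *Message) SetDataLen(n uint32) { msg.DataLen = n }

func main() {
	msg := NewMsgPackage(1, []byte("hello"))
	fmt.Println(msg.Id, msg.DataLen, string(msg.Data)) // prints: 1 5 hello

	msg.SetData([]byte("hi"))
	fmt.Println(msg.DataLen) // prints: 5 (stale until updated)

	msg.SetDataLen(uint32(len(msg.Data)))
	fmt.Println(msg.DataLen) // prints: 2
}
```

When replacing the payload of an existing message, it is therefore safest to call both setters together.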

5.2 Message Packaging and Unpacking

Zinx adopts the classic TLV (Type-Length-Value) packaging format to solve the TCP packet sticking problem. The specific message structure is represented graphically in Figure 5.1.

Figure 5.1

5.2.1 Creating the Abstract Class for Message Packaging and Unpacking

In this section, we will create the implementation for message packaging and unpacking to solve the TCP packet sticking problem. First, create the file idatapack.go in the zinx/ziface directory. This file serves as the abstract layer interface for message packaging and unpacking. The code is as follows:

//zinx/ziface/idatapack.go

package ziface
/*
Packaging and unpacking data
Directly operates on the data stream of TCP connections, adding header information for transmitting data to handle TCP packet sticking problem.
*/
type IDataPack interface {
GetHeadLen() uint32 // Method to get the length of the packet header
Pack(msg IMessage) ([]byte, error) // Method to pack the message
Unpack([]byte) (IMessage, error) // Method to unpack the message
}

IDataPack is an abstract interface that defines three methods:

  • GetHeadLen() returns the length of the message header. For application layer data packets, both sides of the communication require a known header length to read the other data related to the current message from the header. Therefore, an interface for retrieving the header length is provided for the specific implementation classes of packaging and unpacking.
  • Pack(msg IMessage) packs the data in the form of IMessage structure into a binary stream.
  • Unpack([]byte) parses the application layer binary data stream following the Zinx message protocol into an IMessage structure. This method corresponds to the Pack() method.

5.2.2 Implementing the Packaging and Unpacking Class

In this section, we will implement the packaging and unpacking functionality in Zinx. Create the file datapack.go in the zinx/znet directory. This file serves as the concrete implementation class for packaging and unpacking. The code is as follows:

//zinx/znet/datapack.go

package znet
import (
"bytes"
"encoding/binary"
"errors"
"zinx/utils"
"zinx/ziface"
)
// DataPack class for packaging and unpacking, no need for members currently
type DataPack struct{}
// Initialization method for the DataPack class
func NewDataPack() *DataPack {
return &DataPack{}
}

Here, we define the DataPack class without any properties. We provide the NewDataPack() constructor method.

The DataPack class needs to implement the IDataPack interface, including the GetHeadLen(), Pack(), and Unpack() methods. First, let's look at the implementation of the GetHeadLen() method:

//zinx/znet/datapack.go

// Method to get the length of the packet header
func (dp *DataPack) GetHeadLen() uint32 {
// DataLen uint32 (4 bytes) + Id uint32 (4 bytes)
return 8
}

Here, we simply return a fixed length of 8 bytes. How do we arrive at 8 bytes? In the message format shown in Figure 5.1, the length of the Data part varies from message to message, but the length of the Head part is fixed: 4 bytes store the length of the Data part and another 4 bytes store the message ID. Therefore, the GetHeadLen() method in Zinx directly returns a length of 8 bytes.
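The 8-byte figure can be checked with encoding/binary. The sketch below uses a hypothetical header struct purely for measurement (Zinx itself writes the two fields one at a time) and asks binary.Size() how many bytes binary.Write() would emit for it.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// header is a hypothetical struct used only to measure the wire size.
type header struct {
	DataLen uint32
	ID      uint32
}

// headLen reports how many bytes binary.Write would emit for a header.
func headLen() int {
	return binary.Size(header{})
}

func main() {
	fmt.Println(headLen()) // prints: 8
}
```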

The implementation of the Pack() method is as follows:

//zinx/znet/datapack.go

// Method for message packaging (serializes the message into bytes; no compression is involved)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
// Create a buffer to store the bytes
dataBuff := bytes.NewBuffer([]byte{})
// Write DataLen
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
return nil, err
}
// Write MsgID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}
// Write data
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}

Here, we use little-endian byte order. The choice itself is arbitrary; what matters is that packaging and unpacking use the same byte order. The packing order is also important: DataLen is written first, then MsgID, then Data, so on the wire the length precedes the ID. The unpacking process must read the fields in the same order.
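To make the byte layout concrete, the following standalone sketch packs a small message and prints the result in hex. The pack() helper is a simplified stand-in for DataPack.Pack() that takes the ID and payload directly instead of an IMessage; it writes the fields in the same order and byte order.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// pack writes DataLen, then the ID, then the payload, all little-endian.
func pack(id uint32, data []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	fields := []interface{}{uint32(len(data)), id, data}
	for _, f := range fields {
		if err := binary.Write(buf, binary.LittleEndian, f); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := pack(1, []byte("hi"))
	if err != nil {
		fmt.Println("pack error:", err)
		return
	}
	// 4 bytes of length (2), 4 bytes of ID (1), then 'h' and 'i'.
	fmt.Printf("% x\n", out) // prints: 02 00 00 00 01 00 00 00 68 69
}
```

With big-endian order the first four bytes would read 00 00 00 02 instead, which is why both ends must agree.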

The corresponding implementation for the Unpack() method is as follows:

//zinx/znet/datapack.go

// Method for message unpacking (parses the header only; no decompression is involved)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
// Create an io.Reader from the input binary data
dataBuff := bytes.NewReader(binaryData)
// Only extract the information from the header, obtaining dataLen and msgID
msg := &Message{}
// Read dataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
// Read msgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
// Check if the dataLen exceeds the maximum allowed packet size
if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {
return nil, errors.New("Too large msg data received")
}
// We only need to unpack the header data, and then read the data once more from the connection based on the length of the header
return msg, nil
}

Note that unpacking is performed in two steps, following the message layout in Figure 5.1. The second step depends on the dataLen obtained in the first step. The Unpack() method therefore extracts only the header (Head) content, that is, the dataLen and msgId values. The caller then continues reading the Body data from the IO stream based on the length specified by dataLen. dataLen and msgID must be read in the same order in which the Pack() method writes them.
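The two-step read can be sketched without the rest of the framework. In the example below, readMessage() and decodeAll() are hypothetical helpers, and maxPacketSize is an assumed constant standing in for utils.GlobalObject.MaxPacketSize. A bytes.Reader plays the role of the TCP stream and holds two messages back to back.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const (
	headLen       = 8    // DataLen (4 bytes) + ID (4 bytes)
	maxPacketSize = 4096 // assumed limit for this sketch
)

// readMessage reads one header, then exactly DataLen bytes of body.
func readMessage(r io.Reader) (id uint32, data []byte, err error) {
	head := make([]byte, headLen)
	if _, err = io.ReadFull(r, head); err != nil {
		return 0, nil, err
	}
	dataLen := binary.LittleEndian.Uint32(head[0:4])
	id = binary.LittleEndian.Uint32(head[4:8])
	if dataLen > maxPacketSize {
		return 0, nil, errors.New("too large msg data received")
	}
	data = make([]byte, dataLen)
	if _, err = io.ReadFull(r, data); err != nil {
		return 0, nil, err
	}
	return id, data, nil
}

// decodeAll drains a byte stream and returns one formatted line per message.
func decodeAll(stream []byte) []string {
	r := bytes.NewReader(stream)
	var lines []string
	for {
		id, data, err := readMessage(r)
		if err != nil {
			return lines // io.EOF once the stream is drained
		}
		lines = append(lines, fmt.Sprintf("ID=%d, len=%d, data=%s", id, len(data), data))
	}
}

func main() {
	// Two messages in one stream: (ID 0, "hello") and (ID 1, "world!!").
	stream := []byte{
		5, 0, 0, 0, 0, 0, 0, 0, 'h', 'e', 'l', 'l', 'o',
		7, 0, 0, 0, 1, 0, 0, 0, 'w', 'o', 'r', 'l', 'd', '!', '!',
	}
	for _, line := range decodeAll(stream) {
		fmt.Println(line)
	}
}
```

Because each call consumes exactly one header and one body, the second message starts at the right offset even though both arrived as a single run of bytes.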

5.2.3 Testing Packaging and Unpacking Functionality

To better understand the concept, let’s test the packaging and unpacking functionality by creating a separate server and client before integrating the DataPack into the Zinx framework. Here is the code for the server, server.go:

// server.go

package main
import (
"fmt"
"io"
"net"
"zinx/znet"
)
// Test the datapack packaging and unpacking functionality
func main() {
// Create a TCP server socket
listener, err := net.Listen("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("Server listen error:", err)
return
}
// Accept clients in a loop; each connection gets its own goroutine that reads and parses the stream, which is where sticky packets are handled
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Server accept error:", err)
// conn is not usable after a failed Accept, so skip to the next one
continue
}
// Handle client requests
go func(conn net.Conn) {
// Create a datapack object dp for packaging and unpacking
dp := znet.NewDataPack()
for {
// 1. Read the head part from the stream
headData := make([]byte, dp.GetHeadLen())
// ReadFull fills the buffer completely
_, err := io.ReadFull(conn, headData)
if err != nil {
fmt.Println("Read head error")
break
}
// 2. Unpack the headData bytes into msgHead
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("Server unpack error:", err)
return
}
// 3. Read the data bytes from the IO based on dataLen
if msgHead.GetDataLen() > 0 {
// msg has data, need to read data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("Server unpack data error:", err)
return
}
fmt.Printf("==> Recv Msg: ID=%d, len=%d, data=%s\n", msg.Id, msg.DataLen, msg.Data)
}
}
}(conn)
}
}

Let’s analyze the key steps in the code:

(1) First, we read the head part from the stream:

headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData)

We allocate a buffer called headData with a fixed length of 8 bytes, which is the size of the head in our protocol. We then use io.ReadFull() to read exactly 8 bytes from the socket's IO into headData. If fewer than 8 bytes are available, io.ReadFull() blocks until the buffer is completely filled. If the connection is closed first, it returns an error instead: io.EOF when no bytes were read, or io.ErrUnexpectedEOF when only part of the buffer was filled.
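The behavior of io.ReadFull() is easy to observe with an in-memory reader. In this small sketch, readHeadFrom() is a hypothetical helper that tries to read an 8-byte head from a string, standing in for the socket.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readHeadFrom tries to read exactly 8 bytes from s and reports the result.
func readHeadFrom(s string) (int, error) {
	head := make([]byte, 8)
	return io.ReadFull(strings.NewReader(s), head)
}

func main() {
	// Enough data: exactly 8 bytes are consumed, the rest stays unread.
	fmt.Println(readHeadFrom("12345678rest")) // prints: 8 <nil>

	// The source ends part way through the head.
	fmt.Println(readHeadFrom("123")) // prints: 3 unexpected EOF

	// The source ends before any byte is read.
	fmt.Println(readHeadFrom("")) // prints: 0 EOF
}
```

On a live TCP connection the short cases block while the peer keeps the connection open, and surface as errors only once it closes.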

Because io.ReadFull() consumes exactly the number of bytes requested, any extra data already sitting in the underlying buffer stays there for the next read operation. This is what keeps one message from running into the next.

(2) Unpacking the header information byte stream from headData to Message:

msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("Server unpack error:", err)
return
}

The Unpack() function extracts the header information of the Message from the 8-byte headData. The Id and DataLen fields of msgHead are assigned values.

(3) Reading the byte stream from the IO based on dataLen:

// 3. Read the data bytes from the IO based on dataLen
if msgHead.GetDataLen() > 0 {
// If the message has data, read the data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("Server unpack data error:", err)
return
}
// ... (omitted code)
}

In this part of the code, we perform a second read based on DataLen to obtain the message body. io.ReadFull() reads exactly DataLen bytes from the IO and stores them in msg.Data. Because every read consumes exactly one header or one body, each message is recovered whole, which solves the sticky packet problem.

Now let’s take a look at the testing code for the client:

// client.go
package main

import (
"fmt"
"net"
"zinx/znet"
)
func main() {
// Connect to the server; this client builds a sticky packet out of two messages and sends it
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("Client dial error:", err)
return
}
// 1. Create a DataPack object dp
dp := znet.NewDataPack()
// 2. Pack msg1
msg1 := &znet.Message{
Id: 0,
DataLen: 5,
Data: []byte{'h', 'e', 'l', 'l', 'o'},
}
sendData1, err := dp.Pack(msg1)
if err != nil {
fmt.Println("Client pack msg1 error:", err)
return
}
// 3. Pack msg2
msg2 := &znet.Message{
Id: 1,
DataLen: 7,
Data: []byte{'w', 'o', 'r', 'l', 'd', '!', '!'},
}
sendData2, err := dp.Pack(msg2)
if err != nil {
fmt.Println("Client pack msg2 error:", err)
return
}
// 4. Concatenate sendData1 and sendData2 to create a sticky packet
sendData1 = append(sendData1, sendData2...)
// 5. Write data to the server
if _, err := conn.Write(sendData1); err != nil {
fmt.Println("Client write error:", err)
return
}
// Block the client
select {}
}

Two points are worth noting about this code:

  1. The client uses the DataPack from zinx/znet to pack two messages one after the other. append() concatenates the two byte streams to simulate a sticky packet, and the combined sendData1 is written to the server in a single call.
  2. If the server can parse the data of both messages, the packing and unpacking methods of DataPack are functioning correctly.

To run the server, execute the following command:

go run server.go

In a new terminal, run the client code using the command:

go run client.go

From the server’s perspective, the following output is observed:

==> Recv Msg: ID=0, len=5, data=hello
==> Recv Msg: ID=1, len=7, data=world!!

The results indicate that the server has received the two messages sent by the client in a single write and parsed them successfully.
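The same read logic also copes with the opposite situation, where one message arrives in several pieces. The sketch below is an in-memory simulation under stated assumptions: io.Pipe() stands in for the TCP connection, and receive(), sendInChunks(), and roundTrip() are hypothetical helpers. The sender delivers a 13-byte packet three bytes at a time, and the receiver still recovers one complete message.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
)

// receive reads one message (8-byte header, then body) from r.
func receive(r io.Reader) (uint32, string, error) {
	head := make([]byte, 8)
	if _, err := io.ReadFull(r, head); err != nil {
		return 0, "", err
	}
	body := make([]byte, binary.LittleEndian.Uint32(head[0:4]))
	if _, err := io.ReadFull(r, body); err != nil {
		return 0, "", err
	}
	return binary.LittleEndian.Uint32(head[4:8]), string(body), nil
}

// sendInChunks writes packet to w a few bytes at a time, imitating a
// message that is split across several TCP segments.
func sendInChunks(w io.WriteCloser, packet []byte, chunk int) {
	defer w.Close()
	if chunk <= 0 {
		chunk = 1
	}
	for len(packet) > 0 {
		n := chunk
		if n > len(packet) {
			n = len(packet)
		}
		if _, err := w.Write(packet[:n]); err != nil {
			return
		}
		packet = packet[n:]
	}
}

// roundTrip pushes one packed message through an in-memory pipe and decodes it.
func roundTrip(packet []byte, chunk int) (uint32, string, error) {
	pr, pw := io.Pipe()
	defer pr.Close() // unblocks the writer if receive fails early
	go sendInChunks(pw, packet, chunk)
	return receive(pr)
}

func main() {
	packet := []byte{5, 0, 0, 0, 0, 0, 0, 0, 'h', 'e', 'l', 'l', 'o'}
	fmt.Println(roundTrip(packet, 3)) // prints: 0 hello <nil>
}
```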

5.3 Zinx-V0.5 Code Implementation

In this section, we need to integrate the packet packing and unpacking functionality into Zinx and test if the feature is effective.

5.3.1 Modification of the Request Struct

First, we need to modify the data field of the Request struct in zinx/znet/request.go from []byte type to IMessage type.

package znet

import "zinx/ziface"
type Request struct {
conn ziface.IConnection // The connection already established with the client
msg ziface.IMessage // Data requested by the client
}
// Get the connection information
func (r *Request) GetConnection() ziface.IConnection {
return r.conn
}
// Get the data of the request message
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
// Get the ID of the request message
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}

The related getter methods also need to be modified accordingly.

5.3.2 Integration of Unpacking Process

Next, we need to modify the code for reading client data in the StartReader() method of Connection as follows in zinx/znet/connection.go:

func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()

for {
// Create a data packing/unpacking object
dp := NewDataPack()
// Read the client's message header
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
c.ExitBuffChan <- true
return // the connection is no longer usable; the deferred c.Stop() cleans up
}
// Unpack the message, obtain msgid and datalen, and store them in msg
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error", err)
c.ExitBuffChan <- true
return // an invalid header leaves the stream out of sync, so stop reading
}
// Read the data based on dataLen and store it in msg.Data
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error", err)
c.ExitBuffChan <- true
return // the connection is no longer usable; the deferred c.Stop() cleans up
}
}
msg.SetData(data)
// Get the Request data of the current client request
req := Request{
conn: c,
msg: msg, // Replace buf with msg
}
// Find the corresponding Handle registered in Routers based on the bound Conn
go func(request ziface.IRequest) {
// Execute the registered router methods
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}

The integration logic is similar to the server.go code tested in the previous section.

5.3.3 Sending Packets Method

Packet unpacking is now integrated into Zinx. The sending side needs the same treatment: developers who want to return data to the client in the Zinx message format should not have to repeat the packing steps by hand every time. Zinx therefore provides a method that packs and sends a message in one call. It is declared in the IConnection interface as the SendMsg() method. The code is as follows:

//zinx/ziface/iconnection.go

package ziface

import "net"

// Define the connection interface
type IConnection interface {
// Start the connection and make the current connection work
Start()
// Stop the connection and end the current connection status
Stop()
// Get the raw socket TCPConn from the current connection
GetTCPConnection() *net.TCPConn
// Get the current connection ID
GetConnID() uint32
// Get the remote client address information
RemoteAddr() net.Addr
// Directly send the Message data to the remote TCP client
SendMsg(msgId uint32, data []byte) error
}

The SendMsg() method provides two parameters: the message ID of the current message being sent and the data carried by the message. The Connection implementation of this method is as follows:

//zinx/znet/connection.go

// Send the Message data to the remote TCP client
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("Connection closed when send msg")
}
// Package the data and send it
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id =", msgId)
return errors.New("Pack error msg")
}
// Write back to the client
if _, err := c.Conn.Write(msg); err != nil {
fmt.Println("Write msg id", msgId, "error")
c.ExitBuffChan <- true
return errors.New("conn Write error")
}
return nil
}

The SendMsg() method can make the packaging process transparent to the business developer, making the packet sending more readable and the interface clearer.

5.3.4 Using Zinx-V0.5 to Complete the Application

Now we can use the Zinx framework to complete an application-level test case for the Moon Service program that sends messages. The code for the application server is as follows:

//Server.go

package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
// Ping test custom router
type PingRouter struct {
znet.BaseRouter
}
// Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
// Read the client's data first, then write back ping...ping...ping
fmt.Printf("recv from client: msgId=%d, data=%s\n", request.GetMsgID(), request.GetData())
// Write back data
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
func main() {
// Create a server handle
s := znet.NewServer()
// Configure the router
s.AddRouter(&PingRouter{})
// Start the server
s.Serve()
}

Now, if you want to send application layer data of Zinx to the other end, you just need to call the SendMsg() method of the Connection object. You can also specify the ID number for the current message, so that the developer can process different business logic based on different Message IDs.

In the current server, the Message sent by the client is parsed first, and then a message with ID 1 is returned. The message content is "ping...ping...ping".
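To illustrate what processing different business logic by message ID could look like, here is a minimal sketch. It is not part of the Zinx API at this version, which still binds a single router; the handler type, dispatch(), and demoHandlers() are all hypothetical names introduced for the example.

```go
package main

import "fmt"

// handler is a hypothetical callback type that turns request data into a reply.
type handler func(data []byte) string

// dispatch looks up the handler registered for msgID and runs it.
func dispatch(handlers map[uint32]handler, msgID uint32, data []byte) (string, error) {
	h, ok := handlers[msgID]
	if !ok {
		return "", fmt.Errorf("no handler for msgID %d", msgID)
	}
	return h(data), nil
}

// demoHandlers returns two illustrative handlers keyed by message ID.
func demoHandlers() map[uint32]handler {
	return map[uint32]handler{
		0: func(data []byte) string { return "ping: " + string(data) },
		1: func(data []byte) string { return "hello: " + string(data) },
	}
}

func main() {
	reply, err := dispatch(demoHandlers(), 1, []byte("zinx"))
	fmt.Println(reply, err) // prints: hello: zinx <nil>

	_, err = dispatch(demoHandlers(), 9, nil)
	fmt.Println(err) // prints: no handler for msgID 9
}
```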

The implementation code for the application client is as follows:

//Client.go

package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
/*
Simulate the client
*/
func main() {
fmt.Println("Client Test... start")
// Wait for 3 seconds to give the server a chance to start the service
time.Sleep(3 * time.Second)
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
// Send packet message
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message")))
_, err := conn.Write(msg)
if err != nil {
fmt.Println("write error err", err)
return
}
// Read the head part of the stream first
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) // ReadFull will fill msg until it's full
if err != nil {
fmt.Println("read head error")
break
}
// Unpack the headData byte stream into msg
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("client unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
// msg has data, need to read data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
// Read byte stream from the io based on dataLen
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("client unpack data err:", err)
return
}
fmt.Printf("==> Recv Msg: ID=%d, len=%d, data=%s\n", msg.Id, msg.DataLen, msg.Data)
}
time.Sleep(1 * time.Second)
}
}

Here, the Client simulates a message with ID 0, “Zinx V0.5 Client Test Message”, and then prints the data returned by the server.

Run the following commands in two separate terminals:

$ go run Server.go
$ go run Client.go

The server output will be as follows:

$ go run Server.go
Add Router succ!
[START] Server name: zinx v-0.5 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.5 demoApp succ, now listening...
Reader Goroutine is running
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
...

The client output will be as follows:

$ go run Client.go
Client Test... start
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
...

5.4 Summary

Zinx has successfully integrated the message packaging functionality, providing a basic protocol standard for Zinx’s communication. With the ability to recognize message types, Zinx’s communication router can redirect to different business logic based on different Message IDs.

Message packaging is an essential module for server frameworks. It effectively defines the application layer protocol of the communication framework, which can run on top of TCP or UDP. Zinx's communication protocol is relatively simple. Readers who want a more advanced protocol in the framework can add attributes to the message header, such as a checksum of the whole data packet, identity information, a decryption key, or message status. Note, however, that every added attribute increases the fixed length of the header, and developers need to keep track of this length to ensure that the complete header information is read.
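As one possible extension along these lines, the sketch below adds a CRC32 checksum of the data to the header. It is an illustration, not part of Zinx: packWithChecksum() and unpackWithChecksum() are hypothetical functions, and the field order (DataLen, ID, checksum) is an assumption made for the example. Note how the fixed header length grows from 8 to 12 bytes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// headLen grows from 8 to 12 once a 4-byte checksum joins the header:
// DataLen (4) + ID (4) + CRC32 of the data (4).
const headLen = 12

// packWithChecksum is a hypothetical variant of Pack with one extra header field.
func packWithChecksum(id uint32, data []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	fields := []interface{}{uint32(len(data)), id, crc32.ChecksumIEEE(data), data}
	for _, f := range fields {
		if err := binary.Write(buf, binary.LittleEndian, f); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// unpackWithChecksum parses a whole packet and verifies length and checksum.
func unpackWithChecksum(packet []byte) (uint32, []byte, error) {
	if len(packet) < headLen {
		return 0, nil, errors.New("packet shorter than header")
	}
	dataLen := binary.LittleEndian.Uint32(packet[0:4])
	id := binary.LittleEndian.Uint32(packet[4:8])
	sum := binary.LittleEndian.Uint32(packet[8:12])
	data := packet[headLen:]
	if uint64(len(data)) != uint64(dataLen) {
		return 0, nil, errors.New("data length mismatch")
	}
	if crc32.ChecksumIEEE(data) != sum {
		return 0, nil, errors.New("checksum mismatch")
	}
	return id, data, nil
}

func main() {
	packet, err := packWithChecksum(1, []byte("hello"))
	if err != nil {
		fmt.Println("pack error:", err)
		return
	}
	id, data, err := unpackWithChecksum(packet)
	fmt.Println(id, string(data), err) // prints: 1 hello <nil>

	packet[len(packet)-1] ^= 0xff // corrupt the last data byte
	_, _, err = unpackWithChecksum(packet)
	fmt.Println(err) // prints: checksum mismatch
}
```

A stream-based reader would first read the 12-byte header and then the body, exactly as before, so GetHeadLen() is the one place that has to change when the header grows.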
