一、粘包问题基本概念

网络粘包是指在TCP流式传输中,由于TCP协议的特性,多个数据包可能被合并成一个数据包接收(粘包),或者一个数据包被分割成多个数据包接收(半包)。这会导致接收端无法正确识别消息边界,从而造成数据解析错误。

主要表现形式:

  1. 粘包问题:多个数据包被合并成一个数据包接收
  2. 拆包问题:单个应用层报文被分割成多个TCP段接收

示例情况:

发送端: [Packet1] [Packet2] [Packet3]
接收端可能收到:
情况1: [Packet1] [Packet2] [Packet3](理想情况)
情况2: [Packet1] [Packet2部分数据](半包)
情况3: [Packet1部分] [Packet2] [Packet3部分](粘包+半包)

二、长度前缀(Length-Prefix)协议

最常用的解决方案是采用"长度+数据"的格式进行通信,即在每个消息前添加一个固定长度的字段,用于表示后续数据的长度。

[4字节长度字段(网络字节序)][实际数据(长度由头部指定)]
长度字段使用 uint32_t 大端序(网络字节序)

三、发送端处理

#include <arpa/inet.h>  // htonl/ntohl
#include <vector>

void send_message(int sockfd, const std::vector<char>& data) {
    // 添加长度校验
    // if (data.size() > std::numeric_limits<uint32_t>::max()) {
    //      throw std::invalid_argument("Data size exceeds 4GB");
    // }
    // 构造协议头部,
    uint32_t data_length = static_cast<uint32_t>(data.size());
    uint32_t net_length = htonl(data_length);

    // 构造发送缓冲区(二进制安全)
    std::vector<char> buffer(sizeof(net_length) + data.size());
    memcpy(buffer.data(), &net_length, sizeof(net_length));
    memcpy(buffer.data() + sizeof(net_length), data.data(), data.size());

    // 完整发送(处理部分发送情况)
    size_t total_sent = 0;
    while (total_sent < buffer.size()) {
        ssize_t sent = send(sockfd, buffer.data() + total_sent, 
                            buffer.size() - total_sent, 0);
        if (sent == -1) {
            if (errno == EINTR) continue;  // 处理系统中断
            throw std::runtime_error("send failed: " + std::string(strerror(errno)));
        }
        total_sent += sent;
    }
}

四、接收端处理

4.1 基本接收方法

// 首先接收4字节的长度字段
uint32_t network_length;
size_t total_read = 0;
while (total_read < sizeof(uint32_t)) {
    ssize_t n = recv(socket_fd, (char*)&network_length + total_read, 
                    sizeof(uint32_t) - total_read, 0);
    if (n <= 0) { /* 处理错误 */ }
    total_read += n;
}

// 将网络字节序转换为主机字节序
uint32_t length = ntohl(network_length);

// 根据长度字段分配缓冲区并接收实际消息
std::vector<char> buffer(length);
size_t total_read_body = 0;
while (total_read_body < length) {
    ssize_t n = recv(socket_fd, buffer.data() + total_read_body, 
                     length - total_read_body, 0);
    if (n == 0) {
    throw std::runtime_error("Connection closed by peer");
    } else if (n < 0) {
        if (errno == EINTR) continue;
        throw std::runtime_error("recv error: " + std::string(strerror(errno)));
    }

    total_read_body += n;
}

// 处理接收到的消息
process_message(buffer, length);

4.2 处理粘包和半包的完整状态机实现

#include <sys/socket.h>
#include <stdexcept>

class MessageParser {
public:
    static constexpr uint32_t MAX_MSG_SIZE = 16 * 1024 * 1024; // 16MB

    enum class ParseState {
        ReadingHeader,  // 正在读取长度头
        ReadingBody     // 正在读取消息体
    };

    //不使用std::vector<std::string>的原因:
    //std::string 内部以 \0 作为终止符,若消息中包含二进制数据(如图片、加密内容等)存在 0x00 字节,会导致数据被意外截断:
    //std::vector<char> 直接存储原始字节流,无任何隐式转换,保证数据完整性
    std::vector<std::vector<char>> parse(const char* input, size_t len) {
        std::vector<std::vector<char>> complete_messages;
        size_t processed = 0;

        while (processed < len) {
            switch (current_state_) {
                case ParseState::ReadingHeader: {
                    size_t remain = HEADER_SIZE - header_bytes_received_;
                      //len - processed 表示当前输入数据中还未处理的字节数。len 是传入的数据总长度,processed 是已经处理的字节数。
                    //std::min() 函数取这两个值中的较小值,确保:
                    //如果header_remaining小,确保不会读取超过头部所需的字节数
                    //如果length - processed小,确保不会尝试读取超出当前可用数据范围的字节
                    size_t to_copy = std::min(remain, len - processed);

                    // 拷贝到头部缓冲区
                    memcpy(header_buf_ + header_bytes_received_, input + processed, to_copy);
                    
                    header_bytes_received_ += to_copy;
                    processed += to_copy;

                    // 头部接收完成
                    // header_bytes_received_ < HEADER_SIZE,会直接退出switch
                    // 在处理下一个数据包时,由于header_buf_没有清空和current_state_仍是ReadingHeader
                    // header_bytes_received_也不变,可以继续从 header_buf_ 的位置追加数据
                    // 保证了正确的处理剩余的数据包,直到满足header_bytes_received_ == HEADER_SIZE
                    // 然后重置状态机相关信息,下面的ParseState::ReadingBody同理
                    if (header_bytes_received_ == HEADER_SIZE) {
                        // 转换网络字节序
                        uint32_t net_length;
                        memcpy(&net_length, header_buf_, sizeof(net_length));
                        current_body_length_ = ntohl(net_length);

                        // 安全检查
                        if (current_body_length_ > MAX_MSG_SIZE) {
                            throw std::runtime_error("Message size exceeds limit");
                        }

                        // 准备接收消息体
                        current_body_.resize(current_body_length_);
                        body_bytes_received_ = 0;
                        current_state_ = ParseState::ReadingBody;
                    }
                    break;
                }

                case ParseState::ReadingBody: {
                    size_t remain = current_body_length_ - body_bytes_received_;
                    size_t to_copy = std::min(remain, len - processed);
                    
                    //assert(current_body_.size() >= body_bytes_received_ + to_copy);
                    memcpy(current_body_.data() + body_bytes_received_,
                          input + processed, to_copy);

                    body_bytes_received_ += to_copy;
                    processed += to_copy;

                    // 消息体接收完成
                    if (body_bytes_received_ == current_body_length_) {
                        complete_messages.push_back(std::move(current_body_));
                        reset();
                    }
                    break;
                }
            }
        }
        return complete_messages;
    }

private:
    void reset() {
        current_state_ = ParseState::ReadingHeader;
        header_bytes_received_ = 0;
        body_bytes_received_ = 0;
        current_body_.clear();
    }

    static constexpr size_t HEADER_SIZE = sizeof(uint32_t);
    
    ParseState current_state_ = ParseState::ReadingHeader;
    char header_buf_[HEADER_SIZE] = {0};
    size_t header_bytes_received_ = 0;
    uint32_t current_body_length_ = 0;
    size_t body_bytes_received_ = 0;
    std::vector<char> current_body_;
};

4.3 完整接收流程

void message_loop(int sockfd) {
    MessageParser parser;
    std::vector<char> recv_buf(4096);  // 4KB接收缓冲区

    while (true) {
        ssize_t n = recv(sockfd, recv_buf.data(), recv_buf.size(), 0);
        if (n == 0) {
            // 连接正常关闭
            break;
        } else if (n < 0) {
            if (errno == EINTR) continue;   // 处理系统中断
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 非阻塞模式下无数据可用,应通过select/poll/epoll等待
                // 此处简单重试仅用于演示目的
                usleep(1000);
                continue;
            }
            throw std::runtime_error("recv error: " + std::string(strerror(errno)));
        }

        // 解析消息
        auto messages = parser.parse(recv_buf.data(), n);
        
        // 处理完整消息
        for (auto& msg : messages) {
            process_message(msg.data(), msg.size());
        }
    }
}

五、关键处理要点

  1. 状态机处理:使用状态机区分正在读取头部还是读取消息体
  2. 缓冲区管理:动态调整缓冲区大小,根据需要存储部分接收的数据
  3. 字节累积:持续累积接收的字节,直到获取完整的消息
  4. 多消息处理:一次接收可能包含多个完整消息,需要全部解析出来
  5. 安全检查:验证消息长度的合理性,防止恶意攻击
  6. 字节序问题:在不同架构的计算机之间通信时,必须使用网络字节序(大端序)。使用htonl()将主机字节序转换为网络字节序,使用ntohl()将网络字节序转换为主机字节序
  7. 长度字段大小:通常使用4字节(uint32_t)来表示长度,这足以表示最大4GB的消息
  8. 完整接收:TCP不保证一次recv调用能接收到完整数据,可能需要循环接收直到获取所需的全部字节

六、总结

TCP流式协议中处理粘包和半包问题的核心是:

  1. 使用固定长度的头部标识消息长度
  2. 使用状态机和缓冲区管理接收的数据
  3. 阻塞等待直到接收到完整数据
  4. 单次接收可能同时包含多个完整消息和不完整消息,需要综合处理
  5. 不同的实现方式(memcpy、string操作)各有优缺点,可根据具体需求选择