RPC(Remote Procedure Call Protocol)远程过程调用协议,过程示意图image-20250330212348735

一、提供方配置

1.1、生成服务对象

要提供RPC服务,需要在proto中添加service信息,protoc会为每个service生成对应的C++类,包含虚函数,用户需要继承并实现这些函数。但这些生成的代码并不处理实际的网络通信,只是提供了一个框架,需要用户自己填充具体的逻辑,或者结合其他RPC库使用。

syntax = "proto3";

package friendservice;

//当cc_generic_services为true时,protoc会为每个service生成对应的C++类,包含虚函数,用户需要继承并实现这些函数。
option cc_generic_services = true;

message ResultCode
{
    int32 errCode = 1;
    bytes errMsg = 2;
}

message GetFriendListRequest
{
    uint32 userid = 1;
}

message GetFriendListResponse
{
    ResultCode result = 1;
    repeated bytes friends = 2;
}

service FriendServiceRpc
{
    rpc GetFriendList(GetFriendListRequest) returns(GetFriendListResponse);
}

1.2、提取服务

protobuf会为每一个service生成对应的抽象描述,具体实现需要我们继承这些类并实现对应的方法,然后通过protobuf内部实现的多态进行调用。

  • 获取服务对象的描述信息,描述中包含service的名字方法数量以及方法的抽象描述,将这些信息封装到map表中,用于处理服务和方法的寻找

    const google::protobuf::ServiceDescriptor* pserviceDesc = service->GetDescriptor();
    std::string service_name = pserviceDesc->name();//service名字
    int methodCnt = pserviceDesc->method_count();//service方法数量
    const google::protobuf::MethodDescriptor* pmethonDest = pserviceDesc->method(i);//方法的抽象描述
    std::string method_name = pmethonDest->name();//方法的名字

1.3、启动RPC服务

现在存在一个问题,那就是通过RPC实现的分布式框架会很多RPC服务提供者,调用方是如何知道每一个服务的调用地址呢,总不能列一个静态配置文件,一个一个的罗列出来吧,这样维护成本太高了,每次添加或删除一个service服务,调用方都要修改配置文件并重新启动。服务方的进行修改时还需要修改调用方,这样的设计实在太糟糕了。

为了解决上述问题,zookeeper就出现了,根据观察者模式的思想,每一个提供方在启动时都在zookeeper中注册信息,调用方的所有调用,只需要发送给zookeeper,至于调用方的地址在哪,调用方都无需关心,一切都由zookeeper负责。这样,调用方就只需要关注zookeeper地址即可,服务方的修改也不会影响到调用方。

PixPin_2025-03-31_09-28-24

  • 连接zookeeper服务

    // 连接zkserver
    void ZKClient::Start()
    {
        std::string host = MprpcApplication::GetInstance().getConfig().Load("zookeeperip");
        std::string port = MprpcApplication::GetInstance().getConfig().Load("zookeeperport");
        std::string connstr = host + ":" + port;
    
        m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
        if (nullptr == m_zhandle)
        {
            perror("zookeeper_init");
            exit(EXIT_FAILURE);
        }
    
        sem_t sem;
        sem_init(&sem, 0, 0);
        zoo_set_context(m_zhandle, &sem);
    
        // 阻塞等待global_watcher函数被调用,在ZooKeeper连接成功建立解除阻塞
        sem_wait(&sem);
    }
  • 往zookeeper中注册服务

    // 注册服务
    for(auto &sp : m_serviceMap)
    {
        std::string service_path = "/" + sp.first;
        // 0表示永久性节点
        zkclient.Create(service_path.c_str(), nullptr, 0);
        for(auto &mp : sp.second.m_methodMap)
        {
            std::string method_path = service_path + "/" + mp.first;
            char method_path_buffer[128] = {0};
            snprintf(method_path_buffer, sizeof(method_path_buffer), "%s:%d",ip.c_str(), port);
            // ZOO_EPHEMERAL 表示创建临时节点
            zkclient.Create(method_path.c_str(), method_path_buffer, strlen(method_path_buffer),ZOO_EPHEMERAL);
        }
    }
  • 最后,启动网络服务完成最后的RPC启动工作

    server.start();
    m_eventLoop.loop();

二、RPC提供者实现细节

前面提到,protoc会为每个service生成对应的C++类,包含虚函数,用户需要继承并实现这些函数。

首先我们可以看见,FriendServiceRpc是protoc给我们自动生成的service类,我们可以看到它继承了google ::protobuf::Service

  • 我们可以先看看google ::protobuf::Service的实现,我们可以看见,这是一个抽象类
class PROTOBUF_EXPORT Service {
 public:
  inline Service() {}
  virtual ~Service();

  enum ChannelOwnership { STUB_OWNS_CHANNEL, STUB_DOESNT_OWN_CHANNEL };

  // Get the ServiceDescriptor describing this service and its methods.
  virtual const ServiceDescriptor* GetDescriptor() = 0;

  //可以看见,CallMethod是一个纯虚函数,FriendServiceRpc类重写service中的这个函数,通过service多态调用FriendServiceRpc的        CallMethod方法
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

  virtual const Message& GetRequestPrototype(
      const MethodDescriptor* method) const = 0;
  virtual const Message& GetResponsePrototype(
      const MethodDescriptor* method) const = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Service);
};
  • protobuf生成的服务类FriendServiceRpc,这个类通过继承google ::protobuf::Service并重写CallMethod方法调用我们重写的函数GetFriendList
class FriendServiceRpc : public ::PROTOBUF_NAMESPACE_ID::Service {
 protected:
  // This class should be treated as an abstract interface.
  inline FriendServiceRpc() {};
 public:
  virtual ~FriendServiceRpc();

  typedef FriendServiceRpc_Stub Stub;

  //service描述
  static const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* descriptor();

  //方法名的抽象描述,我们需要重写这个方法
  virtual void GetFriendList(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::friendservice::GetFriendListRequest* request,
                       ::friendservice::GetFriendListResponse* response,
                       ::google::protobuf::Closure* done);

  // implements Service ----------------------------------------------

  const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* GetDescriptor();
  //CallMethod是最关键的函数,service通过多态调用这个CallMethod方法,而这个方法会调用我们重写好的函数!
  void CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                  ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                  const ::PROTOBUF_NAMESPACE_ID::Message* request,
                  ::PROTOBUF_NAMESPACE_ID::Message* response,
                  ::google::protobuf::Closure* done);
  const ::PROTOBUF_NAMESPACE_ID::Message& GetRequestPrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;
  const ::PROTOBUF_NAMESPACE_ID::Message& GetResponsePrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(FriendServiceRpc);
};

//调用方法的处理逻辑,protobuf为每一个方法都生成了一个下标,在请求到来时,会通过下标确定方法并通过多态的方式调用
void UserServiceRpc::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                             ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                             const ::PROTOBUF_NAMESPACE_ID::Message* request,
                             ::PROTOBUF_NAMESPACE_ID::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_user_2eproto[0]);
  switch(method->index()) {
    case 0:
      Login(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::fixbug::LoginRequest*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::fixbug::LoginResponse*>(
                 response),
             done);
      break;
    case 1:
      Register(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::fixbug::RegisterRequest*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::fixbug::RegisterResponse*>(
                 response),
             done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}
  • 我们自定义的服务类,用于继承并重写FriendServiceRpc中的虚函数
class FriendService : public friendservice::FriendServiceRpc
{
public:
    static std::vector<std::string> getFriendList(uint32_t userid)
    {
        std::cout << "doing local service: getFriendList , userid : " << userid << std::endl;
        std::vector<std::string> friendList;
        friendList.emplace_back("zhangsan");
        friendList.emplace_back("lisi");
        friendList.emplace_back("wangwu");
        return friendList;
    }

    void GetFriendList(::google::protobuf::RpcController* controller,
                       const ::friendservice::GetFriendListRequest* request,
                       ::friendservice::GetFriendListResponse* response,
                       ::google::protobuf::Closure* done) override
    {
        uint32_t userid = request->userid();
        std::vector<std::string> friendList = getFriendList(userid);
        response->mutable_result()->set_errcode(0);
        response->mutable_result()->set_errmsg("");
        for (const std::string& name : friendList)
        {
            response->add_friends(name);
        }
        done->Run();
    }
};

​ 从上面我们可以大致看出,RPC服务提供者处理就是在服务端请求到达时,首先会将父类指针google::protobuf::Service*指向子类对象,这个子类对象实际上就是上面的class FriendService : public friendservice::FriendServiceRpc,也就是我们自己实现的类,再通过FriendService的CallMethod方法(由于没有重写,实际上是父类的CallMethod方法)调用我们重写的方法,至于怎么确定是哪个函数,这个由protobuf内部负责的,具体逻辑就是我们们看见的FriendServiceRpc重写的CallMethod方法,这个CallMethod会调用我们重写好的函数。

image-20250331145019803

实际上,protobuf知道我们要调用哪个函数,是需要我们传递一些参数的,我们可以看到CallMethod的函数声明

  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;
  1. method:就是对应调用方法的描述,const google::protobuf::MethodDescriptor* method,通过这个描述,FriendServiceRpc就能知道要调用哪个方法。
  2. controller:通过这个RpcController类,我们可以查询返回数据时是否发生错误,并获得相关的RPC的信息,如错误的信息。
  3. request:包含方法的参数信息
  4. response: 包含服务提供者返回的响应消息
  5. done: 用于发送数据给客户端
//request
google::protobuf::Message* request = service->GetRequestPrototype(method).New();
//response
google::protobuf::Message* response = service->GetResponsePrototype(method).New();
//done,在done调用run方法时,会调用sendRpcResponse方法
google::protobuf::Closure* done = google::protobuf::
    NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*>(
    this, &RpcProvider::sendRpcResponse, conn, response);

首先,我们首先要根据protobuf的数据格式约定,提取出调用方的调用信息

//解析服务描述信息
if (rpcHeader.ParseFromString(rpc_header_str))
{
    service_name = rpcHeader.service_name();
    method_name = rpcHeader.method_name();
    args_size = rpcHeader.args_size();
}

然后查看服务和方法是否存在

auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
    LOG_ERROR("service_name : %s is not found", service_name.c_str());
    return;
}
// 尝试确认是否提供对应的方法
auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
    LOG_ERROR("service_name : %s method_name : %s is not found",
              service_name.c_str(),
              method_name.c_str());
    return;
}

初始化request的参数数据

google::protobuf::Message* request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
    LOG_ERROR("request parse error,content : %s", args_str.c_str());
    return;
}

最后调用CallMethod方法

service->CallMethod(method, nullptr, request, response, done);

具体代码实现:

void RpcProvider::onMessage(const muduo::net::TcpConnectionPtr& conn,
                            muduo::net::Buffer* buffer,
                            muduo::Timestamp timer)
{
    std::string recv_buf = buffer->retrieveAllAsString();
    // 从字符流中读取前4个字节的内容
    uint32_t header_size = 0;
    // 从第0个字节开始读取,读取4个字节
    recv_buf.copy((char*)&header_size, 4, 0);
    std::cout << header_size << std::endl;
    // 根据header_size读取数据头的原始字符流,从第五个字节开始读取数据
    std::string rpc_header_str = recv_buf.substr(4, header_size);
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if (rpcHeader.ParseFromString(rpc_header_str))
    {
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
        LOG_INFO("service_name : %s, method_name : %s, args_size : %d",
                 service_name.c_str(),
                 method_name.c_str(),
                 args_size);
    }
    else
    {
        LOG_ERROR("rpc_header_str : %s parse error", rpc_header_str.c_str());
    }
    // 参数 args_size处理粘包问题
    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    // 尝试确认是否提供对应的服务
    auto it = m_serviceMap.find(service_name);
    if (it == m_serviceMap.end())
    {
        LOG_ERROR("service_name : %s is not found", service_name.c_str());
        return;
    }
    // 尝试确认是否提供对应的方法
    auto mit = it->second.m_methodMap.find(method_name);
    if (mit == it->second.m_methodMap.end())
    {
        LOG_ERROR("service_name : %s method_name : %s is not found",
                  service_name.c_str(),
                  method_name.c_str());
        return;
    }
    google::protobuf::Service* service = it->second.m_service; // 指向service对象,也就是我们自己实现的类
    const google::protobuf::MethodDescriptor* method = mit->second; // 指向对应的method对象,具体要调用的函数

    // 生成rpc方法调用的请求request和响应response对象
    google::protobuf::Message* request = service->GetRequestPrototype(method).New();

    //解析参数数据
    if (!request->ParseFromString(args_str))
    {
        LOG_ERROR("request parse error,content : %s", args_str.c_str());
        return;
    }
    LOG_INFO("args_str : %s", request->DebugString().c_str());
    google::protobuf::Message* response = service->GetResponsePrototype(method).New();

    google::protobuf::Closure* done = google::protobuf::
        NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*>(
            this, &RpcProvider::sendRpcResponse, conn, response);

    service->CallMethod(method, nullptr, request, response, done);
}

void RpcProvider::sendRpcResponse(const muduo::net::TcpConnectionPtr& conn,
                                  google::protobuf::Message* response)
{
    std::string response_str;
    if (response->SerializeToString(&response_str))
    {
        // 序列化成功后发送给调用方
        conn->send(response_str);
    }
    else
    {
        LOG_ERROR("Serialize response_str error !");
    }
    conn->shutdown(); // 模拟http的短链接服务,有rpcprovider主动断开连接
}

三、调用方配置

使用姿势

// 初始化框架
MprpcApplication::Init(argc, argv);
//初始化RpcChannel
friendservice::FriendServiceRpc_Stub stub(new MprpcChannel());
friendservice::GetFriendListRequest getFriendListRequest;
// 定义请求参数
getFriendListRequest.set_userid(1);
friendservice::GetFriendListResponse getFriendListResponse;
MprpcController controller;
//stub.GetFriendList的第四个参数传入nullptr表示阻塞等待结果,也可以传入一个回调函数(Closure),
//异步等待结果,当rpc调用完成后会调用这个回调函数
stub.GetFriendList(&controller, &getFriendListRequest, &getFriendListResponse, nullptr);

if (!controller.Failed())
{
    if (0 == getFriendListResponse.result().errcode())
    {
        //成功时的处理逻辑
    }
    else
    {
        //失败时的错误逻辑
    }
}
else
{
    std::cout << "rpc error : " << controller.ErrorText() << std::endl;
}

四、RPC调用者实现细节

protobuf不就会生成服务提供者相关的类,同样还会实现调用者相关的类

与提供者不同的是,调用的创建的父类对象是继承自FriendServiceRpc,也就是提供者的那个父类,每一个服务提供者的类都会生成一个对应的Stub类,调用方通过这个Stub类就能与提供方实现数据间相互处理。

class FriendServiceRpc_Stub : public FriendServiceRpc {
 public:
  FriendServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
  FriendServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel,
                   ::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership);
  ~FriendServiceRpc_Stub();

  inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; }

  // implements FriendServiceRpc ------------------------------------------

  void GetFriendList(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::friendservice::GetFriendListRequest* request,
                       ::friendservice::GetFriendListResponse* response,
                       ::google::protobuf::Closure* done);
 private:
  ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
  bool owns_channel_;
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(FriendServiceRpc_Stub);
};

实际上,FriendServiceRpc_Stub 类并没有直接实现 CallMethod 方法,而是通过一个更精妙的设计来实现 RPC 调用。当我们查看 FriendServiceRpc_Stub 的构造函数时,可以发现它需要一个 RpcChannel 对象作为参数。这个 RpcChannel 类才是真正包含 CallMethod 方法的地方。这样表示着,调用方的所有操作,最终都会通过经过这个RpcChannel类并通过其CallMethod方法发出。RpcChannel 本身是一个抽象类,只定义了这一个纯虚方法,这正是 Protocol Buffers 框架的精髓所在 - 它提供了接口定义,但将具体的网络通信实现留给了开发者。通过继承 RpcChannel 并重写其 CallMethod 方法,我们可以实现序列化和反序列化逻辑,添加额外的元数据(如超时设置、重试策略等)等。

class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

具体的执行过程如下:

  • 客户端通过Stub类调用对应的RPC接口,在这个接口中调用RpcChannel的CallMethod方法
stub.GetFriendList(&controller, &getFriendListRequest, &getFriendListResponse, nullptr);
//调用CallMethod方法,protobuf生成对应描述,使用下标调用对应的方法
void FriendServiceRpc_Stub::GetFriendList(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                              const ::friendservice::GetFriendListRequest* request,
                              ::friendservice::GetFriendListResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}
  • RpcChannel的CallMethod方法实际上调用的是我们重写的方法,用于封装并发送的数据
class MprpcChannel : public google::protobuf::RpcChannel
{
public:
    MprpcChannel();
    ~MprpcChannel();

    // 所以使用stub代理对象调用rpc方法的都会调用callMethod方法,通过该方法进行数据的序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor* method,
                    google::protobuf::RpcController* controller,
                    const google::protobuf::Message* request,
                    google::protobuf::Message* response,
                    google::protobuf::Closure* done) override;

private:
};

void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
                              google::protobuf::RpcController* controller,
                              const google::protobuf::Message* request,
                              google::protobuf::Message* response,
                              google::protobuf::Closure* done)
{
    // 通过method从ServiceDescriptor中获取服务名字
    const google::protobuf::ServiceDescriptor* sd = method->service();
    const std::string& service_name = sd->name();
    const std::string& method_name = method->name();

    std::string args_str;
    uint32_t args_size = 0;
    // 序列化请求参数
    if (request->SerializeToString(&args_str))
    {
        args_size = args_str.size();
    }
    else
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] serialize request error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("serialize request error");
        return;
    }

    // 组织rpc请求头,包含服务名、方法名、参数长度
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    // 计算rpc请求头的长度
    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str))
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] serialize rpc header error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("serialize rpc header error");
        return;
    }

    // 组织待发送的rpc请求,rpc请求 = 请求头 + 请求体(参数)
    std::string send_rpc_str;
    // 将header_size转为二进制格式,存放在send_rpc_str的开始处的前四个字节
    send_rpc_str.insert(0, std::string((char*)&header_size, 4));
    send_rpc_str += rpc_header_str; // rpc请求头
    send_rpc_str += args_str;       // rpc请求体(参数)

    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] create socket error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("create socket error");
        return;
    }
    ZKClient zkClient;
    zkClient.Start();
    // 从zookeeper中获取服务提供者的ip和端口
    std::string service_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkClient.GetData(service_path.c_str());
    if (host_data.empty())
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] get host error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("get host error");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] host format error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("host format error");
        return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
    std::cout << "ip : " << ip << " port : " << port << std::endl;
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    server_addr.sin_port = htons(port);
    // 与服务提供者建立连接
    if (-1 == connect(clientfd, (sockaddr*)&server_addr, sizeof(server_addr)))
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] connect error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("connect error");
        close(clientfd);
        return;
    }
    //发送调用请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] send error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("send error");
        close(clientfd);
        return;
    }

    // 这里使用阻塞等待接收数据
    char recv_buf[1024];
    int recv_size = 0;
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] recv error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("recv error");
        close(clientfd);
        return;
    }

    //解析服务端返回的数据
    if (response->ParseFromArray(recv_buf, recv_size))
    {
        std::cout << "response parse successful" << std::endl;
    }
    else
    {
        char errmsg[128] = {0};
        snprintf(errmsg,
                 127,
                 "service[%s] method[%s] parse response error",
                 service_name.c_str(),
                 method_name.c_str());
        controller->SetFailed("parse response error");
    }
    close(clientfd);
}

总体来说,调用方只需要继承RpcChannel并重写其CallMethod方法,调用方只需要提供对应的参数,调用的方法就能,就能通过CallMethod方法实现自动处理发送并处理RPC请求与响应,其余交给框架处理。