• 发文
  • 评论
  • 微博
  • 空间
  • 微信

项目推荐 I 手写RPC框架(二)

程序媛驿站 2020-05-06 15:07 发文

前言

RPC框架代码量较多,将仅对核心过程进行梳理

在这篇推文中,将实现Rpc的远程通信。远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。在本项目的系统推文中,将对项目进行详细的介绍。

主要将按照下面的内容进行分配(蓝色字体可戳):

手写RPC框架(一)
RPC简介、技术栈介绍、测试Demo
手写RPC框架(二)
远程通信实现
手写RPC框架(三)制定协议与编解码器、动态代理
手写RPC框架(四)注册中心

Rpc框架示意图

四、实现远程通信

远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。远程通信必然会有一个 Server 和一个 Client 的实现。下面就先介绍该 RPC 框架的实现:

1. 服务端

服务端负责接收客户端的请求,并做出响应。

一个Netty服务端主要包括两部分组成

配置服务端的启动类,比如下方的NettyServer

处理请求的逻辑类, 比如下方的 ServiceRequestHandler

NettyServerpublic class NettyServer {
...   //服务端启动的核心方法   public void start(){...  }
...

其中,start方法中的核心内容如下:

...
//Netty提供的服务端启动类ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class)  .option(ChannelOption.SO_KEEPALIVE, true)   //启用tcp协议层面的keep-live  .childHandler(new ChannelInitializer<SocketChannel>() {       @Override       protected void initChannel(SocketChannel socketChannel) throws Exception {           //拆包Handler           socketChannel.pipeline().addLast(new Spliter());           //解码Handler           socketChannel.pipeline().addLast(new Decoder());           //编码Handler           socketChannel.pipeline().addLast(new Encoder());          ...
          //处理客户端请求的Handler           socketChannel.pipeline().addLast(serviceRequestHandler);
         ...
     }  });
...
});//等待服务端关闭channelFuture.channel().closeFuture().sync();

可以看出在服务端启动时,会为服务端配置相应的Handler;

当有请求到达时,这些Handler会依次对请求做相应的处理,比如首先Spliter对通信数据包进行拆包粘包,保证数据包的完整性,然后Decoder对数据包进行解码,得到请求内容,最后serviceRequestHandler处理请求内容,并且做出响应。

ServiceRequestHandler

ServiceRequestHandler类继承了Netty框架提供的SimpleChannelInboundHandler类,并且重写了三个方法:channelActive()、channelInactive()和channelRead0()。

前两个方法顾名思义,在客户端建立连接和断开连接时执行回调,最后一个方法在收到客户端请求时执行回调,是处理请求的核心方法。

public class ServiceRequestHandler extends SimpleChannelInboundHandler<ServiceRequestPacket> {
  public static final ServiceRequestHandler INSTANCE = new ServiceRequestHandler();
  private ServiceInvoker serviceInvoker = ServiceInvoker.INSTANCE;
  @Override   public void channelActive(ChannelHandlerContext ctx) throws Exception {       log.info("客户端建立了连接");       super.channelActive(ctx);  }
  @Override   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceRequestPacket serviceRequest) throws Exception {       ServiceResponsePacket responsePacket = new ServiceResponsePacket();
      //设置响应id       responsePacket.setRequestId(serviceRequest.getId());
     ...
      if (serviceConfig == null) {           log.info("No such service : " + serviceRequest);           responsePacket.setCode(1);           responsePacket.setMessage("No such service");      } else {           //获取服务的实现类           Object object = context.getBean(serviceConfig.getRef());           //找到请求的方法           Method method = object.getClass().getMethod(serviceRequest.getMethodName(), serviceRequest.getParameterTypes());           //通过反射调用请求的方法,得到结果           Object result = serviceInvoker.invoke(object, method, serviceRequest);           log.info("service id : " + serviceRequest.getId());           log.info("Service invoked : " + serviceRequest);           //将请求代码(0:成功, 1:失败)和调用结果封装在响应包中           responsePacket.setCode(0);           responsePacket.setData(result);      }//将响应包通过Netty发送给客户端       channelHandlerContext.channel().writeAndFlush(responsePacket);
 }
  @Override   public void channelInactive(ChannelHandlerContext ctx) throws Exception {       log.info("客户端断开了连接");       super.channelInactive(ctx);  }}

客户端发来的请求主要包括:

请求id

要请求的服务(即哪个类的哪个方法)

请求参数(要传入该方法的参数)

通过对代码分析可以梳理出处理的流程:

根据请求内容找到相应类的相应方法

通过反射调用该方法,并传入请求中的参数

得到结果,封装入响应包

将响应发送给客户端

2. 客户端

客户端主要的工作是连接服务器、发送消息、等待服务端的消息响应以及该响应消息、关闭与服务端的连接。

一个 Netty 的客户端同样有两个部分:

配置服务以及服务启动逻辑类,比如下方的 NettyClient 类。

实现从客户端接收到的消息的处理逻辑类:比如下方的 ClientHandler 类。

完整代码见:https://github.com/wdw87/wRpc

NettyClient

代码较长,省略了非关键部分

public class NettyClient {
 ...
  public NettyClient(String host, int port) {
      this.host = host;       this.port = port;//启动类       bootstrap = new Bootstrap();       NioEventLoopGroup workerGroup = new NioEventLoopGroup();
      bootstrap.group(workerGroup)              .channel(NioSocketChannel.class)              .option(ChannelOption.SO_KEEPALIVE, true)              .handler(new ChannelInitializer<SocketChannel>() {                   @Override                   protected void initChannel(SocketChannel socketChannel) throws Exception {
                      //拆包Handler                       socketChannel.pipeline().addLast(new Spliter());                       //解码Handler                       socketChannel.pipeline().addLast(new Decoder());                       //编码Handler                       socketChannel.pipeline().addLast(new Encoder());
                     ...
                      //处理服务端响应的Handler                       socketChannel.pipeline().addLast(serviceResponseHandler);
                 }              });       try {           this.connect(host, port);      } catch (InterruptedException e) {           e.printStackTrace();      }  }//连接服务端   public void connect(String host, int port) throws InterruptedException {
     ...
      this.channel = channelFuture.sync().channel();  }//发送请求   public Object send(ServiceRequestPacket requestPacket) throws InterruptedException {       if(channel != null && channel.isActive()){           //发送请求           SynchronousQueue<Object> queue = serviceResponseHandler.sendRequest(requestPacket, channel);           //阻塞等待响应包           ServiceResponsePacket result = (ServiceResponsePacket)queue.take();           //得到响应包中的请求结果           Class<?> returnType = requestPacket.getReturnType();           Object newdata = parseReturnType(returnType, result.getData());           result.setData(newdata);           return result;      }else {           ServiceResponsePacket responsePacket = new ServiceResponsePacket();           responsePacket.setCode(1);           responsePacket.setMessage("未正确连接到服务器.请检查相关配置信息!");           return responsePacket;      }
 }
 ...

NettyClient类虽然代码较长,但是结构十分简单,客户端在构造函数中初始化,与服务端一样,也有拆包和编解码过程,核心Handler是处理服务端响应的serviceResponseHandler。

值得注意的是,在send() 方法中,首先调用

serviceResponseHandler.sendRequest()方法,该方法会发出请求,同时将一个SynchronousQueue以请求id为key,放入一个ConcurrentHashMap中;

在客户端收到响应后,同样以请求id为key,得到这个SynchronousQueue,并放入响应包,这样在响应传回时就可以获得响应的响应包了。

ServiceResponseHandlerpublic class ServiceResponseHandler extends SimpleChannelInboundHandler<ServiceResponsePacket> {
  private Map<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
 ...
  @Override   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceResponsePacket serviceResponsePacket) throws Exception {       //得到请求id       String id = serviceResponsePacket.getRequestId();       //根据id得到相应的SynchronousQueue       SynchronousQueue<Object> queue = queueMap.get(id);       if(queue != null){           //将响应包放入SynchronousQueue,之后,前文所述的send()方法将解除阻塞,并得响应           queue.put(serviceResponsePacket);           queueMap.remove(id);      }else{           log.error("request id error !!!");      }  }
  public SynchronousQueue<Object> sendRequest(ServiceRequestPacket requestPacket, Channel channel){       SynchronousQueue<Object> queue = new SynchronousQueue<>();       //以请求id为key,放入一个SynchronousQueue,此时SynchronousQueue为空队列       queueMap.put(requestPacket.getId(), queue);       //发出请求       channel.writeAndFlush(requestPacket);       return queue;  }}

与server中的Handler相似,在重写的channelRead0()方法中处理响应。

作者:好吃懒做贪玩东

编辑:西瓜媛

推荐阅读:

面经 | 笔试中的综测,你必须得知道!

学习笔记 | git的基本用法

面经 | NLP算法岗(百度

面经 | NLP算法岗(作业帮)

一文学会Pytorch版本BERT使用

书籍 | 《鸟哥的Linux私房菜》第二章


声明:本文为OFweek维科号作者发布,不代表OFweek维科号立场。如有侵权或其他问题,请及时联系我们举报。
2
评论

评论

    相关阅读

    暂无数据

    程序媛驿站

    带你领略计算机学科之美。内容包括...

    举报文章问题

    ×
    • 营销广告
    • 重复、旧闻
    • 格式问题
    • 低俗
    • 标题夸张
    • 与事实不符
    • 疑似抄袭
    • 我有话要说
    确定 取消

    举报评论问题

    ×
    • 淫秽色情
    • 营销广告
    • 恶意攻击谩骂
    • 我要吐槽
    确定 取消

    用户登录×

    请输入用户名/手机/邮箱

    请输入密码