前言
RPC框架代码量较多,将仅对核心过程进行梳理
在这篇推文中,将实现Rpc的远程通信。远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。在本项目的系统推文中,将对项目进行详细的介绍。
主要将按照下面的内容进行分配(蓝色字体可戳):
手写RPC框架(一)
RPC简介、技术栈介绍、测试Demo
手写RPC框架(二)
远程通信实现
手写RPC框架(三)制定协议与编解码器、动态代理
手写RPC框架(四)注册中心
Rpc框架示意图
四、实现远程通信
远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。远程通信必然会有一个 Server 和一个 Client 的实现。下面就先介绍该 RPC 框架的实现:
1. 服务端
服务端负责接收客户端的请求,并做出响应。
一个Netty服务端主要包括两部分组成
配置服务端的启动类,比如下方的NettyServer
处理请求的逻辑类, 比如下方的 ServiceRequestHandler
NettyServerpublic class NettyServer {
... //服务端启动的核心方法 public void start(){... }
...
}
其中,start方法中的核心内容如下:
...
//Netty提供的服务端启动类ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) //启用tcp协议层面的keep-live .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //拆包Handler socketChannel.pipeline().addLast(new Spliter()); //解码Handler socketChannel.pipeline().addLast(new Decoder()); //编码Handler socketChannel.pipeline().addLast(new Encoder()); ...
//处理客户端请求的Handler socketChannel.pipeline().addLast(serviceRequestHandler);
...
} });
...
});//等待服务端关闭channelFuture.channel().closeFuture().sync();
可以看出在服务端启动时,会为服务端配置相应的Handler;
当有请求到达时,这些Handler会依次对请求做相应的处理,比如首先Spliter对通信数据包进行拆包粘包,保证数据包的完整性,然后Decoder对数据包进行解码,得到请求内容,最后serviceRequestHandler处理请求内容,并且做出响应。
ServiceRequestHandler
ServiceRequestHandler类继承了Netty框架提供的SimpleChannelInboundHandler类,并且重写了三个方法:channelActive()、channelInactive()和channelRead0()。
前两个方法顾名思义,在客户端建立连接和断开连接时执行回调,最后一个方法在收到客户端请求时执行回调,是处理请求的核心方法。
public class ServiceRequestHandler extends SimpleChannelInboundHandler<ServiceRequestPacket> {
public static final ServiceRequestHandler INSTANCE = new ServiceRequestHandler();
private ServiceInvoker serviceInvoker = ServiceInvoker.INSTANCE;
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客户端建立了连接"); super.channelActive(ctx); }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceRequestPacket serviceRequest) throws Exception { ServiceResponsePacket responsePacket = new ServiceResponsePacket();
//设置响应id responsePacket.setRequestId(serviceRequest.getId());
...
if (serviceConfig == null) { log.info("No such service : " + serviceRequest); responsePacket.setCode(1); responsePacket.setMessage("No such service"); } else { //获取服务的实现类 Object object = context.getBean(serviceConfig.getRef()); //找到请求的方法 Method method = object.getClass().getMethod(serviceRequest.getMethodName(), serviceRequest.getParameterTypes()); //通过反射调用请求的方法,得到结果 Object result = serviceInvoker.invoke(object, method, serviceRequest); log.info("service id : " + serviceRequest.getId()); log.info("Service invoked : " + serviceRequest); //将请求代码(0:成功, 1:失败)和调用结果封装在响应包中 responsePacket.setCode(0); responsePacket.setData(result); }//将响应包通过Netty发送给客户端 channelHandlerContext.channel().writeAndFlush(responsePacket);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("客户端断开了连接"); super.channelInactive(ctx); }}
客户端发来的请求主要包括:
请求id
要请求的服务(即哪个类的哪个方法)
请求参数(要传入该方法的参数)
通过对代码分析可以梳理出处理的流程:
根据请求内容找到相应类的相应方法
通过反射调用该方法,并传入请求中的参数
得到结果,封装入响应包
将响应发送给客户端
2. 客户端
客户端主要的工作是连接服务器、发送消息、等待服务端的消息响应以及该响应消息、关闭与服务端的连接。
一个 Netty 的客户端同样有两个部分:
配置服务以及服务启动逻辑类,比如下方的 NettyClient 类。
实现从客户端接收到的消息的处理逻辑类:比如下方的 ClientHandler 类。
完整代码见:https://github.com/wdw87/wRpc
NettyClient
代码较长,省略了非关键部分
public class NettyClient {
...
public NettyClient(String host, int port) {
this.host = host; this.port = port;//启动类 bootstrap = new Bootstrap(); NioEventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception {
//拆包Handler socketChannel.pipeline().addLast(new Spliter()); //解码Handler socketChannel.pipeline().addLast(new Decoder()); //编码Handler socketChannel.pipeline().addLast(new Encoder());
...
//处理服务端响应的Handler socketChannel.pipeline().addLast(serviceResponseHandler);
} }); try { this.connect(host, port); } catch (InterruptedException e) { e.printStackTrace(); } }//连接服务端 public void connect(String host, int port) throws InterruptedException {
...
this.channel = channelFuture.sync().channel(); }//发送请求 public Object send(ServiceRequestPacket requestPacket) throws InterruptedException { if(channel != null && channel.isActive()){ //发送请求 SynchronousQueue<Object> queue = serviceResponseHandler.sendRequest(requestPacket, channel); //阻塞等待响应包 ServiceResponsePacket result = (ServiceResponsePacket)queue.take(); //得到响应包中的请求结果 Class<?> returnType = requestPacket.getReturnType(); Object newdata = parseReturnType(returnType, result.getData()); result.setData(newdata); return result; }else { ServiceResponsePacket responsePacket = new ServiceResponsePacket(); responsePacket.setCode(1); responsePacket.setMessage("未正确连接到服务器.请检查相关配置信息!"); return responsePacket; }
}
...
}
NettyClient类虽然代码较长,但是结构十分简单,客户端在构造函数中初始化,与服务端一样,也有拆包和编解码过程,核心Handler是处理服务端响应的serviceResponseHandler。
值得注意的是,在send() 方法中,首先调用
serviceResponseHandler.sendRequest()方法,该方法会发出请求,同时将一个SynchronousQueue以请求id为key,放入一个ConcurrentHashMap中;
在客户端收到响应后,同样以请求id为key,得到这个SynchronousQueue,并放入响应包,这样在响应传回时就可以获得响应的响应包了。
ServiceResponseHandlerpublic class ServiceResponseHandler extends SimpleChannelInboundHandler<ServiceResponsePacket> {
private Map<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
...
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceResponsePacket serviceResponsePacket) throws Exception { //得到请求id String id = serviceResponsePacket.getRequestId(); //根据id得到相应的SynchronousQueue SynchronousQueue<Object> queue = queueMap.get(id); if(queue != null){ //将响应包放入SynchronousQueue,之后,前文所述的send()方法将解除阻塞,并得响应 queue.put(serviceResponsePacket); queueMap.remove(id); }else{ log.error("request id error !!!"); } }
public SynchronousQueue<Object> sendRequest(ServiceRequestPacket requestPacket, Channel channel){ SynchronousQueue<Object> queue = new SynchronousQueue<>(); //以请求id为key,放入一个SynchronousQueue,此时SynchronousQueue为空队列 queueMap.put(requestPacket.getId(), queue); //发出请求 channel.writeAndFlush(requestPacket); return queue; }}
与server中的Handler相似,在重写的channelRead0()方法中处理响应。
作者:好吃懒做贪玩东
编辑:西瓜媛
推荐阅读:
面经 | 笔试中的综测,你必须得知道!
学习笔记 | git的基本用法
面经 | NLP算法岗(百度)
面经 | NLP算法岗(作业帮)
一文学会Pytorch版本BERT使用
书籍 | 《鸟哥的Linux私房菜》第二章