前言
RPC框架代码量较多,将仅对核心过程进行梳理
在这篇推文中,将介绍协议的制定与编解码,以及动态代理相关内容。在本项目的系统推文中,将对项目进行详细的介绍。
主要将按照下面的内容进行分配(蓝色字体可戳):
手写RPC框架(一)
RPC简介、技术栈介绍、测试Demo
手写RPC框架(二)
远程通信实现
手写RPC框架(三)制定协议与编解码器、动态代理
手写RPC框架(四)注册中心
Rpc框架示意图
五、制定协议与编解
这里的协议对应 TCP/IP 中的应用层协议,我们可以使用现成的协议,比如HTTP协议,将请求与响应内容封装在HTTP请求体和响应体中就可以实现这部分的功能。
协议如果按照编码方式来分的话可以分为二进制协议、明文的文本协议以及两者结合。HTTP就属于文本协议,相对于二进制协议来说,文本协议传输的内容较多。RPC是一种小数据量,高并发的场景,显然HTTP协议不适合本场景。因此本框架自定义了二进制协议:
byte01234567891011...valueMagic Numberversiontypeserializerlengthdata
协议中各项说明:
Magic Number:类似java字节码文件里的魔数,用来判断是不是本协议的数据包,是一个固定的数字;
version:版本号
type:包类型,请求包、响应包、心跳包...
serializer:使用的序列化方式,fastJson、protobuf...
length:数据包长度
data:数据
针对本协议,使用了Netty提供的LengthFieldBasedFrameDecoder对协议进行了解析,同时实现拆包粘包,这就是前文提到的Spliter类
public class Spliter extends LengthFieldBasedFrameDecoder { //对应协议中的length起始字节 private static final int LENGTH_FIELD_OFFSET = 7; //对应协议中的length的字节长度 private static final int LENGTH_FIELD_LENGTH = 4;
public Spliter(){ super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH); }
@Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if(in.getInt(in.readerIndex()) != PacketCodec.MAGIC_NUMBER){ ctx.channel().close(); return null; } return super.decode(ctx, in); }}2. 编码解码编码器
编码器基于Netty提供的MessageToByteEncoder类实现:
public class Encoder extends MessageToByteEncoder<Packet> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, ByteBuf byteBuf) throws Exception { //调用fastJson的序列化方法 PacketCodec.INSTANCE.encode(byteBuf, packet); }}
本框架使用了fastjson的序列化方法,也可以在底层实现protobuf等序列化方式
解码器public class Decoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) { //调用fastJson的反序列化方法 list.add(PacketCodec.INSTANCE.decode(byteBuf)); }}
解码器与编码器实现相同。
3. 消息类型
消息类型主要包括请求消息和响应消息:
请求消息@Data@NoArgsConstructor@AllArgsConstructorpublic class ServiceRequestPacket extends Packet {//请求id private String id;//服务类名 private String className;//方法名 private String methodName;//方法参数类型 private Class<?>[] parameterTypes;//方法参数 private Object[] args;//方法返回类型 private Class<?> returnType;
@Override public byte getType() { //返回包类型 return PacketType.SERVICE_REQUEST; }}响应消息@Data@NoArgsConstructorpublic class ServiceResponsePacket extends Packet { /** * 请求id */ private String requestId; /** * 服务返回编码,0-成功,非0失败 */ private int code = 0; /** * 具体错误信息 */ private String message; /** * 返的数据 */ private Object data;
@Override public byte getType() { return PacketType.SERVICE_RESPONSE; }}
六、动态代理
动态代理从字面上就很好理解,就是动态生成代理类。动态生成代理类是因为 JVM 的类加载机制决定的。我们都知道 Java 虚拟机类加载过程主要分为五个阶段:加载、验证、准备、解析、初始化。在加载阶段会通过一个类的全限定名来获取定义此类的二进制字节流,而二进制字节流的来源有很多种,比如从 ZIP 包获取、从网络中获取以及运行时计算生成等。这里讲到的运行时计算生成使用最多的场景就是动态代理技术。
在 RPC 中代理一般都使用的是动态代理,它代理的是服务的接口,用来解决客户端无法访问不同地址的对象问题。动态代理在实现时并不用关心代理类需要代理谁,而是在运行时才指定代理哪一个对象,这就解决了静态代理的弊端。
完整代码见:https://github.com/wdw87/wRpc1. CGLib实现的代理
本框架实现了两种动态代理方式,JDK提供的动态代理和基于CGLib的动态代理,以基于CGLib的动态代理为例:
CGLib采用ASM字节码生成框架,使用字节码技术生成代理类,通过一个MethodInterceptor拦截该类的方法,从而实现类的增强:
public interface ProxyFactory { Object getProxy(Class<?> clazz);}public class CGLIBProxyFactory implements ProxyFactory { @Override public Object getProxy(Class<?> clazz) { Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(clazz); //设置MethodInterceptor enhancer.setCallback(new InvokerInterceptor(clazz)); //生成代理类 return enhancer.create(); }}
InvokerInterceptor实现了MethodInterceptor接口,实现如下
public class InvokerInterceptor implements MethodInterceptor {
private Class<?> clazz;
private ServiceDiscovery serviceDiscovery = ServiceDiscovery.getInstance();
private NettyClient client = null;
public InvokerInterceptor(Class<?> clazz){ this.clazz = clazz; }
@Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { ServiceRequestPacket requestPacket = new ServiceRequestPacket();//将请求内容封装成请求数据包: requestPacket.setId(IDUtil.getUUID()); requestPacket.setClassName(clazz.getName()); requestPacket.setMethodName(method.getName()); requestPacket.setArgs(objects); requestPacket.setParameterTypes(method.getParameterTypes()); requestPacket.setReturnType(method.getReturnType());
if(client == null) { //根据负载均衡策略选择提供该服务的服务器 List<String> list = serviceDiscovery.getServerList(clazz.getName()); String serverAddress = LoadBalance.getAddress(list); String[] address = serverAddress.split(":"); //通过客户端建立连接 client = new NettyClient(address[0], Integer.parseInt(address[1])); }//发送请求包 ServiceResponsePacket responsePacket = (ServiceResponsePacket)client.send(requestPacket);//返回请求结果 if(responsePacket.getCode() == 0){ return responsePacket.getData(); }else{ log.error(responsePacket.getMessage()); return null; }
}}
可以看出,拦截器中为我们实现了如下过程:
负载均衡
建立客户端与服务端之间的连接
封装请求数据包并发送
等待响应,返回请求结果
通过动态代理,真正实现了文章一开始说的 " 程序员无需为这个交互过程做额外的编程 " ,调用者只需要知道要调用的服务信息,传入参数,就可以得到结果,一切都通过代理类自动完成!!!
作者:好吃懒做贪玩东
编辑:西瓜媛
推荐阅读:
面经 | 笔试中的综测,你必须得知道!
学习笔记 | git的基本用法
面经 | NLP算法岗(百度)
面经 | NLP算法岗(作业帮)
一文学会Pytorch版本BERT使用
书籍 | 《鸟哥的Linux私房菜》第二章