前言
关于手写RPC框架的推文接近尾声了!大家一起撒花庆祝,动手实践起来吧!
RPC框架代码量较多,将仅对核心过程进行梳理
在这篇推文中,将介绍注册中心相关的内容。在本项目的系统推文中,将对项目进行详细的介绍。
主要将按照下面的内容进行分配(蓝色字体可戳):
手写RPC框架(一)
RPC简介、技术栈介绍、测试Demo
手写RPC框架(二)
远程通信实现
手写RPC框架(三)
制定协议与编解码器、动态代理
手写RPC框架(四)注册中心
Rpc框架示意图
七、注册中心
我们已经梳理过了RPC框架所要实现的主要部分,还剩下最后一个问题:客户端如何得知服务所在的具体服务器?
这时候就需要注册中心出场了
服务端,即Provider将拥有的服务以及自己的ip注册到注册中心中,客户端,即Consumer监听注册中心,从而得知服务所在的服务器列表。
Zookeeper实现的注册中心
本框架采用Zookeeper实现注册中心。
ZooKeeper的数据模型很简单,就是一棵树,作为注册中心,ZooKeeper的数据模型完全够用。看下图:
支持事件监听
ZooKeeper有一个Watcher机制。用户可以在节点上注册一些Watcher,并且当一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上。正是因为Watcher机制,才可以满足服务订阅的需求:服务消费者可以订阅某个服务,当服务提供者地址更新或者该服务有新的节点加入到集群中时,订阅该服务的服务消费者可以感知到的需求。
崩溃恢复模式
ZooKeeper的崩溃恢复模式能保证注册中心崩溃或者断连后,重启可以自动恢复注册数据以及订阅请求,因为这个时候会有新的Leader服务器与该重启的服务器进行数据同步。
框架使用zkclient作为Zookeeper客户端。
服务注册
在服务端启动时,首先根据xml配置文件,将服务实现类注册为一个Spring Bean,交由Spring管理。同时将服务发布到Zookeeper
@Data@Slf4jpublic class ServiceConfig implements ApplicationContextAware, InitializingBean {
private String id; private String name; private String ref; private ApplicationContext applicationContext; private ServiceRegistry serviceRegistry = ServiceRegistry.getInstance();
@Override public void afterPropertiesSet() throws Exception { if (!applicationContext.containsBean("server")) { log.info("没有配置server,不能发布到注册中心"); return; } if (!applicationContext.containsBean("registry")) { log.info("registry,不能发布到注册中心"); return; } ApplicationContext context = SpringUtil.getApplicationContext(); //获取服务提供者信息,ip等 ServerConfig serverConfig = (ServerConfig)context.getBean("server"); //获取服务提供类的实例,实例通过xml配置的方式注册为一个Spring Bean,交由Spring管理 RegistryConfig registryConfig = (RegistryConfig)context.getBean("registry"); //连接zookeeper serviceRegistry.connectServer(serverConfig,registryConfig); //将服务发布到注册中心 serviceRegistry.register(name); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}服务发现
客户端连接zookeeper,并监听相关服务,本部分代码较长,省略了相关部分:
public class ServiceDiscovery {
...
private volatile List<String> serviceList = new ArrayList<>(); private ConcurrentHashMap<String, List<String>> serverMap = new ConcurrentHashMap<>(); private ZkClient zkClient;
public void discovery(String serviceName) { //获取本地所有服务 importService(serviceName); // 发布客户端引用到注册中心 registerReference(serviceName); // 获取引用 // 缓存引用 getReference(serviceName); // 订阅服务变化 subscribeServiceChange(serviceName); }
...}
可以看出,在客户端初始化过程中,连接zookeeper后,会执行如下过程来发现服务:
查询本地服务存根
private void importService(String name) { boolean flag = false; List<Class<?>> services = ClassUtils.getClasses(name.substring(0,name.lastIndexOf("."))); //扫描服务接口所在包,查询是否有配置文件中对应的借口 for (Class<?> clazz : services) { if(clazz.getName().equals(name)){ serviceList.add(name); flag = true; break; } } //如果服务接口信息与配置文件不符,报错 if(!flag){ log.error("No such interface {} ,please check your config..", name); throw new RuntimeException("No such interface"); }}
发布客户端引用到注册中心
将客户端信息作为一个临时节点,挂载在注册中心中相应服务目录下,表示引用了这个服务
private void registerReference(String service) { String ip = RpcContext.getLocalIp();
String basePath = ZK_REGISTRY_PATH + "/" + service + "/consumers"; String path = basePath + "/" + ip;
createPersistent(basePath); //临时节点 if (!zkClient.exists(path)) { zkClient.createEphemeral(path); } log.info("客户端引用发布成功:[{}]", path);}
获取服务引用,并放入serverMap中缓存
private void getReference(String service) { //删除旧的缓存 if(serverMap.containsKey(service)) { serverMap.remove(service); } log.info("正在获取服务引用[{}]", service); String path = ZK_REGISTRY_PATH + "/" + service + "/providers"; List<String> serverList = null; try { serverList = zkClient.getChildren(path); } catch (Exception e) { log.error("服务器无此服务,请检查相关配置"); e.printStackTrace(); } log.info("发现服务器列表" + serverList); serverMap.put(service, serverList); log.info("引用服务获取完成[" + path + "]:" + service); ;}
监听服务变化
监听订阅的服务,当服务器列表发生变化时(例如有新的服务器加入了服务列表,或者有服务器发生故障),获取最新的服务列表
private void subscribeServiceChange(String service) { String path = ZK_REGISTRY_PATH + "/" + service + "/providers"; log.info("订阅服务变化[{}]", service); zkClient.subscribeChildChanges(path, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { serverMap.remove(service); serverMap.put(service, currentChilds); log.info("服务[{}]发生变化,当前服务节点为{}", service, currentChilds); } });}
项目讲解系列至此结束!
作者:好吃懒做贪玩东
编辑:西瓜媛
推荐阅读:
如何成功追到微软小姐姐-葡萄媛
十分钟生成自己的疫情地图,小白都能立刻上手!
Linux进程、线程、文件描述符的底层原理
Paper | CVPR2019 Image Caption 之 无监督图像描述
Paper | NAACL2019 抽取式摘要之 SUMO
面经 | NLP算法岗(腾讯)
面经 | 推荐算法岗(美团)