• 发文
  • 评论
  • 微博
  • 空间
  • 微信

项目推荐 I 手写RPC框架(四)

程序媛驿站 2020-05-06 15:06 发文

前言

关于手写RPC框架的推文接近尾声了!大家一起撒花庆祝,动手实践起来吧!

RPC框架代码量较多,将仅对核心过程进行梳理

在这篇推文中,将介绍注册中心相关的内容。在本项目的系统推文中,将对项目进行详细的介绍。

主要将按照下面的内容进行分配(蓝色字体可戳):

手写RPC框架(一)
RPC简介、技术栈介绍、测试Demo
手写RPC框架(二)
远程通信实现
手写RPC框架(三)
制定协议与编解码器、动态代理
手写RPC框架(四)注册中心

Rpc框架示意图

七、注册中心

我们已经梳理过了RPC框架所要实现的主要部分,还剩下最后一个问题:客户端如何得知服务所在的具体服务器?

这时候就需要注册中心出场了

服务端,即Provider将拥有的服务以及自己的ip注册到注册中心中,客户端,即Consumer监听注册中心,从而得知服务所在的服务器列表。

Zookeeper实现的注册中心

本框架采用Zookeeper实现注册中心。

ZooKeeper的数据模型很简单,就是一棵树,作为注册中心,ZooKeeper的数据模型完全够用。看下图:

支持事件监听

ZooKeeper有一个Watcher机制。用户可以在节点上注册一些Watcher,并且当一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上。正是因为Watcher机制,才可以满足服务订阅的需求:服务消费者可以订阅某个服务,当服务提供者地址更新或者该服务有新的节点加入到集群中时,订阅该服务的服务消费者可以感知到的需求。

崩溃恢复模式

ZooKeeper的崩溃恢复模式能保证注册中心崩溃或者断连后,重启可以自动恢复注册数据以及订阅请求,因为这个时候会有新的Leader服务器与该重启的服务器进行数据同步。

框架使用zkclient作为Zookeeper客户端。

服务注册

在服务端启动时,首先根据xml配置文件,将服务实现类注册为一个Spring Bean,交由Spring管理。同时将服务发布到Zookeeper

@Data@Slf4jpublic class ServiceConfig implements ApplicationContextAware, InitializingBean {
  private String id;   private String name;   private String ref;   private ApplicationContext applicationContext;   private ServiceRegistry serviceRegistry = ServiceRegistry.getInstance();
  @Override   public void afterPropertiesSet() throws Exception {       if (!applicationContext.containsBean("server")) {           log.info("没有配置server,不能发布到注册中心");           return;      }       if (!applicationContext.containsBean("registry")) {           log.info("registry,不能发布到注册中心");           return;      }       ApplicationContext context = SpringUtil.getApplicationContext();       //获取服务提供者信息,ip等       ServerConfig serverConfig = (ServerConfig)context.getBean("server");       //获取服务提供类的实例,实例通过xml配置的方式注册为一个Spring Bean,交由Spring管理       RegistryConfig registryConfig = (RegistryConfig)context.getBean("registry");       //连接zookeeper       serviceRegistry.connectServer(serverConfig,registryConfig);       //将服务发布到注册中心       serviceRegistry.register(name);  }   @Override   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {       this.applicationContext = applicationContext;  }}服务发现

客户端连接zookeeper,并监听相关服务,本部分代码较长,省略了相关部分:

public class ServiceDiscovery {
...
  private volatile List<String> serviceList = new ArrayList<>();   private ConcurrentHashMap<String, List<String>> serverMap = new ConcurrentHashMap<>();   private ZkClient zkClient;
  public void discovery(String serviceName) {       //获取本地所有服务       importService(serviceName);       // 发布客户端引用到注册中心       registerReference(serviceName);       // 获取引用       // 缓存引用       getReference(serviceName);       // 订阅服务变化       subscribeServiceChange(serviceName);  }
 ...}

可以看出,在客户端初始化过程中,连接zookeeper后,会执行如下过程来发现服务:

查询本地服务存根

private void importService(String name) {   boolean flag = false;   List<Class<?>> services = ClassUtils.getClasses(name.substring(0,name.lastIndexOf(".")));   //扫描服务接口所在包,查询是否有配置文件中对应的借口   for (Class<?> clazz : services) {       if(clazz.getName().equals(name)){           serviceList.add(name);           flag = true;           break;      }  }   //如果服务接口信息与配置文件不符,报错   if(!flag){       log.error("No such interface {} ,please check your config..", name);       throw new RuntimeException("No such interface");  }}

发布客户端引用到注册中心

将客户端信息作为一个临时节点,挂载在注册中心中相应服务目录下,表示引用了这个服务

private void registerReference(String service) {   String ip = RpcContext.getLocalIp();
  String basePath = ZK_REGISTRY_PATH + "/" + service + "/consumers";   String path = basePath + "/" + ip;
  createPersistent(basePath);   //临时节点   if (!zkClient.exists(path)) {       zkClient.createEphemeral(path);  }   log.info("客户端引用发布成功:[{}]", path);}

获取服务引用,并放入serverMap中缓存

private void getReference(String service) {   //删除旧的缓存   if(serverMap.containsKey(service)) {       serverMap.remove(service);  }   log.info("正在获取服务引用[{}]", service);   String path = ZK_REGISTRY_PATH + "/" + service + "/providers";   List<String> serverList = null;   try {       serverList = zkClient.getChildren(path);  } catch (Exception e) {       log.error("服务器无此服务,请检查相关配置");       e.printStackTrace();  }   log.info("发现服务器列表" + serverList);   serverMap.put(service, serverList);   log.info("引用服务获取完成[" + path + "]:" + service);  ;}

监听服务变化

监听订阅的服务,当服务器列表发生变化时(例如有新的服务器加入了服务列表,或者有服务器发生故障),获取最新的服务列表

private void subscribeServiceChange(String service) {   String path = ZK_REGISTRY_PATH + "/" + service + "/providers";   log.info("订阅服务变化[{}]", service);   zkClient.subscribeChildChanges(path, new IZkChildListener() {       @Override       public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {           serverMap.remove(service);           serverMap.put(service, currentChilds);           log.info("服务[{}]发生变化,当前服务节点为{}", service, currentChilds);      }  });}

项目讲解系列至此结束!

作者:好吃懒做贪玩东

编辑:西瓜媛

推荐阅读:

如何成功追到微软小姐姐-葡萄媛

十分钟生成自己的疫情地图,小白都能立刻上手!

Linux进程、线程、文件描述符的底层原理

Paper | CVPR2019 Image Caption 之 无监督图像描述

Paper | NAACL2019 抽取式摘要之 SUMO

面经 | NLP算法岗(腾讯)

面经 | 推荐算法岗(美团)





声明:本文为OFweek维科号作者发布,不代表OFweek维科号立场。如有侵权或其他问题,请及时联系我们举报。
2
评论

评论

    相关阅读

    暂无数据

    程序媛驿站

    带你领略计算机学科之美。内容包括...

    举报文章问题

    ×
    • 营销广告
    • 重复、旧闻
    • 格式问题
    • 低俗
    • 标题夸张
    • 与事实不符
    • 疑似抄袭
    • 我有话要说
    确定 取消

    举报评论问题

    ×
    • 淫秽色情
    • 营销广告
    • 恶意攻击谩骂
    • 我要吐槽
    确定 取消

    用户登录×

    请输入用户名/手机/邮箱

    请输入密码