5

Rpc-实现Zookeeper注册中心 - zko0

 1 year ago
source link: https://www.cnblogs.com/zko0/p/17130553.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Rpc-实现Zookeeper注册中心

本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。

本文章Git仓库:zko0/zko0-rpc

声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:

何人听我楚狂声的博客

在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。

Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:

  • Client发现服务后缓存在本地,维护一个服务——实例列表
  • 当监听到注册中心的服务列表发生了变化,Client更新本地列表
  • 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务
  1. 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
  2. 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。

zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。

如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0

①添加依赖

Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)

这里排除slf4j依赖,因为笔主使用的slf4j存在冲突

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions></dependency>

②代码编写

1.首先创建一个连接类:

@Slf4jpublic class ZookeeperUtil { //内部化构造方法 private ZookeeperUtil(){ } private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName(); private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort(); private static CuratorFramework zookeeperClient; public static CuratorFramework getZookeeperClient(){ if (zookeeperClient==null){ synchronized (ZookeeperUtil.class){ if (zookeeperClient==null){ RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10); zookeeperClient = CuratorFrameworkFactory.builder() .connectString(SERVER_HOSTNAME+":"+SERVER_PORT) .retryPolicy(retryPolic) // zookeeper根目录为/serviceRegister,不为/ .namespace("serviceRegister") .build(); zookeeperClient.start(); } } } return zookeeperClient; } public static String getServerHostname(){ return SERVER_HOSTNAME; } public static Integer getServerPort(){ return SERVER_PORT; } }

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:

public class RpcConfig { //注册中心类型 private static String registerCenterType; //序列化类型 private static String serializerType; //负载均衡类型 private static String loadBalanceType; //配置Nacos地址 private static String registerCenterHost; private static Integer registerCenterPort; private static boolean zookeeperDestoryIsEphemeral; private static String serverHostName; private static Integer serverPort; static { ResourceBundle bundle = ResourceBundle.getBundle("rpc"); registerCenterType=bundle.getString("registerCenter.type"); loadBalanceType=bundle.getString("loadBalance.type"); registerCenterHost=bundle.getString("registerCenter.host"); registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port")); try { zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral")); } catch (Exception e) { zookeeperDestoryIsEphemeral=false; } serializerType=bundle.getString("serializer.type"); serverHostName=bundle.getString("server.hostName"); serverPort=Integer.parseInt(bundle.getString("server.port")); } public static String getRegisterCenterType() { return registerCenterType; } public static String getSerializerType() { return serializerType; } public static String getLoadBalanceType() { return loadBalanceType; } public static String getRegisterCenterHost() { return registerCenterHost; } public static Integer getRegisterCenterPort() { return registerCenterPort; } public static String getServerHostName() { return serverHostName; } public static Integer getServerPort() { return serverPort; } public static boolean isZookeeperDestoryIsEphemeral() { return zookeeperDestoryIsEphemeral; } }

下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:

服务的高一致性存在两种做法:

  • 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
  • 不设置为临时节点,手动的服务注册清除

我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)

注册实现原理图:

626723792516afae5530e634e691794
626723792516afae5530e634e691794
public interface ServiceDiscovery { InetSocketAddress searchService(String serviceName); void cleanLoaclCache(String serviceName);}
public interface ServiceRegistry { //服务注册 void register(String serviceName, InetSocketAddress inetAddress); void cleanRegistry();}

ZooKeeper接口实现:

public class ZookeeperServiceDiscovery implements ServiceDiscovery{ private final LoadBalancer loadBalancer; public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) { this.loadBalancer = loadBalancer; } @Override public InetSocketAddress searchService(String serviceName) { return ZookeeperClientUtils.searchService(serviceName,loadBalancer); } @Override public void cleanLoaclCache(String serviceName) { ZookeeperClientUtils.cleanLocalCache(serviceName); }}
public class ZookeeperServiceRegistry implements ServiceRegistry{ @Override public void register(String serviceName, InetSocketAddress inetAddress) { ZookeeperServerUitls.register(serviceName,inetAddress); } @Override public void cleanRegistry() { ZookeeperServerUitls.cleanRegistry(); }}

Factory工厂:

public class ServiceFactory { private static String center = RpcConfig.getRegisterCenterType(); private static String lb= RpcConfig.getLoadBalanceType(); private static ServiceRegistry registry; private static ServiceDiscovery discovery; private static Object registerLock=new Object(); private static Object discoveryLock=new Object(); public static ServiceDiscovery getServiceDiscovery(){ if (discovery==null){ synchronized (discoveryLock){ if (discovery==null){ if ("nacos".equalsIgnoreCase(center)){ discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb)); }else if ("zookeeper".equalsIgnoreCase(center)){ discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb)); } } } } return discovery; } public static ServiceRegistry getServiceRegistry(){ if (registry==null){ synchronized (registerLock){ if (registry==null){ if ("nacos".equalsIgnoreCase(center)){ registry= new NacosServiceRegistry(); }else if ("zookeeper".equalsIgnoreCase(center)){ registry= new ZookeeperServiceRegistry(); } } } } return registry; } }

使用Gson序列化InetSocketAddress存在问题,编写Util:

public class InetSocketAddressSerializerUtil { public static String getJsonByInetSockerAddress(InetSocketAddress address){ HashMap<String, String> map = new HashMap<>(); map.put("host",address.getHostName()); map.put("port",address.getPort()+""); return new Gson().toJson(map); } public static InetSocketAddress getInetSocketAddressByJson(String json){ HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class); String host = hashMap.get("host"); Integer port=Integer.parseInt(hashMap.get("port")); return new InetSocketAddress(host,port); } }

上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:

@Slf4jpublic class ZookeeperServerUitls { private static CuratorFramework client = ZookeeperUtil.getZookeeperClient(); private static final Set<String> instances=new ConcurrentHashSet<>(); public static void register(String serviceName, InetSocketAddress inetSocketAddress){ serviceName=ZookeeperUtil.serviceName2Path(serviceName);; String uuid = UUID.randomUUID().toString(); serviceName=serviceName+"/"+uuid; String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress); try { if (RpcConfig.isZookeeperDestoryIsEphemeral()){ //会话结束节点,创建消失 client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(serviceName,json.getBytes()); } else { client.create() .creatingParentsIfNeeded() .forPath(serviceName,json.getBytes()); } } catch (Exception e) { log.error("服务注册失败"); throw new RpcException(RpcError.REGISTER_SERVICE_FAILED); } //放入map instances.add(serviceName); } public static void cleanRegistry(){ log.info("注销所有注册的服务"); //如果自动销毁,不需要清除 if (RpcConfig.isZookeeperDestoryIsEphemeral()) return; if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){ for (String path:instances) { try { client.delete().forPath(path); } catch (Exception e) { log.error("服务注销失败"); throw new RpcException(RpcError.DESTORY_REGISTER_FALL); } } } }}
@Slf4jpublic class ZookeeperClientUtils { private static CuratorFramework client = ZookeeperUtil.getZookeeperClient(); private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>(); public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) { InetSocketAddress address; //本地缓存查询 if (instances.containsKey(serviceName)){ List<InetSocketAddress> addressList = instances.get(serviceName); if (!addressList.isEmpty()){ //使用lb进行负载均衡 return loadBalancer.select(addressList); } } try { String path = ZookeeperUtil.serviceName2Path(serviceName); //获取路径下所有的实现 List<String> instancePaths = client.getChildren().forPath(path); List<InetSocketAddress> addressList = new ArrayList<>(); for (String instancePath : instancePaths) { byte[] bytes = client.getData().forPath(path+"/"+instancePath); String json = new String(bytes); InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json); addressList.add(instance); } addLocalCache(serviceName,addressList); return loadBalancer.select(addressList); } catch (Exception e) { log.error("服务获取失败====>{}",e); throw new RpcException(RpcError.SERVICE_NONE_INSTANCE); } } public static void cleanLocalCache(String serviceName){ log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName); instances.remove(serviceName); } public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){ //直接替换原本的缓存 instances.put(serviceName,addressList); }}

③配置文件

rpc.properties放在resources下

#nacos zookeeper#registerCenter.type=nacosregisterCenter.type=zookeeper#registerCenter.host=127.0.0.1registerCenter.host=101.43.244.40#zookeeper port default 2181#registerCenter.port=9000registerCenter.port=2181 registerCenter.destory.isEphemeral=false#??random?roundRobinloadBalance.type=random#kryo json jdkserializer.type=kryo server.hostName=127.0.0.1server.port=9999

声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK