- 前言
- 1. 客户端注册进 Nacos 注册中心(客户端视角)
- 1.1 Spring Cloud 提供的规范标准
- 1.2 Nacos 的自动配置类
- 1.3 监听服务初始化事件 AbstractAutoServiceRegistration.bind()
- 1.4 注册服务实例的逻辑 NacosServiceRegistry.register()
- 1.4.1 心跳机制 BeatReactor.addBeatInfo()
- 1.4.2 注册服务 NamingProxy.registerService()
- 1.5 以 Open API 方式发送注册请求
- 1.6 小结
- 2. Nacos 服务器注册服务(服务器视角)
- 2.1 服务器接收请求 InstanceController.register()
- 2.2 在服务器控制台注册服务实例 ServiceManager.registerInstance()
- 2.3 创建空服务 ServiceManager.createEmptyService()
- 2.3.1 将服务添加到缓存 Service.putService()
- 2.3.2 建立心跳机制 Service.init()
- 2.3.3 实现数据一致性的监听 DelegateConsistencyServiceImpl.listen()
- 2.4 小结
- 3. 客户端查询所有服务实例
- 3.1 消费者客户端向 Nacos 发出请求
- 3.2 Nacos 服务器处理请求 InstanceController.list()
- 3.2 获取所有服务的所有信息 InstanceController.doSrvIpxt()
- 4. 客户端监听 Nacos 服务器以动态获取服务实例
- 4.1 客户端发送请求
- 4.2 服务动态感知的原理
- 4.3 Nacos 服务器处理请求
- 5. 补充内容
- 5.1 Dubbo 的自动装配
- 6. 源码结构图总结
- 6.1 客户端视角下的服务注册结构图
- 6.2 服务器视角下的服务注册结构图
- 6.3 客户端查询所有服务实例结构图
- 最后
前言
参考资料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端;
Nacos 客户端将自己注册进 Nacos 服务器。《1. 服务如何注册进 Nacos 注册中心》主要从 Nacos 客户端角度解释如何发送信息给 Nacos 服务器;《2. Nacos 服务器注册服务》主要从 Nacos 服务器角度解释注册原理;
《3. 客户端查询所有服务实例》将从服务消费者和提供者的角度,解释服务消费者如何获取提供者的所有实例。服务消费者和提供者都是 Nacos 的客户端;
《4. 客户端监听 Nacos 服务器以动态获取服务实例》从消费者客户端角度出发监听 Nacos 服务器,以动态获知提供者的变化;
1. 客户端注册进 Nacos 注册中心(客户端视角)
1.1 Spring Cloud 提供的规范标准
- 服务要注册进 Spring Cloud 集成的 Nacos,首先要满足 Spring Cloud 提供的规范标准;
- 在 spring-cloud-common 包中有一个类 org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是Spring Cloud 提供的服务注册的标准。集成到 Spring Cloud 中实现服务注册的组件,都会实现该接口;
public interface ServiceRegistry<R extends Registration>{ void register(R registration); void deregister(R registration); void close(); void setStatus(R registration,String status); <T> T getstatus(R registration); }
1.2 Nacos 的自动配置类
- 在 spring-cloud-commons 包的
META-INF/spring.factories
中包含自动装配的配置信息。即约定 Spring Cloud 启动时,会将那些类自动注入到容器中:
- 其中 AutoServiceRegistrationAutoConfiguration(服务注册自动配置类) 是服务注册相关配置类,源码如下:
@Configuration @Import({AutoServiceRegistrationConfiguration.class}) @ConditionalOnProperty( value = {"spring.cloud.service-registry.auto-registration.enabled"}, matchIfMissing = true ) public class AutoServiceRegistrationAutoConfiguration { //自动注册类 @Autowired(required = false) private AutoServiceRegistration autoServiceRegistration; //自动注册类的配置文件 @Autowired private AutoServiceRegistrationProperties properties; public AutoServiceRegistrationAutoConfiguration() { } //初始化函数 @PostConstruct protected void init() { if (this.autoServiceRegistration == null && this.properties.isFailFast()) { throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean"); } } }
- 其中关键之处在于注入了一个 AutoServiceRegistration(服务注册器) 自动注册接口,该接口在 Spring Cloud 中有一个抽象实现类 AbstractAutoServiceRegistration(服务注册器抽象类);
- 我们要用什么注册中心,该注册中心就要继承 AbstractAutoServiceRegistration(服务注册器抽象类) 抽象类;
- 对于 Nacos 来说,使用 NacosAutoServiceRegistration(Nacos 服务注册器) 类继承 AbstractAutoServiceRegistration(服务注册器抽象类) 抽象类;
1.3 监听服务初始化事件 AbstractAutoServiceRegistration.bind()
- 在上述中需要关注 ApplicationListener(事件监听器) 接口,它是一种事件监听机制,接口声明如下:
@FunctionalInterface public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { void onApplicationEvent(E var1); }
- AbstractAutoServiceRegistration(服务注册器抽象类) 使用
AbstractAutoServiceRegistration.bind()
方法实现了该接口,用来监听 WebServerInitializedEvent(服务初始化事件);
@Override @SuppressWarnings("deprecation") //【断点步入】 public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); }
- 我们给
AbstractAutoServiceRegistration.bind()
方法打上断点,启动服务提供者,可以发现:- Nacos 服务器上没有服务注册实例;
- 服务初始化已经完成;
- 【结论】说明服务注册到 Nacos 的流程是先进行服务初始化,然后通过事件监听机制监听初始化事件。当初始化完成时,调用
AbstractAutoServiceRegistration.bind()
方法将服务注册进 Nacos 注册中心;
1.4 注册服务实例的逻辑 NacosServiceRegistry.register()
这里能说明什么时候服务会将自己的信息发给 Nacos 服务器;
- 我们追进
AbstractAutoServiceRegistration.bind()
方法,发现在 AbstractAutoServiceRegistration(服务注册器抽象类) 里调用NacosServiceRegistry.register()
方法后, Nacos 服务器上出现服务实例;
- 我们追进
NacosServiceRegistry.register()
方法,方法的逻辑如下:
@Override public void register(Registration registration) { //判断是否有服务 ID if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } String serviceId = registration.getServiceId(); //服务 ID(service-provider) Instance instance = getNacosInstanceFromRegistration(registration); //服务实例(里面有 ip、port 等信息) try { //【断点步入】注册的方法 namingService.registerInstance(serviceId, instance); log.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); } }
- 我们点进去
namingService.registerInstance()
方法(实现逻辑在 NacosNamingService.registerInstance())得到注册的具体逻辑,如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { //用心跳 BeatInfo 封装服务实例信息 BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); long instanceInterval = instance.getInstanceHeartBeatInterval(); beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); //【断点步入 1.4.1】将 beatInfo 心跳信息放进 beatReactor 心跳发送器(发送心跳后,Nacos 服务器仍然没有服务实例) this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } //【断点步入 1.4.2】使用 namingProxy 命名代理方式将服务实例信息以 POST 请求方式发送服务实例 this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
-
即注册服务实例的逻辑分两步:
- 先通过
beatReactor.addBeatInfo
创建心跳信息实现健康检测; - 再以 POST 请求方式发送服务实例信息;
- 先通过
1.4.1 心跳机制 BeatReactor.addBeatInfo()
- 我们进入
BeatReactor.addBeatInfo()
方法一探心跳机制,源码如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); //【核心】定时向服务端发送一个心跳包 beatInfo this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS); //【核心】 MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size()); }
-
BeatReactor.addBeatInfo()
方法主要做了两件事:- 客户端调用
ScheduledExecutorService.schedule()
接口方法(靠 ScheduledThreadPoolExecutor(计划线程执行器) 实现)执行定时任务,在每个任务周期内定时向服务端发送一个心跳包 beatInfo; - 然后通过
MetricsMonitor.getDom2BeatSizeMonitor()
方法获取一个 心跳测量监视器(实际为 Gauge),不断检测服务端的回应,如果在设定时间内没有收到服务端的回应,则认为服务器出现了故障;
- 客户端调用
-
Nacos 服务端会根据客户端的心跳包不断更新服务的状态;
1.4.2 注册服务 NamingProxy.registerService()
- 接着进入
NamingProxy.registerService()
方法,源码如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); Map<String, String> params = new HashMap(9); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); //【断点步入】这步执行完后,Nacos 服务器才出现服务实例信息 this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST"); }
- 该方法先对服务实例做了封装,然后通过
NamingProxy.reqAPI()
方法拼凑注册服务的 API。NamingProxy.reqAPI()
方法源码如下:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) { params.put("namespaceId", this.getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) { throw new IllegalArgumentException("no server available"); } else { Exception exception = new Exception(); if (servers != null && !servers.isEmpty()) { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for(int i = 0; i < servers.size(); ++i) { String server = (String)servers.get(index); try { return this.callServer(api, params, server, method); } catch (NacosException var11) { exception = var11; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11); } catch (Exception var12) { exception = var12; LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12); } index = (index + 1) % servers.size(); } throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage()); } else { int i = 0; while(i < 3) { try { return this.callServer(api, params, this.nacosDomain); } catch (Exception var13) { exception = var13; LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13); ++i; } } throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage()); } } }
1.5 以 Open API 方式发送注册请求
- 最终将以 Open API 方式发送如下请求给 Nacos 服务器:
POST ''
1.6 小结
- Nacos 在先加载配置文件初始化服务,使用 ApplicationListener(事件监听器) 监听机制监听该服务初始化事件,当服务初始化完成后,进入
NacosServiceRegistry.register()
注册逻辑; - 注册原理分两步:
- 先使用 NacosNamingService(Nacos 命名服务) 发送心跳;
- 再使用 NamingProxy(命名代理),以 POST 请求方式发送服务实例给 Nacos 服务器;
- 即 Nacos Service 必须要确保注册的服务实例是健康的(心跳检测),才能进行服务注册;
2. Nacos 服务器注册服务(服务器视角)
- Nacos 服务器的源码下载、启动详情请见《微服务架构 | 3.2 Alibaba Nacos 注册中心》;
- 上述提到客户端注册服务器的最后一步是向服务器发送如下 Open API 请求:
POST ''
- 那么服务器的源码分析将从这个请求开始;
2.1 服务器接收请求 InstanceController.register()
- 服务器在 nacos-naming 模块下 InstanceController(服务实例控制器) 里定义了一个
register
方法用来处理客户端的注册,源码如下:
@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { //省略其他代码 @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { //从请求体里解析出 namespaceId 命名空间(本例中是 public) final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); //解析 serviceName 服务名(本例中是 DEFAULT_GROUP@@service-provider,实际就是service-provider) final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); //【断点步入】在服务器控制台注册服务实例的方法 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } //省略其他代码 }
2.2 在服务器控制台注册服务实例 ServiceManager.registerInstance()
ServiceManager.registerInstance()
方法的源码如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //【断点步入 2.3】创建空服务 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //添加服务实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
- 这段代码主要做了三个逻辑的内容:
createEmptyService()
:创建空服务(用于在 Nacos 服务器控制台的"服务列表"中展示服务信息),实际上是初始化一个 serviceMap,它是一个 ConcurrentHashMap 集合;getService()
:从 serviceMap 中根据 namespaceld 和 servi.............原文转载:http://www.shaoqun.com/a/1451775.html
深圳魔女宅急便音乐剧演出地址、交通:http://www.30bags.com/a/613286.html
深圳中秋一日游去哪里:http://www.30bags.com/a/630276.html
深圳茶颜悦色有外卖吗:http://www.30bags.com/a/656391.html
联动优势:https://www.ikjzd.com/w/1921
好评模板:https://www.ikjzd.com/tl/5738
捷汇:https://m.ikjzd.com/w/419
2020春节Lazada物流安排&客服排班时间表来啦:http://www.kjdsnews.com/a/762444.html
亚马逊FBA费用立刻上涨5.2%,该如何应对?:http://www.kjdsnews.com/a/762445.html
近日,多艘货船因船员疫情而被隔离,船期大受影响!:http://www.kjdsnews.com/a/762446.html
No comments:
Post a Comment