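1. Preface
An earlier article analyzed how the Eureka server starts up; this one looks at the startup of the Eureka client. By registering with the registry and then renewing its lease, a client keeps its service instance present in the registry's service list, so that callers can discover it and it can serve requests.
So what do registration and renewal look like from the client side, and how does the registry store instance information? Let's read the source code.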
2. Startup process of the Eureka client
2.1 @EnableEurekaClient
This annotation is placed on the application's main entry class to enable the Eureka client.
/**
 * Convenience annotation for clients to enable Eureka discovery configuration
 * (specifically). Use this (optionally) in case you want discovery and know for sure that
 * it is Eureka you want. All it does is turn on discovery and let the autoconfiguration
 * find the eureka classes if they are available (i.e. you need Eureka on the classpath as
 * well).
 *
 * @author Dave Syer
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableDiscoveryClient
public @interface EnableEurekaClient {
}
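As its Javadoc says, the annotation turns on the configuration of the Eureka service discovery client. It is itself annotated with @EnableDiscoveryClient, so it carries everything @EnableDiscoveryClient does.
Note that Eureka is only one pluggable implementation of Spring Cloud service discovery; Spring Cloud service discovery can also be integrated with other components, such as Alibaba's Dubbo. In use, the two annotations behave the same, so there is no need to agonize over which one to pick.
@EnableDiscoveryClient: enables the Spring Cloud service discovery client.
@EnableEurekaClient: enables the Spring Cloud service discovery client with Eureka as the discovery component.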
2.2 EnableDiscoveryClientImportSelector
So how does @EnableDiscoveryClient start things up?
/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}
Its main job is to import a selector class, which Spring then processes:
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        // Call the parent method to get the fully qualified class names it wants imported.
        String[] imports = super.selectImports(metadata);

        // Read all attributes of the annotation (@EnableDiscoveryClient).
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        // Read the autoRegister attribute, which defaults to true.
        boolean autoRegister = attributes.getBoolean("autoRegister");

        // Depending on the attribute, also import the auto-registration configuration class below.
        if (autoRegister) {
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        }

        return imports;
    }

    @Override
    protected boolean isEnabled() {
        return new RelaxedPropertyResolver(getEnvironment()).getProperty(
                "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
    }

    @Override
    protected boolean hasDefaultFactory() {
        return true;
    }
}
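Notes:
1) This class extends the abstract class SpringFactoryImportSelector<T> so that the AutoServiceRegistrationConfiguration configuration class is imported conditionally.
The abstract class SpringFactoryImportSelector<T> directly implements the DeferredImportSelector interface and therefore indirectly implements the ImportSelector interface.
How ImportSelector is used: implement its method public String[] selectImports(AnnotationMetadata metadata); and return an array of fully qualified class names. Spring then registers those classes in the container, which is how ordinary Java classes get pulled in.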
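The selection logic described above can be modelled without any Spring types. The following is only a sketch of the decision made in selectImports(): the class name ImportSelectionSketch and the sample parent import are made up for illustration, while the appended configuration class name is the one from the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the selectImports() decision; no Spring types involved. */
final class ImportSelectionSketch {

    // Fully qualified name appended when autoRegister is true (taken from the listing above).
    static final String AUTO_REGISTRATION_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";

    /** Returns the class names to import: the parent's list, plus one extra if autoRegister is set. */
    static String[] selectImports(String[] fromParent, boolean autoRegister) {
        if (!autoRegister) {
            return fromParent;
        }
        List<String> imports = new ArrayList<>(Arrays.asList(fromParent));
        imports.add(AUTO_REGISTRATION_CONFIG);
        return imports.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] parent = {"com.example.SomeDiscoveryConfiguration"}; // hypothetical name
        System.out.println(Arrays.toString(selectImports(parent, true)));
        System.out.println(Arrays.toString(selectImports(parent, false)));
    }
}
```

In the real selector the returned names are handed to Spring, which registers each class as a configuration; here they are simply printed.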
2.3 AutoServiceRegistrationConfiguration
@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
public class AutoServiceRegistrationConfiguration {
}
It enables this properties class:
/**
 * @author Spencer Gibb
 */
@ConfigurationProperties("spring.cloud.service-registry.auto-registration")
public class AutoServiceRegistrationProperties {

    /** If Auto-Service Registration is enabled, default to true. */
    private boolean enabled = true;

    /** Should startup fail if there is no AutoServiceRegistration, default to false. */
    private boolean failFast = false;

    public boolean isFailFast() {
        return failFast;
    }

    public void setFailFast(boolean failFast) {
        this.failFast = failFast;
    }
}
The class has two properties. enabled: whether automatic service registration is on, default true. Its purpose is not obvious from this class alone, and the excerpt above shows no getter or setter for it.
failFast: whether startup should fail fast, default false. Let's find the callers of its getter, isFailFast(), to see what it is for:
@Configuration
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public class AutoServiceRegistrationAutoConfiguration {

    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;

    @Autowired
    private AutoServiceRegistrationProperties properties;

    @PostConstruct
    protected void init() {
        if (autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}
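It is called only here, in init().
The class's condition, @ConditionalOnBean(AutoServiceRegistrationProperties.class), checks whether a bean of the properties class above, AutoServiceRegistrationProperties, exists.
init() is annotated with @PostConstruct (a javax.annotation annotation from JSR-250, not a Servlet annotation), so the container calls it automatically once the bean has been constructed and its dependencies injected. It checks whether an AutoServiceRegistration bean exists; if there is none and failFast is true, it throws an exception and startup fails immediately. So what is this bean for, and where is it created?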
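To make the condition in init() concrete, here is a minimal plain-Java sketch of the same check. FailFastCheckSketch is a made-up name and the Object parameter merely stands in for the optional AutoServiceRegistration bean; only the condition and the exception message are taken from the listing.

```java
/** Plain-Java model of the fail-fast check performed in init(). */
final class FailFastCheckSketch {

    /**
     * @param autoServiceRegistration the optional collaborator (may be null)
     * @param failFast mirrors the failFast property
     * @throws IllegalStateException only when the collaborator is missing and failFast is true
     */
    static void init(Object autoServiceRegistration, boolean failFast) {
        if (autoServiceRegistration == null && failFast) {
            throw new IllegalStateException(
                    "Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }

    public static void main(String[] args) {
        init(null, false);        // a missing bean is tolerated by default
        init(new Object(), true); // bean present, nothing to complain about
        try {
            init(null, true);     // missing bean plus failFast: startup would abort
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

With the default failFast = false, a missing bean passes silently, which is why the check alone does not guarantee that registration is configured.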
Looking at the references:
AutoServiceRegistration is an interface, so we look at where its implementations are used:
The bean is created in org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaAutoServiceRegistration.
2.4 Conditions for instantiating EurekaClientAutoConfiguration
As the name suggests, EurekaClientAutoConfiguration is the auto-configuration class for the Eureka client.
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
public class EurekaClientAutoConfiguration {
    // ... code omitted for now ...
}
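1. @Configuration makes Spring recognize this class as a configuration class.
2. @EnableConfigurationProperties enables support for registering @ConfigurationProperties classes, bound from external configuration, as beans.
3. @ConditionalOnClass(EurekaClientConfig.class): the first condition. The EurekaClientConfig class must be present on the classpath; its default implementation supplies a number of configuration property values.
4. @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class): another condition. A bean of type EurekaDiscoveryClientConfiguration.Marker must exist in the context.
When is that bean created? Following the references, we find: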
@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class, // this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase
        EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {
}
And in the META-INF/spring.factories file:
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
This entry is picked up in the BootstrapApplicationListener class (line 151 in the version examined): the configuration is loaded through SpringFactoriesLoader, on a path that starts in onApplicationEvent(ApplicationEnvironmentPreparedEvent event). Note that, per its @ConditionalOnProperty, this bootstrap configuration only applies when spring.cloud.config.discovery.enabled is set to true.
public class BootstrapApplicationListener
        implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {

    @Override
    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
    }

    // ... code omitted ...
}
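BootstrapApplicationListener implements the ApplicationListener interface and names ApplicationEnvironmentPreparedEvent as the event it listens for.
This is Spring's listener mechanism, an application of the Observer pattern; its usage will be covered in a separate article. Put simply, we define a listener and an event; when the event is published, the listener is notified and performs the corresponding action. This is also known as publish-subscribe.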
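The publish-subscribe idea can be shown in a few lines of plain Java. This is a generic sketch of the pattern, not Spring's implementation; EventBusSketch and the event name are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal publish-subscribe sketch: listeners register first, then receive each published event. */
final class EventBusSketch<E> {

    private final List<Consumer<E>> listeners = new ArrayList<>();

    /** Registers a listener; it will be called for every event published afterwards. */
    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    /** Delivers the event synchronously to all registered listeners, in registration order. */
    void publish(E event) {
        for (Consumer<E> listener : listeners) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        EventBusSketch<String> bus = new EventBusSketch<>();
        bus.addListener(event -> System.out.println("listener saw: " + event));
        bus.publish("environment-prepared"); // hypothetical event name
    }
}
```

A listener only sees events published after it was registered, which is why the moment at which a listener is installed matters during startup.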
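Back to the topic. Following the stages of application startup, Spring Boot (the 1.x line read here) provides events we can listen for, including these four:
- ApplicationStartedEvent: published when Spring Boot begins starting up.
- ApplicationEnvironmentPreparedEvent: the Environment has been prepared, but the application context has not been created yet.
- ApplicationPreparedEvent: the application context has been created, but its beans have not all been loaded yet.
- ApplicationFailedEvent: published when startup fails with an exception.
Now the picture is clear. Once the environment has been initialized but the context (the container) has not yet been created, this event is published; as I understand it, that is the point at which the application has loaded its configuration properties, while the container does not exist yet and no beans have been instantiated. The event triggers the listener, which loads the configuration above and leads to the EurekaDiscoveryClientConfiguration.Marker bean being created.
5. @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true): a condition that is treated as true when the property is absent. This property should be familiar.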
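6. @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }): according to its Javadoc, this annotation hints that an auto-configuration should be applied before the other listed auto-configuration classes. It controls the order in which configurations are processed.
- NoopDiscoveryClientAutoConfiguration:
  - "Noop" means "no operation", so this is the auto-configuration for a do-nothing discovery client. Its condition is @ConditionalOnMissingBean(DiscoveryClient.class).
  - The DiscoveryClient interface has the implementations EurekaDiscoveryClient and NoopDiscoveryClient. @ConditionalOnMissingBean means the annotated configuration applies only when the context contains no DiscoveryClient bean. Because EurekaClientAutoConfiguration is processed before NoopDiscoveryClientAutoConfiguration, and EurekaDiscoveryClient is created in EurekaClientAutoConfiguration (line 147 in the version examined), a DiscoveryClient bean already exists by then, so the condition is not met and the no-op configuration is skipped.
  - If the Spring Cloud discovery module is not implemented by Eureka, or no discovery module is specified at all, NoopDiscoveryClient is created instead. This confirms that Eureka is just one choice for implementing Spring Cloud service discovery, and it suggests that NoopDiscoveryClient serves as the default fallback implementation.
  - This class implements Spring's ApplicationListener interface for ContextRefreshedEvent (published once the container has finished loading). When it is active, it publishes an instance-registered event after the container has loaded, signalling that client registration is complete.
- CommonsClientAutoConfiguration:
  - Judging by its contents it deals with health checks. It is not covered in detail here; a separate article may follow if needed.
- ServiceRegistryAutoConfiguration: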
/**
 * Judging by its contents, the Endpoint is mainly used to monitor the running state of the application.
 */
@Configuration
public class ServiceRegistryAutoConfiguration {

    /**
     * Condition: this bean has already been created in EurekaClientAutoConfiguration.
     */
    @ConditionalOnBean(ServiceRegistry.class)
    @ConditionalOnClass(Endpoint.class)
    protected class ServiceRegistryEndpointConfiguration {

        // This bean has already been created in EurekaClientAutoConfiguration.
        @Autowired(required = false)
        private Registration registration;

        @Bean
        public ServiceRegistryEndpoint serviceRegistryEndpoint(ServiceRegistry serviceRegistry) {
            ServiceRegistryEndpoint endpoint = new ServiceRegistryEndpoint(serviceRegistry);
            endpoint.setRegistration(registration);
            return endpoint;
        }
    }
}
Now look at the bean this configuration is really about, ServiceRegistryEndpoint. It exposes two operations: reading the status (getStatus) and changing it (setStatus).
@ManagedResource(description = "Can be used to display and set the service instance status using the service registry")
@SuppressWarnings("unchecked")
public class ServiceRegistryEndpoint implements MvcEndpoint {

    private final ServiceRegistry serviceRegistry;

    private Registration registration;

    public ServiceRegistryEndpoint(ServiceRegistry serviceRegistry) {
        this.serviceRegistry = serviceRegistry;
    }

    public void setRegistration(Registration registration) {
        this.registration = registration;
    }

    @RequestMapping(path = "instance-status", method = RequestMethod.POST)
    @ResponseBody
    @ManagedOperation
    public ResponseEntity<?> setStatus(@RequestBody String status) {
        Assert.notNull(status, "status may not by null");

        if (this.registration == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");
        }

        this.serviceRegistry.setStatus(this.registration, status);
        return ResponseEntity.ok().build();
    }

    @RequestMapping(path = "instance-status", method = RequestMethod.GET)
    @ResponseBody
    @ManagedAttribute
    public ResponseEntity getStatus() {
        if (this.registration == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found");
        }

        return ResponseEntity.ok().body(this.serviceRegistry.getStatus(this.registration));
    }

    @Override
    public String getPath() {
        return "/service-registry";
    }

    @Override
    public boolean isSensitive() {
        return true;
    }

    @Override
    public Class<? extends Endpoint<?>> getEndpointType() {
        return null;
    }
}
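How Endpoints are used will be covered in a dedicated article. //TODO
7. @AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
This one stands out: RefreshAutoConfiguration has to be processed before EurekaClientAutoConfiguration. What does it do?
According to its class Javadoc, it is the auto-configuration for the refresh scope, which refreshes beans when related environment properties change. //TODO
With that, EurekaClientAutoConfiguration is about to be instantiated.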
2.5 How EurekaClientAutoConfiguration is instantiated
2.5.1 Two static inner classes
Two static inner configuration classes are loaded first:
- EurekaClientConfiguration
@Configuration
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    @Autowired
    private ApplicationContext context;

    @Autowired(required = false)
    private DiscoveryClientOptionalArgs optionalArgs;

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
    }

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }
}
- RefreshableEurekaClientConfiguration
@Configuration
@ConditionalOnRefreshScope
protected static class RefreshableEurekaClientConfiguration {

    @Autowired
    private ApplicationContext context;

    @Autowired(required = false)
    private DiscoveryClientOptionalArgs optionalArgs;

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
            EurekaInstanceConfig instance) {
        manager.getInfo(); // force initialization
        return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
    }

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }
}
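The classes above create two beans, ApplicationInfoManager and EurekaClient. The two variants differ only in whether they support refresh, which is not covered here; a later article will link to it. //TODO
Creating the EurekaClient bean performs the main startup work of the Eureka client; Section 3 analyzes it.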
2.5.2 HasFeatures
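@Bean
public HasFeatures eurekaFeature() {
    return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
}
Judging by where it is used, this bean serves application monitoring.
2.5.3 EurekaClientConfigBean
Creates the bean holding the Eureka client configuration; its property values come from the eureka.client configuration prefix.
2.5.4 EurekaInstanceConfigBean
Creates the bean holding the Eureka instance configuration; its property values come from the eureka.instance configuration prefix.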
2.5.5 DiscoveryClient
2.5.6 EurekaServiceRegistry
This was mentioned earlier, in the discussion of ServiceRegistryAutoConfiguration.
3. Main startup flow of the Eureka client
3.1 Entry point
Stepping deeper, we arrive at the constructor com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>):
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(
            remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this,
                METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this,
                METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(
                new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return; // no need to setup up an network tasks and we are done
    }

    try {
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        ); // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        ); // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    initScheduledTasks();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
That is everything the EurekaClient does at startup. We concentrate on the fragments below, excerpted from the long listing above.
3.2 Creating the thread pools
A Eureka client stays available in the registry through registration and renewal, both of which run as scheduled tasks.
// Task-management (scheduling) thread pool
scheduler = Executors.newScheduledThreadPool(3,
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-%d")
                .setDaemon(true)
                .build());

// Heartbeat thread pool
heartbeatExecutor = new ThreadPoolExecutor(
        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                .setDaemon(true)
                .build()
); // use direct handoff

// Cache-refresh thread pool
cacheRefreshExecutor = new ThreadPoolExecutor(
        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                .setDaemon(true)
                .build()
); // use direct handoff
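For readers who want to try the "direct handoff" configuration in isolation, here is a JDK-only sketch. It swaps Guava's ThreadFactoryBuilder for a hand-written ThreadFactory, so the naming scheme (prefix-0, prefix-1, ...) and the class name HandoffPoolSketch are assumptions of this example rather than Eureka's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only sketch of a direct-handoff pool with named daemon threads. */
final class HandoffPoolSketch {

    /** Builds daemon threads named prefix-0, prefix-1, ... (stands in for Guava's ThreadFactoryBuilder). */
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    /** One core thread, at most maxThreads, no queueing: each task is handed straight to a thread. */
    static ThreadPoolExecutor newPool(String prefix, int maxThreads) {
        return new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                daemonFactory(prefix));
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newPool("Sketch-HeartbeatExecutor", 2);
        Callable<String> task = () -> Thread.currentThread().getName();
        System.out.println("task ran on " + pool.submit(task).get());
        pool.shutdown();
    }
}
```

Because a SynchronousQueue holds no elements, a task submitted while all maxThreads threads are busy is rejected instead of queued.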
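3.3 Initializing the scheduled tasks: initScheduledTasks()
This method sets up two recurring tasks, cache refresh and heartbeat (registration/renewal). Both are set up the same way, so we use the latter to show how a recurring task is initialized.
3.3.1 ScheduledExecutorService
The JDK's own scheduling facility, ScheduledExecutorService, is used here. It is a task scheduler built on a thread pool; for its usage, see the introduction to ScheduledExecutorService in the final part.
scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);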
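From the article referred to above, we know that scheduler.schedule(...params) runs a task once after a delay. But heartbeat renewal has to happen repeatedly at a fixed interval, so at first sight this call does not do what we expect.
Not so fast: look at the arguments again. TimedSupervisorTask deserves a closer look.
At the top of its hierarchy TimedSupervisorTask implements Runnable, so it can be passed to the scheduler as a task. Its run() method holds the answer.
Stripping away the less important operations leaves the main steps:
public void run() {
    Future<?> future = null;
    try {
        future = executor.submit(task);
        // ... part of the code omitted ...
    } catch (TimeoutException e) {
        // ... part of the code omitted ...
    } catch (RejectedExecutionException e) {
        // ... part of the code omitted ...
    } catch (Throwable e) {
        // ... part of the code omitted ...
    } finally {
        // ... part of the code omitted ...
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}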
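It first submits the task to the thread pool with submit(task); then, in the finally block, it hands itself back to the scheduler with scheduler.schedule() for another delayed run. Each run thus schedules the next one, so the task repeats indefinitely; with the default renewal interval that is once every 30 seconds.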
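The self-rescheduling trick is easy to reproduce with the JDK alone. The sketch below is a simplified illustration and not TimedSupervisorTask itself: it runs the work inline instead of submitting it to a second executor, and it has no timeout or back-off handling. The class name SelfReschedulingTask and the millisecond delays are choices made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** A one-shot task that re-schedules itself, giving periodic execution out of schedule(). */
final class SelfReschedulingTask implements Runnable {

    private final ScheduledExecutorService scheduler;
    private final Runnable work;
    private final long delayMillis;

    SelfReschedulingTask(ScheduledExecutorService scheduler, Runnable work, long delayMillis) {
        this.scheduler = scheduler;
        this.work = work;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        try {
            work.run();
        } catch (RuntimeException e) {
            // A failed run must not break the chain; a real supervisor would log and back off here.
        } finally {
            if (!scheduler.isShutdown()) {
                try {
                    // Queue the next one-shot run; this is what turns schedule() into a loop.
                    scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException e) {
                    // The scheduler was shut down between the check and the call; stop quietly.
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch threeRuns = new CountDownLatch(3);
        scheduler.schedule(new SelfReschedulingTask(scheduler, threeRuns::countDown, 50),
                50, TimeUnit.MILLISECONDS);
        threeRuns.await(5, TimeUnit.SECONDS);
        scheduler.shutdownNow();
        System.out.println("runs still missing: " + threeRuns.getCount());
    }
}
```

Compared with scheduleAtFixedRate, this style lets each run decide the delay before the next one, which is what makes an adjustable back-off possible.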
3.3.2 Renewal: renew()
The renewal operation is defined in com.netflix.discovery.DiscoveryClient.HeartbeatThread; let's look at the renew() method.
/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(
                instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
renew() first sends a heartbeat (renewal) request to the registry. Status code 200 means success; on 404 it calls register() to register again.
3.3.3 Registration: register()
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
It sends the registration request; status code 204 means the registration succeeded.
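The interplay between the two status codes can be exercised against a stand-in registry. In the sketch below, RegistryClient is a hypothetical interface invented for the example (the real transport is eurekaTransport.registrationClient); only the 200 / 404 / 204 handling mirrors the listings above.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of the renew-then-maybe-register decision, against a hypothetical RegistryClient. */
final class RenewalSketch {

    /** Hypothetical stand-in for the HTTP transport; each method returns an HTTP status code. */
    interface RegistryClient {
        int sendHeartbeat(String appName, String instanceId);
        int register(String appName, String instanceId);
    }

    private final RegistryClient client;
    private final String appName;
    private final String instanceId;

    RenewalSketch(RegistryClient client, String appName, String instanceId) {
        this.client = client;
        this.appName = appName;
        this.instanceId = instanceId;
    }

    /** Returns true if the lease is in place after this call. */
    boolean renew() {
        int status = client.sendHeartbeat(appName, instanceId);
        if (status == 404) {
            // The registry does not know this instance: fall back to a fresh registration.
            return register();
        }
        return status == 200;
    }

    /** Returns true if the registry accepted the registration (204 No Content). */
    boolean register() {
        return client.register(appName, instanceId) == 204;
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(); // in-memory fake registry state
        RegistryClient fake = new RegistryClient() {
            @Override
            public int sendHeartbeat(String appName, String instanceId) {
                return known.contains(instanceId) ? 200 : 404;
            }

            @Override
            public int register(String appName, String instanceId) {
                known.add(instanceId);
                return 204;
            }
        };
        RenewalSketch sketch = new RenewalSketch(fake, "DEMO-APP", "demo-1");
        System.out.println("first renew (unknown instance): " + sketch.renew());
        System.out.println("second renew (known instance): " + sketch.renew());
    }
}
```

This also shows why a heartbeat alone can bring an instance back after the registry has dropped it: the 404 branch turns the renewal into a registration.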
3.4 Registration and renewal flow
3.4.1 Registration
Section 3.3 set up the recurring tasks, but where does the first registration happen? In the last line of initScheduledTasks():
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
InstanceInfoReplicator implements the Runnable interface:
public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty(); // for initial register
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
Calling this method hands the task to the scheduler to run once after a delay, 40 seconds by default (despite its name, the initialDelayMs parameter is used with TimeUnit.SECONDS). Now find the run() method:
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
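This is where the first registration is performed, roughly 40 seconds after the service starts. The finally block schedules the task again, so the registration task also runs in a recurring loop.
Under normal conditions, though, one successful register should be enough, with renew keeping the lease alive afterwards. How is that handled?
3.4.2 How repeated registration is avoided
Before the first register call, the Eureka client sets the flag com.netflix.appinfo.InstanceInfo#isInstanceInfoDirty to true (it defaults to false) and stores a timestamp in com.netflix.appinfo.InstanceInfo#lastDirtyTimestamp: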
public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty(); // for initial register
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
Inside instanceInfo.setIsDirty():
public synchronized void setIsDirty() {
    isInstanceInfoDirty = true;
    lastDirtyTimestamp = System.currentTimeMillis();
}
So the first time the register step is reached, the if condition is satisfied:
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        // On the first pass the dirty timestamp has been set, so it can be read here.
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            // After the first register, isInstanceInfoDirty is set back to false.
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
    if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
        isInstanceInfoDirty = false;
    } else {
    }
}

public synchronized Long isDirtyWithTime() {
    // Once the flag is false no timestamp is returned (null), so run() above fails its
    // if condition on later cycles and does not register again.
    if (isInstanceInfoDirty) {
        return lastDirtyTimestamp;
    } else {
        return null;
    }
}
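To summarize the normal registration flow of the Eureka client: there is a boolean flag and a recorded timestamp. The flag decides whether the timestamp can be obtained, which in turn decides whether the recurring registration task performs register. Under normal conditions, therefore, registration triggered by the recurring task happens only once. So when does register() become eligible again inside the recurring task? Note that the point here is registration triggered by the recurring task, not every execution of register().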
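The flag-and-timestamp handshake can be replayed in isolation. In this sketch the three synchronized methods follow the InstanceInfo listing, except that setIsDirty takes the timestamp as a parameter so the example is deterministic; DirtyFlagSketch, replicateOnce() and the register counter are made up for illustration.

```java
/** Sketch of the dirty-flag handshake that keeps the replicator from re-registering every cycle. */
final class DirtyFlagSketch {

    private boolean dirty = false;
    private long lastDirtyTimestamp = 0L;
    private int registerCalls = 0;

    /** Marks the instance info as changed; the timestamp is passed in to keep the sketch deterministic. */
    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the dirty timestamp, or null when nothing has changed since the last registration. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag unless a newer change arrived after the given timestamp was read. */
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    /** One cycle of the recurring task: register only if the flag is set. */
    void replicateOnce() {
        Long dirtyTimestamp = isDirtyWithTime();
        if (dirtyTimestamp != null) {
            registerCalls++; // stands in for discoveryClient.register()
            unsetIsDirty(dirtyTimestamp);
        }
    }

    int registerCalls() {
        return registerCalls;
    }

    public static void main(String[] args) {
        DirtyFlagSketch sketch = new DirtyFlagSketch();
        sketch.setIsDirty(1L);   // what start() does for the initial registration
        sketch.replicateOnce();  // registers
        sketch.replicateOnce();  // flag is clear, nothing happens
        sketch.setIsDirty(2L);   // e.g. a refresh detected a change
        sketch.replicateOnce();  // registers again
        System.out.println("register calls: " + sketch.registerCalls());
    }
}
```

The timestamp comparison in unsetIsDirty matters when a change arrives while a registration is in flight: the newer timestamp keeps the flag set, so the next cycle registers once more.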
Look at the first call in run(): discoveryClient.refreshInstanceInfo(). The name says it refreshes the instance information; let's study it.
3.4.3 Refreshing the instance
First, open the method:
void refreshInstanceInfo() {
    // 1. Refresh the data center info if required.
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    // 2. Refresh the lease info if required.
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        // Obtain the instance status from the health check.
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}
Start with the first refresh, applicationInfoManager.refreshDataCenterInfoIfRequired():
public void refreshDataCenterInfoIfRequired() {
    String existingAddress = instanceInfo.getHostName();

    String newAddress;
    if (config instanceof RefreshableInstanceConfig) {
        // Refresh data center info, and return up to date address
        newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
    } else {
        newAddress = config.getHostName(true);
    }
    String newIp = config.getIpAddress();

    if (newAddress != null && !newAddress.equals(existingAddress)) {
        logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

        // :( in the legacy code here the builder is acting as a mutator.
        // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
        // We will most likely re-write the client at sometime so not fixing for now.
        InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
        builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
        instanceInfo.setIsDirty();
    }
}
Now look at the call newAddress = config.getHostName(true):
@Override
public String getHostName(boolean refresh) {
    if (refresh && !this.hostInfo.override) {
        this.ipAddress = this.hostInfo.getIpAddress();
        this.hostname = this.hostInfo.getHostname();
    }
    return this.preferIpAddress ? this.ipAddress : this.hostname;
}
This involves a configuration property, eureka.instance.preferIpAddress (the method belongs to the instance configuration, so the prefix is eureka.instance rather than eureka.client). The return statement above explains it neatly: when addressing the host of a service, should the IP address be used in preference to the host name? The default is false.
That was the refresh of the service instance information. Now the other refresh method, refreshLeaseInfoIfRequired():
public void refreshLeaseInfoIfRequired() {
    LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
    if (leaseInfo == null) {
        return;
    }
    int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
    int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
    if (leaseInfo.getDurationInSecs() != currentLeaseDuration
            || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
        LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(currentLeaseRenewal)
                .setDurationInSecs(currentLeaseDuration)
                .build();
        instanceInfo.setLeaseInfo(newLeaseInfo);
        instanceInfo.setIsDirty();
    }
}
The method body shows that it updates the lease information, namely the renewal interval and the expiration duration.
Whenever either of these two refreshes finds a change, it sets the flag that gates the recurring registration task back to true, so that the next cycle of the task triggers a registration.
3.4.4 Cancelling a pending registration task
In com.netflix.discovery.InstanceInfoReplicator#run, the finally block contains this operation: scheduledPeriodicRef.set(next);
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
Here the scheduled task is stored in AtomicReference<Future> scheduledPeriodicRef. Why? If it is stored, something must read it back, so let's find where the Future is taken out and acted upon:
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("Executing on-demand update of local InstanceInfo");

                // The stored Future is read here and, if it exists and has not completed yet,
                // the pending run is cancelled.
                Future latestPeriodic = scheduledPeriodicRef.get();
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                    latestPeriodic.cancel(false);
                }

                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
Next, look at where this method is called from. There are two call sites; initScheduledTasks() is a method we have already met, so start with that one.
There we find the code below. We have seen this part before without going into it, so here is the explanation:
// ... preceding code omitted ...
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus()
                || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

// This condition comes from the configuration file.
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// ... following code omitted ...
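The target method is called from the notify method of the ApplicationInfoManager.StatusChangeListener listener, and further down the listener is registered, depending on a configuration condition.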
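Why the pending Future is kept around becomes clearer in a reduced model. The sketch below is an assumption-laden simplification: it drops the rate limiter and the registration logic, counts runs instead, and uses milliseconds; OnDemandSketch and its members are invented names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: a recurring task whose pending run can be cancelled and replaced by an immediate one. */
final class OnDemandSketch implements Runnable {

    private final ScheduledExecutorService scheduler;
    private final long periodMillis;
    private final AtomicReference<Future<?>> scheduledPeriodicRef = new AtomicReference<>();
    final AtomicInteger runs = new AtomicInteger();

    OnDemandSketch(ScheduledExecutorService scheduler, long periodMillis) {
        this.scheduler = scheduler;
        this.periodMillis = periodMillis;
    }

    /** Schedules the first run and remembers its Future. */
    void start(long initialDelayMillis) {
        scheduledPeriodicRef.set(scheduler.schedule(this, initialDelayMillis, TimeUnit.MILLISECONDS));
    }

    @Override
    public void run() {
        try {
            runs.incrementAndGet(); // stands in for refresh + conditional register
        } finally {
            // Always queue the next run and remember it, so it can be cancelled later.
            scheduledPeriodicRef.set(scheduler.schedule(this, periodMillis, TimeUnit.MILLISECONDS));
        }
    }

    /** Cancels the pending periodic run (if any) and runs right away; run() re-schedules itself. */
    Future<?> onDemandUpdate() {
        return scheduler.submit(() -> {
            Future<?> latestPeriodic = scheduledPeriodicRef.get();
            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                latestPeriodic.cancel(false);
            }
            OnDemandSketch.this.run();
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        OnDemandSketch sketch = new OnDemandSketch(scheduler, 60_000);
        sketch.start(60_000);          // first periodic run is a minute away
        sketch.onDemandUpdate().get(); // but an on-demand update runs immediately
        System.out.println("runs so far: " + sketch.runs.get());
        scheduler.shutdownNow();
    }
}
```

Cancelling the old Future before running avoids two overlapping chains of self-rescheduling runs: after an on-demand update there is again exactly one pending periodic run.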
Searching for usages of the listener's event type, StatusChangeEvent, one place that publishes such an event is com.netflix.discovery.DiscoveryClient#onRemoteStatusChanged:
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus,
                                     InstanceInfo.InstanceStatus newStatus) {
    fireEvent(new StatusChangeEvent(oldStatus, newStatus));
}
Following the callers all the way up:
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
CacheRefreshThread implements Runnable and is set up as a recurring task in initScheduledTasks(). Its name says it is the cache refresh thread; what it mainly does is left aside for the moment. From the code, whether this recurring task ends up triggering our target method onDemandUpdate() is decided in this step of com.netflix.discovery.DiscoveryClient#updateInstanceRemoteStatus:
if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
    onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
    lastRemoteInstanceStatus = currentRemoteInstanceStatus;
}
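Analyze the condition: the instance's previous status is compared with its current status to decide whether to fire.
Here lastRemoteInstanceStatus is the status recorded by the previous check; after the comparison it is overwritten with the current one, currentRemoteInstanceStatus. So how is currentRemoteInstanceStatus obtained and kept up to date? Let's look at the code.
3.5 The cache refresh task
We arrived here by way of the instance status, so first look at how the client obtains the remote (registry-side) instance information: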
private synchronized void updateInstanceRemoteStatus() {
    // Determine this instance's status for this app and set to UNKNOWN if not found
    InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
    if (instanceInfo.getAppName() != null) {
        Application app = getApplication(instanceInfo.getAppName());
        if (app != null) {
            InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
            if (remoteInstanceInfo != null) {
                currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
            }
        }
    }
    if (currentRemoteInstanceStatus == null) {
        currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    }

    // Notify if status changed
    if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
        onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
        lastRemoteInstanceStatus = currentRemoteInstanceStatus;
    }
}
To see where the value of currentRemoteInstanceStatus comes from, look at getApplication(instanceInfo.getAppName()):
@Override
public Application getApplication(String appName) {
    return getApplications().getRegisteredApplications(appName);
}
getApplications() returns the collection of all instances, from which the target application's instance information is looked up by appName. The value behind getApplications() comes from the field com.netflix.discovery.DiscoveryClient#localRegionApps, so something must be filling that field, and doing so in a recurring task.
Searching for calls to set() on localRegionApps leads to com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry:
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
The field localRegionApps is filled from the variable apps, and apps = httpResponse.getEntity(): the service instance information is obtained by sending a request to the registry.
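The compareAndSet on fetchRegistryGeneration in the listing guards against a slow fetch overwriting a newer result. A reduced sketch of that guard follows; GenerationGuardSketch is an invented name and a String stands in for the Applications snapshot.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of a generation counter that lets only the first of several concurrent fetches store its result. */
final class GenerationGuardSketch {

    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> snapshot = new AtomicReference<>("");

    /** Call before starting a fetch; remembers which generation the fetch is based on. */
    long beginFetch() {
        return generation.get();
    }

    /**
     * Stores the fetched data only if no other fetch has completed since beginFetch().
     * Returns true if the data was stored.
     */
    boolean store(long generationAtStart, String fetched) {
        if (fetched == null) {
            return false; // nothing usable came back
        }
        if (generation.compareAndSet(generationAtStart, generationAtStart + 1)) {
            snapshot.set(fetched);
            return true;
        }
        return false; // another fetch won the race; keep its result
    }

    String current() {
        return snapshot.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch guard = new GenerationGuardSketch();
        long a = guard.beginFetch();
        long b = guard.beginFetch();                 // a second fetch starts before the first finishes
        System.out.println(guard.store(a, "registry-A")); // first to finish is stored
        System.out.println(guard.store(b, "registry-B")); // stale generation, discarded
        System.out.println(guard.current());
    }
}
```

The guard makes no statement about which response is fresher; it only ensures that two fetches started from the same generation cannot both write.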
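Let's trace what triggers this method. There are two entry points.
/**
 * The task that fetches the registry information at specified intervals.
 *
 */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
This is the cache refresh task that was skipped over earlier. It runs in a recurring loop, and its class Javadoc says that its main job is to fetch registry information, that is, to get the list of registered services from the registry.
The other entry point is here, called after the thread pools are created and before the scheduled tasks are initialized:
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

initScheduledTasks();
try {
    Monitors.registerObject(this);
} catch (Throwable e) {
    logger.warn("Cannot register timers", e);
}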
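As the code shows, if fetching the registry is enabled in the configuration and the fetch from the registry fails, the client falls back to the backup registry (fetchRegistryFromBackup()).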
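The short-circuit condition is compact, so here is a tiny sketch that spells out its three outcomes. BackupFallbackSketch and the returned labels are made up; the BooleanSupplier stands in for fetchRegistry(false).

```java
import java.util.function.BooleanSupplier;

/** Sketch of the startup decision: fetch from the server, and use the backup only if that fails. */
final class BackupFallbackSketch {

    /**
     * Mirrors: if (shouldFetchRegistry && !fetchRegistry(false)) fetchRegistryFromBackup();
     * Returns a label naming the source that ends up being used.
     */
    static String bootstrap(boolean shouldFetchRegistry, BooleanSupplier fetchRegistry) {
        if (!shouldFetchRegistry) {
            return "none";   // fetching disabled: the server is never contacted
        }
        if (fetchRegistry.getAsBoolean()) {
            return "server"; // the fetch succeeded, no fallback needed
        }
        return "backup";     // the fetch failed: fall back to the backup registry
    }

    public static void main(String[] args) {
        System.out.println(bootstrap(true, () -> true));
        System.out.println(bootstrap(true, () -> false));
        System.out.println(bootstrap(false, () -> true));
    }
}
```

Because of the short-circuit, a client configured not to fetch the registry never attempts the request at all.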
4. Conclusion
That more or less covers the Eureka client.
...to be continued